【发布时间】:2018-06-05 19:49:57
【问题描述】:
我将这项任务基于 Google 在此处提供的文档: https://cloud.google.com/appengine/docs/flexible/python/writing-and-responding-to-pub-sub-messages
我最终为我的 main 提供了这段代码,它取出了全局存储的消息变量,而是将每个变量写入一个平面文件。
# [START push]
@app.route('/pubsub/push', methods=['POST'])
def pubsub_push():
if (request.args.get('token', '') != current_app.config['PUBSUB_VERIFICATION_TOKEN']):
return 'Invalid request', 400
# Decode the data
envelope = json.loads(request.data.decode('utf-8'))
# Current time in UTC
current_date = datetime.utcnow()
payload = json.loads(base64.b64decode(envelope['message']['data']))
# Normalize and flatten the data
if "events" in payload and payload['events']:
payload['events'] = payload['events'][0]['data']
payload = parse_dict(init=payload, sep='_')
# Now jsonify all remaining lists and dicts
for key in payload.keys():
value = payload[key]
if isinstance(value, list) or isinstance(value, dict):
if value:
value = json.dumps(value)
else:
value = None
payload[key] = value
# Custom id with the message id and date string
id = "{}.{}".format(
payload['timestamp_unixtime_ms'],
payload['message_id']
)
filename_hourly = 'landing_path/{date}/{hour}/{id}.json'.format(
date=current_date.strftime("%Y%m%d"),
hour=current_date.strftime("%H"),
id=id
)
blob = bucket.get_blob(filename_hourly)
if blob: # We already have this file, skip this message
print('Already have {} stored.'.format(filename_hourly))
return 'OK', 200
blob_hourly = Blob(bucket=bucket, name=filename_hourly)
blob_hourly.upload_from_string(json.dumps(payload, indent=2, sort_keys=True))
# Returning any 2xx status indicates successful receipt of the message.
return 'OK', 200
# [END push]
这很好用,但我收到了大量的 502 和 504 错误。它显示在我创建的堆栈驱动程序仪表板中。
我的猜测是上传文件需要很长时间,但我不确定该怎么做。有什么建议么? appengine 盒子上的资源非常少,我的 API 限制甚至都没有接近。
有人建议吗?
【问题讨论】:
标签: python google-app-engine google-cloud-platform publish-subscribe