【发布时间】:2014-06-13 14:44:28
【问题描述】:
我有以下 Python 块来获取已放置到 Kinesis 流中的记录,然后将记录放入 S3 存储桶中。此流只有一个分片。
# Fetching the shard iterators from the Kinesis stream
shard_iterators = []
if response and 'StreamDescription' in response:
for shard_id in response['StreamDescription']['Shards']:
shard_id = shard_id['ShardId']
shard_iterator = kinesis_connection.get_shard_iterator(stream_name, shard_id, 'LATEST')
shard_iterators.append(shard_iterator['ShardIterator'])
# Iterating over the Kinesis stream and pushing data to S3
bucket = s3_connection.get_bucket(bucket_name)
k = Key(bucket)
for shard_iterator in shard_iterators:
while 1:
response = kinesis_connection.get_records(shard_iterator)
shard_iterator = response['NextShardIterator']
if len(response['Records'])> 0:
for res in response['Records']:
k.key = datetime.datetime.now().strftime('%Y/%m/%d/') + res['SequenceNumber']
k.set_contents_from_string(res['Data'])
数据最初从流中提取并推送到 S3,但在某些时候,我在 response = kinesis_connection.get_records(shard_iterator) 线上收到了 TypeError: expected string or buffer。分片上还有更多记录可供我提取。以前有没有人遇到过这个问题和/或有没有人对我应该尝试解决的问题有什么想法?
【问题讨论】:
标签: amazon-kinesis