【问题标题】:Read from Kinesis is giving empty records when run using previous sequence number or timestamp使用先前的序列号或时间戳运行时,从 Kinesis 读取会给出空记录
【发布时间】:2017-10-03 12:34:06
【问题描述】:

我正在尝试在

的帮助下阅读推送到 Kinesis 流的消息

get_records() 和 get_shard_iterator() API。

我的生产者在处理结束时不断推送记录,消费者也每 30 分钟以 cron 的形式运行。因此,我尝试将读取的当前消息的序列号存储在我的数据库中,并使用 AFTER_SEQUENCE_NUMBER 分片迭代器以及上次读取的序列号。但是,在推送新消息后,第二次(第一次成功读取流中的所有消息)将无法正常工作。

我还尝试使用 AT_TIMESTAMP 以及生产者推送到流的消息时间戳作为消息的一部分,并存储该消息以供进一步使用。同样,第一次运行处理所有消息,第二次运行我得到空记录。

我真的不知道我哪里出错了。如果有人可以帮助我,我将不胜感激。

使用时间戳提供下面的代码,但对于序列号方法也做了同样的事情。

def listen_to_kinesis_stream():
kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])

for shard_info in stream_response['StreamDescription']['Shards']:
    kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
    last_read_ts = kinesis_stream_status.get('state', {}).get(
        shard_info['ShardId'], datetime.datetime.strftime(datetime.date(1970, 01, 01), "%Y-%m-%dT%H:%M:%S.%f"))

    shard_iterator = kinesis_client.get_shard_iterator(
        StreamName=SETTINGS['kinesis_stream'],
        ShardId=shard_info['ShardId'],
        ShardIteratorType='AT_TIMESTAMP',
        Timestamp=last_read_ts)

    get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
    if len(get_response['Records']) == 0:
        continue

    message = json.loads(get_response['Records'][0]['Data'])
    process_resp = process_message(message)
    if process_resp['success'] is False:
        print process_resp
    generic_config_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
    print "Processed {0}".format(message)

    while 'NextShardIterator' in get_response:
        get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
        if len(get_response['Records']) == 0:
            break

        message = json.loads(get_response['Records'][0]['Data'])
        process_resp = process_message(message)
        if process_resp['success'] is False:
            print process_resp
        mongo_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
        print "Processed {0}".format(message)

logger.debug("Processed all messages from Kinesis stream")
print "Processed all messages from Kinesis stream"

【问题讨论】:

    标签: mongodb python-2.7 boto3 amazon-kinesis


    【解决方案1】:

    根据我与 AWS 技术支持人员的讨论,可能会有一些记录为空的消息,因此当 len(get_response['Records']) == 0 时中断不是一个好主意。

    建议的更好方法是 - 我们可以有一个计数器来指示您在读取尽可能多的消息后在运行和退出循环中读取的最大消息数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-27
      • 1970-01-01
      • 2021-07-29
      • 2019-03-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多