【问题标题】:Tumbling window 'expires' parameter less than 'size' doesn't work滚动窗口“过期”参数小于“大小”不起作用
【发布时间】:2021-03-03 22:12:33
【问题描述】:

重现步骤

我正在尝试在翻滚窗口中聚合一些数据,然后将处理函数应用于窗口中的数据。 我正在使用 expires 参数来处理 late events(假设我们可以在前 10 分钟内获得属于 n 分钟的事件n+1 分钟的秒数)。

def parse_millis(ms):
    return datetime.fromtimestamp(int(ms) / 1000)


def process_window_function(window_info, values: list):
    logger.info(f"Processing window with "
                f"start = {datetime.fromtimestamp(window_info[1][0])}, "
                f"end = {datetime.fromtimestamp(window_info[1][1])}")
    logger.info(values)


class InputClass(faust.Record, coerce=True):
    id: str
    timestamp: datetime = DatetimeField(date_parser=parse_millis)
    value: int


tumbling_window_table = (
    app.Table(
        'tumbling_window_table',
        default=list,
        on_window_close=process_window_function,
    )
        .tumbling(size=60, expires=timedelta(seconds=10))
        .relative_to_field(InputClass.timestamp)
)

input_topic = app.topic("input.topic", value_type=InputClass)


@app.agent(input_topic)
async def process_input(stream):
    event: InputClass
    async for event in stream:
        logger.info(f"Event with timestamp {event.timestamp} is stored in window state")
        list_of_values = tumbling_window_table[event.id].value()
        list_of_values.append(event.value)
        tumbling_window_table[event.id] = list_of_values

预期行为

我希望 process_window_function 仅在 n+1 窗口的 10 秒将被传递以处理 延迟事件 时调用 n 窗口p>

实际行为

如果 Table 的 expires 参数,则窗口 n 的 process_window_function 将在窗口 n+1 的第一个事件之后立即调用小于 size 参数。看起来浮士德只是忽略了 expires。对于这种行为,迟到的事件将被跳过。

如果 expires 参数等于或大于大小,则会正确处理迟到的事件,但我不希望延迟超过 10 秒。

卡夫卡输入

{"id":"sensor-1","timestamp":1614808641000,"value":1}
{"id":"sensor-1","timestamp":1614808677000,"value":2}
{"id":"sensor-1","timestamp":1614808681000,"value":3}

日志

[2021-03-03 21:58:07,510] [1] [INFO] [^Worker]: Ready 
[2021-03-03 21:58:41,955] [1] [INFO] Event with timestamp 2021-03-03 21:57:21 is stored in window state 
[2021-03-03 21:59:00,574] [1] [INFO] Event with timestamp 2021-03-03 21:57:57 is stored in window state 
[2021-03-03 21:59:16,963] [1] [INFO] Event with timestamp 2021-03-03 21:58:01 is stored in window state 
[2021-03-03 21:59:16,987] [1] [INFO] Processing window with start = 2021-03-03 21:57:00, end = 2021-03-03 21:57:59.900000 
[2021-03-03 21:59:16,988] [1] [INFO] [1, 2] 

版本

  • Python 版本3.7.9
  • 浮士德版faust-streaming==0.6.1
  • RocksDB 版本python-rocksdb

我有可能在 Flink 中实现这种行为,但在 Faust 中遇到了这个问题。 我做错了什么?

【问题讨论】:

    标签: python faust


    【解决方案1】:

    这可能是我遇到的相同问题,如果是这样,这可能是解决方案。 我必须手动设置clean_up_interval,因为它默认为 30 秒。 该属性是检查过期表数据之前的时间。

    您可以通过在以典型的app = faust.App() 方式定义您的应用后设置app.conf.table_clean_up_interval = <time as int or float> 来实现。

    您可以在settings.py 文件和一个工作示例中找到此方法(最近可能更改了?)here

    唯一的问题似乎是 on_window_close 似乎不会在应用程序崩溃(或重新平衡??永远不知道他们。但是我还没有使用 RocksDB,只是在内存中,所以也许还有更多您可以提供帮助的东西?

    我仍在努力了解迟到的事件,因为我正在使用相同的过程来执行非常长的聚合(例如 3 个月,间隔为 1 秒),但我无法弄清楚是否很旧的数据是放入与其时间戳匹配的窗口或仅当前窗口。我认为它会根据与窗口的时间戳关系将其放入正确的窗口中,但无法确认。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-04-23
      • 2019-11-20
      • 1970-01-01
      • 2016-08-28
      • 2012-03-10
      相关资源
      最近更新 更多