【问题标题】:Windowed Joins in Apache BeamApache Beam 中的窗口连接
【发布时间】:2021-04-13 23:33:29
【问题描述】:

我对 Apache Beam 还很陌生,并实现了我的第一个管道。

但现在我到了一个点,我对如何结合窗口和连接感到困惑。


问题定义

我有两个数据流,一个是用户的浏览量,另一个是用户的请求。它们共享描述用户会话的密钥 session_id,但每个都有其他附加数据。

目标是在请求发生之前计算会话中的综合浏览量。这意味着,我想要一个数据流,其中包含每个请求以及请求之前的综合浏览量。假设最后 5 分钟的浏览量就足够了。


我尝试了什么

为了加载请求,我使用了这个 sn-p,它从 pubsub 订阅加载请求,然后提取 session_id 作为键。最后,我应用了一个窗口,它在收到每个请求时直接发出它。

    requests = (p
               | 'Read Requests' >> (
                    beam.io.ReadFromPubSub(subscription=request_sub)
                    | 'Extract'        >> beam.Map(lambda x: json.loads(x))
                    | 'Session as Key' >> beam.Map(lambda request: (request['session_id'], request))
                    | 'Window'         >> beam.WindowInto(window.SlidingWindows(5 * 60, 1 * 60, 0),
                            trigger=trigger.AfterCount(1),
                            accumulation_mode=trigger.AccumulationMode.DISCARDING
                    )
                )
            )

同样,这个 sn-p 加载页面浏览量,它应用一个滑动窗口,每当页面浏览量进入时就会累积。

pageviews = (p
               | 'Read Pageviews' >> (
                  beam.io.ReadFromPubSub(subscription=pageview_sub)
                  | 'Extract'        >> beam.Map(lambda x: json.loads(x))
                  | 'Session as Key' >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
                  | 'Window'         >> beam.WindowInto(
                            windowfn=window.SlidingWindows(5 * 60, 1 * 60, 0),
                            trigger=trigger.AfterCount(1),
                            accumulation_mode=trigger.AccumulationMode.ACCUMULATING
                  )
               )
            )

为了申请加入,我试过了

combined = (
        {
            'requests': requests,
            'pageviews': pageviews
        }
    | 'Merge' >> beam.CoGroupByKey()
    | 'Print' >> beam.Map(print)
)

当我运行这个管道时,在合并的行中从来没有包含请求的行以及页面浏览量,只有其中一个存在。

我的想法是在请求之前过滤掉页面浏览量,并在 cogroupby 之后计算它们。我需要做什么?我想我的问题在于窗口和触发策略。

以低延迟处理请求也很重要,可能会丢弃迟到的网页浏览量。

【问题讨论】:

    标签: python google-cloud-dataflow apache-beam


    【解决方案1】:

    我自己找到了一个解决方案,如果有人有兴趣,这里是它:

    想法

    诀窍是使用beam.Flatten 操作合并两个流,并使用Stateful DoFn 计算一个请求之前的浏览量。每个流都包含 json 字典。我通过使用{'request' : request}{'pageview' : pageview} 作为包围块来嵌入它们,这样我就可以在Stateful DoFn 中将不同的事件分开。我还计算了诸如首次网页浏览时间戳和自首次网页浏览以来的秒数之类的东西。流必须使用session_id 作为键,这样Stateful DoFn 只能接收一个会话的所有事件。

    代码

    首先,这是管道代码:

    # Beam pipeline, that extends requests by number of pageviews before request in that session
    with beam.Pipeline(options=options) as p:
        # The stream of requests
        requests = (
              'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=request_sub)
            | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
            | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
            | 'Use Session ID as stream key'    >> beam.Map(lambda request: (request['session_id'], request))
            | 'Add type of event'               >> beam.Map(lambda r: (r[0], ('request', r[1])))
        )
    
        # The stream of pageviews
        pageviews = (
              'Read from PubSub subscription'   >> beam.io.ReadFromPubSub(subscription=pageview_sub)
            | 'Extract JSON'                    >> beam.ParDo(ExtractJSON())
            | 'Add Timestamp'                   >> beam.ParDo(AssignTimestampFn())
            | 'Use Session ID as stream key'    >> beam.Map(lambda pageview: (pageview['session_id'], pageview))
            | 'Add type of event'               >> beam.Map(lambda p: (p[0], ('pageview', p[1])))
        )
    
        # Combine the streams and apply Stateful DoFn
        combined = (
            (
                p | ('Prepare requests stream' >> requests),
                p | ('Prepare pageviews stream' >> pageviews)
            )
            | 'Combine event streams'       >> beam.Flatten()
            | 'Global Window'               >> beam.WindowInto(windowfn=window.GlobalWindows(),
                                                                trigger=trigger.AfterCount(1),
                                                                accumulation_mode=trigger.AccumulationMode.DISCARDING)
            | 'Stateful DoFn'               >> beam.ParDo(CountPageviews())
            | 'Compute processing delay'    >> beam.ParDo(LogTimeDelay())
            | 'Format for BigQuery output'  >> beam.ParDo(FormatForOutputDoFn())
        )
    
        # Write to BigQuery.
        combined | 'Write' >> beam.io.WriteToBigQuery(
            requests_extended_table,
            schema=REQUESTS_EXTENDED_TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    

    有趣的部分是使用beam.Flatten 和应用有状态DoFn CountPageviews() 的两个流的组合

    这是使用的自定义 DoFns 的代码:

    # This DoFn just loads a json message
    class ExtractJSON(beam.DoFn):
      def process(self, element):
        import json
    
        yield json.loads(element)
    
    # This DoFn adds the event timestamp of messages into it json elements for further processing
    class AssignTimestampFn(beam.DoFn):
      def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime
    
        timestamped_element = element
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        timestamp = timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
        timestamped_element['timestamp_utc'] = timestamp_utc
        timestamped_element['timestamp'] = timestamp
        yield timestamped_element
    
    # This class is a stateful dofn
    # Input elements should be of form (session_id, {'event_type' : event}
    # Where events can be requests or pageviews
    # It computes on a per session basis the number of pageviews and the first pageview timestamp
    # in its internal state
    # Whenever a request comes in, it appends the internal state to the request and emits
    # a extended request
    # Whenever a pageview comes in, the internal state is updated but nothing is emitted
    class CountPageviewsStateful(beam.DoFn):
      # The internal states
      NUM_PAGEVIEWS = userstate.CombiningValueStateSpec('num_pageviews', combine_fn=sum)
      FIRST_PAGEVIEW = userstate.ReadModifyWriteStateSpec('first_pageview', coder=beam.coders.VarIntCoder())
    
      def process(self,
                  element,
                  num_pageviews_state=beam.DoFn.StateParam(NUM_PAGEVIEWS),
                  first_pageview_state=beam.DoFn.StateParam(FIRST_PAGEVIEW)
                  ):
        import datetime
    
        # Extract element
        session_id = element[0]
        event_type, event = element[1]
    
        # Process different event types
        # Depending on event type, different actions are done
        if event_type == 'request':
            # This is a request
            request = event
    
            # First, the first pageview timestamp is extracted and the seconds since first timestamp are calculated
            first_pageview = first_pageview_state.read()
            if first_pageview is not None:
                seconds_since_first_pageview = (int(request['timestamp_utc'].timestamp()) - first_pageview)
    
                first_pageview_timestamp_utc = datetime.datetime.utcfromtimestamp(float(first_pageview))
                first_pageview_timestamp = first_pageview_timestamp_utc.strftime("%Y-%m-%d %H:%M:%S")
            else:
                seconds_since_first_pageview = -1
                first_pageview_timestamp = None
    
            # The calculated data is appended to the request
            request['num_pageviews'] = num_pageviews_state.read()
            request['first_pageview_timestamp'] = first_pageview_timestamp
            request['seconds_since_first_pageview'] = seconds_since_first_pageview
            
            # The pageview counter is reset
            num_pageviews_state.clear()
            
            # The request is returned
            yield (session_id, request)
        elif event_type == 'pageview':
            # This is a pageview
            pageview = event
    
            # Update first pageview state
            first_pageview = first_pageview_state.read()
            if first_pageview is None:
                first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
            elif first_pageview > int(pageview['timestamp_utc'].timestamp()):
                first_pageview_state.write(int(pageview['timestamp_utc'].timestamp()))
    
            # Increase number of pageviews
            num_pageviews_state.add(1)
              
            # Do not return anything, pageviews are not further processed
    
    # This DoFn logs the delay between the event time and the processing time
    class LogTimeDelay(beam.DoFn):
      def process(self, element, timestamp=beam.DoFn.TimestampParam):
        import datetime
        import logging
    
        timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
        seconds_delay = (datetime.datetime.utcnow() - timestamp_utc).total_seconds()
    
        logging.warning('Delayed by %s seconds', seconds_delay)
    
        yield element
    

    这似乎有效,并且在直接跑步者上给了我大约 1-2 秒的平均延迟。在 Cloud Dataflow 上,平均延迟约为 0.5-1 秒。所以总而言之,这似乎解决了问题定义。

    进一步考虑

    不过,还有一些悬而未决的问题:

    • 我正在使用全局窗口,这意味着就我而言,内部状态将永远保留。也许会话窗口是正确的方法:当 x 秒内没有浏览量/请求时,窗口关闭并释放内部状态。
    • 处理延迟有点高,但也许我需要稍微调整一下 pubsub 部分。
    • 我不知道这个解决方案比标准光束方法增加了多少开销或内存消耗。我也没有测试高工作负载和并行化。

    【讨论】:

      猜你喜欢
      • 2021-12-22
      • 1970-01-01
      • 2021-11-25
      • 2022-01-23
      • 1970-01-01
      • 1970-01-01
      • 2018-05-31
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多