【问题标题】:KSQLDB right end side of inner joined streams is always null内部连接流的 KSQLDB 右端始终为空
【发布时间】:2020-11-10 16:38:08
【问题描述】:

运行 KSQLDB 版本:0.12.0

我在加入流时遇到问题。

当创建一个流作为窗口内连接查询时,右侧字段都是空的,即使是连接条件的一部分。 独立运行相同的查询时,我可以正常获取字段。

设置如下:

使用 docker-compose 运行服务器:

services:
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.12.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: mybroker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
      KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'

流和连接定义

CREATE STREAM generic_orders (
    ...
    clientrequestid_ string KEY,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);

当加入上面两个它不起作用时,我尝试将request_clientid_ 也设为一个键。 由于clientrequestid_request_clientid_ 的类型不同,我怀疑它不起作用,所以我创建了另一个流:

CREATE STREAM specific_order_typed AS
    select originatoruserid_ ,
    CAST(request_clientid_ AS string) as request_clientid_ KEY, 
    bidquoteinfo_volume_ 
    FROM ice_massquote_order
    EMIT CHANGES
    ;

这并没有帮助......

这是加入的流:

CREATE STREAM enriched_orders AS
    SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_ 
    EMIT CHANGES;

还尝试交换 from 流和 join 流...结果始终为空:

|623562762 |null
...

当直接运行查询时,虽然我得到了我的预期

    SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq  FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_ 
    EMIT CHANGES;
|623562762 |623562762

有人知道发生了什么吗?我已经用尽了所有的想法

【问题讨论】:

  • thetopic 中,实际的消息键是包含clientrequestid_ 的字符串吗?如果您PRINT thetopic,您应该会看到键值。

标签: apache-kafka ksqldb


【解决方案1】:

所以我通过降级到 0.10.2 解决了我的问题。

我注意到的第一个区别是,在尝试加入原始流 generic_ordersspecific_order 时,stringbigint 之间的比较出现了正确的错误

CREATE STREAM generic_orders (
    ...
    clientrequestid_ string,
    ...
)   WITH (
    KAFKA_TOPIC='thetopic',
    VALUE_FORMAT='AVRO'
);

CREATE STREAM specific_order (
    originatoruserid_ string,
    request_clientid_ bigint,
    bidquoteinfo_volume_ bigint
)   WITH (
    KAFKA_TOPIC='anothertopic',
    VALUE_FORMAT='AVRO'
);

然后我只需要将 bigint 转换为字符串(我之前也尝试过 0.12):

CREATE STREAM enriched_orders AS
    SELECT * FROM specific_order_typed i
    INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = CAST(i.request_clientid_ AS String)
    EMIT CHANGES;

最后,在读取该流时,这些值不再为空。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多