【发布时间】:2020-11-10 16:38:08
【问题描述】:
运行 KSQLDB 版本:0.12.0
我在加入流时遇到问题。
当创建一个流作为窗口内连接查询时,右侧字段都是空的,即使是连接条件的一部分。 独立运行相同的查询时,我可以正常获取字段。
设置如下:
使用 docker-compose 运行服务器:
services:
ksqldb-server:
image: confluentinc/ksqldb-server:0.12.0
hostname: ksqldb-server
container_name: ksqldb-server
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: mybroker:9092
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://myregistry:8081
KSQL_KSQL_STREAMS_AUTO_OFFSET_RESET: 'earliest'
流和连接定义
CREATE STREAM generic_orders (
...
clientrequestid_ string KEY,
...
) WITH (
KAFKA_TOPIC='thetopic',
VALUE_FORMAT='AVRO'
);
CREATE STREAM specific_order (
originatoruserid_ string,
request_clientid_ bigint,
bidquoteinfo_volume_ bigint
) WITH (
KAFKA_TOPIC='anothertopic',
VALUE_FORMAT='AVRO'
);
当加入上面两个它不起作用时,我尝试将request_clientid_ 也设为一个键。
由于clientrequestid_ 和request_clientid_ 的类型不同,我怀疑它不起作用,所以我创建了另一个流:
CREATE STREAM specific_order_typed AS
select originatoruserid_ ,
CAST(request_clientid_ AS string) as request_clientid_ KEY,
bidquoteinfo_volume_
FROM ice_massquote_order
EMIT CHANGES
;
这并没有帮助......
这是加入的流:
CREATE STREAM enriched_orders AS
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
还尝试交换 from 流和 join 流...结果始终为空:
|623562762 |null
...
当直接运行查询时,虽然我得到了我的预期
SELECT i.request_clientid_ as reqid, o.clientrequestid_ as orderreq FROM specific_order_typed i
INNER JOIN generic_order o WITHIN 1 HOURS ON o.clientrequestid_ = i.request_clientid_
EMIT CHANGES;
|623562762 |623562762
有人知道发生了什么吗?我已经用尽了所有的想法
【问题讨论】:
-
在
thetopic中,实际的消息键是包含clientrequestid_的字符串吗?如果您PRINT thetopic,您应该会看到键值。
标签: apache-kafka ksqldb