【问题标题】:Error when trying to join a table and a stream尝试加入表和流时出错
【发布时间】:2018-07-22 08:17:20
【问题描述】:

我正在尝试加入一个表和一个流并创建另一个表,如下所示:

CREATE TABLE table_fx_latest AS
   SELECT t1.currencyid,
          t1.maxtimestamp,
          t2.midprice 
  FROM stream_fx2 t2 LEFT JOIN table_fx_latest3 t1 
  ON t1.currencyid = t2.currencyid AND 
     t1.timestamp = t2.maxtimestamp 
  GROUP BY t1.currencyid, 
           t1.maxtimestamp, 
           t2.midprice;

但是报如下错误:

Cannot RUN execution plan for this statement, CreateTableAsSelect{name=TABLE_FX_LATEST_PRICE6, query=Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=[T1.CURRENCYID T1_CURRENCYID, T1.MAXTIMESTAMP MAXTIMESTAMP, T2.MIDPRICE MIDPRICE]}, from=Join{type=LEFT, left=AliasedRelation{relation=STREAM_FX2, alias=T2}, right=AliasedRelation{relation=TABLE_FX_LATEST3, alias=T1}, criteria=Optional[JoinOn{((T1.CURRENCYID = T2.CURRENCYID) AND (T2.TIMESTAMP = T1.MAXTIMESTAMP))}]}, =null, where=null, groupBy=Optional[GroupBy{isDistinct=false, groupingElements=[SimpleGroupBy{columns=[T1.CURRENCYID]}, SimpleGroupBy{columns=[T1.MAXTIMESTAMP]}, SimpleGroupBy{columns=[T2.MIDPRICE]}]}], having=null, orderBy=[], limit=null}, orderBy=[]}, notExists=false, properties={}}
Caused by: io.confluent.ksql.parser.tree.LogicalBinaryExpression cannot be cast to io.confluent.ksql.parser.tree.ComparisonExpression

这是stream_fx2 流和table_fx_latest3 表的描述:

ksql> describe stream_fx2;

Field      | Type
----------------------------------------
ROWTIME    | BIGINT           (system)
ROWKEY     | VARCHAR(STRING)  (system)
ID         | INTEGER
CURRENCY   | VARCHAR(STRING)
CURRENCYID | INTEGER
TIMESTAMP  | BIGINT
BIDPRICE   | DOUBLE
MIDPRICE   | DOUBLE
OFFERPRICE | DOUBLE

ksql> describe table_fx_latest3;

Field        | Type
------------------------------------------
ROWTIME      | BIGINT           (system)
ROWKEY       | VARCHAR(STRING)  (system)
CURRENCYID   | INTEGER          (key)
MAXTIMESTAMP | BIGINT
------------------------------------------

我猜这可能是 KSQL 的一个错误(仍在开发人员预览中),但我想确保我没有遗漏任何东西。任何帮助将非常感激。

【问题讨论】:

    标签: apache-kafka confluent-platform ksqldb


    【解决方案1】:

    JOIN 标准只能是键相等。如果您有任何其他条件,您需要将它们放在WHERE 子句中。 请尝试以下操作:

    CREATE TABLE table_fx_latest AS
      SELECT t1.currencyid,
             t1.maxtimestamp,
             t2.midprice 
      FROM stream_fx2 t2 LEFT JOIN table_fx_latest3 t1 
        ON t1.currencyid = t2.currencyid
      WHERE 
        t1.timestamp = t2.maxtimestamp 
      GROUP BY t1.currencyid, 
               t1.maxtimestamp, 
               t2.midprice;
    

    让我知道这是否适合你。

    【讨论】:

    • 表创建成功,但出现异常:Error Serializing Avro message,即Caused by: java.lang.ClassCastException
    • 您能否检查以确保您正确定义了源流和表的架构。似乎其中一个字段未使用正确的类型定义。
    • 我一直在使用 float 的字段,所以我用 double 创建了一些新表和流。但出现了一些新问题。能否请您访问stackoverflow.com/questions/48785805/null-rowkey-in-a-stream 并解释一下?
    猜你喜欢
    • 1970-01-01
    • 2014-07-14
    • 1970-01-01
    • 2019-07-04
    • 2022-11-17
    • 2018-06-03
    • 1970-01-01
    • 2013-10-18
    • 2013-11-15
    相关资源
    最近更新 更多