【Question title】: Integrating Kafka with Apache Calcite
【Posted】: 2017-02-24 08:56:17
【Question】:

I'm trying to integrate Calcite with Kafka, using CsvStreamableTable as a reference.

I convert each ConsumerRecord into an Object[] with the following code:

static class ArrayRowConverter extends RowConverter<Object[]> {
    private final List<Schema.Field> fields;

    public ArrayRowConverter(List<Schema.Field> fields) {
        this.fields = fields;
    }

    @Override
    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
        // One slot for rowtime plus one per Avro field.
        Object[] objects = new Object[fields.size() + 1];
        int i = 0;
        // Column 0 is rowtime: the Kafka record timestamp (epoch millis).
        objects[i++] = consumerRecord.timestamp();
        for (Schema.Field field : this.fields) {
            Object obj = consumerRecord.value().get(field.name());
            // Avro returns strings as Utf8; convert them to java.lang.String.
            objects[i++] = (obj instanceof Utf8) ? obj.toString() : obj;
        }
        return objects;
    }
}

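The enumerator is implemented as follows: a background thread continuously polls records from Kafka and puts them into a queue, and getRecord() polls from that queue: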

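public E current() {
    return current;
}

public boolean moveNext() {
    for (;;) {
        // Stop as soon as the statement is cancelled.
        if (cancelFlag.get()) {
            return false;
        }
        // Non-blocking read from the queue filled by the Kafka consumer thread.
        ConsumerRecord<String, GenericRecord> record = getRecord();
        if (record == null) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop instead of swallowing it.
                Thread.currentThread().interrupt();
                return false;
            }
            continue;
        }
        current = rowConvert.convertRow(record);
        return true;
    }
}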
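The same polling loop can also be written without the fixed 200 ms sleep, by letting the consumer thread hand records over through a java.util.concurrent.BlockingQueue and waiting on it with a timeout. This is a stdlib-only sketch under stated assumptions: it mirrors the current()/moveNext() shape of Calcite's Enumerator but does not implement that interface, and the class name QueueBackedEnumerator, the generic record type, and the 200 ms timeout are hypothetical choices for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/** Hypothetical stand-in for a Kafka-backed enumerator; not Calcite's API. */
class QueueBackedEnumerator<R, E> {
    private final BlockingQueue<R> queue;      // filled by the consumer thread
    private final Function<R, E> rowConverter; // e.g. ConsumerRecord -> Object[]
    private final AtomicBoolean cancelFlag;
    private E current;

    QueueBackedEnumerator(BlockingQueue<R> queue, Function<R, E> rowConverter,
                          AtomicBoolean cancelFlag) {
        this.queue = queue;
        this.rowConverter = rowConverter;
        this.cancelFlag = cancelFlag;
    }

    public E current() {
        return current;
    }

    public boolean moveNext() {
        while (!cancelFlag.get()) {
            try {
                // Wait up to 200 ms for the next record instead of sleeping blindly.
                R record = queue.poll(200L, TimeUnit.MILLISECONDS);
                if (record != null) {
                    current = rowConverter.apply(record);
                    return true;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                return false;
            }
        }
        return false; // cancelled
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean cancel = new AtomicBoolean(false);
        QueueBackedEnumerator<String, Object[]> e =
            new QueueBackedEnumerator<>(queue, s -> new Object[] {s.length(), s}, cancel);
        queue.put("click-1");
        System.out.println(e.moveNext() + " " + e.current()[1]); // true click-1
        cancel.set(true);
        System.out.println(e.moveNext()); // false
    }
}
```

poll(timeout, unit) wakes up as soon as a record arrives, so latency is no longer bounded below by the sleep interval, while the loop still re-checks the cancel flag at least every 200 ms.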

I tested SELECT STREAM * FROM Kafka.clicks and it works fine. rowtime is explicitly added as the first column, and its value is the Kafka record's timestamp.

But when I run the following aggregation:

SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime, ip, COUNT(*) AS c
FROM KAFKA.clicks
GROUP BY FLOOR(rowtime TO HOUR), ip

the following exception is thrown:

java.sql.SQLException: Error while executing SQL "SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line 1, column 119: Streaming aggregation requires at least one monotonic expression in GROUP BY clause
    at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
    at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
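Grouping on FLOOR(rowtime TO HOUR) can only satisfy this rule if the expression itself is monotonic. Truncating a timestamp to the hour is a non-decreasing function, so the result is monotonic whenever rowtime is, but never more than that. The plain-Java illustration below works on epoch-millisecond longs (what consumerRecord.timestamp() returns); it is not Calcite's FLOOR implementation, and floorToHour and isNonDecreasing are hypothetical helpers.

```java
import java.util.Arrays;

class FloorMonotonicity {
    static final long HOUR_MS = 3_600_000L;

    /** Truncates epoch milliseconds to the start of the hour (UTC). */
    static long floorToHour(long epochMillis) {
        return Math.floorDiv(epochMillis, HOUR_MS) * HOUR_MS;
    }

    /** True if the values never decrease from one element to the next. */
    static boolean isNonDecreasing(long[] values) {
        for (int i = 1; i < values.length; i++) {
            if (values[i] < values[i - 1]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Sample ascending timestamps standing in for rowtime.
        long[] rowtimes = {1487926577000L, 1487927000000L, 1487930400000L, 1487934000001L};
        long[] floored = Arrays.stream(rowtimes).map(FloorMonotonicity::floorToHour).toArray();
        System.out.println(isNonDecreasing(rowtimes) + " " + isNonDecreasing(floored)); // true true

        // If rowtime is not ordered, the floored values are not guaranteed to be either.
        long[] unordered = {1487934000001L, 1487926577000L};
        long[] flooredUnordered = Arrays.stream(unordered).map(FloorMonotonicity::floorToHour).toArray();
        System.out.println(isNonDecreasing(flooredUnordered)); // false
    }
}
```

This is why the validator needs to know something about rowtime itself: FLOOR preserves an ordering that exists, but cannot create one.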

【Discussion】:

    Tags: java sql stream apache-kafka apache-calcite


    【Answer 1】:

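    You need to declare that the "ROWTIME" column is monotonic. In MockCatalogReader, note how "ROWTIME" is declared monotonic in the "ORDERS" and "SHIPMENTS" streams; that is why some of the queries in SqlValidatorTest.testStreamGroupBy() are valid and others are not. The key method the validator relies on is SqlValidatorTable.getMonotonicity(String columnName).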
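The rule in the error message can be pictured with a small model: every GROUP BY expression has a monotonicity, FLOOR(col TO unit) inherits the monotonicity of col but only in its non-strict form, and a streaming aggregation is accepted only if at least one GROUP BY expression is monotonic. This is a simplified plain-Java illustration with hypothetical names (Monotonicity, GroupItem, and a column-to-monotonicity map standing in for SqlValidatorTable.getMonotonicity); the unstrict() step is loosely modeled on Calcite's SqlMonotonicity, but this is not how Calcite's validator is implemented.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class StreamGroupByCheck {
    enum Monotonicity {
        STRICTLY_INCREASING, INCREASING, NOT_MONOTONIC;

        /** Result after a non-decreasing but non-injective function such as FLOOR. */
        Monotonicity unstrict() {
            return this == STRICTLY_INCREASING ? INCREASING : this;
        }
    }

    /** Hypothetical GROUP BY item: a bare column, or FLOOR(column TO unit) if floored. */
    static final class GroupItem {
        final String column;
        final boolean floored;

        GroupItem(String column, boolean floored) {
            this.column = column;
            this.floored = floored;
        }
    }

    /** Columns the table says nothing about are treated as NOT_MONOTONIC. */
    static Monotonicity monotonicityOf(GroupItem item, Map<String, Monotonicity> columns) {
        Monotonicity m = columns.getOrDefault(item.column, Monotonicity.NOT_MONOTONIC);
        return item.floored ? m.unstrict() : m;
    }

    /** Streaming aggregation needs at least one monotonic GROUP BY expression. */
    static boolean isValidStreamingGroupBy(List<GroupItem> groupBy, Map<String, Monotonicity> columns) {
        for (GroupItem item : groupBy) {
            if (monotonicityOf(item, columns) != Monotonicity.NOT_MONOTONIC) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // GROUP BY FLOOR(rowtime TO HOUR), ip
        List<GroupItem> groupBy = Arrays.asList(new GroupItem("ROWTIME", true), new GroupItem("IP", false));
        // Table that declares nothing: rejected, like the query in the question.
        System.out.println(isValidStreamingGroupBy(groupBy, Collections.emptyMap())); // false
        // Table that declares ROWTIME as increasing: accepted.
        System.out.println(isValidStreamingGroupBy(groupBy,
            Collections.singletonMap("ROWTIME", Monotonicity.INCREASING))); // true
    }
}
```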

    【Comments】:

    • Thanks Julian. Is there an easy way to declare a column as monotonic, or should I implement it the way MockTable does?
    • @user2283216, according to this code snippet, it should be enough to define rowtime as the first column (i.e. index=0) to ensure its monotonicity.
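The idea behind the comment is that monotonicity need not be hand-coded per column: Calcite can derive it from a sort order (collation) that the table declares. As far as I can tell, Calcite's own streaming test tables do this by overriding getStatistic() to return something like Statistics.of(100d, ImmutableList.of(), RelCollations.createSingleton(0)), i.e. an ascending collation on field 0. The stdlib-only sketch below models that lookup with hypothetical names (Collation, Direction, monotonicity): an ascending collation whose leading field is a column makes that column increasing, and every other column stays non-monotonic. It mirrors the idea only, not Calcite's actual code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class CollationMonotonicity {
    enum Direction { ASCENDING, DESCENDING }

    enum Monotonicity { INCREASING, DECREASING, NOT_MONOTONIC }

    /** Hypothetical declared sort order: leading field index plus direction. */
    static final class Collation {
        final int fieldIndex;
        final Direction direction;

        Collation(int fieldIndex, Direction direction) {
            this.fieldIndex = fieldIndex;
            this.direction = direction;
        }
    }

    /** Looks for a declared collation whose leading field is the named column. */
    static Monotonicity monotonicity(String column, List<String> fieldNames, List<Collation> collations) {
        for (Collation c : collations) {
            if (c.fieldIndex < fieldNames.size() && fieldNames.get(c.fieldIndex).equals(column)) {
                return c.direction == Direction.ASCENDING ? Monotonicity.INCREASING : Monotonicity.DECREASING;
            }
        }
        return Monotonicity.NOT_MONOTONIC;
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("ROWTIME", "IP", "URL");
        // The table declares that rows arrive sorted ascending on field 0 (ROWTIME).
        List<Collation> declared = Collections.singletonList(new Collation(0, Direction.ASCENDING));
        System.out.println(monotonicity("ROWTIME", fields, declared)); // INCREASING
        System.out.println(monotonicity("IP", fields, declared));      // NOT_MONOTONIC
        // Without a declared collation nothing is known about ROWTIME.
        System.out.println(monotonicity("ROWTIME", fields, Collections.emptyList())); // NOT_MONOTONIC
    }
}
```

Declaring the ordering this way only makes sense if rows really do arrive in rowtime order; Kafka timestamps are not guaranteed to be ordered across partitions.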