【问题标题】:read a data from kafka topic and aggregate using spark tempview?从 kafka 主题读取数据并使用 spark tempview 聚合?
【发布时间】:2018-12-05 16:03:30
【问题描述】:

我想从 kafka 主题中读取数据,并创建 spark tempview 以按某些列分组?

+----+--------------------+
| key|               value|          
+----+--------------------+
|null|{"e":"trade","E":...|
|null|{"e":"trade","E":...|
|null|{"e":"trade","E":...|

但我无法从 tempview 聚合数据?? value 列数据存储为 String???

Dataset<Row> data = spark
                  .readStream()
                  .format("kafka")
                  .option("kafka.bootstrap.servers", "localhost:9092,localhost:9093")
                  .option("subscribe", "data2-topic")
                  .option("startingOffsets", "latest")
                  .option ("group.id", "test")
                  .option("enable.auto.commit", "true")
                  .option("auto.commit.interval.ms", "1000")          
                  .load();
          data.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
          data.createOrReplaceTempView("Tempdata");
          data.show();
Dataset<Row> df2=spark.sql("SELECT e FROM Tempdata group by e");
df2.show();

【问题讨论】:

    标签: apache-spark apache-kafka dataset


    【解决方案1】:

    值列数据存储为字符串???

    是的..因为你CAST(value as STRING)

    您需要使用from_json 函数将行加载到您可以在其中搜索的适当数据框中。

    有关示例,请参阅 Structured Streaming on Kafka 上的 Databrick 博客

    如果主要目标只是对某些字段进行分组,那么 KSQL 可能是一种替代方案。

    【讨论】:

    • 如何聚合字符串值??
    • 我不知道你的主题中有什么数据,所以很难说。不过,我建议先获取整个值数据框,而不是字符串。如果您有新问题,最好为它创建一个新帖子
    猜你喜欢
    • 2021-06-11
    • 2023-03-19
    • 2020-09-18
    • 1970-01-01
    • 1970-01-01
    • 2021-08-20
    • 2019-06-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多