【Question title】: Structured Stream-Stream join is not happening in JSON from Kafka topic
【Posted】: 2019-04-10 00:13:28
【Problem description】:

The application listens to two Kafka topics:

  1. User events

  2. Payment events

Payload of a user event:

{"userId":"Id_223","firstname":"fname_223","lastname":"lname_223","phonenumber":"P98202384_223","usertimestamp":"Apr 5, 2019 2:58:47 PM"}

Payload of a payment event:

{"paymentUserId":"Id_227","amount":1227.0,"location":"location_227","paymenttimestamp":"Apr 5, 2019 3:00:03 PM"}
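Both timestamps are human-readable strings such as "Apr 5, 2019 2:58:47 PM", not the ISO-8601 layout Spark 2.4's JSON reader expects by default (`yyyy-MM-dd'T'HH:mm:ss.SSSXXX`), so fields declared as `TimestampType` may well come out null. The minimal sketch below, using only `java.time`, checks that the pattern `MMM d, yyyy h:mm:ss a` (an assumption inferred from the two sample payloads) parses them. The same pattern string could then be passed to Spark through the `timestampFormat` option of the `functions.from_json(Column, StructType, java.util.Map<String, String>)` overload. The class name `GsonTimestampCheck` is hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class GsonTimestampCheck {
    // Pattern assumed from the sample payloads, e.g. "Apr 5, 2019 2:58:47 PM".
    // Locale.US is needed so that "Apr" and "PM" are recognized.
    static final DateTimeFormatter EVENT_FORMAT =
            DateTimeFormatter.ofPattern("MMM d, yyyy h:mm:ss a", Locale.US);

    static LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, EVENT_FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(parse("Apr 5, 2019 2:58:47 PM")); // 2019-04-05T14:58:47
        System.out.println(parse("Apr 5, 2019 3:00:03 PM")); // 2019-04-05T15:00:03
    }
}
```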

We need to join the records on userId = paymentuserid.
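Besides the key equality, the join expression in the code also requires `usertimestamp >= paymenttimestamp AND usertimestamp <= paymenttimestamp + interval 1 hour`, so a user event must arrive at or after its payment and at most one hour later. The plain-Java sketch below (not Spark code; it ignores watermarks and uses hypothetical matching ids, since the sample payloads carry different ids) mirrors that predicate. With the sample times, the user event is about a minute *before* the payment, so the pair would not join even with matching ids.

```java
import java.time.LocalDateTime;

public class JoinConditionCheck {
    // Mirrors the question's expr():
    //   userId = paymentuserId AND usertimestamp >= paymenttimestamp
    //   AND usertimestamp <= paymenttimestamp + interval 1 hour
    static boolean matches(String userId, LocalDateTime userTs,
                           String paymentUserId, LocalDateTime paymentTs) {
        // In SQL a NULL on either side makes the condition not true,
        // which is what happens when from_json cannot parse a field.
        if (userId == null || userTs == null || paymentUserId == null || paymentTs == null) {
            return false;
        }
        return userId.equals(paymentUserId)
                && !userTs.isBefore(paymentTs)
                && !userTs.isAfter(paymentTs.plusHours(1));
    }

    public static void main(String[] args) {
        LocalDateTime user = LocalDateTime.of(2019, 4, 5, 14, 58, 47);
        LocalDateTime payment = LocalDateTime.of(2019, 4, 5, 15, 0, 3);
        // User event precedes the payment: time-range part is false.
        System.out.println(matches("Id_223", user, "Id_223", payment)); // false
        // Payment 30 minutes before the user event: inside the window.
        System.out.println(matches("Id_223", user, "Id_223", user.minusMinutes(30))); // true
    }
}
```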

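It seems the application cannot parse the records coming from the Kafka topics.

There must be something I'm missing with from_json.

Can anyone give some early feedback?

Here is the console output: no join happens and no records are printed.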

+------+---------+--------+-----------+-------------+-------------+------+--------+----------------+
|userId|firstname|lastname|phonenumber|usertimestamp|paymentuserId|amount|location|paymenttimestamp|
+------+---------+--------+-----------+-------------+-------------+------+--------+----------------+
+------+---------+--------+-----------+-------------+-------------+------+--------+----------------+

Here is the code:

import org.apache.spark.SparkConf;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import static org.apache.spark.sql.functions.expr;

@SpringBootApplication
public class Stream2StreamJoin  implements CommandLineRunner{



    private static final Logger LOGGER =
              LoggerFactory.getLogger(Stream2StreamJoin.class);

    @Value("${kafka.bootstrap.server}")
    private String bootstrapServers;

    @Value("${kafka.userevent}")
    private String usereventTopic;

    @Value("${kafka.paymentevent}")
    private String paymenteventTopic;

    public void processData() {

        System.out.println(bootstrapServers);
        System.out.println(usereventTopic);
        System.out.println(paymenteventTopic);

        LOGGER.info(bootstrapServers);
        LOGGER.info(usereventTopic);
        LOGGER.info(paymenteventTopic);


        SparkConf sparkConf = new SparkConf().setAppName("Stream2StreamJoin").setMaster("local[*]");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));



        SparkSession spark = SparkSession
                  .builder()
                  .appName("Stream2StreamJoin")
                  .getOrCreate();

        spark.sparkContext().setLogLevel("ERROR");

        StructType userSchema =  DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("userId", DataTypes.StringType, true),
                DataTypes.createStructField("firstname", DataTypes.StringType, true),
                DataTypes.createStructField("lastname", DataTypes.StringType, true),
                DataTypes.createStructField("phonenumber", DataTypes.StringType, true),
                DataTypes.createStructField("usertimestamp", DataTypes.TimestampType, true)
                });


        StructType paymentSchema =  DataTypes.createStructType(new StructField[] { 
                DataTypes.createStructField("paymentuserId", DataTypes.StringType, true),
                DataTypes.createStructField("amount", DataTypes.StringType, true),
                DataTypes.createStructField("location", DataTypes.StringType, true),                
                DataTypes.createStructField("paymenttimestamp", DataTypes.TimestampType, true)
                });



        Dataset<Row> userDataSet=spark.readStream().format("kafka")
                  .option("kafka.bootstrap.servers", bootstrapServers)
                  .option("subscribe", usereventTopic)
                  .option("startingOffsets", "earliest")
                  .load().selectExpr("CAST(value  AS STRING) as userEvent")
                     .select(functions.from_json(functions.col("userEvent"),userSchema).as("user"))
                     .select("user.*")
                     ; 



        Dataset<Row> paymentDataSet=spark.readStream().format("kafka")
                  .option("kafka.bootstrap.servers", bootstrapServers)
                  .option("subscribe", paymenteventTopic)
                  .option("startingOffsets", "earliest")
                  .load().selectExpr("CAST( value AS STRING) as paymentEvent")
                     .select(functions.from_json(functions.col("paymentEvent"),paymentSchema).as("payment"))
                     .select("payment.*")
                     ;

        Dataset<Row> userDataSetWithWatermark = userDataSet.withWatermark("usertimestamp", "2 hours");

        Dataset<Row> paymentDataSetWithWatermark = paymentDataSet.withWatermark("paymenttimestamp", "3 hours");

        Dataset<Row> joindataSet =  userDataSetWithWatermark.join(
                paymentDataSetWithWatermark,
                  expr(
                          "userId = paymentuserId AND usertimestamp >= paymenttimestamp AND usertimestamp <= paymenttimestamp + interval 1 hour")
                );

        joindataSet.writeStream().format("console").start();



        try {

            spark.streams().awaitAnyTermination();
        } catch (StreamingQueryException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }



        }

    @Override
    public void run(String... args) throws Exception {
        processData();

    }

    public static void main(String[] args) throws Exception {

        System.setProperty("hadoop.home.dir", "/Users/workspace/java/spark-kafka-streaming");

        SpringApplication.run(Stream2StreamJoin.class, args);
    }

}
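One further detail worth ruling out: the payment payload uses the key `paymentUserId` (capital U), while `paymentSchema` declares `paymentuserId`. My understanding is that `from_json` matches JSON keys to schema field names exactly, so if the producer really emits `paymentUserId`, that column would be null and the equality in the join could never hold. The plain-Java sketch below (hypothetical class `SchemaKeyCheck`, naive regex key extraction that is only adequate for flat payloads like these) lists schema fields that have no case-sensitive match among a payload's keys.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SchemaKeyCheck {
    // Naive: captures every "name": occurrence; fine for flat JSON without nested quotes.
    private static final Pattern KEY = Pattern.compile("\"([^\"]+)\"\\s*:");

    static Set<String> jsonKeys(String json) {
        Set<String> keys = new LinkedHashSet<>();
        Matcher m = KEY.matcher(json);
        while (m.find()) {
            keys.add(m.group(1));
        }
        return keys;
    }

    // Schema fields with no exact (case-sensitive) key in the payload.
    static Set<String> missingFields(List<String> schemaFields, String json) {
        Set<String> keys = jsonKeys(json);
        Set<String> missing = new LinkedHashSet<>();
        for (String field : schemaFields) {
            if (!keys.contains(field)) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String payment = "{\"paymentUserId\":\"Id_227\",\"amount\":1227.0,"
                + "\"location\":\"location_227\",\"paymenttimestamp\":\"Apr 5, 2019 3:00:03 PM\"}";
        List<String> schema = Arrays.asList("paymentuserId", "amount", "location", "paymenttimestamp");
        System.out.println(missingFields(schema, payment)); // [paymentuserId]
    }
}
```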

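【Discussion】:

  • Does it have to be Spark? Maybe KSQL or Kafka Streams would be easier?
  • Suggestion (build it up incrementally): 1) Have you checked whether either dataset produces records on its own (when written to a console sink)? 2) Try the join without watermarks and verify that it produces any data. 3) Then add the watermarks back and check the final result.
  • KSQL and Kafka Streams are not options here, because we are firmly committed to open-source products. Thanks for listing the alternatives here for others, though.
  • We tried debugging the application exactly this way, but Google Gson behaves really strangely when producing the messages: it shows the same schema as Jackson, yet the Kafka consumer cannot make sense of it.
  • Maybe your date format is not one that TimestampType understands automatically. You could declare the field as StringType to check whether rows get printed, and then apply date functions to convert it to TimestampType.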

Tags: java apache-spark apache-kafka spark-streaming spark-structured-streaming


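【Solution 1】:

Solved the problem by using the Jackson library instead of the Google Gson library in the event producer.

The consumer side could not understand the JSON objects it received from the topic.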
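The answer does not show what actually changed in the payload. One plausible difference (an assumption, not confirmed above) is date serialization: the sample payloads carry strings like "Apr 5, 2019 2:58:47 PM", whereas Jackson by default writes a `java.util.Date` as epoch milliseconds. To avoid depending on either library's default, a producer could write the timestamp as an explicit string in Spark 2.4's default JSON `timestampFormat`, `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. A minimal `java.time` sketch, with the hypothetical class name `IsoTimestampFormat`:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class IsoTimestampFormat {
    // Assumed target layout: Spark 2.4's default JSON timestampFormat, rendered in UTC.
    static final DateTimeFormatter SPARK_DEFAULT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").withZone(ZoneOffset.UTC);

    static String format(Instant instant) {
        return SPARK_DEFAULT.format(instant);
    }

    public static void main(String[] args) {
        Instant ts = LocalDateTime.of(2019, 4, 5, 14, 58, 47).toInstant(ZoneOffset.UTC);
        System.out.println(format(ts)); // 2019-04-05T14:58:47.000Z
    }
}
```

The producer would put this string into the event field (for example `usertimestamp`) before serializing, whichever JSON library it uses.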

~ Keep learning, keep growing
