【Question title】: Not able to execute multiple queries in Spark structured streaming
【Posted】: 2018-10-28 21:06:40
【Question】:

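I wrote some sample code that runs multiple queries, but I only get output from the first query. In the logs I can see that all the queries are running. I'm not sure what I am doing wrong.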

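import java.io.Serializable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class A extends D implements Serializable {

    // hostname and port are fields of A that the question does not show
    public Dataset<Row> getDataSet(SparkSession session) {
        Dataset<Row> dfs = session.readStream().format("socket").option("host", hostname).option("port", port).load();
        publish(dfs.toDF(), "reader"); // starts the first streaming query on this socket source
        return dfs;
    }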

}

public class B extends D implements Serializable {

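    public Dataset<Row> execute(Dataset<Row> ds) {
        // Split each line into words; explode yields one row per word in a column named "col"
        Dataset<Row> d = ds.select(functions.explode(functions.split(ds.col("value"), "\\s+")));
        publish(d.toDF(), "component"); // starts another query on the same socket source
        return d;
    }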
}

public class C extends D implements Serializable {

    public Dataset<Row> execute(Dataset<Row> ds) {

        publish(ds.toDF(), "console"); // starts another query on the same socket source
        // ...and this start() launches yet another one
        ds.writeStream().format("csv").option("path", "hdfs://hostname:9000/user/abc/data1/")
                .option("checkpointLocation", "hdfs://hostname:9000/user/abc/cp").outputMode("append").start();
        return ds;
    }

}

public class D {

    // Every call starts a new streaming query; with a socket source, each query opens its own connection
    public void publish(Dataset<Row> dataset, String directory) {
        dataset.writeStream().format("csv").option("path", "hdfs://hostname:9000/user/abc/" + directory)
                .option("checkpointLocation", "hdfs://hostname:9000/user/abc/checkpoint/" + directory).outputMode("append")
                .start();
    }
}

// main() lives in a driver class; createSession() is a helper that the question does not show
public static void main(String[] args) {

    SparkSession session = createSession();
    try {
        A a = new A();
        Dataset<Row> records = a.getDataSet(session);

        B b = new B();
        Dataset<Row> ds = b.execute(records);

        C c = new C();
        c.execute(ds);
        session.streams().awaitAnyTermination();
    } catch (StreamingQueryException e) {
        e.printStackTrace();
    }
}

【Discussion】:

    Tags: java apache-spark spark-structured-streaming


    【Solution 1】:

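    The problem is the input socket you are reading from. The Spark socket source opens a separate connection to nc for every query you start, and your code calls start() several times. A limitation of nc is that it can feed data to only one connection at a time, so only one of your queries receives any input. With other input sources your queries should run fine. See the related question: Executing separate streaming queries in spark structured streaming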
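To see why only one query gets data, here is a minimal, stdlib-only Java sketch (Java 9+) that imitates the nc behaviour described above. `SingleConnectionLineServer` and `NcDemo` are hypothetical names, not part of Spark or netcat. The sketch assumes nc hands each input line to the single connection it is currently serving; the two client sockets stand in for two started queries, each opening its own connection.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for `nc -lk`: serves one client at a time. */
class SingleConnectionLineServer implements Runnable {
    private final ServerSocket server;
    private final BlockingQueue<String> stdin; // lines "typed" into nc

    SingleConnectionLineServer(ServerSocket server, BlockingQueue<String> stdin) {
        this.server = server;
        this.stdin = stdin;
    }

    @Override
    public void run() {
        while (!server.isClosed()) {
            try (Socket client = server.accept()) {
                PrintWriter out = new PrintWriter(
                        new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8), true);
                String line;
                while ((line = stdin.poll()) != null) {
                    out.println(line); // a consumed line is never replayed to later clients
                }
                // Hold this connection until the client disconnects; any other
                // client waits in the accept backlog and receives nothing meanwhile.
                InputStream in = client.getInputStream();
                while (in.read() != -1) {
                    // discard client input
                }
            } catch (IOException e) {
                // accept() fails once the server socket is closed (ending the loop);
                // an error on one client just ends that session
            }
        }
    }
}

class NcDemo {
    /** Two clients (think: two started queries) connect to the same listener. */
    static List<String> run() {
        BlockingQueue<String> stdin = new LinkedBlockingQueue<>(List.of("hello world", "foo bar"));
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            Thread t = new Thread(new SingleConnectionLineServer(server, stdin));
            t.setDaemon(true);
            t.start();
            try (Socket first = new Socket(loopback, server.getLocalPort());
                 Socket second = new Socket(loopback, server.getLocalPort())) {
                List<String> result = new ArrayList<>();
                result.add("first: " + readLines(first, 2));
                result.add("second: " + readLines(second, 2));
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads up to max lines, giving up after 1 s without data. */
    static List<String> readLines(Socket s, int max) throws IOException {
        s.setSoTimeout(1000);
        BufferedReader r = new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        try {
            String line;
            while (lines.size() < max && (line = r.readLine()) != null) {
                lines.add(line);
            }
        } catch (SocketTimeoutException e) {
            // no (more) data within the timeout
        }
        return lines;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running `NcDemo.main` prints `first: [hello world, foo bar]` and `second: []`: the first connection gets every line and the second gets nothing, which matches the symptom of seeing output only from the first query.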

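    I tried the simple test below, in which each query reads from its own socket (ports 5430 and 5431), and it prints both outputs: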

    import org.apache.spark.sql.streaming.Trigger
    import scala.concurrent.duration._
    import spark.implicits._ // needed for .as[String] and $"value"

    // Two independent socket sources, one per nc listener
    val df1 = spark.readStream.format("socket").option("host", "localhost").option("port", 5430).load()
    val df9 = spark.readStream.format("socket").option("host", "localhost").option("port", 5431).load()

    val df2 = df1.as[String].flatMap(x => x.split(","))
    val df3 = df9.as[String].flatMap(x => x.split(",")).select($"value".as("name"))

    val sq1 = df3.writeStream.format("console").queryName("sq1")
      .option("truncate", "false").trigger(Trigger.ProcessingTime(10.seconds)).start()

    val sq = df2.writeStream.format("console").queryName("sq")
      .option("truncate", "false").trigger(Trigger.ProcessingTime(20.seconds)).start()

    spark.streams.awaitAnyTermination()
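The test above sidesteps the limitation by giving each query its own nc listener. If several queries must share one host and port, as in the question, one option for local testing is to replace nc with a small relay that sends every line to every open connection. This is a minimal stdlib-only sketch (Java 9+), assuming each socket-source query opens its own connection; `BroadcastLineServer` and `BroadcastDemo` are hypothetical helpers, not a Spark API.

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical nc replacement for local tests: every connected client gets every line. */
class BroadcastLineServer implements Closeable {
    private final ServerSocket server;
    private final List<PrintWriter> clients = new CopyOnWriteArrayList<>();

    BroadcastLineServer() throws IOException {
        server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        Thread acceptor = new Thread(() -> {
            try {
                while (true) {
                    Socket s = server.accept(); // e.g. one connection per socket-source query
                    clients.add(new PrintWriter(
                            new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true));
                }
            } catch (IOException e) {
                // server socket closed: stop accepting
            }
        });
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int port() { return server.getLocalPort(); }

    int clientCount() { return clients.size(); }

    /** Sends the line to all currently connected clients. */
    void broadcast(String line) {
        for (PrintWriter out : clients) {
            out.println(line);
        }
    }

    @Override
    public void close() throws IOException {
        server.close();
        for (PrintWriter out : clients) {
            out.close(); // closing the socket's output stream also closes the socket
        }
    }
}

class BroadcastDemo {
    static List<List<String>> run() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (BroadcastLineServer relay = new BroadcastLineServer();
             Socket q1 = new Socket(loopback, relay.port());
             Socket q2 = new Socket(loopback, relay.port())) {
            // Wait (up to about 2 s) until the relay has registered both connections.
            for (int i = 0; i < 200 && relay.clientCount() < 2; i++) {
                Thread.sleep(10);
            }
            relay.broadcast("hello world");
            relay.broadcast("foo bar");
            return List.of(readLines(q1, 2), readLines(q2, 2));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Reads up to max lines, giving up after 1 s without data. */
    static List<String> readLines(Socket s, int max) throws IOException {
        s.setSoTimeout(1000);
        BufferedReader r = new BufferedReader(new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
        List<String> lines = new ArrayList<>();
        try {
            String line;
            while (lines.size() < max && (line = r.readLine()) != null) {
                lines.add(line);
            }
        } catch (SocketTimeoutException e) {
            // no (more) data within the timeout
        }
        return lines;
    }
}
```

Both clients receive `hello world` and `foo bar`. The relay does not buffer lines for clients that connect later, so every query should be started before any data is sent.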
    

    【Comments】:

    • I have the same problem with a Kafka source that has a single partition.