【问题标题】:Kafka createDirectStream using PySparkKafka 使用 PySpark 创建DirectStream
【发布时间】:2020-05-27 12:14:59
【问题描述】:

我的主要目标是连接 Kafka,创建一个 DStream,将其作为行保存到局部变量并将其写入 mongo db,并在 PySpark 中实现端到端流程。

但是我在第一步中遇到了问题,在创建 DStream 时,错误是“java.util.ArrayList 无法转换为 java.lang.String”。你能帮我确定修复吗? 详情如下,

我正在尝试使用 pyspark 连接 kafka,如下所示,

kafkaParams = {"metadata.broker.list": ['host1:port','host2:port','host3:port'],
"security.protocol":"ssl",
"ssl.key.password":"***",
"ssl.keystore.location":"/path1/file.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"/path1/file2.jks",
"ssl.truststore.password":"***"}

directKafkaStream = KafkaUtils.createDirectStream(ssc,["lac.mx.digitalchannel.raw.s015-txn-qrdc"],kafkaParams)

但我收到错误,不知道如何处理它,

py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
        at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
        at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
        at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
        at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

另外,打开我正在使用的 PySpark CLI,

pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar  --files file.jks,file2.jks

【问题讨论】:

  • 注意:从 Spark 2.4 开始,Spark Streaming API 已被弃用

标签: apache-spark pyspark apache-kafka spark-streaming


【解决方案1】:

metadata.broker.list 需要是逗号分隔的字符串,而不是列表

主要目的是连接Kafka,创建一个DStream,将其作为行保存到局部变量并写入mongo

  1. Mongo 支持结构化流写入

  2. Mongo 还有一个 Kafka Connect 插件,只需要一个配置文件,不需要 Spark 集群或代码

【讨论】:

  • 感谢它与您推荐的更改一起工作,我通过阅读结构化流实现了这一点。现在坚持使用Mongo。在执行写入时出错,因为 u"'write' 不能在流数据集/数据帧上调用;"。你有什么建议吗?
  • 替换为writeStream
猜你喜欢
  • 2016-08-02
  • 2015-11-05
  • 2018-12-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-11-02
  • 2018-10-17
  • 1970-01-01
相关资源
最近更新 更多