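【Posted】: 2020-05-27 12:14:59
【Question】:
My main goal is to connect to Kafka, create a DStream, save it as rows in a local variable, write it to MongoDB, and implement the whole end-to-end flow in PySpark.
I am stuck at the first step, though: creating the DStream fails with "java.util.ArrayList cannot be cast to java.lang.String". Can you help me identify the fix? Details follow.
I am trying to connect to Kafka from PySpark as shown below: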
kafkaParams = {"metadata.broker.list": ['host1:port','host2:port','host3:port'],
"security.protocol":"ssl",
"ssl.key.password":"***",
"ssl.keystore.location":"/path1/file.jks",
"ssl.keystore.password":"***",
"ssl.truststore.location":"/path1/file2.jks",
"ssl.truststore.password":"***"}
directKafkaStream = KafkaUtils.createDirectStream(ssc,["lac.mx.digitalchannel.raw.s015-txn-qrdc"],kafkaParams)
But I get the following error and do not know how to handle it:
py4j.protocol.Py4JJavaError: An error occurred while calling o120.createDirectStreamWithoutMessageHandler.
: java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
at org.apache.spark.streaming.kafka.KafkaCluster$SimpleConsumerConfig$.apply(KafkaCluster.scala:419)
at org.apache.spark.streaming.kafka.KafkaCluster.config(KafkaCluster.scala:54)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitionMetadata(KafkaCluster.scala:131)
at org.apache.spark.streaming.kafka.KafkaCluster.getPartitions(KafkaCluster.scala:120)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:212)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStream(KafkaUtils.scala:721)
at org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper.createDirectStreamWithoutMessageHandler(KafkaUtils.scala:689)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
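One possible reading of the trace above: the frame `KafkaCluster$SimpleConsumerConfig$.apply` is where the Kafka parameters are consumed, and the cast failure suggests that every value in `kafkaParams` is expected to be a string, while `metadata.broker.list` was passed as a Python list. A minimal sketch, assuming that is the cause, joins list values into one comma-separated string before the call. `normalize_kafka_params` is a hypothetical helper for illustration, not part of PySpark.

```python
def normalize_kafka_params(params):
    """Return a copy of params in which every value is a plain string.

    List or tuple values are joined with commas; other values are passed
    through str().
    """
    normalized = {}
    for key, value in params.items():
        if isinstance(value, (list, tuple)):
            normalized[key] = ",".join(str(item) for item in value)
        else:
            normalized[key] = str(value)
    return normalized


kafka_params = normalize_kafka_params({
    # Placeholder hosts copied from the question.
    "metadata.broker.list": ["host1:port", "host2:port", "host3:port"],
    "security.protocol": "ssl",
})

# Inside the PySpark session from the question (not executed here):
# directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)
```

This addresses only the cast error. The classes in the trace belong to the `org.apache.spark.streaming.kafka` package (the Kafka 0.8 integration), so whether the SSL settings take effect with that connector is a separate matter and should be checked against the Spark documentation.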
Also, this is the command I use to open the PySpark CLI:
pyspark2 --master local --jars /path/spark-streaming-kafka-0-10_2.11-2.4.0.cloudera2.jar,/path/kafka-clients-2.0.0-cdh6.1.0.jar,/path/spark-sql-kafka-0-10_2.11-2.4.0.cloudera2.jar --files file.jks,file2.jks
【Comments】:
-
Note: as of Spark 2.4, the Spark Streaming API is deprecated
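In light of this note, one alternative is the Kafka source of Structured Streaming, which the `spark-sql-kafka-0-10` jar on the command line in the question provides. The sketch below assumes that Kafka client settings are forwarded when given a `kafka.` prefix, and it reuses the placeholder hosts, topic, and paths from the question. `build_kafka_options` is a hypothetical helper, and the password settings are left out on purpose.

```python
def build_kafka_options(brokers, topic, client_settings):
    """Build the option map for spark.readStream.format("kafka")."""
    options = {
        # The broker list is a single comma-separated string, not a list.
        "kafka.bootstrap.servers": ",".join(brokers),
        "subscribe": topic,
    }
    # Add the "kafka." prefix to each Kafka client setting.
    for key, value in client_settings.items():
        options["kafka." + key] = str(value)
    return options


options = build_kafka_options(
    ["host1:port", "host2:port", "host3:port"],
    "lac.mx.digitalchannel.raw.s015-txn-qrdc",
    {
        "security.protocol": "SSL",
        "ssl.keystore.location": "/path1/file.jks",
        "ssl.truststore.location": "/path1/file2.jks",
    },
)

# Inside a PySpark session (not executed here):
# df = spark.readStream.format("kafka").options(**options).load()
```

The resulting DataFrame is a streaming one, so writing it to MongoDB would go through a streaming sink rather than a DStream action.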
Tags: apache-spark pyspark apache-kafka spark-streaming