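[Posted]: 2017-11-05 02:21:45
[Question]:
I am trying to save my streaming data from Spark to Cassandra. Spark connects to Kafka and that part works fine, but saving to Cassandra is driving me crazy. I am using Spark 2.0.2, Kafka 0.10 and Cassandra 2.23.
This is how I submit the job to Spark: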
spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing
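Here is my code. It is just a lightly modified version of the Spark example, and it runs, but I cannot save the data to Cassandra.
This is what I am trying to do, but with only the count result: http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/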
from __future__ import print_function
import sys
import os
import time
import pyspark_cassandra
import pyspark_cassandra.streaming
from pyspark_cassandra import CassandraSparkContext
import urllib
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max
from pyspark.sql.types import FloatType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    sqlContext = SQLContext(sc)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.count()
    counts.saveToCassandra("spark", "count")
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
I get this error:
Traceback (most recent call last):
  File "/tmp/direct_kafka_wordcount5.py", line 88, in
    counts.saveToCassandra("spark", "count")
[Discussion]:
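The traceback in the question is cut off, so the actual cause is not visible. One thing that may be worth checking: lines.count() yields a DStream of bare integers, while pyspark-cassandra, as far as I recall from its README, expects each element to be a dict (or Row) whose keys match the CQL column names. Below is a minimal sketch of shaping a count into such a row. The helper count_to_row and the column names "ts" and "count" are hypothetical; they would have to match whatever the spark.count table really defines.

```python
import time


def count_to_row(count, ts):
    """Wrap a bare batch count in a dict keyed by column name.

    "ts" and "count" are assumed column names, not taken from the question.
    """
    return {"ts": int(ts), "count": int(count)}


# Standalone check of the row shape, no Spark needed.
print(count_to_row(42, time.time()))

# Possible use inside the streaming job (not run here, assumes the table
# has matching columns):
#   rows = counts.map(lambda n: count_to_row(n, time.time()))
#   rows.saveToCassandra("spark", "count")
```

If the table has a different primary key, the dict keys would need to change accordingly. This only addresses the row shape and may not be the error hidden by the truncated traceback.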
Tags: python apache-spark cassandra streaming apache-kafka