【问题标题】:spark, cassandra, streaming, python, error, database, kafka火花,卡桑德拉,流媒体,蟒蛇,错误,数据库,卡夫卡
【发布时间】:2017-11-05 02:21:45
【问题描述】:

我试图将我的流数据从 spark 保存到 cassandra,spark 连接到 kafka 并且工作正常,但是保存到 cassandra 让我变得疯狂。我正在使用 spark 2.0.2、kafka 0.10 和 cassandra 2.23,

这就是我提交给 spark 的方式

spark-submit --verbose --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 --jars /tmp/pyspark-cassandra-0.3.5.jar --driver-class-path /tmp/pyspark-cassandra-0.3.5.jar --py-files /tmp/pyspark-cassandra-0.3.5.jar --conf spark.cassandra.connection.host=localhost /tmp/direct_kafka_wordcount5.py localhost:9092 testing

这是我的代码,它只是对 spark 示例的一些修改,它的工作原理,但我无法将这些数据保存到 cassandra....

这就是我试图做的,但只是计数结果 http://rustyrazorblade.com/2015/05/spark-streaming-with-python-and-kafka/

    from __future__ import print_function
import sys
import os
import time
import pyspark_cassandra
import pyspark_cassandra.streaming
from pyspark_cassandra import CassandraSparkContext
import urllib
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max
from pyspark.sql.types import FloatType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: direct_kafka_wordcount.py <broker_list> <topic>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 1)
    sqlContext = SQLContext(sc)
    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    lines = kvs.map(lambda x: x[1])
    counts=lines.count()
    counts.saveToCassandra("spark", "count")
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

我收到了这个错误,

Traceback(最近一次调用最后一次): 文件“/tmp/direct_kafka_wordcount5.py”,第 88 行,在 counts.saveToCassandra("spark", "count")

【问题讨论】:

    标签: python apache-spark cassandra streaming apache-kafka


    【解决方案1】:

    Pyspark Casasndra 不久前停止更新,最新版本仅支持 Spark 1.6 https://github.com/TargetHolding/pyspark-cassandra

    另外

    counts=lines.count() // Returns data to the driver (not an RDD)
    

    counts 现在是一个整数。这意味着函数 saveToCassandra 不适用,因为这是 RDD 的函数

    【讨论】:

    • 如何将积分器传递给 savecassandra? ,我知道 pyspark cassandra 已经过时了,但我使用的是 spark 1.6
    猜你喜欢
    • 2017-11-14
    • 2016-01-27
    • 2018-08-14
    • 2018-08-15
    • 2016-08-03
    • 2018-09-15
    • 2019-04-11
    • 1970-01-01
    • 2018-02-24
    相关资源
    最近更新 更多