【问题标题】:Saving data back into Cassandra as RDD将数据作为 RDD 保存回 Cassandra
【发布时间】:2016-05-26 16:14:21
【问题描述】:

我正在尝试从 Kafka 读取消息,处理数据,然后将数据添加到 cassandra,就好像它是一个 RDD。

我的麻烦是将数据保存回 cassandra。

from __future__ import print_function

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext

appName = 'Kafka_Cassandra_Test'
kafkaBrokers = '1.2.3.4:9092'
topic = 'test'
cassandraHosts = '1,2,3'
sparkMaster = 'spark://mysparkmaster:7077'


if __name__ == "__main__":
    conf = SparkConf()
    conf.set('spark.cassandra.connection.host', cassandraHosts)

    sc = SparkContext(sparkMaster, appName, conf=conf)

    ssc = StreamingContext(sc, 1)

    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBrokers})
    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a+b)
    counts.saveToCassandra('coreglead_v2', 'wordcount')

    ssc.start()
    ssc.awaitTermination()

还有错误:

[root@gasweb2 ~]# spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/var/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 in spark-packages
    found org.apache.cassandra#cassandra-clientutil;2.2.2 in central
    found com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 in central
    found io.netty#netty-handler;4.0.33.Final in central
    found io.netty#netty-buffer;4.0.33.Final in central
    found io.netty#netty-common;4.0.33.Final in central
    found io.netty#netty-transport;4.0.33.Final in central
    found io.netty#netty-codec;4.0.33.Final in central
    found io.dropwizard.metrics#metrics-core;3.1.2 in central
    found org.slf4j#slf4j-api;1.7.7 in central
    found org.apache.commons#commons-lang3;3.3.2 in central
    found com.google.guava#guava;16.0.1 in central
    found org.joda#joda-convert;1.2 in central
    found joda-time#joda-time;2.3 in central
    found com.twitter#jsr166e;1.1.0 in central
    found org.scala-lang#scala-reflect;2.11.7 in central
:: resolution report :: resolve 647ms :: artifacts dl 15ms
    :: modules in use:
    com.datastax.cassandra#cassandra-driver-core;3.0.0-rc1 from central in [default]
    com.google.guava#guava;16.0.1 from central in [default]
    com.twitter#jsr166e;1.1.0 from central in [default]
    datastax#spark-cassandra-connector;1.5.0-RC1-s_2.11 from spark-packages in [default]
    io.dropwizard.metrics#metrics-core;3.1.2 from central in [default]
    io.netty#netty-buffer;4.0.33.Final from central in [default]
    io.netty#netty-codec;4.0.33.Final from central in [default]
    io.netty#netty-common;4.0.33.Final from central in [default]
    io.netty#netty-handler;4.0.33.Final from central in [default]
    io.netty#netty-transport;4.0.33.Final from central in [default]
    joda-time#joda-time;2.3 from central in [default]
    org.apache.cassandra#cassandra-clientutil;2.2.2 from central in [default]
    org.apache.commons#commons-lang3;3.3.2 from central in [default]
    org.joda#joda-convert;1.2 from central in [default]
    org.scala-lang#scala-reflect;2.11.7 from central in [default]
    org.slf4j#slf4j-api;1.7.7 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   16  |   0   |   0   |   0   ||   16  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 16 already retrieved (0kB/14ms)
16/02/15 16:26:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Traceback (most recent call last):
  File "/var/spark/scripts/kafka_cassandra.py", line 27, in <module>
    counts.saveToCassandra('coreglead_v2', 'wordcount')
AttributeError: 'TransformedDStream' object has no attribute 'saveToCassandra'

通过四处搜索,我找到了this GitHub issue,但这似乎与另一个库有关(我无法使用该库,因为我使用的是 Cassandra 3.0,并且尚不支持)。

目标是从单个消息创建聚合数据(字数仅用于测试)并将其插入到多个表中。

我几乎只使用Datastax Python Driver 并自己编写语句,但有没有更好的方法来实现这一点?

【问题讨论】:

    标签: python apache-spark cassandra pyspark spark-cassandra-connector


    【解决方案1】:

    您正在使用 Datastax 的 Spark Cassandra 连接器,它在 RDD / DStream 级别不支持 python。仅支持数据帧。请参阅docs 了解更多信息。

    我为上述连接器编写了一个包装器:PySpark Cassandra。 Datastax 的连接器功能并不完整,但有很多东西。此外,如果性能很重要,那么调查性能影响可能是值得的。

    最后,Spark 附带了 python example 使用来自 hadoop mapreduce 的 CqlInput/OutputFormat。在我看来,这不是一个对开发人员非常友好的选项,但它就在那里。

    【讨论】:

    • 感谢您的回答。我看过你的 PySpark-Cassandra 库,但是好像不支持 Cassandra 3,现在还是这样吗?
    • @JimWright,pyspark-cassandra 建立在 github.com/datastax/spark-cassandra-connector 之上。不久前发布了兼容 Cassandra 3 的版本(v1.5)。 pyspark-cassandra 的 0.3.1 版本基于该版本构建,因此支持 Cassandra 3。
    • 嗨,在 10 月 11 日,您更改了 README.md,添加了以下语句:“不再维护 PySpark Cassandra。开发工作已从 Spark 转移到纯 Python 环境。”。那么您对答案的更新是什么?
    【解决方案2】:

    查看您的代码并阅读您的问题描述:您似乎没有使用任何 Cassandra 连接器。 Spark 没有开箱即用的 Cassandra 支持,因为 RDD 和 DStream 数据类型没有 saveToCassandra 方法。您需要导入一个扩展 RDD 和 DStream 类型以支持 Cassandra 集成的外部 Spark-Cassandra 连接器。

    这就是您收到错误消息的原因:Python 在 DStream 类型上找不到任何函数 saveToCassandra,因为当前不存在。

    您需要获取 DataStax 连接器或其他连接器以使用 saveToCassandra 扩展 DStream 类型。

    【讨论】:

    • 感谢您的回复,我正在使用 datastax 的连接器:github.com/datastax/spark-cassandra-connector,我已经指定了我运行 spark-submit 的内容。我是 Python 新手,我怎么知道我应该导入什么?
    • @JimWright 你如何设置 Spark 和 PySpark?您在使用 DataStax Enterprise 吗?另外,您是在使用 pyspark shell 还是在尝试执行代码?
    • 我使用的是社区版,通过 spark-submit 在命令行上运行: spark-submit --jars /var/spark/lib/spark-streaming-kafka-assembly_2.10- 1.6.0.jar --packages datastax:spark-cassandra-connector:1.5.0-RC1-s_2.11 /var/spark/scripts/kafka_cassandra.py
    猜你喜欢
    • 2016-01-30
    • 2017-04-10
    • 2015-03-06
    • 2016-07-28
    • 1970-01-01
    • 1970-01-01
    • 2015-08-19
    • 2016-05-01
    • 2019-01-26
    相关资源
    最近更新 更多