【问题标题】:Twitter streaming using spark and kafka: How store the data in MongoDB使用 spark 和 kafka 进行 Twitter 流式传输:如何在 MongoDB 中存储数据
【发布时间】:2018-09-18 03:28:03
【问题描述】:

我正在使用这个 python 代码收集 Twitter 流数据 https://github.com/sridharswamy/Twitter-Sentiment-Analysis-Using-Spark-Streaming-And-Kafka/blob/master/app.py

之后,我运行此代码来创建流式上下文并将数据存储在 MongoDB 中。

def main():

  conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
  sc = SparkContext(conf=conf)
  ssc = StreamingContext(sc, 10)
  ssc.checkpoint("checkpoint")   
  kstream = KafkaUtils.createDirectStream(
  ssc, topics = ['topic1'], kafkaParams = {"metadata.broker.list": 
  'localhost:9092'})
  tweets = kstream.map(lambda x: x[1].encode("ascii", "ignore"))
  #................insert in MonGODB.........................
  db.mynewcollection.insert_one(tweets)
  ssc.start()
  ssc.awaitTerminationOrTimeout(100)
  ssc.stop(stopGraceFully = True)

if __name__=="__main__":
  urllib3.contrib.pyopenssl.inject_into_urllib3()
  connection = pymongo.MongoClient('....',...)
  db = connection['twitter1']
  db.authenticate('..','...')
  main()

但我收到了这个错误:

TypeError: document must be an instance of dict, bson.son.SON, bson.raw_bson.RawBSONDocument, or a type that inherits from collections.MutableMapping

我也尝试使用'foreachRDD'并创建函数'save'

tweets.foreachRDD(Save)

我将“插入”移到了这个函数中

def Save(rdd):
if not rdd.isEmpty():
    db.mynewcollection.insert_one(rdd)

但它不起作用

TypeError: can't pickle _thread.lock objects

谁能帮我知道如何在 MongoDB 中存储流数据

【问题讨论】:

    标签: python mongodb apache-spark apache-kafka spark-streaming


    【解决方案1】:
    • 出现第一个错误是因为您将分布式对象传递给db.mynewcollection.insert_one

    • 第二个错误是因为你在驱动上初始化了数据库连接,一般情况下连接对象不能被序列化。

    虽然存在许多 Spark / MongoDB 连接器,但您应该看看 (Getting Spark, Python, and MongoDB to work together) 一个通用模式是使用 foreachPartition。定义助手

    def insert_partition(xs):
        connection = pymongo.MongoClient('....',...)
        db = connection['twitter1']
        db.authenticate('..','...')
        db.mynewcollection.insert_many(xs)
    

    然后:

    def to_dict(s):
        return ... # Convert input to a format acceptable by `insert_many`, for example with json.loads
    
    tweets.map(to_dict) \
        .foreachRDD(lambda rdd: rdd.foreachPartition(insert_partition))
    

    【讨论】:

    • 我收到错误 'db.mynewcollection.insert_many(xs) 文件“C:\Users\1\Miniconda3\lib\site-packages\pymongo\collection.py”,第 742 行,在 insert_many blk .execute(self.write_concern.document, session=session) 文件“C:\Users\1\Miniconda3\lib\site-packages\pymongo\bulk.py”,第 414 行,在执行 raise InvalidOperation('没有要执行的操作') pymongo.errors.InvalidOperation: 没有要执行的操作'
    • 您还必须将输入解析为 mongo 可以接受的格式
    • 你能帮忙吗?我试过这个' tweets = kstream.map(lambda x: json.dumps(x[1]).encode("ascii", "ignore") '但它不起作用。
    • kstream.map(json.loads) 如果输入包含有效的 JSON 文档。
    • 我注意到在on_data 我只发送推文文本。我对其进行了修改以发送整个推文内容self.producer.send_messages(b'topic', data.encode('utf-8')),它现在正在工作。谢谢。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-11-22
    • 1970-01-01
    • 2019-05-13
    • 2015-12-12
    • 2019-05-19
    • 2018-07-22
    相关资源
    最近更新 更多