【问题标题】:Syntax error in CQL query when trying to write to cassandra from python尝试从 python 写入 cassandra 时 CQL 查询中的语法错误
【发布时间】:2019-03-19 05:08:31
【问题描述】:

所以,我正在用 python 构建一个应用程序,它从 twitter 获取数据,然后将其保存到 cassandra。我目前的问题在于一个脚本,它从 kafka 读取数据并尝试将其写入 cassandra,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer




class Consumer(multiprocessing.Process):
   def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_event = multiprocessing.Event()

    def stop(self):
         self.stop_event.set()

    def run(self):
       consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=1000)
        consumer.subscribe(['twitter'])



    while not self.stop_event.is_set():
        for message in consumer:
            # session.execute(
            #     """
            #     INSERT INTO mensaje_73 (tweet)
            #     VALUES (message)
            #     """
            # )
            print(message)
            cluster = Cluster()
            session = cluster.connect('twitter')
            session.execute(
                    """
                    INSERT INTO mensaje_73 (tweet)
                    VALUES (message)
                    """
                )

            # if self.stop_event.is_set():
            #     break

    consumer.close()


   def main():

    tasks = [
        Consumer()
    ]

    for t in tasks:
        t.start()

    time.sleep(10)

    for task in tasks:
        task.stop()



if __name__ == "__main__":
     logging.basicConfig(
        format='%(asctime)s.%(msecs)s:%(name)s:%(thread)d:% 
   (levelname)s:%(process)d:%(message)s',
        level=logging.INFO
    )
    main()

我尝试将测试消息插入到表 twitter.mensaje_73 中,并且效果很好,如下所示:

import threading, logging, time
import multiprocessing
from cassandra.cluster import Cluster

from kafka import KafkaConsumer, KafkaProducer


cluster = Cluster()
session = cluster.connect('twitter')
session.execute(
    """
    INSERT INTO mensaje_73 (tweet)
    VALUES ('helooo')
    """
)

任何帮助将不胜感激:)

【问题讨论】:

    标签: python-3.x cassandra kafka-consumer-api cassandra-driver


    【解决方案1】:

    所以这里的问题是,您的 message 变量在 CQL 中被视为文字,如果没有单引号,它将无法工作。因此,错误。

    为了解决这个问题,我会使用准备好的语句,然后将message绑定到它:

    session = cluster.connect('twitter')
    preparedTweetInsert = session.prepare(
            """
            INSERT INTO mensaje_73 (tweet)
            VALUES (?)
            """
        )
    session.execute(preparedTweetInsert,[message])
    

    试一试,看看是否有帮助。

    此外,这似乎是一个简单的数据模型。但是要问自己一件事,您将如何查询这些数据?除非tweet 是您唯一的主键,否则这将不起作用。这也意味着您可以查询单个推文的唯一方法是通过消息的确切文本。需要考虑一些事情,但按天分区可能是更好的选择,因为它可以很好地分布并提供更好的查询模型。

    【讨论】:

    • 嘿!太感谢了。您的回答非常有见地,我实际上只是想将 anything 保存到 cassandra,然后再担心如何正确查询它。我一回到家,我会尝试你的解决方法并让你知道。真的非常感谢!! :)
    • 所以!你是绝对正确的,但是,你能告诉我如何使用 python 将数据(来自 Kafka)插入到 cassandra 中吗?非常感谢你:)
    猜你喜欢
    • 2012-06-23
    • 2016-09-26
    • 2014-01-25
    • 2019-05-18
    • 2016-08-17
    • 2022-01-16
    • 2013-01-02
    • 2014-04-21
    • 1970-01-01
    相关资源
    最近更新 更多