【问题标题】:How can I improve the following code performance to ingest 1 million record /second如何提高以下代码性能以摄取 100 万条记录/秒
【发布时间】:2016-03-11 04:55:39
【问题描述】:

以下代码每秒摄取 10k-20k 条记录,我想提高它的性能。我正在阅读 json 格式并使用 Kafka 将其摄取到数据库中。 -我在安装了 zookeeper 和 Kafka 的五个节点的集群上运行它。

你能给我一些改进的建议吗?

import os
import json
from multiprocessing import Pool
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer


def process_line(line):
    producer = SimpleProducer(client)
    try:
       jrec = json.loads(line.strip())
       producer.send_messages('twitter2613',json.dumps(jrec))
    except ValueError, e:
                {}


if __name__ == "__main__":
    client = KafkaClient('10.62.84.35:9092')
    myloop=True
    pool = Pool(30)


    direcToData = os.listdir("/FullData/RowData")
    for loop in direcToData:
        mydir2=os.listdir("/FullData/RowData/"+loop)

        for i in mydir2:
            if  myloop:
                 with open("/FullData/RowData/"+loop+"/"+i) as source_file:
                     # chunk the work into batches of 4 lines at a time
                     results = pool.map(process_line, source_file, 30)

【问题讨论】:

标签: python json apache-kafka kafka-python


【解决方案1】:

您可以只导入您需要的操作系统的功能。这可以是第一次优化。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-27
    • 2016-06-20
    • 2019-08-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多