【问题标题】:How to process and save the continuous data coming from sensor如何处理和保存来自传感器的连续数据
【发布时间】:2020-03-24 18:44:59
【问题描述】:

例子:

我在车上安装了一个传感器,就是连续发送数据,现在,我必须处理(融合)来自传感器的连续数据,但同时过程将完成其执行,数据也将即将到来,如何存储即将到来的数据,而进程需要时间执行以供将来使用?

    sample code:

    buffer1=[]
    buffer2=[]

    def process_function(buffer):
        //processing

    while true:
        //data receiving continously
        buffer1.append(data)
        if len(buffer1)>0: process(buffer1)
        buffer2.append(data)

(while the process_function will take buffer1 to process, at the same time, the continuous data should be stored in buffer2 so that after finishing the process_function with buffer1 can process with buffer2 and repeat.)

【问题讨论】:

  • 我的建议你可以使用 kafka 流媒体工具。您不需要使用缓冲区管理数据。所有你需要的卡夫卡都会处理
  • 我们需要的信息远不止这些。
  • @AlexanderCécile 请说出您的需求。
  • @prabhuiitdhn 有关传感器的信息,一个...
  • 正如@beyhan 所建议的那样,你为什么不使用 Kafka.. 它非常简单.. 有一个进程通过 Kafka 一直等待传感器信息.. 并从传感器..如果将来你有来自同一个队列的多线程进程,Kafka 本身将管理..

标签: python python-multiprocessing python-multithreading


【解决方案1】:

您可以使用一个多处理队列和两个进程。一份给生产者,一份给消费者:

from multiprocessing import Process, Queue

def collection_sensor_values(mp_queue):
    fake_value = 0
    while True:
        mp_queue.put(f"SENSOR_DATA_{fake_value}")
        fake_value += 1
        time.sleep(2)

def process_function(mp_queue):
    while True:
        sensor_reading = mp_queue.get(block=True)
        print(f"Received sensor reading: {sensor_reading}")

q = Queue()
sensor_collector_process = Process(target=collection_sensor_values, args=(q,))
readings_process = Process(target=process_function, args=(q,))
all_procs = [sensor_collector_process, readings_process]

for p in all_procs:
    p.start()

for p in all_procs:
    # run until either process stops
    if p.is_alive():
        p.join()

for p in all_procs:
    if p.is_alive():
        p.terminate()

【讨论】:

    猜你喜欢
    • 2017-06-11
    • 1970-01-01
    • 2016-12-31
    • 1970-01-01
    • 1970-01-01
    • 2021-02-09
    • 2011-06-04
    • 2016-05-02
    • 1970-01-01
    相关资源
    最近更新 更多