【问题标题】:How to wait for other python processes to put string into queue before putting next item into queue如何在将下一个项目放入队列之前等待其他python进程将字符串放入队列
【发布时间】:2021-04-19 22:23:29
【问题描述】:

我有一个小例子来解决我的问题:

from multiprocessing import Process, Queue

def test_1(q,j):
    for i in range(10):
        q.put('test_{}: '.format(j) + str(i),block=False)



q = Queue()
p1 = Process(target=test_1, args=(q,1))
p2 = Process(target=test_1, args=(q,2))
p1.start()
p2.start()

with open('test1.txt', 'w') as file:
    while p1.is_alive() or p2.is_alive() or not q.empty():
        try:
            value = q.get(timeout = 1)
            file.write(value + '\n')
        except Exception as qe:
            print("Empty Queue or dead process")

p1.join()
p2.join()

那么我的输出是这样的:

test_1: 0
test_1: 1
test_1: 2
test_1: 3
test_1: 4
test_1: 5
test_1: 6
test_1: 7
test_1: 8
test_1: 9
test_2: 0
test_2: 1
test_2: 2
test_2: 3
test_2: 4
test_2: 5
test_2: 6
test_2: 7
test_2: 8
test_2: 9

我怎样才能得到这样的输出:

test_1: 0
test_2: 0
test_1: 1
test_2: 1
test_1: 2
test_2: 2
.
.
.

等等。

如果有人可以帮助我,那就太好了。

出于您的兴趣,我想稍后使用它来匹配输入和输出向量作为机器学习方法的训练数据。

提前致谢,

亚兹

【问题讨论】:

    标签: python multiprocessing queue python-multiprocessing python-multithreading


    【解决方案1】:

    一般来说,当我想在 python 中使用多线程时,我使用 ``ThreadPool``` 我不确定这是否真的是您想要的,但您可以使用此处编写的此函数轻松获得结果:

    from multiprocessing.pool import ThreadPool
    from psutil import cpu_count
    
    def test_1(thread):
        for i in range(10):
            string = ('test_{}: '.format(thread) + str(i))
            print(string)
    
    pool = ThreadPool(processes=cpu_count(logical=True))
    
    lines_async = pool.apply_async(test_1, args=([1]))
    lines_async2 = pool.apply_async(test_1, args=([2]))
    lines_async.get()
    lines_async2.get()
    

    结果在这里:

    
    test_1: 0
    test_2: 0
    test_1: 1
    test_2: 1
    test_1: 2
    test_2: 2
    test_1: 3
    test_2: 3
    test_1: 4
    test_2: 4
    test_1: 5
    test_2: 5
    test_1: 6
    test_2: 6
    test_1: 7
    test_2: 7
    test_2: 8
    test_1: 8
    test_2: 9
    test_1: 9
    

    但这与您执行的方法不同

    编辑:如果你想获得函数的值,你可以使用它:

    from multiprocessing.pool import ThreadPool
    from psutil import cpu_count
    
    pool = ThreadPool(processes=cpu_count(logical=True))
    
    def test_1(thread, i):
        string = ('test_{}: '.format(thread) + str(i))
        return string
    
    for i in range(10):
        lines_async = pool.apply_async(test_1, args=([1,i]))
        lines_async2 = pool.apply_async(test_1, args=([2,i]))
        string1 = lines_async.get()
        string2 = lines_async2.get()
        print(string1)
        print(string2)
    

    这比你想要的更相似,并给出相同的结果。

    【讨论】:

      【解决方案2】:

      这里有一个基于并发的解决方案(真正的并发问题/解决方案):

      from multiprocessing import Queue
      from threading import Thread, Lock, Condition
      
      
      def test_1(q,j,sem,other_has_writen,th):
          for i in range(10):
              # print("Th%i adquires conditional sem"%j)
              other_has_writen.acquire()
              # print("Th%i th=%i"%(j,th[0]))
              if th[0]!=j:
                  # print("Th%i on IF"%j)
                  # print("Th%i adquires sem"%j)
                  sem.acquire()
                  th[0] = j
                  # print("Th%i releases sem"%j)
                  sem.release()
                  # print("Th%i th_after=%i"%(j,th[0]))
                  # print("Th%i prints -----> %i"%(j,i))
                  q.put('test_{}: '.format(j) + str(i),block=False)
                  other_has_writen.notify_all()
              else:
                  # print("Th%i waits"%j)
                  other_has_writen.wait()
                  q.put('test_{}: '.format(j) + str(i),block=False)
              other_has_writen.release()
              # print("Th%i releases conditional sem"%j)
          # other_has_writen.notify_all()
              
      th = [1]
      sem = Lock()
      other_has_writen = Condition()
      q = Queue()
      p1 = Thread(target=test_1, args=(q,1,sem,other_has_writen,th))
      p2 = Thread(target=test_1, args=(q,2,sem,other_has_writen,th))
      p1.start()
      p2.start()
      
      # print('------- Program continues -------')
      
      with open('test1.txt', 'w') as file:
          while p1.is_alive() or p2.is_alive() or not q.empty():
              try:
                  value = q.get(timeout = 1)
                  file.write(value + '\n')
              except Exception as qe:
                  print("Empty Queue or dead process")
      
      p1.join()
      p2.join()
      

      输出将始终交替,因为您需要另一个线程通知他已完成写入:

      test_2: 0
      test_1: 0
      test_1: 1
      test_2: 1
      test_2: 2
      test_1: 2
      test_1: 3
      test_2: 3
      test_2: 4
      test_1: 4
      test_1: 5
      test_2: 5
      test_2: 6
      test_1: 6
      test_1: 7
      test_2: 7
      test_2: 8
      test_1: 8
      test_1: 9
      test_2: 9
      

      【讨论】:

        猜你喜欢
        • 2012-05-29
        • 1970-01-01
        • 1970-01-01
        • 2012-03-06
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-09-12
        相关资源
        最近更新 更多