
Python multiprocessing Pool Queues communication

<h3>Question</h3>

I'm trying to implement a pool of two processes that run in parallel and communicate through a queue.

The goal is to have a <em>writer</em> process that passes a message to a <em>reader</em> process by using a <em>queue</em>.

Each process prints its progress to the terminal so that I can follow what is happening.

Here is the code:

<pre><code>#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
    msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()
    mp.freeze_support()

    queue = mp.Queue()

    pool = mp.Pool()
    pool.apply_async(writer, (queue))
    pool.apply_async(reader, (queue))

    pool.close()
    pool.join()
</code></pre>

The output I am expecting should be something like this:

<pre><code>Initialize the experiment PID: 2341
writer 2342 -> 1
reader 2343 -> 1
writer 2342 -> 2
reader 2343 -> 2
writer 2342 -> 3
reader 2343 -> 3
Done
</code></pre>

However, I only get this line:

Initialize the experiment PID: 2341

and then the script quits.

What is the correct way to implement interprocess communication between two processes in a pool that communicate through a queue?


<h3>Answer 1:</h3>

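I used mp.Manager().Queue() as the queue because a plain mp.Queue cannot be passed directly as an argument to a pool task. Trying to pass it raised an exception, but the exception went unnoticed: apply_async does not re-raise errors unless you call get() on the result it returns. I also changed the arguments to a one-element tuple, (queue,), because (queue) without the trailing comma is just the queue itself, not a tuple.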
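One way to see the hidden exception is to keep the result object that apply_async returns and call get() on it. The following is a minimal sketch, assuming Python 3; the names `consume` and `try_plain_queue` are hypothetical and exist only for this illustration.

```python
import multiprocessing as mp


def consume(queue):
    """Hypothetical worker: return the first item found on the queue."""
    return queue.get()


def try_plain_queue():
    """Pass a plain mp.Queue to a pool task and report the resulting error.

    Returns the error message as a string, or None if no RuntimeError
    was raised.
    """
    queue = mp.Queue()
    with mp.Pool(processes=1) as pool:
        result = pool.apply_async(consume, (queue,))
        try:
            # get() re-raises the exception stored for the failed task;
            # without this call the failure stays silent.
            result.get(timeout=10)
        except RuntimeError as exc:
            return str(exc)
    return None


if __name__ == "__main__":
    print(try_plain_queue())
```

With a plain mp.Queue this should report a RuntimeError saying that Queue objects should only be shared between processes through inheritance, which is the failure that apply_async otherwise keeps to itself.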

I updated your code to:

<pre><code>#!/usr/bin/env python

import os
import time
import multiprocessing as mp
import Queue

def writer(queue):
    pid = os.getpid()
    for i in range(1,4):
        msg = i
        print "### writer ", pid, " -> ", msg
        queue.put(msg)
        time.sleep(1)
    msg = 'Done'
    print '### '+msg
    queue.put(msg)

def reader(queue):
    pid = os.getpid()
    time.sleep(0.5)
    while True:
        print "--- reader ", pid, " -> ",
        msg = queue.get()
        print msg
        if msg == 'Done':
            break

if __name__ == "__main__":
    print "Initialize the experiment PID: ", os.getpid()

    # A manager-backed queue can be passed to pool workers as an argument
    manager = mp.Manager()
    queue = manager.Queue()

    pool = mp.Pool()
    # args must be a tuple, hence the trailing comma
    pool.apply_async(writer, (queue,))
    pool.apply_async(reader, (queue,))

    pool.close()
    pool.join()
</code></pre>

And I got this output:

<pre><code>Initialize the experiment PID: 46182
### writer  46210  ->  1
--- reader  46211  ->  1
### writer  46210  ->  2
--- reader  46211  ->  2
### writer  46210  ->  3
--- reader  46211  ->  3
### Done
--- reader  46211  ->  Done
</code></pre>

I believe this is what you expected.
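The code above uses Python 2 print statements. For reference, here is a rough Python 3 sketch of the same approach, not a drop-in replacement. It assumes a pool of exactly two workers so that both tasks can run at the same time, shortens the sleep, and has the reader return what it received so that the results can be checked with get(). The name `run_experiment` is introduced here for illustration only.

```python
import multiprocessing as mp
import os
import time


def writer(queue):
    """Put 1, 2, 3 on the queue, followed by the 'Done' sentinel."""
    pid = os.getpid()
    for msg in range(1, 4):
        print("### writer", pid, "->", msg, flush=True)
        queue.put(msg)
        time.sleep(0.1)
    print("### Done", flush=True)
    queue.put("Done")


def reader(queue):
    """Collect messages until the 'Done' sentinel arrives."""
    pid = os.getpid()
    received = []
    while True:
        msg = queue.get()
        print("--- reader", pid, "->", msg, flush=True)
        if msg == "Done":
            break
        received.append(msg)
    return received


def run_experiment():
    """Run writer and reader in a pool and return the reader's messages."""
    with mp.Manager() as manager:
        # The manager's queue is a proxy object, so it can be pickled
        # and handed to pool workers as a task argument.
        queue = manager.Queue()
        with mp.Pool(processes=2) as pool:
            writer_result = pool.apply_async(writer, (queue,))
            reader_result = pool.apply_async(reader, (queue,))
            # Calling get() waits for each task and re-raises any
            # exception that happened inside it.
            writer_result.get(timeout=30)
            return reader_result.get(timeout=30)


if __name__ == "__main__":
    print("Initialize the experiment PID:", os.getpid())
    print(run_experiment())
```

Calling get() on both results replaces the close()/join() pair for waiting, and it has the side benefit that a failure in either task is raised in the parent instead of being lost.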

Source: https://stackoverflow.com/questions/34581072/python-multiprocessing-pool-queues-communication
