
How to get all pool.apply_async processes to stop once any one process has found a match in python

Question:

I have the following code that leverages multiprocessing to iterate through a large list and find a match. How can I get all processes to stop once a match is found in any one process? I have seen examples, but none of them seem to fit what I am doing here.

    #!/usr/bin/env python3.5
    import sys, itertools, multiprocessing, functools

    alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
    num_parts = 4
    part_size = len(alphabet) // num_parts

    def do_job(first_bits):
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
            # CHECK FOR MATCH HERE
            print(''.join(x))
            # EXIT ALL PROCESSES IF MATCH FOUND

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=4)
        results = []
        for i in range(num_parts):
            if i == num_parts - 1:
                first_bit = alphabet[part_size * i :]
            else:
                first_bit = alphabet[part_size * i : part_size * (i+1)]
            pool.apply_async(do_job, (first_bit,))
        pool.close()
        pool.join()

Thanks for your time.

UPDATE 1:

I have implemented the changes suggested in the great approach by @ShadowRanger, and it is nearly working the way I want it to. I have added some logging to give an indication of progress and put a 'test' key in there to match. I want to be able to increase/decrease iNumberOfProcessors independently of num_parts. At this stage, when I have them both at 4, everything works as expected: 4 processes spin up (plus one extra for the console). When I change iNumberOfProcessors to 6, 6 processes spin up, but only four of them have any CPU usage, so it appears 2 are idle. In my previous solution above, by contrast, I was able to set the number of cores higher without increasing num_parts, and all of the processes would get used.

<a href="https://i.stack.imgur.com/YjNj4.png" rel="nofollow"><img alt="enter image description here" class="b-lazy" data-src="https://i.stack.imgur.com/YjNj4.png" data-original="https://i.stack.imgur.com/YjNj4.png" src="https://etrip.eimg.top/images/2019/05/07/timg.gif" /></a>

I am not sure about how to refactor this new approach to give me the same functionality. Can you have a look and give me some direction with the refactoring needed to be able to set iNumberOfProcessors and num_parts independently from each other and still have all processes used?

Here is the updated code:

    #!/usr/bin/env python3.5
    import sys, itertools, multiprocessing, functools

    alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
    num_parts = 4
    part_size = len(alphabet) // num_parts
    iProgressInterval = 10000
    iNumberOfProcessors = 6

    def do_job(first_bits):
        iAttemptNumber = 0
        iLastProgressUpdate = 0
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
            sKey = ''.join(x)
            iAttemptNumber = iAttemptNumber + 1
            if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
                iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
                print("Attempt#:", iAttemptNumber, "Key:", sKey)
            if sKey == 'test':
                print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
                return True

    def get_part(i):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        return first_bit

    if __name__ == '__main__':
        # with statement with Py3 multiprocessing.Pool terminates when block exits
        with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:
            # Don't need special case for final block; slices can safely run past the end
            for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
                if gotmatch:
                    break
            else:
                print("No matches found")
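One way to decouple the two numbers, sketched here as an illustration rather than taken from the thread: in the code above, num_parts doubles as both the number of work chunks and the length of the generated keys (via itertools.repeat(alphabet, num_parts-1)). Introducing a separate key_length (a name assumed here, not in the original) lets num_parts grow past the process count, and imap_unordered then hands a fresh chunk to whichever worker goes idle. The 4x oversubscription factor is an arbitrary assumption:

    # Illustrative sketch only: key_length fixes the key size, while
    # num_parts controls how finely the keyspace prefixes are chunked.
    key_length = 4                                # length of generated keys
    iNumberOfProcessors = 6
    num_parts = iNumberOfProcessors * 4           # assumed factor; anything > process count works
    part_size = len(alphabet) // num_parts

    def do_job(first_bits):
        # Same loop as above, but key length no longer depends on num_parts.
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, key_length - 1)):
            if ''.join(x) == 'test':
                return True

    if __name__ == '__main__':
        with multiprocessing.Pool(processes=iNumberOfProcessors) as pool:
            # Workers pull chunks as they finish, so all 6 processes stay busy.
            for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
                if gotmatch:
                    break
            else:
                print("No matches found")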

UPDATE 2:

OK, here is my attempt at @noxdafox's suggestion. I have put together the following based on the link he provided. Unfortunately, when I run it I get this error:

    ... line 322, in apply_async
        raise ValueError("Pool not running")
    ValueError: Pool not running
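For what it's worth, a minimal repro of that exception (my illustration, not from the thread): Pool.apply_async raises ValueError("Pool not running") whenever it is called after the pool has been closed or terminated, which is exactly what happens if a callback terminates the pool while later chunks are still being submitted:

    import multiprocessing

    def square(x):
        return x * x

    if __name__ == '__main__':
        pool = multiprocessing.Pool(processes=2)
        pool.terminate()
        # Submitting work to a terminated (or closed) pool raises
        # ValueError: Pool not running
        pool.apply_async(square, (3,))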

Can anyone give me some direction on how to get this working?

Basically, the issue is that my first attempt did multiprocessing but did not support canceling all processes once a match was found.

My second attempt (based on @ShadowRanger's suggestion) solved that problem, but broke the ability to scale the number of processes and the num_parts size independently, which is something my first attempt could do.

My third attempt (based on @noxdafox's suggestion) throws the error outlined above.

If anyone can give me some direction on how to keep the functionality of my first attempt (scaling the number of processes and the num_parts size independently) while adding the ability to cancel all processes once a match is found, it would be much appreciated.

Thank you for your time.

Here is the code from my third attempt, based on @noxdafox's suggestion:

    #!/usr/bin/env python3.5
    import sys, itertools, multiprocessing, functools

    alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
    num_parts = 4
    part_size = len(alphabet) // num_parts
    iProgressInterval = 10000
    iNumberOfProcessors = 4

    def find_match(first_bits):
        iAttemptNumber = 0
        iLastProgressUpdate = 0
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
            sKey = ''.join(x)
            iAttemptNumber = iAttemptNumber + 1
            if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
                iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
                print("Attempt#:", iAttemptNumber, "Key:", sKey)
            if sKey == 'test':
                print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
                return True

    def get_part(i):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        return first_bit

    def grouper(iterable, n, fillvalue=None):
        args = [iter(iterable)] * n
        return itertools.zip_longest(*args, fillvalue=fillvalue)

    class Worker():

        def __init__(self, workers):
            self.workers = workers

        def callback(self, result):
            if result:
                self.pool.terminate()   # note: self.pool is never assigned anywhere

        def do_job(self):
            print(self.workers)
            pool = multiprocessing.Pool(processes=self.workers)   # local, not self.pool
            for part in grouper(alphabet, part_size):
                pool.apply_async(do_job, (part,), callback=self.callback)   # find_match was presumably intended
            pool.close()
            pool.join()
            print("All Jobs Queued")

    if __name__ == '__main__':
        w = Worker(4)
        w.do_job()
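For comparison, here is a hedged sketch of how that Worker class might be repaired. It assumes the intent was to store the pool on self (so the callback can reach it), to submit the find_match function rather than the do_job method, and to pass fillvalue='' so the padded final chunk contains no None entries; none of this is from the thread itself:

    # Sketch under the assumptions stated above, reusing find_match, grouper,
    # alphabet and part_size from the code in the question.
    class Worker():

        def __init__(self, workers):
            self.workers = workers
            self.pool = None

        def callback(self, result):
            # The callback can now reach the pool via self.pool.
            if result:
                self.pool.terminate()

        def do_job(self):
            self.pool = multiprocessing.Pool(processes=self.workers)
            for part in grouper(alphabet, part_size, fillvalue=''):
                # Wrap the chunk in a one-element tuple so find_match
                # receives it as a single first_bits argument.
                self.pool.apply_async(find_match, (part,), callback=self.callback)
            self.pool.close()
            self.pool.join()

Note that if a result arrives while chunks are still being submitted, apply_async can still hit the same "Pool not running" ValueError; guarding the submission loop with a try/except is one way to sidestep that race.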

Answer1:

You can check this question (https://stackoverflow.com/questions/33447055/python-multiprocess-pool-how-to-exit-the-script-when-one-of-the-worker-process/33450972#33450972) to see an implementation example solving your problem.

This also works with a concurrent.futures pool.

Just replace the map method with apply_async and iterate over your list from the caller.

Something like this.

    for part in grouper(alphabet, part_size):
        pool.apply_async(do_job, part, callback=self.callback)

<a href="https://stackoverflow.com/questions/434287/what-is-the-most-pythonic-way-to-iterate-over-a-list-in-chunks" rel="nofollow">grouper recipe</a>

Answer2:

multiprocessing isn't really designed to cancel tasks, but you can simulate it for your particular case by using pool.imap_unordered and terminating the pool when you get a hit:

    def do_job(first_bits):
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
            # CHECK FOR MATCH HERE
            print(''.join(x))
            if match:
                return True
        # If we exit loop without a match, function implicitly returns falsy None for us

    # Factor out part getting to simplify imap_unordered use
    def get_part(i):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        return first_bit

    if __name__ == '__main__':
        # with statement with Py3 multiprocessing.Pool terminates when block exits
        with multiprocessing.Pool(processes=4) as pool:
            # Don't need special case for final block; slices can safely run past the end
            for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
                if gotmatch:
                    break
            else:
                print("No matches found")

This will run do_job for each part, returning results as fast as it can get them. When a worker returns True, the loop breaks, and the with statement for the Pool is exited, terminate-ing the Pool (dropping all work in progress).

Note that while this works, it's kind of abusing multiprocessing; it won't handle canceling individual tasks without terminating the whole Pool. If you need more fine-grained task cancellation, you'll want to look at concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.Future.cancel), but even there, it can only cancel undispatched tasks; once they're running, they can't be cancelled without terminating the Executor or using a side-band means of termination (having the task poll some interprocess object intermittently to determine if it should continue running).
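To make that last option concrete, here is a minimal sketch of the side-band approach (illustrative names and match test, not from the thread): each worker polls a shared Event and exits voluntarily once any process has found a match. A Manager-backed Event is used because a bare multiprocessing.Event cannot be pickled into Pool tasks:

    import multiprocessing

    def worker(stop_event, start, stop):
        # Poll the shared event each iteration; bail out early once any
        # process has reported success.
        for n in range(start, stop):
            if stop_event.is_set():
                return None
            if n == 42:              # stand-in for the real match test
                stop_event.set()     # signal the other workers to stop
                return n
        return None

    if __name__ == '__main__':
        with multiprocessing.Manager() as manager:
            stop_event = manager.Event()
            with multiprocessing.Pool(processes=4) as pool:
                args = [(stop_event, i * 25, (i + 1) * 25) for i in range(4)]
                for result in pool.starmap(worker, args):
                    if result is not None:
                        print("Match found:", result)

The workers still run to their next poll before exiting, so this trades a little wasted work for clean, voluntary shutdown instead of terminate().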
