
adapt multiprocessing Pool to mpi4py

I'm using multiprocessing Pool to run a parallelized simulation in Python, and it works well on a computer with multiple cores. Now I want to execute the program on a cluster using several nodes. I suppose multiprocessing cannot be applied to distributed memory, but mpi4py seems like a good option. So what is the simplest mpi4py equivalent of this code:

from multiprocessing import Pool

pool = Pool(processes=16)
pool.map(functionName, parameters_list)
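
For what it's worth, recent mpi4py releases also ship a pool of their own in mpi4py.futures; a minimal sketch, assuming mpi4py >= 3.0 and using stand-in definitions for functionName and parameters_list:

from mpi4py.futures import MPIPoolExecutor

def functionName(x):            # stand-in for the real simulation function
    return x**2

parameters_list = range(16)     # stand-in for the real parameter list

if __name__ == '__main__':
    # same interface as multiprocessing.Pool.map, but workers are MPI processes
    with MPIPoolExecutor(max_workers=16) as executor:
        results = list(executor.map(functionName, parameters_list))

Depending on the MPI installation, this is launched either as a plain script (workers are spawned dynamically) or with something like mpiexec -n 17 python -m mpi4py.futures script.py.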

Answer1:

There's an old package of mine, built on mpi4py, that provides a functional parallel map for MPI jobs. It's not built for speed -- it was built to enable an MPI parallel map from the interpreter onto a compute cluster (i.e. without the need to run with mpiexec from the command line). Essentially:

>>> from pyina.launchers import MpiPool, MpiScatter
>>> pool = MpiPool()
>>> jobs = MpiScatter()
>>> def squared(x):
...     return x**2
...
>>> pool.map(squared, range(4))
[0, 1, 4, 9]
>>> jobs.map(squared, range(4))
[0, 1, 4, 9]

This shows off the "worker pool" strategy and the "scatter-gather" strategy for distributing jobs to the workers. Of course, I wouldn't use it for a job as small as squared, because the overhead of spawning the MPI world is really quite high (much higher than the overhead of setting up a multiprocessing Pool). However, if you have a big job to run, as you would normally run on a cluster using MPI, then pyina can be a big benefit for you.

However, the real advantage of using pyina is that it can not only spawn jobs with MPI, but it can also submit jobs to a scheduler. pyina understands and abstracts the launch syntax for several schedulers.

A typical call to a pyina map using a scheduler goes like this:

>>> # instantiate and configure a scheduler
>>> from pyina.schedulers import Torque
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> torque = Torque(**config)
>>>
>>> # instantiate and configure a worker pool
>>> from pyina.launchers import Mpi
>>> pool = Mpi(scheduler=torque)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

Several common configurations are available as pre-configured maps. The following is identical to the above example:

>>> # instantiate and configure a pre-configured worker pool
>>> from pyina.launchers import TorqueMpiPool
>>> config = {'nodes':'32:ppn=4', 'queue':'dedicated', 'timelimit':'11:59'}
>>> pool = TorqueMpiPool(**config)
>>>
>>> # do a blocking map on the chosen function
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]

pyina needs some TLC, in that it's still python 2.7 and hasn't had a release in several years… but it has otherwise been kept up to date (on GitHub) and is able to "get the job done" for me, running jobs on large-scale computing clusters over the past 10 years -- especially when coupled with pathos (which provides ssh tunneling and a unified interface for multiprocessing and ParallelPython maps). pyina doesn't yet utilize shared memory, but it does do embarrassingly parallel functional computing pretty well. The interactions with the scheduler are pretty good in general, but can be a bit rough around the edges for several failure cases -- and the non-blocking maps need a lot of work. That having been said, it provides a pretty useful interface for running embarrassingly parallel jobs on a cluster with MPI.
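
For the pathos half of that pairing, the map interface looks just like the ones above; a minimal local sketch, assuming pathos is installed (ProcessPool is its multiprocessing-backed pool):

>>> # same blocking map, but on a local multiprocessing-backed pool from pathos
>>> from pathos.pools import ProcessPool
>>> pool = ProcessPool(nodes=4)
>>> pool.map(pow, [1,2,3,4], [5,6,7,8])
[1, 64, 2187, 65536]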

Get pyina (and pathos) here: https://github.com/uqfoundation

Answer2:

There is an MPIPool class implemented here.

For an example of how I use this, check out this gist on GitHub.
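
The linked implementation isn't reproduced here, but MPIPool classes of this kind generally follow a master/worker pattern; a rough sketch using schwimmbad's MPIPool as a stand-in (assumptions: schwimmbad is installed and the script is started with something like mpiexec -n 4 python script.py):

import sys
from schwimmbad import MPIPool

def worker(x):
    return x**2

if __name__ == '__main__':
    pool = MPIPool()

    # worker ranks block here and just run tasks sent by the master
    if not pool.is_master():
        pool.wait()
        sys.exit(0)

    # only the master rank reaches this point and distributes the work
    results = pool.map(worker, range(16))
    pool.close()
    print(results)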

Answer3:

I use the following code as an equivalent of multiprocessing.Pool. It has not been tested extensively yet, but it seems to work just fine:

import sys
import multiprocessing
from multiprocessing import Pool
from functools import partial
from mpi4py import MPI

function = partial(...)  # Store all fixed parameters this way if needed

if use_MPI:
    arguments = range(num_runs)
    run_data = None

    # mpi4py
    comm = MPI.COMM_SELF.Spawn(sys.executable,
                               args=['MPI_slave.py'],
                               maxprocs=num_runs)       # Init
    comm.bcast(function, root=MPI.ROOT)                 # Equal for all processes
    comm.scatter(arguments, root=MPI.ROOT)              # Different for each process
    comm.Barrier()                                      # Wait for everything to finish...
    run_data = comm.gather(run_data, root=MPI.ROOT)     # And gather everything up
else:
    # multiprocessing
    p = Pool(multiprocessing.cpu_count())
    run_data = p.map(function, range(num_runs))

This then uses a separate file 'MPI_slave.py':

from mpi4py import MPI
# import the function you actually pass to this file here!!!

comm = MPI.COMM_SELF.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()

def runSlaveRun():
    function = None
    options = None
    # print("Process {}/{} reporting for duty!".format(rank, size))

    function = comm.bcast(function, root=0)
    arguments = comm.scatter(options, root=0)

    results = function(arguments)

    comm.Barrier()
    comm.gather(results, root=0)
    comm.Disconnect()

if __name__ == '__main__':
    runSlaveRun()
