How to aggregate RabbitMq messages into single message by correlation ID


Let's say that I have a pipeline of commands that need to be executed sequentially and that some of the commands contain multiple operations that should be executed in parallel (same correlation id). And let's assume that I need to know when all parallel operations are executed in order to proceed with execution further in the pipeline.

Is it possible to achieve this kind of orchestration with RabbitMQ alone by using exchanges and queues without usage of external data sources like database ?

I am interested in the following use case: I have just published 3 messages of the same type on the same queue. Those messages are being processed in parallel. I would like to publish a new message only when all messages of the same correlation ID are finished successfully.

Is there a way to achieve this with RabbitMQ ?


It sounds like you could use the scatter-gather pattern. This explains that pattern quite well with a diagram:

<a href="http://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html" rel="nofollow">http://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html</a>

And here is a tutorial on how to implement with RabbitMQ: <a href="http://geekswithblogs.net/michaelstephenson/archive/2012/08/06/150373.aspx" rel="nofollow">http://geekswithblogs.net/michaelstephenson/archive/2012/08/06/150373.aspx</a>


  • Throttle consumption rate of all JMS consumers listening on an ActiveMQ queue
  • C++ - Execute function every X milliseconds
  • Are orchestration services unfit to implement a workflow process?
  • Why error occurs when I send multiple queries into mysqli_query?
  • How to pivot data in Perl with regex only?
  • What is the purpose of grant_type parameter in OAuth 2 Authentication
  • what is the proper class for an MVC DocuSign WebHook receiver in C#
  • Change timer interval in Azure for Azure Function published from Visual Studio
  • Can a Component Editor be executed on multiple components?
  • jquery remove text partially
  • RabbitMq and “Fatal error: handshake failure - handshake_decode_error”
  • Personalizing Content
  • How to send correlation id, into message, from sender and retrieval from receive into message header
  • How to properly define a TCustomFrame's child class?
  • Executing DISTINCT query with objectify for app engine
  • synchronization on single statement?
  • EventBus on Android: how to implement dynamic queues vs. class-based event subscription?
  • MSMQ on Azure Website
  • Django error 'unicode' object has no attribute 'objects'
  • Why won't this override of the Model.save() function in Django work?
  • SerialForms.pas(17): W1010 Method 'Create' hides virtual method of base type 'TCompon
  • Does Windows Phone 7 have a standard Edit/Add/Delete convention?
  • Netezza Incremental load from Sql server using SSIS
  • How to best manage SMTP clients
  • LNK1104: cannot open file 'kernel32.lib'
  • SSIS Designer is running VERY slowly
  • Pythons argparse default value doesn't work
  • Detection of framework usage on Mac system?
  • Error in making a socket connection
  • NHibernate manually control fetching
  • Memory error in python- how to use more memory
  • presentShareDialogWithParams posts to FB wall, but callback handler results say error
  • Sort List of Strings By Version
  • Yii2: Config params vs. const/define
  • How to delay loading a property with linq to sql external mapping?
  • Ajax Loaded meta Tags
  • Timeout for blocking function call, i.e., how to stop waiting for user input after X seconds?
  • KeystoneJS: Relationships in Admin UI not updating
  • Setting background image for body element in xhtml (for different monitors and resolutions)
  • JaxB to read class hierarchy