50824

Throttling asynchronous tasks

Question:

I would like to run a bunch of async tasks, with a limit on how many tasks may be pending completion at any given time.

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

I also want to utilize a given number of threads if possible.

I came up with an extension method, ThrottleTasksAsync that does what I want. Is there a simpler solution already out there? I would assume that this is a common scenario.

Usage:

class Program { static void Main(string[] args) { Enumerable.Range(1, 10).ThrottleTasksAsync(5, 2, async i => { Console.WriteLine(i); return i; }).Wait(); Console.WriteLine("Press a key to exit..."); Console.ReadKey(true); } }

Here is the code:

static class IEnumerableExtensions { public static async Task<Result_T[]> ThrottleTasksAsync<Enumerable_T, Result_T>(this IEnumerable<Enumerable_T> enumerable, int maxConcurrentTasks, int maxDegreeOfParallelism, Func<Enumerable_T, Task<Result_T>> taskToRun) { var blockingQueue = new BlockingCollection<Enumerable_T>(new ConcurrentBag<Enumerable_T>()); var semaphore = new SemaphoreSlim(maxConcurrentTasks); // Run the throttler on a separate thread. var t = Task.Run(() => { foreach (var item in enumerable) { // Wait for the semaphore semaphore.Wait(); blockingQueue.Add(item); } blockingQueue.CompleteAdding(); }); var taskList = new List<Task<Result_T>>(); Parallel.ForEach(IterateUntilTrue(() => blockingQueue.IsCompleted), new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism }, _ => { Enumerable_T item; if (blockingQueue.TryTake(out item, 100)) { taskList.Add( // Run the task taskToRun(item) .ContinueWith(tsk => { // For effect Thread.Sleep(2000); // Release the semaphore semaphore.Release(); return tsk.Result; } ) ); } }); // Await all the tasks. return await Task.WhenAll(taskList); } static IEnumerable<bool> IterateUntilTrue(Func<bool> condition) { while (!condition()) yield return true; } }

The method utilizes BlockingCollection and SemaphoreSlim to make it work. The throttler is run on one thread, and all the async tasks are run on the other thread. To achieve parallelism, I added a maxDegreeOfParallelism parameter that's passed to a Parallel.ForEach loop re-purposed as a while loop.

The old version was:

foreach (var master = ...) { var details = ...; Parallel.ForEach(details, detail => { // Process each detail record here }, new ParallelOptions { MaxDegreeOfParallelism = 15 }); // Perform the final batch updates here }

But, the thread pool gets exhausted fast, and you can't do async/await.

<b>Bonus:</b> To get around the problem in BlockingCollection where an exception is thrown in Take() when CompleteAdding() is called, I'm using the TryTake overload with a timeout. If I didn't use the timeout in TryTake, it would defeat the purpose of using a BlockingCollection since TryTake won't block. Is there a better way? Ideally, there would be a TakeAsync method.

Answer1:

As suggested, use TPL Dataflow.

A <a href="http://msdn.microsoft.com/en-us/library/hh194782%28v=vs.110%29.aspx" rel="nofollow">TransformBlock<TInput, TOutput></a> may be what you're looking for.

You define a MaxDegreeOfParallelism to limit how many strings can be transformed (i.e., how many urls can be downloaded) in parallel. You then post urls to the block, and when you're done you tell the block you're done adding items and you fetch the responses.

var downloader = new TransformBlock<string, HttpResponse>( url => Download(url), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 50 } ); var buffer = new BufferBlock<HttpResponse>(); downloader.LinkTo(buffer); foreach(var url in urls) downloader.Post(url); //or await downloader.SendAsync(url); downloader.Complete(); await downloader.Completion; IList<HttpResponse> responses; if (buffer.TryReceiveAll(out responses)) { //process responses } <hr />

Note: The TransformBlock buffers both its input and output. Why, then, do we need to link it to a BufferBlock?

Because the TransformBlock won't complete until all items (HttpResponse) have been consumed, and await downloader.Completion would hang. Instead, we let the downloader forward all its output to a dedicated buffer block - then we wait for the downloader to complete, and inspect the buffer block.

Answer2:

<blockquote>

Say you have 1000 URLs, and you only want to have 50 requests open at a time; but as soon as one request completes, you open up a connection to the next URL in the list. That way, there are always exactly 50 connections open at a time, until the URL list is exhausted.

</blockquote>

The following simple solution has surfaced many times here on SO. It doesn't use blocking code and doesn't create threads explicitly, so it scales very well:

const int MAX_DOWNLOADS = 50; static async Task DownloadAsync(string[] urls) { using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async url => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); Console.WriteLine(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks); } }

The thing is, the <em>processing</em> of the downloaded data should be done on a <em>different</em> pipeline, with a <em>different</em> level of parallelism, especially if it's a CPU-bound processing.

E.g., you'd probably want to have 4 threads concurrently doing the data processing (the number of CPU cores), and up to 50 pending requests for more data (which do not use threads at all). AFAICT, this is not what your code is currently doing.

That's where TPL Dataflow or Rx may come in handy as a preferred solution. Yet it is certainly possible to implement something like this with plain TPL. Note, the only blocking code here is the one doing the actual data processing inside Task.Run:

const int MAX_DOWNLOADS = 50; const int MAX_PROCESSORS = 4; // process data class Processing { SemaphoreSlim _semaphore = new SemaphoreSlim(MAX_PROCESSORS); HashSet<Task> _pending = new HashSet<Task>(); object _lock = new Object(); async Task ProcessAsync(string data) { await _semaphore.WaitAsync(); try { await Task.Run(() => { // simuate work Thread.Sleep(1000); Console.WriteLine(data); }); } finally { _semaphore.Release(); } } public async void QueueItemAsync(string data) { var task = ProcessAsync(data); lock (_lock) _pending.Add(task); try { await task; } catch { if (!task.IsCanceled && !task.IsFaulted) throw; // not the task's exception, rethrow // don't remove faulted/cancelled tasks from the list return; } // remove successfully completed tasks from the list lock (_lock) _pending.Remove(task); } public async Task WaitForCompleteAsync() { Task[] tasks; lock (_lock) tasks = _pending.ToArray(); await Task.WhenAll(tasks); } } // download data static async Task DownloadAsync(string[] urls) { var processing = new Processing(); using (var semaphore = new SemaphoreSlim(MAX_DOWNLOADS)) using (var httpClient = new HttpClient()) { var tasks = urls.Select(async (url) => { await semaphore.WaitAsync(); try { var data = await httpClient.GetStringAsync(url); // put the result on the processing pipeline processing.QueueItemAsync(data); } finally { semaphore.Release(); } }); await Task.WhenAll(tasks.ToArray()); await processing.WaitForCompleteAsync(); } }

Answer3:

As requested, here's the code I ended up going with.

The work is set up in a master-detail configuration, and each master is processed as a batch. Each unit of work is queued up in this fashion:

var success = true; // Start processing all the master records. Master master; while (null != (master = await StoredProcedures.ClaimRecordsAsync(...))) { await masterBuffer.SendAsync(master); } // Finished sending master records masterBuffer.Complete(); // Now, wait for all the batches to complete. await batchAction.Completion; return success;

Masters are buffered one at a time to save work for other outside processes. The details for each master are dispatched for work via the masterTransform TransformManyBlock. A BatchedJoinBlock is also created to collect the details in one batch.

The actual work is done in the detailTransform TransformBlock, asynchronously, 150 at a time. BoundedCapacity is set to 300 to ensure that too many Masters don't get buffered at the beginning of the chain, while also leaving room for enough detail records to be queued to allow 150 records to be processed at one time. The block outputs an object to its targets, because it's filtered across the links depending on whether it's a Detail or Exception.

The batchAction ActionBlock collects the output from all the batches, and performs bulk database updates, error logging, etc. for each batch.

There will be several BatchedJoinBlocks, one for each master. Since each ISourceBlock is output sequentially and each batch only accepts the number of detail records associated with one master, the batches will be processed in order. Each block only outputs one group, and is unlinked on completion. Only the last batch block propagates its completion to the final ActionBlock.

The dataflow network:

// The dataflow network BufferBlock<Master> masterBuffer = null; TransformManyBlock<Master, Detail> masterTransform = null; TransformBlock<Detail, object> detailTransform = null; ActionBlock<Tuple<IList<object>, IList<object>>> batchAction = null; // Buffer master records to enable efficient throttling. masterBuffer = new BufferBlock<Master>(new DataflowBlockOptions { BoundedCapacity = 1 }); // Sequentially transform master records into a stream of detail records. masterTransform = new TransformManyBlock<Master, Detail>(async masterRecord => { var records = await StoredProcedures.GetObjectsAsync(masterRecord); // Filter the master records based on some criteria here var filteredRecords = records; // Only propagate completion to the last batch var propagateCompletion = masterBuffer.Completion.IsCompleted && masterTransform.InputCount == 0; // Create a batch join block to encapsulate the results of the master record. var batchjoinblock = new BatchedJoinBlock<object, object>(records.Count(), new GroupingDataflowBlockOptions { MaxNumberOfGroups = 1 }); // Add the batch block to the detail transform pipeline's link queue, and link the batch block to the the batch action block. var detailLink1 = detailTransform.LinkTo(batchjoinblock.Target1, detailResult => detailResult is Detail); var detailLink2 = detailTransform.LinkTo(batchjoinblock.Target2, detailResult => detailResult is Exception); var batchLink = batchjoinblock.LinkTo(batchAction, new DataflowLinkOptions { PropagateCompletion = propagateCompletion }); // Unlink batchjoinblock upon completion. // (the returned task does not need to be awaited, despite the warning.) batchjoinblock.Completion.ContinueWith(task => { detailLink1.Dispose(); detailLink2.Dispose(); batchLink.Dispose(); }); return filteredRecords; }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 }); // Process each detail record asynchronously, 150 at a time. detailTransform = new TransformBlock<Detail, object>(async detail => { try { // Perform the action for each detail here asynchronously await DoSomethingAsync(); return detail; } catch (Exception e) { success = false; return e; } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 150, BoundedCapacity = 300 }); // Perform the proper action for each batch batchAction = new ActionBlock<Tuple<IList<object>, IList<object>>>(async batch => { var details = batch.Item1.Cast<Detail>(); var errors = batch.Item2.Cast<Exception>(); // Do something with the batch here }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }); masterBuffer.LinkTo(masterTransform, new DataflowLinkOptions { PropagateCompletion = true }); masterTransform.LinkTo(detailTransform, new DataflowLinkOptions { PropagateCompletion = true });

Recommend

  • How do I check assembly output of Java code?
  • How to remove default command line arguments provided by Eclipse?
  • Connect to OLEDB data source from Java?
  • Windows batch string manipulation in loop
  • Can you render a page with multiple forms instead of one form for a jQuery, Rails 3 update?
  • Is it possible to make imports depend on the location of my Lua script instead of the current direct
  • SSRS 2008 - Sorting within a group
  • Install different versions of nuget packages inside one solution file with two projects
  • How to best manage SMTP clients
  • Is it mandatory to have a doGet or doPost method?
  • Unicorn and Rails eat up 2x MySQL connections
  • goJS dropdown remove items
  • Manually Timing out a C# Thread
  • in batch how do i use taskkill properly
  • Where these are stored?
  • Mixing WebForms and MVC: What should I do with the MasterPage?
  • Jenkins Grails plugin does not list lastest versions of Grails
  • How to get links to open in the native browser in iOS Meteor apps?
  • Application level floating views with navigation in Android
  • How to specify input and output paths from cmd.exe for a PowerShell script?
  • presentShareDialogWithParams posts to FB wall, but callback handler results say error
  • Unable to get column index with table.getColumn method using custom table Model
  • How can the INSERT … ON CONFLICT (id) DO UPDATE… syntax be used with a sequence ID?
  • Marklogic : Query response time is very high
  • How can I sort a a table with VBA with given text condition?
  • Transactional Create with Validation in ServiceStack Redis Client
  • Hardware Accelerated Image Scaling in windows using C++
  • Why querying a date BC is changed to AD in Java?
  • Record samples being played with OpenAL
  • how to adjust image in a panel in Java swing?
  • Ajax Loaded meta Tags
  • Master page gives error
  • Possible to stop flickering java tooltip in heavyweight mode?
  • How do I rollback to a specific git commit
  • Matrix multiplication with MKL
  • Suggestions to manage Login/Logout transitions
  • JTable with a ScrollPane misbehaving
  • Understanding cpu registers
  • How do I configure my settings file to work with unit tests?
  • How do I use LINQ to get all the Items that have a particular SubItem?