Skip to content

C/C++/C#

Blocking Collection and the Producer-Consumer Problem

This time I want to discuss features that belong to the new System.Collections.Concurrent namespace in the.NET Framework 4. When you design parallel applications, you often need thread-safe data storage as well as some mechanism of sending messages between tasks. Once again, this post will touch on just the basics and the most common problems a beginner might encounter, but I’ll provide links for further reading.

This is the fourth post in the parallel programming series. Here’s a list of all the posts:

To keep things short, I’ll start with the code that I have at the end of the Task Schedulers and Synchronization Context post. This is a small parallel WPF application with a responsive UI that has one Start button and displays the results of long-running operations in a text box.

public partial class MainWindow : Window
{
    public MainWindow()
    {
        InitializeComponent();
    }

    public static double SumRootN(int root)
    {
        double result = 0;
        for (int i = 1; i < 10000000; i++)
        {
            result += Math.Exp(Math.Log(i) / root);
        }
        return result;
    }

    private void start_Click(object sender, RoutedEventArgs e)
    {
        textBlock1.Text = "";
        label1.Content = "Milliseconds: ";

        var watch = Stopwatch.StartNew();
        List<Task> tasks = new List<Task>();
        var ui = TaskScheduler.FromCurrentSynchronizationContext();
        for (int i = 2; i < 20; i++)
        {
            int j = i;
            var compute = Task.Factory.StartNew(() =>
            {
                return SumRootN(j);
            });
            tasks.Add(compute);

            var display = compute.ContinueWith(resultTask =>
                            textBlock1.Text += "root " + j.ToString() + " " +
                                               compute.Result.ToString() +
                                               Environment.NewLine,
                                ui);
        }

        Task.Factory.ContinueWhenAll(tasks.ToArray(),
            result =>
            {
                var time = watch.ElapsedMilliseconds;
                label1.Content += time.ToString();
            }, CancellationToken.None, TaskContinuationOptions.None, ui);
    }
}

But imagine that I’m designing a larger application and I need to store the results of the long-running parallel operations somewhere. (I don’t do this in the current version at all.)

Until .NET Framework 4, this task was challenging for C# developers: the collections in the System.Collections and System.Collections.Generic namespaces do not guarantee thread safety, and developers needed to design the locking and synchronization mechanisms themselves. But now generic thread-safe collections are a part of the .NET Framework. So, let me introduce the new namespace: System.Collections.Concurrent.

I’m going to use the BlockingCollection<T> class. This class can help you implement the well-known producer-consumer pattern, where items are produced and consumed by different operations at different rates. I will update my application to imitate the producer-consumer scenario, so that the compute task will become a producer, and the display task will become a consumer.

var results = new BlockingCollection<double>();

I’ll also update the compute task, so that instead of returning the result, it will add it to the results collection.

var compute = Task.Factory.StartNew(() =>
{
    results.Add(SumRootN(j));
});

When all the results are ready, I’ll set the collection to the “completed” state, so the consumer will know that this collection won’t be updated anymore. For this purpose, I’ll call the CompleteAdding method, which will set the IsCompleted property of the results collection to true. The good place to perform this operation is the task that calculates the total time: it waits for all other tasks to finish, which is exactly what I need.

Task.Factory.ContinueWhenAll(tasks.ToArray(),
    result =>
    {
        results.CompleteAdding();
        var time = watch.ElapsedMilliseconds;
        label1.Content += time.ToString();
    }, CancellationToken.None, TaskContinuationOptions.None, ui);

As you can see, the producer part is easy: all tasks can safely write to the results collection, and all locking and synchronization issues are managed by the .NET Framework and TPL.

Now let’s move to the consumer side. I’ll do a small refactoring: I’ll convert the display task into a consume task that will run the display method:

var consume = Task.Factory.StartNew(() => display(results));

I want to start the consume task before I start any of the compute tasks so that the consumer can wait for the producer and I can see the real-time results. That’s why I put the above line right before the main for loop in the button’s event handler.

This is what the naïve first version of the display method might look like. (Don’t forget to convert the ui task scheduler into a field. It’s a local variable in the original code.)

public void display(BlockingCollection<double> results)
{
    double item;
    while (!results.IsCompleted)
    {
        while (!results.TryTake(out item));
        double currentItem = item;
        Task.Factory.StartNew(new Action(() =>
                  textBlock1.Text += currentItem.ToString() + Environment.NewLine),
             CancellationToken.None, TaskCreationOptions.None, ui);
    }
}

This method checks for new elements in the collection, until it’s notified that the collection is completed, which means that it has finished adding new items. If it gets a new element from the collection, it immediately removes the item and prints the value into the UI threads. Did you notice that I copied item to currentItem? It’s all about closure again: you’ll get a list of zero’s otherwise.

This version works and you won’t get any exceptions. But if you run it on a dual-core computer like I did, you’ll discover that it takes twice as long as the version that doesn’t use the collection. In fact, it runs as if the application weren’t parallelized at all! This is just one of the problems that you might run into, so don’t forget to always measure the performance of your parallel applications: it’s easy to cancel out the benefits of parallelization.

Of course, the problem is in this line:

while (!results.TryTake(out item));

An empty loop is rarely a good idea. This was an attempt to implement some kind of messaging between the threads – the consumer is constantly checking the collection and starts working only if it can retrieve a value from it. And it does this over and over again, so one of my processors is fully occupied with this work and can’t compute the values anymore.

One simple trick is to make this loop to consume less processing power. It can be as easy as this (however, this is not a recommended way, but rather an illustration of the principle):

while (!results.TryTake(out item)) Thread.Sleep(200);

Now after each attempt the task simply waits for 200 milliseconds before trying again. And during those 200 milliseconds the processor can compute the results this task is actually waiting for. You can compile and run the code to make sure that the performance indeed improved.

However, it might be tricky to find the perfect wait time. Ideally, I need some kind of message from the collection notifying me that the value was added.

In a blocking collection, you can do this by using a foreach loop. The BlockingCollection class has the GetConsumingEnumerable method that can be used to enumerate through the blocking collection and consume its elements until the collection is completed. It might look like this:

public void display(BlockingCollection<double> results)
{
    foreach (var item in results.GetConsumingEnumerable())
    {
        double currentItem = item;
        Task.Factory.StartNew(new Action(() =>
             textBlock1.Text += currentItem.ToString() + Environment.NewLine),
        CancellationToken.None, TaskCreationOptions.None, ui);
    }
}

Now the display method checks whether there is an item in the results collection and, if there is, consumes the item. When the collection is completed and empty, execution exits the loop. All the locking, synchronization, and messaging between the tasks are managed by the TPL.

The resulting application will probably still be a little slower than the version that didn’t use the collection at all, but of course writing to and reading from thread-safe data storage added some overhead.

If you got lost in all the changes, here’s the full code:

public partial class MainWindow : Window
{
    TaskScheduler ui = TaskScheduler.FromCurrentSynchronizationContext();

    public MainWindow()
    {
        InitializeComponent();
    }

    public static double SumRootN(int root)
    {
        double result = 0;
        for (int i = 1; i < 10000000; i++)
        {
            result += Math.Exp(Math.Log(i) / root);
        }
        return result;
    }

    private void start_Click(object sender, RoutedEventArgs e)
    {
        textBlock1.Text = "";
        label1.Content = "Milliseconds: ";

        var results = new BlockingCollection<double>();
        var watch = Stopwatch.StartNew();
        List<Task> tasks = new List<Task>();

        var consume = Task.Factory.StartNew(() => display(results));

        for (int i = 2; i < 20; i++)
        {
            int j = i;
            var compute = Task.Factory.StartNew(() =>
            {
                results.Add(SumRootN(j));
            });
            tasks.Add(compute);
        }

        Task.Factory.ContinueWhenAll(tasks.ToArray(),
            result =>
            {
                results.CompleteAdding();
                var time = watch.ElapsedMilliseconds;
                label1.Content += time.ToString();
            }, CancellationToken.None, TaskContinuationOptions.None, ui);
    }

    public void display(BlockingCollection<double> results)
    {
        foreach (var item in results.GetConsumingEnumerable())
        {
            double currentItem = item;
            Task.Factory.StartNew(new Action(() =>
                 textBlock1.Text += currentItem.ToString() + Environment.NewLine),
            CancellationToken.None, TaskCreationOptions.None, ui);
        }
    }
}

For now, this is the last post in my parallel programming series. I hope that I’ve provided enough information and examples and bumped into and recovered from enough problems to enable even beginners to continue on their own. (At least I asked fewer questions while writing this post than I did for the first one!)

If you want to learn more about blocking collections, check out BlockingCollection Overview in the MSDN Library and BlockingCollection Extensions on the Parallel Programming with .NET blog.

I could not cover all the features provided by the TPL. If you want to see what else is available, here are some links:

  • Data Structures for Parallel Programming. This MSDN topic lists .NET 4 classes that are useful for parallel programming, such as thread-safe collections, synchronization primitives, and lazy initialization classes.
  • Introduction to PLINQ. Parallel Language Integrated Query, or PLINQ, enables quick and easy parallelization of LINQ queries.

Two more links that I used a lot in this series are Parallel Programming in the .NET Framework on MSDN and the Parallel Programming with .NET team blog.

P.S.

Thanks to Dmitry Lomov, Michael Blome, and Danny Shih for reviewing this and providing helpful comments, to Mick Alberts for editing.

Be Sociable, Share!
    The following two tabs change content below.

    Alexandra Rusina

    Latest posts by Alexandra Rusina (see all)

    One Comment (Add Yours)

    1. It doesn’t make any sense to put your consumer inside “GetConsumingEnumerable()”, because it:
      - 1) defeats the purpose of take / trytake;
      - 2) when fully iterated, the code will return from the method, even when items can still be added after;
      - 3) the take method is suited for this whole purpose. It blocks, until a new element is inside the collection or when the flag completion is set and all elements are processed…

      All you had to do was using the method take, as the underlying algorithm won’t poll it, but message it through the current message pumping from the thread running it.

      Simple as that.

      Hope this made sense,
      -Casper

    Add Your Comment (Get a Gravatar)