I love new toys, so of course when .NET 4.0 came out I felt like the proverbial kid in the candy store! Now, some people get all excited about the IDE and it’s new features or about changes to WPF and Silver Light and yes, those are all very fine and grand. But me, I get all excited about things that tend to affect my life on the backside of development. That’s why when I heard there were going to be concurrent container implementations in the latest version of .NET I was salivating like Pavlov’s dog at the dinner bell. They seem so simple, really, that one could easily overlook them. Essentially they are implementations of containers (many that mirror the generic collections, others are new) that have either been optimized with very efficient, limited, or no locking but are still completely thread safe -- and I just had to see what kind of an improvement that would translate into. Since part of my job as a solutions architect here where I work is to help design, develop, and maintain the systems that process tons of requests each second, the thought of extremely efficient thread-safe containers was extremely appealing. Of course, they also rolled out a whole parallel development framework which I won’t get into in this post but will cover bits and pieces of as time goes by. This time, I was mainly curious as to how well these new concurrent containers would perform compared to areas in our code where we manually synchronize them using lock or some other mechanism. So I set about to run a processing test with a series of producers and consumers that would be either processing a traditional System.Collections.Generic.Queue or a System.Collection.Concurrent.ConcurrentQueue. Now, I wanted to keep the code as common as possible to make sure that the only variance was the container, so I created a test Producer and a test Consumer. The test Producer takes an Action<string> delegate which is responsible for taking a string and placing it on whichever queue we’re testing in a thread-safe manner: 1: internal class Producer
2: {
3: public int Iterations { get; set; }
4: public Action<string> ProduceDelegate { get; set; }
5:
6: public void Produce()
7: {
8: for (int i = 0; i < Iterations; i++)
9: {
10: ProduceDelegate(“Hello”);
11: }
12: }
13: }
Then likewise, I created a consumer that took a Func<string> that would read from whichever queue we’re testing and return either the string if data exists or null if not. Then, if the item doesn’t exist, it will do a 10 ms wait before testing again. Once all the producers are done and join the main thread, a flag will be set in each of the consumers to tell them once the queue is empty they can shut down since no other data is coming:
1: internal class Consumer
2: {
3: public Func<string> ConsumeDelegate { get; set; }
4: public bool HaltWhenEmpty { get; set; }
5:
6: public void Consume()
7: {
8: bool processing = true;
9:
10: while (processing)
11: {
12: string result = ConsumeDelegate();
13:
14: if(result == null)
15: {
16: if (HaltWhenEmpty)
17: {
18: processing = false;
19: }
20: else
21: {
22: Thread.Sleep(TimeSpan.FromMilliseconds(10));
23: }
24: }
25: else
26: {
27: DoWork(); // do something non-trivial so consumers lag behind a bit
28: }
29: }
30: }
31: }
Okay, now that we’ve done that, we can launch threads of varying numbers using lambdas for each different method of production/consumption. First let's look at the lambdas for a typical System.Collections.Generics.Queue with locking:
1: // lambda for putting to typical Queue with locking...
2: var productionDelegate = s =>
3: {
4: lock (_mutex)
5: {
6: _mutexQueue.Enqueue(s);
7: }
8: };
9:
10: // and lambda for typical getting from Queue with locking...
11: var consumptionDelegate = () =>
12: {
13: lock (_mutex)
14: {
15: if (_mutexQueue.Count > 0)
16: {
17: return _mutexQueue.Dequeue();
18: }
19: }
20: return null;
21: };
Nothing new or interesting here. Just typical locks on an internal object instance. Now let's look at using a ConcurrentQueue from the System.Collections.Concurrent library:
1: // lambda for putting to a ConcurrentQueue, notice it needs no locking!
2: var productionDelegate = s =>
3: {
4: _concurrentQueue.Enqueue(s);
5: };
6:
7: // lambda for getting from a ConcurrentQueue, once again, no locking required.
8: var consumptionDelegate = () =>
9: {
10: string s;
11: return _concurrentQueue.TryDequeue(out s) ? s : null;
12: };
So I pass each of these lambdas and the number of producer and consumers threads to launch and take a look at the timing results. Basically I’m timing from the time all threads start and begin producing/consuming to the time that all threads rejoin.
I won't bore you with the test code, basically it just launches code that creates the producers and consumers and launches them in their own threads, then waits for them all to rejoin. The following are the timings from the start of all threads to the Join() on all threads completing. The producers create 10,000,000 items evenly between themselves and then when all producers are done they trigger the consumers to stop once the queue is empty.
These are the results in milliseconds from the ordinary Queue with locking:
1: Consumers Producers 1 2 3 Time (ms)
2: ---------- ---------- ------ ------ ------ ---------
3: 1 1 4284 5153 4226 4554.33
4: 10 10 4044 3831 5010 4295.00
5: 100 100 5497 5378 5612 5495.67
6: 1000 1000 24234 25409 27160 25601.00
And the following are the results in milliseconds from the ConcurrentQueue with no locking necessary:
1: Consumers Producers 1 2 3 Time (ms)
2: ---------- ---------- ------ ------ ------ ---------
3: 1 1 3647 3643 3718 3669.33
4: 10 10 2311 2136 2142 2196.33
5: 100 100 2480 2416 2190 2362.00
6: 1000 1000 7289 6897 7061 7082.33
Note that even though obviously 2000 threads is quite extreme, the concurrent queue actually scales really well, whereas the traditional queue with simple locking scales much more poorly.
I love the new concurrent collections, they look so much simpler without littering your code with the locking logic, and they perform much better. All in all, a great new toy to add to your arsenal of multi-threaded processing!