Once again we consider some of the lesser known classes and keywords of C#. In this series of posts, we discuss how the concurrent collections have been developed to help alleviate common multi-threading concerns.
Last week's post began with a general introduction and discussed ConcurrentStack<T> and ConcurrentQueue<T>. Today's post discusses ConcurrentDictionary<TKey, TValue> (originally I had intended to cover ConcurrentBag this week as well, but ConcurrentDictionary had enough material to fill a very full post on its own!). Finally, next week we shall close with a discussion of ConcurrentBag<T> and BlockingCollection<T>.
For more of the "Little Wonders" posts, see the index here.
Recap
As you'll recall from the previous post, the original collections were object-based containers that accomplished synchronization through a Synchronized member. While these were convenient because you didn't have to worry about writing your own synchronization logic, they were a bit too finely grained, and if you needed to perform multiple operations under one lock, the automatic synchronization didn't buy you much.
With the advent of .NET 2.0, the original collections were succeeded by the generic collections which are fully type-safe, but eschew automatic synchronization. This cuts both ways in that you have a lot more control as a developer over when and how fine-grained you want to synchronize, but on the other hand if you just want simple synchronization it creates more work.
With .NET 4.0, we get the best of both worlds in generic collections: a new breed of collections called the concurrent collections, in the System.Collections.Concurrent namespace. These collections are fine-tuned for the best overall performance in situations requiring concurrent access. They are not meant to replace the generic collections, but simply to be an alternative to writing your own locking mechanisms.
Among those concurrent collections were ConcurrentStack<T> and ConcurrentQueue<T>, which provide classic LIFO and FIFO collections with a concurrent twist. As we saw, some of the traditional methods that required calls to be made in a certain order (like checking that IsEmpty is false before calling Pop()) were replaced in favor of an umbrella operation that combines both steps into one thread-safe operation (like TryPop()), as the sketch below recaps.
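As a quick recap, here is a minimal sketch (mine, not code from last week's post) showing how the single TryPop() call replaces the racy check-then-pop sequence:

var stack = new ConcurrentStack<int>();
stack.Push(42);

// the emptiness check and the pop happen as one thread-safe operation
int value;
if (stack.TryPop(out value))
{
    Console.WriteLine("Popped " + value);
}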
Now, let's take a look at the next in our series of concurrent collections!

For some excellent information on the performance of the concurrent collections and how they compare to a traditional brute-force locking strategy, see this wonderful whitepaper by the Microsoft Parallel Computing Platform team here.
ConcurrentDictionary – the fully thread-safe dictionary
The ConcurrentDictionary<TKey,TValue> is the thread-safe counterpart to the generic Dictionary<TKey, TValue> collection. Obviously, both are designed for quick – O(1) – lookups of data based on a key. If you need lightning-fast lookups of data and don't care whether the data is maintained in any particular order, the unsorted dictionaries are generally the best way to go.
Note: as a side note, there are sorted implementations of IDictionary, namely SortedDictionary and SortedList, which are stored as an ordered tree and an ordered list respectively. While these are not as fast as the non-sorted dictionaries – they are O(log n) – they are a great combination of both speed and ordering, and still greatly outperform a linear search.
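As a quick illustration of that trade-off (a sketch of my own, not from the original post), SortedDictionary keeps its keys in order while the plain hash-based Dictionary does not:

// keys come back in sorted order, at O(log n) cost per insert and lookup
var sorted = new SortedDictionary<string, int> { { "C", 3 }, { "A", 1 }, { "B", 2 } };

foreach (var pair in sorted)
{
    // prints A:1, B:2, C:3
    Console.WriteLine(pair.Key + ":" + pair.Value);
}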
Now, once again keep in mind that if all you need to do is load a collection once and then allow multi-threaded reading you do not need any locking. Examples of this tend to be situations where you load a lookup or translation table once at program start, then keep it in memory for read-only reference. In such cases locking is completely non-productive.
However, most of the time when we need a concurrent dictionary we are interleaving both reads and updates. This is where the ConcurrentDictionary really shines! It achieves its thread-safety with no single common lock: it uses a series of locks to provide concurrent updates, and reads are lock-free! This means that the ConcurrentDictionary gets even more efficient the higher the ratio of reads-to-writes you have.
ConcurrentDictionary and Dictionary differences
For the most part, the ConcurrentDictionary<TKey,TValue> behaves like its Dictionary<TKey,TValue> counterpart, with a few differences. Some notable examples are:
Add() does not exist in the concurrent dictionary.
This means you must use TryAdd(), AddOrUpdate(), or GetOrAdd(). It also means that you can't use a collection initializer with the concurrent dictionary (see the sketch after this list).
TryAdd() replaced Add() to attempt atomic, safe adds.
Because Add() only succeeds if the item doesn’t already exist, we need an atomic operation to check if the item exists, and if not add it while still under an atomic lock.
TryUpdate() was added to attempt atomic, safe updates.
If we want to update an item, we must make sure it exists first and that the original value is what we expected it to be. If all these are true, we can update the item under one atomic step.
TryRemove() was added to attempt atomic, safe removes.
To safely attempt to remove a value we need to see if the key exists first; this operation checks for existence and removes the item under one atomic lock.
AddOrUpdate() was added to attempt a thread-safe “upsert”.
There are many times where you want to insert into a dictionary if the key doesn’t exist, or update the value if it does. This allows you to make a thread-safe add-or-update.
GetOrAdd() was added to attempt a thread-safe query/insert.
Sometimes, you want to query for whether an item exists in the cache, and if it doesn’t insert a starting value for it. This allows you to get the value if it exists and insert if not.
Count, Keys, Values properties take a snapshot of the dictionary.
Accessing these properties may interfere with add and update performance and should be used with caution.
ToArray() returns a static snapshot of the dictionary.
That is, the dictionary is locked and then copied to an array as an O(n) operation.
GetEnumerator() is thread-safe and efficient, but allows dirty reads.
Because reads require no locking, you can safely iterate over the contents of the dictionary. The only downside is that, depending on timing, you may get dirty reads.
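To make the first point above concrete, here is a minimal sketch (variable names are mine) of populating a ConcurrentDictionary without a collection initializer, using either the indexer or TryAdd():

var lookup = new ConcurrentDictionary<string, int>();

// the indexer performs a thread-safe add-or-overwrite
lookup["A"] = 1;

// TryAdd() only succeeds if the key is not already present
if (!lookup.TryAdd("A", 99))
{
    Console.WriteLine("A already existed, value remains " + lookup["A"]);
}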
Dirty reads during iteration
The last point on GetEnumerator() bears some explanation. Picture a scenario in which you call GetEnumerator() (or iterate using a foreach, etc.) and then, during that iteration the dictionary gets updated. This may not sound like a big deal, but it can lead to inconsistent results if used incorrectly.
The problem is that items you have already iterated over do not show an update made a split second later, while items you have yet to reach do show an update made a split second earlier. Thus you may get a combination of items that are “stale” because you iterated over them before the update, and “fresh” because they were updated after GetEnumerator() was called but before the iteration reached them.
Let's illustrate with an example. Say you load up a concurrent dictionary like this:
// load up a dictionary.
var dictionary = new ConcurrentDictionary<string, int>();

dictionary["A"] = 1;
dictionary["B"] = 2;
dictionary["C"] = 3;
dictionary["D"] = 4;
dictionary["E"] = 5;
dictionary["F"] = 6;
Then you have one task (using the wonderful TPL!) to iterate using dirty reads:
// attempt iteration in a separate thread
var iterationTask = new Task(() =>
{
    // iterates using a dirty read
    foreach (var pair in dictionary)
    {
        Console.WriteLine(pair.Key + ":" + pair.Value);
    }
});
And one task to attempt updates in a separate thread:
// attempt updates in a separate thread
var updateTask = new Task(() =>
{
    // iterates, and updates each value by one
    foreach (var pair in dictionary)
    {
        dictionary[pair.Key] = pair.Value + 1;
    }
});
Now that we’ve done this, we can fire up both tasks and wait for them to complete:
// start both tasks
updateTask.Start();
iterationTask.Start();

// wait for both to complete.
Task.WaitAll(updateTask, iterationTask);
Now, if you didn't know about the dirty reads, you may have expected to see the iteration before the updates (such as A:1, B:2, C:3, D:4, E:5, F:6). However, because the reads are dirty, we will quite possibly get a combination of some updated values and some original ones. My own run netted this result:
F:6
E:6
D:5
C:4
B:3
A:2
Note that, of course, iteration is not in order because ConcurrentDictionary, like Dictionary, is unordered. Also note that both E and F show the value 6. This is because the output task reached F before the update, but the updates for the rest of the items occurred before their output (probably because console output is very slow, comparatively).
If we want to always guarantee that we will get a consistent snapshot to iterate over (that is, at the point we ask for it we see precisely what is in the dictionary and no subsequent updates during iteration), we should iterate over a call to ToArray() instead:
// attempt iteration in a separate thread
var iterationTask = new Task(() =>
{
    // iterates over a consistent snapshot taken by ToArray()
    foreach (var pair in dictionary.ToArray())
    {
        Console.WriteLine(pair.Key + ":" + pair.Value);
    }
});
The atomic Try…() methods
As you can imagine, TryAdd() and TryRemove() hold few surprises. Both check whether the key currently exists in the dictionary and then add or remove accordingly, all under one atomic operation:
// try add attempts an add and returns false if it already exists
if (dictionary.TryAdd("G", 7))
    Console.WriteLine("G did not exist, now inserted with 7");
else
    Console.WriteLine("G already existed, insert failed.");
TryRemove() also has the virtue of returning the value portion of the removed entry matching the given key:
// attempt to remove the value, if it exists it is removed and the original is returned
int removedValue;
if (dictionary.TryRemove("C", out removedValue))
    Console.WriteLine("Removed C and its value was " + removedValue);
else
    Console.WriteLine("C did not exist, remove failed.");
Now TryUpdate() is an interesting creature. You might think from its name that TryUpdate() first checks for an item's existence and then updates if the item exists, otherwise returning false. Well, not quite...
It turns out when you call TryUpdate() on a concurrent dictionary, you pass it not only the new value you want it to have, but also the value you expected it to have before the update.
If the item exists in the dictionary, and it has the value you expected, it will update it to the new value atomically and return true. If the item is not in the dictionary or does not have the value you expected, it is not modified and false is returned.
// attempt to update the value, if it exists and if it has the expected original value
if (dictionary.TryUpdate("G", 42, 7))
    Console.WriteLine("G existed and was 7, now it's 42.");
else
    Console.WriteLine("G either didn't exist, or wasn't 7.");
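One common pattern built on top of TryUpdate() (a sketch of my own, not from the original post; the IncrementValue helper is hypothetical) is an optimistic retry loop: read the current value, compute a new value from it, and retry if another thread changed the value in the meantime:

// optimistic retry loop: keep trying until our update is based on the latest value
public static void IncrementValue(ConcurrentDictionary<string, int> dictionary, string key)
{
    int currentValue;
    do
    {
        // if the key vanished, there is nothing to increment
        if (!dictionary.TryGetValue(key, out currentValue))
            return;
    }
    // only succeeds if the value is still what we read above
    while (!dictionary.TryUpdate(key, currentValue + 1, currentValue));
}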
The composite Add methods
The ConcurrentDictionary also has composite add methods that can be used to perform an update or a get, with an add if the item does not exist at the time of the call.
The first of these, AddOrUpdate(), allows you to add a new item to the dictionary if it doesn’t exist, or update the existing item if it does. For example, let’s say you are creating a dictionary of counts of stock ticker symbols you’ve subscribed to from a market data feed:
public sealed class SubscriptionManager
{
    private readonly ConcurrentDictionary<string, int> _subscriptions = new ConcurrentDictionary<string, int>();

    // adds a new subscription, or increments the count of the existing one.
    public void AddSubscription(string tickerKey)
    {
        // add a new subscription with count of 1, or update existing count by 1 if it exists
        var resultCount = _subscriptions.AddOrUpdate(tickerKey, 1, (symbol, count) => count + 1);

        // now check the result to see if we just incremented the count, or inserted the first count
        if (resultCount == 1)
        {
            // subscribe to symbol...
        }
    }
}
Notice the update value factory Func delegate. If the key does not exist in the dictionary, the add value is used (in this case 1 representing the first subscription for this symbol), but if the key already exists, it passes the key and current value to the update delegate which computes the new value to be stored in the dictionary. The return result of this operation is the value used (in our case: 1 if added, existing value + 1 if updated).
Likewise, GetOrAdd() allows you to attempt to retrieve a value from the dictionary, and if the value does not currently exist in the dictionary, it will insert one. This can be handy when you wish to cache data: you query the cache to see if the item exists, and if it doesn't, you put the item into the cache for the first time:
public sealed class PriceCache
{
    private readonly ConcurrentDictionary<string, double> _cache = new ConcurrentDictionary<string, double>();

    // gets the cached price, or queries and caches it if it isn't cached yet.
    public double QueryPrice(string tickerKey)
    {
        // check for the price in the cache; if it doesn't exist, the delegate is called to create the value.
        return _cache.GetOrAdd(tickerKey, symbol => GetCurrentPrice(symbol));
    }

    private double GetCurrentPrice(string tickerKey)
    {
        // placeholder: code to calculate the actual true price would go here.
        return 0.0;
    }
}
There are other overloads of these two methods that vary in whether a plain value or a factory delegate is provided, but otherwise they work much the same (a brief sketch follows).
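For instance, here is a brief sketch of two of those overloads (the ticker symbol and prices are made up for illustration): AddOrUpdate() can take a factory for the add value, and GetOrAdd() can take a plain value instead of a factory:

var prices = new ConcurrentDictionary<string, double>();

// AddOrUpdate overload that takes a factory for the add value as well as the update
prices.AddOrUpdate("MSFT",
    symbol => 1.0,                          // called only if the key is absent
    (symbol, oldPrice) => oldPrice + 1.0);  // called only if the key is present

// GetOrAdd overload that takes a plain value instead of a factory
double price = prices.GetOrAdd("MSFT", 1.0);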
Oddities with the composite Add methods
The AddOrUpdate() and GetOrAdd() methods are completely thread-safe, on this you may rely, but they are not atomic. It is important to note that the overloads that take delegates execute those delegates outside of the lock. This was done intentionally so that a user delegate (over which the ConcurrentDictionary has no control, of course) cannot take too long and lock out other threads.
This is not necessarily an issue, per se, but it is something you must consider in your design. The main thing to consider is that your delegate may get called to generate an item, but that item may not be the one returned!
Consider this scenario: thread A calls GetOrAdd() and sees that the key does not currently exist, so it calls the delegate. Meanwhile, thread B also calls GetOrAdd() and also sees that the key does not exist, and in this race condition its delegate completes first, so B adds its new value to the dictionary. Now A finishes its delegate, goes to take the lock, and sees that the item already exists. In this case, even though A called the delegate to create an item, it discards that value, because another item arrived between the time it generated one and the time it attempted to add it.
Let's illustrate with a totally contrived example program that has a dictionary of char to int, in which we want to store each char and its ordinal (that is, A = 1, B = 2, etc.). For our value generator, we will simply increment the previous value in a thread-safe way using Interlocked:
public static class Program
{
    private static int _nextNumber = 0;

    // the holder of the char to ordinal
    private static ConcurrentDictionary<char, int> _dictionary
        = new ConcurrentDictionary<char, int>();

    // get the next id value
    public static int NextId
    {
        get { return Interlocked.Increment(ref _nextNumber); }
    }
Then, we add a method that will perform our insert:
public static void Inserter()
{
    for (int i = 0; i < 26; i++)
    {
        _dictionary.GetOrAdd((char)('A' + i), key => NextId);
    }
}
Finally, we run our test by starting two tasks to do this work and get the results…
public static void Main()
{
    // two tasks attempting to get/insert
    var tasks = new List<Task>
    {
        new Task(Inserter),
        new Task(Inserter)
    };

    tasks.ForEach(t => t.Start());
    Task.WaitAll(tasks.ToArray());

    foreach (var pair in _dictionary.OrderBy(p => p.Key))
    {
        Console.WriteLine(pair.Key + ":" + pair.Value);
    }
}
If you run this with only one task, you get the expected A:1, B:2, ..., Z:26. But running this in parallel you will get something a bit more complex. My run netted these results:
A:1
B:3
C:4
D:5
E:6
F:7
G:8
H:9
I:10
J:11
K:12
L:13
M:14
N:15
O:16
P:17
Q:18
R:19
S:20
T:21
U:22
V:23
W:24
X:25
Y:26
Z:27
Notice that B is 3? This is most likely because both threads called GetOrAdd() at roughly the same time and both saw that B did not exist, so both called the generator; one thread got back 2 and the other got back 3. However, only one of those threads can take the lock for the actual insert at a time, and in this run the one that generated 3 won, so 3 was inserted and 2 was discarded.
This is why your factory delegates for these methods should not contain any logic that would be unsafe if the value they generate is discarded in favor of another item generated at roughly the same time. As such, it is probably a good idea to keep those generators as stateless as possible. If you truly need a factory to execute at most once per key, a common pattern is sketched below.
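One common mitigation (not from the original post, but a widely used pattern with ConcurrentDictionary) is to store Lazy<TValue> values. GetOrAdd() may still race and create several Lazy wrappers, but only the wrapper that wins is ever evaluated, so an expensive or side-effecting factory runs at most once per key. A minimal sketch, reusing the NextId property from the example above (the GetOrAddOrdinal helper is hypothetical):

// store Lazy<int> so the value factory only executes for the entry that wins the race
private static readonly ConcurrentDictionary<char, Lazy<int>> _lazyDictionary
    = new ConcurrentDictionary<char, Lazy<int>>();

public static int GetOrAddOrdinal(char key)
{
    // losing threads discard a Lazy that was never evaluated, so NextId is
    // only consumed by the Lazy actually stored in the dictionary.
    return _lazyDictionary.GetOrAdd(key,
        k => new Lazy<int>(() => NextId, LazyThreadSafetyMode.ExecutionAndPublication)).Value;
}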
Summary
The ConcurrentDictionary is a very efficient and thread-safe version of the Dictionary generic collection. It has all the benefits of type-safety that its generic collection counterpart does, and in addition is extremely efficient, especially when there are more reads than writes occurring concurrently.
Technorati Tags: C#, .NET, Concurrent Collections, Collections, Little Wonders, Black Rabbit Coder, James Michael Hare