Saturday, July 4, 2009

Concurrency #1: A Multiple-Producer Single-Consumer Lock Free Queue

Introduction

The later parts of this series are going to require a call queue which accepts calls to run from multiple threads, then ensures they are run in the order they were received. But before we get to a call queue we're going to need a plain vanilla thread-safe queue. My first implementation used the obvious approach: place locks around the Enqueue and Dequeue methods. But when I used this queue, profiling revealed a full quarter of the program's time was spent acquiring and releasing locks! Half of that was attributable to the call queues. I do put a ton of calls through these queues, so I need them to be as fast as possible.

Enter lock-free data structures, which I'd read a little bit about at the time. Instead of using locks, they use simple atomic operations provided by the hardware. This allows them to be much faster, but it also makes them incredibly hard to write correctly. You need to figure out how to do it without locks, then you need to worry about reads and writes being reordered by modern superscalar processors, then you need to worry about the compiler moving things around, caching reads, removing writes, and performing other optimizations that work great for single-threaded programs. I really don't want to understate how hard you have to think about lock-free code. Don't write lock-free code unless you're prepared to write an essay explaining why it is correct (only to find some thread interleaving that takes the whole thing down).

Most lock free queue examples out there use the humble CAS (compare-and-swap) atomic primitive. CAS atomically assigns a value to a variable only if the variable is equal to another value. Here is some pseudo code for CAS. Note that all my code samples are in VB10, which is in beta at the time of this writing.
    Public Function CompareAndSwap(ByRef target As Integer,
                                   ByVal newValue As Integer,
                                   ByVal comparisonValue As Integer) As Boolean
        SyncLock GlobalLock
            If target <> comparisonValue Then Return False
            target = newValue
            Return True
        End SyncLock
    End Function
I wanted to write a unique queue, so I decided to try to avoid using CAS. I ended up using the XCHG (Exchange) atomic primitive. XCHG atomically gets-and-sets a variable, effectively exchanging a new value for the old value. Pseudo code:
    Public Function Exchange(ByRef target As Integer,
                             ByVal newValue As Integer) As Integer
        SyncLock GlobalLock
            Dim oldValue = target
            target = newValue
            Return oldValue
        End SyncLock
    End Function
This is the only atomic primitive I needed for the queue.
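On real hardware these primitives are exposed directly by the runtime rather than simulated with a lock. As a sketch (in Java rather than the post's VB, since the JVM names are the ones I can vouch for), `AtomicInteger.getAndSet` is XCHG and `AtomicInteger.compareAndSet` is CAS:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicPrimitivesDemo {
    public static void main(String[] args) {
        AtomicInteger target = new AtomicInteger(5);

        // XCHG: atomically swap in a new value, returning the old one.
        int old = target.getAndSet(7);                 // old == 5, target is now 7

        // CAS: assign the new value only if the current value matches the expected one.
        boolean swapped = target.compareAndSet(7, 9);  // succeeds: target was 7
        boolean failed  = target.compareAndSet(7, 11); // fails: target is 9, not 7

        System.out.println(old + " " + swapped + " " + failed + " " + target.get());
        // prints "5 true false 9"
    }
}
```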

Designing the Queue

Queues are typically stored in two ways: as an array or as a linked list. I don't want to even think about how hard it would be to resize an array without acquiring any locks, so the queue is stored using a linked list. There will be a head node, which the consumer will advance in order to dequeue elements, and a tail node which producers will use to enqueue items. The producers will never manipulate the head variable, and items will be published to the queue purely by linking them to the insertion point.
    Public Class SingleConsumerLockFreeQueue(Of T)
        Private head As Node = New Node(Nothing)
        Private insertionPoint As Node = head

        Private Class Node
            Public ReadOnly value As T
            Public [next] As Node
            Public Sub New(ByVal value As T)
                Me.value = value
            End Sub
        End Class
        ...
Essentially the same starting definition you use for any linked list queue. The consumer part is also easy. There is no need for any locks or atomic operations because there is only one consumer, and the producers never manipulate the head variable. We can just implement it as if we were working with a single threaded queue:
        ...
        Public ReadOnly Property WasEmpty As Boolean
            Get
                Return head.next Is Nothing
            End Get
        End Property

        Public Function Peek() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            Return head.next.value
        End Function

        Public Function Dequeue() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            head = head.next
            Return head.value
        End Function
        ...
Note that Peek and Dequeue must only be called from the consumer thread. You can call WasEmpty from a producer thread, but its return value won't be stable: it's only stable if the queue is non-empty and it's called from the consumer thread (which is why it's named WasEmpty instead of IsEmpty). If you've seen CAS-based queues, you'll also notice I avoided some issues by making ".next != null" implicitly equivalent to "another item is queued".

The producers are more complicated. They're going to have to share the insertion point variable by using the XCHG operation. Each producer will exchange a new node for the current insertion point, which will stop producers from interfering with each other:
        ...
        Public Sub BeginEnqueue(ByVal item As T)
            Dim newNode = New Node(item)
            Dim oldInsertionPoint = Threading.Interlocked.Exchange(Me.insertionPoint, newNode)
            oldInsertionPoint.next = newNode
        End Sub
        ...
This queue is essentially identical to a normal queue, except for the XCHG magic in BeginEnqueue. Note that there's a reason I call it BeginEnqueue instead of Enqueue, which comes up later in this post.
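The same structure translates to other platforms wherever an atomic exchange is available. Here is a sketch on the JVM (my translation, not the post's code), where `AtomicReference.getAndSet` plays the role of Interlocked.Exchange and `next` is volatile so the consumer is guaranteed to see the producer's link:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the post's queue on the JVM. AtomicReference.getAndSet is XCHG.
public class SingleConsumerLockFreeQueue<T> {
    private static final class Node<T> {
        final T value;
        volatile Node<T> next; // volatile: the consumer must see the producer's write
        Node(T value) { this.value = value; }
    }

    // Dummy node: head marks the end of consumed nodes; head.next is next to dequeue.
    private Node<T> head = new Node<>(null);
    private final AtomicReference<Node<T>> insertionPoint = new AtomicReference<>(head);

    // Producer side: safe to call from many threads at once.
    public void beginEnqueue(T item) {
        Node<T> newNode = new Node<>(item);
        Node<T> oldInsertionPoint = insertionPoint.getAndSet(newNode); // XCHG
        oldInsertionPoint.next = newNode; // publish: links the chain together
    }

    // Consumer side: single thread only.
    public boolean wasEmpty() { return head.next == null; }

    public T dequeue() {
        Node<T> next = head.next;
        if (next == null) throw new IllegalStateException("Empty Queue");
        head = next;
        return next.value;
    }
}
```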

Correctness

With the producer and consumer methods defined, we now have a complete queue. But is it correct? The consumer part probably is, but the producer part looks like it could get crazy with lots of threads in there! Let's start by proving the producers don't destructively interfere with each other.

Notice that, for each call to BeginEnqueue, a brand new node is passed as the second argument to XCHG. Therefore, the second argument to XCHG will always be unique. Therefore, the value returned by XCHG will always be unique, because it essentially lags the second argument by one call. Therefore, each producer gets a unique oldInsertionPoint. Also notice that a producer never modifies the .next pointer of its own constructed node: it leaves that responsibility to the next producer to call XCHG. In fact, the only place a node's .next pointer is ever modified is "oldInsertionPoint.next = newNode". Therefore, there is no write-write race condition on that assignment, because oldInsertionPoint is unique to each call and the .next pointer is modified nowhere else. Finally, notice that a .next pointer can only change in the future if it is currently null. This guarantees the property I mentioned earlier: ".next != null" == "another item is queued".

A good mental image of the queue is a big disassembled and growing chain randomly linking itself together until it is a big assembled chain. In later posts I will talk about how I tested the queue in order to truly convince myself it was correct (It wasn't just stress testing.). For now, let's convince ourselves by looking at an example of what happens when two threads contend while enqueueing items.



We start with an empty queue, with the insertion point equal to the head. Then two threads call BeginEnqueue. The threads race, but Thread1 calls XCHG first. It gets the head as its old insertion point and sets the new insertion point to its constructed node. Now Thread2 pre-empts Thread1 and runs its call to XCHG, getting Thread1's constructed node as its old insertion point. Thread2 continues and finishes, linking Prev2.next to Node2, but leaving the queue still technically empty. Finally, Thread1 finishes and links Prev1.next to Node1, leaving the queue with two items. Even though the thread order was perturbed, the chain still came together correctly. This works for any number of threads. Try it.
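That interleaving can be replayed deterministically by splitting each BeginEnqueue into its two steps by hand (a simulation sketch in Java, not real pre-emption; the names mirror the walkthrough above):

```java
import java.util.concurrent.atomic.AtomicReference;

// Replays the two-producer interleaving step by step: each BeginEnqueue is
// split into its XCHG step and its link step, run in the perturbed order.
public class InterleavingDemo {
    static final class Node {
        final String value;
        Node next;
        Node(String v) { value = v; }
    }

    public static void main(String[] args) {
        Node head = new Node(null);
        AtomicReference<Node> insertionPoint = new AtomicReference<>(head);

        // Thread1 runs its XCHG first...
        Node node1 = new Node("item1");
        Node prev1 = insertionPoint.getAndSet(node1); // prev1 == head

        // ...then Thread2 pre-empts and runs a full BeginEnqueue.
        Node node2 = new Node("item2");
        Node prev2 = insertionPoint.getAndSet(node2); // prev2 == node1
        prev2.next = node2;

        // The queue is still "empty": head.next stays null until Thread1 finishes.
        System.out.println("empty after thread2: " + (head.next == null));

        // Thread1 finally links its node, and the whole chain snaps together.
        prev1.next = node1;
        System.out.println(head.next.value + ", " + head.next.next.value);
        // prints "empty after thread2: true" then "item1, item2"
    }
}
```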

To finish this section off, notice that I can actually write a modified BeginEnqueue method to enqueue a sequence of items and guarantee they end up adjacent in the queue. Do you see how to do it?

Performance

Now that I've convinced myself the queue is correct, I want to know how it performs compared to the old locking queue and to CAS-based lock free queues. Actually, comparing it to the old queue was a joke. I profiled my code after I inserted this queue, and it just destroyed the old queue. The queueing overhead was essentially gone. So, I'm going to jump straight to armchair comparing it against CAS queues.

The obvious thing to notice is that all XCHG queue operations are guaranteed constant time. CAS queues, on the other hand, can spend arbitrarily long trying to enqueue their value (in practice this is extremely unlikely). The XCHG operation is also simpler than CAS. Overall, I expect producer throughput to be non-negligibly faster than a CAS based queue. The ability to safely enqueue multiple adjacent items is also nice and could be used to significantly reduce the number of calls to the expensive atomic operations.
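The multi-item trick works by building a private chain first, so only one atomic exchange is paid no matter how many items are appended. A JVM sketch of the idea (mirroring the structure of the VB version that appears in the full class below, with names of my own choosing):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Multi-item enqueue: link the items into a private chain off-line, then
// publish the whole chain with a single atomic exchange.
public class ChainEnqueueDemo {
    static final class Node {
        final String value;
        volatile Node next;
        Node(String v) { value = v; }
    }

    Node head = new Node(null); // dummy node, consumer-owned
    final AtomicReference<Node> insertionPoint = new AtomicReference<>(head);

    void beginEnqueueAll(List<String> items) {
        if (items.isEmpty()) return;

        // Build the chain; no other thread can see these nodes yet.
        Node chainHead = null, chainTail = null;
        for (String item : items) {
            Node n = new Node(item);
            if (chainHead == null) chainHead = n; else chainTail.next = n;
            chainTail = n;
        }

        // One XCHG publishes the whole chain; the items stay adjacent and in order.
        Node prevChainTail = insertionPoint.getAndSet(chainTail);
        prevChainTail.next = chainHead;
    }

    String dequeue() { // consumer thread only; assumes the queue is non-empty
        head = head.next;
        return head.value;
    }
}
```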

The consumer side is essentially equivalent to a CAS queue, in that it could be modified to be multiple-consumer by using CAS. We don't need multiple consumers for a call queue, so I didn't bother making it safe for multiple consumers. The major consumer benefit compared to a CAS queue is that garbage collection is not required. Most lock free data structures require garbage collection, because anytime you explicitly free a node there is a risk that some slow thread still has a reference to it. Here the consumer can safely free nodes when their .next is consumed (because setting .next is the last thing a producer does), making a translation to a low level language like C trivial.

One downside I do want to note, however, is the non-obvious priority inversion. Remember, from the enqueueing example I gave, how Node2 wasn't technically queued when Thread2 finished? It was only queued when Thread1 finished later on. Now imagine a very low priority thread and a very high priority thread continuously enqueueing items. The low priority thread would run the exchange line, and essentially stall the queue until it updated the .next pointer on the next line. The high priority thread would be appending items, but they would not be dequeueable until the low priority thread finished. In other words, the average delay depends on the slowest producer. Luckily, the buffering effect stops this from affecting the average throughput. I would not recommend this queue for real-time systems where milliseconds count and prioritization is king.

Overly Simple Profiling

In order to get an idea of the queue contention, I profiled the time it took 1 to 100 threads to queue 10 000 items each. I'm running on a Core2 Duo processor, and the times were measured using GetTickCount, which probably explains why the low data points are a bit wild. I don't see much usefulness in these numbers, but I needed a pretty graph so make of it what you will. I'm open to suggestions for more relevant tests.
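For anyone wanting to reproduce a test along these lines, a rough harness looks like the following (a JVM sketch of my own, not the original benchmark; absolute timings will vary wildly by machine and clock resolution):

```java
import java.util.concurrent.atomic.AtomicReference;

// Rough contention test: THREADS producers each enqueue ITEMS items, then the
// single consumer drains and counts them after the producers have joined.
public class QueueStress {
    static final class Node {
        final int value;
        volatile Node next;
        Node(int v) { value = v; }
    }

    static Node head = new Node(0); // dummy node
    static final AtomicReference<Node> insertionPoint = new AtomicReference<>(head);

    static void beginEnqueue(int item) {
        Node n = new Node(item);
        insertionPoint.getAndSet(n).next = n; // XCHG, then link
    }

    public static void main(String[] args) throws InterruptedException {
        final int THREADS = 8, ITEMS = 10_000;
        long start = System.nanoTime();

        Thread[] producers = new Thread[THREADS];
        for (int t = 0; t < THREADS; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < ITEMS; i++) beginEnqueue(i);
            });
            producers[t].start();
        }
        for (Thread t : producers) t.join();

        // join() guarantees all links are visible, so the drain sees every item.
        int count = 0;
        while (head.next != null) { head = head.next; count++; }

        System.out.println(count + " items in "
                + (System.nanoTime() - start) / 1_000_000 + " ms");
    }
}
```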


Conclusion

The completed queue class is posted below, along with comments. The class is 48 lines of code plus 82 lines of comments and white space. Lock-free stuff is not a joke: you have to explain yourself. In my next post I will be discussing futures, which will be another integral part of the call queue coming later on.
    ''' <summary>
    ''' A multiple-producer, single-consumer lock-free queue.
    ''' Does NOT guarantee an item has been queued when BeginEnqueue finishes.
    ''' Does guarantee an item will eventually be queued after BeginEnqueue finishes.
    ''' Does guarantee that, for calls simultaneously in BeginEnqueue, at least one will finish with its item having been enqueued.
    ''' Does guarantee that, if BeginEnqueue(Y) is called after BeginEnqueue(X) finishes, Y will follow X in the queue.
    ''' </summary>
    ''' <remarks>
    ''' Performance characteristics:
    ''' - All operations are guaranteed constant time.
    ''' - Latency between BeginEnqueue finishing and the item being enqueued can be delayed arbitrarily by slowing down only one of the producers.
    ''' - Average throughput is expected to be higher than CAS-based implementations, though I have not measured that directly.
    ''' </remarks>
    Public Class SingleConsumerLockFreeQueue(Of T)
        ''' <summary>
        ''' Owned by the consumer.
        ''' This node is the end marker of the consumed nodes.
        ''' This node's next is the next node to be consumed.
        ''' </summary>
        Private head As Node = New Node(Nothing)
        ''' <summary>
        ''' This node is the tail of the last partially or fully inserted chain.
        ''' The next inserted chain will exchange its tail for the insertionPoint, then set the old insertionPoint's next to the chain's head.
        ''' </summary>
        Private insertionPoint As Node = head
        ''' <summary>
        ''' Singly linked list node containing queue items.
        ''' </summary>
        Private Class Node
            Public ReadOnly value As T
            Public [next] As Node
            Public Sub New(ByVal value As T)
                Me.value = value
            End Sub
        End Class

        ''' <summary>
        ''' Begins adding new items to the queue.
        ''' The items may not be dequeueable when this method finishes, but eventually they will be.
        ''' The items are guaranteed to end up adjacent in the queue and in the correct order.
        ''' </summary>
        ''' <remarks>
        ''' An example of what can occur when two items are queued simultaneously:
        ''' Initial state:
        '''     insert=head -> null
        '''     [queue is empty]
        ''' Step 1: First item is created and exchanged with insertion point.
        '''     head=prev1 -> null
        '''     insert=node1 -> null
        '''     [queue is empty]
        ''' Step 2: Second thread pre-empts and second item is created and exchanged with insertion point.
        '''     head=prev1 -> null
        '''     node1=prev2 -> null
        '''     insert=node2 -> null
        '''     [queue is empty]
        ''' Step 3: Second thread finishes setting prev.next.
        '''     head=prev1 -> null
        '''     node1=prev2 -> insert=node2 -> null
        '''     [queue is empty]
        ''' Step 4: First thread comes back and finishes setting prev.next.
        '''     head=prev1 -> node1=prev2 -> insert=node2 -> null
        '''     [queue contains 2 elements]
        ''' </remarks>
        ''' <implementation>
        ''' Each producer creates a new chain, and exchanges the shared insertion point for the tail of the new chain.
        ''' The producer then links the old insertion point to the head of the new chain.
        ''' A new chain might not be in the main chain when the function exits, but it will be in a chain that will eventually be in the main chain.
        ''' </implementation>
        Public Sub BeginEnqueue(ByVal items As IEnumerable(Of T))
            If items Is Nothing Then Throw New ArgumentNullException("items")
            If Not items.Any Then Return

            'Create new chain
            Dim chainHead As Node = Nothing
            Dim chainTail As Node = Nothing
            For Each item In items
                If chainHead Is Nothing Then
                    chainHead = New Node(item)
                    chainTail = chainHead
                Else
                    chainTail.next = New Node(item)
                    chainTail = chainTail.next
                End If
            Next item

            'Append chain to previous chain
            Dim prevChainTail = Threading.Interlocked.Exchange(Me.insertionPoint, chainTail)
            prevChainTail.next = chainHead
        End Sub
        ''' <summary>
        ''' Begins adding a new item to the queue.
        ''' The item may not be dequeueable when this method finishes, but eventually it will be.
        ''' </summary>
        ''' <implementation>
        ''' Just an inlined and simplified version of BeginEnqueue(IEnumerable(Of T)).
        ''' </implementation>
        Public Sub BeginEnqueue(ByVal item As T)
            Dim chainOfOne = New Node(item)
            Dim prevChainTail = Threading.Interlocked.Exchange(Me.insertionPoint, chainOfOne)
            prevChainTail.next = chainOfOne
        End Sub

        ''' <summary>
        ''' Returns true if there were no items in the queue.
        ''' The return value is only stable if the queue is non-empty and you are calling from the consumer thread.
        ''' </summary>
        Public ReadOnly Property WasEmpty As Boolean
            Get
                Return head.next Is Nothing
            End Get
        End Property

        ''' <summary>
        ''' Returns the next item in the queue without removing it.
        ''' This function must only be called from the consumer thread.
        ''' </summary>
        Public Function Peek() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            Return head.next.value
        End Function
        ''' <summary>
        ''' Removes and returns an item from the queue.
        ''' This function must only be called from the consumer thread.
        ''' </summary>
        Public Function Dequeue() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            head = head.next
            Return head.value
        End Function
    End Class
