So far in this series, I have introduced futures and a lock-free queue. Now we are going to use those tools to create a call queue, which will just happen to return futures. This queue will allow us to do message-based synchronization, which will be covered in the next post.
Designing the Interface
A call queue's interface is trivial. It accepts actions to run and, in this case, returns a future for when the action has completed.
Public Interface ICallQueue
    Function QueueAction(ByVal action As Action) As IFuture
End Interface
We also want to be able to work with functions, so we will add an extension method that accepts functions and returns a future for the value computed when the queued function finally runs.
Public Module ExtensionsForICallQueue
    <Extension()>
    Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
        Dim f As New Future(Of R)
        queue.QueueAction(Sub() f.SetValue(func()))
        Return f
    End Function
End Module
Notice that the interface does not specify how the actions run. The queue might run them on a separate thread, on the thread pool, or in any other context it wants. The only requirements are that the queued calls must eventually run and they must run in order.
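To make the adapter pattern concrete, here is a minimal self-contained sketch. It is not the series' code: Task and TaskCompletionSource stand in for IFuture and Future(Of R), and ITaskCallQueue and InlineCallQueue are hypothetical names invented for this example. InlineCallQueue is about the most naive queue that could meet the two requirements (it runs each call immediately, under a blocking lock), which is the approach the lock-free implementation in this post avoids.

```vb
Imports System
Imports System.Threading.Tasks

'Hypothetical stand-in for ICallQueue: reports completion with a Task instead of an IFuture.
Public Interface ITaskCallQueue
    Function QueueAction(ByVal action As Action) As Task
End Interface

'Naive conforming queue: runs each call immediately on the caller's thread, one at a time.
Public Class InlineCallQueue
    Implements ITaskCallQueue
    Private ReadOnly gate As New Object()

    Public Function QueueAction(ByVal action As Action) As Task Implements ITaskCallQueue.QueueAction
        SyncLock gate 'blocking lock, used only to keep the sketch short
            action()
        End SyncLock
        Return Task.CompletedTask
    End Function
End Class

Public Module QueueFuncSketch
    'Same shape as QueueFunc in the post: wrap the function in an action that stores its result.
    Public Function QueueFunc(Of R)(ByVal queue As ITaskCallQueue, ByVal func As Func(Of R)) As Task(Of R)
        Dim source As New TaskCompletionSource(Of R)()
        queue.QueueAction(Sub() source.SetResult(func()))
        Return source.Task
    End Function

    Public Sub Main()
        Dim queue As New InlineCallQueue()
        Dim answer = QueueFunc(Of Integer)(queue, Function() 6 * 7)
        Console.WriteLine(answer.Result) 'prints 42
    End Sub
End Module
```

The caller only ever sees the returned task (or, in the post's version, the future), so swapping in a queue that runs calls elsewhere requires no change to calling code.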
Implementing the Call Queue
The call queue is going to be lock free, making things fast and (more importantly) interesting. The call queue implementation will handle guaranteeing the eventual execution of queued calls, since the lock-free queue handles the in-order stuff. We will need to create a consumer when the queue becomes non-empty, destroy the consumer when the queue becomes empty, and do all of this in parallel.
To simplify reasoning about the very large number of possible thread interleavings, I am only going to focus on the exit points of methods. At every method exit point the queue will guarantee "empty or moving". To be more explicit, at every exit point the queue guarantees at least one of the following:
1) Queued calls are being executed
2) There is another thread executing inside the queue (which will affect shared state before it finishes)
3) The queue is empty
Given "empty or moving", it is impossible for a method to leave the queue idle in a non-empty state (which would result in calls potentially never being executed).
Alright, let's implement a queue satisfying the above property! All the difficult logic will be placed into private methods for acquiring and releasing consumer responsibilities, and the queue will call them appropriately before exiting methods. Let's start with the simple stuff so we see the big picture. We need a lock-free queue to store queued actions, a method to enqueue actions, and an overridable method to consume them. The consume method is overridable so child classes can override it and do things like call the base method on a new thread, giving a threaded call queue.
Public MustInherit Class AbstractCallQueue
    Implements ICallQueue
    Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
    Private running As Integer 'stores consumer state and is used as a semaphore
    Private Class Node
        Public ReadOnly action As Action
        Public ReadOnly future As New Future()
        Public Sub New(ByVal action As Action)
            Me.action = action
        End Sub
    End Class
    Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
        Dim item = New Node(action)
        queue.BeginEnqueue(item)
        If TryAcquireConsumer() Then
            Call Consume()
        End If
        Return item.future
    End Function
    Protected Overridable Sub Consume()
        Do Until TryReleaseConsumer()
            Dim item = queue.Dequeue()
            Call item.action()
            Call item.future.SetReady()
        Loop
    End Sub
    ...
Notice that the last shared-state actions performed by QueueAction and Consume are TryAcquireConsumer and TryReleaseConsumer respectively. Therefore, if those two methods satisfy "empty or moving", the queue will work.
The TryAcquireConsumer method will be simple and optimistic, because many producers can call it at the same time. It will just optimistically fail if the queue is empty, and otherwise try to atomically acquire the consumer responsibilities. Now to prove all possible exits meet "empty or moving". The optimistic failure is safe because, if the queue is no longer empty by the time we return, then some other producer enqueued an item in parallel and is still executing inside the queue (it has yet to make its own call to TryAcquireConsumer). The attempt to acquire consumer responsibilities is safe because if it fails then there is already a consumer executing inside the queue, and if it succeeds then we will create a consumer executing inside the queue. Therefore, when this method ends the queue will be empty or there will be some thread executing inside the queue, which proves "empty or moving".
    ...
    Private Function TryAcquireConsumer() As Boolean
        If queue.WasEmpty Then Return False
        Return Threading.Interlocked.Exchange(running, 1) = 0
    End Function
    ...
The TryReleaseConsumer method will be less simple. It will first check if the queue is not empty, in which case clearly we should still be consuming and it will fail. Then it will optimistically release the consumer responsibilities. But, between the empty check and releasing responsibilities, a producer may have enqueued an item. Therefore, we have to try to re-acquire consumer responsibilities. If that fails, then we know the queue is empty or another thread is now the consumer. Therefore, if we fail to re-acquire consumer responsibilities, we are done and can safely exit.
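The window between the empty check and the release can be replayed deterministically. The following single-threaded sketch is an illustration only, not the series' code: it uses a plain Queue(Of String) in place of the lock-free queue, the module and variable names are made up, and the consumer's and producer's steps are interleaved by hand in one problematic order.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Threading

Public Module ReleaseRaceDemo
    'Plain queue is fine here because the whole demo runs on one thread.
    Private ReadOnly items As New Queue(Of String)()
    Private running As Integer

    'Same logic as the post's TryAcquireConsumer, with Count = 0 standing in for WasEmpty.
    Private Function TryAcquireConsumer() As Boolean
        If items.Count = 0 Then Return False
        Return Interlocked.Exchange(running, 1) = 0
    End Function

    Public Sub Main()
        running = 1 'a consumer is active and has just drained the queue

        'Step 1 (consumer): the empty check at the top of TryReleaseConsumer passes.
        Dim sawEmpty = (items.Count = 0)

        'Step 2 (producer): enqueues, then fails to acquire because the consumer still holds the flag.
        items.Enqueue("late call")
        Dim producerAcquired = TryAcquireConsumer()

        'Step 3 (consumer): releases, acting on the now-stale empty check.
        Interlocked.Exchange(running, 0)
        Dim strandedWithoutRecheck = (items.Count > 0 AndAlso running = 0)

        'Step 4 (consumer): the re-acquire attempt notices the late item.
        Dim consumerReacquired = TryAcquireConsumer()

        Console.WriteLine($"sawEmpty={sawEmpty}, producerAcquired={producerAcquired}")
        Console.WriteLine($"strandedWithoutRecheck={strandedWithoutRecheck}, consumerReacquired={consumerReacquired}")
    End Sub
End Module
```

This prints `sawEmpty=True, producerAcquired=False` and then `strandedWithoutRecheck=True, consumerReacquired=True`. After step 3 the queue is non-empty with nobody responsible for it, which is exactly the state "empty or moving" forbids at an exit point. Step 4 is what rescues it.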
We're not done yet. There is one last possibility: some other thread may have acquired consumer responsibilities, emptied the queue, and released those responsibilities just before we acquired them, leaving us with a potentially empty queue! Therefore, if we acquire consumer responsibilities then the first thing we need to do is try to release consumer responsibilities all over again! We must keep trying until we fail to acquire (in which case we exit the queue) or find a non-empty queue (in which case we process some items then come back). But, as long as we don't give up, "empty or moving" will be satisfied. So, even though the caller can potentially be live-locked, TryReleaseConsumer satisfies "empty or moving". (The live-lock is not an issue because, in order for it to occur, other threads must be making progress.)
    ...
    Private Function TryReleaseConsumer() As Boolean
        Do
            If Not queue.WasEmpty Then Return False
            Threading.Interlocked.Exchange(running, 0)
            If Not TryAcquireConsumer() Then Return True
        Loop
    End Function
End Class
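One way to gain some confidence in the acquire/release dance is to stress it. The sketch below is a self-contained approximation rather than the class above: ConcurrentQueue(Of Action) and its IsEmpty property stand in for SingleConsumerLockFreeQueue and WasEmpty, futures are left out, consumption always goes to the thread pool, and SketchCallQueue and EmptyOrMovingDemo are hypothetical names. The acquire and release logic is copied from the post.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

'Sketch only: ConcurrentQueue stands in for the series' SingleConsumerLockFreeQueue.
Public Class SketchCallQueue
    Private ReadOnly items As New ConcurrentQueue(Of Action)()
    Private running As Integer '1 while some thread holds consumer responsibilities

    Public Sub QueueAction(ByVal action As Action)
        items.Enqueue(action)
        If TryAcquireConsumer() Then
            ThreadPool.QueueUserWorkItem(Sub(state) Consume())
        End If
    End Sub

    Private Function TryAcquireConsumer() As Boolean
        If items.IsEmpty Then Return False
        Return Interlocked.Exchange(running, 1) = 0
    End Function

    Private Function TryReleaseConsumer() As Boolean
        Do
            If Not items.IsEmpty Then Return False
            Interlocked.Exchange(running, 0)
            If Not TryAcquireConsumer() Then Return True
        Loop
    End Function

    Private Sub Consume()
        Do Until TryReleaseConsumer()
            Dim action As Action = Nothing
            If items.TryDequeue(action) Then action()
        Loop
    End Sub
End Class

Public Module EmptyOrMovingDemo
    Public Sub Main()
        Const producers As Integer = 8
        Const callsPerProducer As Integer = 10000
        Dim queue As New SketchCallQueue()
        Dim counter As Integer = 0 'only touched by queued calls, so it needs no lock of its own
        Dim remaining As New CountdownEvent(producers * callsPerProducer)

        'Several producers hammer the queue at once.
        Parallel.For(0, producers,
            Sub(p As Integer)
                For i = 1 To callsPerProducer
                    queue.QueueAction(Sub()
                                          counter += 1
                                          remaining.Signal()
                                      End Sub)
                Next
            End Sub)

        remaining.Wait() 'returns only once every queued call has run
        Console.WriteLine(counter = producers * callsPerProducer)
    End Sub
End Module
```

If a call were ever stranded in an idle queue, remaining.Wait() would never return. If two consumers ever ran at once, the unsynchronized counter could lose increments and the final comparison would print False. A passing run is evidence, not proof. The proof is the exit-point argument above.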
A couple call queue types
Hurray, we have an abstract lock-free call queue! Now let's go nuts and implement some concrete types we'll actually want to use. The obvious one is a ThreadedCallQueue, which runs queued calls on an independent thread:
'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call New Thread(Sub() MyBase.Consume()).Start()
    End Sub
End Class
However, we might have a very large number of call queues in a program, and the start-up and tear-down cost for threads is quite high. The .NET Framework provides a ThreadPool class that accepts work items to execute on a limited number of threads. We should create a ThreadPooledCallQueue for running queued calls with the thread pool:
'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
    End Sub
End Class
Lastly, we might require our calls to run on the UI thread (attempting to access form controls from another thread causes exceptions). It would be extremely convenient if we had a call queue which didn't require us to Invoke all over the place. Enter the InvokedCallQueue:
'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
    Inherits AbstractCallQueue
    Private ReadOnly control As System.Windows.Forms.Control
    Public Sub New(ByVal control As System.Windows.Forms.Control)
        Me.control = control
    End Sub
    Protected Overrides Sub Consume()
        control.BeginInvoke(Sub() MyBase.Consume())
    End Sub
End Class
There are surely plenty of other interesting ways to run queued calls, but these three have met my needs so far.
Summary
Now we have a bunch of lock-free call queues we can use to order calls from multiple threads. In the next post in this series I will cover how to use the queue for synchronization and compare it with a lock-based approach. Fully commented code follows.
Imports System.Runtime.CompilerServices 'for <Extension()>
Imports System.Threading 'for Thread and ThreadPool

'''<summary>Describes a thread-safe call queue for non-blocking calls.</summary>
Public Interface ICallQueue
    '''<summary>Queues an action to be run and returns a future for the action's eventual completion.</summary>
    Function QueueAction(ByVal action As Action) As IFuture
End Interface
Public Module ExtensionsForICallQueue
    '''<summary>Queues a function to be run and returns a future for the function's eventual output.</summary>
    <Extension()>
    Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
        Dim f As New Future(Of R)
        queue.QueueAction(Sub() f.SetValue(func()))
        Return f
    End Function
End Module
''' <summary>
''' A queue for running actions in order.
''' Note: exceptions thrown by queued calls are not caught in this listing.
''' </summary>
''' <remarks>
''' Consumes items produced by multiple producers.
''' Ensures that enqueued items are consumed by ensuring at all exit points that either:
''' - the queue is empty
''' - or exactly one consumer exists
''' - or another exit point will be hit [by another thread]
''' </remarks>
Public MustInherit Class AbstractCallQueue
    Implements ICallQueue
    Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
    Private running As Integer 'stores consumer state and is used as a semaphore
    Private Class Node
        Public ReadOnly action As Action
        Public ReadOnly future As New Future()
        Public Sub New(ByVal action As Action)
            Me.action = action
        End Sub
    End Class
    ''' <summary>
    ''' Queues an action to be run and returns a future for the action's eventual completion.
    ''' Starts running calls from the queue if they were not already being run.
    ''' </summary>
    ''' <remarks>
    ''' The action is enqueued before the attempt to acquire consumer responsibilities,
    ''' so either this call starts the consumer or some other thread is already responsible for the item.
    ''' </remarks>
    Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
        Dim item = New Node(action)
        queue.BeginEnqueue(item)
        'Start the consumer if it is not already running
        If TryAcquireConsumer() Then
            Call Consume()
        End If
        Return item.future
    End Function
    '''<summary>Returns true if consumer responsibilities were acquired by this thread.</summary>
    Private Function TryAcquireConsumer() As Boolean
        'Don't bother acquiring if there are no items to consume
        'This unsafe check is alright because enqueuers call this method after enqueuing
        'Even if an item is queued between the check and returning false, the enqueuer will call this method again
        'So we never end up with a non-empty idle queue
        If queue.WasEmpty Then Return False
        'Try to acquire consumer responsibilities
        Return Threading.Interlocked.Exchange(running, 1) = 0
        'Note that between the empty check and acquiring the consumer, all queued actions may have been processed.
        'Therefore the queue may be empty at this point, but that's alright. Just a bit of extra work, nothing unsafe.
    End Function
    '''<summary>Returns true if consumer responsibilities were released by this thread.</summary>
    Private Function TryReleaseConsumer() As Boolean
        Do
            'Don't release while there's still things to consume
            If Not queue.WasEmpty Then Return False
            'Release consumer responsibilities
            Threading.Interlocked.Exchange(running, 0)
            'It is possible that a new item was queued between the empty check and actually releasing
            'Therefore it is necessary to check if we can re-acquire in order to guarantee we don't leave a non-empty queue idle
            If Not TryAcquireConsumer() Then Return True
            'Even though we've now acquired consumer, we may have ended up with nothing to process!
            'So let's repeat this whole check for empty/release dance!
            'A caller could become live-locked here if other threads keep emptying and filling the queue.
            'But only consumer threads call here, and the live-lock requires that progress is being made.
            'So it's alright. We still make progress and we still don't end up in an invalid state.
        Loop
    End Function
    ''' <summary>
    ''' Runs queued calls until there are none left.
    ''' Child classes can override and call base implementation using desired method (eg. on a new thread).
    ''' </summary>
    Protected Overridable Sub Consume()
        Do Until TryReleaseConsumer()
            Dim item = queue.Dequeue()
            Call item.action()
            Call item.future.SetReady()
        Loop
    End Sub
End Class
'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call New Thread(Sub() MyBase.Consume()).Start()
    End Sub
End Class
'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
    End Sub
End Class
'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
    Inherits AbstractCallQueue
    Private ReadOnly control As System.Windows.Forms.Control
    Public Sub New(ByVal control As System.Windows.Forms.Control)
        Me.control = control
    End Sub
    Protected Overrides Sub Consume()
        control.BeginInvoke(Sub() MyBase.Consume())
    End Sub
End Class