Monday, August 17, 2009

Concurrency #3: The Queue is the Mailbox

Introduction

So far in this series, I have introduced futures and a lock-free queue. Now we are going to use those tools to create a call queue, which will just happen to return futures. This queue will allow us to do message-based synchronization, which will be covered in the next post.

Designing the Interface

A call queue's interface is trivial. It accepts actions to run and, in this case, returns a future for when the action has completed.
  
Public Interface ICallQueue
Function QueueAction(ByVal action As Action) As IFuture
End Interface

We also want to be able to work with functions, so we will add an extension method that accepts functions and returns a future for the value computed when the queued function finally runs.
  
Public Module ExtensionsForICallQueue
<Extension()>
Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
queue.QueueAction(Sub() f.SetValue(func()))
Return f
End Function
End Module

Notice that the interface does not specify how the actions run. The queue might run them on a separate thread, on the thread pool, or in any other context it wants. The only requirements are that the queued calls must eventually run and they must run in order.
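To make the contract concrete (calls eventually run, and run in order), here is a minimal Python analogue of a call queue. It is deliberately not lock-free: it uses a blocking queue and one permanent worker thread, which the VB implementation below avoids. The class and method names are mine, chosen to mirror the interface above:

```python
import queue
import threading
from concurrent.futures import Future

class SerialCallQueue:
    """Minimal call queue: actions run eventually, in order, one at a time."""

    def __init__(self):
        self._items = queue.Queue()
        threading.Thread(target=self._consume, daemon=True).start()

    def queue_action(self, action):
        """Enqueue an action; the returned future completes when it has run."""
        future = Future()
        self._items.put((action, future))
        return future

    def queue_func(self, func):
        """Like QueueFunc: the future carries the function's return value."""
        return self.queue_action(func)

    def _consume(self):
        while True:
            action, future = self._items.get()
            try:
                future.set_result(action())
            except Exception as e:  # a failed call must not kill the consumer
                future.set_exception(e)
```

Note that queue_func falls out for free here because Python functions always return a value; in VB the extension method is needed to bridge Action and Func(Of R).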

Implementing the Call Queue

The call queue is going to be lock free, making things fast and (more importantly) interesting. The call queue implementation must guarantee the eventual execution of queued calls, since the lock-free queue already handles in-order delivery. We will need to create a consumer when the queue becomes non-empty, destroy the consumer when the queue becomes empty, and do all of this in parallel.

To simplify reasoning about the very large number of possible thread interleavings, I am only going to focus on the exit point of methods. At every method exit point the queue will guarantee "empty or moving". To be more explicit, at every point the queue guarantees ONE of the following:
1) Queued calls are being executed
2) There is another thread executing inside the queue (which will affect shared state before it finishes)
3) The queue is empty
Given "empty or moving", it is impossible for a method to leave the queue idle in a non-empty state (which would result in calls potentially never being executed).

Alright, let's implement a queue satisfying the above property! All the difficult logic will be placed into private methods for acquiring and releasing consumer responsibilities, and the queue will call them appropriately before exiting methods. Let's start with the simple stuff so we see the big picture. We need a lock-free queue to store queued actions, a method to enqueue actions, and an overridable method to consume them. The consume method is overridable so child classes can override it and do things like call the base method on a new thread, giving a threaded call queue.
  
Public MustInherit Class AbstractCallQueue
Implements ICallQueue

Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
Private running As Integer 'consumer flag: 0 = no consumer, 1 = a consumer is active

Private Class Node
Public ReadOnly action As Action
Public ReadOnly future As New Future()
Public Sub New(ByVal action As Action)
Me.action = action
End Sub
End Class

Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
Dim item = New Node(action)
queue.BeginEnqueue(item)
If TryAcquireConsumer() Then
Call Consume()
End If
Return item.future
End Function

Protected Overridable Sub Consume()
Do Until TryReleaseConsumer()
Dim item = queue.Dequeue()
Call item.action()
Call item.future.SetReady()
Loop
End Sub
...

Notice that the last shared-state actions performed by QueueAction and Consume are TryAcquireConsumer and TryReleaseConsumer, respectively. Therefore, if those two methods maintain "empty or moving", the whole queue will.

The TryAcquireConsumer method will be simple and optimistic, because many producers can call it at the same time. It will just optimistically fail if the queue is empty, and otherwise try to atomically acquire the consumer responsibilities. Now to prove all possible exits meet "empty or moving". The optimistic failure is safe, because if another producer exists to enqueue an item in parallel, then there is some other thread executing inside the queue. The attempt to acquire the lock is safe because if it fails then there is a consumer executing inside the queue, and if it succeeds then we will create a consumer executing inside the queue. Therefore, when this method ends the queue will be empty or there will be some thread executing inside the queue, which proves "empty or moving".
  
...
Private Function TryAcquireConsumer() As Boolean
If queue.WasEmpty Then Return False
Return Threading.Interlocked.Exchange(running, 1) = 0
End Function
...

The TryReleaseConsumer method will be less simple. It first checks whether the queue is non-empty, in which case we clearly should still be consuming, so it fails. Then it optimistically releases the consumer responsibilities. But between the empty check and the release, a producer may have enqueued an item. Therefore, we have to try to re-acquire consumer responsibilities. If that attempt fails, then the queue is empty or another thread is now the consumer, so we are done and can safely exit.

We're not done yet. There is one last possibility: some other thread may have acquired consumer responsibilities, emptied the queue, and released those responsibilities just before we re-acquired them, leaving us with a potentially empty queue! So if we do acquire consumer responsibilities, the first thing we must do is try to release them all over again. We keep looping until we either fail to acquire (in which case we exit the queue) or find a non-empty queue (in which case we process some items and come back). As long as we don't give up, "empty or moving" is satisfied. The caller can, in principle, be live-locked here, but that is not a real problem: for the live lock to occur, other threads must be continuously making progress.
  
...
Private Function TryReleaseConsumer() As Boolean
Do
If Not queue.WasEmpty Then Return False
Threading.Interlocked.Exchange(running, 0)
If Not TryAcquireConsumer() Then Return True
Loop
End Function
End Class
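To see the acquire/release dance outside VB, here is a direct Python translation of the protocol. Python has no Interlocked.Exchange, so a small lock-guarded stand-in emulates the atomic exchange (which means this sketch is not truly lock-free), but the control flow mirrors TryAcquireConsumer and TryReleaseConsumer line for line:

```python
import threading
from collections import deque

class AtomicFlag:
    """Lock-guarded stand-in for Interlocked.Exchange (Python has no atomic exchange)."""
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def exchange(self, new_value):
        with self._lock:
            old, self._value = self._value, new_value
            return old

class CallQueue:
    """Multiple producers, at most one consumer; maintains 'empty or moving'."""
    def __init__(self):
        self._queue = deque()         # append/popleft are atomic in CPython
        self._running = AtomicFlag()  # 0 = no consumer, 1 = a consumer exists

    def queue_action(self, action):
        self._queue.append(action)
        if self._try_acquire_consumer():
            self._consume()           # the VB version lets subclasses run this elsewhere

    def _try_acquire_consumer(self):
        if not self._queue:                    # optimistic failure: empty queue
            return False
        return self._running.exchange(1) == 0  # acquired iff no consumer existed

    def _try_release_consumer(self):
        while True:
            if self._queue:
                return False                   # still items: keep consuming
            self._running.exchange(0)          # optimistically release
            if not self._try_acquire_consumer():
                return True                    # empty, or another thread consumes
            # re-acquired: an item may have raced in, so check all over again

    def _consume(self):
        while not self._try_release_consumer():
            self._queue.popleft()()            # run the oldest queued action
```

Run a handful of producer threads against it and every queued action runs exactly once, in arrival order, with at most one consumer active at any moment.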

A couple of call queue types

Hurray, we have an abstract lock-free call queue! Now let's go nuts and implement some concrete types we'll actually want to use. The obvious one is a ThreadedCallQueue, which runs queued calls on an independent thread:
  
'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call New Thread(Sub() MyBase.Consume()).Start()
End Sub
End Class

However, we might have a very large number of call queues in a program, and the start-up and tear-down cost of threads is quite high. The .NET Framework provides a ThreadPool class that accepts work items to execute on a limited number of threads. So let's create a ThreadPooledCallQueue for running queued calls on the thread pool:
  
'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
End Sub
End Class

Lastly, we might require our calls to run on the UI thread (attempting to access form controls from another thread causes exceptions). It would be extremely convenient if we had a call queue which didn't require us to Invoke all over the place. Enter the InvokedCallQueue:
  
'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
Inherits AbstractCallQueue
Private ReadOnly control As System.Windows.Forms.Control
Public Sub New(ByVal control As System.Windows.Forms.Control)
Me.control = control
End Sub
Protected Overrides Sub Consume()
control.BeginInvoke(Sub() MyBase.Consume())
End Sub
End Class

There are surely plenty of other interesting ways to run queued calls, but these three have met my needs so far.
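As one more illustration of the pattern, here is a Python sketch of a pool-backed variant, where the consumer is submitted to a shared thread pool as a work item instead of owning a thread. This version guards the consumer flag with an ordinary lock rather than the lock-free protocol, purely to keep the sketch short; all names are mine:

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor

_pool = ThreadPoolExecutor()  # shared pool, standing in for .NET's ThreadPool

class PooledCallQueue:
    """Runs queued calls in order, as work items on a shared thread pool."""
    def __init__(self):
        self._queue = deque()
        self._running = False
        self._lock = threading.Lock()  # guards _running (not lock-free)

    def queue_action(self, action):
        self._queue.append(action)
        with self._lock:
            if self._running:
                return             # an existing consumer will pick this item up
            self._running = True
        _pool.submit(self._consume)

    def _consume(self):
        while True:
            try:
                action = self._queue.popleft()
            except IndexError:
                with self._lock:
                    if not self._queue:       # re-check before releasing: an item
                        self._running = False # may have raced in after popleft failed
                        return
                continue
            action()
```

The release path has the same shape as TryReleaseConsumer: check empty, release, and re-check, so a racing producer never leaves a non-empty queue idle.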

Summary

Now we have a bunch of lock-free call queues we can use to order calls from multiple threads. In the next post in this series I will cover how to use the queue for synchronization and compare it with a lock based approach. Fully commented code follows.
  
'''<summary>Describes a thread-safe call queue for non-blocking calls.</summary>
Public Interface ICallQueue
'''<summary>Queues an action to be run and returns a future for the action's eventual completion.</summary>
Function QueueAction(ByVal action As Action) As IFuture
End Interface

Public Module ExtensionsForICallQueue
'''<summary>Queues a function to be run and returns a future for the function's eventual output.</summary>
<Extension()>
Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
queue.QueueAction(Sub() f.SetValue(func()))
Return f
End Function
End Module

''' <summary>
''' A queue for running actions in order.
''' </summary>
''' <remarks>
''' Consumes items produced by multiple producers.
''' Ensures that enqueued items are consumed by ensuring at all exit points that either:
''' - the queue is empty
''' - or exactly one consumer exists
''' - or another exit point will be hit [by another thread]
''' </remarks>
Public MustInherit Class AbstractCallQueue
Implements ICallQueue

Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
Private running As Integer 'consumer flag: 0 = no consumer, 1 = a consumer is active
Private Class Node
Public ReadOnly action As Action
Public ReadOnly future As New Future()
Public Sub New(ByVal action As Action)
Me.action = action
End Sub
End Class

''' <summary>
''' Queues an action to be run and returns a future for the action's eventual completion.
''' Starts the consumer if queued calls were not already being run.
''' </summary>
''' <remarks>
''' Enqueues a single item for the consumer and ensures a consumer exists to run it.
''' </remarks>
Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
Dim item = New Node(action)
queue.BeginEnqueue(item)

'Start the consumer if it is not already running
If TryAcquireConsumer() Then
Call Consume()
End If

Return item.future
End Function

'''<summary>Returns true if consumer responsibilities were acquired by this thread.</summary>
Private Function TryAcquireConsumer() As Boolean
'Don't bother acquiring if there are no items to consume
'This unsafe check is alright because enqueuers call this method after enqueuing
'Even if an item is queued between the check and returning false, the enqueuer will call this method again
'So we never end up with a non-empty idle queue
If queue.WasEmpty Then Return False

'Try to acquire consumer responsibilities
Return Threading.Interlocked.Exchange(running, 1) = 0

'Note that between the empty check and acquiring the consumer, all queued actions may have been processed.
'Therefore the queue may be empty at this point, but that's alright. Just a bit of extra work, nothing unsafe.
End Function
'''<summary>Returns true if consumer responsibilities were released by this thread.</summary>
Private Function TryReleaseConsumer() As Boolean
Do
'Don't release while there's still things to consume
If Not queue.WasEmpty Then Return False

'Release consumer responsibilities
Threading.Interlocked.Exchange(running, 0)

'It is possible that a new item was queued between the empty check and actually releasing
'Therefore it is necessary to check if we can re-acquire in order to guarantee we don't leave a non-empty queue idle
If Not TryAcquireConsumer() Then Return True

'Even though we've now acquired consumer, we may have ended up with nothing to process!
'So let's repeat this whole check for empty/release dance!
'A caller could become live-locked here if other threads keep emptying and filling the queue.
'But only consumer threads call here, and the live-lock requires that progress is being made.
'So it's alright. We still make progress and we still don't end up in an invalid state.
Loop
End Function

''' <summary>
''' Runs queued calls until there are none left.
''' Child classes can override it and call the base implementation in the desired context (e.g. on a new thread).
''' </summary>
Protected Overridable Sub Consume()
Do Until TryReleaseConsumer()
Dim item = queue.Dequeue()
Call item.action()
Call item.future.SetReady()
Loop
End Sub
End Class

'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call New Thread(Sub() MyBase.Consume()).Start()
End Sub
End Class

'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
End Sub
End Class

'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
Inherits AbstractCallQueue
Private ReadOnly control As System.Windows.Forms.Control
Public Sub New(ByVal control As System.Windows.Forms.Control)
Me.control = control
End Sub
Protected Overrides Sub Consume()
control.BeginInvoke(Sub() MyBase.Consume())
End Sub
End Class

Tuesday, August 4, 2009

Emulating Yield-like Iterators in VB10

Introduction

One of the defining characteristics of VB10 and C#4 is feature convergence. VB and C# now share optional arguments, dynamic types, multiline lambdas, and auto-properties. However, there are still differences between the two languages. One of the glaring differences is the lack of yield-style Iterators in VB. In this post, I am going to address that difference by constructing a class that uses lambda expressions to achieve functionality similar to yield.

Explicit Iterators vs Yield Iterators

Explicitly implementing an iterator is a huge pain, because of the sheer amount of code required. You have to write a whole class, implement two interfaces (including their non-generic legacy versions), and manually track state. Put it all together and you get massive line waste.

Just to hammer this in, here is a typical iterator implementation to enumerate a contiguous subset of an array:

Public Class SubArrayIterator(Of T)
Implements IEnumerable(Of T)
Implements IEnumerator(Of T)

Private ReadOnly array As T()
Private ReadOnly offset As Integer
Private ReadOnly length As Integer
Private index As Integer
Private cur As T

Public Sub New(ByVal array As T(), ByVal offset As Integer, ByVal length As Integer)
Me.array = array
Me.offset = offset
Me.length = length
End Sub

Public Function GetEnumerator() As IEnumerator(Of T) Implements IEnumerable(Of T).GetEnumerator
Return New SubArrayIterator(Of T)(array, offset, length)
End Function

Private ReadOnly Property Current As T Implements IEnumerator(Of T).Current
Get
Return cur
End Get
End Property

Private Function MoveNext() As Boolean Implements IEnumerator.MoveNext
If index >= length Then Return False
cur = array(index + offset)
index += 1
Return True
End Function

Private Sub Reset() Implements IEnumerator.Reset
index = 0
End Sub

Private Sub Dispose() Implements IDisposable.Dispose
'no unmanaged state
End Sub

Private Function GetEnumeratorObj() As IEnumerator Implements IEnumerable.GetEnumerator
Return GetEnumerator()
End Function
Private ReadOnly Property CurrentObj As Object Implements IEnumerator.Current
Get
Return cur
End Get
End Property
End Class

Augh! Fifty lines for something I can describe in less than ten words! Now compare that to this:

public IEnumerable<T> IterateSubArray<T>(T[] array, int offset, int length) {
for (int i = 0; i < length; i++)
yield return array[i + offset];
}

Wow! We jumped from fifty lines to four! The compiler still generates a large class behind the scenes (it translates the function into a state machine), but all we see is four lines. There are downsides (e.g. Reset is not supported, but almost nobody uses Reset anyway), and they are mostly inconsequential compared to the code savings. The main real problem is lazy evaluation: argument exceptions are thrown when the iterator is first used instead of when it is constructed.
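That pitfall is easy to demonstrate. Python generators defer execution exactly the way yield iterators do, so here is a sketch of both the problem and the standard fix (validate eagerly in a plain function, then return an inner generator); the function names are mine:

```python
def sub_array_lazy(array, offset, length):
    """Generator function: NOTHING here runs until the first next() call."""
    if offset < 0 or offset + length > len(array):
        raise ValueError("bad offset/length")  # thrown at first use, not at the call
    for i in range(length):
        yield array[i + offset]

def sub_array_eager(array, offset, length):
    """Plain function: validation runs immediately; only the inner body is deferred."""
    if offset < 0 or offset + length > len(array):
        raise ValueError("bad offset/length")  # thrown at the call site
    def generate():
        for i in range(length):
            yield array[i + offset]
    return generate()
```

sub_array_lazy([1, 2], 0, 5) returns without complaint and only blows up on the first next(); sub_array_eager raises at the call site, which is the behavior the controller-based approach described below gets for free.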

I should probably point out that, using LINQ, the example enumeration can be written as a single expression (array.Skip(offset).Take(length)). Remember that it is only for demonstration purposes.

The Controller

The basic idea I have is to use a generator function to implement the enumeration. The function will take a controller, in order to signal special commands like "end of enumeration".

Public Interface IEnumeratorController(Of T)
Function Break() As T
End Interface

You can add more control methods as you desire them. Other useful possible control methods are "Multiple", which would return a sequence of items to enumerate, and "Repeat", which would return no item to enumerate and run the generator again. Note that the expected usage style of the controller methods is in a return statement (like "return controller.Break()"). This avoids confusing cases like calling break then returning an element to enumerate (Which would take precedence?).
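To make the "Repeat" idea concrete, here is a Python sketch of a controller-driven enumerator supporting both Break and Repeat. It is a translation of the pattern, not of the VB code below (which keeps the controller and enumerator in one class); the names break_ and repeat_ are mine, since break is reserved in Python:

```python
class EnumeratorController:
    """Lets a generator function signal 'stop' or 'skip' back to the enumerator."""
    def __init__(self):
        self.broke = False
        self.skip = False

    def break_(self):   # end of enumeration
        self.broke = True
        return None

    def repeat_(self):  # yield nothing this round; run the generator again
        self.skip = True
        return None

class Enumerator:
    """Calls a generator function once per element; the controller does the rest."""
    def __init__(self, generator):
        self._generator = generator
        self._controller = EnumeratorController()
        self.current = None

    def move_next(self):
        c = self._controller
        if c.broke:
            return False             # Break was signalled on an earlier call
        while True:
            c.skip = False
            self.current = self._generator(c)
            if c.broke:
                return False
            if not c.skip:
                return True
```

A generator that returns controller.repeat_() for unwanted elements filters them out without the enumerator ever exposing them.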

The Enumerator

Now that the controller is defined, we can write a class which implements IEnumerator using a generator function. It's relatively straightforward: we just track the generator's last returned item and a flag for whether or not controller.Break has been called yet. There is line waste, but we only have to include this code once instead of once per iterator.

Public NotInheritable Class Enumerator(Of T)
Implements IEnumerator(Of T)
Implements IEnumeratorController(Of T)

Private ReadOnly generator As Func(Of IEnumeratorController(Of T), T)
Private cur As T
Private break As Boolean
Public Sub New(ByVal generator As Func(Of IEnumeratorController(Of T), T))
Me.generator = generator
End Sub

Public Function MoveNext() As Boolean Implements IEnumerator.MoveNext
If Me.break Then Return False 'have previously called controller.break
Me.cur = generator(Me)
If Me.break Then Return False 'just called controller.break
Return True
End Function

Private Function ControllerBreak() As T Implements IEnumeratorController(Of T).Break
Me.break = True
Return Nothing
End Function

Public ReadOnly Property Current As T Implements IEnumerator(Of T).Current
Get
Return Me.cur
End Get
End Property

Private ReadOnly Property CurrentObj As Object Implements IEnumerator.Current
Get
Return Me.cur
End Get
End Property
Private Sub Reset() Implements IEnumerator.Reset
Throw New NotSupportedException()
End Sub
Private Sub Dispose() Implements IDisposable.Dispose
'no unmanaged state
End Sub
End Class

The Generator

With our Enumerator and Controller in hand, we can write a function to enumerate a contiguous subset of an array. We will use a lambda expression for the generator, and a hoisted local variable to keep state:

Public Function EnumerateSubArray(Of T)(ByVal items() As T, ByVal offset As Integer, ByVal length As Integer) As IEnumerator(Of T)
Dim index = 0 'hoisted variable used for state
Return New Enumerator(Of T)(
Function(controller)
If index >= length Then Return controller.Break()
index += 1
Return items(offset + index - 1)
End Function)
End Function

The result isn't as compact as using C#'s yield, but it is still a huge improvement over an explicit implementation. It's essentially like implementing only the MoveNext method instead of an entire enumerator class. The main downside compared to yield is the loss of multiple entry points into the function (we always start at the beginning, not on the line after the last return). We do have one advantage over yield: we can check arguments when the iterator is constructed (because everything before Return New Enumerator(...) executes immediately).

Enumerable vs Enumerator

An IEnumerable is an object that can be enumerated more than once. It returns a new enumerator for each enumeration, and is the type required by the useful 'for each' loop. In order to implement an IEnumerable we just pull the same trick again: seeding a class with a generator function. Luckily, this time we don't need a controller because an IEnumerable doesn't require state.

Public NotInheritable Class Enumerable(Of T)
Implements IEnumerable(Of T)
Private ReadOnly generator As Func(Of IEnumerator(Of T))
Public Sub New(ByVal generator As Func(Of IEnumerator(Of T)))
Me.generator = generator
End Sub
Public Function GetEnumerator() As IEnumerator(Of T) Implements IEnumerable(Of T).GetEnumerator
Return generator()
End Function
Private Function GetEnumeratorObj() As IEnumerator Implements IEnumerable.GetEnumerator
Return GetEnumerator()
End Function
End Class

At last, we can properly iterate over that contiguous array subset by using our previous EnumerateSubArray function to generate enumerators:

Public Function IterateSubArray(Of T)(ByVal items() As T, ByVal offset As Integer, ByVal length As Integer) As IEnumerable(Of T)
Return New Enumerable(Of T)(Function() EnumerateSubArray(items, offset, length))
End Function

Conclusion

Even though VB doesn't have a yield keyword, we can still cut the number of lines required for an iterator by a factor of five. All we needed were a couple of classes to turn generator functions into enumerators. It's not as good as having yield, but we don't have yield.