Wednesday, December 30, 2009

Solution - Asymmetric Latency

The robots cannot figure out which room they are in, and they cannot distinguish the symmetric case from asymmetric cases. They can measure the round trip time, but never the one-way trip time.

Consider that the robots' actions are determined entirely by previously sent and received messages. Delay all messages by five seconds, and a robot's response will be five seconds later. Therefore, if we delay all incoming messages by five seconds, and accelerate all outgoing messages by five seconds, the net delay on the response is 0 seconds. Changing the symmetric case of 2s/2s to 1s/3s will have no effect on the robots' actions because the net delay is unchanged. They cannot tell the cases apart.

It is easier to understand this proof visually. Suppose the delays were equal. Then the propagation of messages might look like this:

[Diagram: message timelines for rooms A and B, with equal delay in each direction]

Changing the one-way delay while holding the round-trip time constant is equivalent to sliding one of the timelines. The exact same case, except with all of the delay in one direction, looks like this:

[Diagram: the same timelines, with all of the delay in one direction]

Notice that, from the robots' perspective, both cases are equivalent. A2 arrives the same amount of time after A1, B2 arrives the same amount of time after B1, etc. In fact we can make one of the directions have a negative delay (go back in time) and the robots still can't tell the difference (the round trip time is still positive)!
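
The sliding argument can also be checked numerically. The following is only a sketch under assumed rules (the names Observe and clockOffsetB are made up for illustration): A starts a ping-pong exchange, each robot replies instantly, and each robot logs the reading of its own clock whenever a message arrives. B's clock offset is unknown to both robots, which is what "no synchronized clocks" means here.

```vb
Imports System
Imports System.Collections.Generic

Public Module AsymmetricLatencyDemo
    ' Returns what each robot logs (local clock readings at message arrival)
    ' during a ping-pong exchange that A starts at global time 0.
    ' A's clock is taken as global time; B's clock is shifted by clockOffsetB.
    Public Function Observe(ByVal delayAtoB As Double, ByVal delayBtoA As Double,
                            ByVal clockOffsetB As Double, ByVal rounds As Integer) As String
        Dim seenByA As New List(Of String)
        Dim seenByB As New List(Of String)
        Dim t As Double = 0
        For i As Integer = 1 To rounds
            t += delayAtoB                               'message travels A -> B
            seenByB.Add((t + clockOffsetB).ToString())
            t += delayBtoA                               'instant reply travels B -> A
            seenByA.Add(t.ToString())
        Next
        Return "A: " & String.Join(",", seenByA) & " | B: " & String.Join(",", seenByB)
    End Function

    Public Sub Main()
        Dim symmetric = Observe(2, 2, 0, 3)    '2s each way
        Dim asymmetric = Observe(3, 1, -1, 3)  '3s/1s, with B's clock slid by one second
        Console.WriteLine(symmetric)
        Console.WriteLine(asymmetric)
        Console.WriteLine(symmetric = asymmetric)
    End Sub
End Module
```

Both logs come out identical, so no program running on the robots could branch differently in the two cases. Changing the round trip time (say 3s/3s) does change A's log, which matches the claim that only the round trip is measurable.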

Wednesday, December 23, 2009

Puzzle - Asymmetric Latency

Suppose two robots are placed in separate isolated rooms, called A and B. They can communicate, but the communication delay is not symmetric: it takes one second for a message to travel from room B to room A, but three seconds for a message to travel from room A to room B. The robots do not know which room they are in, and they do not have synchronized clocks. The only information they share is some preconceived plan/program for sending messages and responses.


Questions:
1) Can the robots figure out which rooms they are in?
2T) If so, could they figure out the exact values of the one way latencies?
2F) If not, could they at least distinguish cases where the latency is asymmetric from cases where the latency is equal both ways?

Tuesday, November 10, 2009

The Assumptions We Make

I loved my algorithms courses in university. The material just 'clicked' and I did extremely well. But one day, a while after I had finished the courses, a thought about hashing occurred to me. I realized I had been making some very serious assumptions when analyzing algorithmic complexity. This post covers two of those assumptions.

The thought, restructured to actually follow a logical argument, goes something like this: hash tables can't be constant time. There's just no way. If a hash function takes constant time then its output has constant size. If the output has N bits, then a table indexed by that output has at most 2^N bins. If the table has a maximum size then we can overflow it by just adding more and more items (ie. it will eventually degenerate into linear time searches of the bins). What's going on?!

Constant Word Size

The constant word size assumption is the assumption that, even though you may have an unbounded number of unique items, they can still have constant size and can still be manipulated in constant time. Copying, comparing, swapping, and a bunch of other operations we often assume are constant time would necessarily become more expensive with huge input sizes.

Suppose you want to sort 2^2048 items (clearly this is a thought experiment!). If each of those items is unique, they must each have a unique binary representation. So, on average, each item must use roughly 2048 bits. It suddenly becomes clear that comparing items can't be O(1), but must take at least O(lg n) time [because the items have O(lg n) bits].

Let's analyze merge sort using this new realization. It should be immediately obvious that merging becomes more expensive: merging two sublists of size L from a list of size n requires O(L * lg n) time. Since we must merge sublists whose combined size is O(n) at each of O(lg n) levels before the list is sorted, merge sort will take O(n * (lg n) * (lg n)) time. Wait, what?! Merge sort is O(n lg n), not O(n lg^2 n)! What's going on?!

What's going on is the constant word size assumption. The O(n lg n) bound actually applies to the number of comparisons you have to perform, not the total computational time. The two just happen to coincide exceedingly well in practice (eg. I doubt anyone will ever actually sort 2^64 unique items, and certainly never 2^128 unique items).
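
To make the distinction concrete, here is a rough sketch (not a benchmark) of a merge sort that counts comparisons. The module and function names are invented for this example, and the "bit-level work" figure simply assumes every comparison reads all the bits of a key, which is the pessimistic model used in the argument above.

```vb
Imports System
Imports System.Collections.Generic

Public Module WordSizeDemo
    Private comparisons As Long

    ' Plain top-down merge sort that counts element comparisons.
    Public Function MergeSort(ByVal items As List(Of Integer)) As List(Of Integer)
        If items.Count <= 1 Then Return items
        Dim mid As Integer = items.Count \ 2
        Dim left = MergeSort(items.GetRange(0, mid))
        Dim right = MergeSort(items.GetRange(mid, items.Count - mid))
        Dim merged As New List(Of Integer)(items.Count)
        Dim i As Integer = 0
        Dim j As Integer = 0
        While i < left.Count AndAlso j < right.Count
            comparisons += 1
            If left(i) <= right(j) Then
                merged.Add(left(i))
                i += 1
            Else
                merged.Add(right(j))
                j += 1
            End If
        End While
        'One side is exhausted; copy whatever remains of the other.
        merged.AddRange(left.GetRange(i, left.Count - i))
        merged.AddRange(right.GetRange(j, right.Count - j))
        Return merged
    End Function

    ' Sorts n distinct keys (in reverse order) and returns the comparison count.
    Public Function CountComparisons(ByVal n As Integer) As Long
        Dim input As New List(Of Integer)
        For value As Integer = n To 1 Step -1
            input.Add(value)
        Next
        comparisons = 0
        MergeSort(input)
        Return comparisons
    End Function

    Public Sub Main()
        Dim n As Integer = 1024
        Dim count = CountComparisons(n)
        'Bits needed to give n keys distinct representations.
        Dim bitsPerKey As Integer = 0
        Dim v As Integer = n - 1
        While v > 0
            bitsPerKey += 1
            v \= 2
        End While
        Console.WriteLine("comparisons: " & count)
        Console.WriteLine("bit-level work at " & bitsPerKey & " bits per comparison: " & count * bitsPerKey)
    End Sub
End Module
```

The first number grows like n lg n; the second picks up the extra lg n factor once comparisons stop being treated as free.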

Essentially every algorithm has a theoretical O(lg n) word-size factor hidden away. Once I realized this, my constant-time hashing dilemma went away. It's the exact same issue, except I was focusing on the output size instead of the input size.

RAM Model vs Physics

Perhaps the most ingrained assumption is the random-access memory model, where memory is unbounded with a constant access time. Unlike the constant word size assumption, this assumption was at least stated in my algorithms course. But reality is, of course, not so forgiving.

You can only fit so much information into a given volume, and distance from the processor lower bounds the time it takes to retrieve a bit of information. A sphere with volume n has radius O(n^(1/3)), so accessing a particular bit of data is not just O(lg n), it's O(n^(1/3))! [I remember reading a black hole will eventually form if you exceed O(surface area), so it could even be O(n^(1/2))!]

I don't even want to think about what that does to the time complexity of sorting or hashing.

Summary

It's a bit unsettling to realize I'd been ignoring all these logarithmic factors, and that no one had ever mentioned them to me. Oh well, at least it doesn't matter in practice... In fact, the word size of processors has grown faster than the size of data, subtly reinforcing the constant-size assumption.

Monday, September 28, 2009

.Net Design Mistakes - Streams

For the most part, .Net is a great framework. But it has its share of "WTF?" classes and design decisions. In this series I will cover (vent) these objective (subjective) faults. Today I am covering the mess better known as System.IO.Stream: a class trying to be five interfaces at once. Let's dive right in and cover the major problems.

Problem #1: Tedious to Implement

Suppose I want to implement a stream that flips the bits of a substream. Sounds pretty simple, but here is what comes out:

Public Class BinaryNotStream
Inherits IO.Stream
Private ReadOnly substream As IO.Stream

Public Sub New(ByVal substream As IO.Stream)
If substream Is Nothing Then Throw New ArgumentNullException("substream")
If Not substream.CanRead Then Throw New ArgumentException("substream")
Me.substream = substream
End Sub

Public Overrides Function Read(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer) As Integer
Dim n = substream.Read(buffer, offset, count)
For i = offset To offset + n - 1
buffer(i) = Not buffer(i)
Next i
Return n
End Function

Public Overrides ReadOnly Property CanRead As Boolean
Get
Return True
End Get
End Property

Protected Overrides Sub Dispose(ByVal disposing As Boolean)
If disposing Then
substream.Dispose()
End If
MyBase.Dispose(disposing)
End Sub

Public Overrides ReadOnly Property CanSeek As Boolean
Get
Return False
End Get
End Property
Public Overrides ReadOnly Property CanWrite As Boolean
Get
Return False
End Get
End Property
Public Overrides Sub Flush()
End Sub
Public Overrides ReadOnly Property Length As Long
Get
Throw New NotSupportedException()
End Get
End Property
Public Overrides Property Position As Long
Get
Throw New NotSupportedException()
End Get
Set(ByVal value As Long)
Throw New NotSupportedException()
End Set
End Property
Public Overrides Function Seek(ByVal offset As Long, ByVal origin As System.IO.SeekOrigin) As Long
Throw New NotSupportedException()
End Function
Public Overrides Sub SetLength(ByVal value As Long)
Throw New NotSupportedException()
End Sub
Public Overrides Sub Write(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer)
Throw New NotSupportedException()
End Sub
End Class

Allow me to summarize my reaction to this: AAAAAAAAAAAAAAAUUUUUUUUUUUUUUUUUUUUUUGGGHHH! Half of the code is throwing NotSupportedExceptions (which is appropriate!)! Pick almost any stream class and you'll find this "NotSupported" boilerplate code. It is a pain to write, a pain to read, and guarantees even dead simple stream classes are at least a few dozen lines.

Problem #2: Guess the Type

It boggles the mind that a class must call itself a 'stream' when it can't actually do two thirds of the things a 'stream' does. There is no way to tell the type system a class is a "readable" stream, and no way to require a readable stream parameter. When you implement a function that accepts a stream, you must check that the stream supports reading by querying CanRead. Otherwise, an exception will be thrown when you try to use the stream (possibly much later). Technically, a stream could even change from readable to writable during execution.

Many .Net streams change their readability based on arguments given to their constructor. For example, the file stream is not a readable stream and it is not a writable stream. The file stream is a "readable and/or writable" stream (depending on the access mode specified when opening). Similarly, the compression streams are "readable xor writable" streams (when compressing you may only write and when decompressing you may only read).

Because of this type flexibility, many of the stream classes have too many responsibilities. Poor DeflateStream has to both compress and decompress data, even though those operations have very different behaviours. Similarly, FileStream has to enforce read/write permissions at runtime instead of just relying on the type system.

Problem #3: Redundant Redundancy

First, streams have a Position property and a Seek method and both have to be implemented, despite the fact that Seek can be implemented entirely in terms of Position and Length. Every time you implement a seekable stream, you re-implement Seek exactly the same way as last time. What a waste of code, and what a great way to introduce blind copy-paste bugs.
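
As a rough illustration of how mechanical that is, here is one way Seek reduces to Position and Length. The helper name ComputeSeekPosition is invented for this sketch, and the choice of exception for a negative target is an assumption rather than a documented requirement.

```vb
Imports System
Imports System.IO

Public Module SeekSketch
    ' Computes the absolute position a Seek call should move to,
    ' given only the stream's current Position and Length.
    Public Function ComputeSeekPosition(ByVal position As Long, ByVal length As Long,
                                        ByVal offset As Long, ByVal origin As SeekOrigin) As Long
        Dim target As Long
        Select Case origin
            Case SeekOrigin.Begin : target = offset
            Case SeekOrigin.Current : target = position + offset
            Case SeekOrigin.End : target = length + offset
            Case Else : Throw New ArgumentException("Unknown seek origin.", "origin")
        End Select
        If target < 0 Then Throw New IOException("Cannot seek before the start of the stream.")
        Return target
    End Function
End Module
```

With a helper like this, every seekable stream's Seek override would be the same two lines: assign Position = ComputeSeekPosition(Position, Length, offset, origin), then return Position.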

Second, streams have an overridable Close method and a Dispose method. Here are their implementations in IO.Stream according to Reflector:

public virtual void Close() {
this.Dispose(true);
GC.SuppressFinalize(this);
}
public void Dispose() {
this.Close();
}

I think they assumed programmers would find it easier to understand "closing" a stream as opposed to "disposing" a stream. This was a bad idea. Not only does it create confusion (is disposing equivalent to closing?) and add cruft, it implies you can later re-"open" the stream (you can't). Actually, this issue was mentioned in a Microsoft Channel 9 show about the upcoming Linq-to-Events/Reactive-framework (as one of the reasons they decided to only use Dispose).

Problem #4: Not an Interface

Extension methods didn't exist when the stream class was designed and implemented, so the only way to include derived methods (eg. ReadByte uses Read) 'within' a type was an overridable class (I am guessing at Microsoft's reasoning, of course).

I don't really fault Microsoft for this problem: they probably chose the best option at the time. However, times have changed and there are downsides to using classes instead of interfaces. Classes don't allow multiple inheritance, which is probably why there is a single stream type instead of multiple (eg. ReadableStream, WriteableStream). It also means implementers can't be derived from a non-stream base class, which is annoying because sometimes "oh, and you can use it like a stream" is a feature you want to tack onto an existing class.

Proposed Solution

Split IO.Stream's various responsibilities into interfaces:

Public Interface IReadableStream
Inherits IDisposable
Function Read(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer) As Integer
End Interface
Public Interface IWritableStream
Inherits IDisposable
Sub Write(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer)
Sub Flush()
End Interface
Public Interface ISeekableStream
Inherits IDisposable
Property Position As Long
Property Length As Long
End Interface

IO.Stream can implement the interfaces for forwards compatibility. Extension methods, which wrap stream interface instances inside an IO.Stream, provide backwards compatibility. Extension methods can implement the derived methods from IO.Stream:

<Extension()>
Public Function ReadByte(ByVal this As IReadableStream) As Integer
If this Is Nothing Then Throw New ArgumentNullException("this")
Dim buffer(0 To 0) As Byte
Dim n = this.Read(buffer, 0, 1)
If n = 0 Then Return -1
Return buffer(0)
End Function

Eventually, IO.Stream can be deprecated.

Furthermore, define interfaces for combinations of responsibilities:

Public Interface IRereadableStream
Inherits IReadableStream
Inherits ISeekableStream
End Interface
Public Interface IRewritableStream
Inherits IWritableStream
Inherits ISeekableStream
End Interface
...


The result is the same functionality we have now, except type safe. You no longer need to check if a stream is readable, you no longer need to throw NotSupportedExceptions, and you no longer need to write blog posts about how bad streams are.
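
For comparison with Problem #1, here is roughly what the bit-flipping stream might look like against the proposed IReadableStream. This is a sketch of the proposal, not an existing .Net API; ArrayReadableStream is a made-up in-memory source included only so the example can run on its own.

```vb
Imports System

Public Interface IReadableStream
    Inherits IDisposable
    Function Read(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer) As Integer
End Interface

'''Hypothetical in-memory source, present only to make the sketch runnable.
Public Class ArrayReadableStream
    Implements IReadableStream
    Private ReadOnly data() As Byte
    Private position As Integer
    Public Sub New(ByVal data() As Byte)
        Me.data = data
    End Sub
    Public Function Read(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer) As Integer Implements IReadableStream.Read
        Dim n = Math.Min(count, data.Length - position)
        Array.Copy(data, position, buffer, offset, n)
        position += n
        Return n
    End Function
    Public Sub Dispose() Implements IDisposable.Dispose
    End Sub
End Class

'''Flips the bits of a readable substream. No NotSupported boilerplate required.
Public Class BinaryNotStream
    Implements IReadableStream
    Private ReadOnly substream As IReadableStream
    Public Sub New(ByVal substream As IReadableStream)
        If substream Is Nothing Then Throw New ArgumentNullException("substream")
        Me.substream = substream
    End Sub
    Public Function Read(ByVal buffer() As Byte, ByVal offset As Integer, ByVal count As Integer) As Integer Implements IReadableStream.Read
        Dim n = substream.Read(buffer, offset, count)
        For i = offset To offset + n - 1
            buffer(i) = Not buffer(i)
        Next i
        Return n
    End Function
    Public Sub Dispose() Implements IDisposable.Dispose
        substream.Dispose()
    End Sub
End Class
```

The constructor no longer needs a CanRead check, because the parameter type already says the substream is readable.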

Tuesday, September 15, 2009

Exceptions to Die By - The InvalidStateException

Introduction

When your program fails, you want to throw the right type of exception. You want to provide useful information in the type so the caller can react appropriately to the problem. Lack of type information is one reason why throwing an exception of type Exception is such a bad idea. The .net base class library includes exceptions for most common cases (ArgumentException, InvalidOperationException, IOException, etc.). This post covers a class of custom exceptions I like to use in my own projects, which I call invalid state exceptions.

Description

Most bugs immediately cause an exception to be thrown: if you try to pass null into a non-null method you get an ArgumentNullException, and if you try to pop an empty stack you get an InvalidOperationException. But the difficult bugs don't throw exceptions immediately: they occur, corrupt the program state, and at some later point this causes an exception to be thrown.

Enter the InvalidStateException. An invalid state exception occurs when the program fails because a state-corrupting bug has already occurred. For example, a list enumeration failing because the list was modified should be an InvalidStateException instead of an InvalidOperationException.

'''<summary>Indicates an invalid program state has been detected (eg. due to a fault).</summary>
Public Class InvalidStateException
Inherits InvalidOperationException
Public Sub New(Optional ByVal message As String = Nothing,
Optional ByVal innerException As Exception = Nothing)
MyBase.New(If(message, "Reached an unexpected state."), innerException)
End Sub
End Class

Invalid state exceptions are generally irrecoverable because it is already too late to fix the problem. They tend to indicate bugs in the interaction of multiple components, and are usually thrown by assumption-checking code. I have found that adding this exception type has increased the amount of assumption checking code I write, which is definitely a good thing.

Derived Types

Usually we know more than just "the program is in an invalid state" when throwing an invalid state exception. The most common assumption I check is that a switch statement hits a defined case, so I have a derived type called ImpossibleValueException. Here are the three derived types I have used:

1. UnreachableException: I usually use this one to silence the compiler's reachability analyzer or to indicate to readers that a case is impossible.


'''<summary>Indicates a path expected to be unreachable has been executed.</summary>
Public Class UnreachableException
Inherits InvalidStateException
Public Sub New(Optional ByVal message As String = Nothing,
Optional ByVal innerException As Exception = Nothing)
MyBase.New(If(message, "Reached a state which was expected to not be reachable."), innerException)
End Sub
End Class

2. ImpossibleValueException: I often throw this one in the default cases of switch statements over enums. I even defined an extension method for constructing it from any value.


'''<summary>Indicates an internal value expected to be impossible has been encountered.</summary>
Public Class ImpossibleValueException(Of T)
Inherits InvalidStateException
Public ReadOnly Value As T
Public Sub New(ByVal value As T,
Optional ByVal message As String = Nothing,
Optional ByVal innerException As Exception = Nothing)
MyBase.New(If(message, String.Format("The {0} value ""{1}"" was not expected.", GetType(T).Name, value)),
innerException)
Me.Value = value
End Sub
End Class
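
The extension method mentioned above is not shown in the post, so the following is only a guess at its shape. The name MakeImpossibleValueException, the Direction enum, and the Describe function are all invented for illustration; the two exception classes are repeated in compact form so the sketch compiles on its own.

```vb
Imports System
Imports System.Runtime.CompilerServices

Public Class InvalidStateException
    Inherits InvalidOperationException
    Public Sub New(Optional ByVal message As String = Nothing,
                   Optional ByVal innerException As Exception = Nothing)
        MyBase.New(If(message, "Reached an unexpected state."), innerException)
    End Sub
End Class

Public Class ImpossibleValueException(Of T)
    Inherits InvalidStateException
    Public ReadOnly Value As T
    Public Sub New(ByVal value As T,
                   Optional ByVal message As String = Nothing,
                   Optional ByVal innerException As Exception = Nothing)
        MyBase.New(If(message, String.Format("The {0} value ""{1}"" was not expected.", GetType(T).Name, value)),
                   innerException)
        Me.Value = value
    End Sub
End Class

Public Module ImpossibleValueExtensions
    '''Hypothetical helper: builds the exception from any value, inferring T.
    <Extension()>
    Public Function MakeImpossibleValueException(Of T)(ByVal value As T) As ImpossibleValueException(Of T)
        Return New ImpossibleValueException(Of T)(value)
    End Function
End Module

Public Enum Direction
    North
    South
End Enum

Public Module DirectionDemo
    Public Function Describe(ByVal d As Direction) As String
        Select Case d
            Case Direction.North : Return "up"
            Case Direction.South : Return "down"
            Case Else : Throw d.MakeImpossibleValueException()
        End Select
    End Function
End Module
```

If someone later adds a Direction.East member and forgets to update Describe, the failure names the type and the offending value instead of silently falling through.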

3. InfiniteLoopException: I defined this one when making a singly linked list iterator with cycle detection.


'''<summary>Indicates the program entered an infinite loop but the problem was caught and the loop aborted.</summary>
Public Class InfiniteLoopException
Inherits InvalidStateException
Public Sub New(Optional ByVal message As String = Nothing,
Optional ByVal innerException As Exception = Nothing)
MyBase.New(If(message, "Detected and aborted an infinite loop."), innerException)
End Sub
End Class

Conclusion

The invalid state exception is not well covered by any of the default exception types, which is why I defined it. Once defined, I found myself using it in all kinds of (appropriate!) places. The moral: don't be afraid to define generic exceptions when you find yourself shoehorning a weird exception into the existing types. (Just don't go crazy.)

Concurrency #4: Using the Call Queue

Introduction

Now that we have implemented a lock free queue, futures, and a call queue, we will start using them to perform concurrent programming. For example, consider a bank account class synchronized using locks:

Public Class BankAccount
Private balance As UInteger
Private ReadOnly lock As New Object()
Public Function Deposit(ByVal value As UInteger) As UInteger
SyncLock lock
balance += value
Return balance
End SyncLock
End Function
Public Function TryWithdraw(ByVal value As UInteger) As UInteger?
SyncLock lock
If balance < value Then Return Nothing
balance -= value
Return balance
End SyncLock
End Function
End Class

Rewriting the class to use queues and futures is dead simple, because the locking is at the method level. Just replace SyncLock blocks with QueueFunc blocks.

Public Class BankAccount
Private balance As UInteger
Private queue As ICallQueue = New ThreadPooledCallQueue()
Public Function FutureDeposit(ByVal value As UInteger) As IFuture(Of UInteger)
Return queue.QueueFunc(
Function()
balance += value
Return balance
End Function)
End Function
Public Function FutureTryWithdraw(ByVal value As UInteger) As IFuture(Of UInteger?)
Return queue.QueueFunc(
Function() As UInteger?
If balance < value Then Return Nothing
balance -= value
Return balance
End Function)
End Function
End Class

Message Passing

Look closely at the bank account example. Calling FutureTryWithdraw is like sending a message to the bank account object (saying "please try to withdraw X dollars"), and the future returned by the method is like a reply saying either "Ok, you now have X dollars!" or "Not enough funds!". We've implemented a form of message passing, which is what highly concurrent languages like Erlang are based on.

Message passing is not the holy grail of concurrency, but it is an extremely powerful abstraction. For example, it would be much easier to message-pass to a remote machine than do some type of lock sharing. Message passing also makes it impossible to cause a deadlock (the closest analogue is creating a never-ending message cycle; much easier to debug). Most importantly, it is easier to reason about messages than locks. Proving a lock is used correctly requires looking at the entire program. Proving a message is used correctly only requires looking at part of the program (usually one or two functions).

Messages are not superior to locks in every way, of course. Different methods work better in different situations. For example, there is no message-passing analogue for holding two locks at the same time. In addition, it is natural to lock only part of a method, but it is not always clear what that would mean in terms of passing messages.

Those Darn Philosophers

I'm going to finish with an implementation of the dining philosophers done with message passing. Put it in a command line project (alongside the other classes introduced in this series). Notice how each class is simple on its own. In fact, the most complex parts are related to writing readable information to the console! In the next post in this series, I will supply implementations of futures for various asynchronous tasks.

Public Module Main
Public Sub Main()
Const NumPhilosphers As Integer = 5

'Setup table
Dim chopSticks(0 To NumPhilosphers - 1) As Chopstick
Dim philosophers(0 To NumPhilosphers - 1) As DiningPhilosopher
For i = 0 To NumPhilosphers - 1
chopSticks(i) = New Chopstick()
Next i
For i = 0 To NumPhilosphers - 1
philosophers(i) = New DiningPhilosopher(New Random(Environment.TickCount + i), chopSticks(i), chopSticks((i + 1) Mod NumPhilosphers))
Next i

'Give a bit of stabilizing time for measurements
Threading.Thread.Sleep(5000)

'Print status updates
Dim n = 0
Do
n += 1
System.Console.WriteLine("Update #" + n.ToString())
For i = 0 To NumPhilosphers - 1
System.Console.WriteLine("Philosopher #" + i.ToString() + " is " + philosophers(i).Status + ".")
Next i
Threading.Thread.Sleep(1000)
Loop
End Sub
End Module

'''Represents a shared resource.
Public Class Chopstick
Public queue As ICallQueue = New ThreadPooledCallQueue()
Private inUse As Boolean

Public Function QueueTryPickUp() As IFuture(Of HeldChopStick)
Return queue.QueueFunc(Function()
If inUse Then Return Nothing
inUse = True
Return New HeldChopStick(AddressOf QueuePutDown)
End Function)
End Function
Private Function QueuePutDown() As IFuture
Return queue.QueueAction(Sub()
inUse = False
End Sub)
End Function
End Class

'''Represents an acquired resource.
Public Class HeldChopStick
Private wasDropped As Boolean
Private ReadOnly drop As Func(Of IFuture)
Public Sub New(ByVal drop As Func(Of IFuture))
Me.drop = drop
End Sub
Public Function PutDown() As IFuture
If wasDropped Then Throw New InvalidOperationException()
wasDropped = True
Return drop()
End Function
End Class

'''Represents a philosopher trying to share chopsticks for eating.
Public Class DiningPhilosopher
Private ReadOnly leftChopStick As Chopstick
Private ReadOnly rightChopStick As Chopstick
Private feedings As Integer
Private lastTime As Date
Private weightedRate As Double

Public Sub New(ByVal rng As Random, ByVal leftChopStick As Chopstick, ByVal rightChopStick As Chopstick)
Me.lastTime = Now
Me.leftChopStick = leftChopStick
Me.rightChopStick = rightChopStick
Call TryGrabChopsticks(rng)
End Sub

'This method isn't thread-safe, but the errors will be marginal [race condition on increments and resets of 'feedings']
Public ReadOnly Property Status As String
Get
'Measure
Dim t = Now
Dim dt = t - lastTime
Dim rate = feedings / dt.TotalSeconds

'Estimate
If weightedRate = 0 Then
weightedRate = rate
Else
'Exponential approach w.r.t. time
Dim lambda = 0.9 ^ dt.TotalSeconds
weightedRate *= lambda
weightedRate += rate * (1 - lambda)
End If

'Categorize
Dim category = ""
Select Case weightedRate
Case 0 : category = "Empty"
Case 0 To 1 : category = "Starving"
Case 1 To 2 : category = "Hungry"
Case 2 To 3 : category = "Satisfied"
Case Else : category = "Plump"
End Select

'Finish
lastTime = t
feedings = 0
Return category + " (~" + weightedRate.ToString("0.00") + "/s)"
End Get
End Property

Private Sub TryGrabChopsticks(ByVal rng As Random)
'Try to grab both chopsticks
Dim futureLeft = leftChopStick.QueueTryPickUp()
Dim futureRight = rightChopStick.QueueTryPickUp()

'React once both chopsticks have been attempted
futureLeft.CallWhenValueReady(
Sub(left) futureRight.CallWhenValueReady(
Sub(right)
'Call on a new thread because there are sleeps
Call New Threading.Thread(Sub() TryHoldChopsticks(rng, left, right)).Start()
End Sub))
End Sub
Private Sub TryHoldChopsticks(ByVal rng As Random, ByVal left As HeldChopStick, ByVal right As HeldChopStick)
If right Is Nothing OrElse left Is Nothing Then
'Oh no! Put down and wait a bit before trying again.
If left IsNot Nothing Then left.PutDown()
If right IsNot Nothing Then right.PutDown()
Threading.Thread.Sleep(10 + rng.Next(10))
Else
'Yay! Food!
feedings += 1
Threading.Thread.Sleep(50 + rng.Next(50))
left.PutDown()
right.PutDown()

'Snooze
Threading.Thread.Sleep(50 + rng.Next(50))
End If

'Hungry! Repeat!
Call TryGrabChopsticks(rng)
End Sub
End Class


Monday, August 17, 2009

Concurrency #3: The Queue is the Mailbox

Introduction

So far in this series, I have introduced futures and a lock-free queue. Now we are going to use those tools to create a call queue, which will just happen to return futures. This queue will allow us to do message-based synchronization, which will be covered in the next post.

Designing the Interface

A call queue's interface is trivial. It accepts actions to run and, in this case, returns a future for when the action has completed.
  
Public Interface ICallQueue
Function QueueAction(ByVal action As Action) As IFuture
End Interface

We also want to be able to work with functions, so we will add an extension method that accepts functions and returns a future for the value computed when the queued function finally runs.
  
Public Module ExtensionsForICallQueue
<Extension()>
Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
queue.QueueAction(Sub() f.SetValue(func()))
Return f
End Function
End Module

Notice that the interface does not specify how the actions run. The queue might run them on a separate thread, on the thread pool, or in any other context it wants. The only requirements are that the queued calls must eventually run and they must run in order.

Implementing the Call Queue

The call queue is going to be lock free, making things fast and (more importantly) interesting. The call queue implementation will handle guaranteeing the eventual execution of queued calls, since the lock-free queue handles the in-order stuff. We will need to create a consumer when the queue becomes non-empty, destroy the consumer when the queue becomes empty, and do all of this in parallel.

To simplify reasoning about the very large number of possible thread interleavings, I am only going to focus on the exit point of methods. At every method exit point the queue will guarantee "empty or moving". To be more explicit, at every point the queue guarantees ONE of the following:
1) Queued calls are being executed
2) There is another thread executing inside the queue (which will affect shared state before it finishes)
3) The queue is empty
Given "empty or moving", it is impossible for a method to leave the queue idle in a non-empty state (which would result in calls potentially never being executed).

Alright, let's implement a queue satisfying the above property! All the difficult logic will be placed into private methods for acquiring and releasing consumer responsibilities, and the queue will call them appropriately before exiting methods. Let's start with the simple stuff so we see the big picture. We need a lock-free queue to store queued actions, a method to enqueue actions, and an overridable method to consume them. The consume method is overridable so child classes can override it and do things like call the base method on a new thread, giving a threaded call queue.
  
Public MustInherit Class AbstractCallQueue
Implements ICallQueue

Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
Private running As Integer 'stores consumer state and is used as a semaphore

Private Class Node
Public ReadOnly action As Action
Public ReadOnly future As New Future()
Public Sub New(ByVal action As Action)
Me.action = action
End Sub
End Class

Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
Dim item = New Node(action)
queue.BeginEnqueue(item)
If TryAcquireConsumer() Then
Call Consume()
End If
Return item.future
End Function

Protected Overridable Sub Consume()
Do Until TryReleaseConsumer()
Dim item = queue.Dequeue()
Call item.action()
Call item.future.SetReady()
Loop
End Sub
...

Notice that the last shared-state-actions performed by QueueAction and Consume are TryAcquireConsumer and TryReleaseConsumer respectively. Therefore if those two methods meet "empty or moving", the queue will work.

The TryAcquireConsumer method will be simple and optimistic, because many producers can call it at the same time. It will just optimistically fail if the queue is empty, and otherwise try to atomically acquire the consumer responsibilities. Now to prove all possible exits meet "empty or moving". The optimistic failure is safe, because if another producer exists to enqueue an item in parallel, then there is some other thread executing inside the queue. The attempt to acquire the lock is safe because if it fails then there is a consumer executing inside the queue, and if it succeeds then we will create a consumer executing inside the queue. Therefore, when this method ends the queue will be empty or there will be some thread executing inside the queue, which proves "empty or moving".
  
...
Private Function TryAcquireConsumer() As Boolean
If queue.WasEmpty Then Return False
Return Threading.Interlocked.Exchange(running, 1) = 0
End Function
...

The TryReleaseConsumer method will be less simple. It will first check if the queue is not empty, in which case clearly we should still be consuming and it will fail. Then it will optimistically release the consumer responsibilities. But, between the empty check and releasing responsibilities, a producer may have enqueued an item. Therefore, we have to try to re-acquire consumer responsibilities. If that fails, then we know the queue is empty or another thread is now the consumer. Therefore, if we fail to re-acquire consumer responsibilities, we are done and can safely exit.

We're not done yet. There is one last possibility: some other thread may have acquired consumer responsibilities, emptied the queue, and released those responsibilities just before we acquired them, leaving us with a potentially empty queue! So, if we acquire consumer responsibilities, the first thing we need to do is try to release them all over again! We must keep trying until we fail to acquire (in which case we exit the queue) or find a non-empty queue (in which case we process some items then come back). But, as long as we don't give up, "empty or moving" will be satisfied. Therefore, even though the caller can potentially be live-locked, TryReleaseConsumer satisfies "empty or moving". (The live lock is not an issue because, in order for it to occur, progress must be occurring.)
  
...
Private Function TryReleaseConsumer() As Boolean
Do
If Not queue.WasEmpty Then Return False
Threading.Interlocked.Exchange(running, 0)
If Not TryAcquireConsumer() Then Return True
Loop
End Function
End Class
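
To try the acquire/release protocol in isolation, here is a cut-down sketch. It is not the series' implementation: the framework's ConcurrentQueue stands in for the lock-free queue from the earlier post, futures are left out, and the names SketchCallQueue and RunDemo are invented. The demo has four producers queue increments of an unsynchronized counter; if the protocol holds, every call runs exactly once and never concurrently with another.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Public Class SketchCallQueue
    Private ReadOnly queue As New ConcurrentQueue(Of Action)
    Private running As Integer '1 while some thread holds consumer responsibilities

    Public Sub QueueAction(ByVal action As Action)
        queue.Enqueue(action)
        If TryAcquireConsumer() Then
            ThreadPool.QueueUserWorkItem(Sub(state As Object) Consume())
        End If
    End Sub

    Private Sub Consume()
        Do Until TryReleaseConsumer()
            'Only the single consumer dequeues, so a non-empty queue stays non-empty here.
            Dim action As Action = Nothing
            If queue.TryDequeue(action) Then action()
        Loop
    End Sub

    Private Function TryAcquireConsumer() As Boolean
        If queue.IsEmpty Then Return False
        Return Interlocked.Exchange(running, 1) = 0
    End Function

    Private Function TryReleaseConsumer() As Boolean
        Do
            If Not queue.IsEmpty Then Return False
            Interlocked.Exchange(running, 0)
            'A producer may have slipped in between the check and the release.
            If Not TryAcquireConsumer() Then Return True
        Loop
    End Function
End Class

Public Module CallQueueDemo
    Public Function RunDemo() As Integer
        Dim q As New SketchCallQueue()
        Dim counter As Integer = 0 'deliberately not synchronized
        Dim done As New CountdownEvent(4000)
        Parallel.For(0, 4,
            Sub(producer As Integer)
                For i = 1 To 1000
                    q.QueueAction(Sub()
                                      counter += 1
                                      done.Signal()
                                  End Sub)
                Next i
            End Sub)
        done.Wait() 'blocks until every queued call has run
        Return counter
    End Function

    Public Sub Main()
        Console.WriteLine(RunDemo())
    End Sub
End Module
```

If a call were ever dropped, done.Wait() would hang; if two consumers ever ran at once, the unsynchronized counter could come up short of 4000.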

A Couple of Call Queue Types

Hurray, we have an abstract lock-free call queue! Now let's go nuts and implement some concrete types we'll actually want to use. The obvious one is a ThreadedCallQueue, which runs queued calls on an independent thread:
  
'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call New Thread(Sub() MyBase.Consume()).Start()
    End Sub
End Class

However, we might have a very large number of call queues in a program, and the start-up and tear-down cost for threads is quite high. The .NET Framework provides a ThreadPool class that accepts work items to execute on a limited number of threads. We should create a ThreadPooledCallQueue for running queued calls on the thread pool:
  
'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
    End Sub
End Class

Lastly, we might require our calls to run on the UI thread (attempting to access form controls from another thread causes exceptions). It would be extremely convenient if we had a call queue which didn't require us to Invoke all over the place. Enter the InvokedCallQueue:
  
'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
    Inherits AbstractCallQueue
    Private ReadOnly control As System.Windows.Forms.Control
    Public Sub New(ByVal control As System.Windows.Forms.Control)
        Me.control = control
    End Sub
    Protected Overrides Sub Consume()
        control.BeginInvoke(Sub() MyBase.Consume())
    End Sub
End Class

There are surely plenty of other interesting ways to run queued calls, but these three have met my needs so far.

Summary

Now we have a bunch of lock-free call queues we can use to order calls from multiple threads. In the next post in this series, I will cover how to use the queue for synchronization and compare it with a lock-based approach. Fully commented code follows.
  
'Imports needed by the listing below (Extension attribute, Thread, ThreadPool)
Imports System
Imports System.Runtime.CompilerServices
Imports System.Threading

'''<summary>Describes a thread-safe call queue for non-blocking calls.</summary>
Public Interface ICallQueue
    '''<summary>Queues an action to be run and returns a future for the action's eventual completion.</summary>
    Function QueueAction(ByVal action As Action) As IFuture
End Interface

Public Module ExtensionsForICallQueue
    '''<summary>Queues a function to be run and returns a future for the function's eventual output.</summary>
    <Extension()>
    Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
        Dim f As New Future(Of R)
        queue.QueueAction(Sub() f.SetValue(func()))
        Return f
    End Function
End Module

''' <summary>
''' A queue for running actions in order.
''' Debug.Prints unexpected exceptions from queued calls.
''' </summary>
''' <remarks>
''' Consumes items produced by multiple producers.
''' Ensures that enqueued items are consumed by ensuring at all exit points that either:
''' - the queue is empty
''' - or exactly one consumer exists
''' - or another exit point will be hit [by another thread]
''' </remarks>
Public MustInherit Class AbstractCallQueue
    Implements ICallQueue

    Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
    Private running As Integer 'stores consumer state and is used as a semaphore (0 = idle, 1 = a consumer is running)
    Private Class Node
        Public ReadOnly action As Action
        Public ReadOnly future As New Future()
        Public Sub New(ByVal action As Action)
            Me.action = action
        End Sub
    End Class

    ''' <summary>
    ''' Queues an action to be run and returns a future for the action's eventual completion.
    ''' Starts running calls from the queue if they were not already being run.
    ''' </summary>
    ''' <remarks>
    ''' Enqueues a sequence of items to be consumed by the consumer.
    ''' The items are guaranteed to end up adjacent in the queue.
    ''' </remarks>
    Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
        Dim item = New Node(action)
        queue.BeginEnqueue(item)

        'Start the consumer if it is not already running
        If TryAcquireConsumer() Then
            Call Consume()
        End If

        Return item.future
    End Function

    '''<summary>Returns true if consumer responsibilities were acquired by this thread.</summary>
    Private Function TryAcquireConsumer() As Boolean
        'Don't bother acquiring if there are no items to consume
        'This unsafe check is alright because enqueuers call this method after enqueuing
        'Even if an item is queued between the check and returning false, the enqueuer will call this method again
        'So we never end up with a non-empty idle queue
        If queue.WasEmpty Then Return False

        'Try to acquire consumer responsibilities
        Return Threading.Interlocked.Exchange(running, 1) = 0

        'Note that between the empty check and acquiring the consumer, all queued actions may have been processed.
        'Therefore the queue may be empty at this point, but that's alright. Just a bit of extra work, nothing unsafe.
    End Function

    '''<summary>Returns true if consumer responsibilities were released by this thread.</summary>
    Private Function TryReleaseConsumer() As Boolean
        Do
            'Don't release while there are still things to consume
            If Not queue.WasEmpty Then Return False

            'Release consumer responsibilities
            Threading.Interlocked.Exchange(running, 0)

            'It is possible that a new item was queued between the empty check and actually releasing
            'Therefore it is necessary to check if we can re-acquire in order to guarantee we don't leave a non-empty queue idle
            If Not TryAcquireConsumer() Then Return True

            'Even though we've now acquired consumer, we may have ended up with nothing to process!
            'So let's repeat this whole check for empty/release dance!
            'A caller could become live-locked here if other threads keep emptying and filling the queue.
            'But only consumer threads call here, and the live-lock requires that progress is being made.
            'So it's alright. We still make progress and we still don't end up in an invalid state.
        Loop
    End Function

    ''' <summary>
    ''' Runs queued calls until there are none left.
    ''' Child classes can override and call base implementation using desired method (e.g. on a new thread).
    ''' </summary>
    Protected Overridable Sub Consume()
        Do Until TryReleaseConsumer()
            Dim item = queue.Dequeue()
            Call item.action()
            Call item.future.SetReady()
        Loop
    End Sub
End Class

'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call New Thread(Sub() MyBase.Consume()).Start()
    End Sub
End Class

'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
    Inherits AbstractCallQueue
    Protected Overrides Sub Consume()
        Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
    End Sub
End Class

'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
    Inherits AbstractCallQueue
    Private ReadOnly control As System.Windows.Forms.Control
    Public Sub New(ByVal control As System.Windows.Forms.Control)
        Me.control = control
    End Sub
    Protected Overrides Sub Consume()
        control.BeginInvoke(Sub() MyBase.Consume())
    End Sub
End Class