Monday, July 13, 2009

Concurrency #2: The Future is the Message

Introduction

A future represents a result that may not be available now, but will be available later. This simple abstraction simplifies many concurrent tasks by cleanly separating how they are performed from how their results are used. All the caller cares about is that the result will eventually be available, and all the callee cares about is eventually making the result available.

Implementation

We'll start by defining an interface for future values. Callers will need a way to check whether the future is ready yet, a way to be informed when the future becomes ready, and a way to extract the future's value once it is ready. So let's start with an interface that does just that:
    Public Interface IFuture(Of Out R)
        Event Readied()
        ReadOnly Property IsReady() As Boolean
        Function GetValue() As R
    End Interface
That's good, but sometimes we don't want to include a value with the future (e.g. we just want to signal when a subroutine has finished), and some methods only care about the 'will eventually be ready' part of the future. Let's refactor the interface into two interfaces:
    Public Interface IFuture
        Event Readied()
        ReadOnly Property IsReady() As Boolean
    End Interface

    Public Interface IFuture(Of Out R)
        Inherits IFuture
        Function GetValue() As R
    End Interface
Now IFuture(of A) and IFuture(of B) share their common functionality, which is a nice property to have. We will also need classes implementing the IFuture interfaces, so we actually have something to return from future functions. They aren't very hard to implement, so I'll just include what you get if you remove everything but the declarations. Notice that, although IFuture(of R) inherits from IFuture, Future(of R) doesn't inherit from Future. Do you see why?
    Public Class Future
        Implements IFuture
        Public Event Readied() Implements IFuture.Readied
        Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
        Public Function TrySetReady() As Boolean
        Public Sub SetReady() 'Throws an InvalidOperationException if the future was already ready.
    End Class

    Public Class Future(Of R)
        Implements IFuture(Of R)
        Public Event Readied() Implements IFuture.Readied
        Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
        Public Function GetValue() As R Implements IFuture(Of R).GetValue 'Throws an InvalidOperationException if the value isn't ready yet.
        Public Function TrySetValue(ByVal val As R) As Boolean
        Public Sub SetValue(ByVal val As R) 'Throws an InvalidOperationException if the future was already ready.
    End Class
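To see how the raw pieces fit together, here is a minimal usage sketch; the handler body and the value 42 are purely illustrative:

    'A minimal usage sketch of the classes above.
    Dim answer = New Future(Of Integer)

    'Consumer side: react when the value becomes ready.
    AddHandler answer.Readied, Sub() Console.WriteLine("The answer is " & answer.GetValue())

    'Producer side: make the value available (possibly from another thread).
    answer.SetValue(42)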
Alright, so now we're all set to start using futures! Well, except these super simple interfaces would be a huge pain to use. We need to define some higher level operations to make things easier. We'll use extension methods to implement them as if they were part of the interface itself. The methods we will require are:
- CallWhenReady: Calls an action when a future is ready, and returns a future for when the action completes.
- EvalWhenReady: Evaluates a function when a future is ready, and returns a future for the function's output.
- CallWhenValueReady(Of A): Calls an action when a future is ready, passing the future's value as an argument, and returns a future for when the action completes.
- EvalWhenValueReady(Of A, R): Evaluates a function when a future is ready, passing the future's value as an argument, and returns a future for the function's output.
- Futurize: Takes any value and returns an instantly-ready future for it.
- Defuturize: Takes a future of a future and returns a condensed version, which is just a normal future.
- Defuturize(Of R): Takes a future of a future of a value and returns a condensed version, which is just a normal future for the final value.
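To make the intent concrete before implementing anything, here is a small sketch of the kind of chain these operations enable; FutureFetchPage and ExtractTitle are hypothetical placeholders standing in for any asynchronous producer and any ordinary function:

    'Hypothetical chain: fetch asynchronously, transform the value, then consume it.
    Dim futureHtml As IFuture(Of String) = FutureFetchPage("http://example.com")
    Dim futureTitle As IFuture(Of String) = futureHtml.EvalWhenValueReady(Function(html) ExtractTitle(html))
    futureTitle.CallWhenValueReady(Sub(title) Console.WriteLine("Title: " & title))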

The simplest operation is Futurize, and it can be implemented like this:
    <Extension()>
    Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
        Dim f = New Future(Of R)
        f.SetValue(value)
        Return f
    End Function
The most complicated operation to implement is CallWhenReady. It's also the most useful operation, because the other operations can all be implemented easily using CallWhenReady. Here it is:
    <Extension()>
    Public Function CallWhenReady(ByVal future As IFuture,
                                  ByVal action As Action) As IFuture
        Dim lockVal As Integer
        Dim f As New Future()
        Dim notify As IFuture.ReadiedEventHandler
        notify = Sub()
                     If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
                         RemoveHandler future.Readied, notify
                         Call action()
                         f.SetReady()
                     End If
                 End Sub

        AddHandler future.Readied, notify
        If future.IsReady Then Call notify() 'In case the future was already ready

        Return f
    End Function
Wow, that's a bit complicated! At least the concept is simple. First, we define the notification subroutine, which runs the target action and sets CallWhenReady's returned future to ready. Then we make sure the notification subroutine is called once the future is ready: we register it with the Readied event, and also call it manually in case the future was already ready before we registered. Because the future could become ready between registering for the event and performing the manual check, notify might be invoked twice, which is why its body is wrapped in a one-time lock (the Interlocked.Exchange on lockVal).

The rest of the higher level operations are short and sweet because they can be implemented using CallWhenReady. Try to implement them (or just cheat and look at the end of the post).

Some readers might not be familiar with Visual Basic, so I will quickly cover the things used by this method.
- The first line applies the Extension attribute to the method. That tells the compiler the method can be used as if it were part of IFuture. It allows me to write someFuture.CallWhenReady(...) instead of CallWhenReady(someFuture, ...), and includes the method in IntelliSense. Extension methods are extremely useful, because they make it much easier to discover helper methods.
- The Sub ... End Sub block is an anonymous subroutine, also called a lambda expression or a closure. Lambda expressions let you inline simple helper functions, but their real power comes from their access to the surrounding variables. For example, CallWhenReady's anonymous notify subroutine uses the arguments passed to CallWhenReady. This is achieved behind the scenes by 'hoisting' those local variables into a private class and passing it to the anonymous method.
- AddHandler and RemoveHandler add and remove the methods called when an event is raised. In this case we want notify to be called when Readied is raised, and afterwards we remove notify to drop unnecessary references (which would otherwise prevent the garbage collector from collecting futures derived from any future you held on to).

Example #1

Now we have all the pieces we need in order to use futures. Let's explore a simple example: replacing existing asynchronous methods with futures. We will replace the Net.Sockets.TcpClient BeginConnect/EndConnect methods with a single FutureConnect method. Here is an example of using BeginConnect/EndConnect:
    'Connecting code:
    '...
    '... do whatever you're doing leading up to a connection
    '...

    Dim client = New TcpClient
    Try
        'starts a new async task, which calls EndConnect:
        client.BeginConnect(hostname, port, AddressOf EndConnect, client)
    Catch e As SocketException
        'deal with failure to connect
    End Try

    '...

    Private Sub EndConnect(ByVal ar As IAsyncResult)
        Dim client = CType(ar.AsyncState, TcpClient)
        Try
            client.EndConnect(ar)
        Catch e As SocketException
            'deal with failure to connect
        End Try

        '...
        'do whatever you wanted to do with the client once it was connected
        '...
    End Sub
I don't like the Begin/End style methods for a bunch of reasons. Do you see how BeginConnect mixes the arguments describing what you want with the arguments describing what to do with it? That you have to handle the IAsyncResult argument? That the ar.AsyncState member is not type safe? How we have to call EndConnect for every call to BeginConnect? How this pattern has to be repeated every single time you use BeginConnect/EndConnect? All these little problems add up! I don't want to care about these details!

Now we'll implement the same thing with futures. Here is a FutureConnect function:
    Public Function FutureConnect(ByVal hostname As String,
                                  ByVal port As UShort) As IFuture(Of PossibleException(Of TcpClient, SocketException))
        Dim f = New Future(Of PossibleException(Of TcpClient, SocketException))
        Try
            Dim client = New TcpClient
            client.BeginConnect(hostname, port, Sub(ar)
                                                    Try
                                                        client.EndConnect(ar)
                                                        f.SetValue(client)
                                                    Catch e As SocketException
                                                        f.SetValue(e)
                                                    End Try
                                                End Sub, Nothing)
        Catch e As SocketException
            f.SetValue(e)
        End Try
        Return f
    End Function
Essentially, we just wrapped the existing BeginConnect/EndConnect methods inside a future. The main differences are that the result is exposed as a future and that we create a new TcpClient instead of operating on an existing one.

Let's use our fancy future connect method. Here is an example:
    '...
    '... do whatever you're doing leading up to a connection
    '...

    FutureConnect(hostname, port).CallWhenValueReady(
        Sub(possibleClient)
            If possibleClient.Exception IsNot Nothing Then
                'deal with failure to connect
                Return
            End If
            Dim client = possibleClient.Value

            '...
            'do whatever you wanted to do with the client once it was connected
            '...
        End Sub
    )

    '...
Do you see how the "what I want" and "what I want to do with it" parts are separated? How everything is type safe? How there is no required EndFutureConnect call? We've managed to eliminate those little details. That is a good abstraction at work.

There is a downside to using futures shown here, though. Did you notice that the return value was a PossibleException? That's just a structure which stores either a value or an exception. An exception thrown inside the asynchronous work can't propagate up the caller's stack the way a normal exception would, so it must be encoded into the future's value. The upside is that, because the future's value is type safe, you can't accidentally ignore the possible exception.
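PossibleException itself isn't shown in this post, so here is a minimal sketch of the kind of structure FutureConnect assumes; the widening conversion operators are an assumption on my part, chosen so that f.SetValue(client) and f.SetValue(e) both compile without explicit wrapping:

    'A minimal sketch of a PossibleException structure (not necessarily the exact implementation used here).
    Public Structure PossibleException(Of V, E As Exception)
        Public ReadOnly Value As V
        Public ReadOnly Exception As E

        Public Sub New(ByVal value As V)
            Me.Value = value
        End Sub
        Public Sub New(ByVal exception As E)
            Me.Exception = exception
        End Sub

        'Implicit conversions so plain values and exceptions can be passed where a PossibleException is expected.
        Public Shared Widening Operator CType(ByVal value As V) As PossibleException(Of V, E)
            Return New PossibleException(Of V, E)(value)
        End Operator
        Public Shared Widening Operator CType(ByVal exception As E) As PossibleException(Of V, E)
            Return New PossibleException(Of V, E)(exception)
        End Operator
    End Structure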

Example #2

Suppose you want to do something after a subroutine finishes, but you want to run the subroutine on another thread. Or suppose it's a function running on the other thread and you need to wait for its return value. Futures make this problem trivial. You just write a method that returns a future, and runs the function or subroutine on another thread.

    Public Function FutureThreadedAction(ByVal action As Action) As IFuture
        Dim f As New Future
        Call New Threading.Thread(
            Sub()
                Call action()
                Call f.SetReady()
            End Sub
        ).Start()
        Return f
    End Function

    Public Function FutureThreadedFunction(Of R)(ByVal func As Func(Of R)) As IFuture(Of R)
        Dim f As New Future(Of R)
        FutureThreadedAction(Sub() f.SetValue(func()))
        Return f
    End Function
That's it. Now you can call these methods to run functions on other threads, and use futures to work with the return values. I really like this example because, even though we haven't solved any big problems, we've solved one of those tiny annoyances we deal with every day as programmers. The futures are making things easier.
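For instance, combined with the extension methods from earlier, running a slow function on another thread and consuming its result becomes a two-liner; ComputeExpensiveReport is a hypothetical placeholder:

    'Hypothetical usage: run a slow function on another thread, then consume its result.
    FutureThreadedFunction(Function() ComputeExpensiveReport()).CallWhenValueReady(
        Sub(report) Console.WriteLine(report))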

Conclusion

We've really just scratched the surface here. In my next post I will be creating a concurrent call queue which uses futures. In the post after that I will put it all together to implement message passing.

An implementation of futures is posted below. The whole thing is just 170 lines, including comments and whitespace. (My original implementation took upwards of 500 lines, so I'm oddly proud of the line count).

Imports System.Runtime.CompilerServices

'''Provides type-safe methods for return values that will be ready in the future, and for passing future arguments into normal functions.
Namespace Futures
'''Represents a thread-safe read-only class that fires an event when it becomes ready.
Public Interface IFuture
'''Raised when the future becomes ready.
Event Readied()
'''Returns true if the future is ready.
ReadOnly Property IsReady() As Boolean
End Interface

'''Represents a thread-safe read-only class that fires an event when its value becomes ready.
Public Interface IFuture(Of Out R)
Inherits IFuture
'''
''' Returns the future's value.
''' Throws an InvalidOperationException if the value isn't ready yet.
'''
Function GetValue() As R
End Interface

'''A thread-safe class that fires an event when it becomes ready.
Public Class Future
Implements IFuture
Private lockVal As Integer
Public Event Readied() Implements IFuture.Readied

'''Returns true if the future is ready.
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Get
Return lockVal = 1
End Get
End Property

'''
''' Makes the future ready.
''' Throws an InvalidOperationException if the future was already ready.
'''
Public Sub SetReady()
If Not TrySetReady() Then
Throw New InvalidOperationException("Future readied more than once.")
End If
End Sub
'''
''' Makes the future ready.
''' Returns false if the future was already ready.
'''
Public Function TrySetReady() As Boolean
If Threading.Interlocked.Exchange(lockVal, 1) <> 0 Then Return False
RaiseEvent Readied()
Return True
End Function
End Class

'''A thread-safe class that fires an event when its value becomes ready.
Public Class Future(Of R)
Implements IFuture(Of R)
Private val As R
Private lockVal As Integer
Public Event Readied() Implements IFuture.Readied

'''Returns true if the future is ready.
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Get
Return lockVal = 1
End Get
End Property

'''
'''Returns the future's value.
'''Throws an InvalidOperationException if the value isn't ready yet.
'''
Public Function GetValue() As R Implements IFuture(Of R).GetValue
If Not IsReady Then Throw New InvalidOperationException("Attempted to get a future value before it was ready.")
Return val
End Function

'''
''' Sets the future's value and makes the future ready.
''' Throws an InvalidOperationException if the future was already ready.
'''
Public Sub SetValue(ByVal val As R)
If Not TrySetValue(val) Then
Throw New InvalidOperationException("Future readied more than once.")
End If
End Sub
'''
''' Sets the future's value and makes the future ready.
''' Returns false if the future was already ready.
'''
Public Function TrySetValue(ByVal val As R) As Boolean
If Threading.Interlocked.Exchange(lockVal, 1) <> 0 Then Return False
Me.val = val
RaiseEvent Readied()
Return True
End Function
End Class

Public Module ExtensionsForIFuture
'''Runs an action once the future is ready, and returns a future for the action's completion.
<Extension()>
Public Function CallWhenReady(ByVal future As IFuture,
ByVal action As Action) As IFuture
Dim lockVal As Integer
Dim f As New Future()
Dim notify As IFuture.ReadiedEventHandler
notify = Sub()
If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
RemoveHandler future.Readied, notify
Call action()
f.SetReady()
End If
End Sub

AddHandler future.Readied, notify
If future.IsReady Then Call notify() 'in case the future was already ready

Return f
End Function

'''Passes the future's value to an action once ready, and returns a future for the action's completion.
<Extension()>
Public Function CallWhenValueReady(Of A1)(ByVal future As IFuture(Of A1),
ByVal action As Action(Of A1)) As IFuture
Return future.CallWhenReady(Sub() action(future.GetValue))
End Function

'''Runs a function once the future is ready, and returns a future for the function's return value.
<Extension()>
Public Function EvalWhenReady(Of R)(ByVal future As IFuture,
ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
future.CallWhenReady(Sub() f.SetValue(func()))
Return f
End Function

'''Passes the future's value to a function once ready, and returns a future for the function's return value.
<Extension()>
Public Function EvalWhenValueReady(Of A1, R)(ByVal future As IFuture(Of A1),
ByVal func As Func(Of A1, R)) As IFuture(Of R)
Return future.EvalWhenReady(Function() func(future.GetValue))
End Function

'''Wraps a normal value as an instantly ready future.
<Extension()>
Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
Dim f = New Future(Of R)
f.SetValue(value)
Return f
End Function

'''Returns a future for the final value of a future of a future.
<Extension()>
Public Function Defuturize(Of R)(ByVal futureFutureVal As IFuture(Of IFuture(Of R))) As IFuture(Of R)
Dim f = New Future(Of R)
futureFutureVal.CallWhenValueReady(Sub(futureVal) futureVal.CallWhenValueReady(Sub(value) f.SetValue(value)))
Return f
End Function

'''Returns a future for the readiness of a future of a future.
<Extension()>
Public Function Defuturize(ByVal futureFuture As IFuture(Of IFuture)) As IFuture
Dim f = New Future
futureFuture.CallWhenValueReady(Sub(future) future.CallWhenReady(Sub() f.SetReady()))
Return f
End Function
End Module
End Namespace

Saturday, July 4, 2009

Concurrency #1: A Multiple-Producer Single-Consumer Lock Free Queue

Introduction

The later parts of this series are going to require a call queue which accepts calls to run from multiple threads, then ensures they are run in the order they are received. But before we get to a call queue we're going to need a plain vanilla thread-safe queue. My first implementation used the obvious approach: place locks around the Enqueue and Dequeue methods. But when I used this queue, profiling revealed a full quarter of the program's time was spent acquiring and releasing locks! Half of that was attributable to the call queues. I do put a ton of calls through these queues, so I need them to be as fast as possible.

Enter lock-free data structures, which I'd read a little bit about at the time. Instead of using locks, they use simple atomic operations provided by the hardware. This allows them to be much faster, but it also makes them incredibly hard to write correctly. You need to figure out how to do it without locks, then you need to worry about reads and writes being reordered by modern superscalar processors, then you need to worry about the compiler moving things around, caching reads, removing writes, and applying other optimizations that work great for single-threaded programs. I don't want to understate how hard you have to think about lock-free code. Don't write lock-free code unless you're prepared to write an essay explaining why it is correct (only to find some thread interleaving that takes the whole thing down).

Most lock free queue examples out there use the humble CAS (compare-and-swap) atomic primitive. CAS atomically assigns a value to a variable only if the variable is equal to another value. Here is some pseudo code for CAS. Note that all my code samples are in VB10, which is in beta at the time of this writing.
    Public Function CompareAndSwap(ByRef target As Integer,
                                   ByVal newValue As Integer,
                                   ByVal comparisonValue As Integer) As Boolean
        SyncLock GlobalLock
            If target <> comparisonValue Then Return False
            target = newValue
            Return True
        End SyncLock
    End Function
I wanted to write a unique queue, so I decided to try to avoid using CAS. I ended up using the XCHG (Exchange) atomic primitive. XCHG atomically gets-and-sets a variable, effectively exchanging a new value for the old value. Pseudo code:
    Public Function Exchange(ByRef target As Integer,
                             ByVal newValue As Integer) As Integer
        SyncLock GlobalLock
            Dim oldValue = target
            target = newValue
            Return oldValue
        End SyncLock
    End Function
This is the only atomic primitive I needed for the queue.
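For reference, .NET exposes both primitives on System.Threading.Interlocked; Exchange is what the queue code below calls (on node references rather than integers), and CompareExchange is the CAS equivalent:

    Dim counter As Integer = 0

    'XCHG: atomically store 1 and get the old value back.
    Dim oldValue = Threading.Interlocked.Exchange(counter, 1)

    'CAS: atomically set counter to 2 only if it currently equals 1; returns the original value.
    Dim original = Threading.Interlocked.CompareExchange(counter, 2, 1)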

Designing the Queue

Queues are typically stored in two ways: as an array or as a linked list. I don't want to even think about how hard it would be to resize an array without acquiring any locks, so the queue is stored using a linked list. There will be a head node, which the consumer will advance in order to dequeue elements, and a tail node which producers will use to enqueue items. The producers will never manipulate the head variable, and items will be published to the queue purely by linking them to the insertion point.
    Public Class SingleConsumerLockFreeQueue(Of T)
        Private head As Node = New Node(Nothing)
        Private insertionPoint As Node = head

        Private Class Node
            Public ReadOnly value As T
            Public [next] As Node
            Public Sub New(ByVal value As T)
                Me.value = value
            End Sub
        End Class
        ...
This is essentially the same starting definition you'd use for any linked-list queue. The consumer part is also easy. There is no need for any locks or atomic operations, because there is only one consumer and the producers never manipulate the head variable. We can implement it as if we were working with a single-threaded queue:
        ...
        Public ReadOnly Property WasEmpty As Boolean
            Get
                Return head.next Is Nothing
            End Get
        End Property

        Public Function Peek() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            Return head.next.value
        End Function

        Public Function Dequeue() As T
            If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
            head = head.next
            Return head.value
        End Function
        ...
Note that Peek and Dequeue must only be called from the consumer thread. You can call WasEmpty from a producer thread, but its return value won't be stable. It's only stable if the queue is non-empty and it is called from the consumer thread (which is why it's WasEmpty instead of IsEmpty). If you've seen CAS-based queues, you'll also notice I avoided some issues by making ".next != null" implicitly equivalent to "another item is queued".

The producers are more complicated. They're going to have to share the insertion point variable by using the XCHG operation. Each producer will exchange a new node for the current insertion point, which will stop producers from interfering with each other:
        ...
        Public Sub BeginEnqueue(ByVal item As T)
            Dim newNode = New Node(item)
            Dim oldInsertionPoint = Threading.Interlocked.Exchange(Me.insertionPoint, newNode)
            oldInsertionPoint.next = newNode
        End Sub
        ...
This queue is essentially identical to a normal queue, except for the XCHG magic in BeginEnqueue. Note that there's a reason I call it BeginEnqueue instead of Enqueue, which comes up later in this post.
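Here is a small sketch of the intended usage pattern (any number of producer threads, exactly one consumer thread); the thread count, item values, and the consumer's polling loop are purely illustrative:

    'Illustrative usage: several producers enqueue concurrently, a single consumer drains.
    Dim queue = New SingleConsumerLockFreeQueue(Of Integer)

    'Producers: any number of threads may call BeginEnqueue at the same time.
    For t = 0 To 3
        Dim producerId = t 'copy the loop variable before capturing it in the lambda
        Call New Threading.Thread(
            Sub()
                For i = 0 To 99
                    queue.BeginEnqueue(producerId * 100 + i)
                Next
            End Sub
        ).Start()
    Next

    'Consumer: only one thread may call WasEmpty/Peek/Dequeue.
    Do
        If Not queue.WasEmpty Then
            Dim item = queue.Dequeue()
            'process item ...
        End If
        'a real consumer would block or stop; this loop just illustrates single-consumer access
    Loop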

Correctness

With the producer and consumer methods defined, we now have a complete queue. But is it correct? The consumer part probably is, but the producer part looks like it could get crazy with lots of threads in there! Let's start by proving the producers don't destructively interfere with each other.

Notice that, for each call to BeginEnqueue, a brand new node is passed as the second argument to XCHG. Therefore, the second argument to XCHG will always be unique. Therefore, the value returned by XCHG will always be unique, because it essentially lags the second argument by one call. Therefore, each producer gets a unique oldInsertionPoint. Also notice that a producer never modifies the .next pointer of its own constructed node: it leaves that responsibility to the next producer to call XCHG. In fact, the only place a node's .next pointer is ever modified is "oldInsertionPoint.next = newNode". Therefore, there is no write-write race condition on oldInsertionPoint.next = newNode, because oldInsertionPoint is unique to each call and the .next pointer is modified nowhere else. Finally, notice that a node's .next pointer can only change while it is still null. This guarantees the property I mentioned earlier: ".next != null" == "another item is queued".

A good mental image of the queue is a big disassembled and growing chain randomly linking itself together until it is a big assembled chain. In later posts I will talk about how I tested the queue in order to truly convince myself it was correct (It wasn't just stress testing.). For now, let's convince ourselves by looking at an example of what happens when two threads contend while enqueueing items.



We start with an empty queue, with the insertion point equal to the head. Then two threads call BeginEnqueue. The threads race, but Thread1 calls XCHG first. It gets the head as its old insertion point and sets the new insertion point to its constructed node. Now Thread2 pre-empts Thread1 and runs its call to XCHG, getting Thread1's constructed node as its old insertion point. Thread2 continues and finishes, linking Prev2.next to Node2, but leaving the queue still technically empty. Finally, Thread1 finishes and links Prev1.next to Node1, leaving the queue with two items. Even though the thread order was perturbed, the chain still came together correctly. This works for any number of threads. Try it.

To finish this section off, notice that I can actually write a modified BeginEnqueue method to enqueue a sequence of items and guarantee they end up adjacent in the queue. Do you see how to do it? (The full listing at the end of this post shows one way.)

Performance

Now that I've convinced myself the queue is correct, I want to know how it performs compared to the old locking queue and to CAS-based lock free queues. Actually, comparing it to the old queue was a joke. I profiled my code after I inserted this queue, and it just destroyed the old queue. The queueing overhead was essentially gone. So, I'm going to jump straight to armchair comparing it against CAS queues.

The obvious thing to notice is that all XCHG queue operations are guaranteed constant time. CAS queues, on the other hand, can spend arbitrarily long trying to enqueue their value (in practice this is extremely unlikely). The XCHG operation is also simpler than CAS. Overall, I expect producer throughput to be non-negligibly faster than a CAS based queue. The ability to safely enqueue multiple adjacent items is also nice and could be used to significantly reduce the number of calls to the expensive atomic operations.
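To see where that retry cost comes from, here is a hedged sketch of a typical CAS-based enqueue in the style of the Michael-Scott queue (simplified, not taken from any particular library); note the loop that must repeat whenever another producer wins the race:

    'Simplified sketch of a CAS-based enqueue (Michael-Scott style), shown only for comparison.
    Public Class CasQueueSketch(Of T)
        Private Class Node
            Public ReadOnly value As T
            Public [next] As Node
            Public Sub New(ByVal value As T)
                Me.value = value
            End Sub
        End Class
        Private tail As Node = New Node(Nothing)

        Public Sub BeginEnqueue(ByVal item As T)
            Dim newNode = New Node(item)
            Do
                Dim currentTail = tail
                'Try to link the new node after the current tail; the CAS fails if another producer linked first.
                If Threading.Interlocked.CompareExchange(currentTail.next, newNode, Nothing) Is Nothing Then
                    'Linked successfully; try to swing the tail forward (another thread may already have).
                    Threading.Interlocked.CompareExchange(tail, newNode, currentTail)
                    Exit Do
                End If
                'Lost the race: help advance the tail, then retry.
                Threading.Interlocked.CompareExchange(tail, currentTail.next, currentTail)
            Loop
        End Sub
    End Class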

The consumer side is essentially equivalent to a CAS queue, in that it could be modified to be multiple-consumer by using CAS. We don't need multiple consumers for a call queue, so I didn't bother making it safe for multiple consumers. The major consumer benefit compared to a CAS queue is that garbage collection is not required. Most lock free data structures require garbage collection, because anytime you explicitly free a node there is a risk that some slow thread still has a reference to it. Here the consumer can safely free nodes when their .next is consumed (because setting .next is the last thing a producer does), making a translation to a low level language like C trivial.

One downside I do want to note, however, is the non-obvious priority inversion. Remember, from the enqueueing example I gave, how Node2 wasn't technically queued when Thread2 finished? It was only queued when Thread1 finished later on. Now imagine a very low priority thread and a very high priority thread continuously enqueueing items. The low priority thread would run the exchange line, and essentially stall the queue until it updated the .next pointer on the next line. The high priority thread would be appending items, but they would not be dequeueable until the low priority thread finished. In other words, the average delay depends on the slowest producer. Luckily, the buffering effect stops this from affecting the average throughput. I would not recommend this queue for real-time systems where milliseconds count and prioritization is king.

Overly Simple Profiling

In order to get an idea of the queue contention, I profiled the time it took 1 to 100 threads to queue 10 000 items each. I'm running on a Core2 Duo processor, and the times were measured using GetTickCount, which probably explains why the low data points are a bit wild. I don't see much usefulness in these numbers, but I needed a pretty graph so make of it what you will. I'm open to suggestions for more relevant tests.
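The exact harness isn't shown in the post; a rough sketch of the kind of measurement described (N producer threads each enqueueing 10,000 items, timed via Environment.TickCount, which wraps GetTickCount) might look like this:

    'Rough sketch of the measurement described above (not the original harness).
    Public Function TimeEnqueues(ByVal threadCount As Integer, ByVal itemsPerThread As Integer) As Integer
        Dim queue = New SingleConsumerLockFreeQueue(Of Integer)
        Dim workers = New List(Of Threading.Thread)
        For t = 1 To threadCount
            workers.Add(New Threading.Thread(
                Sub()
                    For i = 1 To itemsPerThread
                        queue.BeginEnqueue(i)
                    Next
                End Sub))
        Next

        Dim startTick = Environment.TickCount
        For Each worker In workers
            worker.Start()
        Next
        For Each worker In workers
            worker.Join()
        Next
        Return Environment.TickCount - startTick 'elapsed milliseconds (GetTickCount resolution)
    End Function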


Conclusion

The completed queue class is posted below, along with comments. The class is 48 lines of code plus 82 lines of comments and white space. Lock-free stuff is not a joke: you have to explain yourself. In my next post I will be discussing futures, which will be another integral part of the call queue coming later on.
      ''' <summary>
''' A multiple-producer, single-consumer lock-free queue.
''' Does NOT guarantee an item has been queued when BeginEnqueue finishes.
''' Does guarantee an item will eventually be queued after BeginEnqueue finishes.
''' Does guarantee that, for calls simultaneously in BeginEnqueue, at least one will finish with its item having been enqueued.
''' Does guarantee that, if BeginEnqueue(Y) is called after BeginEnqueue(X) finishes, Y will follow X in the queue.
''' </summary>
''' <remarks>
''' Performance characteristics:
''' - All operations are guaranteed constant time.
''' - Latency between BeginEnqueue finishing and the item being enqueued can be delayed arbitrarily by slowing down only one of the producers.
''' - Producer throughput is expected to be somewhat higher than CAS-based implementations, since producers never need to retry.
''' </remarks>
Public Class SingleConsumerLockFreeQueue(Of T)
''' <summary>
''' Owned by the consumer.
''' This node is the end marker of the consumed nodes.
''' This node's next is the next node to be consumed.
''' </summary>
Private head As Node = New Node(Nothing)
''' <summary>
''' This node is the tail of the last partially or fully inserted chain.
''' The next inserted chain will exchange its tail for the insertionPoint, then set the old insertionPoint's next to the chain's head.
''' </summary>
Private insertionPoint As Node = head
''' <summary>
''' Singly linked list node containing queue items.
''' </summary>
Private Class Node
Public ReadOnly value As T
Public [next] As Node
Public Sub New(ByVal value As T)
Me.value = value
End Sub
End Class

''' <summary>
''' Begins adding new items to the queue.
''' The items may not be dequeueable when this method finishes, but eventually they will be.
''' The items are guaranteed to end up adjacent in the queue and in the correct order.
''' </summary>
''' <remarks>
''' An example of what can occur when two items are queued simultaneously:
''' Initial state:
''' insert=head -> null
''' [queue is empty]
''' Step 1: First item is created and exchanged with insertion point.
''' head=prev1 -> null
''' insert=node1 -> null
''' [queue is empty]
''' Step 2: Second thread pre-empts and second item is created and exchanged with insertion point.
''' head=prev1 -> null
''' node1=prev2 -> null
''' insert=node2 -> null
''' [queue is empty]
''' Step 3: Second thread finishes setting prev.next.
''' head=prev1 -> null
''' node1=prev2 -> insert=node2 -> null
''' [queue is empty]
''' Step 4: First thread comes back and finishes setting prev.next.
''' head=prev1 -> node1=prev2 -> insert=node2 -> null
''' [queue contains 2 elements]
''' </remarks>
''' <implementation>
''' Each producer creates a new chain, and exchanges the shared insertion point for the tail of the new chain.
''' The producer then links the old insertion point to the head of the new chain.
''' A new chain might not be in the main chain when the function exits, but it will be in a chain that will eventually be in the main chain.
''' </implementation>
Public Sub BeginEnqueue(ByVal items As IEnumerable(Of T))
If items Is Nothing Then Throw New ArgumentNullException("items")
If Not items.Any Then Return

'Create new chain
Dim chainHead As Node = Nothing
Dim chainTail As Node = Nothing
For Each item In items
If chainHead Is Nothing Then
chainHead = New Node(item)
chainTail = chainHead
Else
chainTail.next = New Node(item)
chainTail = chainTail.next
End If
Next item

'Append chain to previous chain
Dim prevChainTail = Threading.Interlocked.Exchange(Me.insertionPoint, chainTail)
prevChainTail.next = chainHead
End Sub
''' <summary>
''' Begins adding a new item to the queue.
''' The item may not be dequeueable when this method finishes, but eventually it will be.
''' </summary>
''' <implementation>
''' Just an inlined and simplified version of BeginEnqueue(IEnumerable(Of T))
''' </implementation>
Public Sub BeginEnqueue(ByVal item As T)
Dim chainOfOne = New Node(item)
Dim prevChainTail = Threading.Interlocked.Exchange(Me.insertionPoint, chainOfOne)
prevChainTail.next = chainOfOne
End Sub

''' <summary>
''' Returns true if there were any items in the queue.
''' The return value is only stable if the queue is non-empty and you are calling from the consumer thread.
''' </summary>
Public ReadOnly Property WasEmpty As Boolean
Get
Return head.next Is Nothing
End Get
End Property

''' <summary>
''' Returns the next item in the queue.
''' This function must only be called from the consumer thread.
''' </summary>
Public Function Peek() As T
If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
Return head.next.value
End Function
''' <summary>
''' Removes and returns an item from the queue.
''' This function must only be called from the consumer thread.
''' </summary>
Public Function Dequeue() As T
If head.next Is Nothing Then Throw New InvalidOperationException("Empty Queue")
head = head.next
Return head.value
End Function
End Class