Showing posts with label concurrency. Show all posts

Tuesday, September 15, 2009

Concurrency #4: Using the Call Queue

Introduction

Now that we have implemented a lock-free queue, futures, and a call queue, we will start using them to perform concurrent programming. For example, consider a bank account class synchronized using locks:

Public Class BankAccount
Private balance As UInteger
Private ReadOnly lock As New Object()
Public Function Deposit(ByVal value As UInteger) As UInteger
SyncLock lock
balance += value
Return balance
End SyncLock
End Function
Public Function TryWithdraw(ByVal value As UInteger) As UInteger?
SyncLock lock
If balance < value Then Return Nothing
balance -= value
Return balance
End SyncLock
End Function
End Class

Rewriting the class to use queues and futures is dead simple, because the locking is at the method level. Just replace SyncLock blocks with QueueFunc blocks.

Public Class BankAccount
Private balance As UInteger
Private queue As ICallQueue = New ThreadPooledCallQueue()
Public Function FutureDeposit(ByVal value As UInteger) As IFuture(Of UInteger)
Return queue.QueueFunc(
Function()
balance += value
Return balance
End Function)
End Function
Public Function FutureTryWithdraw(ByVal value As UInteger) As IFuture(Of UInteger?)
Return queue.QueueFunc(
Function() As UInteger?
If balance < value Then Return Nothing
balance -= value
Return balance
End Function)
End Function
End Class
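
To see the queued version in action without the rest of the series' code, here is a minimal, self-contained sketch. It is not the implementation from this series: it assumes .NET 4.5 and swaps in the framework's Task(Of R) for IFuture(Of R), and a hypothetical TaskCallQueue class (which chains each call onto the previous Task) for ThreadPooledCallQueue. The names TaskCallQueue and BankDemo are made up for illustration.

```vb
Imports System
Imports System.Threading.Tasks

' Hypothetical stand-in for the series' call queue: runs queued functions
' one at a time, in order, by chaining each onto the previously queued Task.
Public Class TaskCallQueue
    Private ReadOnly gate As New Object()
    Private tail As Task = Task.FromResult(True)

    Public Function QueueFunc(Of R)(ByVal func As Func(Of R)) As Task(Of R)
        SyncLock gate 'held only while linking tasks, never while func runs
            Dim t As Task(Of R) = tail.ContinueWith(Of R)(Function(prev As Task) func())
            tail = t
            Return t
        End SyncLock
    End Function
End Class

Public Class BankAccount
    Private balance As UInteger
    Private ReadOnly queue As New TaskCallQueue()

    Public Function FutureDeposit(ByVal value As UInteger) As Task(Of UInteger)
        Return queue.QueueFunc(Of UInteger)(
            Function() As UInteger
                balance += value
                Return balance
            End Function)
    End Function

    Public Function FutureTryWithdraw(ByVal value As UInteger) As Task(Of UInteger?)
        Return queue.QueueFunc(Of UInteger?)(
            Function() As UInteger?
                If balance < value Then Return Nothing
                balance -= value
                Return balance
            End Function)
    End Function
End Class

Public Module BankDemo
    Public Sub Main()
        Dim account As New BankAccount()

        'Send three "messages"; none of these calls block.
        Dim deposited As Task(Of UInteger) = account.FutureDeposit(100UI)
        Dim tooMuch As Task(Of UInteger?) = account.FutureTryWithdraw(500UI)
        Dim okay As Task(Of UInteger?) = account.FutureTryWithdraw(30UI)

        'Blocking on Result is only for the demo; real code would attach a callback.
        Console.WriteLine("Balance after deposit: " & deposited.Result.ToString())
        Console.WriteLine("Withdrawing 500 succeeded: " & tooMuch.Result.HasValue.ToString())
        Console.WriteLine("Balance after withdrawing 30: " & okay.Result.Value.ToString())
    End Sub
End Module
```

Because the calls run in the order they were queued, the oversized withdrawal is rejected against a balance of 100 and the final balance is 70. The stand-in queue uses a lock while linking tasks, so it only illustrates the calling pattern, not the lock-free design from the earlier post.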

Message Passing

Look closely at the bank account example. Calling FutureTryWithdraw is like sending a message to the bank account object (saying "please try to withdraw X dollars"), and the future returned by the method is like a reply saying either "Ok, you now have X dollars!" or "Not enough funds!". We've implemented a form of message passing, which is what highly concurrent languages like Erlang are based on.

Message passing is not the holy grail of concurrency, but it is an extremely powerful abstraction. For example, it would be much easier to message-pass to a remote machine than do some type of lock sharing. Message passing also makes it impossible to cause a deadlock (the closest analogue is creating a never-ending message cycle; much easier to debug). Most importantly, it is easier to reason about messages than locks. Proving a lock is used correctly requires looking at the entire program. Proving a message is used correctly only requires looking at part of the program (usually one or two functions).

Messages are not superior to locks in every way, of course. Different methods work better in different situations. For example, there is no message-passing analogue for holding two locks at the same time. In addition, it is natural to lock only part of a method, but it is not always clear what that would mean in terms of passing messages.

Those Darn Philosophers

I'm going to finish with an implementation of the dining philosophers done with message passing. Put it in a command line project (alongside the other classes introduced in this series). Notice how each class is simple on its own. In fact, the most complex parts are related to writing readable information to the console! In the next post in this series, I will supply implementations of futures for various asynchronous tasks.

Public Module Main
Public Sub Main()
Const NumPhilosphers As Integer = 5

'Setup table
Dim chopSticks(0 To NumPhilosphers - 1) As Chopstick
Dim philosophers(0 To NumPhilosphers - 1) As DiningPhilosopher
For i = 0 To NumPhilosphers - 1
chopSticks(i) = New Chopstick()
Next i
For i = 0 To NumPhilosphers - 1
philosophers(i) = New DiningPhilosopher(New Random(Environment.TickCount + i), chopSticks(i), chopSticks((i + 1) Mod NumPhilosphers))
Next i

'Give a bit of stabilizing time for measurements
Threading.Thread.Sleep(5000)

'Print status updates
Dim n = 0
Do
n += 1
System.Console.WriteLine("Update #" + n.ToString())
For i = 0 To NumPhilosphers - 1
System.Console.WriteLine("Philosopher #" + i.ToString() + " is " + philosophers(i).Status + ".")
Next i
Threading.Thread.Sleep(1000)
Loop
End Sub
End Module

'''Represents a shared resource.
Public Class Chopstick
Public queue As ICallQueue = New ThreadPooledCallQueue()
Private inUse As Boolean

Public Function QueueTryPickUp() As IFuture(Of HeldChopStick)
Return queue.QueueFunc(Function()
If inUse Then Return Nothing
inUse = True
Return New HeldChopStick(AddressOf QueuePutDown)
End Function)
End Function
Private Function QueuePutDown() As IFuture
Return queue.QueueAction(Sub()
inUse = False
End Sub)
End Function
End Class

'''Represents an acquired resource.
Public Class HeldChopStick
Private wasDropped As Boolean
Private ReadOnly drop As Func(Of IFuture)
Public Sub New(ByVal drop As Func(Of IFuture))
Me.drop = drop
End Sub
Public Function PutDown() As IFuture
If wasDropped Then Throw New InvalidOperationException()
wasDropped = True
Return drop()
End Function
End Class

'''Represents a philosopher trying to share chopsticks for eating.
Public Class DiningPhilosopher
Private ReadOnly leftChopStick As Chopstick
Private ReadOnly rightChopStick As Chopstick
Private feedings As Integer
Private lastTime As Date
Private weightedRate As Double

Public Sub New(ByVal rng As Random, ByVal leftChopStick As Chopstick, ByVal rightChopStick As Chopstick)
Me.lastTime = Now
Me.leftChopStick = leftChopStick
Me.rightChopStick = rightChopStick
Call TryGrabChopsticks(rng)
End Sub

'This property isn't thread-safe, but the errors will be marginal [race condition on increments and resets of 'feedings']
Public ReadOnly Property Status As String
Get
'Measure
Dim t = Now
Dim dt = t - lastTime
Dim rate = feedings / dt.TotalSeconds

'Estimate
If weightedRate = 0 Then
weightedRate = rate
Else
'Exponential approach w.r.t. time
Dim lambda = 0.9 ^ dt.TotalSeconds
weightedRate *= lambda
weightedRate += rate * (1 - lambda)
End If

'Categorize
Dim category = ""
Select Case weightedRate
Case 0 : category = "Empty"
Case 0 To 1 : category = "Starving"
Case 1 To 2 : category = "Hungry"
Case 2 To 3 : category = "Satisfied"
Case Else : category = "Plump"
End Select

'Finish
lastTime = t
feedings = 0
Return category + " (~" + weightedRate.ToString("0.00") + "/s)"
End Get
End Property

Private Sub TryGrabChopsticks(ByVal rng As Random)
'Try to grab both chopsticks
Dim futureLeft = leftChopStick.QueueTryPickUp()
Dim futureRight = rightChopStick.QueueTryPickUp()

'React once both chopsticks have been attempted
futureLeft.CallWhenValueReady(
Sub(left) futureRight.CallWhenValueReady(
Sub(right)
'Call on a new thread because there are sleeps
Call New Threading.Thread(Sub() TryHoldChopsticks(rng, left, right)).Start()
End Sub))
End Sub
Private Sub TryHoldChopsticks(ByVal rng As Random, ByVal left As HeldChopStick, ByVal right As HeldChopStick)
If right Is Nothing OrElse left Is Nothing Then
'Oh no! Put down and wait a bit before trying again.
If left IsNot Nothing Then left.PutDown()
If right IsNot Nothing Then right.PutDown()
Threading.Thread.Sleep(10 + rng.Next(10))
Else
'Yay! Food!
feedings += 1
Threading.Thread.Sleep(50 + rng.Next(50))
left.PutDown()
right.PutDown()

'Snooze
Threading.Thread.Sleep(50 + rng.Next(50))
End If

'Hungry! Repeat!
Call TryGrabChopsticks(rng)
End Sub
End Class
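
The rate estimate inside the Status property is an exponentially weighted average whose weight depends on elapsed time. The following is a small sketch of just that arithmetic, pulled out into a hypothetical UpdateWeightedRate function (the module and function names are mine; the 0.9 constant is the one used above). It leaves out the special case where the first measurement seeds the estimate.

```vb
Imports System

Public Module RateSmoothing
    ' Blends the previous estimate with a new measurement.
    ' The old estimate keeps a weight of decayPerSecond ^ elapsedSeconds,
    ' so longer gaps between updates trust the new measurement more.
    Public Function UpdateWeightedRate(ByVal previous As Double,
                                       ByVal measured As Double,
                                       ByVal elapsedSeconds As Double,
                                       Optional ByVal decayPerSecond As Double = 0.9) As Double
        Dim lambda As Double = decayPerSecond ^ elapsedSeconds
        Return previous * lambda + measured * (1 - lambda)
    End Function

    Public Sub Main()
        'After 1 second the old estimate keeps 90% of its weight: 2 * 0.9 + 4 * 0.1
        Console.WriteLine(UpdateWeightedRate(2.0, 4.0, 1.0))
        'After 10 seconds it keeps only 0.9 ^ 10 (roughly 35%) of its weight
        Console.WriteLine(UpdateWeightedRate(2.0, 4.0, 10.0))
    End Sub
End Module
```

With an old estimate of 2 and a new measurement of 4, a one-second gap moves the estimate to 2.2, while a ten-second gap moves it to roughly 3.3.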


Monday, August 17, 2009

Concurrency #3: The Queue is the Mailbox

Introduction

So far in this series, I have introduced futures and a lock-free queue. Now we are going to use those tools to create a call queue, which will just happen to return futures. This queue will allow us to do message-based synchronization, which will be covered in the next post.

Designing the Interface

A call queue's interface is trivial. It accepts actions to run and, in this case, returns a future for when the action has completed.
  
Public Interface ICallQueue
Function QueueAction(ByVal action As Action) As IFuture
End Interface

We also want to be able to work with functions, so we will add an extension method that accepts functions and returns a future for the value computed when the queued function finally runs.
  
Public Module ExtensionsForICallQueue
<Extension()>
Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
queue.QueueAction(Sub() f.SetValue(func()))
Return f
End Function
End Module

Notice that the interface does not specify how the actions run. The queue might run them on a separate thread, on the thread pool, or in any other context it wants. The only requirements are that the queued calls must eventually run and they must run in order.

Implementing the Call Queue

The call queue is going to be lock free, making things fast and (more importantly) interesting. The call queue implementation will handle guaranteeing the eventual execution of queued calls, since the lock-free queue handles the in-order stuff. We will need to create a consumer when the queue becomes non-empty, destroy the consumer when the queue becomes empty, and do all of this in parallel.

To simplify reasoning about the very large number of possible thread interleavings, I am only going to focus on the exit points of methods. At every method exit point the queue will guarantee "empty or moving". To be more explicit, at every exit point the queue guarantees at least one of the following:
1) Queued calls are being executed
2) There is another thread executing inside the queue (which will affect shared state before it finishes)
3) The queue is empty
Given "empty or moving", it is impossible for a method to leave the queue idle in a non-empty state (which would result in calls potentially never being executed).

Alright, let's implement a queue satisfying the above property! All the difficult logic will be placed into private methods for acquiring and releasing consumer responsibilities, and the queue will call them appropriately before exiting methods. Let's start with the simple stuff so we see the big picture. We need a lock-free queue to store queued actions, a method to enqueue actions, and an overridable method to consume them. The consume method is overridable so child classes can override it and do things like call the base method on a new thread, giving a threaded call queue.
  
Public MustInherit Class AbstractCallQueue
Implements ICallQueue

Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
Private running As Integer 'stores consumer state and is used as a semaphore

Private Class Node
Public ReadOnly action As Action
Public ReadOnly future As New Future()
Public Sub New(ByVal action As Action)
Me.action = action
End Sub
End Class

Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
Dim item = New Node(action)
queue.BeginEnqueue(item)
If TryAcquireConsumer() Then
Call Consume()
End If
Return item.future
End Function

Protected Overridable Sub Consume()
Do Until TryReleaseConsumer()
Dim item = queue.Dequeue()
Call item.action()
Call item.future.SetReady()
Loop
End Sub
...

Notice that the last shared-state-actions performed by QueueAction and Consume are TryAcquireConsumer and TryReleaseConsumer respectively. Therefore if those two methods meet "empty or moving", the queue will work.

The TryAcquireConsumer method will be simple and optimistic, because many producers can call it at the same time. It will just optimistically fail if the queue is empty, and otherwise try to atomically acquire the consumer responsibilities. Now to prove all possible exits meet "empty or moving". The optimistic failure is safe, because if another producer exists to enqueue an item in parallel, then there is some other thread executing inside the queue. The attempt to acquire the lock is safe because if it fails then there is a consumer executing inside the queue, and if it succeeds then we will create a consumer executing inside the queue. Therefore, when this method ends the queue will be empty or there will be some thread executing inside the queue, which proves "empty or moving".
  
...
Private Function TryAcquireConsumer() As Boolean
If queue.WasEmpty Then Return False
Return Threading.Interlocked.Exchange(running, 1) = 0
End Function
...

The TryReleaseConsumer method will be less simple. It will first check if the queue is not empty, in which case clearly we should still be consuming and it will fail. Then it will optimistically release the consumer responsibilities. But, between the empty check and releasing responsibilities, a producer may have enqueued an item. Therefore, we have to try to re-acquire consumer responsibilities. If that fails, then we know the queue is empty or another thread is now the consumer. Therefore, if we fail to re-acquire consumer responsibilities, we are done and can safely exit.

We're not done yet. There is one last possibility: some other thread may have acquired consumer responsibilities, emptied the queue, and released those responsibilities just before we acquired them; leaving us with a potentially empty queue! Therefore, if we acquire consumer responsibilities then the first thing we need to do is try to release consumer responsibilities all over again! Therefore we must keep trying until we fail to acquire (in which case we exit the queue) or find a non-empty queue (in which case we process some items then come back). But, as long as we don't give up, "empty or moving" will be satisfied. Therefore, even though the caller can potentially be live-locked, TryReleaseConsumer satisfies "empty or moving". (The live lock is not an issue because, in order for it to occur, progress must be occurring).
  
...
Private Function TryReleaseConsumer() As Boolean
Do
If Not queue.WasEmpty Then Return False
Threading.Interlocked.Exchange(running, 0)
If Not TryAcquireConsumer() Then Return True
Loop
End Function
End Class
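
To experiment with the acquire/release protocol in isolation, here is a self-contained sketch under a few assumptions: it uses the framework's ConcurrentQueue(Of Action) (.NET 4) as a stand-in for SingleConsumerLockFreeQueue, always consumes on the thread pool, and drops the futures entirely. SketchCallQueue, CallQueueDemo and RunDemo are hypothetical names, not part of the series' code.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

' Stand-in for AbstractCallQueue, keeping only the "empty or moving" logic.
Public Class SketchCallQueue
    Private ReadOnly items As New ConcurrentQueue(Of Action)()
    Private running As Integer '1 while some thread holds consumer responsibilities

    Public Sub QueueAction(ByVal action As Action)
        items.Enqueue(action)
        If TryAcquireConsumer() Then
            ThreadPool.QueueUserWorkItem(Sub(state As Object) Consume())
        End If
    End Sub

    Private Function TryAcquireConsumer() As Boolean
        If items.IsEmpty Then Return False
        Return Interlocked.Exchange(running, 1) = 0
    End Function

    Private Function TryReleaseConsumer() As Boolean
        Do
            If Not items.IsEmpty Then Return False
            Interlocked.Exchange(running, 0)
            'An item may have arrived between the empty check and the release
            If Not TryAcquireConsumer() Then Return True
        Loop
    End Function

    Private Sub Consume()
        Do Until TryReleaseConsumer()
            Dim action As Action = Nothing
            If items.TryDequeue(action) Then action()
        Loop
    End Sub
End Class

Public Module CallQueueDemo
    ' Queues producers * perProducer increments from several threads
    ' and returns the final counter value once every call has run.
    Public Function RunDemo(ByVal producers As Integer, ByVal perProducer As Integer) As Integer
        Dim queue As New SketchCallQueue()
        Dim counter As Integer = 0 'deliberately not atomic: the queue serializes access
        Dim done As New CountdownEvent(producers * perProducer)
        For p As Integer = 1 To producers
            Dim producer As New Thread(
                Sub()
                    For i As Integer = 1 To perProducer
                        queue.QueueAction(Sub()
                                              counter += 1
                                              done.Signal()
                                          End Sub)
                    Next
                End Sub)
            producer.Start()
        Next
        done.Wait() 'returns only if no queued call was stranded
        Return counter
    End Function

    Public Sub Main()
        Console.WriteLine("Calls run: " & RunDemo(4, 1000).ToString())
    End Sub
End Module
```

If a call were ever left behind in an idle queue, done.Wait() would hang. If two consumers ever ran at once, the plain counter += 1 could lose updates. Seeing all 4000 calls counted is a quick sanity check of both properties, though of course not a proof.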

A couple call queue types

Hurray, we have an abstract lock-free call queue! Now let's go nuts and implement some concrete types we'll actually want to use. The obvious one is a ThreadedCallQueue, which runs queued calls on an independent thread:
  
'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call New Thread(Sub() MyBase.Consume()).Start()
End Sub
End Class

However, we might have a very large number of call queues in a program, and the start-up and tear-down cost for threads is quite high. The .NET Framework provides a ThreadPool class that accepts work items to execute on a limited number of threads. We should create a ThreadPooledCallQueue for running queued calls with the thread pool:
  
'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
End Sub
End Class

Lastly, we might require our calls to run on the UI thread (attempting to access form controls from another thread causes exceptions). It would be extremely convenient if we had a call queue which didn't require us to Invoke all over the place. Enter the InvokedCallQueue:
  
'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
Inherits AbstractCallQueue
Private ReadOnly control As System.Windows.Forms.Control
Public Sub New(ByVal control As System.Windows.Forms.Control)
Me.control = control
End Sub
Protected Overrides Sub Consume()
control.BeginInvoke(Sub() MyBase.Consume())
End Sub
End Class

There are surely plenty of other interesting ways to run queued calls, but these three have met my needs so far.

Summary

Now we have a bunch of lock-free call queues we can use to order calls from multiple threads. In the next post in this series I will cover how to use the queue for synchronization and compare it with a lock based approach. Fully commented code follows.
  
Imports System.Runtime.CompilerServices
Imports System.Threading

'''<summary>Describes a thread-safe call queue for non-blocking calls.</summary>
Public Interface ICallQueue
'''<summary>Queues an action to be run and returns a future for the action's eventual completion.</summary>
Function QueueAction(ByVal action As Action) As IFuture
End Interface

Public Module ExtensionsForICallQueue
'''<summary>Queues a function to be run and returns a future for the function's eventual output.</summary>
<Extension()>
Public Function QueueFunc(Of R)(ByVal queue As ICallQueue, ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
queue.QueueAction(Sub() f.SetValue(func()))
Return f
End Function
End Module

''' <summary>
''' A queue for running actions in order.
''' Debug.Prints unexpected exceptions from queued calls.
''' </summary>
''' <remarks>
''' Consumes items produced by multiple producers.
''' Ensures that enqueued items are consumed by ensuring at all exit points that either:
''' - the queue is empty
''' - or exactly one consumer exists
''' - or another exit point will be hit [by another thread]
''' </remarks>
Public MustInherit Class AbstractCallQueue
Implements ICallQueue

Private ReadOnly queue As New SingleConsumerLockFreeQueue(Of Node)
Private running As Integer 'stores consumer state and is used as a semaphore
Private Class Node
Public ReadOnly action As Action
Public ReadOnly future As New Future()
Public Sub New(ByVal action As Action)
Me.action = action
End Sub
End Class

''' <summary>
''' Queues an action to be run and returns a future for the action's eventual completion.
''' Starts running calls from the queue if they were not already being run.
'''</summary>
''' <remarks>
''' Enqueues the action to be consumed by the consumer.
''' </remarks>
Public Function QueueAction(ByVal action As Action) As IFuture Implements ICallQueue.QueueAction
Dim item = New Node(action)
queue.BeginEnqueue(item)

'Start the consumer if it is not already running
If TryAcquireConsumer() Then
Call Consume()
End If

Return item.future
End Function

'''<summary>Returns true if consumer responsibilities were acquired by this thread.</summary>
Private Function TryAcquireConsumer() As Boolean
'Don't bother acquiring if there are no items to consume
'This unsafe check is alright because enqueuers call this method after enqueuing
'Even if an item is queued between the check and returning false, the enqueuer will call this method again
'So we never end up with a non-empty idle queue
If queue.WasEmpty Then Return False

'Try to acquire consumer responsibilities
Return Threading.Interlocked.Exchange(running, 1) = 0

'Note that between the empty check and acquiring the consumer, all queued actions may have been processed.
'Therefore the queue may be empty at this point, but that's alright. Just a bit of extra work, nothing unsafe.
End Function
'''<summary>Returns true if consumer responsibilities were released by this thread.</summary>
Private Function TryReleaseConsumer() As Boolean
Do
'Don't release while there's still things to consume
If Not queue.WasEmpty Then Return False

'Release consumer responsibilities
Threading.Interlocked.Exchange(running, 0)

'It is possible that a new item was queued between the empty check and actually releasing
'Therefore it is necessary to check if we can re-acquire in order to guarantee we don't leave a non-empty queue idle
If Not TryAcquireConsumer() Then Return True

'Even though we've now acquired consumer, we may have ended up with nothing to process!
'So let's repeat this whole check for empty/release dance!
'A caller could become live-locked here if other threads keep emptying and filling the queue.
'But only consumer threads call here, and the live-lock requires that progress is being made.
'So it's alright. We still make progress and we still don't end up in an invalid state.
Loop
End Function

''' <summary>
''' Runs queued calls until there are none left.
''' Child classes can override and call base implementation using desired method (eg. on a new thread).
''' </summary>
Protected Overridable Sub Consume()
Do Until TryReleaseConsumer()
Dim item = queue.Dequeue()
Call item.action()
Call item.future.SetReady()
Loop
End Sub
End Class

'''<summary>Runs queued calls on an independent thread.</summary>
Public Class ThreadedCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call New Thread(Sub() MyBase.Consume()).Start()
End Sub
End Class

'''<summary>Runs queued calls on the thread pool.</summary>
Public Class ThreadPooledCallQueue
Inherits AbstractCallQueue
Protected Overrides Sub Consume()
Call ThreadPool.QueueUserWorkItem(Sub() MyBase.Consume())
End Sub
End Class

'''<summary>Runs queued calls on a control's thread.</summary>
Public Class InvokedCallQueue
Inherits AbstractCallQueue
Private ReadOnly control As System.Windows.Forms.Control
Public Sub New(ByVal control As System.Windows.Forms.Control)
Me.control = control
End Sub
Protected Overrides Sub Consume()
control.BeginInvoke(Sub() MyBase.Consume())
End Sub
End Class

Monday, July 13, 2009

Concurrency #2: The Future is the Message

Introduction

A future is a thing that may not be available now, but will be available later. This simple abstraction simplifies many concurrent tasks by cleanly separating how they are performed and how their results are used. All the caller cares about is that the result will eventually be available, and all the callee cares about is eventually making the result available.

Implementation

We'll start by defining an interface for future values. Callers will need to be able to check if the future is ready yet, a way to be informed when the future does become ready, and a method to extract the future's value once it is ready. So let's start with an interface which does just that:

Public Interface IFuture(Of Out R)
Event Readied()
ReadOnly Property IsReady() As Boolean
Function GetValue() As R
End Interface

That's good, but sometimes we don't want to include a value with the future (e.g. we just want to signal when a subroutine has finished), and some methods only care about the 'will eventually be ready' part of the future. Let's refactor the interface into two interfaces:

Public Interface IFuture
Event Readied()
ReadOnly Property IsReady() As Boolean
End Interface
Public Interface IFuture(Of Out R)
Inherits IFuture
Function GetValue() As R
End Interface

Now IFuture(Of A) and IFuture(Of B) share their common functionality, which is a nice property to have. We will also need classes implementing the IFuture interfaces, so we actually have something to return from future functions. They aren't very hard to implement, so I'll just include what you get if you remove everything but the declarations. Notice that, although IFuture(Of R) inherits from IFuture, Future(Of R) doesn't inherit from Future. Do you see why?

Public Class Future
Implements IFuture
Public Event Readied() Implements IFuture.Readied
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Public Function TrySetReady() As Boolean
Public Sub SetReady() 'Throws an InvalidOperationException if the future was already ready.
End Class
Public Class Future(Of R)
Implements IFuture(Of R)
Public Event Readied() Implements IFuture.Readied
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Public Function GetValue() As R Implements IFuture(Of R).GetValue 'Throws an InvalidOperationException if the value isn't ready yet.
Public Function TrySetValue(ByVal val As R) As Boolean
Public Sub SetValue(ByVal val As R) 'Throws an InvalidOperationException if the future was already ready.
End Class

Alright, so now we're all set to start using futures! Well, except these super simple interfaces would be a huge pain to use. We need to define some higher level operations to make things easier. We'll use extension methods to implement them as if they were part of the interface itself. The methods we will require are:
- CallWhenReady: Calls an action when a future is ready, and returns a future for when the action completes.
- EvalWhenReady: Evaluates a function when a future is ready, and returns a future for the function's output.
- CallWhenValueReady<A>: Calls an action when a future is ready, passing the future's value as an argument, and returns a future for when the action completes.
- EvalWhenValueReady<A,R>: Evaluates a function when a future is ready, passing the future's value as an argument, and returns a future for the function's output.
- Futurize: Takes any value and returns an instantly-ready future for it.
- Defuturize: Takes a future of a future and returns a condensed version, which is just a normal future.
- Defuturize<R>: Takes a future of a future of a value and returns a condensed version, which is just a normal future for the final value.

The simplest operation is Futurize, and it can be implemented like this:

<Extension()>
Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
Dim f = New Future(Of R)
f.SetValue(value)
Return f
End Function

The most complicated operation to implement is CallWhenReady. It's also the most useful operation, because the other operations can all be implemented easily using CallWhenReady. Here it is:

<Extension()>
Public Function CallWhenReady(ByVal future As IFuture,
ByVal action As Action) As IFuture
Dim lockVal As Integer
Dim f As New Future()
Dim notify As IFuture.ReadiedEventHandler
notify = Sub()
If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
RemoveHandler future.Readied, notify
Call action()
f.SetReady()
End If
End Sub

AddHandler future.Readied, notify
If future.IsReady Then Call notify() 'In case the future was already ready

Return f
End Function

Wow, that's a bit complicated! At least the concept is simple. First, we define the notification subroutine, which will run the target action and set CallWhenReady's returned future to ready. Then we make sure the notification subroutine is called once the future is ready. Note that there is a chance the future will become ready between registering for the event and manually checking, which is why the notify subroutine is wrapped in a one time lock.
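
The one time lock can be tried out on its own. Below is a minimal sketch, assuming nothing from the futures library: a hypothetical RunOnce helper wraps an action with the same Interlocked.Exchange guard, and CountRuns races several threads against it. Both names are invented for this illustration.

```vb
Imports System
Imports System.Threading

Public Module RunOnceDemo
    ' Wraps an action so that only the first caller actually runs it,
    ' even when several threads call the wrapper at the same time.
    Public Function RunOnce(ByVal action As Action) As Action
        Dim lockVal As Integer = 0
        Return Sub()
                   'Exchange returns the old value, so exactly one caller sees 0
                   If Interlocked.Exchange(lockVal, 1) = 0 Then action()
               End Sub
    End Function

    ' Calls one guarded action from many threads and reports how often it ran.
    Public Function CountRuns(ByVal callers As Integer) As Integer
        Dim runs As Integer = 0
        Dim notify As Action = RunOnce(Sub() Interlocked.Increment(runs))
        Dim threads(callers - 1) As Thread
        For i As Integer = 0 To callers - 1
            threads(i) = New Thread(Sub() notify())
            threads(i).Start()
        Next
        For Each t As Thread In threads
            t.Join()
        Next
        Return runs
    End Function

    Public Sub Main()
        Console.WriteLine("Guarded action ran " & CountRuns(8).ToString() & " time(s).")
    End Sub
End Module
```

However the eight threads interleave, the guarded action runs once. In CallWhenReady, this guard covers the case where both the Readied event and the manual IsReady check try to call notify.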

The rest of the higher level operations are short and sweet because they can be implemented using CallWhenReady. Try to implement them (or just cheat and look at the end of the post).

Some readers might not be familiar with Visual Basic, so I will quickly cover the things used by this method.
- The first line applies the Extension attribute to the method. That tells the compiler the method can be used as if it were part of IFuture. It allows me to write someFuture.CallWhenReady(...) instead of CallWhenReady(someFuture, ...), and includes the method in IntelliSense. Extension methods are extremely useful, because they make it much easier to discover helper methods.
- The sub ... end sub block is an anonymous subroutine, also called a lambda expression or a closure. Lambda expressions let you inline simple helper functions, but their real power comes from their access to the surrounding variables. For example, CallWhenReady's anonymous notify subroutine uses the arguments passed to CallWhenReady. This is achieved behind the scenes by 'hoisting' those local variables into a private class and passing it to the anonymous method.
- RemoveHandler and AddHandler are used to add and remove the methods called when an Event is raised. In this case we want notify to be called when Readied is fired, then we remove notify to remove unnecessary references (which would prevent the garbage collector from collecting futures derived from any future you held on to).
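
For readers new to closures, here is a tiny stand-alone illustration of a lambda capturing a local variable. It is unrelated to the futures code, and the names ClosureDemo and MakeCounter are hypothetical.

```vb
Imports System

Public Module ClosureDemo
    ' Returns a function that remembers 'count' between calls.
    ' The compiler hoists 'count' into a hidden class so that it
    ' outlives the call to MakeCounter.
    Public Function MakeCounter() As Func(Of Integer)
        Dim count As Integer = 0
        Return Function() As Integer
                   count += 1
                   Return count
               End Function
    End Function

    Public Sub Main()
        Dim nextValue As Func(Of Integer) = MakeCounter()
        Console.WriteLine(nextValue()) 'prints 1
        Console.WriteLine(nextValue()) 'prints 2

        'Each call to MakeCounter gets its own hoisted 'count'
        Dim other As Func(Of Integer) = MakeCounter()
        Console.WriteLine(other()) 'prints 1
    End Sub
End Module
```

CallWhenReady relies on the same mechanism: its notify lambda keeps using lockVal, f, future and action after CallWhenReady itself has returned.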

Example #1

Now we have all the pieces we need in order to use futures. Let's explore a simple example: replacing existing asynchronous methods with futures. We will replace the Net.Sockets.TcpClient BeginConnect/EndConnect methods with a single FutureConnect method. Here is an example of using BeginConnect/EndConnect:

'Connecting code:
'...
'... do whatever you're doing leading up to a connection
'...

Dim client = New TcpClient
Try
'starts a new async task, which calls EndConnect:
client.BeginConnect(hostname, port, AddressOf EndConnect, client)
Catch e As SocketException
'deal with failure to connect
End Try

'...

Private Sub EndConnect(ByVal ar As IAsyncResult)
Dim client = CType(ar.AsyncState, TcpClient)
Try
client.EndConnect(ar)
Catch e As SocketException
'deal with failure to connect
End Try

'...
'do whatever you wanted to do with the client once it was connected
'...
End Sub

I don't like the Begin/End style methods for a bunch of reasons. Do you see how the BeginConnect method mixes arguments about what you want and what to do with it? That you have to handle the IAsyncResult argument? That the ar.AsyncState member is type unsafe? How we have to call EndConnect for every call to BeginConnect? How this pattern has to be repeated every single time you use BeginConnect/EndConnect? All these little problems add up! I don't want to care about these details!

Now we'll implement the same thing with futures. Here is a FutureConnect function:

Public Function FutureConnect(ByVal hostname As String,
ByVal port As UShort) As IFuture(Of PossibleException(Of TcpClient, SocketException))
Dim f = New Future(Of PossibleException(Of TcpClient, SocketException))
Try
Dim client = New TcpClient
client.BeginConnect(hostname, port, Sub(ar)
Try
client.EndConnect(ar)
f.SetValue(client)
Catch e As SocketException
f.SetValue(e)
End Try
End Sub, Nothing)
Catch e As SocketException
f.SetValue(e)
End Try
Return f
End Function

Essentially, we just wrapped the existing BeginConnect/EndConnect methods inside a future. The main differences are the use of futures and the fact that we create a new TcpClient instead of affecting an existing TcpClient.

Let's use our fancy future connect method. Here is an example:

'...
'... do whatever you're doing leading up to a connection
'...

FutureConnect(hostname, port).CallWhenValueReady(
Sub(possibleClient)
If possibleClient.Exception IsNot Nothing Then
'deal with failure to connect
Return
End If
Dim client = possibleClient.Value

'...
'do whatever you wanted to do with the client once it was connected
'...
End Sub
)

'...

Do you see how the "what I want" and "what I want to do with it" parts are separated? How everything is type safe? How there is no required EndFutureConnect call? We've managed to eliminate those little details. That is a good abstraction at work.

There is a downside to using futures shown here, though. Did you notice that the return value was a PossibleException? That's just a structure which stores a value or an exception. It is impossible for exceptions to safely propagate out of a future the way you expect, so you must encode them into the future's value. The upside is that, because the future's value is type safe, you can't accidentally ignore the possible exception.
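
The definition of PossibleException is not shown in this post, so the following is only a guess at its shape: a structure holding either a value or an exception, with the Value and Exception members used in the example above. The FromValue and FromException factory functions, and the ParsePort demo, are hypothetical. The real type presumably also allows a value or an exception to be passed directly (as FutureConnect does with f.SetValue(client)), which this sketch leaves out.

```vb
Imports System

' Hypothetical reconstruction: stores a value or an exception, never both.
Public Structure PossibleException(Of V, E As System.Exception)
    Public ReadOnly Value As V
    Public ReadOnly Exception As E

    Private Sub New(ByVal value As V, ByVal exception As E)
        Me.Value = value
        Me.Exception = exception
    End Sub

    Public Shared Function FromValue(ByVal value As V) As PossibleException(Of V, E)
        Return New PossibleException(Of V, E)(value, Nothing)
    End Function

    Public Shared Function FromException(ByVal exception As E) As PossibleException(Of V, E)
        If exception Is Nothing Then Throw New ArgumentNullException("exception")
        Return New PossibleException(Of V, E)(Nothing, exception)
    End Function
End Structure

Public Module PossibleExceptionDemo
    ' Reports a parse failure through the return value instead of throwing.
    Public Function ParsePort(ByVal text As String) As PossibleException(Of UShort, FormatException)
        Dim port As UShort
        If UShort.TryParse(text, port) Then
            Return PossibleException(Of UShort, FormatException).FromValue(port)
        End If
        Return PossibleException(Of UShort, FormatException).FromException(
            New FormatException("Not a port number: " & text))
    End Function

    Public Sub Main()
        Dim good As PossibleException(Of UShort, FormatException) = ParsePort("6112")
        Dim bad As PossibleException(Of UShort, FormatException) = ParsePort("not-a-port")
        Console.WriteLine(good.Exception Is Nothing) 'prints True
        Console.WriteLine(bad.Exception Is Nothing) 'prints False
    End Sub
End Module
```

The caller has to look at Exception before using Value, which is the same check the CallWhenValueReady callback performs on possibleClient.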

Example #2

Suppose you want to do something after a subroutine finishes, but you want to run the subroutine on another thread. Or suppose it's a function running on the other thread and you need to wait for its return value. Futures make this problem trivial. You just write a method that returns a future, and runs the function or subroutine on another thread.

Public Function FutureThreadedAction(ByVal action As Action) As IFuture
Dim f As New Future
Call New Threading.Thread(
Sub()
Call action()
Call f.SetReady()
End Sub
).Start()
Return f
End Function
Public Function FutureThreadedFunction(Of R)(ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
FutureThreadedAction(Sub() f.SetValue(func()))
Return f
End Function

That's it. Now you can call these methods to run functions on other threads, and use futures to work with the return values. I really like this example because, even though we haven't solved any big problems, we've solved one of those tiny annoyances we deal with every day as programmers. The futures are making things easier.

Conclusion

We've really just scratched the surface here. In my next post I will be creating a concurrent call queue which uses futures. In the post after that I will put it all together to implement message passing.

An implementation of futures is posted below. The whole thing is just 170 lines, including comments and whitespace. (My original implementation took upwards of 500 lines, so I'm oddly proud of the line count.)

Imports System.Runtime.CompilerServices

'''Provides type-safe methods for return values that will be ready in the future, and for passing future arguments into normal functions.
Namespace Futures
'''Represents a thread-safe read-only class that fires an event when it becomes ready.
Public Interface IFuture
'''Raised when the future becomes ready.
Event Readied()
'''Returns true if the future is ready.
ReadOnly Property IsReady() As Boolean
End Interface

'''Represents a thread-safe read-only class that fires an event when its value becomes ready.
Public Interface IFuture(Of Out R)
Inherits IFuture
'''
''' Returns the future's value.
''' Throws an InvalidOperationException if the value isn't ready yet.
'''
Function GetValue() As R
End Interface

'''A thread-safe class that fires an event when it becomes ready.
Public Class Future
Implements IFuture
Private lockVal As Integer
Public Event Readied() Implements IFuture.Readied

'''Returns true if the future is ready.
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Get
Return lockVal = 1
End Get
End Property

'''
''' Makes the future ready.
''' Throws an InvalidOperationException if the future was already ready.
'''
Public Sub SetReady()
If Not TrySetReady() Then
Throw New InvalidOperationException("Future readied more than once.")
End If
End Sub
'''
''' Makes the future ready.
''' Returns false if the future was already ready.
'''
Public Function TrySetReady() As Boolean
If Threading.Interlocked.Exchange(lockVal, 1) <> 0 Then Return False
RaiseEvent Readied()
Return True
End Function
End Class

'''A thread-safe class that fires an event when its value becomes ready.
Public Class Future(Of R)
Implements IFuture(Of R)
Private val As R
'0 = not set, 1 = value is being written, 2 = ready
Private lockVal As Integer
Public Event Readied() Implements IFuture.Readied

'''Returns true if the future is ready.
Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
Get
Return Threading.Thread.VolatileRead(lockVal) = 2
End Get
End Property

'''
'''Returns the future's value.
'''Throws an InvalidOperationException if the value isn't ready yet.
'''
Public Function GetValue() As R Implements IFuture(Of R).GetValue
If Not IsReady Then Throw New InvalidOperationException("Attempted to get a future value before it was ready.")
Return val
End Function

'''
''' Sets the future's value and makes the future ready.
''' Throws an InvalidOperationException if the future was already ready.
'''
Public Sub SetValue(ByVal val As R)
If Not TrySetValue(val) Then
Throw New InvalidOperationException("Future readied more than once.")
End If
End Sub
'''
''' Sets the future's value and makes the future ready.
''' Returns false if the future was already ready.
'''
Public Function TrySetValue(ByVal val As R) As Boolean
'only the first caller wins the right to set the value
If Threading.Interlocked.CompareExchange(lockVal, 1, 0) <> 0 Then Return False
Me.val = val
'publish readiness only after the value is stored, so GetValue never returns an unset value
Threading.Interlocked.Exchange(lockVal, 2)
RaiseEvent Readied()
Return True
End Function
End Class

Public Module ExtensionsForIFuture
'''Runs an action once the future is ready, and returns a future for the action's completion.
<Extension()>
Public Function CallWhenReady(ByVal future As IFuture,
ByVal action As Action) As IFuture
Dim lockVal As Integer
Dim f As New Future()
Dim notify As IFuture.ReadiedEventHandler
notify = Sub()
If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
RemoveHandler future.Readied, notify
Call action()
f.SetReady()
End If
End Sub

AddHandler future.Readied, notify
If future.IsReady Then Call notify() 'in case the future was already ready

Return f
End Function

'''Passes the future's value to an action once ready, and returns a future for the action's completion.
<Extension()>
Public Function CallWhenValueReady(Of A1)(ByVal future As IFuture(Of A1),
ByVal action As Action(Of A1)) As IFuture
Return future.CallWhenReady(Sub() action(future.GetValue))
End Function

'''Runs a function once the future is ready, and returns a future for the function's return value.
<Extension()>
Public Function EvalWhenReady(Of R)(ByVal future As IFuture,
ByVal func As Func(Of R)) As IFuture(Of R)
Dim f As New Future(Of R)
future.CallWhenReady(Sub() f.SetValue(func()))
Return f
End Function

'''Passes the future's value to a function once ready, and returns a future for the function's return value.
<Extension()>
Public Function EvalWhenValueReady(Of A1, R)(ByVal future As IFuture(Of A1),
ByVal func As Func(Of A1, R)) As IFuture(Of R)
Return future.EvalWhenReady(Function() func(future.GetValue))
End Function

'''Wraps a normal value as an instantly ready future.
<Extension()>
Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
Dim f = New Future(Of R)
f.SetValue(value)
Return f
End Function

'''Returns a future for the final value of a future of a future.
<Extension()>
Public Function Defuturize(Of R)(ByVal futureFutureVal As IFuture(Of IFuture(Of R))) As IFuture(Of R)
Dim f = New Future(Of R)
futureFutureVal.CallWhenValueReady(Sub(futureVal) futureVal.CallWhenValueReady(Sub(value) f.SetValue(value)))
Return f
End Function

'''Returns a future for the readiness of a future of a future.
<Extension()>
Public Function Defuturize(ByVal futureFuture As IFuture(Of IFuture)) As IFuture
Dim f = New Future
futureFuture.CallWhenValueReady(Sub(future) future.CallWhenReady(Sub() f.SetReady()))
Return f
End Function
End Module
End Namespace