Monday, July 13, 2009

Concurrency #2: The Future is the Message

Introduction

A future is a thing that may not be available now, but will be available later. This simple abstraction simplifies many concurrent tasks by cleanly separating how they are performed and how their results are used. All the caller cares about is that the result will eventually be available, and all the callee cares about is eventually making the result available.

Implementation

We'll start by defining an interface for future values. Callers need a way to check whether the future is ready yet, a way to be informed when the future becomes ready, and a method to extract the future's value once it is ready. So let's start with an interface which does just that:
    Public Interface IFuture(Of Out R)
        Event Readied()
        ReadOnly Property IsReady() As Boolean
        Function GetValue() As R
    End Interface
That's good, but sometimes we don't want to include a value with the future (e.g. we just want to signal when a subroutine has finished), and some methods only care about the 'will eventually be ready' part of the future. Let's refactor the interface into two interfaces:
    Public Interface IFuture
        Event Readied()
        ReadOnly Property IsReady() As Boolean
    End Interface
    Public Interface IFuture(Of Out R)
        Inherits IFuture
        Function GetValue() As R
    End Interface
Now IFuture(of A) and IFuture(of B) share their common functionality, which is a nice property to have. We will also need classes implementing the IFuture interfaces, so we actually have something to return from future functions. They aren't very hard to implement, so I'll just include what you get if you remove everything but the declarations. Notice that, although IFuture(of R) inherits from IFuture, Future(of R) doesn't inherit from Future. Do you see why?
    Public Class Future
        Implements IFuture
        Public Event Readied() Implements IFuture.Readied
        Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
        Public Function TrySetReady() As Boolean
        Public Sub SetReady() 'Throws an InvalidOperationException if the future was already ready.
    End Class
    Public Class Future(Of R)
        Implements IFuture(Of R)
        Public Event Readied() Implements IFuture.Readied
        Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
        Public Function GetValue() As R Implements IFuture(Of R).GetValue 'Throws an InvalidOperationException if the value isn't ready yet.
        Public Function TrySetValue(ByVal val As R) As Boolean
        Public Sub SetValue(ByVal val As R) 'Throws an InvalidOperationException if the future was already ready.
    End Class
Alright, so now we're all set to start using futures! Well, except these super simple interfaces would be a huge pain to use. We need to define some higher level operations to make things easier. We'll use extension methods to implement them as if they were part of the interface itself. The methods we will require are:
- CallWhenReady: Calls an action when a future is ready, and returns a future for when the action completes.
- EvalWhenReady(Of R): Evaluates a function when a future is ready, and returns a future for the function's output.
- CallWhenValueReady(Of A): Calls an action when a future is ready, passing the future's value as an argument, and returns a future for when the action completes.
- EvalWhenValueReady(Of A, R): Evaluates a function when a future is ready, passing the future's value as an argument, and returns a future for the function's output.
- Futurize: Takes any value and returns an instantly-ready future for it.
- Defuturize: Takes a future of a future and returns a condensed version, which is just a normal future.
- Defuturize(Of R): Takes a future of a future of a value and returns a condensed version, which is just a normal future for the final value.
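
To get a feel for how these operations compose, here is a small usage sketch. It assumes the Futures namespace from the full listing at the end of this post is available; the module and function names (ChainingExample, RunChain) are made up for illustration.

```vbnet
Imports System
Imports Futures 'assumption: the namespace from the full listing at the end of the post

Public Module ChainingExample
    'Hypothetical demo: doubles a future value and captures the result.
    Public Function RunChain() As Integer
        Dim source As New Future(Of Integer)
        Dim result As Integer = 0

        'Nothing runs yet, because source has no value.
        Dim doubled As IFuture(Of Integer) = source.EvalWhenValueReady(Function(x) x * 2)
        doubled.CallWhenValueReady(Sub(x) result = x)

        'Setting the value runs the whole chain on the calling thread.
        source.SetValue(21)
        Return result
    End Function
End Module
```

Note that the caller of SetValue ends up running every callback in the chain, so long-running callbacks will delay whoever readies the future.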

The simplest operation is Futurize, and it can be implemented like this:
    <Extension()>
    Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
        Dim f = New Future(Of R)
        f.SetValue(value)
        Return f
    End Function
The most complicated operation to implement is CallWhenReady. It's also the most useful operation, because the other operations can all be implemented easily using CallWhenReady. Here it is:
    <Extension()>
    Public Function CallWhenReady(ByVal future As IFuture,
                                  ByVal action As Action) As IFuture
        Dim lockVal As Integer
        Dim f As New Future()
        Dim notify As IFuture.ReadiedEventHandler
        notify = Sub()
                     If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
                         RemoveHandler future.Readied, notify
                         Call action()
                         f.SetReady()
                     End If
                 End Sub

        AddHandler future.Readied, notify
        If future.IsReady Then Call notify() 'In case the future was already ready

        Return f
    End Function
Wow, that's a bit complicated! At least the concept is simple. First, we define the notification subroutine, which will run the target action and set CallWhenReady's returned future to ready. Then we make sure the notification subroutine is called once the future is ready. Note that there is a chance the future will become ready between registering for the event and manually checking, in which case notify would be called twice. That is why the body of the notify subroutine is wrapped in a one-time lock.
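
The two paths into notify (the event and the manual IsReady check) can be seen in a small sketch like the following. It assumes the Futures namespace from the full listing at the end of this post; ReadyOrderExample and CountCallbacks are names invented for the demo.

```vbnet
Imports System
Imports Futures 'assumption: the namespace from the full listing at the end of the post

Public Module ReadyOrderExample
    'Hypothetical demo: returns how many callbacks ran.
    Public Function CountCallbacks() As Integer
        Dim ran As Integer = 0

        'Case 1: the future is already ready when CallWhenReady is called.
        Dim early As New Future()
        early.SetReady()
        early.CallWhenReady(Sub() ran += 1) 'runs right away, via the IsReady check

        'Case 2: the future becomes ready afterwards.
        Dim late As New Future()
        late.CallWhenReady(Sub() ran += 1) 'runs when Readied is raised
        late.SetReady()

        Return ran
    End Function
End Module
```

In both cases the action runs exactly once, which is the whole point of the one-time lock.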

The rest of the higher level operations are short and sweet because they can be implemented using CallWhenReady. Try to implement them (or just cheat and look at the end of the post).

Some readers might not be familiar with Visual Basic, so I will quickly cover the things used by this method.
- The first line applies the Extension attribute to the method. That tells the compiler the method can be used as if it were part of IFuture. It allows me to write someFuture.CallWhenReady(...) instead of CallWhenReady(someFuture, ...), and includes the method in IntelliSense. Extension methods are extremely useful, because they make it much easier to discover helper methods.
- The Sub ... End Sub block is an anonymous subroutine, also called a lambda expression or a closure. Lambda expressions let you inline simple helper functions, but their real power comes from their access to the surrounding variables. For example, CallWhenReady's anonymous notify subroutine uses the arguments passed to CallWhenReady. This is achieved behind the scenes by 'hoisting' those local variables into a private class and passing it to the anonymous method.
- RemoveHandler and AddHandler are used to add and remove the methods called when an Event is raised. In this case we want notify to be called when Readied is fired, then we remove notify to remove unnecessary references (which would prevent the garbage collector from collecting futures derived from any future you held on to).
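
To make the 'hoisting' idea a bit more concrete, here is a rough sketch of a lambda next to a hand-written equivalent. The class and member names (HoistedLocals, AddTo) are invented; the compiler's real generated names and layout differ, so treat this as an illustration rather than the actual output.

```vbnet
Imports System

Public Module ClosureExample
    'Roughly what the compiler builds for a lambda that captures 'total'.
    'The class name is invented for this sketch.
    Private Class HoistedLocals
        Public total As Integer
        Public Sub AddTo(ByVal amount As Integer)
            total += amount
        End Sub
    End Class

    'Lambda version: 'total' looks like a local, but lives on a hidden object.
    Public Function SumWithLambda() As Integer
        Dim total As Integer = 0
        Dim add As Action(Of Integer) = Sub(amount) total += amount
        add(2)
        add(3)
        Return total
    End Function

    'Hand-hoisted version: the same behavior with the closure object spelled out.
    Public Function SumWithClass() As Integer
        Dim locals As New HoistedLocals()
        Dim add As Action(Of Integer) = AddressOf locals.AddTo
        add(2)
        add(3)
        Return locals.total
    End Function
End Module
```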

Example #1

Now we have all the pieces we need in order to use futures. Let's explore a simple example: replacing existing asynchronous methods with futures. We will replace the Net.Sockets.TcpClient BeginConnect/EndConnect methods with a single FutureConnect method. Here is an example of using BeginConnect/EndConnect:
    'Connecting code:
    '...
    '... do whatever you're doing leading up to a connection
    '...

    Dim client = New TcpClient
    Try
        'starts a new async task, which calls EndConnect:
        client.BeginConnect(hostname, port, AddressOf EndConnect, client)
    Catch e As SocketException
        'deal with failure to connect
    End Try

    '...

    Private Sub EndConnect(ByVal ar As IAsyncResult)
        Dim client = CType(ar.AsyncState, TcpClient)
        Try
            client.EndConnect(ar)
        Catch e As SocketException
            'deal with failure to connect
        End Try

        '...
        'do whatever you wanted to do with the client once it was connected
        '...
    End Sub
I don't like the Begin/End style methods for a bunch of reasons. Do you see how the BeginConnect method mixes arguments about what you want and what to do with it? That you have to handle the IAsyncResult argument? That the ar.AsyncState member is type unsafe? How we have to call EndConnect for every call to BeginConnect? How this pattern has to be repeated every single time you use BeginConnect/EndConnect? All these little problems add up! I don't want to care about these details!

Now we'll implement the same thing with futures. Here is a FutureConnect function:
    Public Function FutureConnect(ByVal hostname As String,
                                  ByVal port As UShort) As IFuture(Of PossibleException(Of TcpClient, SocketException))
        Dim f = New Future(Of PossibleException(Of TcpClient, SocketException))
        Try
            Dim client = New TcpClient
            client.BeginConnect(hostname, port, Sub(ar)
                                                    Try
                                                        client.EndConnect(ar)
                                                        f.SetValue(client)
                                                    Catch e As SocketException
                                                        f.SetValue(e)
                                                    End Try
                                                End Sub, Nothing)
        Catch e As SocketException
            f.SetValue(e)
        End Try
        Return f
    End Function
Essentially, we just wrapped the existing BeginConnect/EndConnect methods inside a future. The main differences are the use of futures and the fact that we create a new TcpClient instead of affecting an existing TcpClient.

Let's use our fancy future connect method. Here is an example:
    '...
    '... do whatever you're doing leading up to a connection
    '...

    FutureConnect(hostname, port).CallWhenValueReady(
        Sub(possibleClient)
            If possibleClient.Exception IsNot Nothing Then
                'deal with failure to connect
                Return
            End If
            Dim client = possibleClient.Value

            '...
            'do whatever you wanted to do with the client once it was connected
            '...
        End Sub
    )

    '...
Do you see how the "what I want" and "what I want to do with it" parts are separated? How everything is type safe? How there is no required EndFutureConnect call? We've managed to eliminate those little details. That is a good abstraction at work.

There is a downside to the approach shown here, though. Did you notice that the return value was a PossibleException? That's just a structure which stores a value or an exception. It is impossible for exceptions to safely propagate out of a future the way you expect, so you must encode them into the future's value. The upside is that, because the future's value is type safe, you can't accidentally ignore the possible exception.
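
PossibleException isn't defined anywhere in this post, so here is one way such a structure might look. This is a guess at a minimal version, not the actual definition: the field names Value and Exception match how FutureConnect's caller uses them, and the two widening conversions are an assumption made so that f.SetValue(client) and f.SetValue(e) compile as written.

```vbnet
Imports System

'Hypothetical sketch; the post does not show the real definition.
Public Structure PossibleException(Of V, E As System.Exception)
    Public ReadOnly Value As V
    Public ReadOnly Exception As E 'Nothing when the operation succeeded

    Public Sub New(ByVal v As V)
        Me.Value = v
    End Sub
    Public Sub New(ByVal ex As E)
        Me.Exception = ex
    End Sub

    'Implicit conversions from either a plain value or an exception.
    Public Shared Widening Operator CType(ByVal v As V) As PossibleException(Of V, E)
        Return New PossibleException(Of V, E)(v)
    End Operator
    Public Shared Widening Operator CType(ByVal ex As E) As PossibleException(Of V, E)
        Return New PossibleException(Of V, E)(ex)
    End Operator
End Structure
```

One caveat of this sketch: if V and E can be the same type (or one derives from the other), the conversions become ambiguous, so it only suits cases like TcpClient and SocketException where the two are unrelated.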

Example #2

Suppose you want to do something after a subroutine finishes, but you want to run the subroutine on another thread. Or suppose it's a function running on the other thread and you need to wait for its return value. Futures make this problem trivial. You just write a method that returns a future, and runs the function or subroutine on another thread.

    Public Function FutureThreadedAction(ByVal action As Action) As IFuture
        Dim f As New Future
        Call New Threading.Thread(
            Sub()
                Call action()
                Call f.SetReady()
            End Sub
        ).Start()
        Return f
    End Function
    Public Function FutureThreadedFunction(Of R)(ByVal func As Func(Of R)) As IFuture(Of R)
        Dim f As New Future(Of R)
        FutureThreadedAction(Sub() f.SetValue(func()))
        Return f
    End Function
That's it. Now you can call these methods to run functions on other threads, and use futures to work with the return values. I really like this example because, even though we haven't solved any big problems, we've solved one of those tiny annoyances we deal with every day as programmers. The futures are making things easier.
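
As a usage sketch, here is how a caller might hand work to another thread and pick up the result. It assumes FutureThreadedFunction (above) and the Futures namespace are in scope; SlowSquare and the module name are placeholders invented for the demo, and the ManualResetEvent is only there so the demo can return a value.

```vbnet
Imports System
Imports System.Threading
Imports Futures 'assumption: the namespace from the full listing at the end of the post

Public Module ThreadedExample
    'Placeholder for some expensive computation.
    Private Function SlowSquare(ByVal n As Integer) As Integer
        Thread.Sleep(100) 'stand-in for real work
        Return n * n
    End Function

    'Hypothetical demo; assumes FutureThreadedFunction is accessible from here.
    Public Function SquareOnWorkerThread() As Integer
        Dim done As New ManualResetEvent(False)
        Dim result As Integer = 0

        Dim futureSquare = FutureThreadedFunction(Function() SlowSquare(12))
        futureSquare.CallWhenValueReady(Sub(square)
                                            result = square 'runs on the worker thread
                                            done.Set()
                                        End Sub)

        done.WaitOne() 'blocking here only so the demo can return the value
        Return result
    End Function
End Module
```

Keep in mind that the callback runs on the worker thread, not the thread that called CallWhenValueReady, so anything it touches needs to be safe to use from there.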

Conclusion

We've really just scratched the surface here. In my next post I will be creating a concurrent call queue which uses futures. In the post after that I will put it all together to implement message passing.

An implementation of futures is posted below. The whole thing is only about 170 lines, including comments and whitespace. (My original implementation took upwards of 500 lines, so I'm oddly proud of the line count.)

    Imports System.Runtime.CompilerServices

    ''''Provides type-safe methods for return values that will be ready in the future, and for passing future arguments into normal functions.
    Namespace Futures
        '''Represents a thread-safe read-only class that fires an event when it becomes ready.
        Public Interface IFuture
            '''Raised when the future becomes ready.
            Event Readied()
            '''Returns true if the future is ready.
            ReadOnly Property IsReady() As Boolean
        End Interface

        '''Represents a thread-safe read-only class that fires an event when its value becomes ready.
        Public Interface IFuture(Of Out R)
            Inherits IFuture
            '''
            ''' Returns the future's value.
            ''' Throws an InvalidOperationException if the value isn't ready yet.
            '''
            Function GetValue() As R
        End Interface

        '''A thread-safe class that fires an event when it becomes ready.
        Public Class Future
            Implements IFuture
            Private lockVal As Integer
            Public Event Readied() Implements IFuture.Readied

            '''Returns true if the future is ready.
            Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
                Get
                    Return lockVal = 1
                End Get
            End Property

            '''
            ''' Makes the future ready.
            ''' Throws an InvalidOperationException if the future was already ready.
            '''
            Public Sub SetReady()
                If Not TrySetReady() Then
                    Throw New InvalidOperationException("Future readied more than once.")
                End If
            End Sub
            '''
            ''' Makes the future ready.
            ''' Returns false if the future was already ready.
            '''
            Public Function TrySetReady() As Boolean
                If Threading.Interlocked.Exchange(lockVal, 1) <> 0 Then Return False
                RaiseEvent Readied()
                Return True
            End Function
        End Class

        '''A thread-safe class that fires an event when its value becomes ready.
        Public Class Future(Of R)
            Implements IFuture(Of R)
            Private val As R
            Private lockVal As Integer 'set to 1 by the first caller to claim the future
            Private ready As Integer 'set to 1 only after val has been stored
            Public Event Readied() Implements IFuture.Readied

            '''Returns true if the future is ready.
            Public ReadOnly Property IsReady() As Boolean Implements IFuture.IsReady
                Get
                    Return Threading.Thread.VolatileRead(ready) = 1
                End Get
            End Property

            '''
            '''Returns the future's value.
            '''Throws an InvalidOperationException if the value isn't ready yet.
            '''
            Public Function GetValue() As R Implements IFuture(Of R).GetValue
                If Not IsReady Then Throw New InvalidOperationException("Attempted to get a future value before it was ready.")
                Return val
            End Function

            '''
            ''' Sets the future's value and makes the future ready.
            ''' Throws an InvalidOperationException if the future was already ready.
            '''
            Public Sub SetValue(ByVal val As R)
                If Not TrySetValue(val) Then
                    Throw New InvalidOperationException("Future readied more than once.")
                End If
            End Sub
            '''
            ''' Sets the future's value and makes the future ready.
            ''' Fails if the future was already ready.
            '''
            Public Function TrySetValue(ByVal val As R) As Boolean
                If Threading.Interlocked.Exchange(lockVal, 1) <> 0 Then Return False
                Me.val = val
                Threading.Thread.VolatileWrite(ready, 1) 'publish readiness only after val is stored
                RaiseEvent Readied()
                Return True
            End Function
        End Class

        Public Module ExtensionsForIFuture
            '''Runs an action once the future is ready, and returns a future for the action's completion.
            <Extension()>
            Public Function CallWhenReady(ByVal future As IFuture,
                                          ByVal action As Action) As IFuture
                Dim lockVal As Integer
                Dim f As New Future()
                Dim notify As IFuture.ReadiedEventHandler
                notify = Sub()
                             If Threading.Interlocked.Exchange(lockVal, 1) = 0 Then 'only run once
                                 RemoveHandler future.Readied, notify
                                 Call action()
                                 f.SetReady()
                             End If
                         End Sub

                AddHandler future.Readied, notify
                If future.IsReady Then Call notify() 'in case the future was already ready

                Return f
            End Function

            '''Passes the future's value to an action once ready, and returns a future for the action's completion.
            <Extension()>
            Public Function CallWhenValueReady(Of A1)(ByVal future As IFuture(Of A1),
                                                      ByVal action As Action(Of A1)) As IFuture
                Return future.CallWhenReady(Sub() action(future.GetValue))
            End Function

            '''Runs a function once the future is ready, and returns a future for the function's return value.
            <Extension()>
            Public Function EvalWhenReady(Of R)(ByVal future As IFuture,
                                                ByVal func As Func(Of R)) As IFuture(Of R)
                Dim f As New Future(Of R)
                future.CallWhenReady(Sub() f.SetValue(func()))
                Return f
            End Function

            '''Passes the future's value to a function once ready, and returns a future for the function's return value.
            <Extension()>
            Public Function EvalWhenValueReady(Of A1, R)(ByVal future As IFuture(Of A1),
                                                         ByVal func As Func(Of A1, R)) As IFuture(Of R)
                Return future.EvalWhenReady(Function() func(future.GetValue))
            End Function

            '''Wraps a normal value as an instantly ready future.
            <Extension()>
            Public Function Futurize(Of R)(ByVal value As R) As IFuture(Of R)
                Dim f = New Future(Of R)
                f.SetValue(value)
                Return f
            End Function

            '''Returns a future for the final value of a future of a future.
            <Extension()>
            Public Function Defuturize(Of R)(ByVal futureFutureVal As IFuture(Of IFuture(Of R))) As IFuture(Of R)
                Dim f = New Future(Of R)
                futureFutureVal.CallWhenValueReady(Sub(futureVal) futureVal.CallWhenValueReady(Sub(value) f.SetValue(value)))
                Return f
            End Function

            '''Returns a future for the readiness of a future of a future.
            <Extension()>
            Public Function Defuturize(ByVal futureFuture As IFuture(Of IFuture)) As IFuture
                Dim f = New Future
                futureFuture.CallWhenValueReady(Sub(future) future.CallWhenReady(Sub() f.SetReady()))
                Return f
            End Function
        End Module
    End Namespace
