Tuesday, September 15, 2009

Concurrency #4: Using the Call Queue

Introduction

Now that we have implemented a lock free queue, futures, and a call queue, we will start using them to perform concurrent programming. For example, consider a bank account class synchronized using locks:

Public Class BankAccount
Private balance As UInteger
Private ReadOnly lock As New Object()
Public Function Deposit(ByVal value As UInteger) As UInteger
SyncLock lock
balance += value
Return balance
End SyncLock
End Function
Public Function TryWithdraw(ByVal value As UInteger) As UInteger?
SyncLock lock
If balance < style="color: blue;">Then Return Nothing
balance -= value
Return balance
End SyncLock
End Function
End Class

Rewriting the class to use queues and futures is dead simple, because the locking is at the method level. Just replace SyncLock blocks with QueueFunc blocks.

Public Class BankAccount
Private balance As UInteger
Private queue As ICallQueue = New ThreadPooledCallQueue()
Public Function FutureDeposit(ByVal value As UInteger) As IFuture(Of UInteger)
Return queue.QueueFunc(
Function()
balance += value
Return balance
End Function)
End Function
Public Function FutureTryWithdraw(ByVal value As UInteger) As IFuture(Of UInteger?)
Return queue.QueueFunc(
Function() As UInteger?
If balance < style="color: blue;">Then Return Nothing
balance -= value
Return balance
End Function)
End Function
End Class

Message Passing

Look closely at the bank account example. Calling FutureTryWithdraw is like sending a message to the bank account object (saying "please try to withdraw X dollars"), and the future returned by the method is like a reply saying either "Ok, you now have X dollars!" or "Not enough funds!". We've implemented a form of message passing, which is what highly concurrent languages like Erlang are based on.

Message passing is not the holy grail of concurrency, but it is an extremely powerful abstraction. For example, it would be much easier to message-pass to a remote machine than do some type of lock sharing. Message passing also makes it impossible to cause a deadlock (the closest analogue is creating a never-ending message cycle; much easier to debug). Most importantly, it is easier to reason about messages than locks. Proving a lock is used correctly requires looking at the entire program. Proving a message is used correctly only requires looking at part of the program (usually one or two functions).

Messages are not superior to locks in every way, of course. Different methods work better in different situations. For example, there is no message-passing analogue for holding two locks at the same time. In addition, it is natural to lock only part a method but it is not always clear what that would mean in terms of passing messages.

Those Darn Philosophers

I'm going to finish with an implementation of the dining philosophers done with message passing. Put it in a command line project (alongside the other classes introduced in this series). Notice how each class is simple on its own. In fact, the most complex parts are related to writing readable information to the console! In the next post in this series, I will supply implementations of futures for various asynchronous tasks.

Public Module Main
Public Sub Main()
Const NumPhilosphers As Integer = 5

'Setup table
Dim chopSticks(0 To NumPhilosphers - 1) As Chopstick
Dim philosophers(0 To NumPhilosphers - 1) As DiningPhilosopher
For i = 0 To NumPhilosphers - 1
chopSticks(i) = New Chopstick()
Next i
For i = 0 To NumPhilosphers - 1
philosophers(i) = New DiningPhilosopher(New Random(Environment.TickCount + i), chopSticks(i), chopSticks((i + 1) Mod NumPhilosphers))
Next i

'Give a bit of stabilizing time for measurements
Threading.Thread.Sleep(5000)

'Print status updates
Dim n = 0
Do
n += 1
System.Console.WriteLine("Update #" + n.ToString())
For i = 0 To NumPhilosphers - 1
System.Console.WriteLine("Philosopher #" + i.ToString() + " is " + philosophers(i).Status + ".")
Next i
Threading.Thread.Sleep(1000)
Loop
End Sub
End Module

'''Represents a shared resource.
Public Class Chopstick
Public queue As ICallQueue = New ThreadPooledCallQueue()
Private inUse As Boolean

Public Function QueueTryPickUp() As IFuture(Of HeldChopStick)
Return queue.QueueFunc(Function()
If inUse Then Return Nothing
inUse = True
Return New HeldChopStick(AddressOf QueuePutDown)
End Function)
End Function
Private Function QueuePutDown() As IFuture
Return queue.QueueAction(Sub()
inUse = False
End Sub)
End Function
End Class

'''Represents an acquired resource.
Public Class HeldChopStick
Private wasDropped As Boolean
Private ReadOnly drop As Func(Of IFuture)
Public Sub New(ByVal drop As Func(Of IFuture))
Me.drop = drop
End Sub
Public Function PutDown() As IFuture
If wasDropped Then Throw New InvalidOperationException()
wasDropped = True
Return drop()
End Function
End Class

'''Represents a philosopher trying to share chopsticks for eating.
Public Class DiningPhilosopher
Private ReadOnly leftChopStick As Chopstick
Private ReadOnly rightChopStick As Chopstick
Private feedings As Integer
Private lastTime As Date
Private weightedRate As Double

Public Sub New(ByVal rng As Random, ByVal leftChopStick As Chopstick, ByVal rightChopStick As Chopstick)
Me.lastTime = Now
Me.leftChopStick = leftChopStick
Me.rightChopStick = rightChopStick
Call TryGrabChopsticks(rng)
End Sub

'This method isn't thread-safe, but the errors will be marginal [race condition on increments and resets of 'feeding']
Public ReadOnly Property Status As String
Get
'Measure
Dim t = Now
Dim dt = t - lastTime
Dim rate = feedings / dt.TotalSeconds

'Estimate
If weightedRate = 0 Then
weightedRate = rate
Else
'Exponential approach w.r.t. time
Dim lambda = 0.9 ^ dt.TotalSeconds
weightedRate *= lambda
weightedRate += rate * (1 - lambda)
End If

'Categorize
Dim category = ""
Select Case weightedRate
Case 0 : category = "Empty"
Case 0 To 1 : category = "Starving"
Case 1 To 2 : category = "Hungry"
Case 2 To 3 : category = "Satisfied"
Case Else : category = "Plump"
End Select

'Finish
lastTime = t
feedings = 0
Return category + " (~" + weightedRate.ToString("0.00") + "/s)"
End Get
End Property

Private Sub TryGrabChopsticks(ByVal rng As Random)
'Try to grab both chopsticks
Dim futureLeft = leftChopStick.QueueTryPickUp()
Dim futureRight = rightChopStick.QueueTryPickUp()

'React once both chopsticks have been attempted
futureLeft.CallWhenValueReady(
Sub(left) futureRight.CallWhenValueReady(
Sub(right)
'Call on a new thread because there are sleeps
Call New Threading.Thread(Sub() TryHoldChopsticks(rng, left, right)).Start()
End Sub))
End Sub
Private Sub TryHoldChopsticks(ByVal rng As Random, ByVal left As HeldChopStick, ByVal right As HeldChopStick)
If right Is Nothing OrElse left Is Nothing Then
'Oh no! Put down and wait a bit before trying again.
If left IsNot Nothing Then left.PutDown()
If right IsNot Nothing Then right.PutDown()
Threading.Thread.Sleep(10 + rng.Next(10))
Else
'Yay! Food!
feedings += 1
Threading.Thread.Sleep(50 + rng.Next(50))
left.PutDown()
right.PutDown()

'Snooze
Threading.Thread.Sleep(50 + rng.Next(50))
End If

'Hungry! Repeat!
Call TryGrabChopsticks(rng)
End Sub
End Class


No comments:

Post a Comment