Although the OmniThreadLibrary treats communication as a superior approach to locking, there are still times when using “standard” synchronization primitives such as a critical section are unavoidable. As the standard Delphi/Windows approach to locking is low-level, OmniThreadLibrary builds on it and improves it in some significant ways. All these improvements are collected in the OtlSync unit and are described in the following sections. The only exception is the waitable value class/interface, which is declared in the OtlCommon unit.
This part of the book assumes that you have a basic understanding of locking. If you are new to the topic, you should first read the appropriate chapters from one of the books mentioned in the introduction.
The most useful synchronisation primitive for multi-threaded programming is indubitably the critical section67
OmniThreadLibrary simplifies sharing critical sections between a task owner and a task with the use of the WithLock
method. High-level tasks can access this method through the task configuration block.
I was always holding the opinion that locks should be as granular as possible. Putting many small locks around many unrelated pieces of code is better than using one giant lock for everything. However, programmers frequently use one or few locks because managing many critical sections can be a bother.
To help you with writing a better code, OmniThreadLibrary implements three extensions to the Delphi’s TCriticalSection
class - IOmniCriticalSection
, TOmniCS
and Locked<T>
.
Delphi implements critical section support with a TCriticalSection
class which must be created and destroyed in the code. (There is also a TRTLCriticalSection
record, but it is only supported on Windows.) OmniThreadLibrary extends this implementation with an IOmniCriticalSection
interface, which you only have to create. The compiler will make sure that it is destroyed automatically at the appropriate place.
1
type
2
IOmniCriticalSection
=
interface
3
procedure
Acquire
;
4
procedure
Release
;
5
function
GetSyncObj
:
TSynchroObject
;
6
property
LockCount
:
integer
read
GetLockCount
;
7
end
;
8
9
function
CreateOmniCriticalSection
:
IOmniCriticalSection
;
IOmniCriticalSection
uses TCriticalSection
internally8. It acts just as a proxy that calls TCriticalSection
functions. Besides that, it provides an additional functionality by counting the number of times a critical section has been acquired, which can help a lot while debugging. This counter can be read through the LockCount
property.
Another TCriticalSection
extension found in the OmniThreadLibrary is the TOmniCS
record. It allows you to use a critical section by declaring a record in an appropriate place.
Using TOmniCS
, locking can be as simple as this:
1
uses
2
GpLists
,
3
OtlSync
;
4
5
procedure
ProcessList
(
const
intf
:
IGpIntegerList
)
;
6
begin
7
//...
8
end
;
9
10
var
11
lock
:
TOmniCS
;
12
intf
:
IGpIntegerList
;
13
14
procedure
Test1
;
15
begin
16
intf
:=
TGpIntegerList
.
Create
;
17
//...
18
lock
.
Acquire
;
19
try
20
ProcessList
(
intf
)
;
21
finally
lock
.
Release
;
end
;
22
end
;
TOmniCS
is implemented as a record with one private field holding the IOmniCriticalSection
interface.
1
type
2
TOmniCS
=
record
3
strict
private
4
ocsSync
:
IOmniCriticalSection
;
5
private
6
function
GetLockCount
:
integer
;
inline
;
7
function
GetSyncObj
:
TSynchroObject
;
inline
;
8
public
9
procedure
Initialize
;
10
procedure
Acquire
;
inline
;
11
procedure
Release
;
inline
;
12
property
LockCount
:
integer
read
GetLockCount
;
13
property
SyncObj
:
TSynchroObject
read
GetSyncObj
;
14
end
;
The Release
method merely calls the Release
method on the internal interface, while the Acquire
method is more tricky as it has to initialize the ocsSync
field first.
1
procedure
TOmniCS
.
Acquire
;
2
begin
3
Initialize
;
4
ocsSync
.
Acquire
;
5
end
;
6
7
procedure
TOmniCS
.
Release
;
8
begin
9
ocsSync
.
Release
;
10
end
;
The initialization uses a global critical section to synchronize access to the code that should not be executed from two threads at once.
1
procedure
TOmniCS
.
Initialize
;
2
begin
3
if
not
assigned
(
ocsSync
)
then
begin
4
GOmniCSInitializer
.
Acquire
;
5
try
6
if
not
assigned
(
ocsSync
)
then
7
ocsSync
:=
CreateOmniCriticalSection
;
8
finally
GOmniCSInitializer
.
Release
;
end
;
9
end
;
10
end
;
TOmniCS
is a great simplification of the critical section concept, but it still requires you to declare a separate locking entity. If this locking entity is only used to synchronize access to a specific instance (being that an object, record, interface or even a simple type) it is often better to declare a variable/field of type Locked<T>
which combines any type with a critical section.
Using Locked<T>
, the example from the TOmniCS
section can be rewritten as follows.
1
uses
2
GpLists
,
3
OtlSync
;
4
5
procedure
ProcessList
(
const
intf
:
IGpIntegerList
)
;
6
begin
7
//...
8
end
;
9
10
var
11
lockedIntf
:
Locked
<
IGpIntegerList
>;
12
13
procedure
Test2
;
14
begin
15
lockedIntf
:=
TGpIntegerList
.
CreateInterface
;
16
//...
17
lockedIntf
.
Acquire
;
18
try
19
ProcessList
(
lockedIntf
)
;
20
finally
lockedIntf
.
Release
;
end
;
21
end
;
The interesting fact to notice is although the lockedIntf
is declared as a variable of type Locked<IGpIntegerList>
, it can be initialized and used as if it is of type IGpIntegerList
. This is accomplished by providing Implicit
operators for conversion from Locked<T>
to T
and back. Delphi compiler is (sadly) not smart enough to use this conversion operator in some cases so you would still sometimes have to use the provided Value
property. For example, you’d have to do it to release wrapped object. (In the example above we have wrapped an interface and the compiler itself handled the destruction.)
1
procedure
ProcessObjList
(
obj
:
TGpIntegerList
)
;
2
begin
3
//...
4
end
;
5
6
var
7
lockedObj
:
Locked
<
TGpIntegerList
>;
8
9
procedure
Test3
;
10
begin
11
lockedObj
:=
TGpIntegerList
.
Create
;
12
try
13
//...
14
lockedObj
.
Acquire
;
15
try
16
ProcessObjList
(
lockedObj
)
;
17
finally
lockedObj
.
Release
;
end
;
18
//...
19
finally
lockedObj
.
Value
.
Free
;
end
;
20
end
;
Besides the standard Acquire
/Release
methods, Locked<T>
also implements methods used for pessimistic locking, which is described later in this chapter, and two almost identical methods called Locked
which allow you to execute a code segment (a procedure, a method or an anonymous method) while the critical section is acquired. (In other words, you can be assured that the code passed to the Locked
method is always executed only once provided that all code in the program properly locks access to the shared variable.)
1
type
2
Locked
<
T
>
=
record
3
public
4
type
TFactory
=
reference
to
function
:
T
;
5
type
TProcT
=
reference
to
procedure
(
const
value
:
T
)
;
6
constructor
Create
(
const
value
:
T
;
ownsObject
:
boolean
=
true
)
;
7
class
operator
Implicit
(
const
value
:
Locked
<
T
>
)
:
T
;
inline
;
8
class
operator
Implicit
(
const
value
:
T
)
:
Locked
<
T
>;
inline
;
9
function
Initialize
(
factory
:
TFactory
)
:
T
;
overload
;
10
{$IFDEF OTL_ERTTI}
11
function
Initialize
:
T
;
overload
;
12
{$ENDIF OTL_ERTTI}
13
procedure
Acquire
;
inline
;
14
procedure
Locked
(
proc
:
TProc
)
;
overload
;
inline
;
15
procedure
Locked
(
proc
:
TProcT
)
;
overload
;
inline
;
16
procedure
Release
;
inline
;
17
procedure
Free
;
inline
;
18
property
Value
:
T
read
GetValue
;
19
end
;
20
21
procedure
Locked
<
T
>.
Locked
(
proc
:
TProc
)
;
22
begin
23
Acquire
;
24
try
25
proc
;
26
finally
Release
;
end
;
27
end
;
28
29
procedure
Locked
<
T
>.
Locked
(
proc
:
TProcT
)
;
30
begin
31
Acquire
;
32
try
33
proc
(
Value
)
;
34
finally
Release
;
end
;
35
end
;
There is an alternative built into Delphi since 2009 which provides functionality similar to the Locked<T>
– TMonitor
. In modern Delphis, every object can be locked by using System.TMonitor.Enter
function and unlocked by using System.TMonitor.Exit
. The example above could be rewritten to use the TMonitor
with little work.
1
var
2
obj
:
TGpIntegerList
;
3
4
procedure
Test4
;
5
begin
6
obj
:=
TGpIntegerList
.
Create
;
7
try
8
//...
9
System
.
TMonitor
.
Enter
(
obj
)
;
10
try
11
ProcessObjList
(
obj
)
;
12
finally
System
.
TMonitor
.
Exit
(
obj
)
;
end
;
13
//...
14
finally
FreeAndNil
(
obj
)
;
end
;
15
end
;
A reasonable question to ask is, therefore, why implementing Locked<T>
. Why is TMonitor
not good enough? There are plenty of reasons for that.
TMonitor
was buggy since its inception9,10 (although that was fixed few years later).TMonitor
doesn’t convey your intentions. Just by looking at the variable/field declaration you wouldn’t know that the entity is supposed to be used in a thread-safe manner. Using Locked<T>
, however, explicitly declares your intent.TMonitor.Enter
/Exit
doesn’t work with interfaces, records and primitive types. Locked<T>
does.On the positive size, TMonitor
is faster than a critical section.
A typical situation in a multi-threaded program is a multiple readers/exclusive writer scenario. It occurs when there are multiple reader threads which can operate on the same object simultaneously, but must be locked out when an exclusive writer thread wants to make changes to this object. Delphi already implements a synchronizer for this scenario (TMultiReadExclusiveWriteSynchronizer
11,12 from SysUtils), but it is quite a heavyweight object which you can use in many ways. For situations when the probability of collision13 is low and especially, when the object is not locked for a long period, a TOmniMREW
synchronizer will give you a better performance.
1
type
2
TOmniMREW
=
record
3
public
4
procedure
EnterReadLock
;
inline
;
5
procedure
EnterWriteLock
;
inline
;
6
procedure
ExitReadLock
;
inline
;
7
procedure
ExitWriteLock
;
inline
;
8
function
TryEnterReadLock
(
timeout_ms
:
integer
=
0
)
:
boolean
;
9
function
TryEnterWriteLock
(
timeout_ms
:
integer
=
0
)
:
boolean
;
10
end
;
To use the TOmniMREW
synchronizer, a reader must call EnterReadLock
before reading the object and ExitReadLock
when it doesn’t need the object anymore. Similarly, a writer must call EnterWriteLock
and ExitWriteLock
.
Function TryEnterReadLock
and TryEnterWriteLock
[3.07.6] try to enter a read/write lock. If the lock cannot be acquired in timeout_ms
milliseconds, the functions return False
.
I’d like to stress again the importance of not locking an object for a long time when using TOmniMREW
. Both Enter
methods wait in a tight loop while waiting to get access, which can quickly use lots of CPU time if probability of collisions are high. (Collisions typically occur more often if an object is locked for extensive periods of time.)
1
procedure
TOmniMREW
.
EnterReadLock
;
2
var
3
currentReference
:
NativeInt
;
4
begin
5
//Wait on writer to reset write flag so Reference.Bit0 must be 0
6
//then increase Reference
7
repeat
8
currentReference
:=
NativeInt
(
omrewReference
)
AND
NOT
1
;
9
until
CAS
(
currentReference
,
currentReference
+
2
,
10
NativeInt
(
omrewReference
))
;
11
end
;
12
13
procedure
TOmniMREW
.
EnterWriteLock
;
14
var
15
currentReference
:
NativeInt
;
16
begin
17
//Wait on writer to reset write flag so omrewReference.Bit0 must be 0
18
//then set omrewReference.Bit0
19
repeat
20
currentReference
:=
NativeInt
(
omrewReference
)
AND
NOT
1
;
21
until
CAS
(
currentReference
,
currentReference
+
1
,
22
NativeInt
(
omrewReference
))
;
23
//Now wait on all readers
24
repeat
25
until
NativeInt
(
omrewReference
)
=
1
;
26
end
;
Because of an optimized implementation that favours speed over safety, you’ll get a cryptic access violation error if the TOmniMREW
instance is destroyed while a read or write lock is taken. To be clear, this is a programming error; you should never destroy a synchronization object while it holds a lock. It’s just that the error displayed will not make it very clear what you are doing wrong.
For example, the following test code fragment will cause an access violation.
1
procedure
Test5
;
2
var
3
cs
:
TOmniMREW
;
4
begin
5
cs
.
EnterWriteLock
;
6
end
;
7
8
Test5
;
//<-- accvio here!
Sometimes you want to instruct background tasks to stop whatever they are doing and quit. Typically, this happens when the program is shutting down. Programs using the “standard” multi-threaded programming (i.e. TThread
) are solving this problem each in its own way, typically by using boolean flags or Windows events.
To make the task cancellation simpler and more standardized, OmniThreadLibrary introduces a cancellation token. A cancellation token is an instance of the IOmniCancellationToken
interface and implements functionality very similar to the Windows event synchronization primitive.
1
type
2
IOmniCancellationToken
=
interface
3
procedure
Clear
;
4
function
IsSignalled
:
boolean
;
5
procedure
Signal
;
6
property
Handle
:
THandle
read
GetHandle
;
7
end
;
{ IOmniCancellationToken }
8
9
function
CreateOmniCancellationToken
:
IOmniCancellationToken
;
By default, a cancellation token is in a cleared (inactive) state. To signal it, a code calls the Signal
method. Signalled token can be cleared by calling the Clear
method.
The task can check the cancellation token’s state by calling the IsSignalled
method or by waiting (using WaitForSingleObject
or any of its variants) on the Handle
property. Wait will succeed when the cancellation token is signalled.
An important part of the cancellation token implementation is that the same token can be shared between multiple tasks. To cancel all tasks, the code must only call Signal
once (provided that other parts of the program don’t call Clear
).
Cancellation tokens are used in low-level and high-level multi-threading. Low-level multi-threading uses the CancelWith
method to pass a multi-threading token around while the high-level multi-threading uses the task configuration block.
Cancellation is demonstrated in examples 35_ParallelFor
and 38_OrderedFor
.
The communication framework in the OmniThreadLibrary works asynchronously (you cannot know when a task or owner will receive and process the message). Most of the time that functions great, but sometimes you have to process messages synchronously (that is, you want to wait until the task processes the message) because otherwise the code gets too complicated. For those situations, OmniThreadLibrary offers a waitable value TOmniWaitableValue
, which is also exposed as an interface IOmniWaitableValue
.
1
type
2
IOmniWaitableValue
=
interface
3
procedure
Reset
;
4
procedure
Signal
;
overload
;
5
procedure
Signal
(
const
data
:
TOmniValue
)
;
overload
;
6
function
WaitFor
(
maxWait_ms
:
cardinal
=
INFINITE
)
:
boolean
;
7
property
Handle
:
THandle
read
GetHandle
;
8
property
Value
:
TOmniValue
read
GetValue
;
9
end
;
10
11
TOmniWaitableValue
=
class
(
TInterfacedObject
,
IOmniWaitableValue
)
12
public
13
constructor
Create
;
14
destructor
Destroy
;
override
;
15
procedure
Reset
;
inline
;
16
procedure
Signal
;
overload
;
inline
;
17
procedure
Signal
(
const
data
:
TOmniValue
)
;
overload
;
inline
;
18
function
WaitFor
(
maxWait_ms
:
cardinal
=
INFINITE
)
:
boolean
;
inline
;
19
property
Handle
:
THandle
read
GetHandle
;
20
property
Value
:
TOmniValue
read
GetValue
;
21
end
;
22
23
function
CreateWaitableValue
:
IOmniWaitableValue
;
The usage pattern is simple. The caller creates an object or interface of that type, sends it to another thread (typically via Task.Comm.Send
) and calls the WaitFor
method. The task receives the message, does the processing and calls Signal
to signal completion or Signal(some_data)
to signal completion and return data. At that point, the WaitFor
returns and caller can read the data from the Value
property.
A practical example should clarify this explanation. The two methods below are taken from the OtlThreadPool unit.
When a code wants to cancel threadpooled task, it will call the Cancel
function. This function sends the Cancel
message to the worker task and passes along the ID of the task to be cancelled and a TOmniWaitableValue
object. Then it waits for the object to become signalled.
1
function
TOmniThreadPool
.
Cancel
(
taskID
:
int64
)
:
boolean
;
2
var
3
res
:
TOmniWaitableValue
;
4
begin
5
res
:=
TOmniWaitableValue
.
Create
;
6
try
7
otpWorkerTask
.
Invoke
(
@
TOTPWorker
.
Cancel
,
[
taskID
,
res
])
;
8
res
.
WaitFor
(
INFINITE
)
;
9
Result
:=
res
.
Value
;
10
finally
FreeAndNil
(
res
)
;
end
;
11
end
;
The Cancel
method in the worker task processes the message, does lots of complicated work (removed for clarity) and at the end calls the Signal
method on the TOmniWaitableValue
object to signal completion and return a boolean value.
1
procedure
TOTPWorker
.
Cancel
(
const
params
:
TOmniValue
)
;
2
var
3
waitParam
:
TOmniValue
;
4
wasTerminated
:
boolean
;
5
begin
6
//... lots of code; sets wasTerminated to some value
7
waitParam
:=
params
[
1
]
;
8
(
waitParam
.
AsObject
as
TOmniWaitableValue
)
.
Signal
(
wasTerminated
)
;
9
end
;
{ TOTPWorker.Cancel }
Soon after the Signal
is called, the WaitFor
in the caller code exits and TOmniThreadPool.Cancel
retrieves result from the Value
property.
A semaphore is a counting synchronisation object that starts at some value (typically greater than 0) which usually represents a number of available resources of some kind. To allocate a semaphore, one waits on it. If the semaphore count is greater than zero, the semaphore is signalled, wait will succeed and semaphore count gets decremented by one. [Of course, all of this occurs atomically.] If the semaphore count is zero, the semaphore is not signalled and wait will block until the timeout or until some other thread releases the semaphore, which increments the semaphore’s count and puts it into the signalled state.
While semaphores are implemented in the Windows kernel and Delphi wraps them in a pretty object TSemaphore
, Windows doesn’t support an useful variation on the theme – an inverse semaphore, also known as a countdown event.
Inverse semaphore differs from a normal semaphore by getting signalled when the count drops to zero. This allows another thread to execute a blocking wait that will succeed only when the semaphore’s count is zero. Why is that good, you’ll ask? Because it simplifies resource exhaustion detection. If you wait on an inverse semaphore and this semaphore becomes signalled, then you know that the resource is fully used.
The inverse semaphore is implemented by the TOmniResourceCount
class which implements an IOmniResourceCount
interface.
1
type
2
IOmniResourceCount
=
interface
3
function
Allocate
:
cardinal
;
4
function
Release
:
cardinal
;
5
function
TryAllocate
(
var
resourceCount
:
cardinal
;
6
timeout_ms
:
cardinal
=
0
)
:
boolean
;
7
property
Handle
:
THandle
read
GetHandle
;
8
end
;
9
10
TOmniResourceCount
=
class
(
TInterfacedObject
,
IOmniResourceCount
)
11
public
12
constructor
Create
(
initialCount
:
cardinal
)
;
13
destructor
Destroy
;
override
;
14
function
Allocate
:
cardinal
;
inline
;
15
function
Release
:
cardinal
;
16
function
TryAllocate
(
var
resourceCount
:
cardinal
;
17
timeout_ms
:
cardinal
=
0
)
:
boolean
;
18
property
Handle
:
THandle
read
GetHandle
;
19
end
;
20
21
function
CreateResourceCount
(
initialCount
:
integer
)
:
IOmniResourceCount
;
Initial resource count is passed to the constructor or to the CreateResourceCount
function. Allocate
will block if this count is zero (and will unblock automatically when the count becomes greater than zero); otherwise it will decrement the count. The new value of the resource count is returned as a function result.
The TryAllocate
is a safer version of Allocate
taking a timeout parameter (which may be set to INFINITE
) and returning success/fail status as a function result.
Release
increments the count and unblocks waiting Allocate
s. New resource count (potentially already incorrect at the moment caller sees it) is returned as the result.
Finally, there is a Handle
property exposing a handle which is signalled when resource count is zero and unsignalled otherwise.
Initializing an object in a multi-threaded world is not a problem – as long as the object is initialized before it is shared. To put this into a simple language – everything is fine if we can initialize object first and then pass it to multiple tasks.14
In most cases, this is not a problem, but sometimes we want to use a shared global object in multiple tasks. In that case, the first task that wants to use the object will have to create it. While this may look as a weird approach to programming, it is a legitimate programming pattern, called lazy initialization.
The reason behind this weirdness is that sometimes we don’t know in advance whether an object (or some part of a composite object) will be used at all. If the probability that the object will be used is low enough, it may be a good idea not to initialize it in advance, as that would take some time and use some memory (or maybe even lots of memory).
Additionally, there may not be a good place to call the initialization. A good example is the TOmniCS
record where we want to do an implicit initialization the first time an Acquire
method is called. As this record is usually just declared as a variable/field and not explicitly initialized, there is no better place to call the initialization code than from the Acquire
itself.
This part of the book will explain two well-known approaches to shared initialization – a pessimistic initialization and an optimistic initialization. There’s also a third approach – busy-wait – which you can read more about on my blog.
The difference between the two approaches is visible from the following pseudo-code.
1
var
2
Shared
:
T
;
3
4
procedure
OptimisticInitializer
;
5
var
6
temp
:
T
;
7
begin
8
if
not
assigned
(
Shared
)
then
begin
9
temp
:=
T
.
Create
;
10
if
not
AtomicallyTestAndStore
(
Shared
,
nil
,
temp
)
then
11
temp
.
Free
;
12
end
;
13
end
;
14
15
procedure
PessimisticInitializer
;
16
begin
17
if
not
assigned
(
Shared
)
then
begin
18
Lock
;
19
try
20
if
not
assigned
(
Shared
)
then
21
Shared
:=
T
.
Create
;
22
finally
Unlock
;
end
;
23
end
;
24
end
;
An optimistic initializer assumes that there’s hardly a chance of initialization being called from two tasks at the same time. Under this assumption, it is fastest to initialize the object (in the code above, the initialization is represented by creation of the shared object) and then atomically copying this object into the shared field/variable. The (nonexisting) AtomicallyTestAndStore
method compares old value of Shared
with nil
and stores temp
into Shared
if Shared
is nil. It makes all this in a way that prevents the code from being executed from two threads at the same time. If the AtomicallyTestAndStore
fails (returns False), another task has already modified the Shared
variable and we must destroy the temporary resource.
The advantage of this approach is that there is no locking so we don’t have to create an additional critical section. Only CPU-level bus locking is used to implement the AtomicallyTestAndStore
. The disadvantage is that duplicate objects may be created at some point.
A pessimistic initializer assumes that there’s a significant probability of initialization being called from two tasks at the same time and uses an additional critical section to lock access to the initialization part. A test, lock, retest pattern is used for performance reason – the code first checks whether the shared object is initialized then (if it is not) locks the critical section and retests the shared object as another task could have initialized it in the meantime.
The advantage of this approach is that only a single object is created. The disadvantage is that we must manage additional critical section that will be used for locking.
It is unclear which approach is better. Although locking slows the application more than micro-locking, creating duplicate resources may slow it down even more. On the other hand, pessimistic initializer requires additional lock, but that won’t make much difference if you don’t create millions of shared objects. In most cases initialization code will be rarely called and the complexity of initializer will not change the program performance in any meaningful way so the choice of initializer will mainly be a matter of personal taste.
While pessimistic initialization doesn’t represent any problems for a skilled programmer, it is bothersome as we must manage an additional locking object. (Typically that will be a critical section.) To simplify the code and to make it more intentional, OmniThreadLibrary introduces a Locked<T>
type which wraps any type (the type of your shared object) and a critical section.
An instance of the Locked<T>
type contains two fields – one holding your data (FValue
) and another containing a critical section (FLock
). Locked<T>
provides two helper functions (Initialize
) which implement the pessimistic initialization pattern.
The first version accepts a factory function which creates the object. The code implements the test, lock, retest pattern explained previously in this section.
1
function
Locked
<
T
>.
Initialize
(
factory
:
TFactory
)
:
T
;
2
begin
3
if
not
FInitialized
then
begin
4
Acquire
;
5
try
6
if
not
FInitialized
then
begin
7
FValue
:=
factory
()
;
8
FInitialized
:=
true
;
9
end
;
10
finally
Release
;
end
;
11
end
;
12
Result
:=
FValue
;
13
end
;
Another version, implemented only in Delphi 2010 and newer, doesn’t require a factory function but calls the default (parameter-less) constructor. This is only possible if the T
type represents a class. Actually, this method simply calls the other version and provides a special factory method which travels the extended RTTI information, selects an appropriate constructor and executes it to create the shared object.
1
function
Locked
<
T
>.
Initialize
:
T
;
2
begin
3
if
not
FInitialized
then
begin
4
if
PTypeInfo
(
TypeInfo
(
T
))
^.
Kind
<>
tkClass
then
5
raise
Exception
.
Create
(
'Locked<T>.Initialize: Unsupported type'
)
;
6
Result
:=
Initialize
(
7
function
:
T
8
var
9
aMethCreate
:
TRttiMethod
;
10
instanceType
:
TRttiInstanceType
;
11
ctx
:
TRttiContext
;
12
params
:
TArray
<
TRttiParameter
>;
13
resValue
:
TValue
;
14
rType
:
TRttiType
;
15
begin
16
ctx
:=
TRttiContext
.
Create
;
17
rType
:=
ctx
.
GetType
(
TypeInfo
(
T
))
;
18
for
aMethCreate
in
rType
.
GetMethods
do
begin
19
if
aMethCreate
.
IsConstructor
then
begin
20
params
:=
aMethCreate
.
GetParameters
;
21
if
Length
(
params
)
=
0
then
begin
22
instanceType
:=
rType
.
AsInstance
;
23
resValue
:=
AMethCreate
.
Invoke
(
24
instanceType
.
MetaclassType
,
[])
;
25
Result
:=
resValue
.
AsType
<
T
>;
26
break
;
//for
27
end
;
28
end
;
29
end
;
//for
30
end
)
;
31
end
;
32
end
;
Locked<T>
also implements methods Acquire
and Release
which use the built-in critical section to implement synchronization.
An optimistic initialization is supported with the Atomic<T>
class which is much simpler than the pessimistic Locked<T>
alternative.
1
type
2
Atomic
<
T
>
=
class
3
type
TFactory
=
reference
to
function
:
T
;
4
class
function
Initialize
(
var
storage
:
T
;
5
factory
:
TFactory
)
:
T
;
overload
;
6
{$IFDEF OTL_ERTTI}
7
class
function
Initialize
(
var
storage
:
T
)
:
T
;
overload
;
8
{$ENDIF OTL_ERTTI}
9
end
;
As in Locked<T>
, there are two Initialize
functions, one creating the object using a user-provided factory function and another using RTTI to call the default parameter-less constructor. We’ll only examine the former.
1
class
function
Atomic
<
T
>.
Initialize
(
var
storage
:
T
;
factory
:
TFactory
)
:
T
;
2
var
3
interlockRes
:
pointer
;
4
tmpT
:
T
;
5
begin
6
if
not
assigned
(
PPointer
(
@
storage
)
^
)
then
begin
7
Assert
(
cardinal
(
@
storage
)
mod
SizeOf
(
pointer
)
=
0
,
8
'Atomic<T>.Initialize: storage is not properly aligned!'
)
;
9
Assert
(
cardinal
(
@
tmpT
)
mod
SizeOf
(
pointer
)
=
0
,
10
'Atomic<T>.Initialize: tmpT is not properly aligned!'
)
;
11
tmpT
:=
factory
()
;
12
interlockRes
:=
InterlockedCompareExchangePointer
(
13
PPointer
(
@
storage
)
^,
PPointer
(
@
tmpT
)
^,
nil
)
;
14
case
PTypeInfo
(
TypeInfo
(
T
))
^.
Kind
of
15
tkInterface
:
16
if
interlockRes
=
nil
then
17
PPointer
(
@
tmpT
)
^
:=
nil
;
18
tkClass
:
19
if
interlockRes
<>
nil
then
20
TObject
(
PPointer
(
@
tmpT
)
^
)
.
Free
;
21
else
22
raise
Exception
.
Create
(
'Atomic<T>.Initialize: Unsupported type'
)
;
23
end
;
//case
24
end
;
25
Result
:=
storage
;
26
end
;
The code first checks if the storage is already initialized by using a weird cast which assumes that the T
is pointer-sized. This is a safe assumption because Atomic<T>
only supports T
being a class or an interface.
Next the code checks whether the shared object and the temporary variable are properly aligned. This should in most cases not present a problem as all ‘normal’ fields (not stored in packed record
types) should always be appropriately aligned.
After that, the factory function is called to create an object.
Next, the InterlockedCompareExchangePointer
is called. It takes three parameters – a destination address, an exchange data and a comparand. The functionality of the code can be represented by the following pseudo-code:
1
function
InterlockedCompareExchangePointer
(
var
destination
:
pointer
;
2
exchange
,
comparand
:
pointer
)
:
pointer
;
3
begin
4
Result
:=
destination
;
5
if
destination
=
comparand
then
6
destination
:=
exchange
;
7
end
;
The trick here is that this code is all executed inside the CPU, atomically. The CPU ensures that the destination value is not modified (by another CPU) during the execution of the code. It is hard to understand (interlocked functions always make my mind twirl in circles) but basically it reduces to two scenarios:
nil
(old, uninitialized value of storage
), and storage
is set to the new object (tmpT
).storage
) and storage
is not modified. In yet another words – InterlockedCompareExchangePointer
either stores the new value in the storage
and returns nil
or does nothing, leaves already initialized storage
intact and returns something else than nil
.
At the end, the code handles two specific cases. If a T
is an interface type and initialization was successful, the temporary value in tmpT
must be replaced with nil
. Otherwise two variables (storage
and tmpT
) would own an interface with a reference count of 1 which would cause big problems.
1
if
interlockRes
=
nil
then
2
PPointer
(
@
tmpT
)
^
:=
nil
;
It a T
is a class type and initialization was not successful, the temporary value stored in tmpT
must be destroyed.
1
if
interlockRes
<>
nil
then
2
TObject
(
PPointer
(
@
tmpT
)
^
)
.
Free
;
Initialize
returns the same shared object twice – once in the storage
parameter and once as the function result. This allows us to write space-efficient initializers like in the example below, taken from the OtlParallel unit.
1
function
TOmniWorkItem
.
GetCancellationToken
:
IOmniCancellationToken
;
2
begin
3
Result
:=
Atomic
<
IOmniCancellationToken
>.
Initialize
(
4
FCancellationToken
,
CreateOmniCancellationToken
)
;
5
end
;
When you are initializing an interface and a new instance of the implementing object is created by calling the default constructor Create
, you can use the two-parameter version of Atomic
to simplify the code. [3.06] This is only supported in Delphi XE and newer.
1
Atomic
<
I
;
T
:
constructor
>
=
class
2
class
function
Initialize
(
var
storage
:
I
)
:
I
;
3
end
;
For example, if the shared object is stored in shared: IMyInterface
and is created by calling TMyInterface.Create
, you can initialize it via:
1
Atomic
<
IMyInterface
,
TMyInterface
>.
Initialize
(
shared
)
;
A common scenario in parallel programming is that the program has to wait for something to happen. The occurrence of that something is usually signalled with an event.
On Windows, this is usually accomplished by calling one of the functions from the WaitForMultipleObjects
family. While they are powerful and quite simple to use, they also have a big limitation – one can only wait for up to 64 events at the same time.
Windows also offers a RegisterWaitForSingleObject
API call which can be used to circumvent this limitation. Its use is, however, quite complicated to use. To simplify programmer’s life, OmniThreadLibrary introduces a TWaitFor
class which allows the code to wait for any number of events.
1
type
2
TWaitFor
=
class
3
public
type
4
TWaitResult
=
(
5
waAwaited
,
// WAIT_OBJECT_0 .. WAIT_OBJECT_n
6
waTimeout
,
// WAIT_TIMEOUT
7
waFailed
,
// WAIT_FAILED
8
waIOCompletion
// WAIT_IO_COMPLETION
9
)
;
10
THandleInfo
=
record
11
Index
:
integer
;
12
end
;
13
THandles
=
array
of
THandleInfo
;
14
15
constructor
Create
;
overload
;
16
constructor
Create
(
const
handles
:
array
of
THandle
)
;
overload
;
17
destructor
Destroy
;
override
;
18
function
MsgWaitAny
(
timeout_ms
,
wakeMask
,
flags
:
cardinal
)
:
TWaitResult
;
19
procedure
SetHandles
(
const
handles
:
array
of
THandle
)
;
20
function
WaitAll
(
timeout_ms
:
cardinal
)
:
TWaitResult
;
21
function
WaitAny
(
timeout_ms
:
cardinal
;
alertable
:
boolean
=
false
)
:
TWaitResult
;
22
property
Signalled
:
THandles
read
FSignalledHandles
;
23
end
;
To use TWaitFor
, create an instance of this class and pass it an array of handles either as a constructor parameter or by calling the SetHandles
method. All handles must be created with the CreateEvent
Windows function.
You can then wait for any (WaitAny
) or all (WaitAll
) events to become signalled. In both cases the Signalled
array is filled with information about signalled (set) events. The Signalled
property is an array of THandleInfo
records, each of which only contains one field - an index (into the handles
array) of the signalled event.
For example, if you want to wait for two events and then react to them, use the following approach:
1
var
2
wf
:
TWaitFor
;
3
info
:
THandleInfo
;
4
5
wf
:=
TWaitFor
.
Create
([
handle1
,
handle2
])
;
6
try
7
if
wf
.
WaitAny
(
INFINITE
)
=
waAwaited
then
begin
8
for
info
in
wf
.
Signalled
do
9
if
info
.
Index
=
0
then
10
// handle1 is signalled - do something
11
else
if
info
.
Index
=
1
then
12
// handle2 is signalled - do something
13
end
;
14
finally
FreeAndNil
(
wf
)
;
end
;
You don’t have to recreate TWaitFor
for each wait operation; it is perfectly ok to call WaitXXX
functions repeatedly on the same object. It is also fine to change the array of handles between two WaitXXX
calls by calling the SetHandles
method.
The WaitAny
method also comes in a variant which processes Windows messages, I/O completion routines and APC calls (MsgWaitAny
). It’s wakeMask
and flags
parameters are the same as the corresponding parameters to the MsgWaitForMultipleObjectsEx
API.
The use of the TWaitFor
is shown in demo 59_TWaitFor
.
The TOmniLockManager<K>
class solves a specific problem – how to synchronize access to entities of any type. It is similar to TMonitor
, except that it works on all types, not just on objects.
Following requirements are implemented in the TOmniLockManager<K>
.
TMonitor.Enter/Exit
. The code calls Lock
to get exclusive access to a key and calls Unlock
to release the key back to the public use.Unlock
calls in one thread matches the number of Lock
calls, the key is unlocked. (In other words, if you call Lock
twice with the same key, you also have to call Unlock
twice to release that key.)
1
type
2
IOmniLockManagerAutoUnlock
=
interface
3
procedure
Unlock
;
4
end
;
5
6
IOmniLockManager
<
K
>
=
interface
7
function
Lock
(
const
key
:
K
;
timeout_ms
:
cardinal
)
:
boolean
;
8
function
LockUnlock
(
const
key
:
K
;
timeout_ms
:
cardinal
)
:
IOmniLockManagerAutoUnlock
;
9
procedure
Unlock
(
const
key
:
K
)
;
10
end
;
The TOmniLockManager<K>
public class implements the IOmniLockManager<K>
interface.
The Lock
function returns False
if it does not lock the key in the specified timeout. Timeouts 0 and INFINITE
are supported.
There’s also a LockUnlock
function which returns an interface that automatically unlocks the key when it is released. This interface also implements an Unlock
function which unlocks the key.
A practical example of using the lock manager is shown in demo 54_LockManager
.
For debugging purposes, OmniThreadLibrary implements the TOmniSingleThreadUseChecker
record. It gives the programmer a simple way to make sure that some code is always executed from the same thread.
1
type
2
TOmniSingleThreadUseChecker
=
record
3
procedure
AttachToCurrentThread
;
4
procedure
Check
;
5
procedure
DebugCheck
;
6
end
;
Using it is simple – first declare a variable/field of type TOmniSingleThreadUseChecker
in a context that has to be checked and then call Check
or DebugCheck
method of that variable whenever you want to check that some part of code was not used from more than one thread.
The difference between Check
and DebugCheck
is that the latter can be disabled during the compilation. It implements the check only if the conditional symbol OTL_CheckThreadSafety
is defined. Otherwise, DebugCheck
contains no code and does not affect the execution speed.
In cases where you use such an object from more than one thread (for example, if you use it from a task and then from the task controller after the task terminates) you can call AttachToCurrentThread
to associate the checker with the current thread.