5. Synchronization

Although the OmniThreadLibrary treats communication as a superior approach to locking, there are still times when using “standard” synchronization primitives such as a critical section are unavoidable. As the standard Delphi/Windows approach to locking is low-level, OmniThreadLibrary builds on it and improves it in some significant ways. All these improvements are collected in the OtlSync unit and are described in the following sections. The only exception is the waitable value class/interface, which is declared in the OtlCommon unit.

This part of the book assumes that you have a basic understanding of locking. If you are new to the topic, you should first read the appropriate chapters from one of the books mentioned in the introduction.

5.1 Critical sections

The most useful synchronisation primitive for multi-threaded programming is indubitably the critical section67

OmniThreadLibrary simplifies sharing critical sections between a task owner and a task with the use of the WithLock method. High-level tasks can access this method through the task configuration block.

I was always holding the opinion that locks should be as granular as possible. Putting many small locks around many unrelated pieces of code is better than using one giant lock for everything. However, programmers frequently use one or few locks because managing many critical sections can be a bother.

To help you with writing a better code, OmniThreadLibrary implements three extensions to the Delphi’s TCriticalSection class - IOmniCriticalSection, TOmniCS and Locked<T>.

5.1.1 IOmniCriticalSection

Delphi implements critical section support with a TCriticalSection class which must be created and destroyed in the code. (There is also a TRTLCriticalSection record, but it is only supported on Windows.) OmniThreadLibrary extends this implementation with an IOmniCriticalSection interface, which you only have to create. The compiler will make sure that it is destroyed automatically at the appropriate place.

1 type
2   IOmniCriticalSection = interface
3     procedure Acquire;
4     procedure Release;
5     function  GetSyncObj: TSynchroObject;
6     property LockCount: integer read GetLockCount;
7   end;
9 function CreateOmniCriticalSection: IOmniCriticalSection;  

IOmniCriticalSection uses TCriticalSection internally8. It acts just as a proxy that calls TCriticalSection functions. Besides that, it provides an additional functionality by counting the number of times a critical section has been acquired, which can help a lot while debugging. This counter can be read through the LockCount property.

5.1.2 TOmniCS

Another TCriticalSection extension found in the OmniThreadLibrary is the TOmniCS record. It allows you to use a critical section by declaring a record in an appropriate place.

Using TOmniCS, locking can be as simple as this:

 1 uses
 2   GpLists,
 3   OtlSync;
 5 procedure ProcessList(const intf: IGpIntegerList);
 6 begin
 7   //...
 8 end;
10 var
11   lock: TOmniCS;
12   intf: IGpIntegerList;
14 procedure Test1;
15 begin
16   intf := TGpIntegerList.Create;
17   //...
18   lock.Acquire;
19   try
20     ProcessList(intf);
21   finally lock.Release; end;
22 end;

TOmniCS is implemented as a record with one private field holding the IOmniCriticalSection interface.

 1 type
 2   TOmniCS = record
 3   strict private
 4     ocsSync: IOmniCriticalSection;
 5   private
 6     function  GetLockCount: integer; inline;
 7     function  GetSyncObj: TSynchroObject; inline;
 8   public
 9     procedure Initialize;
10     procedure Acquire; inline;
11     procedure Release; inline;
12     property LockCount: integer read GetLockCount;
13     property SyncObj: TSynchroObject read GetSyncObj;
14   end;

The Release method merely calls the Release method on the internal interface, while the Acquire method is more tricky as it has to initialize the ocsSync field first.

 1 procedure TOmniCS.Acquire;
 2 begin
 3   Initialize;
 4   ocsSync.Acquire;
 5 end;
 7 procedure TOmniCS.Release;
 8 begin
 9   ocsSync.Release;
10 end;

The initialization uses a global critical section to synchronize access to the code that should not be executed from two threads at once.

 1 procedure TOmniCS.Initialize;
 2 begin
 3   if not assigned(ocsSync) then begin
 4     GOmniCSInitializer.Acquire;
 5     try
 6       if not assigned(ocsSync) then
 7         ocsSync := CreateOmniCriticalSection;
 8     finally GOmniCSInitializer.Release; end;
 9   end;
10 end;

5.1.3 Locked<T>

TOmniCS is a great simplification of the critical section concept, but it still requires you to declare a separate locking entity. If this locking entity is only used to synchronize access to a specific instance (being that an object, record, interface or even a simple type) it is often better to declare a variable/field of type Locked<T> which combines any type with a critical section.

Using Locked<T>, the example from the TOmniCS section can be rewritten as follows.

 1 uses
 2   GpLists,
 3   OtlSync;
 5 procedure ProcessList(const intf: IGpIntegerList);
 6 begin
 7   //...
 8 end;
10 var
11   lockedIntf: Locked<IGpIntegerList>;
13 procedure Test2;
14 begin
15   lockedIntf := TGpIntegerList.CreateInterface;
16   //...
17   lockedIntf.Acquire;
18   try
19     ProcessList(lockedIntf);
20   finally lockedIntf.Release; end;
21 end;

The interesting fact to notice is although the lockedIntf is declared as a variable of type Locked<IGpIntegerList>, it can be initialized and used as if it is of type IGpIntegerList. This is accomplished by providing Implicit operators for conversion from Locked<T> to T and back. Delphi compiler is (sadly) not smart enough to use this conversion operator in some cases so you would still sometimes have to use the provided Value property. For example, you’d have to do it to release wrapped object. (In the example above we have wrapped an interface and the compiler itself handled the destruction.)

 1 procedure ProcessObjList(obj: TGpIntegerList);
 2 begin
 3   //...
 4 end;
 6 var
 7   lockedObj: Locked<TGpIntegerList>;
 9 procedure Test3;
10 begin
11   lockedObj := TGpIntegerList.Create;
12   try
13     //...
14     lockedObj.Acquire;
15     try
16       ProcessObjList(lockedObj);
17     finally lockedObj.Release; end;
18     //...
19   finally lockedObj.Value.Free; end;
20 end;

Besides the standard Acquire/Release methods, Locked<T> also implements methods used for pessimistic locking, which is described later in this chapter, and two almost identical methods called Locked which allow you to execute a code segment (a procedure, a method or an anonymous method) while the critical section is acquired. (In other words, you can be assured that the code passed to the Locked method is always executed only once provided that all code in the program properly locks access to the shared variable.)

 1 type
 2   Locked<T> = record
 3   public
 4     type TFactory = reference to function: T;
 5     type TProcT = reference to procedure(const value: T);
 6     constructor Create(const value: T; ownsObject: boolean = true);
 7     class operator Implicit(const value: Locked<T>): T; inline;
 8     class operator Implicit(const value: T): Locked<T>; inline;
 9     function  Initialize(factory: TFactory): T; overload;
11     function  Initialize: T; overload;
13     procedure Acquire; inline;
14     procedure Locked(proc: TProc); overload; inline;
15     procedure Locked(proc: TProcT); overload; inline;
16     procedure Release; inline;
17     procedure Free; inline;
18     property Value: T read GetValue;
19   end;
21 procedure Locked<T>.Locked(proc: TProc);
22 begin
23   Acquire;
24   try
25     proc;
26   finally Release; end;
27 end;
29 procedure Locked<T>.Locked(proc: TProcT);
30 begin
31   Acquire;
32   try
33     proc(Value);
34   finally Release; end;
35 end; Why not use TMonitor?

There is an alternative built into Delphi since 2009 which provides functionality similar to the Locked<T>TMonitor. In modern Delphis, every object can be locked by using System.TMonitor.Enter function and unlocked by using System.TMonitor.Exit. The example above could be rewritten to use the TMonitor with little work.

 1 var
 2   obj: TGpIntegerList;
 4 procedure Test4;
 5 begin
 6   obj := TGpIntegerList.Create;
 7   try
 8     //...
 9     System.TMonitor.Enter(obj);
10     try
11       ProcessObjList(obj);
12     finally System.TMonitor.Exit(obj); end;
13     //...
14   finally FreeAndNil(obj); end;
15 end;

A reasonable question to ask is, therefore, why implementing Locked<T>. Why is TMonitor not good enough? There are plenty of reasons for that.

On the positive size, TMonitor is faster than a critical section.

5.2 TOmniMREW

A typical situation in a multi-threaded program is a multiple readers/exclusive writer scenario. It occurs when there are multiple reader threads which can operate on the same object simultaneously, but must be locked out when an exclusive writer thread wants to make changes to this object. Delphi already implements a synchronizer for this scenario (TMultiReadExclusiveWriteSynchronizer11,12 from SysUtils), but it is quite a heavyweight object which you can use in many ways. For situations when the probability of collision13 is low and especially, when the object is not locked for a long period, a TOmniMREW synchronizer will give you a better performance.

 1 type
 2   TOmniMREW = record
 3   public
 4     procedure EnterReadLock; inline;
 5     procedure EnterWriteLock; inline;
 6     procedure ExitReadLock; inline;
 7     procedure ExitWriteLock; inline;
 8     function  TryEnterReadLock(timeout_ms: integer = 0): boolean;
 9     function  TryEnterWriteLock(timeout_ms: integer = 0): boolean;
10   end;

To use the TOmniMREW synchronizer, a reader must call EnterReadLock before reading the object and ExitReadLock when it doesn’t need the object anymore. Similarly, a writer must call EnterWriteLock and ExitWriteLock.

Function TryEnterReadLock and TryEnterWriteLock [3.07.6] try to enter a read/write lock. If the lock cannot be acquired in timeout_ms milliseconds, the functions return False.

I’d like to stress again the importance of not locking an object for a long time when using TOmniMREW. Both Enter methods wait in a tight loop while waiting to get access, which can quickly use lots of CPU time if probability of collisions are high. (Collisions typically occur more often if an object is locked for extensive periods of time.)

 1 procedure TOmniMREW.EnterReadLock;
 2 var
 3   currentReference: NativeInt;
 4 begin
 5   //Wait on writer to reset write flag so Reference.Bit0 must be 0
 6   //then increase Reference
 7   repeat
 8     currentReference := NativeInt(omrewReference) AND NOT 1;
 9   until CAS(currentReference, currentReference + 2, 
10             NativeInt(omrewReference));
11 end; 
13 procedure TOmniMREW.EnterWriteLock;
14 var
15   currentReference: NativeInt;
16 begin
17   //Wait on writer to reset write flag so omrewReference.Bit0 must be 0 
18   //then set omrewReference.Bit0
19   repeat
20     currentReference := NativeInt(omrewReference) AND NOT 1;
21   until CAS(currentReference, currentReference + 1, 
22             NativeInt(omrewReference));
23   //Now wait on all readers
24   repeat
25   until NativeInt(omrewReference) = 1;
26 end;

Because of an optimized implementation that favours speed over safety, you’ll get a cryptic access violation error if the TOmniMREW instance is destroyed while a read or write lock is taken. To be clear, this is a programming error; you should never destroy a synchronization object while it holds a lock. It’s just that the error displayed will not make it very clear what you are doing wrong.

For example, the following test code fragment will cause an access violation.

1 procedure Test5;
2 var
3   cs: TOmniMREW;
4 begin
5   cs.EnterWriteLock;
6 end;
8 Test5; //<-- accvio here!

5.3 Cancellation token

Sometimes you want to instruct background tasks to stop whatever they are doing and quit. Typically, this happens when the program is shutting down. Programs using the “standard” multi-threaded programming (i.e. TThread) are solving this problem each in its own way, typically by using boolean flags or Windows events.

To make the task cancellation simpler and more standardized, OmniThreadLibrary introduces a cancellation token. A cancellation token is an instance of the IOmniCancellationToken interface and implements functionality very similar to the Windows event synchronization primitive.

1 type
2   IOmniCancellationToken = interface 
3     procedure Clear;
4     function  IsSignalled: boolean;
5     procedure Signal;
6     property Handle: THandle read GetHandle;
7   end; { IOmniCancellationToken }
9 function CreateOmniCancellationToken: IOmniCancellationToken;

By default, a cancellation token is in a cleared (inactive) state. To signal it, a code calls the Signal method. Signalled token can be cleared by calling the Clear method.

The task can check the cancellation token’s state by calling the IsSignalled method or by waiting (using WaitForSingleObject or any of its variants) on the Handle property. Wait will succeed when the cancellation token is signalled.

An important part of the cancellation token implementation is that the same token can be shared between multiple tasks. To cancel all tasks, the code must only call Signal once (provided that other parts of the program don’t call Clear).

Cancellation tokens are used in low-level and high-level multi-threading. Low-level multi-threading uses the CancelWith method to pass a multi-threading token around while the high-level multi-threading uses the task configuration block.

Cancellation is demonstrated in examples 35_ParallelFor and 38_OrderedFor.

5.4 Waitable value

The communication framework in the OmniThreadLibrary works asynchronously (you cannot know when a task or owner will receive and process the message). Most of the time that functions great, but sometimes you have to process messages synchronously (that is, you want to wait until the task processes the message) because otherwise the code gets too complicated. For those situations, OmniThreadLibrary offers a waitable value TOmniWaitableValue, which is also exposed as an interface IOmniWaitableValue.

 1 type
 2   IOmniWaitableValue = interface
 3     procedure Reset;
 4     procedure Signal; overload;
 5     procedure Signal(const data: TOmniValue); overload;
 6     function  WaitFor(maxWait_ms: cardinal = INFINITE): boolean;
 7     property Handle: THandle read GetHandle;
 8     property Value: TOmniValue read GetValue;
 9   end;
11   TOmniWaitableValue = class(TInterfacedObject, IOmniWaitableValue)
12   public
13     constructor Create;
14     destructor  Destroy; override;
15     procedure Reset; inline;
16     procedure Signal; overload; inline;
17     procedure Signal(const data: TOmniValue); overload; inline;
18     function  WaitFor(maxWait_ms: cardinal = INFINITE): boolean; inline;
19     property Handle: THandle read GetHandle;
20     property Value: TOmniValue read GetValue;
21   end;
23 function  CreateWaitableValue: IOmniWaitableValue;

The usage pattern is simple. The caller creates an object or interface of that type, sends it to another thread (typically via Task.Comm.Send) and calls the WaitFor method. The task receives the message, does the processing and calls Signal to signal completion or Signal(some_data) to signal completion and return data. At that point, the WaitFor returns and caller can read the data from the Value property.

A practical example should clarify this explanation. The two methods below are taken from the OtlThreadPool unit.

When a code wants to cancel threadpooled task, it will call the Cancel function. This function sends the Cancel message to the worker task and passes along the ID of the task to be cancelled and a TOmniWaitableValue object. Then it waits for the object to become signalled.

 1 function TOmniThreadPool.Cancel(taskID: int64): boolean;
 2 var
 3   res: TOmniWaitableValue;
 4 begin
 5   res := TOmniWaitableValue.Create;
 6   try
 7     otpWorkerTask.Invoke(@TOTPWorker.Cancel, [taskID, res]);
 8     res.WaitFor(INFINITE);
 9     Result := res.Value;
10   finally FreeAndNil(res); end;
11 end;

The Cancel method in the worker task processes the message, does lots of complicated work (removed for clarity) and at the end calls the Signal method on the TOmniWaitableValue object to signal completion and return a boolean value.

1 procedure TOTPWorker.Cancel(const params: TOmniValue);
2 var
3   waitParam: TOmniValue;
4   wasTerminated: boolean;
5 begin
6   //... lots of code; sets wasTerminated to some value
7   waitParam := params[1];
8   (waitParam.AsObject as TOmniWaitableValue).Signal(wasTerminated);
9 end; { TOTPWorker.Cancel }

Soon after the Signal is called, the WaitFor in the caller code exits and TOmniThreadPool.Cancel retrieves result from the Value property.

5.5 Inverse semaphore

A semaphore is a counting synchronisation object that starts at some value (typically greater than 0) which usually represents a number of available resources of some kind. To allocate a semaphore, one waits on it. If the semaphore count is greater than zero, the semaphore is signalled, wait will succeed and semaphore count gets decremented by one. [Of course, all of this occurs atomically.] If the semaphore count is zero, the semaphore is not signalled and wait will block until the timeout or until some other thread releases the semaphore, which increments the semaphore’s count and puts it into the signalled state.

While semaphores are implemented in the Windows kernel and Delphi wraps them in a pretty object TSemaphore, Windows doesn’t support an useful variation on the theme – an inverse semaphore, also known as a countdown event.

Inverse semaphore differs from a normal semaphore by getting signalled when the count drops to zero. This allows another thread to execute a blocking wait that will succeed only when the semaphore’s count is zero. Why is that good, you’ll ask? Because it simplifies resource exhaustion detection. If you wait on an inverse semaphore and this semaphore becomes signalled, then you know that the resource is fully used.

The inverse semaphore is implemented by the TOmniResourceCount class which implements an IOmniResourceCount interface.

 1 type
 2   IOmniResourceCount = interface 
 3     function  Allocate: cardinal; 
 4     function  Release: cardinal;
 5     function  TryAllocate(var resourceCount: cardinal; 
 6       timeout_ms: cardinal = 0): boolean;
 7     property Handle: THandle read GetHandle;
 8   end;
10   TOmniResourceCount = class(TInterfacedObject, IOmniResourceCount)
11   public
12     constructor Create(initialCount: cardinal);
13     destructor  Destroy; override;
14     function  Allocate: cardinal; inline;
15     function  Release: cardinal;
16     function  TryAllocate(var resourceCount: cardinal; 
17       timeout_ms: cardinal = 0): boolean;
18     property Handle: THandle read GetHandle;
19   end;
21 function CreateResourceCount(initialCount: integer): IOmniResourceCount;

Initial resource count is passed to the constructor or to the CreateResourceCount function. Allocate will block if this count is zero (and will unblock automatically when the count becomes greater than zero); otherwise it will decrement the count. The new value of the resource count is returned as a function result.

The TryAllocate is a safer version of Allocate taking a timeout parameter (which may be set to INFINITE) and returning success/fail status as a function result.

Release increments the count and unblocks waiting Allocates. New resource count (potentially already incorrect at the moment caller sees it) is returned as the result.

Finally, there is a Handle property exposing a handle which is signalled when resource count is zero and unsignalled otherwise.

5.6 Initialization

Initializing an object in a multi-threaded world is not a problem – as long as the object is initialized before it is shared. To put this into a simple language – everything is fine if we can initialize object first and then pass it to multiple tasks.14

In most cases, this is not a problem, but sometimes we want to use a shared global object in multiple tasks. In that case, the first task that wants to use the object will have to create it. While this may look as a weird approach to programming, it is a legitimate programming pattern, called lazy initialization.

The reason behind this weirdness is that sometimes we don’t know in advance whether an object (or some part of a composite object) will be used at all. If the probability that the object will be used is low enough, it may be a good idea not to initialize it in advance, as that would take some time and use some memory (or maybe even lots of memory).

Additionally, there may not be a good place to call the initialization. A good example is the TOmniCS record where we want to do an implicit initialization the first time an Acquire method is called. As this record is usually just declared as a variable/field and not explicitly initialized, there is no better place to call the initialization code than from the Acquire itself.

This part of the book will explain two well-known approaches to shared initialization – a pessimistic initialization and an optimistic initialization. There’s also a third approach – busy-wait – which you can read more about on my blog.

The difference between the two approaches is visible from the following pseudo-code.

 1 var
 2   Shared: T;
 4 procedure OptimisticInitializer;
 5 var
 6   temp: T;
 7 begin
 8   if not assigned(Shared) then begin
 9     temp := T.Create;
10     if not AtomicallyTestAndStore(Shared, nil, temp) then
11       temp.Free;
12   end;
13 end;
15 procedure PessimisticInitializer;
16 begin
17   if not assigned(Shared) then begin
18     Lock;
19     try
20       if not assigned(Shared) then
21         Shared := T.Create;
22     finally Unlock; end;
23   end;
24 end;

An optimistic initializer assumes that there’s hardly a chance of initialization being called from two tasks at the same time. Under this assumption, it is fastest to initialize the object (in the code above, the initialization is represented by creation of the shared object) and then atomically copying this object into the shared field/variable. The (nonexisting) AtomicallyTestAndStore method compares old value of Shared with nil and stores temp into Shared if Shared is nil. It makes all this in a way that prevents the code from being executed from two threads at the same time. If the AtomicallyTestAndStore fails (returns False), another task has already modified the Shared variable and we must destroy the temporary resource.

The advantage of this approach is that there is no locking so we don’t have to create an additional critical section. Only CPU-level bus locking is used to implement the AtomicallyTestAndStore. The disadvantage is that duplicate objects may be created at some point.

A pessimistic initializer assumes that there’s a significant probability of initialization being called from two tasks at the same time and uses an additional critical section to lock access to the initialization part. A test, lock, retest pattern is used for performance reason – the code first checks whether the shared object is initialized then (if it is not) locks the critical section and retests the shared object as another task could have initialized it in the meantime.

The advantage of this approach is that only a single object is created. The disadvantage is that we must manage additional critical section that will be used for locking.

It is unclear which approach is better. Although locking slows the application more than micro-locking, creating duplicate resources may slow it down even more. On the other hand, pessimistic initializer requires additional lock, but that won’t make much difference if you don’t create millions of shared objects. In most cases initialization code will be rarely called and the complexity of initializer will not change the program performance in any meaningful way so the choice of initializer will mainly be a matter of personal taste.

5.6.1 Pessimistic initialization

While pessimistic initialization doesn’t represent any problems for a skilled programmer, it is bothersome as we must manage an additional locking object. (Typically that will be a critical section.) To simplify the code and to make it more intentional, OmniThreadLibrary introduces a Locked<T> type which wraps any type (the type of your shared object) and a critical section.

An instance of the Locked<T> type contains two fields – one holding your data (FValue) and another containing a critical section (FLock). Locked<T> provides two helper functions (Initialize) which implement the pessimistic initialization pattern.

The first version accepts a factory function which creates the object. The code implements the test, lock, retest pattern explained previously in this section.

 1 function Locked<T>.Initialize(factory: TFactory): T;
 2 begin
 3   if not FInitialized then begin
 4     Acquire;
 5     try
 6       if not FInitialized then begin
 7         FValue := factory();
 8         FInitialized := true;
 9       end;
10     finally Release; end;
11   end;
12   Result := FValue;
13 end;

Another version, implemented only in Delphi 2010 and newer, doesn’t require a factory function but calls the default (parameter-less) constructor. This is only possible if the T type represents a class. Actually, this method simply calls the other version and provides a special factory method which travels the extended RTTI information, selects an appropriate constructor and executes it to create the shared object.

 1 function Locked<T>.Initialize: T;
 2 begin
 3   if not FInitialized then begin
 4     if PTypeInfo(TypeInfo(T))^.Kind  <> tkClass then
 5       raise Exception.Create('Locked<T>.Initialize: Unsupported type');
 6     Result := Initialize(
 7       function: T
 8       var
 9         aMethCreate : TRttiMethod;
10         instanceType: TRttiInstanceType;
11         ctx         : TRttiContext;
12         params      : TArray<TRttiParameter>;
13         resValue    : TValue;
14         rType       : TRttiType;
15       begin
16         ctx := TRttiContext.Create;
17         rType := ctx.GetType(TypeInfo(T));
18         for aMethCreate in rType.GetMethods do begin
19           if aMethCreate.IsConstructor then begin
20             params := aMethCreate.GetParameters;
21             if Length(params) = 0 then begin
22               instanceType := rType.AsInstance;
23               resValue := AMethCreate.Invoke(
24                             instanceType.MetaclassType, []);
25               Result := resValue.AsType<T>;
26               break; //for
27             end;
28           end;
29         end; //for
30       end);
31   end;
32 end;

Locked<T> also implements methods Acquire and Release which use the built-in critical section to implement synchronization.

5.6.2 Optimistic initialization

An optimistic initialization is supported with the Atomic<T> class which is much simpler than the pessimistic Locked<T> alternative.

1 type
2   Atomic<T> = class
3     type TFactory = reference to function: T;
4     class function Initialize(var storage: T; 
5       factory: TFactory): T; overload;
7     class function Initialize(var storage: T): T; overload;
9   end;

As in Locked<T>, there are two Initialize functions, one creating the object using a user-provided factory function and another using RTTI to call the default parameter-less constructor. We’ll only examine the former.

 1 class function Atomic<T>.Initialize(var storage: T; factory: TFactory): T;
 2 var
 3   interlockRes: pointer;
 4   tmpT        : T;
 5 begin
 6   if not assigned(PPointer(@storage)^) then begin
 7     Assert(cardinal(@storage) mod SizeOf(pointer) = 0, 
 8       'Atomic<T>.Initialize: storage is not properly aligned!');
 9     Assert(cardinal(@tmpT) mod SizeOf(pointer) = 0, 
10       'Atomic<T>.Initialize: tmpT is not properly aligned!');
11     tmpT := factory();
12     interlockRes := InterlockedCompareExchangePointer(
13                       PPointer(@storage)^, PPointer(@tmpT)^, nil);
14     case PTypeInfo(TypeInfo(T))^.Kind of
15       tkInterface:
16         if interlockRes = nil then
17           PPointer(@tmpT)^ := nil;
18       tkClass:
19         if interlockRes <> nil then
20           TObject(PPointer(@tmpT)^).Free;
21       else
22         raise Exception.Create('Atomic<T>.Initialize: Unsupported type');
23     end; //case
24   end;
25   Result := storage;
26 end;

The code first checks if the storage is already initialized by using a weird cast which assumes that the T is pointer-sized. This is a safe assumption because Atomic<T> only supports T being a class or an interface.

Next the code checks whether the shared object and the temporary variable are properly aligned. This should in most cases not present a problem as all ‘normal’ fields (not stored in packed record types) should always be appropriately aligned.

After that, the factory function is called to create an object.

Next, the InterlockedCompareExchangePointer is called. It takes three parameters – a destination address, an exchange data and a comparand. The functionality of the code can be represented by the following pseudo-code:

1 function InterlockedCompareExchangePointer(var destination: pointer;
2   exchange, comparand: pointer): pointer;
3 begin
4   Result := destination;
5   if destination = comparand then
6     destination := exchange;
7 end;

The trick here is that this code is all executed inside the CPU, atomically. The CPU ensures that the destination value is not modified (by another CPU) during the execution of the code. It is hard to understand (interlocked functions always make my mind twirl in circles) but basically it reduces to two scenarios:

In yet another words – InterlockedCompareExchangePointer either stores the new value in the storage and returns nil or does nothing, leaves already initialized storage intact and returns something else than nil.

At the end, the code handles two specific cases. If a T is an interface type and initialization was successful, the temporary value in tmpT must be replaced with nil. Otherwise two variables (storage and tmpT) would own an interface with a reference count of 1 which would cause big problems.

1 if interlockRes = nil then
2   PPointer(@tmpT)^ := nil;

It a T is a class type and initialization was not successful, the temporary value stored in tmpT must be destroyed.

1 if interlockRes <> nil then
2   TObject(PPointer(@tmpT)^).Free;

Initialize returns the same shared object twice – once in the storage parameter and once as the function result. This allows us to write space-efficient initializers like in the example below, taken from the OtlParallel unit.

1 function TOmniWorkItem.GetCancellationToken: IOmniCancellationToken;
2 begin
3   Result := Atomic<IOmniCancellationToken>.Initialize(
4     FCancellationToken, CreateOmniCancellationToken);
5 end;

When you are initializing an interface and a new instance of the implementing object is created by calling the default constructor Create, you can use the two-parameter version of Atomic to simplify the code. [3.06] This is only supported in Delphi XE and newer.

1   Atomic<I; T:constructor> = class
2     class function Initialize(var storage: I): I;
3   end;

For example, if the shared object is stored in shared: IMyInterface and is created by calling TMyInterface.Create, you can initialize it via:

1 Atomic<IMyInterface, TMyInterface>.Initialize(shared);

5.7 TWaitFor

A common scenario in parallel programming is that the program has to wait for something to happen. The occurrence of that something is usually signalled with an event.

On Windows, this is usually accomplished by calling one of the functions from the WaitForMultipleObjects family. While they are powerful and quite simple to use, they also have a big limitation – one can only wait for up to 64 events at the same time.

Windows also offers a RegisterWaitForSingleObject API call which can be used to circumvent this limitation. Its use is, however, quite complicated to use. To simplify programmer’s life, OmniThreadLibrary introduces a TWaitFor class which allows the code to wait for any number of events.

 1 type
 2   TWaitFor = class
 3   public type
 4     TWaitResult = (
 5       waAwaited,      // WAIT_OBJECT_0 .. WAIT_OBJECT_n
 6       waTimeout,      // WAIT_TIMEOUT
 7       waFailed,       // WAIT_FAILED
 8       waIOCompletion  // WAIT_IO_COMPLETION
 9     );
10     THandleInfo = record
11       Index: integer;
12     end;
13     THandles = array of THandleInfo;
15     constructor Create; overload;
16     constructor Create(const handles: array of THandle); overload;
17     destructor  Destroy; override;
18     function  MsgWaitAny(timeout_ms, wakeMask, flags: cardinal): TWaitResult;
19     procedure SetHandles(const handles: array of THandle);
20     function  WaitAll(timeout_ms: cardinal): TWaitResult;
21     function  WaitAny(timeout_ms: cardinal; alertable: boolean = false): TWaitResult;
22     property Signalled: THandles read FSignalledHandles;
23   end; 

To use TWaitFor, create an instance of this class and pass it an array of handles either as a constructor parameter or by calling the SetHandles method. All handles must be created with the CreateEvent Windows function.

You can then wait for any (WaitAny) or all (WaitAll) events to become signalled. In both cases the Signalled array is filled with information about signalled (set) events. The Signalled property is an array of THandleInfo records, each of which only contains one field - an index (into the handles array) of the signalled event.

For example, if you want to wait for two events and then react to them, use the following approach:

 1 var
 2   wf: TWaitFor;
 3   info: THandleInfo;
 5 wf := TWaitFor.Create([handle1, handle2]);
 6 try
 7   if wf.WaitAny(INFINITE) = waAwaited then begin
 8     for info in wf.Signalled do
 9       if info.Index = 0 then
10         // handle1 is signalled - do something
11       else if info.Index = 1 then
12         // handle2 is signalled - do something 
13   end; 
14 finally FreeAndNil(wf); end;

You don’t have to recreate TWaitFor for each wait operation; it is perfectly ok to call WaitXXX functions repeatedly on the same object. It is also fine to change the array of handles between two WaitXXX calls by calling the SetHandles method.

The WaitAny method also comes in a variant which processes Windows messages, I/O completion routines and APC calls (MsgWaitAny). It’s wakeMask and flags parameters are the same as the corresponding parameters to the MsgWaitForMultipleObjectsEx API.

The use of the TWaitFor is shown in demo 59_TWaitFor.

5.8 TOmniLockManager<K>

The TOmniLockManager<K> class solves a specific problem – how to synchronize access to entities of any type. It is similar to TMonitor, except that it works on all types, not just on objects.

Following requirements are implemented in the TOmniLockManager<K>.

  1. The lock manager is used to lock entities of any type, not just objects. Let’s name such an entity a key.
  2. Lock manager works similarly to a mutex. The caller can wait on a key with a timeout. Timeouts of 0 and INFINITE are supported.
  3. The interface is similar to TMonitor.Enter/Exit. The code calls Lock to get exclusive access to a key and calls Unlock to release the key back to the public use.
  4. The lock manager is reentrant. If a thread manages to get a lock on a key, it will always succeed in getting another lock before releasing the first one. Only when the number of Unlock calls in one thread matches the number of Lock calls, the key is unlocked. (In other words, if you call Lock twice with the same key, you also have to call Unlock twice to release that key.)
  5. Exclusive access is granted fairly. If a thread A has started waiting on a key before the thread B, it will get access to that key before the thread B.
  6. The set of keys is potentially huge, so it is not possible to create a critical section/mutex for each key.
  7. A probability of collision (two threads trying to lock the same resource at the same time) is fairly low.
 1 type
 2   IOmniLockManagerAutoUnlock = interface
 3     procedure Unlock;
 4   end; 
 6   IOmniLockManager<K> = interface
 7     function  Lock(const key: K; timeout_ms: cardinal): boolean;
 8     function  LockUnlock(const key: K; timeout_ms: cardinal): IOmniLockManagerAutoUnlock;
 9     procedure Unlock(const key: K);
10   end; 

The TOmniLockManager<K> public class implements the IOmniLockManager<K> interface.

The Lock function returns False if it does not lock the key in the specified timeout. Timeouts 0 and INFINITE are supported.

There’s also a LockUnlock function which returns an interface that automatically unlocks the key when it is released. This interface also implements an Unlock function which unlocks the key.

A practical example of using the lock manager is shown in demo 54_LockManager.

5.9 TOmniSingleThreadUseChecker

For debugging purposes, OmniThreadLibrary implements the TOmniSingleThreadUseChecker record. It gives the programmer a simple way to make sure that some code is always executed from the same thread.

1 type
2   TOmniSingleThreadUseChecker = record
3     procedure AttachToCurrentThread; 
4     procedure Check;
5     procedure DebugCheck;
6   end; 

Using it is simple – first declare a variable/field of type TOmniSingleThreadUseChecker in a context that has to be checked and then call Check or DebugCheck method of that variable whenever you want to check that some part of code was not used from more than one thread.

The difference between Check and DebugCheck is that the latter can be disabled during the compilation. It implements the check only if the conditional symbol OTL_CheckThreadSafety is defined. Otherwise, DebugCheck contains no code and does not affect the execution speed.

In cases where you use such an object from more than one thread (for example, if you use it from a task and then from the task controller after the task terminates) you can call AttachToCurrentThread to associate the checker with the current thread.