4. Low-level multi-threading

The low-level OmniThreadLibrary layer focuses on the task concept. In most respects this is similar to Delphi’s TThread approach, except that OmniThreadLibrary focuses on the code (a.k.a. the task) and interaction with that code, while Delphi focuses on the operating system primitive required for executing additional threads.

A task is created using the CreateTask function, which takes as a parameter a global procedure, a method, an instance of the TOmniWorker class (or, usually, a descendant of that class) or an anonymous procedure (in Delphi 2009 and newer). CreateTask also accepts an optional second parameter, a task name, which is displayed in Delphi’s Thread view for the thread running the task.

type
  TOmniTaskProcedure = procedure(const task: IOmniTask);
  TOmniTaskMethod = procedure(const task: IOmniTask) of object;
  TOmniTaskDelegate = reference to procedure(const task: IOmniTask);

function CreateTask(worker: TOmniTaskProcedure; const taskName: string = ''):
  IOmniTaskControl; overload;
function CreateTask(worker: TOmniTaskMethod; const taskName: string = ''):
  IOmniTaskControl; overload;
function CreateTask(worker: TOmniTaskDelegate; const taskName: string = ''):
  IOmniTaskControl; overload;
function CreateTask(const worker: IOmniWorker; const taskName: string = ''):
  IOmniTaskControl; overload;

CreateTask returns a feature-rich interface, IOmniTaskControl, which we will explore in this chapter. The most important function in this interface, Run, creates a new thread and starts your task in it.

4.1 Low-level for the impatient

The following code represents the simplest low-level OmniThreadLibrary example. It executes the Beep function in a background thread. The Beep function merely beeps and exits. When the task function exits, the Windows thread running the task is also terminated.

procedure TfrmTestSimple.Beep(const task: IOmniTask);
begin
  //Executed in a background thread
  MessageBeep(MB_ICONEXCLAMATION);
end;

CreateTask(Beep, 'Beep').Run;

Another way to start a task is to call the Schedule function, which starts it in a thread allocated from a thread pool. This is covered in the Thread pooling chapter.

4.2 Four ways to create a task

Let’s examine all four ways of creating a task. The simplest way (demoed in application 2_TwoWayHello) is to pass the name of a global procedure to CreateTask. This global procedure must accept one parameter of type IOmniTask.

procedure RunHelloWorld(const task: IOmniTask);
begin
  //
end;

CreateTask(RunHelloWorld, 'HelloWorld').Run;

A variation on the theme is passing the name of a method to CreateTask. This approach is used in the demo application 1_HelloWorld. The interesting point here is that you can declare this method in the same class from which CreateTask is called. That way you can access all class fields and methods from the threaded code. Just keep in mind that you’ll be doing this from another thread, so make sure you protect shared access with locking!

procedure TfrmTestHelloWorld.RunHelloWorld(const task: IOmniTask);
begin
  //
end;

procedure TfrmTestHelloWorld.StartTask;
begin
  CreateTask(RunHelloWorld, 'HelloWorld').Run;
end;

In Delphi 2009 and newer you can also write the task code as an anonymous procedure.

CreateTask(
  procedure (const task: IOmniTask)
  begin
    //
  end,
  'HelloWorld').Run;

For all except the simplest tasks, you’ll use the fourth approach as it gives you access to the true OmniThreadLibrary power (namely the internal wait loop and message dispatching). To use it, you have to create a worker object deriving from the TOmniWorker class.

type
  THelloWorker = class(TOmniWorker)
  end;

procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
begin
  FHelloTask :=
    CreateTask(THelloWorker.Create(), 'Hello').
    Run;
end;

4.3 IOmniTaskControl and IOmniTask interfaces

When you create a low-level task, OmniThreadLibrary returns a task controller interface IOmniTaskControl. This interface, which is defined in the OtlTaskControl unit, can be used to control the task from the owner’s side. The task code, on the other hand, has access to another interface, IOmniTask (defined in the OtlTask unit), which can be used to communicate with the owner and manipulate the task itself. A picture in the Tasks vs. threads chapter shows the relationship between these interfaces.

This chapter deals mainly with these two interfaces. For reference, the IOmniTaskControl interface is reprinted here in full. In the rest of the chapter I’ll show just the relevant parts of the interface.

The IOmniTask interface is described at the end of this chapter.

type
  IOmniTaskControl = interface
    function  Alertable: IOmniTaskControl;
    function  CancelWith(const token: IOmniCancellationToken): IOmniTaskControl;
    function  ChainTo(const task: IOmniTaskControl;
      ignoreErrors: boolean = false): IOmniTaskControl;
    function  ClearTimer(timerID: integer): IOmniTaskControl;
    function  DetachException: Exception;
    function  Enforced(forceExecution: boolean = true): IOmniTaskControl;
    function  GetFatalException: Exception;
    function  GetParam: TOmniValueContainer;
    function  Invoke(const msgMethod: pointer): IOmniTaskControl; overload;
    function  Invoke(const msgMethod: pointer;
      msgData: array of const): IOmniTaskControl; overload;
    function  Invoke(const msgMethod: pointer;
      msgData: TOmniValue): IOmniTaskControl; overload;
    function  Invoke(const msgName: string): IOmniTaskControl; overload;
    function  Invoke(const msgName: string;
      msgData: array of const): IOmniTaskControl; overload;
    function  Invoke(const msgName: string;
      msgData: TOmniValue): IOmniTaskControl; overload;
    function  Invoke(remoteFunc: TOmniTaskControlInvokeFunction):
      IOmniTaskControl; overload;
    function  Invoke(remoteFunc: TOmniTaskControlInvokeFunctionEx):
      IOmniTaskControl; overload;
    function  Join(const group: IOmniTaskGroup): IOmniTaskControl;
    function  Leave(const group: IOmniTaskGroup): IOmniTaskControl;
    function  MonitorWith(const monitor: IOmniTaskControlMonitor):
      IOmniTaskControl;
    function  MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl;
    function  NUMANode(numaNodeNumber: integer): IOmniTaskControl;
    function  OnMessage(eventDispatcher: TObject): IOmniTaskControl; overload;
    function  OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskControl; overload;
    function  OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
      IOmniTaskControl; overload;
    function  OnMessage(msgID: word; eventHandler: TOmniMessageExec):
      IOmniTaskControl; overload;
    function  OnMessage(eventHandler: TOmniOnMessageFunction):
      IOmniTaskControl; overload;
    function  OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
      IOmniTaskControl; overload;
    function  OnTerminated(eventHandler: TOmniOnTerminatedFunction):
      IOmniTaskControl; overload;
    function  OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple):
      IOmniTaskControl; overload;
    function  OnTerminated(eventHandler: TOmniTaskTerminatedEvent):
      IOmniTaskControl; overload;
    function  ProcessorGroup(procGroupNumber: integer): IOmniTaskControl;
    function  RemoveMonitor: IOmniTaskControl;
    function  Run: IOmniTaskControl;
    function  Schedule(const threadPool: IOmniThreadPool = nil {default pool}):
      IOmniTaskControl;
    function  SetMonitor(hWindow: THandle): IOmniTaskControl;
    function  SetParameter(const paramName: string;
      const paramValue: TOmniValue): IOmniTaskControl; overload;
    function  SetParameter(const paramValue: TOmniValue):
      IOmniTaskControl; overload;
    function  SetParameters(const parameters: array of TOmniValue):
      IOmniTaskControl;
    function  SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl;
    function  SetQueueSize(numMessages: integer): IOmniTaskControl;
    function  SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TOmniMessageID): IOmniTaskControl; overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc<integer>); overload;
    function  SetUserData(const idxData: TOmniValue;
      const value: TOmniValue): IOmniTaskControl;
    procedure Stop;
    function  Terminate(maxWait_ms: cardinal = INFINITE): boolean;
    function  TerminateWhen(event: THandle): IOmniTaskControl; overload;
    function  TerminateWhen(token: IOmniCancellationToken):
      IOmniTaskControl; overload;
    function  Unobserved: IOmniTaskControl;
    function  WaitFor(maxWait_ms: cardinal): boolean;
    function  WaitForInit: boolean;
    function  WithCounter(const counter: IOmniCounter): IOmniTaskControl;
    function  WithLock(const lock: TSynchroObject;
      autoDestroyLock: boolean = true): IOmniTaskControl; overload;
    function  WithLock(const lock: IOmniCriticalSection):
      IOmniTaskControl; overload;
  //
    property CancellationToken: IOmniCancellationToken
      read GetCancellationToken;
    property Comm: IOmniCommunicationEndpoint read GetComm;
    property ExitCode: integer read GetExitCode;
    property ExitMessage: string read GetExitMessage;
    property FatalException: Exception read GetFatalException;
    property Lock: TSynchroObject read GetLock;
    property Name: string read GetName;
    property Param: TOmniValueContainer read GetParam;
    property UniqueID: int64 read GetUniqueID;
    property UserData[const idxData: TOmniValue]: TOmniValue
      read GetUserDataVal write SetUserDataVal;
  end;

4.4 Task controller needs an owner

The IOmniTaskControl interface returned from CreateTask must always be stored in a variable or field with a scope that exceeds the lifetime of the background task. In other words, don’t store a long-term background task interface in a local variable.

The simplest example of the wrong approach can be written in one line:

CreateTask(MyWorker).Run;

This code looks fine, but it doesn’t work. In this case, the IOmniTaskControl interface is stored in a hidden temporary variable which is destroyed at the end of the current method. This causes the task controller to be destroyed, which in turn causes the background task to be destroyed. Running this code would therefore just create and then destroy the task.

A common solution is to just store the interface in some field.

FTaskControl := CreateTask(MyWorker).Run;

When you don’t need the background worker anymore, you should terminate the task and free the task controller.

FTaskControl.Terminate;
FTaskControl := nil;
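As a rough sketch of how the field-based approach might look in a VCL form, the following fragment starts a task when the form is created and shuts it down when the form is destroyed. The form class, the BackgroundWork procedure and the 3000 ms timeout are illustrative assumptions, not part of the library. The Terminated function belongs to the IOmniTask interface, which is described at the end of this chapter.

```delphi
uses
  Windows, Forms, OtlTask, OtlTaskControl;

type
  // Hypothetical form; FormCreate and FormDestroy are assumed to be
  // wired to the form's OnCreate and OnDestroy events.
  TfrmMain = class(TForm)
    procedure FormCreate(Sender: TObject);
    procedure FormDestroy(Sender: TObject);
  private
    FTaskControl: IOmniTaskControl; // outlives the background task
  end;

// Executed in a background thread; loops until the owner asks it to stop.
procedure BackgroundWork(const task: IOmniTask);
begin
  while not task.Terminated do
    Sleep(100); // placeholder for real work
end;

procedure TfrmMain.FormCreate(Sender: TObject);
begin
  FTaskControl := CreateTask(BackgroundWork, 'Background').Run;
end;

procedure TfrmMain.FormDestroy(Sender: TObject);
begin
  if assigned(FTaskControl) then begin
    FTaskControl.Terminate(3000); // wait up to 3 seconds (assumed value)
    FTaskControl := nil;
  end;
end;
```

Because the controller lives in a form field, it stays alive for as long as the form does, which is what the rule above requires.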

Another solution is to provide the task with an implicit owner. You can, for example, use the event monitor to monitor the task’s lifetime or messages sent from the task, and that will make the task owned by the monitor. The following code is therefore valid:

CreateTask(MyWorker).MonitorWith(eventMonitor).Run;

Yet another possibility is to call Unobserved before Run. This method causes the task to be observed by an internal monitor.

CreateTask(MyWorker).Unobserved.Run;

When you use a thread pool to run a task, the thread pool acts as a task owner so there’s no need for an additional explicit owner.

procedure Beep(const task: IOmniTask);
begin
  MessageBeep(MB_ICONEXCLAMATION);
end;

CreateTask(Beep, 'Beep').Schedule;

4.5 Communication subsystem

As explained in the Locking vs. messaging section, OmniThreadLibrary automatically creates a communication channel between the task controller and the task and exposes it through the Comm property. The communication channel is not exclusive to the OmniThreadLibrary; you could use it equally well from TThread-based multi-threading code.

property Comm: IOmniCommunicationEndpoint read GetComm;

The IOmniCommunicationEndpoint interface exposes a simple interface for sending and receiving messages.

type
  TOmniMessage = record
    MsgID  : word;
    MsgData: TOmniValue;
    constructor Create(aMsgID: word; aMsgData: TOmniValue); overload;
    constructor Create(aMsgID: word); overload;
  end;

  IOmniCommunicationEndpoint = interface
    function  Receive(var msg: TOmniMessage): boolean; overload;
    function  Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
    function  ReceiveWait(var msg: TOmniMessage; timeout_ms: cardinal): boolean; overload;
    function  ReceiveWait(var msgID: word; var msgData: TOmniValue;
      timeout_ms: cardinal): boolean; overload;
    procedure Send(const msg: TOmniMessage); overload;
    procedure Send(msgID: word); overload;
    procedure Send(msgID: word; msgData: array of const); overload;
    procedure Send(msgID: word; msgData: TOmniValue); overload;
    function  SendWait(msgID: word;
      timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
    function  SendWait(msgID: word; msgData: TOmniValue;
      timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
    property NewMessageEvent: THandle read GetNewMessageEvent;
    property OtherEndpoint: IOmniCommunicationEndpoint read GetOtherEndpoint;
    property Reader: TOmniMessageQueue read GetReader;
    property Writer: TOmniMessageQueue read GetWriter;
  end;

For practical examples of communication channel usage, see the Communication subsections of the Simple tasks and TOmniWorker tasks sections.
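To give a first impression of the channel in use, here is a minimal sketch of a round trip between the owner and a task. The message IDs, the GreeterTask procedure, the AskForGreeting function and the timeouts are all assumptions made for this illustration. The task side uses the Comm property of the IOmniTask interface, which is described at the end of this chapter.

```delphi
uses
  OtlCommon, OtlComm, OtlTask, OtlTaskControl;

const
  MSG_NAME     = 1; // hypothetical message IDs chosen for this sketch
  MSG_GREETING = 2;

// Executed in the background thread.
procedure GreeterTask(const task: IOmniTask);
var
  msg: TOmniMessage;
begin
  // Wait up to 5 seconds for a name sent by the owner, then reply.
  if task.Comm.ReceiveWait(msg, 5000) and (msg.MsgID = MSG_NAME) then
    task.Comm.Send(MSG_GREETING, 'Hello, ' + msg.MsgData.AsString);
end;

// Executed in the owner thread.
function AskForGreeting(const name: string): string;
var
  taskControl: IOmniTaskControl;
  reply      : TOmniMessage;
begin
  Result := '';
  taskControl := CreateTask(GreeterTask, 'Greeter').Run;
  taskControl.Comm.Send(MSG_NAME, name);
  // A blocking wait keeps the sketch short; a GUI application would
  // normally react to messages in an event handler instead.
  if taskControl.Comm.ReceiveWait(reply, 5000)
     and (reply.MsgID = MSG_GREETING)
  then
    Result := reply.MsgData.AsString;
  taskControl.Terminate(1000);
end;
```

The local taskControl variable is acceptable here only because the function waits for the reply and terminates the task before returning.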

4.6 Processor groups and NUMA nodes

On a system with multiple processor groups you can use the ProcessorGroup [3.06] function to specify the processor group this task should run on.

On a system with multiple NUMA nodes you can use the NUMANode [3.06] function to specify the NUMA node this task should run on.

When a task is not started directly (Run) but executed via a thread pool (Schedule), IOmniThreadPool.ProcessorGroups and IOmniThreadPool.NUMANodes should be used instead.

Information about existing processor groups and NUMA nodes can be accessed through the Environment object.

Demo 64_ProcessorGroups_NUMA demonstrates the use of ProcessorGroup and NUMANode functions.

4.7 Thread pooling

Starting a new thread in the Windows OS is not a very fast operation. If you are frequently scheduling background tasks, the overhead of creating new threads can significantly impact your program. To solve this, OmniThreadLibrary implements a thread pool, which is basically a cache for threads.

You don’t run a task in a thread pool but schedule it by calling the Schedule method. A very short example of a scheduled task would be:

procedure Beep(const task: IOmniTask);
begin
  MessageBeep(MB_ICONEXCLAMATION);
end;

CreateTask(Beep, 'Beep').Schedule;

A thread pool is created by calling the CreateThreadPool function. A thread pool should have a name (you can set it to an empty string) which is used as part of a pool management thread’s name.

You can also use the default GlobalOmniThreadPool pool which is created on the first use.

function CreateThreadPool(const threadPoolName: string): IOmniThreadPool;

function GlobalOmniThreadPool: IOmniThreadPool;

All thread pool-related code is stored in the OtlThreadPool unit.

See also demo 11_ThreadPool.
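The following sketch shows how a custom pool might be created and used instead of the default one. The pool name, the GWorkerPool variable, the BeepTask and ScheduleBeeps procedures and the limit of two tasks are assumptions for illustration. MaxExecuting is a property of the IOmniThreadPool interface and is read here as the limit on concurrently executing tasks.

```delphi
uses
  Windows, OtlTask, OtlTaskControl, OtlThreadPool;

var
  GWorkerPool: IOmniThreadPool; // hypothetical unit-level pool reference

// Executed in a pool thread.
procedure BeepTask(const task: IOmniTask);
begin
  MessageBeep(MB_ICONEXCLAMATION);
end;

procedure ScheduleBeeps;
var
  i: integer;
begin
  // Create the pool on first use and limit its concurrency
  // (the value 2 is an arbitrary choice for this sketch).
  if not assigned(GWorkerPool) then begin
    GWorkerPool := CreateThreadPool('Beepers');
    GWorkerPool.MaxExecuting := 2;
  end;
  // Tasks that cannot start immediately wait in the pool's queue.
  for i := 1 to 4 do
    CreateTask(BeepTask, 'Beep').Schedule(GWorkerPool);
end;
```

Calling Schedule without a parameter would place the tasks in the default GlobalOmniThreadPool instead.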

4.7.1 Execution flow

When you schedule a task into a thread pool, it merely enters a queue. The thread pool management thread detects this and tries to start this task in an already existing but idle thread. If there is no such thread, it tries to create a new thread (you can limit the maximum number of concurrent threads so this may not always succeed) and run the task in it.

When a task finishes execution, the thread it was running in is put into an idle state and may be reused for execution of new tasks.

The next section explains the various configuration options implemented by the thread pool.

4.7.2 IOmniThreadPool interface

type
  TOTPThreadDataFactoryFunction = function: IInterface;
  TOTPThreadDataFactoryMethod = function: IInterface of object;

  IOmniThreadPool = interface
    function  Cancel(taskID: int64): boolean; overload;
    function  Cancel(taskID: int64; signalCancellationToken: boolean): boolean; overload;
    procedure CancelAll; overload;
    procedure CancelAll(signalCancellationToken: boolean); overload;
    function  CountExecuting: integer;
    function  CountQueued: integer;
    function  IsIdle: boolean;
    function  MonitorWith(const monitor: IOmniThreadPoolMonitor): IOmniThreadPool;
    function  RemoveMonitor: IOmniThreadPool;
    function  SetMonitor(hWindow: THandle): IOmniThreadPool;
    procedure SetThreadDataFactory(const value: TOTPThreadDataFactoryMethod); overload;
    procedure SetThreadDataFactory(const value: TOTPThreadDataFactoryFunction); overload;
    property Asy_OnUnhandledWorkerException: TOTPUnhandledWorkerException read
      GetAsy_OnUnhandledWorkerException write SetAsy_OnUnhandledWorkerException;
    property Affinity: IOmniIntegerSet read GetAffinity;
    property IdleWorkerThreadTimeout_sec: integer read GetIdleWorkerThreadTimeout_sec
      write SetIdleWorkerThreadTimeout_sec;
    property MaxExecuting: integer read GetMaxExecuting write SetMaxExecuting;
    property MaxQueued: integer read GetMaxQueued write SetMaxQueued;
    property MaxQueuedTime_sec: integer read GetMaxQueuedTime_sec write
      SetMaxQueuedTime_sec;
    property MinWorkers: integer read GetMinWorkers write SetMinWorkers;
    property Name: string read GetName write SetName;
    property NumCores: integer read GetNumCores;
    property Options: TOmniThreadPoolOptions read GetOptions write SetOptions;
    property UniqueID: int64 read GetUniqueID;
    property WaitOnTerminate_sec: integer read GetWaitOnTerminate_sec
      write SetWaitOnTerminate_sec;
    {$IFDEF OTL_NUMASupport}
    property ProcessorGroups: IOmniIntegerSet read GetProcessorGroups write
      SetProcessorGroups;
    property NUMANodes: IOmniIntegerSet read GetNUMANodes write SetNUMANodes;
    {$ENDIF OTL_NUMASupport}
  end;

Methods

RemoveMonitor detaches the external monitor from the thread pool.

Properties

4.7.3 Task exit code

Various reasons for task termination are signalled through its ExitCode property. The following thread pool-specific exit codes are defined.

4.7.4 Monitoring thread pool operations

An event monitor component can be used to monitor thread pool events. First you have to attach it to the thread pool by calling the MonitorWith method. After that, the following event monitor event handlers can be used:

type
  TOmniMonitorPoolThreadEvent = procedure(const pool: IOmniThreadPool;
    threadID: integer) of object;
  TOmniMonitorPoolWorkItemEvent = procedure(const pool: IOmniThreadPool;
    taskID: int64) of object;

  TOmniEventMonitor = class(TComponent,
                            IOmniTaskControlMonitor,
                            IOmniThreadPoolMonitor)
  public
    property OnPoolThreadCreated: TOmniMonitorPoolThreadEvent;
    property OnPoolThreadDestroying: TOmniMonitorPoolThreadEvent;
    property OnPoolThreadKilled: TOmniMonitorPoolThreadEvent;
    property OnPoolWorkItemCompleted: TOmniMonitorPoolWorkItemEvent;
  end;
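A minimal sketch of attaching a monitor to a pool might look as follows. The form, the OmniEventMonitor1 component, the lbLog list box and the handler names are hypothetical. The sketch also assumes that the handlers are wired to the monitor's events in the Object Inspector and that they are called in the thread that owns the monitor, so touching VCL controls is safe.

```delphi
uses
  SysUtils, Forms, StdCtrls, OtlThreadPool, OtlEventMonitor;

type
  // Hypothetical form with an event monitor component and a log list box.
  TfrmMain = class(TForm)
    OmniEventMonitor1: TOmniEventMonitor;
    lbLog: TListBox;
    procedure FormCreate(Sender: TObject);
    procedure OmniEventMonitor1PoolThreadCreated(
      const pool: IOmniThreadPool; threadID: integer);
    procedure OmniEventMonitor1PoolWorkItemCompleted(
      const pool: IOmniThreadPool; taskID: int64);
  private
    FPool: IOmniThreadPool;
  end;

procedure TfrmMain.FormCreate(Sender: TObject);
begin
  FPool := CreateThreadPool('Monitored pool');
  FPool.MonitorWith(OmniEventMonitor1); // attach the monitor to the pool
end;

procedure TfrmMain.OmniEventMonitor1PoolThreadCreated(
  const pool: IOmniThreadPool; threadID: integer);
begin
  lbLog.Items.Add(Format('Pool %s created thread %d',
    [pool.Name, threadID]));
end;

procedure TfrmMain.OmniEventMonitor1PoolWorkItemCompleted(
  const pool: IOmniThreadPool; taskID: int64);
begin
  lbLog.Items.Add(Format('Task %d completed in pool %s',
    [taskID, pool.Name]));
end;
```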

4.7.5 Processor groups and NUMA nodes

Through the property ProcessorGroups you can specify all processor groups the tasks can be scheduled to. By default this set is empty which means that tasks will only be scheduled to the default processor group.

By setting the property NUMANodes you can specify all NUMA nodes the tasks can be scheduled to. By default this set is empty which means that tasks will only be scheduled to the default NUMA node.

Information about existing processor groups and NUMA nodes can be accessed through the Environment object.

Demo 64_ProcessorGroups_NUMA demonstrates the use of ProcessorGroup and NUMANode functions.

4.8 Lock-free collections

OmniThreadLibrary implements three lock-free data structures suitable for low-level usage – bounded stack, bounded queue and dynamic queue. Bounded queue is used inside the OmniThreadLibrary for messaging while dynamic queue is used as a basis of the blocking collection.

All three data structures are fully thread-safe. They support multiple simultaneous readers and writers. They are implemented in the OtlContainers unit.

Another lock-free data structure, a message queue, is defined in the OtlComm unit and is mostly intended for internal operation (such as sending messages to and from a thread) although it can also be used for other tasks. An example of such usage is shown in the Using message queue with a TThread worker chapter.

The term lock-free is not well defined (and not even universally accepted). In the context of this book lock-free means that the synchronisation between threads is not achieved with the user- or kernel-level synchronisation primitives such as critical sections, but with bus-locking CPU instructions. With modern CPU architectures this approach is much faster than locking on the operating system level.

See also demos 10_Containers and 32_Queue.

4.8.1 Bounded stack

The bounded stack structure is a very fast stack with limited length. The core of the implementation is stored in the TOmniBaseBoundedStack class.

Derived class TOmniBoundedStack adds support for external observers. Both classes implement the same interface – IOmniStack – so you can code against the class or against the interface.

type
  IOmniStack = interface
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer);
    function  IsEmpty: boolean;
    function  IsFull: boolean;
    function  Pop(var value): boolean;
    function  Push(const value): boolean;
  end;

  TOmniBaseBoundedStack = class(TInterfacedObject, IOmniStack)
  public
    destructor Destroy; override;
    procedure Empty;
    procedure Initialize(numElements, elementSize: integer); virtual;
    function  IsEmpty: boolean; inline;
    function  IsFull: boolean; inline;
    function  Pop(var value): boolean;
    function  Push(const value): boolean;
    property  ElementSize: integer read obsElementSize;
    property  NumElements: integer read obsNumElements;
  end;

  TOmniBoundedStack = class(TOmniBaseBoundedStack)
  public
    constructor Create(numElements, elementSize: integer;
      partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
      almostFullLoadFactor: real = CAlmostFullLoadFactor);
    destructor  Destroy; override;
    function Pop(var value): boolean;
    function Push(const value): boolean;
    property ContainerSubject: TOmniContainerSubject read osContainerSubject;
  end;
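As an illustration of the declarations above, this small console sketch pushes a few integers onto a bounded stack and pops them again. The capacity of four elements and the program name are arbitrary assumptions. The code relies only on the boolean results of Push and Pop and assumes that Push returns False when the stack has no room left.

```delphi
program BoundedStackSketch;

{$APPTYPE CONSOLE}

uses
  OtlContainers;

var
  stack: TOmniBoundedStack;
  i    : integer;
  value: integer;
begin
  // Each element occupies SizeOf(integer) bytes.
  stack := TOmniBoundedStack.Create(4, SizeOf(integer));
  try
    for i := 1 to 5 do
      if not stack.Push(i) then
        Writeln('Stack is full, could not push ', i);
    // Pop returns False once the stack is empty.
    while stack.Pop(value) do
      Writeln('Popped ', value);
  finally
    stack.Free;
  end;
end.
```

Push and Pop take untyped parameters, so the variable passed in must match the element size given to the constructor.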

4.8.2 Bounded queue

The bounded queue structure is a very fast queue with limited length.

The core of the implementation is stored in the TOmniBaseBoundedQueue class. Derived class TOmniBoundedQueue adds support for external observers. Both classes implement the same interface – IOmniQueue – so you can code against the class or against the interface.

type
  IOmniQueue = interface
    function  Dequeue(var value): boolean;
    procedure Empty;
    function  Enqueue(const value): boolean;
    procedure Initialize(numElements, elementSize: integer);
    function  IsEmpty: boolean;
    function  IsFull: boolean;
  end;

  TOmniBaseBoundedQueue = class(TInterfacedObject, IOmniQueue)
  public
    destructor Destroy; override;
    function  Dequeue(var value): boolean;
    procedure Empty;
    function  Enqueue(const value): boolean;
    procedure Initialize(numElements, elementSize: integer); virtual;
    function  IsEmpty: boolean;
    function  IsFull: boolean;
    property  ElementSize: integer read obqElementSize;
    property  NumElements: integer read obqNumElements;
  end;

  TOmniBoundedQueue = class(TOmniBaseBoundedQueue)
  public
    constructor Create(numElements, elementSize: integer;
      partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
      almostFullLoadFactor: real = CAlmostFullLoadFactor);
    destructor  Destroy; override;
    function  Dequeue(var value): boolean;
    function  Enqueue(const value): boolean;
    property  ContainerSubject: TOmniContainerSubject read oqContainerSubject;
  end;

4.8.3 Message queue

The TOmniMessageQueue is just a thin wrapper around the bounded queue data structure. An element of this queue is a (message ID, message data) pair, stored in a TOmniMessage record.

This class greatly simplifies creating and attaching event and window observers.

type
  TOmniMessage = record
    MsgID  : word;
    MsgData: TOmniValue;
    constructor Create(aMsgID: word; aMsgData: TOmniValue); overload;
    constructor Create(aMsgID: word); overload;
  end;

  TOmniContainerWindowsEventObserver = class(TOmniContainerObserver)
  public
    function  GetEvent: THandle; virtual; abstract;
  end;

  TOmniMessageQueueMessageEvent =
    procedure(Sender: TObject; const msg: TOmniMessage) of object;

  TOmniMessageQueue = class(TOmniBoundedQueue)
  public
    constructor Create(numMessages: integer;
      createEventObserver: boolean = true); reintroduce;
    destructor  Destroy; override;
    function  Dequeue: TOmniMessage; reintroduce;
    function  Enqueue(const value: TOmniMessage): boolean; reintroduce;
    procedure Empty;
    function  GetNewMessageEvent: THandle;
    function  TryDequeue(var msg: TOmniMessage): boolean; reintroduce;
    property EventObserver: TOmniContainerWindowsEventObserver
      read mqWinEventObserver;
    property OnMessage: TOmniMessageQueueMessageEvent
      read mqWinMsgObserver.OnMessage write SetOnMessage;
  end;

TOmniMessageQueue.Create creates an event observer unless the second parameter (createEventObserver) is set to False. It is created with the coiNotifyOnAllInserts interest meaning that an event (accessible through the GetNewMessageEvent function) is signalled each time an element (a message) is added to the queue. The observer itself is accessible through the EventObserver property.

You can also easily create a window message observer by attaching an event handler to the OnMessage property. This observer is also created with the coiNotifyOnAllInserts interest which causes the OnMessage event handler to be called each time an element (a message) is added to the queue. You can destroy this observer at any time by assigning a nil value to the OnMessage event.

For an example, see chapter Using message queue with a TThread worker.
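The event observer can be tried out in a single thread, as in the hedged sketch below. The queue size of 100, the MSG_GREETING constant and the one-second timeout are assumptions for illustration. In real code the producer and the consumer would typically run in different threads.

```delphi
program MessageQueueSketch;

{$APPTYPE CONSOLE}

uses
  Windows, OtlCommon, OtlComm;

const
  MSG_GREETING = 1; // hypothetical message ID

var
  queue: TOmniMessageQueue;
  msg  : TOmniMessage;
begin
  // The second constructor parameter defaults to True,
  // so an event observer is created.
  queue := TOmniMessageQueue.Create(100);
  try
    queue.Enqueue(TOmniMessage.Create(MSG_GREETING, 'hello'));
    // The event should be signalled because a message was inserted.
    if WaitForSingleObject(queue.GetNewMessageEvent, 1000) = WAIT_OBJECT_0 then
      while queue.TryDequeue(msg) do
        Writeln(msg.MsgID, ': ', msg.MsgData.AsString);
  finally
    queue.Free;
  end;
end.
```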

4.8.4 Dynamic queue

The dynamic queue is a fast queue with unlimited length. It can grow as required as the data used to store elements is dynamically allocated.

The core of the implementation is stored in the TOmniBaseQueue class. Derived class TOmniQueue adds support for external observers. Both structures store TOmniValue elements.

type
  TOmniBaseQueue = class
  ...
  public
    constructor Create(blockSize: integer = 65536; numCachedBlocks: integer = 4);
    destructor  Destroy; override;
    function  Dequeue: TOmniValue;
    procedure Enqueue(const value: TOmniValue);
    function  IsEmpty: boolean;
    function  TryDequeue(var value: TOmniValue): boolean;
  end;

  TOmniQueue = class(TOmniBaseQueue)
  ...
  public
    function  Dequeue: TOmniValue;
    procedure Enqueue(const value: TOmniValue);
    function  TryDequeue(var value: TOmniValue): boolean;
    property ContainerSubject: TOmniContainerSubject read ocContainerSubject;
  end;
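A short console sketch of the dynamic queue, using only the methods listed above, could look like this. The program name and the stored values are arbitrary. The sketch relies on the implicit conversion from integer to TOmniValue and on the AsInteger accessor of TOmniValue.

```delphi
program DynamicQueueSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon, OtlContainers;

var
  queue: TOmniQueue;
  value: TOmniValue;
  i    : integer;
begin
  // Default block size and number of cached blocks.
  queue := TOmniQueue.Create;
  try
    // Enqueue has no result; the queue grows as needed.
    for i := 1 to 3 do
      queue.Enqueue(i);
    // TryDequeue returns False when the queue is empty,
    // so the loop drains the queue in FIFO order.
    while queue.TryDequeue(value) do
      Writeln(value.AsInteger);
  finally
    queue.Free;
  end;
end.
```

TryDequeue is used rather than Dequeue because it reports an empty queue through its boolean result.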

4.8.5 Observing lock-free collections

OmniThreadLibrary data structures support the observer design pattern. Each structure can be observed by multiple observers at the same time. Supporting code and two observer implementations are stored in the OtlContainerObserver unit.

The current architecture supports four different kinds of events that can be observed:

type
  ///<summary>All possible actions observer can take interest in.</summary>
  TOmniContainerObserverInterest = (
    //Interests with permanent subscription:
    coiNotifyOnAllInserts, coiNotifyOnAllRemoves,
    //Interests with one-shot subscription:
    coiNotifyOnPartlyEmpty, coiNotifyOnAlmostFull
  );

The OtlContainerObserver unit implements event and message observers.

  TOmniContainerWindowsEventObserver = class(TOmniContainerObserver)
  public
    function  GetEvent: THandle; virtual; abstract;
  end;

  TOmniContainerWindowsMessageObserver = class(TOmniContainerObserver)
  strict protected
    function  GetHandle: THandle; virtual; abstract;
  public
    procedure Send(aMessage: cardinal; wParam, lParam: integer);
      virtual; abstract;
    property Handle: THandle read GetHandle;
  end;

  function CreateContainerWindowsEventObserver(externalEvent: THandle = 0):
    TOmniContainerWindowsEventObserver;

  function CreateContainerWindowsMessageObserver(hWindow: THandle;
    msg: cardinal; wParam, lParam: integer):
    TOmniContainerWindowsMessageObserver;

The event observer TOmniContainerWindowsEventObserver raises an event every time the observed event occurs.

The message observer TOmniContainerWindowsMessageObserver sends a message to a window every time the observed event occurs.

4.8.5.1 Examples

Create and attach the event observer:

FObserver := CreateContainerWindowsEventObserver;
FCollection.ContainerSubject.Attach(FObserver, coiNotifyOnAllInserts);

Access the observer event so you can wait on it:

FEvent := FObserver.GetEvent;

Detach and destroy the observer:

FCollection.ContainerSubject.Detach(FObserver, coiNotifyOnAllInserts);
FreeAndNil(FObserver);

Create and attach the message observer:

FWindow := DSiAllocateHWnd(ObserverWndProc);
FObserver := CreateContainerWindowsMessageObserver(
  FWindow, MSG_ITEM_INSERTED, 0, 0);
FWorker.Output.ContainerSubject.Attach(FObserver, coiNotifyOnAllInserts);

Process observer messages:

procedure ObserverWndProc(var message: TMessage);
var
  ovWorkItem: TOmniValue;
  workItem  : IOmniWorkItem;
begin
  if message.Msg = MSG_ITEM_INSERTED then begin
    //...
    message.Result := Ord(true);
  end
  else
    message.Result := DefWindowProc(FWindow, message.Msg,
      message.WParam, message.LParam);
end;

Detach and destroy the observer:

FWorker.Output.ContainerSubject.Detach(FObserver, coiNotifyOnAllInserts);
FreeAndNil(FObserver);
DSiDeallocateHWnd(FWindow);

4.8.6 Benchmarks

OmniThreadLibrary contains two demos that can be used to measure the performance of the lock-free structures. Bounded structures are benchmarked in the 10_Containers demo and dynamic queue is benchmarked in the 32_Queue demo.

Results measured on a 4-core i7-2630QM running at 2 GHz show that lock-free structures can transfer from 2.5 to 5 million messages per second.

4.9 Event monitor

While the OmniThreadLibrary is mostly a code-oriented framework, it also contains one package (stored in the packages subfolder) with one component (TOmniEventMonitor). This component supports Win32 and Win64 projects and contains some events that can be used to monitor task and thread lifecycle.

type
  TOmniMonitorTaskEvent = procedure(const task: IOmniTaskControl) of object;
  TOmniMonitorTaskMessageEvent = procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage) of object;
  TOmniMonitorPoolThreadEvent = procedure(const pool: IOmniThreadPool;
    threadID: integer) of object;
  TOmniMonitorPoolWorkItemEvent = procedure(const pool: IOmniThreadPool;
    taskID: int64) of object;

  TOmniEventMonitor = class(TComponent,
                            IOmniTaskControlMonitor,
                            IOmniThreadPoolMonitor)
  published
    property OnPoolThreadCreated: TOmniMonitorPoolThreadEvent;
    property OnPoolThreadDestroying: TOmniMonitorPoolThreadEvent;
    property OnPoolThreadKilled: TOmniMonitorPoolThreadEvent;
    property OnPoolWorkItemCompleted: TOmniMonitorPoolWorkItemEvent;
    property OnTaskMessage: TOmniMonitorTaskMessageEvent;
    property OnTaskTerminated: TOmniMonitorTaskEvent;
    property OnTaskUndeliveredMessage: TOmniMonitorTaskMessageEvent;
  end;

The first four events (OnPool*) are used to monitor thread pool events. They are described in the Monitoring thread pool operations section.

The other three events are used to monitor tasks attached to this monitor. See the MonitorWith section for more information.

4.10 Simple tasks

This part of the book describes properties and methods of the IOmniTaskControl interface that are useful with all four kinds of tasks. The next section, TOmniWorker tasks, covers only the parts that are useful for TOmniWorker-based tasks.

4.10.1 Name

property Name: string read GetName;

The Name property returns the task name as set in the CreateTask call.

4.10.2 UniqueID

property UniqueID: int64 read GetUniqueID;

The UniqueID property returns the task’s unique ID. This ID is generated automatically when a task is created and is guaranteed to be unique and greater than zero.

4.10.3 Parameters

function SetParameter(const paramName: string;
  const paramValue: TOmniValue): IOmniTaskControl; overload;
function SetParameter(const paramValue: TOmniValue): IOmniTaskControl; overload;
function SetParameters(const parameters: array of TOmniValue): IOmniTaskControl;
property Param: TOmniValueContainer read GetParam;

The task controller can set parameters for the task by calling the SetParameter and SetParameters methods. Parameters must be set before the task is started or scheduled (in other words, before Run or Schedule is called).

Parameters can have names or they can be accessed by an index number.

In the former approach you have to pass in a name and a value for each parameter. You can set multiple parameters in one call by calling SetParameters and providing it with an array of (name, value) pairs.

taskControl := CreateTask(MyTask);
taskControl.SetParameter('Initial value', '42');
taskControl.SetParameters(['From', 0, 'To', 99]);

Both the controller and the task can access parameters through the Param property.

task.Param['Initial value'] // '42'
task.Param['From'] // 0
task.Param['To'] // 99

Another approach is to just set parameters one by one by calling SetParameter repeatedly.

taskControl := CreateTask(MyTask);
taskControl.SetParameter('42');
taskControl.SetParameter(0);
taskControl.SetParameter(99);

In this case, a task can index the Param property with a 0-based index to access parameters.

task.Param[0] // '42'
task.Param[1] // 0
task.Param[2] // 99

Parameter storage is implemented in the TOmniValueContainer class.

4.10.4 Termination

A simple background task can function either as a single-shot or as a long term operation. In the former case there’s no need for stopping the task as it will stop by itself but in the latter case we would need to tell the task to stop at some point.

To stop a task, call its Terminate method. It will wait up to maxWait_ms milliseconds for the task to exit, after which it will kill the thread running the task.

procedure Stop;
function Terminate(maxWait_ms: cardinal = INFINITE): boolean;
function WaitFor(maxWait_ms: cardinal): boolean;

Simple tasks should occasionally check either the Terminated function (it returns True once Terminate has been called) or the TerminateEvent event (it becomes signalled once Terminate has been called).

The two examples below demonstrate how to write a task that does nothing except wait to be terminated.

procedure TerminateTask1(const task: IOmniTask);
begin
  while WaitForSingleObject(task.TerminateEvent, 1000) = WAIT_TIMEOUT do begin
    // some periodic task
  end;
end;

procedure TerminateTask2(const task: IOmniTask);
begin
  while not task.Terminated do begin
    // some periodic task
    Sleep(1000);
  end;
end;

If you just want to tell the task to stop (without waiting for its termination), you can call the Stop method. You can then check the task’s status by calling the WaitFor function, which waits at most maxWait_ms milliseconds for the task to stop and returns True if the task has stopped.
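The Stop and WaitFor combination can be sketched as a small console program. This is only an illustration: PeriodicTask and the timing values are hypothetical, and the scoped unit names (Winapi.Windows, System.SysUtils) assume Delphi XE2 or newer; older compilers would use Windows and SysUtils.

```pascal
program StopAndWaitDemo;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils,
  OtlTask, OtlTaskControl;

// Hypothetical task: loops until the controller asks it to stop.
procedure PeriodicTask(const task: IOmniTask);
begin
  while WaitForSingleObject(task.TerminateEvent, 100) = WAIT_TIMEOUT do begin
    // some periodic work
  end;
end;

var
  taskControl: IOmniTaskControl;

begin
  taskControl := CreateTask(PeriodicTask, 'Periodic').Run;
  Sleep(500);           // let the task run for a while
  taskControl.Stop;     // request termination and return immediately
  // ... the controller is free to do other work here ...
  if taskControl.WaitFor(3000) then
    Writeln('Task has stopped')
  else
    Writeln('Task is still running after 3 seconds');
end.
```

Unlike Terminate, this pattern never kills the thread; if WaitFor returns False the caller decides what to do next.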

Sometimes you may want to be notified when the task has terminated. One way to do that is to attach the task to a monitor and monitor the OnTaskTerminated event. Another is to write a termination event handler.

type
  TOmniOnTerminatedFunction = reference to procedure(const task: IOmniTaskControl);
  TOmniOnTerminatedFunctionSimple = reference to procedure;
  TOmniTaskTerminatedEvent = procedure(const task: IOmniTaskControl) of object;

function OnTerminated(eventHandler: TOmniOnTerminatedFunction):
  IOmniTaskControl; overload;
function OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple):
  IOmniTaskControl; overload;
function OnTerminated(eventHandler: TOmniTaskTerminatedEvent):
  IOmniTaskControl; overload;

The termination handler can receive a reference to the controller of the task being terminated or it can be a parameterless procedure.

The termination handler is called in the context of the thread that created the task.

The example below shows how to write a termination handler to clear the task controller.

procedure NullTask(const task: IOmniTask);
begin
end;

var
  FTaskControl: IOmniTaskControl;

FTaskControl := CreateTask(NullTask);
FTaskControl.OnTerminated(
  procedure
  begin
    ShowMessage('Background task has terminated');
    FTaskControl := nil;
  end);
FTaskControl.Run;

4.10.5 ExitCode

A task can send a result to the controller by calling the SetExitStatus procedure. The task controller can access this result through the ExitCode and ExitMessage properties.

property ExitCode: integer read GetExitCode;
property ExitMessage: string read GetExitMessage;

By default, ExitCode contains value 0 and ExitMessage contains an empty string.

An application can use exit codes from 0 to $7FFFFFFF. Exit codes from $80000000 to $FFFFFFFF are reserved for internal OmniThreadLibrary use. The following exit codes (defined in the OtlCommon unit) have reserved meaning:

const
  // reserved exit statuses
  EXIT_OK                        = 0;
  EXIT_INTERNAL                  = integer($80000000);
  EXIT_THREADPOOL_QUEUE_TOO_LONG = EXIT_INTERNAL + 0;
  EXIT_THREADPOOL_STALE_TASK     = EXIT_INTERNAL + 1;
  EXIT_THREADPOOL_CANCELLED      = EXIT_INTERNAL + 2;
  EXIT_THREADPOOL_INTERNAL_ERROR = EXIT_INTERNAL + 3;

EXIT_THREADPOOL_* exit codes are described in the Thread pooling section.
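A minimal sketch of how a task might report its result through the exit status follows. The two-parameter form SetExitStatus(exitCode, exitMessage) is assumed here, the CheckInput task, the EXIT_INPUT_MISSING constant and the file name are hypothetical, and the code assumes that the exit status can be read as soon as WaitFor returns True.

```pascal
program ExitStatusDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlTask, OtlTaskControl;

const
  // Hypothetical application-defined exit code (must lie in 0..$7FFFFFFF).
  EXIT_INPUT_MISSING = 1;

// Hypothetical task: reports an error if its input file is missing.
procedure CheckInput(const task: IOmniTask);
var
  fileName: string;
begin
  fileName := task.Param['FileName'];
  if not FileExists(fileName) then
    task.SetExitStatus(EXIT_INPUT_MISSING, 'Input file not found: ' + fileName);
  // On success nothing is set, so ExitCode keeps its default value (0).
end;

var
  taskControl: IOmniTaskControl;

begin
  taskControl := CreateTask(CheckInput, 'CheckInput')
    .SetParameter('FileName', 'no-such-file.txt')
    .Run;
  if taskControl.WaitFor(5000) then
    Writeln(taskControl.ExitCode, ' ', taskControl.ExitMessage)
  else
    Writeln('Task did not finish in time');
end.
```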

4.10.6 Exceptions

If an unhandled exception is raised in the background task, it is caught by the OmniThreadLibrary code and stored in the FatalException property. When that happens, the background task terminates execution in a normal way (termination handler is called etc.).

The task controller can check the FatalException property to see whether an unhandled exception was raised (if not, the property will contain nil).

When the task controller is destroyed, so is the exception object. If you need to preserve the exception object, you can detach it from the task controller by calling the DetachException function.

function DetachException: Exception;
property FatalException: Exception read GetFatalException;

See also demo 13_Exceptions.
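As a rough illustration of the mechanism (not taken from the 13_Exceptions demo), the following console sketch lets a hypothetical FailingTask raise an exception and then takes ownership of the exception object in the controller. It assumes the FatalException property is already populated when WaitFor returns True. When run under the debugger, the IDE will still report the exception at the point where it is raised.

```pascal
program TaskExceptionDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlTask, OtlTaskControl;

// Hypothetical task that always fails with an unhandled exception.
procedure FailingTask(const task: IOmniTask);
begin
  raise Exception.Create('Something went wrong in the background');
end;

var
  taskControl: IOmniTaskControl;
  taskError  : Exception;

begin
  taskControl := CreateTask(FailingTask, 'Failing').Run;
  if not taskControl.WaitFor(5000) then
    Exit;
  if assigned(taskControl.FatalException) then begin
    // After DetachException the controller no longer owns the object,
    // so it is our responsibility to free it.
    taskError := taskControl.DetachException;
    try
      Writeln(taskError.ClassName, ': ', taskError.Message);
    finally
      taskError.Free;
    end;
  end;
end.
```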

4.10.7 Sending messages to a task

The Communication subsystem section explains how the communication subsystem works and what methods can be used in your program. This part of the book will show a practical example of sending messages to a simple (not TOmniWorker-based) task. This example is based on the 2_TwoWayHello demo.

The task is created in a standard manner.

FHelloTask :=
  OmniEventMonitor1.Monitor(CreateTask(RunHello, 'Hello'))
    .SetParameter('Delay', 1000) // wait timeout in ms, read by RunHello below
    .SetParameter('Message', 'Initial message')
    .Run;

A message is sent to the task through the Comm property.

const
  MSG_CHANGE_MESSAGE = 1;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'New message');

The task (RunHello procedure) must implement a loop which will wait on two events – task.TerminateEvent (to know when to stop) and task.Comm.NewMessageEvent (to know when a new message arrived). When the latter event is signalled, a message can be read from the message queue.

You should always read and process all waiting messages, not just one.

procedure RunHello(const task: IOmniTask);
var
  msg    : string;
  msgData: TOmniValue;
  msgID  : word;
begin
  msg := task.Param['Message'];
  repeat
    case DSiWaitForTwoObjects(task.TerminateEvent, task.Comm.NewMessageEvent,
           false, task.Param['Delay'])
    of
      WAIT_OBJECT_1:
        begin
          while task.Comm.Receive(msgID, msgData) do begin
            if msgID = MSG_CHANGE_MESSAGE then
              msg := msgData;
          end;
        end;
      WAIT_TIMEOUT:
        task.Comm.Send(0, msg);
      else
        break; //repeat
    end;
  until false;
end;

This code uses DSiWaitForTwoObjects to wait on two events. That function is just a thin wrapper around Windows’ WaitForMultipleObjects.

4.10.8 Receiving messages from a task

Messages sent from a task can be received and processed in many ways. They can all be used with both simple and TOmniWorker tasks.

The first approach is to use an event monitor to process messages.

Another way is to call an OnMessage method providing an event handler which can be a method or an anonymous method.

type
  TOmniTaskMessageEvent = procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage) of object;
  TOmniOnMessageFunction = reference to procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage);

function OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskControl; overload;
function OnMessage(eventHandler: TOmniOnMessageFunction): IOmniTaskControl; overload;

task := CreateTask(MyTask).OnMessage(
  procedure (const task: IOmniTaskControl; const msg: TOmniMessage)
  begin
    ShowMessage('Received message with ID ' + IntToStr(msg.MsgID));
  end).Run;

You can also set multiple message handlers, each handling a specific message ID.

type
  TOmniTaskMessageEvent = procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage) of object;
  TOmniOnMessageFunction = reference to procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage);

function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
  IOmniTaskControl; overload;
function OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
  IOmniTaskControl; overload;

task := CreateTask(MyTask);
task.OnMessage(
  MSG_FIRST,
  procedure (const task: IOmniTaskControl; const msg: TOmniMessage)
  begin
    ShowMessage('Received MSG_FIRST');
  end);
task.OnMessage(
  MSG_SECOND,
  procedure (const task: IOmniTaskControl; const msg: TOmniMessage)
  begin
    ShowMessage('Received MSG_SECOND');
  end);
task.Run;

The OmniThreadLibrary allows you to combine both approaches, providing specific message handlers for some messages and generic message handlers for all the rest.

task := CreateTask(MyTask);
task.OnMessage(MSG_FIRST, HandleFirstMessage);
task.OnMessage(MSG_SECOND, HandleSecondMessage);
task.OnMessage(HandleOtherMessages);
task.Run;

A message handler can also be provided through a TOmniMessageExec class, which is defined in the OtlTaskControl unit. This class can wrap any message handler type and is mainly intended for internal use.

function OnMessage(msgID: word; eventHandler: TOmniMessageExec):
  IOmniTaskControl; overload;

The last possibility is to dispatch all messages to an object which implements Delphi’s Dispatch mechanism, for example, to a form.

function OnMessage(eventDispatcher: TObject): IOmniTaskControl; overload;

An object should then define message handlers by using the Delphi’s message procedures. Each should accept a var TOmniMessage parameter.

const
  WM_FIRST_MSG = WM_USER;
  WM_SECOND_MSG = WM_USER + 1;

type
  TForm1 = class(TForm)
  public
    FTask: IOmniTaskControl;
    procedure StartTask;
    procedure HandleFirstMessage(var msg: TOmniMessage); message WM_FIRST_MSG;
    procedure HandleSecondMessage(var msg: TOmniMessage); message WM_SECOND_MSG;
  end;

procedure TForm1.StartTask;
begin
  FTask := CreateTask(MyTask).OnMessage(Self).Run;
end;

4.10.9 ChainTo

Tasks can be chained together so that the next task is started when the previous one terminates. This is achieved with the ChainTo method.

function ChainTo(const task: IOmniTaskControl;
  ignoreErrors: boolean = false): IOmniTaskControl;

The code below starts the task1 task. When it is finished, OmniThreadLibrary immediately starts task2. Note that in this case you should not start the second task explicitly.

task2 := CreateTask(SecondTask);
task1 := CreateTask(FirstTask).ChainTo(task2).Run;

If ignoreErrors is left at default (False), the second task is only started if first task’s ExitCode is EXIT_OK (0). If ignoreErrors is set to True, the second task is started even if ExitCode of the first task is not 0.

4.10.10 Join / Leave

Calling Join adds the task to a task group.

Calling Leave removes the task from a group.

function Join(const group: IOmniTaskGroup): IOmniTaskControl;
function Leave(const group: IOmniTaskGroup): IOmniTaskControl;
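The sketch below shows one way Join and Leave might be used together with a task group. It relies on CreateTaskGroup and on the RunAll and TerminateAll methods of IOmniTaskGroup, none of which are described in this section, so treat those calls as assumptions. IdleTask is a hypothetical task that simply waits to be terminated.

```pascal
program TaskGroupDemo;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils,
  OtlTask, OtlTaskControl;

// Hypothetical task: does nothing until it is told to terminate.
procedure IdleTask(const task: IOmniTask);
begin
  WaitForSingleObject(task.TerminateEvent, INFINITE);
end;

var
  group: IOmniTaskGroup;
  task1: IOmniTaskControl;
  task2: IOmniTaskControl;

begin
  group := CreateTaskGroup;
  task1 := CreateTask(IdleTask, 'First').Join(group);
  task2 := CreateTask(IdleTask, 'Second').Join(group);
  group.RunAll;              // start every task in the group

  task2.Leave(group);        // task2 keeps running but is no longer a member
  group.TerminateAll(3000);  // affects only the remaining member, task1
  task2.Terminate(3000);     // task2 has to be stopped separately
end.
```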

4.10.11 MonitorWith / RemoveMonitor

Calling MonitorWith attaches the task to an event monitor. Calling RemoveMonitor removes the task from the monitor.

function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskControl;
function RemoveMonitor: IOmniTaskControl;

Calling task.MonitorWith(otlMonitor) is equivalent to calling otlMonitor.Monitor(task). Calling task.RemoveMonitor is equivalent to calling otlMonitor.Detach(task).

The following three monitor events are useful for monitoring tasks.

type
  TOmniMonitorTaskEvent = procedure(const task: IOmniTaskControl) of object;
  TOmniMonitorTaskMessageEvent = procedure(const task: IOmniTaskControl;
    const msg: TOmniMessage) of object;

  TOmniEventMonitor = class(TComponent,
                            IOmniTaskControlMonitor,
                            IOmniThreadPoolMonitor)
  published
    property OnTaskMessage: TOmniMonitorTaskMessageEvent;
    property OnTaskTerminated: TOmniMonitorTaskEvent;
    property OnTaskUndeliveredMessage: TOmniMonitorTaskMessageEvent;
  end;

4.10.12 Enforced

The Enforced method regulates behaviour in a very specific situation – when a task is terminated before it even starts execution.

function Enforced(forceExecution: boolean = true): IOmniTaskControl;

In the default scenario, a task is always started when the Run is called, even if it was terminated before that. While the task would typically immediately exit, it can still do some processing.

procedure ShortLivedTask(const task: IOmniTask);
begin
  // this line will be executed
end;

task := CreateTask(ShortLivedTask);
task.Terminate;
task.Run;

If Enforced(False) is called, a terminated task won’t be started at all.

procedure UnstartedTask(const task: IOmniTask);
begin
  // this line will never be executed
end;

task := CreateTask(UnstartedTask);
task.Enforced(false);
task.Terminate;
task.Run;

This method also regulates behaviour of tasks that were scheduled to the thread pool.

4.10.13 Unobserved

Calling Unobserved adds an implicit owner to the task so that you don’t have to store the returned IOmniTaskControl interface in a variable or a field.

function Unobserved: IOmniTaskControl;

See the Task controller needs an owner section for an example.

4.10.14 Cancellation token / CancelWith

OmniThreadLibrary implements the concept of a cancellation token, which is a synchronisation mechanism that allows multiple tasks to be cancelled with a single command.

Cancellation must be cooperative, i.e. the task must watch its CancellationToken property and exit when it becomes signalled.

function CancelWith(const token: IOmniCancellationToken): IOmniTaskControl;

property CancellationToken: IOmniCancellationToken read GetCancellationToken;
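A minimal sketch of cooperative cancellation with one token shared by two tasks follows. It assumes that the token is created with CreateOmniCancellationToken from the OtlSync unit and that it exposes Signal and IsSignalled; these names are not introduced in this section. CancellableTask is hypothetical.

```pascal
program CancellationDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlTask, OtlTaskControl, OtlSync;

// Hypothetical task: polls its cancellation token and exits once it is signalled.
procedure CancellableTask(const task: IOmniTask);
begin
  while not task.CancellationToken.IsSignalled do begin
    // one small unit of work
    Sleep(50);
  end;
end;

var
  token: IOmniCancellationToken;
  task1: IOmniTaskControl;
  task2: IOmniTaskControl;

begin
  token := CreateOmniCancellationToken;
  task1 := CreateTask(CancellableTask, 'First').CancelWith(token).Run;
  task2 := CreateTask(CancellableTask, 'Second').CancelWith(token).Run;

  Sleep(500);
  token.Signal;          // a single call cancels both tasks

  task1.WaitFor(3000);
  task2.WaitFor(3000);
end.
```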

4.10.15 Lock / WithLock

Sometimes you have to establish a common synchronisation primitive between the main program and one (or more) background tasks. In such a case, the WithLock method can be called to assign a synchronisation primitive (such as a critical section) to the task’s Lock property.

function WithLock(const lock: TSynchroObject;
  autoDestroyLock: boolean = true): IOmniTaskControl; overload;
function WithLock(const lock: IOmniCriticalSection): IOmniTaskControl; overload;

property Lock: TSynchroObject read GetLock;

If you pass in a TSynchroObject descendant, the default behaviour is to automatically destroy this object when the task is destroyed. If you are passing the same synchronisation object to two or more tasks, this is not a good idea and the second WithLock parameter should be set to False. Alternatively, you can use an IOmniCriticalSection, which will destroy itself automatically when it is no longer used.

See also demo 12_Lock.
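The shared-lock scenario described above can be sketched as follows (this is not the code of the 12_Lock demo). Two instances of a hypothetical AddingTask increment one global counter under a TCriticalSection that the main program owns, so autoDestroyLock is set to False. The sketch assumes that the task side reaches the lock through task.Lock.

```pascal
program SharedLockDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs,
  OtlTask, OtlTaskControl;

var
  sharedTotal: integer; // protected by the shared lock

// Hypothetical task: increments the shared counter under the lock.
procedure AddingTask(const task: IOmniTask);
var
  i: integer;
begin
  for i := 1 to 1000 do begin
    task.Lock.Acquire;
    try
      Inc(sharedTotal);
    finally
      task.Lock.Release;
    end;
  end;
end;

var
  sharedLock: TCriticalSection;
  task1     : IOmniTaskControl;
  task2     : IOmniTaskControl;

begin
  sharedLock := TCriticalSection.Create;
  try
    // The same lock goes to both tasks, so neither may destroy it.
    task1 := CreateTask(AddingTask, 'Adder 1').WithLock(sharedLock, false).Run;
    task2 := CreateTask(AddingTask, 'Adder 2').WithLock(sharedLock, false).Run;
    task1.WaitFor(5000);
    task2.WaitFor(5000);
    Writeln('Total: ', sharedTotal);
  finally
    sharedLock.Free;
  end;
end.
```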

4.10.16 WithCounter

Similar to the Lock mechanism, WithCounter connects a task with a thread-safe counter, IOmniCounter.

function WithCounter(const counter: IOmniCounter): IOmniTaskControl;

See also demo 14_TerminateWhen.

4.10.17 SetPriority

By calling SetPriority you can specify the priority of the thread that will execute the task. By default the priority is set to tpNormal which corresponds to Windows priority level THREAD_PRIORITY_NORMAL.

type
  TOTLThreadPriority = (tpIdle, tpLowest, tpBelowNormal, tpNormal,
    tpAboveNormal, tpHighest);

function SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl;

4.10.18 SetQueueSize

The SetQueueSize method sets the length of the message queues. By default, the message queue length is set to 1000 messages.

function SetQueueSize(numMessages: integer): IOmniTaskControl;

This method must be called before the communication channel is used for the first time. The safest way is to call it before the task is run or scheduled.

4.11 TOmniWorker tasks

At the heart of all sufficiently complicated threading code lies a complicated loop similar to the one below.

var
  handles: array [0..3] of THandle;
  waitRes: DWORD;

begin
  handles[0] := TerminateEvent;
  handles[1] := FWakeUp;
  handles[2] := FResetFilters;
  handles[3] := FResetFullStream;

  repeat
    // arguments: handle count, handle array, wait for all, timeout
    waitRes := WaitForMultipleObjects(4, @handles, false, INFINITE);
    case waitRes of
      WAIT_OBJECT_0:
        Exit;
      WAIT_OBJECT_0+1:
        ProcessIncomingData;
      WAIT_OBJECT_0+2:
        ResetFilters;
      WAIT_OBJECT_0+3:
        ResetStream;
      else
        Exit; //WAIT_TIMEOUT, WAIT_ABANDONED
    end;
  until false;
end;

You can write OmniThreadLibrary-based code in the same manner (see simple tasks) but the library also offers a much better way. Write the task as a descendant of the TOmniWorker class and OmniThreadLibrary will implement the main loop for you.

Every TOmniWorker-type task executes an internal loop which can be represented by the following pseudo-code.

if not Initialize then
  Exit;
repeat
  waitRes := WaitForEvent;
  if waitRes = TerminateEvent then
    break
  else
    DispatchEvent(waitRes);
until false;
Cleanup;

Firstly, the Initialize function is called. In the TOmniWorker class it does nothing, just returns True. You can override it with your own function which does task-specific initialization. It can optionally return False which signifies that the task should not be executed at all.

For compatibility with future OmniThreadLibrary versions which may return False from the base Initialize under some conditions, it is recommended to write overridden Initialize in the following form:

function Initialize: boolean;
begin
  Result := inherited Initialize;
  if not Result then
    Exit;
  // do the rest of initialization here
end;

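Next, the wait and dispatch loop is run. It watches for events such as termination requests, incoming messages, timers and, when enabled, Windows messages; these are covered in the subsections below.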

When an event occurs, an appropriate action is executed. For example, messages are dispatched (using the Delphi’s Dispatch mechanism) to the task so you can use message methods for message processing (see Receiving messages below).

When a TerminateEvent is signalled (which happens when controller’s Terminate method is called) or when one of events passed in the TerminateWhen method is signalled, the task’s main loop exits. The Cleanup method is then called. In the base class, Cleanup is implemented as an empty method; override it to implement the task-specific clean-up.

4.11.1 WaitForInit

If the owner of the task wants to wait until the Initialize method has completed its work, it can call the WaitForInit method.

function WaitForInit: boolean;
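The worker lifecycle (Initialize, main loop, Cleanup) and WaitForInit can be sketched together in a small console program. TBufferWorker is a hypothetical worker, and the sketch assumes that the WaitForInit result reflects what Initialize returned.

```pascal
program WorkerLifecycleDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes,
  OtlTask, OtlTaskControl;

type
  // Hypothetical worker that owns a resource for its whole lifetime.
  TBufferWorker = class(TOmniWorker)
  strict private
    FBuffer: TStringList;
  public
    procedure Cleanup; override;
    function  Initialize: boolean; override;
  end;

function TBufferWorker.Initialize: boolean;
begin
  Result := inherited Initialize;
  if not Result then
    Exit;
  FBuffer := TStringList.Create; // executed in the worker thread
end;

procedure TBufferWorker.Cleanup;
begin
  FreeAndNil(FBuffer);           // also executed in the worker thread
  inherited;
end;

var
  worker: IOmniTaskControl;

begin
  worker := CreateTask(TBufferWorker.Create(), 'Buffer worker').Run;
  if worker.WaitForInit then
    Writeln('Initialize has completed')
  else
    Writeln('Initialize returned False');
  worker.Terminate(3000);        // leaves the main loop, Cleanup is then called
end.
```

Creating and destroying the resource in Initialize and Cleanup, rather than in the constructor and destructor, keeps both operations in the thread that actually uses it.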

4.11.2 Task

The TOmniWorker class implements a public property Task returning the task’s management interface IOmniTask. You can use it to access any task-specific property, such as the communication interface (Comm).

property Task: IOmniTask read GetTask write SetTask;

This property is available in the Initialize and Cleanup methods but not in the constructor and destructor.

4.11.3 Receiving messages

Messages sent from the controller or from other tasks are passed to the Delphi’s Dispatch mechanism. For each message we want to process we must therefore write a message method.

To demonstrate this approach, here is a short example extracted from the 5_TwoWayHello_without_loop demo.

The task is implemented as an instance of the TAsyncHello class which is derived from the TOmniWorker. The task will do some initialization in the Initialize method. It will then handle two messages - MSG_CHANGE_MESSAGE and MSG_SEND_MESSAGE.

When the MSG_CHANGE_MESSAGE is received, the worker will change the internal message text which was initially assigned in Initialize. When the MSG_SEND_MESSAGE is received, the worker will send this message back to the owner.

const
  MSG_CHANGE_MESSAGE = 1;
  MSG_SEND_MESSAGE   = 2;

type
  TAsyncHello = class(TOmniWorker)
  strict private
    aiMessage: string;
  public
    function  Initialize: boolean; override;
    procedure OMChangeMessage(var msg: TOmniMessage);
      message MSG_CHANGE_MESSAGE;
    procedure OMSendMessage(var msg: TOmniMessage);
      message MSG_SEND_MESSAGE;
  end;

We can write those two message handlers identical to the way we write Windows message handlers. Message IDs can take any value from 0 to $FFFE. Message ID $FFFF is reserved for internal purposes.

function TAsyncHello.Initialize: boolean;
begin
  aiMessage := Task.Param['Message'];
  Result := true;
end;

procedure TAsyncHello.OMChangeMessage(var msg: TOmniMessage);
begin
  aiMessage := msg.MsgData;
end;

procedure TAsyncHello.OMSendMessage(var msg: TOmniMessage);
begin
  Task.Comm.Send(0, aiMessage);
end;

4.11.4 RegisterComm

You can send and receive messages directly from one task to another. To do that, first create a two-way communication channel and pass each endpoint of this channel to one of the tasks. A task should then call the RegisterComm function to start listening on an endpoint. To stop listening, call the UnregisterComm function.

Received messages are dispatched in the same way as messages received from the task controller.

procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);

The 8_RegisterComm demo demonstrates this.

A two-way channel (FCommChannel) is created with space for 1024 messages. Each of its endpoints is passed to one instance of the TCommTester class and each instance is started as its own task.

procedure TfrmTestRegisterComm.FormCreate(Sender: TObject);
begin
  FCommChannel := CreateTwoWayChannel(1024);
  FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1))
    .MonitorWith(OmniTED)
    .Run;
  FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2))
    .MonitorWith(OmniTED)
    .Run;
end;

The TCommTester constructor just stores this endpoint in an internal field. It is then used in the overridden Initialize as a parameter to the RegisterComm call. (We cannot call RegisterComm in the constructor because the Task property is not yet available at that time.)

type
  TCommTester = class(TOmniWorker)
  strict private
    ctComm: IOmniCommunicationEndpoint;
  public
    constructor Create(commEndpoint: IOmniCommunicationEndpoint);
    function  Initialize: boolean; override;
    procedure OMForward(var msg: TOmniMessage); message MSG_FORWARD;
    procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
  end;

constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint);
begin
  inherited Create;
  ctComm := commEndpoint;
end;

function TCommTester.Initialize: boolean;
begin
  Result := inherited Initialize;
  if Result then
    Task.RegisterComm(ctComm);
end;

To send a message to another task, use the Send method of the IOmniCommunicationEndpoint.

procedure TCommTester.OMForward(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_FORWARD, msg.MsgData);
  ctComm.Send(MSG_FORWARDING, msg.MsgData);
end;

procedure TCommTester.OMForwarding(var msg: TOmniMessage);
begin
  Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
end;

4.11.5 Invoke

Instead of sending messages that are processed in the task, you can also instruct the task to execute specific code. OmniThreadLibrary provides three variations of the same mechanism, Invoke. They allow you to call a method by providing its name, to call a method by providing its address, or to execute an anonymous method.

function Invoke(const msgName: string): IOmniTaskControl; overload;
function Invoke(const msgName: string; msgData: array of const):
  IOmniTaskControl; overload;
function Invoke(const msgName: string; msgData: TOmniValue):
  IOmniTaskControl; overload;

function Invoke(const msgMethod: pointer): IOmniTaskControl; overload;
function Invoke(const msgMethod: pointer; msgData: array of const):
  IOmniTaskControl; overload;
function Invoke(const msgMethod: pointer; msgData: TOmniValue):
  IOmniTaskControl; overload;

function Invoke(remoteFunc: TOmniTaskControlInvokeFunction):
  IOmniTaskControl; overload;
function Invoke(remoteFunc: TOmniTaskControlInvokeFunctionEx):
  IOmniTaskControl; overload;

Invoke uses the communication system to execute the code inside the task. It sends a special internal message (with the reserved ID $FFFF) to the task. Internal TOmniWorker code catches this message and, instead of dispatching it to your task object, executes the code referenced from the message. The code is therefore not executed at the moment you call Invoke but at some indeterminate time later.

You can pass data to an invoked method. In Invoke you can add a second parameter which can be a TOmniValue or an array of elements which is converted to a TOmniValue. To receive this data, the method on the task side must either accept a const TOmniValue parameter or a var TObject parameter. In the latter case you must, of course, provide an object (of any class) as the second Invoke parameter.

Let’s see a simple example. We have a simple worker with three public methods.

type
  TMyObj = class
    Val: integer;
    constructor Create(value: integer);
  end;

  TMyTask = class(TOmniWorker)
  public
    procedure Test1;
    procedure Test2(const value: TOmniValue);
    procedure Test3(var obj: TObject);
  end;

{ TMyObj }

constructor TMyObj.Create(value: integer);
begin
  Val := value;
end;

{ TMyTask }

procedure TMyTask.Test1;
begin
end;

procedure TMyTask.Test2(const value: TOmniValue);
begin
end;

procedure TMyTask.Test3(var obj: TObject);
begin
  obj.Free;
end;

You can now call these three methods in the following manner.

FTask := CreateTask(TMyTask.Create()).Run;
FTask.Invoke('Test1');
FTask.Invoke('Test2', [1, 2, 3]);
FTask.Invoke('Test3', TMyObj.Create(42));

//This would cause a runtime error because method 'Test4' doesn't exist.
//FTask.Invoke('Test4');

Alternatively, you can provide the address of the method instead of its name, which gives you some compile-time checking: if the code tries to call a nonexistent method, it will not compile.

FTask := CreateTask(TMyTask.Create()).Run;
FTask.Invoke(@TMyTask.Test1);
FTask.Invoke(@TMyTask.Test2, [1, 2, 3]);
FTask.Invoke(@TMyTask.Test3, TMyObj.Create(42));

//This would not compile because method 'Test4' doesn't exist.
//FTask.Invoke(@TMyTask.Test4);

The last two versions of Invoke allow you to execute an anonymous method in the background task. One will run a parameter-less method while another executes a method accepting an IOmniTask parameter which you can use to control the task.

FTask := CreateTask(TMyTask.Create()).Run;
FTask.Invoke(
  procedure
  begin
    // some code
  end);
FTask.Invoke(
  procedure (const task: IOmniTask)
  begin
    // some code
  end);

See also demo 43_InvokeAnonymous.

4.11.6 Windows message & APC processing

The internal main loop in the TOmniWorker can optionally process and dispatch Windows messages. This is especially important if your task creates components that use Windows messages for normal operation.

To force Windows message processing, you must call the MsgWait method of the task controller. You can optionally provide a wake mask.

function MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl;

If your background code uses asynchronous procedure calls (for example if it is reading from a file asynchronously), you should call the Alertable method which will enable the MWMO_ALERTABLE flag.

function Alertable: IOmniTaskControl;

More information about the wake mask and about the alertable flag can be found in the MSDN.

4.11.7 Timers

The TOmniWorker model greatly simplifies executing repeated tasks inside the task worker. You can set up multiple timers and associate them with the code in few different ways.

function ClearTimer(timerID: integer): IOmniTaskControl;
function SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TOmniMessageID): IOmniTaskControl; overload;
procedure SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TProc); overload;
procedure SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TProc<integer>); overload;

To set up a timer, call the SetTimer function. It accepts three parameters: a timer ID, an interval (in milliseconds), and a timer message. The timer message specifies the code to be executed every interval_ms milliseconds. In the first overload it is a TOmniMessageID, which can be given in three different ways (a method name, a method pointer, or a message ID); all three appear in the example below.

The other two SetTimer overloads, introduced in version [3.07.3], allow you to execute an anonymous method as a timer event. The second of them passes the timer ID to the anonymous method’s integer parameter.

To clear a timer (so that the associated code is not called anymore) call the ClearTimer function.

Two additional SetTimer overloads (not shown in the book) are provided only for backward compatibility with very old code and should not be used in new programs.

The code below shows four ways in which a timer method can be specified.

const
  MSG_TIMER3 = 1;

type
  TMyTask = class(TOmniWorker)
  public
    procedure Timer1;
    procedure Timer2(const value: TOmniValue);
    procedure Timer3(var msg: TOmniMessage); message MSG_TIMER3;
  end;

{ TMyTask }

procedure TMyTask.Timer1;
begin
  // called every 150 ms
end;

procedure TMyTask.Timer2(const value: TOmniValue);
begin
  // called every 200 ms
  // value contains the timer ID
end;

procedure TMyTask.Timer3(var msg: TOmniMessage);
begin
  // called every 250 ms
  // msg.MsgData contains the timer ID
end;

FTask := CreateTask(TMyTask.Create()).Run;
FTask.SetTimer(1, 150, 'Timer1');
FTask.SetTimer(2, 200, @TMyTask.Timer2);
FTask.SetTimer(3, 250, MSG_TIMER3);
FTask.SetTimer(4, 333,
  procedure (timerID: integer)
  begin
    // called every 333 ms
    // timerID contains the timer ID
  end);

Timers can also be set and cleared from inside the task.
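The following sketch shows one way a worker might manage a timer from inside the task, using the Task property of TOmniWorker and the anonymous-method SetTimer overload (version 3.07.3 or newer). TTickWorker and its FTicks field are hypothetical names used only for this illustration.

```pascal
// uses OtlTask, OtlTaskControl;

type
  TTickWorker = class(TOmniWorker)
  strict private
    FTicks: integer;
  public
    function Initialize: boolean; override;
  end;

function TTickWorker.Initialize: boolean;
begin
  Result := inherited Initialize;
  if not Result then
    Exit;
  // Runs in the worker thread: start timer 1 with a 500 ms interval.
  Task.SetTimer(1, 500,
    procedure (timerID: integer)
    begin
      Inc(FTicks);
      // After ten ticks the task clears its own timer.
      if FTicks = 10 then
        Task.ClearTimer(timerID);
    end);
end;
```

Setting the timer in Initialize keeps the timer setup next to the code it triggers, so the owner only needs to create and run the task.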

4.11.8 TerminateWhen

By using the TerminateWhen function you can set up a termination trigger – a signal that will stop the background task execution. This can be either a Windows synchronization primitive (such as an event) or a cancellation token.

function TerminateWhen(event: THandle): IOmniTaskControl; overload;
function TerminateWhen(token: IOmniCancellationToken): IOmniTaskControl; overload;

The use of the TerminateWhen function is demonstrated in the 14_TerminateWhen demo.
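As a hedged illustration, one cancellation token can serve as the termination trigger for several tasks. The sketch assumes the CreateOmniCancellationToken factory from the OtlSync unit and reuses the hypothetical TMyTask worker; the procedure name is made up for the example.

```pascal
// uses OtlSync, OtlTaskControl;

procedure RunTwoTasksWithSharedTrigger;
var
  stopAll: IOmniCancellationToken;
  first, second: IOmniTaskControl;
begin
  stopAll := CreateOmniCancellationToken;
  // Both tasks watch the same termination trigger.
  first := CreateTask(TMyTask.Create(), 'First').TerminateWhen(stopAll).Run;
  second := CreateTask(TMyTask.Create(), 'Second').TerminateWhen(stopAll).Run;
  // ... later, a single call asks both tasks to stop.
  stopAll.Signal;
end;
```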

4.11.9 UserData

Sometimes you want to associate data with the task controller. The UserData property can be used for this purpose. This data is never accessed from the OmniThreadLibrary code; it is only managed by the library for your convenience. UserData is available via the IOmniTaskControl interface and cannot be used in the background task.

function SetUserData(const idxData: TOmniValue;
  const value: TOmniValue): IOmniTaskControl;
property UserData[const idxData: TOmniValue]: TOmniValue
  read GetUserDataVal write SetUserDataVal;

Data can be accessed through an integer index or through a name. Both lines below are valid.

taskController.UserData[0] := myTaskList.Add(taskController);
taskController.UserData['token'] := 'something';

4.12 Task groups

A task group is a mechanism that allows you to treat a number of tasks as one. You can start all tasks, send a message to all tasks in a group, terminate all tasks at once, and wait for all of them to terminate.

See also demo 15_TaskGroup.

type
  IOmniTaskGroup = interface
    ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
    function  GetTasks: IOmniTaskControlList;
  //
    function  Add(const taskControl: IOmniTaskControl): IOmniTaskGroup;
    function  GetEnumerator: IOmniTaskControlListEnumerator;
    function  RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup;
    function  Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup;
    function  RunAll: IOmniTaskGroup;
    procedure SendToAll(const msg: TOmniMessage);
    function  TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
    function  UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup;
    function  WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
    property  Tasks: IOmniTaskControlList read GetTasks;
  end;

  TOmniTaskGroup = class(TInterfacedObject, IOmniTaskGroup)
  public
    constructor Create;
    destructor  Destroy; override;
    function  Add(const taskControl: IOmniTaskControl): IOmniTaskGroup;
    function  GetEnumerator: IOmniTaskControlListEnumerator;
    function  RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup;
    function  Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup;
    function  RunAll: IOmniTaskGroup;
    procedure SendToAll(const msg: TOmniMessage);
    function  TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
    function  UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup;
    function  WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
    property Tasks: IOmniTaskControlList read GetTasks;
  end;

function CreateTaskGroup: IOmniTaskGroup;
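A short usage sketch, built only from the members listed above, might look as follows. IdleWorker and RunGroupDemo are hypothetical names, and the worker simply idles until it is asked to stop.

```pascal
// uses System.SysUtils, OtlTask, OtlTaskControl;

procedure IdleWorker(const task: IOmniTask);
begin
  // Simple task: loop until the controller requests termination.
  while not task.Terminated do
    Sleep(50);
end;

procedure RunGroupDemo;
var
  group: IOmniTaskGroup;
  i: integer;
begin
  group := CreateTaskGroup;
  for i := 1 to 3 do
    group.Add(CreateTask(IdleWorker, Format('Worker %d', [i])));
  group.RunAll;   // start every task in the group
  Sleep(1000);    // let the workers run for a while
  // Ask all tasks to stop and wait up to five seconds for them.
  if not group.TerminateAll(5000) then
    Writeln('Some tasks did not stop in time');
end;
```

Adding the tasks before running them lets RunAll start the whole group with one call instead of calling Run on each controller.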

4.13 IOmniTask interface

The background worker can use the IOmniTask interface (defined in the OtlTask unit) to communicate with the task controller and to control the task’s execution. The worker can access the interface in two different ways. For simple tasks, the interface is passed as a parameter to the task worker method. TOmniWorker tasks can access it through the Task property.

The IOmniTask interface exposes the following methods and properties.

type
  TOmniTaskInvokeFunction = reference to procedure;

  IOmniTask = interface
    procedure ClearTimer(timerID: integer = 0);
    procedure Enforced(forceExecution: boolean = true);
    procedure Invoke(remoteFunc: TOmniTaskInvokeFunction);
    procedure InvokeOnSelf(remoteFunc: TOmniTaskInvokeFunction);
    procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
    procedure RegisterWaitObject(waitObject: THandle;
      responseHandler: TOmniWaitObjectMethod);
    procedure SetException(exceptionObject: pointer);
    procedure SetExitStatus(exitCode: integer; const exitMessage: string);
    procedure SetProcessorGroup(procGroupNumber: integer);
    procedure SetNUMANode(numaNodeNumber: integer);
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TOmniMessageID); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc<integer>); overload;
    procedure StopTimer;
    procedure Terminate;
    function  Terminated: boolean;
    function  Stopped: boolean;
    procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);
    procedure UnregisterWaitObject(waitObject: THandle);
    property CancellationToken: IOmniCancellationToken read GetCancellationToken;
    property Comm: IOmniCommunicationEndpoint read GetComm;
    property Counter: IOmniCounter read GetCounter;
    property Implementor: TObject read GetImplementor;
    property Lock: TSynchroObject read GetLock;
    property Name: string read GetName;
    property Param: TOmniValueContainer read GetParam;
    property TerminateEvent: THandle read GetTerminateEvent;
    property ThreadData: IInterface read GetThreadData;
    property UniqueID: int64 read GetUniqueID;
  end; { IOmniTask }

4.13.1 Name and ID

The worker can access its name and unique ID through the Name and UniqueID properties.

property Name: string read GetName;
property UniqueID: int64 read GetUniqueID;

4.13.2 Parameters

To access the parameters provided by the task owner, the worker can use the Param property.

property Param: TOmniValueContainer read GetParam;

More information is provided in the Simple Tasks/Parameters section.

4.13.3 Termination

To terminate itself, a simple task should just exit from the worker method. A TOmniWorker task, on the other hand, should call the Terminate method of the IOmniTask interface.

procedure Terminate;
function  Terminated: boolean;
function  Stopped: boolean;
property TerminateEvent: THandle read GetTerminateEvent;

The other termination-related methods and properties are as follows.

Terminated returns True when the worker was requested to stop, either by itself or by the task controller.

Stopped returns True when the worker has fully stopped. There’s no use in calling this method from the worker itself as it will always return False while the worker is still active.

TerminateEvent returns the event that is signalled when the task controller requires the worker to stop. The worker can wait on this event but should never signal it. Terminate should be called instead to terminate the worker.
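A simple task that does periodic work can, for instance, wait on TerminateEvent instead of sleeping, so that it reacts to a stop request immediately. The sketch below uses the standard WaitForSingleObject call; PollingWorker is a hypothetical name.

```pascal
// uses Winapi.Windows, OtlTask;

procedure PollingWorker(const task: IOmniTask);
begin
  // Wake up every 500 ms, or at once when termination is requested.
  while WaitForSingleObject(task.TerminateEvent, 500) = WAIT_TIMEOUT do begin
    // periodic work goes here
  end;
  // Leaving the worker method ends a simple task.
end;
```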

4.13.4 Exit status

A task can send a result to the controller by calling the SetExitStatus procedure.

procedure SetExitStatus(exitCode: integer; const exitMessage: string);

The program can access this status through the task controller interface.
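The round trip could be sketched as follows. This assumes that the task controller exposes the status through ExitCode and ExitMessage properties and offers a WaitFor method; the file name and the procedure names are hypothetical.

```pascal
// uses System.SysUtils, OtlTask, OtlTaskControl;

const
  CDataFile = 'c:\temp\data.txt'; // hypothetical path

procedure CheckFile(const task: IOmniTask);
begin
  // Report the outcome to the task controller.
  if FileExists(CDataFile) then
    task.SetExitStatus(0, 'OK')
  else
    task.SetExitStatus(1, 'File not found');
end;

procedure RunCheck;
var
  control: IOmniTaskControl;
begin
  control := CreateTask(CheckFile, 'CheckFile').Run;
  // Read the status only after the task has stopped.
  if control.WaitFor(5000) then
    Writeln(control.ExitCode, ': ', control.ExitMessage);
end;
```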

4.13.5 Exceptions

While exceptions in the background worker code are automatically caught, a task can also set an associated exception manually by calling SetException.

procedure SetException(exceptionObject: pointer);

This exception is available to the main program through the FatalException property.

4.13.6 Communication

To send a message to the task controller (and through it to the main program), a background task can use the Comm property.

property Comm: IOmniCommunicationEndpoint read GetComm;

Any data can be packed in a TOmniValue record and passed to the Comm.Send method. The mechanism is the same as sending the data from the main program to the background worker.
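As an illustration, a simple task could report progress like this. The sketch assumes a VCL form (so that the owner thread processes messages) with an FTask field of type IOmniTaskControl; MSG_PROGRESS, ProgressWorker and TfrmMain are hypothetical names.

```pascal
// uses System.SysUtils, OtlComm, OtlTask, OtlTaskControl;

const
  MSG_PROGRESS = 1; // hypothetical message ID

procedure ProgressWorker(const task: IOmniTask);
var
  step: integer;
begin
  for step := 1 to 10 do begin
    Sleep(100); // simulate work
    // The integer is implicitly converted to a TOmniValue.
    task.Comm.Send(MSG_PROGRESS, step * 10);
  end;
end;

procedure TfrmMain.StartWork;
begin
  FTask := CreateTask(ProgressWorker, 'Progress')
    .OnMessage(
      procedure (const task: IOmniTaskControl; const msg: TOmniMessage)
      begin
        // Executed in the owner thread, here the main thread.
        if msg.MsgID = MSG_PROGRESS then
          Caption := Format('%d%% done', [msg.MsgData.AsInteger]);
      end)
    .Run;
end;
```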

Messages can be received in different ways, depending on the worker type. See the appropriate sections of this manual for simple tasks and for TOmniWorker tasks.

A simple trick can be used to send a message from a task back to itself – instead of sending it on the Comm channel, you send it on the Comm.OtherEndpoint channel.

Task.Comm.OtherEndpoint.Send(MSG_FROM_TASK, 'Sent to Self');

A background task can use the communication channel to request asynchronous execution of code in the context of its owner (the thread which created the task).

type
  TOmniTaskInvokeFunction = reference to procedure;

procedure Invoke(remoteFunc: TOmniTaskInvokeFunction);

This mechanism works by posting a special message containing the function to be executed from the task back to the task controller. OmniThreadLibrary catches that special message and instead of dispatching it to the normal message processing code, it will execute the attached function.

Since version [3.07.3], the IOmniTask interface implements the InvokeOnSelf method, which works the same as Invoke, except that it posts a message back to the task itself. In other words, InvokeOnSelf puts an anonymous method into the task’s message queue while Invoke puts it into the task controller’s message queue. InvokeOnSelf will only work correctly when the task is implemented as a TOmniWorker task.

type
  TOmniTaskInvokeFunction = reference to procedure;

procedure InvokeOnSelf(remoteFunc: TOmniTaskInvokeFunction);

A TOmniWorker task can register additional communication channels to enable automatic processing of messages sent across those channels. This mechanism is described in the TOmniWorker tasks section.

procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);

4.13.7 Timers

A TOmniWorker task can use timers to automate repeating tasks. This functionality is explained in the TOmniWorker tasks section.

procedure ClearTimer(timerID: integer = 0);
procedure SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TOmniMessageID); overload;
procedure SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TProc); overload;
procedure SetTimer(timerID: integer; interval_ms: cardinal;
  const timerMessage: TProc<integer>); overload;

4.13.8 RegisterWaitObject

A TOmniWorker task can register an external synchronization object (typically an event) to be included in the main loop. The associated method will be called when the synchronization object becomes signalled.

type
  TOmniWaitObjectMethod = procedure of object;

procedure RegisterWaitObject(waitObject: THandle;
  responseHandler: TOmniWaitObjectMethod);
procedure UnregisterWaitObject(waitObject: THandle);

Use RegisterWaitObject to register an external synchronization object with the main loop and UnregisterWaitObject to remove the synchronization object from the main loop.
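A possible shape of a worker that reacts to an externally owned event is sketched below. TEventWatcher is a hypothetical class; the event handle is assumed to be created (ideally as an auto-reset event) and closed by the owner.

```pascal
// uses Winapi.Windows, OtlTask, OtlTaskControl;

type
  TEventWatcher = class(TOmniWorker)
  strict private
    FEvent: THandle;
    procedure EventSignalled;
  public
    constructor Create(eventHandle: THandle);
    function  Initialize: boolean; override;
    procedure Cleanup; override;
  end;

constructor TEventWatcher.Create(eventHandle: THandle);
begin
  inherited Create;
  FEvent := eventHandle; // owned by the caller, not by the worker
end;

function TEventWatcher.Initialize: boolean;
begin
  Result := inherited Initialize;
  if Result then
    Task.RegisterWaitObject(FEvent, EventSignalled);
end;

procedure TEventWatcher.Cleanup;
begin
  Task.UnregisterWaitObject(FEvent);
  inherited;
end;

procedure TEventWatcher.EventSignalled;
begin
  // Called in the worker thread when FEvent becomes signalled.
end;
```

Registering in Initialize and unregistering in Cleanup keeps both calls inside the worker thread, where the Task property is valid.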

4.13.9 CancellationToken

If a cancellation token is associated with the background task, it can be accessed through the CancellationToken property.

property CancellationToken: IOmniCancellationToken read GetCancellationToken;
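For example, a worker may poll the token between units of work. The sketch assumes that the owner attaches the token with the task controller's CancelWith method and creates it with CreateOmniCancellationToken from OtlSync; the procedure names are hypothetical.

```pascal
// uses OtlSync, OtlTask, OtlTaskControl;

procedure CancellableWorker(const task: IOmniTask);
begin
  // Stop as soon as the shared token is signalled.
  while not task.CancellationToken.IsSignalled do begin
    // one unit of work goes here
  end;
end;

procedure RunCancellable;
var
  token: IOmniCancellationToken;
  control: IOmniTaskControl;
begin
  token := CreateOmniCancellationToken;
  control := CreateTask(CancellableWorker, 'Cancellable')
    .CancelWith(token)
    .Run;
  // ... later, request cancellation from the owner.
  token.Signal;
end;
```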

4.13.10 Lock

If a lock is associated with the background task, it can be accessed through the Lock property.

property Lock: TSynchroObject read GetLock;
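The sketch below shows two tasks protecting shared state with one lock. It assumes the task controller's WithLock method with an autoDestroyLock parameter; SharedTotal and the procedure names are hypothetical.

```pascal
// uses Winapi.Windows, System.SyncObjs, OtlTask, OtlTaskControl;

var
  SharedTotal: integer; // hypothetical shared state

procedure AddingWorker(const task: IOmniTask);
begin
  task.Lock.Acquire;
  try
    Inc(SharedTotal);
  finally
    task.Lock.Release;
  end;
end;

procedure RunWithSharedLock;
var
  lock: TSynchroObject;
  first, second: IOmniTaskControl;
begin
  lock := TCriticalSection.Create;
  try
    // The lock is shared, so neither task may destroy it (False).
    first := CreateTask(AddingWorker, 'Add 1').WithLock(lock, False).Run;
    second := CreateTask(AddingWorker, 'Add 2').WithLock(lock, False).Run;
    first.WaitFor(INFINITE);
    second.WaitFor(INFINITE);
  finally
    lock.Free;
  end;
end;
```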

4.13.11 Counter

If a thread-safe counter is associated with the background task, it can be accessed through the Counter property.

property Counter: IOmniCounter read GetCounter;
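One common use, sketched here under the assumption that the owner attaches the counter with WithCounter and creates it with CreateCounter from OtlCommon, is to detect which worker finished last. MSG_ALL_DONE and the routine names are hypothetical.

```pascal
// uses OtlCommon, OtlTask, OtlTaskControl;

const
  MSG_ALL_DONE = 1; // hypothetical message ID

procedure CountedWorker(const task: IOmniTask);
begin
  // ... do the actual work here ...
  // Decrement returns the new value; zero means this worker finished last.
  if task.Counter.Decrement = 0 then
    task.Comm.Send(MSG_ALL_DONE, task.Name);
end;

function StartWorkers(numWorkers: integer): TArray<IOmniTaskControl>;
var
  counter: IOmniCounter;
  i: integer;
begin
  counter := CreateCounter(numWorkers);
  SetLength(Result, numWorkers);
  // Keep the controllers alive by returning them to the caller.
  for i := 0 to numWorkers - 1 do
    Result[i] := CreateTask(CountedWorker, 'Counted')
      .WithCounter(counter)
      .Run;
end;
```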

4.13.12 Processor groups and NUMA nodes

On a system with multiple processor groups, you can use the SetProcessorGroup function [3.06] to specify the processor group this task should run on.

On a system with multiple NUMA nodes, you can use the SetNUMANode function [3.06] to specify the NUMA node this task should run on.

Information about existing processor groups and NUMA nodes can be accessed through the Environment object.

In most cases, processor group and NUMA node assignment should be managed via IOmniTaskControl.ProcessorGroup, IOmniTaskControl.NUMANode, IOmniThreadPool.ProcessorGroups, and IOmniThreadPool.NUMANodes.

4.13.13 Internal and obsolete functions

The Enforced method works the same as the IOmniTaskControl equivalent. As it must be called before the task starts execution, it cannot be used from the background task itself. OmniThreadLibrary uses this function in the thread pool implementation.

procedure Enforced(forceExecution: boolean = true);

The Implementor property returns the object implementing the IOmniTask interface.

property Implementor: TObject read GetImplementor;

The ThreadData property is used to access the internal interface used in the thread pool’s SetThreadDataFactory mechanism.

property ThreadData: IInterface read GetThreadData;

The Building a connection pool example contains more information on this topic.

The StopTimer method is obsolete and should no longer be used. Use ClearTimer instead.

procedure StopTimer;