The low-level OmniThreadLibrary layer focuses on the task concept. In most aspects this is similar to Delphi’s TThread approach, except that OmniThreadLibrary focuses on the code (a.k.a. the task) and interaction with the code, while Delphi focuses on the operating system primitive required for executing additional threads.
A task is created using the CreateTask function, which takes as a parameter a global procedure, a method, an instance of the TOmniWorker class (or, usually, a descendant of that class) or an anonymous procedure (in Delphi 2009 and newer). CreateTask also accepts an optional second parameter, a task name, which is displayed in Delphi’s Thread view next to the thread running the task.
    type
      TOmniTaskProcedure = procedure(const task: IOmniTask);
      TOmniTaskMethod = procedure(const task: IOmniTask) of object;
      TOmniTaskDelegate = reference to procedure(const task: IOmniTask);

    function CreateTask(worker: TOmniTaskProcedure; const taskName: string = ''):
      IOmniTaskControl; overload;
    function CreateTask(worker: TOmniTaskMethod; const taskName: string = ''):
      IOmniTaskControl; overload;
    function CreateTask(worker: TOmniTaskDelegate; const taskName: string = ''):
      IOmniTaskControl; overload;
    function CreateTask(const worker: IOmniWorker; const taskName: string = ''):
      IOmniTaskControl; overload;
CreateTask returns a feature-rich interface, IOmniTaskControl, which we will explore in this chapter. The most important function in this interface, Run, creates a new thread and starts your task in it.
The following code represents the simplest low-level OmniThreadLibrary example. It executes the Beep function in a background thread. The Beep function merely beeps and exits. When the task function exits, the Windows thread running the task is also terminated.
    procedure TfrmTestSimple.Beep(const task: IOmniTask);
    begin
      //Executed in a background thread
      MessageBeep(MB_ICONEXCLAMATION);
    end;

    CreateTask(Beep, 'Beep').Run;
Another way to start a task is to call the Schedule function, which starts it in a thread allocated from a thread pool. This is covered in the Thread pooling chapter.
Let’s examine all four ways of creating a task. The simplest way (demoed in application 2_TwoWayHello) is to pass the name of a global procedure to CreateTask. This global procedure must accept one parameter of type IOmniTask.
    procedure RunHelloWorld(const task: IOmniTask);
    begin
      //
    end;

    CreateTask(RunHelloWorld, 'HelloWorld').Run;
A variation on the theme is passing the name of a method to CreateTask. This approach is used in the demo application 1_HelloWorld. The interesting point here is that you can declare this method in the same class from which CreateTask is called. That way you can access all class fields and methods from the threaded code. Just keep in mind that you’ll be doing this from another thread, so make sure you protect shared access with locking!
    procedure TfrmTestHelloWorld.RunHelloWorld(const task: IOmniTask);
    begin
      //
    end;

    procedure TfrmTestHelloWorld.StartTask;
    begin
      CreateTask(RunHelloWorld, 'HelloWorld').Run;
    end;
In Delphi 2009 and newer you can also write the task code as an anonymous procedure.
    CreateTask(
      procedure (const task: IOmniTask)
      begin
        //
      end,
      'HelloWorld').Run;
For all except the simplest tasks, you’ll use the fourth approach, as it gives you access to the true OmniThreadLibrary power (namely the internal wait loop and message dispatching). To use it, you have to create a worker object deriving from the TOmniWorker class.
    type
      THelloWorker = class(TOmniWorker)
      end;

    procedure TfrmTestTwoWayHello.actStartHelloExecute(Sender: TObject);
    begin
      FHelloTask :=
        CreateTask(THelloWorker.Create(), 'Hello').
        Run;
    end;
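To make the message dispatching mentioned above more concrete, here is a minimal console sketch of a worker that answers one message. The message IDs MSG_PING and MSG_PONG, the class name TPingWorker and the five-second timeout are assumptions made for this illustration only; the sketch also assumes that a method declared with the message directive and a TOmniMessage parameter is invoked by the worker's internal loop, as the OmniThreadLibrary demos do.

```delphi
program WorkerSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon, OtlComm, OtlTask, OtlTaskControl;

const
  MSG_PING = 1; // hypothetical message ID: owner -> worker
  MSG_PONG = 2; // hypothetical message ID: worker -> owner

type
  TPingWorker = class(TOmniWorker)
  public
    // Called in the background thread when a MSG_PING message arrives.
    procedure OMPing(var msg: TOmniMessage); message MSG_PING;
  end;

procedure TPingWorker.OMPing(var msg: TOmniMessage);
begin
  // Reply to the owner through the task's end of the communication channel.
  Task.Comm.Send(MSG_PONG, 'pong: ' + msg.MsgData.AsString);
end;

var
  taskControl: IOmniTaskControl;
  reply      : TOmniMessage;
begin
  // Keep the controller in a variable that outlives the task.
  taskControl := CreateTask(TPingWorker.Create(), 'PingWorker').Run;
  taskControl.Comm.Send(MSG_PING, 'hello');
  // Wait up to five seconds for the worker's answer.
  if taskControl.Comm.ReceiveWait(reply, 5000) then
    Writeln(reply.MsgData.AsString)
  else
    Writeln('No reply within the timeout');
  taskControl.Terminate;
  taskControl := nil;
end.
```

The worker contains no loop of its own; it only declares what should happen when a given message arrives, which is the main reason to prefer this approach for anything but the simplest tasks.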
When you create a low-level task, OmniThreadLibrary returns a task controller interface, IOmniTaskControl. This interface, which is defined in the OtlTaskControl unit, can be used to control the task from the owner’s side. The task code, on the other hand, has access to another interface, IOmniTask (defined in the OtlTask unit), which can be used to communicate with the owner and manipulate the task itself. A picture in the Tasks vs. threads chapter shows the relationship between these interfaces.
This chapter deals mainly with these two interfaces. For reference, the IOmniTaskControl interface is reprinted here in full. In the rest of the chapter I’ll just show the relevant interface parts.
The IOmniTask interface is described at the end of this chapter.
    type
      IOmniTaskControl = interface
        function Alertable: IOmniTaskControl;
        function CancelWith(const token: IOmniCancellationToken): IOmniTaskControl;
        function ChainTo(const task: IOmniTaskControl;
          ignoreErrors: boolean = false): IOmniTaskControl;
        function ClearTimer(timerID: integer): IOmniTaskControl;
        function DetachException: Exception;
        function Enforced(forceExecution: boolean = true): IOmniTaskControl;
        function GetFatalException: Exception;
        function GetParam: TOmniValueContainer;
        function Invoke(const msgMethod: pointer): IOmniTaskControl; overload;
        function Invoke(const msgMethod: pointer;
          msgData: array of const): IOmniTaskControl; overload;
        function Invoke(const msgMethod: pointer;
          msgData: TOmniValue): IOmniTaskControl; overload;
        function Invoke(const msgName: string): IOmniTaskControl; overload;
        function Invoke(const msgName: string;
          msgData: array of const): IOmniTaskControl; overload;
        function Invoke(const msgName: string;
          msgData: TOmniValue): IOmniTaskControl; overload;
        function Invoke(remoteFunc: TOmniTaskControlInvokeFunction):
          IOmniTaskControl; overload;
        function Invoke(remoteFunc: TOmniTaskControlInvokeFunctionEx):
          IOmniTaskControl; overload;
        function Join(const group: IOmniTaskGroup): IOmniTaskControl;
        function Leave(const group: IOmniTaskGroup): IOmniTaskControl;
        function MonitorWith(const monitor: IOmniTaskControlMonitor):
          IOmniTaskControl;
        function MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl;
        function NUMANode(numaNodeNumber: integer): IOmniTaskControl;
        function OnMessage(eventDispatcher: TObject): IOmniTaskControl; overload;
        function OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskControl; overload;
        function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
          IOmniTaskControl; overload;
        function OnMessage(msgID: word; eventHandler: TOmniMessageExec):
          IOmniTaskControl; overload;
        function OnMessage(eventHandler: TOmniOnMessageFunction):
          IOmniTaskControl; overload;
        function OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
          IOmniTaskControl; overload;
        function OnTerminated(eventHandler: TOmniOnTerminatedFunction):
          IOmniTaskControl; overload;
        function OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple):
          IOmniTaskControl; overload;
        function OnTerminated(eventHandler: TOmniTaskTerminatedEvent):
          IOmniTaskControl; overload;
        function ProcessorGroup(procGroupNumber: integer): IOmniTaskControl;
        function RemoveMonitor: IOmniTaskControl;
        function Run: IOmniTaskControl;
        function Schedule(const threadPool: IOmniThreadPool = nil {default pool}):
          IOmniTaskControl;
        function SetMonitor(hWindow: THandle): IOmniTaskControl;
        function SetParameter(const paramName: string;
          const paramValue: TOmniValue): IOmniTaskControl; overload;
        function SetParameter(const paramValue: TOmniValue):
          IOmniTaskControl; overload;
        function SetParameters(const parameters: array of TOmniValue):
          IOmniTaskControl;
        function SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl;
        function SetQueueSize(numMessages: integer): IOmniTaskControl;
        function SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TOmniMessageID): IOmniTaskControl; overload;
        procedure SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TProc); overload;
        procedure SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TProc<integer>); overload;
        function SetUserData(const idxData: TOmniValue;
          const value: TOmniValue): IOmniTaskControl;
        procedure Stop;
        function Terminate(maxWait_ms: cardinal = INFINITE): boolean;
        function TerminateWhen(event: THandle): IOmniTaskControl; overload;
        function TerminateWhen(token: IOmniCancellationToken):
          IOmniTaskControl; overload;
        function Unobserved: IOmniTaskControl;
        function WaitFor(maxWait_ms: cardinal): boolean;
        function WaitForInit: boolean;
        function WithCounter(const counter: IOmniCounter): IOmniTaskControl;
        function WithLock(const lock: TSynchroObject;
          autoDestroyLock: boolean = true): IOmniTaskControl; overload;
        function WithLock(const lock: IOmniCriticalSection):
          IOmniTaskControl; overload;
      //
        property CancellationToken: IOmniCancellationToken
          read GetCancellationToken;
        property Comm: IOmniCommunicationEndpoint read GetComm;
        property ExitCode: integer read GetExitCode;
        property ExitMessage: string read GetExitMessage;
        property FatalException: Exception read GetFatalException;
        property Lock: TSynchroObject read GetLock;
        property Name: string read GetName;
        property Param: TOmniValueContainer read GetParam;
        property UniqueID: int64 read GetUniqueID;
        property UserData[const idxData: TOmniValue]: TOmniValue
          read GetUserDataVal write SetUserDataVal;
      end;
The IOmniTaskControl interface returned from CreateTask must always be stored in a variable or field with a scope that exceeds the lifetime of the background task. In other words, don’t store a long-term background task interface in a local variable.
The simplest example of the wrong approach can be written in one line:
    CreateTask(MyWorker).Run;
This code looks fine, but it doesn’t work. In this case, the IOmniTaskControl interface is stored in a hidden temporary variable which is destroyed at the end of the current method. This causes the task controller to be destroyed, which in turn causes the background task to be destroyed. Running this code would therefore just create and then destroy the task.
A common solution is to store the interface in a field.
    FTaskControl := CreateTask(MyWorker).Run;
When you don’t need the background worker anymore, you should terminate the task and release the task controller.
    FTaskControl.Terminate;
    FTaskControl := nil;
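Putting the two fragments above together, the following console sketch shows one way an owner object can keep the controller in a field and shut the task down in its destructor. The class name TBackgroundOwner, the polling loop built on IOmniTask.Terminated and the sleep intervals are assumptions made for this illustration; on newer Delphi versions the Windows unit may have to be referenced as Winapi.Windows.

```delphi
program OwnerSketch;

{$APPTYPE CONSOLE}

uses
  Windows, OtlTask, OtlTaskControl;

type
  // Hypothetical owner class: the controller lives exactly as long as the owner.
  TBackgroundOwner = class
  strict private
    FTaskControl: IOmniTaskControl;
    procedure Worker(const task: IOmniTask);
  public
    destructor Destroy; override;
    procedure Start;
  end;

procedure TBackgroundOwner.Worker(const task: IOmniTask);
begin
  // Runs in the background thread until the owner asks the task to terminate.
  while not task.Terminated do
    Sleep(50);
end;

procedure TBackgroundOwner.Start;
begin
  // Storing the result in a field keeps the task controller (and the task) alive.
  FTaskControl := CreateTask(Worker, 'OwnedTask').Run;
end;

destructor TBackgroundOwner.Destroy;
begin
  if assigned(FTaskControl) then begin
    FTaskControl.Terminate; // waits for the task to stop
    FTaskControl := nil;    // releases the task controller
  end;
  inherited;
end;

var
  owner: TBackgroundOwner;
begin
  owner := TBackgroundOwner.Create;
  try
    owner.Start;
    Sleep(500); // the task keeps running while the owner is alive
  finally
    owner.Free; // terminates the task and frees the controller
  end;
end.
```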
Another solution is to provide the task with an implicit owner. You can, for example, use the event monitor to monitor the task’s lifetime or messages sent from the task, and that makes the task owned by the monitor. The following code is therefore valid:
    CreateTask(MyWorker).MonitorWith(eventMonitor).Run;
Yet another possibility is to call Unobserved before Run. This method causes the task to be observed by an internal monitor, which then acts as its owner.
    CreateTask(MyWorker).Unobserved.Run;
When you use a thread pool to run a task, the thread pool acts as a task owner so there’s no need for an additional explicit owner.
    procedure Beep(const task: IOmniTask);
    begin
      MessageBeep(MB_ICONEXCLAMATION);
    end;

    CreateTask(Beep, 'Beep').Schedule;
As explained in the Locking vs. messaging section, OmniThreadLibrary automatically creates a communication channel between the task controller and the task and exposes it through the Comm property. The communication channel is not exclusive to OmniThreadLibrary; you could use it equally well from TThread-based multi-threading code.
    property Comm: IOmniCommunicationEndpoint read GetComm;
The IOmniCommunicationEndpoint interface exposes a simple interface for sending and receiving messages.
    type
      TOmniMessage = record
        MsgID: word;
        MsgData: TOmniValue;
        constructor Create(aMsgID: word; aMsgData: TOmniValue); overload;
        constructor Create(aMsgID: word); overload;
      end;

      IOmniCommunicationEndpoint = interface
        function Receive(var msg: TOmniMessage): boolean; overload;
        function Receive(var msgID: word; var msgData: TOmniValue): boolean; overload;
        function ReceiveWait(var msg: TOmniMessage; timeout_ms: cardinal): boolean; overload;
        function ReceiveWait(var msgID: word; var msgData: TOmniValue;
          timeout_ms: cardinal): boolean; overload;
        procedure Send(const msg: TOmniMessage); overload;
        procedure Send(msgID: word); overload;
        procedure Send(msgID: word; msgData: array of const); overload;
        procedure Send(msgID: word; msgData: TOmniValue); overload;
        function SendWait(msgID: word;
          timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
        function SendWait(msgID: word; msgData: TOmniValue;
          timeout_ms: cardinal = CMaxSendWaitTime_ms): boolean; overload;
        property NewMessageEvent: THandle read GetNewMessageEvent;
        property OtherEndpoint: IOmniCommunicationEndpoint read GetOtherEndpoint;
        property Reader: TOmniMessageQueue read GetReader;
        property Writer: TOmniMessageQueue read GetWriter;
      end;
Receive
Both variants of Receive return the first message from the message queue, either as a TOmniMessage record or as a (message ID, message data) pair. Data is always passed as a TOmniValue record.
The function returns True if a message was returned, False if the message queue is empty.
ReceiveWait
These two variations of Receive allow you to specify the maximum time (in milliseconds) you are willing to wait for the next message. A timeout of 0 milliseconds makes the function behave just like Receive. The special timeout value INFINITE (defined in the Windows unit) makes the function wait until a message is available.
The function returns True if a message was returned, False if the message queue is still empty after the timeout.
Send
Four overloaded versions of Send all write a message to the message queue and raise an exception if the queue is full. [Message queue size defaults to 1000 elements and can be increased by calling IOmniTaskControl.SetQueueSize before the communication channel is used for the first time.]
The Send(msgID: word) version sends empty message data (TOmniValue.Null).
The Send(msgID: word; msgData: array of const) version packs the data array into one TOmniValue value by calling TOmniValue.Create(msgData).
SendWait
These two variations of the Send method allow you to specify the maximum time (in milliseconds) you are willing to wait if the message queue is full and there is no room for the message. A timeout of 0 ms makes the function behave just like Send. A timeout of INFINITE milliseconds is also supported.
The function returns True if the message was successfully sent, False if the message queue is still full after the timeout.
NewMessageEvent
This property returns a Windows event which is signalled every time new data is inserted in the queue. This event is not created until the code accesses the NewMessageEvent property for the first time.
OtherEndpoint
Returns the other end of the communication channel (the task’s end if accessed through IOmniTaskControl.Comm and the task controller’s end if accessed through IOmniTask.Comm).
Reader
Returns the input queue associated with this endpoint.
Writer
Returns the output queue associated with this endpoint.
For practical examples of communication channel usage, see the Communication subsections of the simple tasks and TOmniWorker tasks sections.
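As a quick illustration of Send and Receive used together, here is a minimal console sketch in which a simple task sends three messages and the owner drains the queue after the task has finished. The message ID MSG_ITEM, the procedure name Producer and the five-second wait are assumptions made for this example; the check on MsgID is a defensive choice of the sketch, not something the interface requires.

```delphi
program CommSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon, OtlComm, OtlTask, OtlTaskControl;

const
  MSG_ITEM = 1; // hypothetical message ID chosen for this sketch

procedure Producer(const task: IOmniTask);
var
  i: integer;
begin
  // Runs in the background thread; each Send puts one message in the queue.
  for i := 1 to 3 do
    task.Comm.Send(MSG_ITEM, i);
end;

var
  taskControl: IOmniTaskControl;
  msg        : TOmniMessage;
begin
  taskControl := CreateTask(Producer, 'Producer').Run;
  // Give the task up to five seconds to finish its work.
  if not taskControl.WaitFor(5000) then
    Writeln('Task did not finish in time');
  // Receive never blocks; it returns False as soon as the queue is empty.
  while taskControl.Comm.Receive(msg) do
    if msg.MsgID = MSG_ITEM then
      Writeln('Received item ', msg.MsgData.AsInteger);
  taskControl.Terminate;
  taskControl := nil;
end.
```

If the owner should react to messages while the task is still running, ReceiveWait with a finite timeout can replace the WaitFor and Receive pair.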
On a system with multiple processor groups you can use the ProcessorGroup [3.06] function to specify the processor group this task should run on.
On a system with multiple NUMA nodes you can use the NUMANode [3.06] function to specify the NUMA node this task should run on.
When a task is not started directly (Run) but executed via a thread pool (Schedule), IOmniThreadPool.ProcessorGroups and IOmniThreadPool.NUMANodes should be used instead.
Information about existing processor groups and NUMA nodes can be accessed through the Environment object.
Demo 64_ProcessorGroups_NUMA demonstrates the use of the ProcessorGroup and NUMANode functions.
Starting a new thread in the Windows OS is not a very fast operation. If you are frequently scheduling background tasks, the overhead of creating new threads can significantly impact your program. To solve this, OmniThreadLibrary implements a thread pool, which is basically a cache for threads.
You don’t run a task in a thread pool but schedule it by calling the Schedule method. A very short example of a scheduled task would be:
    procedure Beep(const task: IOmniTask);
    begin
      MessageBeep(MB_ICONEXCLAMATION);
    end;

    CreateTask(Beep, 'Beep').Schedule;
A thread pool is created by calling the CreateThreadPool function. A thread pool should have a name (you can set it to an empty string), which is used as part of the pool management thread’s name.
You can also use the default GlobalOmniThreadPool pool, which is created on first use.
    function CreateThreadPool(const threadPoolName: string): IOmniThreadPool;

    function GlobalOmniThreadPool: IOmniThreadPool;
All thread pool-related code is stored in the OtlThreadPool unit.
See also demo 11_ThreadPool.
When you schedule a task into a thread pool, it merely enters a queue. The thread pool management thread detects this and tries to start this task in an already existing but idle thread. If there is no such thread, it tries to create a new thread (you can limit the maximum number of concurrent threads so this may not always succeed) and run the task in it.
When a task finishes execution, the thread it was running in is put into an idle state and may be reused for execution of new tasks.
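The queueing behaviour described above can be tried out with a small console sketch. It creates a dedicated pool, limits it to two concurrent threads through the MaxExecuting property and schedules four tasks, so two of them have to wait in the queue. The pool and task names, the constant CNumTasks and the 200 ms sleep are assumptions made for this illustration; on newer Delphi versions the Windows unit may have to be referenced as Winapi.Windows.

```delphi
program PoolSketch;

{$APPTYPE CONSOLE}

uses
  Windows, OtlTask, OtlTaskControl, OtlThreadPool;

const
  CNumTasks = 4; // hypothetical number of tasks for this sketch

procedure Worker(const task: IOmniTask);
begin
  Sleep(200); // stands in for real work
end;

var
  pool : IOmniThreadPool;
  tasks: array [1..CNumTasks] of IOmniTaskControl;
  i    : integer;
begin
  pool := CreateThreadPool('SketchPool');
  pool.MaxExecuting := 2; // at most two tasks run at once, the rest stay queued
  // Schedule merely puts each task into the pool's input queue.
  for i := 1 to CNumTasks do
    tasks[i] := CreateTask(Worker, 'PooledTask').Schedule(pool);
  // Wait for every task and report how it ended.
  for i := 1 to CNumTasks do begin
    tasks[i].WaitFor(INFINITE);
    Writeln('Task ', i, ' finished with exit code ', tasks[i].ExitCode);
  end;
end.
```

The controllers are kept in an array only so that the sketch can wait on them; the pool itself acts as the task owner.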
The next section explains the various configuration options implemented by the thread pool.
    type
      TOTPThreadDataFactoryFunction = function: IInterface;
      TOTPThreadDataFactoryMethod = function: IInterface of object;

      IOmniThreadPool = interface
        function Cancel(taskID: int64): boolean; overload;
        function Cancel(taskID: int64; signalCancellationToken: boolean): boolean; overload;
        procedure CancelAll; overload;
        procedure CancelAll(signalCancellationToken: boolean); overload;
        function CountExecuting: integer;
        function CountQueued: integer;
        function IsIdle: boolean;
        function MonitorWith(const monitor: IOmniThreadPoolMonitor): IOmniThreadPool;
        function RemoveMonitor: IOmniThreadPool;
        function SetMonitor(hWindow: THandle): IOmniThreadPool;
        procedure SetThreadDataFactory(const value: TOTPThreadDataFactoryMethod); overload;
        procedure SetThreadDataFactory(const value: TOTPThreadDataFactoryFunction); overload;
        property Asy_OnUnhandledWorkerException: TOTPUnhandledWorkerException read
          GetAsy_OnUnhandledWorkerException write SetAsy_OnUnhandledWorkerException;
        property Affinity: IOmniIntegerSet read GetAffinity;
        property IdleWorkerThreadTimeout_sec: integer read GetIdleWorkerThreadTimeout_sec
          write SetIdleWorkerThreadTimeout_sec;
        property MaxExecuting: integer read GetMaxExecuting write SetMaxExecuting;
        property MaxQueued: integer read GetMaxQueued write SetMaxQueued;
        property MaxQueuedTime_sec: integer read GetMaxQueuedTime_sec write
          SetMaxQueuedTime_sec;
        property MinWorkers: integer read GetMinWorkers write SetMinWorkers;
        property Name: string read GetName write SetName;
        property NumCores: integer read GetNumCores;
        property Options: TOmniThreadPoolOptions read GetOptions write SetOptions;
        property UniqueID: int64 read GetUniqueID;
        property WaitOnTerminate_sec: integer read GetWaitOnTerminate_sec
          write SetWaitOnTerminate_sec;
      {$IFDEF OTL_NUMASupport}
        property ProcessorGroups: IOmniIntegerSet read GetProcessorGroups write
          SetProcessorGroups;
        property NUMANodes: IOmniIntegerSet read GetNUMANodes write SetNUMANodes;
      {$ENDIF OTL_NUMASupport}
      end;
Methods
Cancel
Cancels a task with the specified unique ID by calling the task’s Terminate method. If the task does not stop within WaitOnTerminate_sec seconds, the thread running the task is killed by calling the TerminateThread Windows function.
Since [3.07.2], Cancel signals the task’s cancellation token before calling Terminate. The old behaviour (without signalling the cancellation token) can be achieved by calling the overloaded version that accepts the signalCancellationToken parameter or by setting Options to [tpoPreventCancellationTokenOnCancel].
If a task is not yet executing, it is simply removed from the input queue.
CancelAll
Cancels all tasks, waiting and running.
CountExecuting
Returns the number of executing tasks.
CountQueued
Returns the number of queued (waiting) tasks.
IsIdle
Returns True when the thread pool has no work to do.
MonitorWith
Attaches an external monitor to the thread pool.
RemoveMonitor
Detaches the external monitor from the thread pool.
SetMonitor
This is an internal function, called from the external monitor when attaching to the thread pool.
SetThreadDataFactory
Associates a data factory function with the thread pool. A data factory can be used to create thread-local data whenever a new thread is created.
[3.07] If ThreadDataFactory.Execute throws an exception, that exception is caught and ignored, and ThreadData is set to nil.
The Building a connection pool example contains more information on this topic.
Properties
Affinity [3.06]
Provides a set of all processors that are used for executing tasks in this thread pool. By changing this set you can specify which processors will be used to run tasks scheduled to this thread pool.
IdleWorkerThreadTimeout_sec
Specifies the maximum time a thread can spend in an idle state. After that, the thread is terminated. By default, this value is set to 10 seconds.
If this value is set to 0, idle threads are never terminated.
OmniThreadLibrary will always keep MinWorkers threads alive, even if they are idle for more than IdleWorkerThreadTimeout_sec seconds.
MaxExecuting
Specifies the maximum number of working threads in the thread pool. Initially, this value is set to the number of cores the current process is allowed to run on.
By setting this value to 0 you can prevent the thread pool from creating any threads. You can use this to temporarily stop a thread pool without destroying it.
If MaxExecuting is set to -1, the number of running threads is only limited by the implementation.
MaxQueued
Specifies the maximum number of tasks waiting in the input queue. If there are already MaxQueued tasks in the queue and a new task is scheduled, it is immediately rejected (see Monitoring thread pool operations for more information).
If MaxQueued is set to 0 (which is the default value), the size of the input queue is not limited.
MaxQueuedTime_sec
Specifies the maximum time a task will wait in the input queue before it is rejected (see Monitoring thread pool operations for more information).
If set to 0 (which is the default value), the time a task can spend in the input queue is not limited.
MinWorkers
Specifies the minimum number of threads that should exist at any moment. Both idle and working threads are counted. By default this value is set to 0.
Since [3.05], setting this value to a positive number creates the specified number of worker threads even if no tasks are waiting for execution.
Name
Specifies the name of the thread pool.
NumCores
Returns the number of cores this pool uses for running tasks. Changing the Affinity, ProcessorGroups, or NUMANodes properties may modify this value.
Options
Contains a set of options that govern how the thread pool operates internally. See Cancel for more information.
UniqueID
Gives read-only access to the unique ID associated with the pool. This value is guaranteed to be greater than 0.
WaitOnTerminate_sec
When a pooled task is terminated after Cancel or CancelAll is called, the thread pool manager waits up to this number of seconds for the task to stop execution. After that, it kills the thread with the TerminateThread call.
By default this value is set to 30 seconds.
Asy_OnUnhandledWorkerException [3.06]
Before version 3.06, unhandled exceptions in the code handling the task execution were lost. Now, they are passed up to the IOmniThreadPool. If the property Asy_OnUnhandledWorkerException is set, such an exception is passed to the event handler and the application should react to it. The only safe reaction at that point is to log the error and terminate the application.
Various reasons for task termination are signalled through its ExitCode property. The following thread pool-specific exit codes are defined.
EXIT_THREADPOOL_QUEUE_TOO_LONG
Task was rejected because the input queue already contains at least MaxQueued tasks.
EXIT_THREADPOOL_STALE_TASK
Task was rejected because it was waiting in the input queue for more than MaxQueuedTime_sec seconds.
EXIT_THREADPOOL_CANCELLED
Task was cancelled with a Cancel or CancelAll call.
EXIT_THREADPOOL_INTERNAL_ERROR
This exit code is currently not in use.
An event monitor component can be used to monitor thread pool events. First you have to attach it to the thread pool by calling the MonitorWith method. After that, the following event monitor event handlers can be used:
    type
      TOmniMonitorPoolThreadEvent = procedure(const pool: IOmniThreadPool;
        threadID: integer) of object;
      TOmniMonitorPoolWorkItemEvent = procedure(const pool: IOmniThreadPool;
        taskID: int64) of object;

      TOmniEventMonitor = class(TComponent,
                                IOmniTaskControlMonitor,
                                IOmniThreadPoolMonitor)
      public
        property OnPoolThreadCreated: TOmniMonitorPoolThreadEvent;
        property OnPoolThreadDestroying: TOmniMonitorPoolThreadEvent;
        property OnPoolThreadKilled: TOmniMonitorPoolThreadEvent;
        property OnPoolWorkItemCompleted: TOmniMonitorPoolWorkItemEvent;
      end;
OnPoolThreadCreated
This event is called whenever a new thread is created in the pool.
OnPoolThreadDestroying
This event is called just before a thread is destroyed.
OnPoolThreadKilled
This event is called when a thread is killed because the task did not stop in the allowed time.
OnPoolWorkItemCompleted
This event is called when a task is removed from the thread pool. This can occur on task completion, when a task is removed before it has even started (Cancel, CancelAll) and when a thread containing a task is killed because Cancel was called and the task didn’t stop in the allowed time.
Through the ProcessorGroups property you can specify all processor groups the tasks can be scheduled to. By default this set is empty, which means that tasks will only be scheduled to the default processor group.
By setting the NUMANodes property you can specify all NUMA nodes the tasks can be scheduled to. By default this set is empty, which means that tasks will only be scheduled to the default NUMA node.
Information about existing processor groups and NUMA nodes can be accessed through the Environment object.
Demo 64_ProcessorGroups_NUMA demonstrates the use of the ProcessorGroup and NUMANode functions.
OmniThreadLibrary implements three lock-free data structures suitable for low-level usage – bounded stack, bounded queue and dynamic queue. Bounded queue is used inside the OmniThreadLibrary for messaging while dynamic queue is used as a basis of the blocking collection.
All three data structures are fully thread-safe. They support multiple simultaneous readers and writers. They are implemented in the OtlContainers unit.
Another lock-free data structure, a message queue, is defined in the OtlComm unit and is mostly intended for internal operation (such as sending messages to and from a thread), although it can also be used for other tasks. An example of such usage is shown in the Using message queue with a TThread worker chapter.
The term lock-free is not well defined (and not even universally accepted). In the context of this book, lock-free means that the synchronisation between threads is not achieved with user- or kernel-level synchronisation primitives such as critical sections, but with bus-locking CPU instructions. With modern CPU architectures this approach is much faster than locking on the operating system level.
See also demos 10_Containers and 32_Queue.
The bounded stack structure is a very fast stack with limited length. The core of the implementation is stored in the TOmniBaseBoundedStack class.
The derived class TOmniBoundedStack adds support for external observers. Both classes implement the same interface – IOmniStack – so you can code against the class or against the interface.
    type
      IOmniStack = interface
        procedure Empty;
        procedure Initialize(numElements, elementSize: integer);
        function IsEmpty: boolean;
        function IsFull: boolean;
        function Pop(var value): boolean;
        function Push(const value): boolean;
      end;

      TOmniBaseBoundedStack = class(TInterfacedObject, IOmniStack)
      public
        destructor Destroy; override;
        procedure Empty;
        procedure Initialize(numElements, elementSize: integer); virtual;
        function IsEmpty: boolean; inline;
        function IsFull: boolean; inline;
        function Pop(var value): boolean;
        function Push(const value): boolean;
        property ElementSize: integer read obsElementSize;
        property NumElements: integer read obsNumElements;
      end;

      TOmniBoundedStack = class(TOmniBaseBoundedStack)
      public
        constructor Create(numElements, elementSize: integer;
          partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
          almostFullLoadFactor: real = CAlmostFullLoadFactor);
        destructor Destroy; override;
        function Pop(var value): boolean;
        function Push(const value): boolean;
        property ContainerSubject: TOmniContainerSubject read osContainerSubject;
      end;
Empty
Empties the stack.
Initialize
Initializes the stack for a maximum of numElements elements of size elementSize.
IsEmpty
Returns True when the stack is empty.
IsFull
Returns True when the stack is full.
Pop
Takes one value from the stack and returns True if the stack was not empty before the operation.
Push
Puts one value on the stack and returns True if there was room for the value (the stack was not full before the operation).
ElementSize
Returns the size of the stack element as set in the Initialize call.
NumElements
Returns the maximum number of elements in the stack as set in the Initialize call.
ContainerSubject
Provides a point for attaching external observers as described in the Observing lock-free collections section.
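The following console sketch exercises Push and Pop through the IOmniStack interface. The capacity of three elements and the pushed values are assumptions chosen for this illustration; the sketch runs in a single thread only to keep the order of operations obvious.

```delphi
program StackSketch;

{$APPTYPE CONSOLE}

uses
  OtlContainers;

var
  stack   : IOmniStack;
  i, value: integer;
begin
  // Room for three integer-sized elements (sizes are arbitrary for this sketch).
  stack := TOmniBoundedStack.Create(3, SizeOf(integer));
  for i := 1 to 4 do begin
    value := i;
    // Push returns False instead of blocking when there is no room left.
    if not stack.Push(value) then
      Writeln('Stack is full, value ', value, ' was not pushed');
  end;
  // Pop returns False once the stack is empty; the last pushed value comes out first.
  while stack.Pop(value) do
    Writeln('Popped ', value);
end.
```

Because both Push and Pop take untyped parameters, the caller is responsible for always passing a variable whose size matches the elementSize given at creation.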
The bounded queue structure is a very fast queue with limited length.
The core of the implementation is stored in the TOmniBaseBoundedQueue class. The derived class TOmniBoundedQueue adds support for external observers. Both classes implement the same interface – IOmniQueue – so you can code against the class or against the interface.
    type
      IOmniQueue = interface
        function Dequeue(var value): boolean;
        procedure Empty;
        function Enqueue(const value): boolean;
        procedure Initialize(numElements, elementSize: integer);
        function IsEmpty: boolean;
        function IsFull: boolean;
      end;

      TOmniBaseBoundedQueue = class(TInterfacedObject, IOmniQueue)
      public
        destructor Destroy; override;
        function Dequeue(var value): boolean;
        procedure Empty;
        function Enqueue(const value): boolean;
        procedure Initialize(numElements, elementSize: integer); virtual;
        function IsEmpty: boolean;
        function IsFull: boolean;
        property ElementSize: integer read obqElementSize;
        property NumElements: integer read obqNumElements;
      end;

      TOmniBoundedQueue = class(TOmniBaseBoundedQueue)
      public
        constructor Create(numElements, elementSize: integer;
          partlyEmptyLoadFactor: real = CPartlyEmptyLoadFactor;
          almostFullLoadFactor: real = CAlmostFullLoadFactor);
        destructor Destroy; override;
        function Dequeue(var value): boolean;
        function Enqueue(const value): boolean;
        property ContainerSubject: TOmniContainerSubject read oqContainerSubject;
      end;
Empty
Empties the queue.
Dequeue
Takes one value from the queue’s head and returns True if the queue was not empty before the operation.
Enqueue
Inserts one value on the queue’s tail and returns True if there was room for the value (the queue was not full before the operation).
Initialize
Initializes the queue for maximum numElements elements of size elementSize.
IsEmpty
Returns True when the queue is empty.
IsFull
Returns True when the queue is full.
ElementSize
Returns the size of the queue element as set in the Initialize call.
NumElements
Returns the maximum number of elements in the queue as set in the Initialize call.
ContainerSubject
Provides a point for attaching external observers as described in the Observing lock-free collections section.
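The following is a minimal sketch of how the bounded queue might be used with integer elements. It relies only on the constructor, Enqueue and Dequeue declared above; the unit name OtlContainers and the procedure name BoundedQueueDemo are assumptions made for illustration.

```pascal
uses
  SysUtils,
  OtlContainers; // assumed home of TOmniBoundedQueue

procedure BoundedQueueDemo;
var
  i    : integer;
  queue: TOmniBoundedQueue;
  value: integer;
begin
  // room for 100 elements, each SizeOf(integer) bytes long
  queue := TOmniBoundedQueue.Create(100, SizeOf(integer));
  try
    for i := 1 to 3 do
      if not queue.Enqueue(i) then // False means the queue is full
        raise Exception.Create('Queue is full');
    // Dequeue returns False once the queue is empty
    while queue.Dequeue(value) do
      Writeln(value); // values come out in insertion order
  finally
    queue.Free;
  end;
end;
```

Because Enqueue and Dequeue take untyped parameters, the caller is responsible for always passing a variable whose size matches the elementSize given to the constructor.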
The TOmniMessageQueue is just a thin wrapper around the bounded queue data structure. An element of this queue is a (message ID, message data) pair, stored in a TOmniMessage record.
This class greatly simplifies creating and attaching event and window observers.
    type
      TOmniMessage = record
        MsgID: word;
        MsgData: TOmniValue;
        constructor Create(aMsgID: word; aMsgData: TOmniValue); overload;
        constructor Create(aMsgID: word); overload;
      end;

      TOmniContainerWindowsEventObserver = class(TOmniContainerObserver)
      public
        function GetEvent: THandle; virtual; abstract;
      end;

      TOmniMessageQueueMessageEvent =
        procedure(Sender: TObject; const msg: TOmniMessage) of object;

      TOmniMessageQueue = class(TOmniBoundedQueue)
      public
        constructor Create(numMessages: integer;
          createEventObserver: boolean = true); reintroduce;
        destructor Destroy; override;
        function Dequeue: TOmniMessage; reintroduce;
        function Enqueue(const value: TOmniMessage): boolean; reintroduce;
        procedure Empty;
        function GetNewMessageEvent: THandle;
        function TryDequeue(var msg: TOmniMessage): boolean; reintroduce;
        property EventObserver: TOmniContainerWindowsEventObserver
          read mqWinEventObserver;
        property OnMessage: TOmniMessageQueueMessageEvent
          read mqWinMsgObserver.OnMessage write SetOnMessage;
      end;
TOmniMessageQueue.Create creates an event observer unless the second parameter (createEventObserver) is set to False. It is created with the coiNotifyOnAllInserts interest, meaning that an event (accessible through the GetNewMessageEvent function) is signalled each time an element (a message) is added to the queue. The observer itself is accessible through the EventObserver property.
You can also easily create a window message observer by attaching an event handler to the OnMessage property. This observer is also created with the coiNotifyOnAllInserts interest, which causes the OnMessage event handler to be called each time an element (a message) is added to the queue. You can destroy this observer at any time by assigning a nil value to the OnMessage event.
For an example, see chapter Using message queue with a TThread worker.
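As a rough sketch, a message queue with the default event observer could be filled and drained as shown below. Only the members listed in the declaration above are used; the unit name OtlComm, the message IDs and the procedure name are assumptions for illustration.

```pascal
uses
  Windows,
  OtlCommon,
  OtlComm; // assumed home of TOmniMessageQueue and TOmniMessage

procedure MessageQueueDemo;
var
  msg  : TOmniMessage;
  queue: TOmniMessageQueue;
begin
  // second parameter left at default, so an event observer is created
  queue := TOmniMessageQueue.Create(100);
  try
    queue.Enqueue(TOmniMessage.Create(1, 'first')); // ID plus data
    queue.Enqueue(TOmniMessage.Create(2));          // ID only
    // the event is signalled on every insert; wait at most one second
    if WaitForSingleObject(queue.GetNewMessageEvent, 1000) = WAIT_OBJECT_0 then
      // always drain the queue completely, not just one message
      while queue.TryDequeue(msg) do
        Writeln('Message ID: ', msg.MsgID);
  finally
    queue.Free;
  end;
end;
```

In real code the producer and the consumer would live in different threads; the single-threaded form is used here only to keep the sketch short.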
The dynamic queue is a fast queue with unlimited length. It can grow as required as the data used to store elements is dynamically allocated.
The core of the implementation is stored in the TOmniBaseQueue class. Derived class TOmniQueue adds support for external observers. Both structures store TOmniValue elements.
    type
      TOmniBaseQueue = class
      ...
      public
        constructor Create(blockSize: integer = 65536; numCachedBlocks: integer = 4);
        destructor Destroy; override;
        function Dequeue: TOmniValue;
        procedure Enqueue(const value: TOmniValue);
        function IsEmpty: boolean;
        function TryDequeue(var value: TOmniValue): boolean;
      end;

      TOmniQueue = class(TOmniBaseQueue)
      ...
      public
        function Dequeue: TOmniValue;
        procedure Enqueue(const value: TOmniValue);
        function TryDequeue(var value: TOmniValue): boolean;
        property ContainerSubject: TOmniContainerSubject read ocContainerSubject;
      end;
Create
Creates a queue object with a specified page size (blockSize) where numCachedBlocks are always preserved for future use. Defaults (65536 and 4) should be appropriate for most scenarios.
Dequeue
Takes one element from the queue’s head and returns it. If the queue is empty, an exception is raised.
Enqueue
Inserts an element on the queue’s tail.
IsEmpty
Returns True when the queue is empty.
TryDequeue
Takes one element from the queue’s head and returns it in the value parameter. Returns True if an element was returned (the queue was not empty before the operation).
ContainerSubject
Provides a point for attaching external observers as described in the Observing lock-free collections section.
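A short sketch of the dynamic queue in use follows. It prefers TryDequeue over Dequeue so that an empty queue ends the loop instead of raising an exception. The unit names and the procedure name are assumptions for illustration.

```pascal
uses
  OtlCommon,     // TOmniValue
  OtlContainers; // assumed home of TOmniQueue

procedure DynamicQueueDemo;
var
  i    : integer;
  queue: TOmniQueue;
  value: TOmniValue;
begin
  queue := TOmniQueue.Create; // default block size and cache settings
  try
    // the queue grows as needed, so Enqueue has no "full" result
    for i := 1 to 1000 do
      queue.Enqueue(i);
    // TryDequeue returns False on an empty queue
    while queue.TryDequeue(value) do
      if value.AsInteger mod 250 = 0 then
        Writeln(value.AsInteger);
  finally
    queue.Free;
  end;
end;
```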
OmniThreadLibrary data structures support the observer design pattern. Each structure can be observed by multiple observers at the same time. Supporting code and two observer implementations are stored in the OtlContainerObserver unit.
Current architecture supports four different kinds of events that can be observed:
    type
      ///<summary>All possible actions observer can take interest in.</summary>
      TOmniContainerObserverInterest = (
        //Interests with permanent subscription:
        coiNotifyOnAllInserts, coiNotifyOnAllRemoves,
        //Interests with one-shot subscription:
        coiNotifyOnPartlyEmpty, coiNotifyOnAlmostFull
      );
coiNotifyOnAllInserts
Observer is notified whenever a data element is inserted into the structure.
coiNotifyOnAllRemoves
Observer is notified whenever a data element is removed from the structure.
coiNotifyOnPartlyEmpty
Observer is notified whenever data usage drops below the partlyEmptyLoadFactor (parameter of the data structure constructor, 80% by default). This event is only supported for bounded structures.
This event can only be observed once. After that you should destroy the observer and (if required) create another one and attach it to the data structure.
coiNotifyOnAlmostFull
Observer is notified whenever data usage rises above the almostFullLoadFactor (parameter of the data structure constructor, 90% by default). This event is only supported for bounded structures.
This event can only be observed once. After that you should destroy the observer and (if required) create another one and attach it to the data structure.
The OtlContainerObserver unit implements event and message observers.
    TOmniContainerWindowsEventObserver = class(TOmniContainerObserver)
    public
      function GetEvent: THandle; virtual; abstract;
    end;

    TOmniContainerWindowsMessageObserver = class(TOmniContainerObserver)
    strict protected
      function GetHandle: THandle; virtual; abstract;
    public
      procedure Send(aMessage: cardinal; wParam, lParam: integer);
        virtual; abstract;
      property Handle: THandle read GetHandle;
    end;

    function CreateContainerWindowsEventObserver(externalEvent: THandle = 0):
      TOmniContainerWindowsEventObserver;

    function CreateContainerWindowsMessageObserver(hWindow: THandle;
      msg: cardinal; wParam, lParam: integer):
      TOmniContainerWindowsMessageObserver;
The event observer TOmniContainerWindowsEventObserver signals an event every time the observed event occurs.
The message observer TOmniContainerWindowsMessageObserver sends a message to a window every time the observed event occurs.
Create and attach the event observer:
    FObserver := CreateContainerWindowsEventObserver;
    FCollection.ContainerSubject.Attach(FObserver, coiNotifyOnAllInserts);
Access the observer event so you can wait on it:
    FEvent := FObserver.GetEvent;
Detach and destroy the observer:
    FCollection.ContainerSubject.Detach(FObserver, coiNotifyOnAllInserts);
    FreeAndNil(FObserver);
Create and attach the message observer:
    FWindow := DSiAllocateHWnd(ObserverWndProc);
    FObserver := CreateContainerWindowsMessageObserver(
      FWindow, MSG_ITEM_INSERTED, 0, 0);
    FWorker.Output.ContainerSubject.Attach(FObserver, coiNotifyOnAllInserts);
Process observer messages:
    procedure ObserverWndProc(var message: TMessage);
    var
      ovWorkItem: TOmniValue;
      workItem: IOmniWorkItem;
    begin
      if message.Msg = MSG_ITEM_INSERTED then begin
        //...
        message.Result := Ord(true);
      end
      else
        message.Result := DefWindowProc(FWindow, message.Msg,
          message.WParam, message.LParam);
    end;
Detach and destroy the observer:
    FWorker.Output.ContainerSubject.Detach(FObserver, coiNotifyOnAllInserts);
    FreeAndNil(FObserver);
    DSiDeallocateHWnd(FWindow);
OmniThreadLibrary contains two demos that can be used to measure the performance of the lock-free structures. Bounded structures are benchmarked in the 10_Containers demo and the dynamic queue is benchmarked in the 32_Queue demo.
The following results were measured on a 4-core i7-2630QM running at 2 GHz. Lock-free structures can transfer from 2.5 to 5 million messages per second.
While the OmniThreadLibrary is mostly a code-oriented framework, it also contains one package (stored in the packages subfolder) with one component (TOmniEventMonitor). This component supports Win32 and Win64 projects and contains some events that can be used to monitor task and thread lifecycle.
    type
      TOmniMonitorTaskEvent = procedure(const task: IOmniTaskControl) of object;
      TOmniMonitorTaskMessageEvent = procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage) of object;
      TOmniMonitorPoolThreadEvent = procedure(const pool: IOmniThreadPool;
        threadID: integer) of object;
      TOmniMonitorPoolWorkItemEvent = procedure(const pool: IOmniThreadPool;
        taskID: int64) of object;

      TOmniEventMonitor = class(TComponent,
                                IOmniTaskControlMonitor,
                                IOmniThreadPoolMonitor)
      published
        property OnPoolThreadCreated: TOmniMonitorPoolThreadEvent;
        property OnPoolThreadDestroying: TOmniMonitorPoolThreadEvent;
        property OnPoolThreadKilled: TOmniMonitorPoolThreadEvent;
        property OnPoolWorkItemCompleted: TOmniMonitorPoolWorkItemEvent;
        property OnTaskMessage: TOmniMonitorTaskMessageEvent;
        property OnTaskTerminated: TOmniMonitorTaskEvent;
        property OnTaskUndeliveredMessage: TOmniMonitorTaskMessageEvent;
      end;
The first four events (OnPool*) are used to monitor thread pool events. They are described in the Monitoring thread pool operations section.
The other three events are used to monitor tasks attached to this monitor. See the MonitorWith section for more information.
This part of the book describes the properties and methods of the IOmniTaskControl interface that are useful with all four kinds of tasks. The next section, TOmniWorker tasks, covers only the parts that are useful for TOmniWorker-based tasks.
    property Name: string read GetName;
The Name property returns the task name as set in the CreateTask call.
    property UniqueID: int64 read GetUniqueID;
The UniqueID property returns the task’s unique ID. This ID is generated automatically when a task is created and is guaranteed to be unique and greater than zero.
    function SetParameter(const paramName: string;
      const paramValue: TOmniValue): IOmniTaskControl; overload;
    function SetParameter(const paramValue: TOmniValue): IOmniTaskControl; overload;
    function SetParameters(const parameters: array of TOmniValue): IOmniTaskControl;
    property Param: TOmniValueContainer read GetParam;
The task controller can set parameters for the task by calling the SetParameter and SetParameters methods. Parameters must be set before the task is started or scheduled (in other words, before Run or Schedule is called).
Parameters can have names or they can be accessed by an index number.
In the former approach you have to pass in a name and a value for each parameter. You can set multiple parameters in one call by calling SetParameters and providing it with an array of (name, value) pairs.
    taskControl := CreateTask(MyTask);
    taskControl.SetParameter('Initial value', '42');
    taskControl.SetParameters(['From', 0, 'To', 99]);
Both the controller and the task can access parameters through the Param property.
    task.Param['Initial value'] // '42'
    task.Param['From']          // 0
    task.Param['To']            // 99
Another approach is to just set parameters one by one by calling SetParameter repeatedly.
    taskControl := CreateTask(MyTask);
    taskControl.SetParameter('42');
    taskControl.SetParameter(0);
    taskControl.SetParameter(99);
In this case, a task can index the Param property with a 0-based index to access parameters.
    task.Param[0] // '42'
    task.Param[1] // 0
    task.Param[2] // 99
Parameter storage is implemented in the TOmniValueContainer class.
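To show named parameters travelling from the controller to the task, here is a small sketch. The task name SumTask, the message ID MSG_SUM and the field FSumTask are hypothetical; the calls themselves (SetParameter, Param, Comm.Send, OnMessage, Run) are the ones described in this chapter.

```pascal
const
  MSG_SUM = 1; // hypothetical message ID

procedure SumTask(const task: IOmniTask);
var
  i, sum: integer;
begin
  sum := 0;
  // named parameters are read through the Param property
  for i := task.Param['From'].AsInteger to task.Param['To'].AsInteger do
    Inc(sum, i);
  task.Comm.Send(MSG_SUM, sum); // 4950 for From = 0 and To = 99
end;

procedure TForm1.StartSum;
begin
  // FSumTask: IOmniTaskControl is assumed to be a field of the form
  FSumTask := CreateTask(SumTask, 'Sum')
    .SetParameter('From', 0) // parameters are set before Run
    .SetParameter('To', 99)
    .OnMessage(
      procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
      begin
        ShowMessage(IntToStr(msg.MsgData.AsInteger));
      end)
    .Run;
end;
```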
A simple background task can function either as a single-shot or as a long term operation. In the former case there’s no need for stopping the task as it will stop by itself but in the latter case we would need to tell the task to stop at some point.
To stop a task, call its Terminate method. It will wait up to maxWait_ms milliseconds for the task to exit, after which it will kill the thread running the task.
    procedure Stop;
    function Terminate(maxWait_ms: cardinal = INFINITE): boolean;
    function WaitFor(maxWait_ms: cardinal): boolean;
Simple tasks should occasionally check either the Terminated function (it will return True once Terminate has been called) or the TerminateEvent event (it will become signalled once Terminate has been called).
The two examples below demonstrate how to write a task that does nothing except wait to be terminated.
    procedure TerminateTask1(const task: IOmniTask);
    begin
      while WaitForSingleObject(task.TerminateEvent, 1000) = WAIT_TIMEOUT do begin
        // some periodic task
      end;
    end;

    procedure TerminateTask2(const task: IOmniTask);
    begin
      while not task.Terminated do begin
        // some periodic task
        Sleep(1000);
      end;
    end;
If you just want to tell the task to stop (without waiting for its termination), you can call the Stop method. You can then check the task’s status by calling the WaitFor function, which will wait at most maxWait_ms milliseconds for the task to stop and then return True if the task has stopped.
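The three methods can be combined into a graceful shutdown helper. The sketch below is only one possible policy; the procedure name and the timeout values are arbitrary choices made for illustration.

```pascal
procedure StopGracefully(const taskControl: IOmniTaskControl);
begin
  // ask the task to stop; this call does not wait
  taskControl.Stop;
  // give the task up to five seconds to finish on its own
  if not taskControl.WaitFor(5000) then
    // last resort: waits one more second, then kills the thread
    taskControl.Terminate(1000);
end;
```

Killing a thread should remain a fallback, because a killed task gets no chance to release the resources it holds.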
Sometimes you may want to be notified when the task has terminated. One way to do that is to attach the task to a monitor and monitor the OnTaskTerminated event. Another is to write a termination event handler.
    type
      TOmniOnTerminatedFunction = reference to procedure(const task: IOmniTaskControl);
      TOmniOnTerminatedFunctionSimple = reference to procedure;
      TOmniTaskTerminatedEvent = procedure(const task: IOmniTaskControl) of object;

    function OnTerminated(eventHandler: TOmniOnTerminatedFunction):
      IOmniTaskControl; overload;
    function OnTerminated(eventHandler: TOmniOnTerminatedFunctionSimple):
      IOmniTaskControl; overload;
    function OnTerminated(eventHandler: TOmniTaskTerminatedEvent):
      IOmniTaskControl; overload;
The termination handler can receive a reference to the controller of the task being terminated or it can be a parameterless procedure.
The termination handler is called in the context of the thread that created the task.
The example below shows how to write a termination handler to clear the task controller.
    procedure NullTask(const task: IOmniTask);
    begin
    end;

    var
      FTaskControl: IOmniTaskControl;

    FTaskControl := CreateTask(NullTask);
    FTaskControl.OnTerminated(
      procedure
      begin
        ShowMessage('Background task has terminated');
        FTaskControl := nil;
      end);
    FTaskControl.Run;
A task can send a result to the controller by calling the SetExitStatus procedure. The task controller can access this result through the ExitCode and ExitMessage properties.
    property ExitCode: integer read GetExitCode;
    property ExitMessage: string read GetExitMessage;
By default, ExitCode contains value 0 and ExitMessage contains an empty string.
An application can use exit codes from 0 to $7FFFFFFF. Exit codes from $80000000 to $FFFFFFFF are reserved for internal OmniThreadLibrary use. The following exit codes (defined in the OtlCommon unit) have a reserved meaning:
    const
      // reserved exit statuses
      EXIT_OK = 0;
      EXIT_INTERNAL = integer($80000000);
      EXIT_THREADPOOL_QUEUE_TOO_LONG = EXIT_INTERNAL + 0;
      EXIT_THREADPOOL_STALE_TASK = EXIT_INTERNAL + 1;
      EXIT_THREADPOOL_CANCELLED = EXIT_INTERNAL + 2;
      EXIT_THREADPOOL_INTERNAL_ERROR = EXIT_INTERNAL + 3;
EXIT_THREADPOOL_* exit codes are described in the Thread pooling section.
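A possible way of reporting a failure from a task and reading it in the controller is sketched below. It assumes that SetExitStatus takes an integer code followed by a message string; the task name, the parameter name, the exit code 1 and the field FCheckTask are hypothetical.

```pascal
procedure CheckFileTask(const task: IOmniTask);
var
  fileName: string;
begin
  fileName := task.Param['FileName'];
  if not FileExists(fileName) then
    // application-defined code from the 0..$7FFFFFFF range
    task.SetExitStatus(1, 'File not found: ' + fileName);
  // when SetExitStatus is not called, ExitCode stays at EXIT_OK
end;

procedure TForm1.StartCheck;
begin
  // FCheckTask: IOmniTaskControl is assumed to be a field of the form
  FCheckTask := CreateTask(CheckFileTask, 'Check file')
    .SetParameter('FileName', 'data.bin')
    .OnTerminated(
      procedure(const task: IOmniTaskControl)
      begin
        if task.ExitCode <> EXIT_OK then
          ShowMessage(task.ExitMessage);
      end)
    .Run;
end;
```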
If an unhandled exception is raised in the background task, it is caught by the OmniThreadLibrary code and stored in the FatalException property. When that happens, the background task terminates execution in a normal way (termination handler is called etc.).
The task controller can check the FatalException property to see whether an unhandled exception was raised (if not, the property will contain nil).
When the task controller is destroyed, so is the exception object. If you need to preserve the exception object, you can detach it from the task controller by calling the DetachException function.
    function DetachException: Exception;
    property FatalException: Exception read GetFatalException;
See also demo 13_Exceptions.
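One way to inspect a task's exception is from a termination handler, as in the sketch below. The task FaultyTask and the field FFaultyTask are hypothetical; FatalException and DetachException are the members described above.

```pascal
procedure FaultyTask(const task: IOmniTask);
begin
  // not handled inside the task, so the library captures it
  raise Exception.Create('Something went wrong');
end;

procedure TForm1.StartFaulty;
begin
  // FFaultyTask: IOmniTaskControl is assumed to be a field of the form
  FFaultyTask := CreateTask(FaultyTask, 'Faulty')
    .OnTerminated(
      procedure(const task: IOmniTaskControl)
      var
        e: Exception;
      begin
        if assigned(task.FatalException) then begin
          // after detaching, this code owns the exception object
          e := task.DetachException;
          try
            ShowMessage('Task failed: ' + e.Message);
          finally
            e.Free;
          end;
        end;
      end)
    .Run;
end;
```

Detaching is only needed when the exception must outlive the task controller; for simple logging, reading FatalException.Message would suffice.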
The Communication subsystem section explains how the communication subsystem works and what methods can be used in your program. This part of the book will show a practical example of sending messages to a simple (not TOmniWorker-based) task. This example is based on the 2_TwoWayHello demo.
The task is created in a standard manner.
    FHelloTask :=
      OmniEventMonitor1.Monitor(CreateTask(RunHello, 'Hello'))
      .SetParameter('Delay', 1000) // wait timeout in ms, read by RunHello
      .SetParameter('Message', 'Initial message')
      .Run;
A message is sent to the task through the Comm property.
    const
      MSG_CHANGE_MESSAGE = 1;

    FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'New message');
The task (RunHello procedure) must implement a loop which will wait on two events – task.TerminateEvent (to know when to stop) and task.Comm.NewMessageEvent (to know when a new message arrived). When the latter event is signalled, a message can be read from the message queue.
You should always read and process all waiting messages, not just one.
    procedure RunHello(const task: IOmniTask);
    var
      msg: string;
      msgData: TOmniValue;
      msgID: word;
    begin
      msg := task.Param['Message'];
      repeat
        case DSiWaitForTwoObjects(task.TerminateEvent, task.Comm.NewMessageEvent,
               false, task.Param['Delay'])
        of
          WAIT_OBJECT_1:
            begin
              while task.Comm.Receive(msgID, msgData) do begin
                if msgID = MSG_CHANGE_MESSAGE then
                  msg := msgData;
              end;
            end;
          WAIT_TIMEOUT:
            task.Comm.Send(0, msg);
          else
            break; //repeat
        end;
      until false;
    end;
This code uses DSiWaitForTwoObjects to wait on two events. That function is just a thin wrapper around Windows’ WaitForMultipleObjects.
Messages sent from a task can be received and processed in many ways. They can all be used with both simple and TOmniWorker tasks.
The first approach is to use an event monitor to process messages.
Another way is to call the OnMessage method, providing an event handler which can be a method or an anonymous method.
    type
      TOmniTaskMessageEvent = procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage) of object;
      TOmniOnMessageFunction = reference to procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage);

    function OnMessage(eventHandler: TOmniTaskMessageEvent): IOmniTaskControl; overload;
    function OnMessage(eventHandler: TOmniOnMessageFunction): IOmniTaskControl; overload;

    task := CreateTask(MyTask).OnMessage(
      procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
      begin
        ShowMessage('Received message with ID ' + IntToStr(msg.MsgID));
      end).Run;
You can also set multiple message handlers, each handling a specific message ID.
    type
      TOmniTaskMessageEvent = procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage) of object;
      TOmniOnMessageFunction = reference to procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage);

    function OnMessage(msgID: word; eventHandler: TOmniTaskMessageEvent):
      IOmniTaskControl; overload;
    function OnMessage(msgID: word; eventHandler: TOmniOnMessageFunction):
      IOmniTaskControl; overload;

    task := CreateTask(MyTask);
    task.OnMessage(
      MSG_FIRST,
      procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
      begin
        ShowMessage('Received MSG_FIRST');
      end);
    task.OnMessage(
      MSG_SECOND,
      procedure(const task: IOmniTaskControl; const msg: TOmniMessage)
      begin
        ShowMessage('Received MSG_SECOND');
      end);
    task.Run;
The OmniThreadLibrary allows you to combine both approaches, providing specific message handlers for some messages and generic message handlers for all the rest.
    task := CreateTask(MyTask);
    task.OnMessage(MSG_FIRST, HandleFirstMessage);
    task.OnMessage(MSG_SECOND, HandleSecondMessage);
    task.OnMessage(HandleOtherMessages);
    task.Run;
A message handler can also be provided through a TOmniMessageExec class, which is defined in the OtlTaskControl unit. This class can wrap any message handler type and is mainly intended for internal use.
    function OnMessage(msgID: word; eventHandler: TOmniMessageExec):
      IOmniTaskControl; overload;
The last possibility is to dispatch all messages to an object which implements Delphi’s Dispatch mechanism, for example, to a form.
    function OnMessage(eventDispatcher: TObject): IOmniTaskControl; overload;
The object should then define message handlers using Delphi’s message methods. Each should accept a var TOmniMessage parameter.
    const
      WM_FIRST_MSG = WM_USER;
      WM_SECOND_MSG = WM_USER + 1;

    type
      TForm1 = class(TForm)
      public
        FTask: IOmniTaskControl;
        procedure StartTask;
        procedure HandleFirstMessage(var msg: TOmniMessage); message WM_FIRST_MSG;
        procedure HandleSecondMessage(var msg: TOmniMessage); message WM_SECOND_MSG;
      end;

    procedure TForm1.StartTask;
    begin
      FTask := CreateTask(MyTask).OnMessage(Self).Run;
    end;
Tasks can be chained together so that the next task is started when the previous one terminates. This is achieved with the ChainTo method.
    function ChainTo(const task: IOmniTaskControl;
      ignoreErrors: boolean = false): IOmniTaskControl;
The code below starts the task1 task. When it is finished, OmniThreadLibrary immediately starts task2. Note that in this case you should not start the second task explicitly.
    task2 := CreateTask(SecondTask);
    task1 := CreateTask(FirstTask).ChainTo(task2).Run;
If ignoreErrors is left at default (False), the second task is only started if the first task’s ExitCode is EXIT_OK (0). If ignoreErrors is set to True, the second task is started even if ExitCode of the first task is not 0.
Calling Join adds the task to a task group.
Calling Leave removes the task from a group.
    function Join(const group: IOmniTaskGroup): IOmniTaskControl;
    function Leave(const group: IOmniTaskGroup): IOmniTaskControl;
Calling MonitorWith attaches the task to an event monitor. Calling RemoveMonitor removes the task from the monitor.
    function MonitorWith(const monitor: IOmniTaskControlMonitor): IOmniTaskControl;
    function RemoveMonitor: IOmniTaskControl;
Calling task.MonitorWith(otlMonitor) is equivalent to calling otlMonitor.Monitor(task). Calling task.RemoveMonitor is equivalent to calling otlMonitor.Detach(task).
The following three monitor events are useful for monitoring tasks.
    type
      TOmniMonitorTaskEvent = procedure(const task: IOmniTaskControl) of object;
      TOmniMonitorTaskMessageEvent = procedure(const task: IOmniTaskControl;
        const msg: TOmniMessage) of object;

      TOmniEventMonitor = class(TComponent,
                                IOmniTaskControlMonitor,
                                IOmniThreadPoolMonitor)
      public
        property OnTaskMessage: TOmniMonitorTaskMessageEvent;
        property OnTaskTerminated: TOmniMonitorTaskEvent;
        property OnTaskUndeliveredMessage: TOmniMonitorTaskMessageEvent;
      end;
OnTaskMessage
This event is called whenever a message from the task is received. The msg parameter contains the message.
OnTaskTerminated
This event is called when a task terminates.
OnTaskUndeliveredMessage
If a task is terminated with unprocessed messages in its input queue, this event is called for each message.
The Enforced method regulates behaviour in a very specific situation – when a task is terminated before it even starts execution.
    function Enforced(forceExecution: boolean = true): IOmniTaskControl;
In the default scenario, a task is always started when Run is called, even if it was terminated before that. While the task would typically exit immediately, it can still do some processing.
    procedure ShortLivedTask(const task: IOmniTask);
    begin
      // this line will be executed
    end;

    task := CreateTask(ShortLivedTask);
    task.Terminate;
    task.Run;
If Enforced(False) is called, a terminated task won’t be started at all.
    procedure UnstartedTask(const task: IOmniTask);
    begin
      // this line will never be executed
    end;

    task := CreateTask(UnstartedTask);
    task.Enforced(false);
    task.Terminate;
    task.Run;
This method also regulates behaviour of tasks that were scheduled to the thread pool.
Calling Unobserved adds an implicit owner to the task so that you don’t have to store the returned IOmniTaskControl interface in a variable or a field.
    function Unobserved: IOmniTaskControl;
See the Task controller needs an owner section for an example.
OmniThreadLibrary implements the concept of a cancellation token, which is a synchronisation mechanism that allows multiple tasks to be cancelled with a single command.
Cancellation must be cooperative, i.e. the task must watch its CancellationToken property and exit when it becomes signalled.
    function CancelWith(const token: IOmniCancellationToken): IOmniTaskControl;

    property CancellationToken: IOmniCancellationToken read GetCancellationToken;
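The sketch below shows how one token might cancel two tasks at once. It assumes the CreateOmniCancellationToken factory and the Signal and IsSignalled members of IOmniCancellationToken; the task and procedure names are hypothetical.

```pascal
procedure CancellableTask(const task: IOmniTask);
begin
  // cooperative cancellation: poll the token between units of work
  while not task.CancellationToken.IsSignalled do begin
    // one unit of work goes here
    Sleep(100);
  end;
end;

procedure RunAndCancel;
var
  task1, task2: IOmniTaskControl;
  token       : IOmniCancellationToken;
begin
  token := CreateOmniCancellationToken;
  // both tasks share the same token
  task1 := CreateTask(CancellableTask, 'Worker 1').CancelWith(token).Run;
  task2 := CreateTask(CancellableTask, 'Worker 2').CancelWith(token).Run;
  Sleep(1000);
  token.Signal; // a single command cancels both tasks
  task1.WaitFor(5000);
  task2.WaitFor(5000);
end;
```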
Sometimes you have to establish a common synchronisation primitive between the main program and one (or more) background tasks. In such a case, the WithLock method can be called to assign a synchronisation primitive (such as a critical section) to the task’s Lock property.
    function WithLock(const lock: TSynchroObject;
      autoDestroyLock: boolean = true): IOmniTaskControl; overload;
    function WithLock(const lock: IOmniCriticalSection): IOmniTaskControl; overload;

    property Lock: TSynchroObject read GetLock;
If you pass in a TSynchroObject descendant, the default behaviour is to automatically destroy this object when the task is destroyed. If you are passing the same synchronisation object to two or more tasks, this is not a good idea and the second WithLock parameter should be set to False. Alternatively, you can use an IOmniCriticalSection, which will destroy itself automatically when it is no longer used.
See also demo 12_Lock.
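As an illustration of sharing one lock between two tasks, consider the following sketch. The global counter, the task name and the procedure name are hypothetical; TCriticalSection comes from the SyncObjs unit and is a TSynchroObject descendant.

```pascal
var
  GSharedCounter: integer; // hypothetical shared state

procedure CountingTask(const task: IOmniTask);
var
  i: integer;
begin
  for i := 1 to 1000 do begin
    task.Lock.Acquire; // the lock assigned by WithLock
    try
      Inc(GSharedCounter);
    finally
      task.Lock.Release;
    end;
  end;
end;

procedure RunCounters;
var
  lock        : TCriticalSection;
  task1, task2: IOmniTaskControl;
begin
  GSharedCounter := 0;
  lock := TCriticalSection.Create;
  try
    // False: the lock is shared, so no task may destroy it
    task1 := CreateTask(CountingTask).WithLock(lock, false).Run;
    task2 := CreateTask(CountingTask).WithLock(lock, false).Run;
    task1.WaitFor(INFINITE);
    task2.WaitFor(INFINITE);
    // both tasks incremented under the lock, so the counter is 2000
  finally
    lock.Free; // safe now that both tasks have finished
  end;
end;
```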
Similar to the Lock mechanism, WithCounter connects a task with a thread-safe counter, IOmniCounter.
    function WithCounter(const counter: IOmniCounter): IOmniTaskControl;
See also demo 14_TerminateWhen.
By calling SetPriority you can specify the priority of the thread that will execute the task. By default the priority is set to tpNormal, which corresponds to the Windows priority level THREAD_PRIORITY_NORMAL.
    type
      TOTLThreadPriority = (tpIdle, tpLowest, tpBelowNormal, tpNormal,
        tpAboveNormal, tpHighest);

    function SetPriority(threadPriority: TOTLThreadPriority): IOmniTaskControl;
SetQueueSize sets the length of the message queues. By default, the message queue length is set to 1000 messages.
    function SetQueueSize(numMessages: integer): IOmniTaskControl;
This method must be called before the communication channel is used for the first time. The safest way is to call it before the task is run or scheduled.
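Because both methods return the task controller, they can be chained with the rest of the task configuration. In the sketch below, the task ScanTask, the field FScanTask and the queue size of 5000 are hypothetical values chosen for illustration.

```pascal
procedure ScanTask(const task: IOmniTask);
begin
  // low-priority background work would go here
end;

procedure TForm1.StartScan;
begin
  // FScanTask: IOmniTaskControl is assumed to be a field of the form
  FScanTask := CreateTask(ScanTask, 'Background scan')
    .SetPriority(tpBelowNormal) // yield to foreground threads
    .SetQueueSize(5000)         // must precede any communication
    .Run;
end;
```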
In the heart of every sufficiently complicated thread code there lies a complicated loop similar to the one below.
    var
      handles: array [0..3] of THandle;
      waitRes: DWORD;

    begin
      handles[0] := TerminateEvent;
      handles[1] := FWakeUp;
      handles[2] := FResetFilters;
      handles[3] := FResetFullStream;

      repeat
        // arguments: handle count, handle array, wait for all, timeout
        waitRes := WaitForMultipleObjects(4, @handles, false, INFINITE);
        case waitRes of
          WAIT_OBJECT_0:
            Exit;
          WAIT_OBJECT_0 + 1:
            ProcessIncomingData;
          WAIT_OBJECT_0 + 2:
            ResetFilters;
          WAIT_OBJECT_0 + 3:
            ResetStream;
          else
            Exit; //WAIT_FAILED, WAIT_ABANDONED
        end;
      until false;
    end;
You can write OmniThreadLibrary-based code in the same manner (see simple tasks) but the library also offers a much better way. Write the task as a descendant of the TOmniWorker class and OmniThreadLibrary will implement the main loop for you.
Every TOmniWorker-type task executes an internal loop which can be represented by the following pseudo-code.
    if not Initialize then
      Exit;
    repeat
      waitRes := WaitForEvent;
      if waitRes = TerminateEvent then
        break
      else
        DispatchEvent(waitRes);
    until false;
    Cleanup;
Firstly, the Initialize function is called. In the TOmniWorker class it does nothing, just returns True. You can override it with your own function which does task-specific initialization. It can optionally return False, which signifies that the task should not be executed at all.
For compatibility with future OmniThreadLibrary versions which may return False from the base Initialize under some conditions, it is recommended to write the overridden Initialize in the following form:
    function Initialize: boolean;
    begin
      Result := inherited Initialize;
      if not Result then
        Exit;
      // do the rest of initialization here
    end;
Next, the wait and dispatch loop is run. It will watch for the following events: task termination (TerminateEvent, TerminateWhen), additional communication channels (RegisterComm), wait objects (RegisterWaitObject), timers (SetTimer), and Windows messages and alertable waits (MsgWait, Alertable).
When an event occurs, an appropriate action is executed. For example, messages are dispatched (using Delphi’s Dispatch mechanism) to the task so you can use message methods for message processing (see Receiving messages below).
When TerminateEvent is signalled (which happens when the controller’s Terminate method is called) or when one of the events passed to the TerminateWhen method is signalled, the task’s main loop exits. The Cleanup method is then called. In the base class, Cleanup is implemented as an empty method; override it to implement the task-specific clean-up.
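A worker that pairs Initialize with Cleanup might look like the sketch below. The class TLogWorker, its FLines field and the FLogTask controller field are hypothetical; the overridden methods are the ones described above.

```pascal
type
  TLogWorker = class(TOmniWorker)
  strict private
    FLines: TStringList; // resource owned by the worker
  public
    procedure Cleanup; override;
    function Initialize: boolean; override;
  end;

function TLogWorker.Initialize: boolean;
begin
  Result := inherited Initialize;
  if not Result then
    Exit;
  // runs in the worker thread, before the dispatch loop starts
  FLines := TStringList.Create;
end;

procedure TLogWorker.Cleanup;
begin
  // runs in the worker thread, after the dispatch loop exits
  FreeAndNil(FLines);
  inherited;
end;

procedure TForm1.StartLogging;
begin
  // FLogTask: IOmniTaskControl is assumed to be a field of the form
  FLogTask := CreateTask(TLogWorker.Create(), 'Log worker').Run;
end;
```

Allocating in Initialize rather than in the constructor keeps the resource in the thread that actually uses it, since the constructor runs in the thread that creates the worker.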
If the owner of the task wants to wait until the Initialize method has completed its work, it can call the WaitForInit method.

    function WaitForInit: boolean;

The TOmniWorker class implements a public property Task returning the task’s management interface IOmniTask. You can use it to access any task-specific property, such as the communication interface (Comm).

    property Task: IOmniTask read GetTask write SetTask;

This property is available in the Initialize and Cleanup methods but not in the constructor and destructor.
Messages sent from the controller or from other tasks are passed to Delphi’s Dispatch mechanism. For each message we want to process we must therefore write a message method.
To demonstrate this approach, here is a short example extracted from the 5_TwoWayHello_without_loop demo.
The task is implemented as an instance of the TAsyncHello class, which is derived from TOmniWorker. The task does some initialization in the Initialize method. It then handles two messages: MSG_CHANGE_MESSAGE and MSG_SEND_MESSAGE.
When MSG_CHANGE_MESSAGE is received, the worker changes the internal message text which was initially assigned in Initialize. When MSG_SEND_MESSAGE is received, the worker sends this text back to the owner.

    const
      MSG_CHANGE_MESSAGE = 1;
      MSG_SEND_MESSAGE = 2;

    type
      TAsyncHello = class(TOmniWorker)
      strict private
        aiMessage: string;
      public
        function Initialize: boolean; override;
        procedure OMChangeMessage(var msg: TOmniMessage);
          message MSG_CHANGE_MESSAGE;
        procedure OMSendMessage(var msg: TOmniMessage);
          message MSG_SEND_MESSAGE;
      end;

We can write those two message handlers in the same way we write Windows message handlers. Message IDs can take any value from 0 to $FFFE. Message ID $FFFF is reserved for internal purposes.

    function TAsyncHello.Initialize: boolean;
    begin
      aiMessage := Task.ParamByName['Message'];
      Result := true;
    end;

    procedure TAsyncHello.OMChangeMessage(var msg: TOmniMessage);
    begin
      aiMessage := msg.MsgData;
    end;

    procedure TAsyncHello.OMSendMessage(var msg: TOmniMessage);
    begin
      Task.Comm.Send(0, aiMessage);
    end;

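For completeness, the owner's side of this exchange might look roughly like the fragment below. It is a sketch, not the demo's actual code: TfrmOwner and FHelloTask are hypothetical names, and SetParameter and the controller's Comm property are assumed to behave as described elsewhere in this chapter.

```pascal
// Fragment from the implementation section of the owner's form unit.
// Assumes OtlTaskControl in the uses clause, the TAsyncHello class and
// message constants shown above, and a field FHelloTask: IOmniTaskControl.

procedure TfrmOwner.StartHello;
begin
  // 'Message' is the parameter name that TAsyncHello.Initialize reads.
  FHelloTask := CreateTask(TAsyncHello.Create(), 'Hello')
    .SetParameter('Message', 'Hello')
    .Run;
end;

procedure TfrmOwner.ChangeAndRequest;
begin
  // Handled by TAsyncHello.OMChangeMessage in the worker thread.
  FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Hello, world');
  // Handled by TAsyncHello.OMSendMessage, which replies with message ID 0.
  FHelloTask.Comm.Send(MSG_SEND_MESSAGE, 0);
end;
```

Both Send calls return immediately; the handlers run later, when the worker's main loop dispatches the messages.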
You can send and receive messages directly from one task to another. To do that, first create a two-way communication channel and pass each endpoint of this channel to one of the tasks. A task should then call the RegisterComm function to start listening on an endpoint. To stop listening, call the UnregisterComm function.
Received messages are dispatched in the same way as messages received from the task controller.

    procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
    procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);

The 8_RegisterComm demo demonstrates this.
A two-way channel (FCommChannel) is created with space for 1024 messages. Each of its endpoints is passed to one instance of the TCommTester class and each instance is started as its own task.

    procedure TfrmTestRegisterComm.FormCreate(Sender: TObject);
    begin
      FCommChannel := CreateTwoWayChannel(1024);
      FClient1 := CreateTask(TCommTester.Create(FCommChannel.Endpoint1))
        .MonitorWith(OmniTED)
        .Run;
      FClient2 := CreateTask(TCommTester.Create(FCommChannel.Endpoint2))
        .MonitorWith(OmniTED)
        .Run;
    end;

The TCommTester constructor just stores this endpoint in an internal field. It is then used in the overridden Initialize as a parameter to the RegisterComm call. [We cannot call RegisterComm in the constructor as the Task property is not available at that time yet.]

    type
      TCommTester = class(TOmniWorker)
      strict private
        ctComm: IOmniCommunicationEndpoint;
      public
        constructor Create(commEndpoint: IOmniCommunicationEndpoint);
        function Initialize: boolean; override;
        procedure OMForward(var msg: TOmniMessage); message MSG_FORWARD;
        procedure OMForwarding(var msg: TOmniMessage); message MSG_FORWARDING;
      end;

    constructor TCommTester.Create(commEndpoint: IOmniCommunicationEndpoint);
    begin
      inherited Create;
      ctComm := commEndpoint;
    end;

    function TCommTester.Initialize: boolean;
    begin
      Result := inherited Initialize;
      if Result then
        Task.RegisterComm(ctComm);
    end;

To send a message to another task, use the Send method of the IOmniCommunicationEndpoint.

    procedure TCommTester.OMForward(var msg: TOmniMessage);
    begin
      Task.Comm.Send(MSG_NOTIFY_FORWARD, msg.MsgData);
      ctComm.Send(MSG_FORWARDING, msg.MsgData);
    end;

    procedure TCommTester.OMForwarding(var msg: TOmniMessage);
    begin
      Task.Comm.Send(MSG_NOTIFY_RECEPTION, msg.MsgData);
    end;

Instead of sending messages that are processed in the task, you can also instruct the task to execute specific code. OmniThreadLibrary provides three variations of the same mechanism, Invoke. They allow you to call a method by providing its name, to call a method by providing its address, or to execute an anonymous method.

    function Invoke(const msgName: string): IOmniTaskControl; overload;
    function Invoke(const msgName: string; msgData: array of const):
      IOmniTaskControl; overload;
    function Invoke(const msgName: string; msgData: TOmniValue):
      IOmniTaskControl; overload;

    function Invoke(const msgMethod: pointer): IOmniTaskControl; overload;
    function Invoke(const msgMethod: pointer; msgData: array of const):
      IOmniTaskControl; overload;
    function Invoke(const msgMethod: pointer; msgData: TOmniValue): IOmniTaskControl; overload;

    function Invoke(remoteFunc: TOmniTaskControlInvokeFunction): IOmniTaskControl; overload;
    function Invoke(remoteFunc: TOmniTaskControlInvokeFunctionEx): IOmniTaskControl; overload;

Invoke uses the communication system to execute the code inside the task. It sends a special internal message (with the reserved ID $FFFF) to the task. Internal TOmniWorker code catches this message and, instead of dispatching it to your task object, executes the code referenced by the message. The code is therefore not executed when you call Invoke but at some indeterminate later time.
You can pass data to an invoked method. In Invoke you can add a second parameter, which can be a TOmniValue or an array of elements which is converted to a TOmniValue. To receive this data, the method on the task side must accept either a const TOmniValue parameter or a var TObject parameter. In the latter case you must, of course, provide an object (of any class) as the second Invoke parameter.
Let’s see a simple example. We have a simple worker with three public methods.

    type
      TMyObj = class
        Val: integer;
        constructor Create(value: integer);
      end;

      TMyTask = class(TOmniWorker)
      public
        procedure Test1;
        procedure Test2(const value: TOmniValue);
        procedure Test3(var obj: TObject);
      end;

    { TMyObj }

    constructor TMyObj.Create(value: integer);
    begin
      Val := value;
    end;

    { TMyTask }

    procedure TMyTask.Test1;
    begin
    end;

    procedure TMyTask.Test2(const value: TOmniValue);
    begin
    end;

    procedure TMyTask.Test3(var obj: TObject);
    begin
      obj.Free;
    end;

You can now call these three methods in the following manner.

    FTask := CreateTask(TMyTask.Create()).Run;
    FTask.Invoke('Test1');
    FTask.Invoke('Test2', [1, 2, 3]);
    FTask.Invoke('Test3', TMyObj.Create(42));

    //This would cause a runtime error because method 'Test4' doesn't exist.
    //FTask.Invoke('Test4');

Alternatively, you can provide the address of the method instead of its name, which gives you some compile-time checking: if the code tries to call a nonexistent method, it will not compile.

    FTask := CreateTask(TMyTask.Create()).Run;
    FTask.Invoke(@TMyTask.Test1);
    FTask.Invoke(@TMyTask.Test2, [1, 2, 3]);
    FTask.Invoke(@TMyTask.Test3, TMyObj.Create(42));

    //This would not compile because method 'Test4' doesn't exist.
    //FTask.Invoke(@TMyTask.Test4);

The last two versions of Invoke allow you to execute an anonymous method in the background task. One runs a parameter-less method while the other executes a method accepting an IOmniTask parameter which you can use to control the task.

    FTask := CreateTask(TMyTask.Create()).Run;
    FTask.Invoke(
      procedure
      begin
        // some code
      end);
    FTask.Invoke(
      procedure (const task: IOmniTask)
      begin
        // some code
      end);

See also demo 43_InvokeAnonymous.
The internal main loop in the TOmniWorker can optionally process and dispatch Windows messages. This is especially important if your task creates components that use Windows messages for normal operation.
To force Windows message processing, you must call the MsgWait method of the task controller. You can optionally provide a wake mask.

    function MsgWait(wakeMask: DWORD = QS_ALLEVENTS): IOmniTaskControl;

If your background code uses asynchronous procedure calls (for example, if it is reading from a file asynchronously), you should call the Alertable method, which enables the MWMO_ALERTABLE flag.

    function Alertable: IOmniTaskControl;

More information about the wake mask and about the alertable flag can be found in MSDN.
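Since both methods return IOmniTaskControl, they can presumably be chained before Run like the other configuration calls in this chapter. The fragment below is a sketch under that assumption; TfrmOwner and FTask are hypothetical names, and TMyTask stands for any TOmniWorker descendant.

```pascal
// Fragment from the implementation section of the owner's form unit.
// Assumes Windows (for QS_ALLINPUT) and OtlTaskControl in the uses clause
// and a field FTask: IOmniTaskControl.

procedure TfrmOwner.StartWorker;
begin
  FTask := CreateTask(TMyTask.Create(), 'MessagePump')
    .MsgWait(QS_ALLINPUT) // wake the main loop for any input message
    .Alertable            // let queued APCs run while the loop is waiting
    .Run;
end;
```

Omitting the argument to MsgWait uses the default wake mask, QS_ALLEVENTS, as shown in the declaration above.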
The TOmniWorker model greatly simplifies executing repeated tasks inside the task worker. You can set up multiple timers and associate them with the code in a few different ways.

    function ClearTimer(timerID: integer): IOmniTaskControl;
    function SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TOmniMessageID): IOmniTaskControl;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc<integer>); overload;

To set up a timer, call the SetTimer function. It accepts three parameters: a timer ID, an interval (in milliseconds) and a timer message. The latter specifies the code to be executed every interval_ms milliseconds and can be specified in three different ways, which are discussed below.
Two other SetTimer overloads, introduced in version [3.07.3], allow you to execute an anonymous function as a timer event. The latter passes the timer ID to the anonymous function’s integer parameter.
To clear a timer (so that the associated code is not called anymore), call the ClearTimer function.
Two additional SetTimer overloads (not shown in the book) are provided only for backward compatibility with very old code and should not be used in new programs.
The code below shows four ways in which a timer method can be specified.
- By method name (Timer1). The method must have at least public visibility. The method can optionally accept one parameter of type TOmniValue which will contain the timer ID.
- By method address (@TMyTask.Timer2). The method can optionally accept one parameter of type TOmniValue which will contain the timer ID.
- By message ID (MSG_TIMER3). This message ID must not conflict with other messages processed in the background worker. The message handler (Timer3 in the example below) must accept a TOmniMessage parameter. Its MsgID field will contain the message ID (MSG_TIMER3) and its MsgData field will contain the timer ID (3).
- By an anonymous method, which can optionally accept the timer ID as an integer parameter.

    const
      MSG_TIMER3 = 1;

    type
      TMyTask = class(TOmniWorker)
      public
        procedure Timer1;
        procedure Timer2(const value: TOmniValue);
        procedure Timer3(var msg: TOmniMessage); message MSG_TIMER3;
      end;

    { TMyTask }

    procedure TMyTask.Timer1;
    begin
      // called every 150 ms
    end;

    procedure TMyTask.Timer2(const value: TOmniValue);
    begin
      // called every 200 ms
      // value contains the timer ID
    end;

    procedure TMyTask.Timer3(var msg: TOmniMessage);
    begin
      // called every 250 ms
      // msg.MsgData contains the timer ID
    end;

    FTask := CreateTask(TMyTask.Create()).Run;
    FTask.SetTimer(1, 150, 'Timer1');
    FTask.SetTimer(2, 200, @TMyTask.Timer2);
    FTask.SetTimer(3, 250, MSG_TIMER3);
    FTask.SetTimer(4, 333,
      procedure (timerID: integer)
      begin
        // called every 333 ms
        // timerID contains the timer ID
      end);

Timers can also be set and cleared from inside the task.
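As an illustration of the in-task variant, the sketch below starts a timer in Initialize through the worker's Task property and clears it from the timer handler after ten ticks. TTickWorker, MSG_TICK and FTicks are hypothetical names; the unit names OtlComm and OtlTaskControl are assumptions about where TOmniMessage and TOmniWorker are declared.

```pascal
unit TickWorker;

interface

uses
  OtlComm,        // assumed to declare TOmniMessage
  OtlTaskControl; // assumed to declare TOmniWorker

const
  MSG_TICK = 10; // hypothetical message ID, must not clash with other IDs

type
  TTickWorker = class(TOmniWorker)
  strict private
    FTicks: integer;
  public
    function Initialize: boolean; override;
    procedure OMTick(var msg: TOmniMessage); message MSG_TICK;
  end;

implementation

function TTickWorker.Initialize: boolean;
begin
  Result := inherited Initialize;
  if not Result then
    Exit;
  FTicks := 0;
  // Timer 1 delivers MSG_TICK to this worker every 500 ms.
  Task.SetTimer(1, 500, MSG_TICK);
end;

procedure TTickWorker.OMTick(var msg: TOmniMessage);
begin
  Inc(FTicks);
  // Stop the timer from inside the task after ten ticks.
  if FTicks >= 10 then
    Task.ClearTimer(1);
end;

end.
```

The timer is set in Initialize rather than in the constructor because the Task property is not yet available when the constructor runs.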
By using the TerminateWhen function you can set up a termination trigger, a signal that will stop the background task execution. This can be either a Windows synchronization primitive (such as an event) or a cancellation token.

    function TerminateWhen(event: THandle): IOmniTaskControl; overload;
    function TerminateWhen(token: IOmniCancellationToken): IOmniTaskControl; overload;

The use of the TerminateWhen function is demonstrated in the 14_TerminateWhen demo.
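One use of a termination trigger is stopping several workers with a single signal. The fragment below sketches this with a shared cancellation token. It makes several assumptions that are not confirmed by this section: that the token is created with CreateOmniCancellationToken from the OtlSync unit, that the token has a Signal method, and that the controller offers WaitFor. TMyTask stands for any TOmniWorker descendant.

```pascal
// Fragment from an implementation section.
// Assumes OtlSync and OtlTaskControl in the uses clause.

procedure RunAndCancelBoth;
var
  token: IOmniCancellationToken;
  task1, task2: IOmniTaskControl;
begin
  token := CreateOmniCancellationToken; // assumed factory function
  task1 := CreateTask(TMyTask.Create(), 'Worker 1').TerminateWhen(token).Run;
  task2 := CreateTask(TMyTask.Create(), 'Worker 2').TerminateWhen(token).Run;

  // ... the workers process messages and timers here ...

  // One signal asks both workers to leave their main loops.
  token.Signal;
  task1.WaitFor(5000);
  task2.WaitFor(5000);
end;
```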
Sometimes you would want to associate data with the task controller. The UserData property can be used for this purpose. This data is never accessed from the OmniThreadLibrary code; it is only managed by the library for your convenience. UserData is available via the IOmniTaskControl interface and cannot be used in the background task.

    function SetUserData(const idxData: TOmniValue;
      const value: TOmniValue): IOmniTaskControl;
    property UserData[const idxData: TOmniValue]: TOmniValue
      read GetUserDataVal write SetUserDataVal;

Data can be accessed through an integer index or through a name. Both lines below are valid.

    taskController.UserData[0] := myTaskList.Add(taskController);
    taskController.UserData['token'] := 'something';

A task group is a mechanism that allows you to treat a number of tasks as one. You can start all tasks, send a message to all tasks in a group, terminate all tasks at once and wait for all to terminate.
See also demo 15_TaskGroup.

    type
      IOmniTaskGroup = interface
        ['{B36C08B4-0F71-422C-8613-63C4D04676B7}']
        function GetTasks: IOmniTaskControlList;
      //
        function Add(const taskControl: IOmniTaskControl): IOmniTaskGroup;
        function GetEnumerator: IOmniTaskControlListEnumerator;
        function RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup;
        function Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup;
        function RunAll: IOmniTaskGroup;
        procedure SendToAll(const msg: TOmniMessage);
        function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
        function UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup;
        function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
        property Tasks: IOmniTaskControlList read GetTasks;
      end;

      TOmniTaskGroup = class(TInterfacedObject, IOmniTaskGroup)
      public
        constructor Create;
        destructor Destroy; override;
        function Add(const taskControl: IOmniTaskControl): IOmniTaskGroup;
        function GetEnumerator: IOmniTaskControlListEnumerator;
        function RegisterAllCommWith(const task: IOmniTask): IOmniTaskGroup;
        function Remove(const taskControl: IOmniTaskControl): IOmniTaskGroup;
        function RunAll: IOmniTaskGroup;
        procedure SendToAll(const msg: TOmniMessage);
        function TerminateAll(maxWait_ms: cardinal = INFINITE): boolean;
        function UnregisterAllCommFrom(const task: IOmniTask): IOmniTaskGroup;
        function WaitForAll(maxWait_ms: cardinal = INFINITE): boolean;
        property Tasks: IOmniTaskControlList read GetTasks;
      end;

    function CreateTaskGroup: IOmniTaskGroup;

Add
Adds a task to the group. See also Join.
GetEnumerator
Allows you to use the task group in a for..in statement returning all tasks in the group.
RegisterAllCommWith
Registers the communication channels of all tasks in the group with another task (the parameter to the RegisterAllCommWith function) so they can all send messages to that task.
Remove
Removes a task from the group. See also Leave.
RunAll
Starts all tasks in the group.
SendToAll
Sends the message to every task in the group.
TerminateAll
Terminates all tasks, waiting up to maxWait_ms milliseconds for all of them to complete. The function returns True if all tasks are terminated when the function returns.
UnregisterAllCommFrom
Unregisters the additional communication channels from all tasks in the group.
WaitForAll
Waits up to maxWait_ms milliseconds for all tasks to complete. The function returns True if all tasks are terminated when the function returns.
Tasks
Provides access to the list containing all tasks in the group.
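The methods listed above could be combined along the following lines. This is a sketch rather than code from the 15_TaskGroup demo: TMyTask stands for any TOmniWorker descendant, the procedure name is hypothetical, and the Name property of IOmniTaskControl is assumed to mirror the one on IOmniTask.

```pascal
// Fragment from the implementation section of a console program.
// Assumes SysUtils (for Format) and OtlTaskControl in the uses clause.

procedure RunGroupOfThree;
var
  group: IOmniTaskGroup;
  taskControl: IOmniTaskControl;
  i: integer;
begin
  group := CreateTaskGroup;

  // Tasks are created but not started; RunAll starts them together.
  for i := 1 to 3 do
    group.Add(CreateTask(TMyTask.Create(), Format('Worker %d', [i])));
  group.RunAll;

  // GetEnumerator makes the group usable in a for..in statement.
  for taskControl in group do
    Writeln('Running: ', taskControl.Name);

  // Ask every task to stop and wait up to three seconds for all of them.
  if not group.TerminateAll(3000) then
    Writeln('Some tasks did not stop in time');
end;
```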
The background worker can use the IOmniTask interface (defined in the OtlTask unit) to communicate with the task controller and to control the task’s execution. The worker can access the interface in two different ways. For simple tasks, the interface is passed as a parameter to the task worker method. TOmniWorker tasks can access it through the Task property.
The IOmniTask interface exposes the following methods.

    type
      TOmniTaskInvokeFunction = reference to procedure;

      IOmniTask = interface
        procedure ClearTimer(timerID: integer = 0);
        procedure Enforced(forceExecution: boolean = true);
        procedure Invoke(remoteFunc: TOmniTaskInvokeFunction);
        procedure InvokeOnSelf(remoteFunc: TOmniTaskInvokeFunction);
        procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
        procedure RegisterWaitObject(waitObject: THandle;
          responseHandler: TOmniWaitObjectMethod);
        procedure SetException(exceptionObject: pointer);
        procedure SetExitStatus(exitCode: integer; const exitMessage: string);
        procedure SetProcessorGroup(procGroupNumber: integer);
        procedure SetNUMANode(numaNodeNumber: integer);
        procedure SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TOmniMessageID);
        procedure SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TProc); overload;
        procedure SetTimer(timerID: integer; interval_ms: cardinal;
          const timerMessage: TProc<integer>); overload;
        procedure StopTimer;
        procedure Terminate;
        function Terminated: boolean;
        function Stopped: boolean;
        procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);
        procedure UnregisterWaitObject(waitObject: THandle);
        property CancellationToken: IOmniCancellationToken read GetCancellationToken;
        property Comm: IOmniCommunicationEndpoint read GetComm;
        property Counter: IOmniCounter read GetCounter;
        property Implementor: TObject read GetImplementor;
        property Lock: TSynchroObject read GetLock;
        property Name: string read GetName;
        property Param: TOmniValueContainer read GetParam;
        property TerminateEvent: THandle read GetTerminateEvent;
        property ThreadData: IInterface read GetThreadData;
        property UniqueID: int64 read GetUniqueID;
      end; { IOmniTask }

The worker can access its name and unique ID through the Name and UniqueID properties.

    property Name: string read GetName;
    property UniqueID: int64 read GetUniqueID;

To access the parameters provided by the task owner, the worker can use the Param property.

    property Param: TOmniValueContainer read GetParam;

More information is provided in the Simple Tasks/Parameters section.
To terminate itself, a simple task should just exit from the worker method. A TOmniWorker task, on the other hand, should call the Terminate method of the IOmniTask interface.

    procedure Terminate;
    function Terminated: boolean;
    function Stopped: boolean;
    property TerminateEvent: THandle read GetTerminateEvent;

The other termination-related methods and properties are:
Terminated
Returns True when a worker was requested to stop, either by itself or by the task controller.
Stopped
Returns True when a worker has fully stopped. There’s no use in calling this method from the worker itself as it will always return False while the worker is still active.
TerminateEvent
Returns the event that is signalled when a task controller requires the worker to stop. The worker can wait on this event but should never signal it. Terminate should be called instead to terminate the worker.
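For a simple task, which has no main loop of its own, cooperative termination could be sketched in either of the two ways below. The worker names and the 250 ms interval are hypothetical; WaitForSingleObject and WAIT_TIMEOUT come from the Windows unit.

```pascal
// Fragment from an implementation section.
// Assumes Windows and OtlTask in the uses clause.

// Variant 1: sleep on the termination event, waking up every 250 ms.
procedure PeriodicWorker(const task: IOmniTask);
begin
  while WaitForSingleObject(task.TerminateEvent, 250) = WAIT_TIMEOUT do
  begin
    // one unit of periodic work goes here
  end;
  // Leaving the worker procedure ends the task.
end;

// Variant 2: poll the Terminated flag between units of work.
procedure BusyWorker(const task: IOmniTask);
begin
  while not task.Terminated do
  begin
    // one unit of work goes here
  end;
end;
```

The first variant reacts to a stop request without burning CPU time between work items, which makes it the better fit for periodic work.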
A task can send a result to the controller by calling the SetExitStatus procedure.

    procedure SetExitStatus(exitCode: integer; const exitMessage: string);

The program can access this status through the task controller interface.
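A round trip of the exit status might look like the sketch below. The worker and its exit code are made up for illustration, and the controller-side ExitCode and ExitMessage properties and the WaitFor method are assumptions about IOmniTaskControl that this section does not spell out.

```pascal
// Fragment from the implementation section of a console program.
// Assumes Windows (for INFINITE), SysUtils, OtlTask and OtlTaskControl
// in the uses clause.

procedure Checker(const task: IOmniTask);
begin
  // ... the real work would go here; this sketch only reports a failure ...
  task.SetExitStatus(2, 'Input file not found');
end;

procedure RunChecker;
var
  control: IOmniTaskControl;
begin
  control := CreateTask(Checker, 'Checker').Run;
  // Block until the worker procedure has returned.
  control.WaitFor(INFINITE);
  if control.ExitCode <> 0 then
    Writeln(Format('Task failed (%d): %s',
      [control.ExitCode, control.ExitMessage]));
end;
```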
While exceptions in the background worker code are automatically caught, a task can also set an associated exception manually by calling SetException.

    procedure SetException(exceptionObject: pointer);

This exception is available to the main program through the FatalException property.
To send a message to the task controller (and through it to the main program), a background task can use the Comm property.

    property Comm: IOmniCommunicationEndpoint read GetComm;

Any data can be packed in a TOmniValue record and passed to the Comm.Send method. The mechanism is the same as sending the data from the main program to the background worker.
Messages can be received in different ways, depending on the worker type. See the appropriate sections of this manual for simple tasks and for TOmniWorker tasks.
A simple trick can be used to send a message from a task back to itself: instead of sending it on the Comm channel, you send it on the Comm.OtherEndpoint channel.

    Task.Comm.OtherEndpoint.Send(MSG_FROM_TASK, 'Sent to Self');

A background task can use the communication channel to request asynchronous execution of code in the context of its owner (the thread which created the task).

    type
      TOmniTaskInvokeFunction = reference to procedure;

    procedure Invoke(remoteFunc: TOmniTaskInvokeFunction);

This mechanism works by posting a special message containing the function to be executed from the task back to the task controller. OmniThreadLibrary catches that special message and, instead of dispatching it to the normal message processing code, executes the attached function.
Since version [3.07.3], the IOmniTask interface implements the method InvokeOnSelf, which works the same as Invoke, except that it posts a message back to the task itself. In other words, InvokeOnSelf puts an anonymous method into the task’s message queue while Invoke puts it into the task controller’s message queue. InvokeOnSelf will only work correctly when the task is implemented as a TOmniWorker task.

    type
      TOmniTaskInvokeFunction = reference to procedure;

    procedure InvokeOnSelf(remoteFunc: TOmniTaskInvokeFunction);

A TOmniWorker task can register additional communication channels to enable automatic processing of messages sent across those channels. This mechanism is described in the TOmniWorker tasks section.

    procedure RegisterComm(const comm: IOmniCommunicationEndpoint);
    procedure UnregisterComm(const comm: IOmniCommunicationEndpoint);

A TOmniWorker task can use timers to automate repeating tasks. This functionality is explained in the TOmniWorker tasks section.

    procedure ClearTimer(timerID: integer = 0);
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TOmniMessageID); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc); overload;
    procedure SetTimer(timerID: integer; interval_ms: cardinal;
      const timerMessage: TProc<integer>); overload;

A TOmniWorker task can register an external synchronization object (typically an event) to be included in the main loop. An associated method will be called when the synchronization object becomes signalled.

    type
      TOmniWaitObjectMethod = procedure of object;

    procedure RegisterWaitObject(waitObject: THandle;
      responseHandler: TOmniWaitObjectMethod);
    procedure UnregisterWaitObject(waitObject: THandle);

Use RegisterWaitObject to register an external synchronization object with the main loop and UnregisterWaitObject to remove it from the main loop.
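As a sketch of how this might be used, the worker below receives an event handle from its owner, registers it in Initialize and unregisters it in Cleanup. TWatcher, FEvent and EventSignalled are hypothetical names, and the sketch assumes an auto-reset event so that the handler runs once per signal.

```pascal
unit Watcher;

interface

uses
  Windows,        // THandle
  OtlTaskControl; // assumed to declare TOmniWorker

type
  TWatcher = class(TOmniWorker)
  strict private
    FEvent: THandle; // owned by the creator of the worker, not by the worker
    procedure EventSignalled;
  public
    constructor Create(event: THandle);
    function Initialize: boolean; override;
    procedure Cleanup; override;
  end;

implementation

constructor TWatcher.Create(event: THandle);
begin
  inherited Create;
  // Task is not available yet, so only remember the handle here.
  FEvent := event;
end;

function TWatcher.Initialize: boolean;
begin
  Result := inherited Initialize;
  if Result then
    Task.RegisterWaitObject(FEvent, EventSignalled);
end;

procedure TWatcher.EventSignalled;
begin
  // Called by the main loop when FEvent becomes signalled.
end;

procedure TWatcher.Cleanup;
begin
  Task.UnregisterWaitObject(FEvent);
  inherited Cleanup;
end;

end.
```

The owner would create the event (for example with the Windows CreateEvent function), pass its handle to TWatcher.Create, and call SetEvent whenever the worker should react.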
If a cancellation token is associated with the background task, it can be accessed through the CancellationToken property.

    property CancellationToken: IOmniCancellationToken read GetCancellationToken;

If a lock is associated with the background task, it can be accessed through the Lock property.

    property Lock: TSynchroObject read GetLock;

If a thread-safe counter is associated with the background task, it can be accessed through the Counter property.

    property Counter: IOmniCounter read GetCounter;

On a system with multiple processor groups you can use the SetProcessorGroup [3.06] function to specify the processor group this task should run on.
On a system with multiple NUMA nodes you can use the SetNUMANode [3.06] function to specify the NUMA node this task should run on.
Information about existing processor groups and NUMA nodes can be accessed through the Environment object.
In most cases this information should be managed via IOmniTaskControl.ProcessorGroup, IOmniTaskControl.NUMANode, IOmniThreadPool.ProcessorGroups, and IOmniThreadPool.NUMANodes.
The Enforced method works the same as the IOmniTaskControl equivalent. As it must be called before the task starts execution, it cannot be used from the background task itself. OmniThreadLibrary uses this function in the thread pool implementation.

    procedure Enforced(forceExecution: boolean = true);

The Implementor property returns the object implementing the IOmniTask interface.

    property Implementor: TObject read GetImplementor;

The ThreadData property is used to access the internal interface used in the thread pool’s SetThreadDataFactory mechanism.

    property ThreadData: IInterface read GetThreadData;

The Building a connection pool example contains more information on this topic.
The StopTimer method is obsolete and should not be used anymore. Use ClearTimer instead.

    procedure StopTimer;