ServiceBlock Class
(Sequencer::ServiceBlock)The ServiceBlocks
are written in code to build new event-based operators that need to use blocking calls like accessing databases, using sockets to send/receive data, accessing the file system, etc. More...
Header: | #include <ServiceBlock> |
Inherits: | |
Inherited By: | Sequencer::CSVFileReader, Sequencer::TextFileReader, Sequencer::TextFileWriter, and TestLib::FastAsyncer |
Public Functions
ServiceBlock(const CDPPropertyBase &in) |
Reimplemented Public Functions
virtual void | Configure(XMLPrimitive *objectElement) override |
virtual void | Create(const char *shortName, CDPBaseObject *parentBaseObject) override |
- 18 public functions inherited from CDPOperator
- 17 public functions inherited from CDPOperatorBase
- 46 public functions inherited from CDPBaseObject
- 26 public functions inherited from CDP::StudioAPI::CDPNode
- 22 public functions inherited from CDP::StudioAPI::ICDPNode
Protected Functions
void | Await(double seconds) const |
void | Emit(EventID eventOutID, ArgumentData data = {}, EventID eventInID = {}) const |
void | EmitRawPacket(EventID eventOutID, const MessagePacketHandle &rawMessage) const |
Event * | FindEvent(const std::string &name) |
Event * | FindEvent(const EventID &eventInID) |
void | ForEachEvent(const std::function<void( Event & ) > &callback) |
bool | IsAbortRequested() const |
void | OverrideWorkerPool(CDPThreadPool *pool) |
virtual EventID | ProcessAsync(const EventID &eventInID, ArgumentData &data) const |
virtual bool | ProcessEmit(const EventID &eventInID, const EventID &eventOutID, ArgumentData &data) |
virtual bool | ProcessEvent(const EventID &eventInID, ArgumentData &data) |
- 1 protected function inherited from CDP::StudioAPI::CDPNode
Additional Inherited Members
- 1 public variable inherited from CDPOperatorBase
- 1 static public member inherited from CDPBaseObject
- 8 protected variables inherited from CDPOperator
- 5 protected variables inherited from CDPOperatorBase
- 11 protected variables inherited from CDPBaseObject
Detailed Description
The ServiceBlocks
are written in code to build new event-based operators that need to use blocking calls like accessing databases, using sockets to send/receive data, accessing the file system, etc.
ServiceBlock
allows processing message events in a background thread safely. For each input event ProcessAsync() function is called and desired output event can just be returned from the call to be sent.
There may be a need to do something either before the ProcessAsync() call, in the ServiceBlock's thread context or after the ProcessAsync() call, back in the ServiceBlock's thread context before emitting the output event.
- To add code before the ProcessAsync() is executed, override the ProcessEvent() function.
- To add code before an event is emitted after calling the Emit() function or when the ProcessAsync() function returns, override the ProcessEmit() function.
Properties
Name | Description |
---|---|
ParallelProcessing | Enables ProcessAsync() to be run in parallel in case input events arrive in quick succession. This option is true by default. ParallelProcessing should be disabled and set ReadOnly for ServiceBlocks that use a resource that is not safe to use in parallel. |
EventID and ArgumentID
There are two helper data structures: EventID
and ArgumentID
. These allow safe access to ServiceBlock data and pass event identifiers from/to asynchronous context. The EventID
allows comparing with Event member in ProcessAsync() to identify the triggering event source when needed.
EventID WhatEvent::ProcessAsync(const EventID& eventInID, ArgumentData& data) const { if (eventInID == GoodIn) return GoodOut; return BadOut; }
The ArgumentID
is used as a key in the ArgumentData
data map. By default, it is populated by input Argument
values so they can be safely accessed from the data map in ProcessAsync() function. The ArgumentID
also has meta
member that is for the end user to add any kind of meta-information about the data when adding custom data in ProcessEvent() function, like names of the Arguments etc. The ArgumentID
also supports exposing data from other CDP value types via the ArgumentData
data map.
Note: ServiceBlock
has a internal read only ArgumentData
flags property that can change what data is auto populated to ArgumentData data map in your custom block. This allows somtimes skiping the ProcessEvent to repopulate the data map with desired arguments.
bool CustomDataOp::ProcessEvent(const EventID& eventInID, ArgumentData& data) { // Adding a new value of MyProperty and name as meta info to ArgumentData data[{{MyProperty}, {MyProperty.GetNodeName()}}] = MyProperty.GetVariantValue(); // To add/update meta info for a value already in the map it needs to be extracted, updated and added back. // An alternative approach would be to clear the data map and repopulate it with desired items // for example when all arguments need names in meta info. auto item = data.extract(MyArgument); item.key().meta = MyArgument.GetNodeName(); data.insert(std::move(item)); return true; } EventID CustomDataOp::ProcessAsync(const EventID& eventInID, ArgumentData& data) const { // Print out all items in data with meta info and value for (auto& mapPair : data) CDPMessage("Key.meta=%s, Value=%s\n", mapPair.first.meta.c_str(), mapPair.second.ToString().c_str()); if ((int)data[MyProperty] > 0) { data[MyProperty] = 0; return GoodValue; } return BadValue; } bool CustomDataOp::ProcessEmit(const EventID& eventInID, const EventID& eventOutID, ArgumentData& data) { // Update MyProperty member value from the data map MyProperty = (int)data[MyProperty]; return true; }
Example
The ReadFile ServiceBlock is intended to provide parallel and sequential support for reading lines from a given file. Upon the arrival of the Read event message, the block will start an asynchronous process to open and read line data to send with the Line event message. When reading is done the ServiceBlock emits a Done event message and when there is a file open error it emits an Error event message with error text.
The following is a list of library-includes, members and types that are used, with short comments about any configuration performed on them.
#include <iostream> #include <fstream> #include <string> Sequencer::Event Read; // Input event with FileName argument selected in the event's Data member Sequencer::Event Line; // Output event with FileName, TextLine argument selected in the event's Data member Sequencer::Event Done; // Output event with FileName argument selected in the event's Data member Sequencer::Event Error; // Output event with FileName, ErrorText argument selected in the event's Data member Argument<std::string> FileName; // Input argument for file name Argument<std::string> TextLine; // Output argument to hold the line from the file for each Line event Argument<std::string> ErrorText; // Output argument string to hold error message
The ProcessAsync() function is called from the worker thread context so accessing ServiceBlock members or containers directly is not advised. Argument
values are made available for safe access via the ArgumentData
data
map. Event
names can safely be used to emit events from the function, and Argument
names can be used to access the Argument
values in the data
map.
Note: Output Argument
values are not by default available for reading. Reading an output Argument
value from data
map returns a variant with IsValid() false. It is possible to use the ProcessEvent() function to pre-populate ArgumentData
data
map with desired output Argument
values.
EventID ReadFile::ProcessAsync(const EventID& eventInID, ArgumentData& data) const { std::ifstream fileStream; std::string readLine; fileStream.open((std::string)data[FileName], std::ios::in); if (!fileStream) { data[ErrorText] = std::string(strerror(errno)); return Error; } while (!fileStream.eof()) { if (IsAbortRequested()) return {}; // abort without output event std::getline(fileStream, readLine); if (readLine.empty() && fileStream.eof()) return Done; data[TextLine] = readLine; Emit(Line, data); } return Done; }
See also Basic Block and Composite Block.
Member Function Documentation
ServiceBlock::ServiceBlock(const CDPPropertyBase &in)
Default constructs an instance of ServiceBlock.
[protected]
void ServiceBlock::Await(double seconds) const
Allows other tasks to use current worker for seconds amount of time.
Note: The actual time depends on the task lengths being processed but is at least the requested amount of time.
Note: Using Await in a large number of service blocks will require more worker stack to be allocated in CDPEngine configuration.
[override virtual]
void ServiceBlock::Configure(XMLPrimitive *objectElement)
Reimplemented from CDPBaseObject::Configure().
[override virtual]
void ServiceBlock::Create(const char *shortName, CDPBaseObject *parentBaseObject)
Reimplemented from CDPBaseObject::Create().
[protected]
void ServiceBlock::Emit(EventID eventOutID, ArgumentData data = {}, EventID eventInID = {}) const
Sends event message eventOutID from any context.
The function is typically used to send multiple event messages from ProcessAsync(). The ProcessEmit() will be called in block context after Emit() calls for possible post processing.
[protected]
void ServiceBlock::EmitRawPacket(EventID eventOutID, const MessagePacketHandle &rawMessage) const
[protected]
Event *ServiceBlock::FindEvent(const std::string &name)
[protected]
Event *ServiceBlock::FindEvent(const EventID &eventInID)
[protected]
void ServiceBlock::ForEachEvent(const std::function<void( Event & ) > &callback)
[protected]
bool ServiceBlock::IsAbortRequested() const
Return current AbortRequested
status from any context.
The function should be used to check the need to abort from ProcessAsync() code prematurely due to app exit or block being deleted.
[protected]
void ServiceBlock::OverrideWorkerPool(CDPThreadPool *pool)
Allows overriding the default blocks behavior of using the CDPEngine worker pool
Does not take ownership of the pool.
To override worker pool the OverrideWorkerPool() must be called with self constructed CDPThreadPool in Configure() after call to ServiceBlock::Configure() base.
[virtual protected]
EventID ServiceBlock::ProcessAsync(const EventID &eventInID, ArgumentData &data) const
Asynchronous function to implement potentially slow or long processing of event messages.
Called after ProcessEvent() returning true value in worker thread context to perform asynchronous processing.
Note: It is not advisable to use sleep calls in ProcessAsync() as it consumes shared CDPEngine
worker threads. Any required waiting that can't be implemented in other ways should use Await()
The ArgumentData
data will by default contains all input Argument
member values and eventInID indicates the triggering event. Output values to be returned should be assigned to the ArgumentData
data variable.
data[ArgOut] = (int)data[ArgIn] + 1;
ProcessAsync() is by default a reentrant function and implementation should take into account that the code may run multiple times in parallel.
The function may emit multiple output event messages via calls to the Emit() function and/or emit an event by returning an event message member. Argument
values present in ArgumentData
data map after returning will be used to update block Argument
members in case the ProcessEmit() call is not used to remove any of them.
If the code does not return an event message nor call Emit(), the ProcessEmit() function will not be called and there will not be an output event message from the block.
[virtual protected]
bool ServiceBlock::ProcessEmit(const EventID &eventInID, const EventID &eventOutID, ArgumentData &data)
Synchronous function for event message processing to implement anything prior event message is sent in ServiceBlock thread.
Called after ProcessAsync() returns an event or after Emit() call in block thread context to perform processing before an event is emitted. The function by default returns true to allow Argument
members to be updated from data map and event message to be sent.
The ProcessEmit() can be used for multiple things:
- Assign custom variables in data map back to block variables.
- Remove input
Arguments
from data map so they will not be assigned before emitting the event. - Cancel emitting the event message by returning false from the call.
The eventInID indicates triggering event and eventOutID indicates the event to be emitted.
Argument
values present in ArgumentData
data map after returning will be used to update Argument
values before emitting the event.
[virtual protected]
bool ServiceBlock::ProcessEvent(const EventID &eventInID, ArgumentData &data)
Synchronous function for event message processing to implement pre-processing before asynchronous code is started.
Called on each reception of event message before ProcessAsync() with eventInID indicating the triggering event. The function by default returns true to allow event message processing in ProcessAsync().
The function is called from the block's thread when an event message is received and can be overridden to access block members and fill or update the ArgumentData
data map with additional member data for the ProcessAsync() call. Before the ProcessEvent() call, ArgumentData
data map is filled with input Argument
values.
The function can also be used to identify and discard event messages when needed by returning false from the function. When processing of event message should continue with the call to ProcessAsync(), return true.
Get started with CDP Studio today
Let us help you take your great ideas and turn them into the products your customer will love.