Messenger Class
Messenger connects to other CDP Applications, and sends/receives all CDP messages to/from other applications.
Header: | #include <CDPSystem/Messenger/Messenger.h> |
Inherits: | CDPComponent |
Public Functions
virtual | ~Messenger() |
unsigned int | AppHandle() |
unsigned int | AppNumber() |
const ApplicationConnector * | ApplicationConnectionData() const |
virtual void | Create(const char *fullName, const IpAddress ipAddress) |
void | DebugDisplayApplicationList() |
void | DebugDisplaySendBuffer(bool displayAll) |
int | GetApplicationHandle(const char *application) |
int | GetApplicationHandle(unsigned int applicationNumber) |
IpAddress | GetApplicationIpAddress(unsigned int applicationNumber) |
const char * | GetApplicationName(unsigned int applicationHandle) |
IpAddress * | GetIpAddress() |
virtual unsigned int | GetNextMessageId() |
int | GetNumberOfApplications() |
virtual void | HandleMessageReceipt(Message *pMsg) |
bool | IsCommError() |
bool | IsSendSocketNonBlocking() |
virtual unsigned int | SendMessage(Message *msg, bool retryEnabled = true) |
virtual unsigned int | SendMessage(const MessagePacketHandle &message) |
void | SetLocalMessageQueueFullAlarm(const std::string &processName) |
virtual void | Stop() |
const char * | ThisAppName() |
void | UpdateApplicationName() |
virtual void | UpdateApplicationNotifyMsg(MessageApplicationNotify &msgAN) |
void | UseNonBlockingSendSocket(bool bUseNonBlockingSocket) |
bool | VerifyApplicationConnected(unsigned int applicationHandle) |
Reimplemented Public Functions
virtual void | Activate() override |
virtual void | Configure(const char *xml) override |
virtual void | Destroy() override |
virtual int | ReceiveMessage(void *message) override |
virtual bool | ValidatePropertyHandler(CDP::StudioAPI::CDPVariantValue &newValue, CDPPropertyBase *property) override |
- 90 public functions inherited from CDPComponent
- 37 public functions inherited from CDPObject
- 46 public functions inherited from CDPBaseObject
- 26 public functions inherited from CDP::StudioAPI::CDPNode
- 22 public functions inherited from CDP::StudioAPI::ICDPNode
Static Public Members
void | AddGlobalAppConnectedHandler(const std::function<void( unsigned int ) > &myAppConnectedHandler) |
void | AddGlobalAppDisconnectedHandler(const std::function<void( unsigned int ) > &myAppDisconnectedHandler) |
void | AddGlobalAppUpdatedHandler(const std::function<void( const std::string & appName, unsigned int oldHandle, unsigned int newHandle ) > &myAppUpdatedHandler) |
Messenger * | GetInstance() |
- 2 static public members inherited from CDPComponent
- 6 static public members inherited from CDPObject
- 1 static public member inherited from CDPBaseObject
Protected Functions
Messenger() | |
void | ProcessInit() |
void | ProcessReceive(int socketInterfaceNo) |
void | ProcessRetry() |
void | ProcessRunning() |
void | ProcessSend() |
void | ProcessStop() |
void | SendApplicationPing() |
bool | SizeAndDomainMatching(MessageApplicationNotify *pMsgAN) |
bool | TransitionInitToRunning() |
bool | TransitionNullToStop() |
bool | TransitionRunningToStop() |
bool | TransitionStopToInit() |
void | UpdateApplication(MessageApplicationNotify *pMsgAN) |
Reimplemented Protected Functions
virtual void | CreateModel() override |
- 12 protected functions inherited from CDPComponent
- 13 protected functions inherited from CDPObject
- 1 protected function inherited from CDP::StudioAPI::CDPNode
Static Protected Members
OSAPITASK | taskReceive1(OSAPITASK_ARG) |
OSAPITASK | taskReceive1Broadcast(OSAPITASK_ARG) |
OSAPITASK | taskRetry(OSAPITASK_ARG) |
OSAPITASK | taskSend(OSAPITASK_ARG) |
- 1 static protected member inherited from CDPObject
Additional Inherited Members
- 31 protected variables inherited from CDPComponent
- 7 protected variables inherited from CDPObject
- 9 protected variables inherited from CDPBaseObject
Detailed Description
Messenger connects to other CDP Applications, and sends/receives all CDP messages to/from other applications.
Messenger maintains a list of CDP applications communicating in the same subnet(s) as Messenger. It discovers new applications and adds them to the list, and removes applications that disappear from the subnet(s). Messenger uses Ethernet and IP protocols to communicate between physical controllers.
The Messenger communicates on the first <NetworkInterface> defined in Application.xml.
See CDPDefines.h for message command definitions.
See MessengerDefines.h for message format definitions.
Usage
Only one global instance can exist in the application space. One global instance, named messenger, is created in each application. Access its methods through the global instance: 'Messenger::GetInstance()->'.
The Messenger configuration can be changed in Configure mode by editing the 'SystemName->ApplicationName->CDP->Messenger' configuration.
Method SendMessage(...)
- Call SendMessage(...) passing a pointer to a message filled with data and message header information.
- Use the Message struct to create messages.
- Fill in the message with destination, origin and command.
- The parmaSize member specifies the size of additional user data, exceeding the Message message header.
- All numeric members must be sent using network byte order, accomplished by using htonl(...), htond(...).
- Also when receiving messages all numeric members must be read using ntohl(...), ntohd(...).
- Messages can also be sent using a CDPConnector. The connector keeps track of the destination component handle by mapping from name to binary handle.
- A CDPConnector eventually calls Messenger::GetInstance()->SendMessage(...) to send the message.
- SendMessage() returns a message ID for the message. If the message is sent with retry, the object's MessageConfirm() or MessageTimeout() is called with a MessageACK message containing the original message and the message ID.
- Note that SendMessage() can return '0', in which case the message was not sent. When SendMessage() returns 0, MessageTimeout() is NOT called.
- Note also that if you call SendMessage() from a separate thread (and only then), you must lock the component mutex before calling SendMessage() and release it afterwards; otherwise MessageConfirm()/MessageTimeout() may be called before SendMessage() returns.
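The byte-order rule in the list above can be illustrated with a minimal sketch. DemoHeader is a hypothetical stand-in and does not reflect the real Message layout; the sketch also assumes a POSIX system where htonl/ntohl come from <arpa/inet.h>.

```cpp
#include <arpa/inet.h>  // htonl, ntohl (POSIX; Windows provides them in <winsock2.h>)
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a message header; NOT the real CDP Message layout.
struct DemoHeader {
    std::uint32_t destination;
    std::uint32_t origin;
    std::uint32_t command;
};

// Convert every numeric member to network byte order before sending.
DemoHeader ToNetworkOrder(const DemoHeader& host) {
    DemoHeader net;
    net.destination = htonl(host.destination);
    net.origin = htonl(host.origin);
    net.command = htonl(host.command);
    return net;
}

// Convert every numeric member back to host byte order after receiving.
DemoHeader ToHostOrder(const DemoHeader& net) {
    DemoHeader host;
    host.destination = ntohl(net.destination);
    host.origin = ntohl(net.origin);
    host.command = ntohl(net.command);
    return host;
}
```

Converting on both the sending and the receiving side keeps the values correct regardless of the endianness of either controller.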
State machine
- # Stop: All message transport is disabled.
- # Init: Waits for other applications to connect. Sends broadcast ping messages to notify other applications of this application's existence, receives application broadcast notify messages only, and continuously updates the expected application handle to send in the application ping notify message.
- # Running: Sends and receives messages while maintaining the connected applications list. Receives all messages with a specific destination. Receives broadcast messages from connected applications only, except application notify, which is received from any application.
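As a rough illustration of the states above, the following sketch models the transitions documented for this class (Null to Stop, Stop to Init, Init to Running, Running to Stop). The State enum and the Step function are hypothetical names chosen for this example; the real implementation also runs entry actions and timers that are left out here.

```cpp
#include <cassert>

// States named in the documentation; Null is the initial state before Stop.
enum class State { Null, Stop, Init, Running };

// Returns the next state, or the current state when the requested state is
// not a documented transition from the current one.
State Step(State current, State requested) {
    switch (current) {
    case State::Null:
        return State::Stop;  // the initial transition always happens
    case State::Stop:
        return requested == State::Init ? State::Init : current;
    case State::Init:
        return requested == State::Running ? State::Running : current;
    case State::Running:
        return requested == State::Stop ? State::Stop : current;
    }
    return current;
}
```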
Send and receive threads
Network socket send and receive is handled in separate threads with priority level just below the component high priority thread.
Send retry
If parameter retryEnabled = true when calling SendMessage(...), the message is retransmitted until a receipt arrives from the receiving remote messenger. The number of send attempts is limited, typically 3 to 4. The retry send interval is 30 ms. Send retry is executed from a separate, low-priority thread.
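The retry behaviour can be sketched as a bounded loop. This is a simplified, blocking illustration and not the Messenger implementation: SendWithRetry and its callback parameter are hypothetical, and the defaults (4 attempts, 30 ms) are taken from the values quoted in this documentation.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Sends up to maxAttempts times, waiting `interval` between attempts, and
// stops as soon as sendAndWaitForReceipt reports that a receipt arrived.
// Returns the number of attempts used, or 0 if no receipt arrived at all
// (the case in which a timeout would be reported).
int SendWithRetry(const std::function<bool()>& sendAndWaitForReceipt,
                  int maxAttempts = 4,
                  std::chrono::milliseconds interval = std::chrono::milliseconds(30)) {
    for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
        if (sendAndWaitForReceipt()) {
            return attempt;
        }
        if (attempt < maxAttempts) {
            std::this_thread::sleep_for(interval);
        }
    }
    return 0;
}
```

With 4 attempts in total, the message is retransmitted at most 3 times after the first send.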
Application connect process
Messenger broadcasts application notify messages at a periodic interval. The list of applications is maintained by receiving broadcast notify messages from other applications. Applications are disconnected and considered lost if no notify is received within a certain time limit. When connecting an application, the list of applications is searched for existing instances to prevent connecting applications with the same name. An existing instance is detected by comparing name, IP address and port; if any of these differ, the application is not the same instance. When an application instance is found to already exist on a different handle, the existing connection is cleared and the application is connected using the new handle.
When an application is disconnected, you may be notified if you add a function to the m_listAppDisconnectedHandlers, see AddGlobalAppDisconnectedHandler.
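The timeout-based disconnect described above can be sketched with a small stand-in class. AppList and all of its members are hypothetical and only illustrate the idea of refreshing a timestamp on every notify and dropping entries that have gone quiet; time is passed in explicitly as milliseconds to keep the example deterministic.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Simplified stand-in for the connected-applications list (not a CDP type).
class AppList {
public:
    explicit AppList(std::uint64_t timeoutMs) : timeoutMs_(timeoutMs) {}

    void AddDisconnectedHandler(std::function<void(unsigned int)> handler) {
        handlers_.push_back(std::move(handler));
    }

    // Called whenever a notify from application number appNumber arrives.
    void OnNotify(unsigned int appNumber, std::uint64_t nowMs) {
        lastSeenMs_[appNumber] = nowMs;
    }

    // Drops every application whose last notify is older than the timeout
    // and calls the registered handlers for each dropped application.
    void Prune(std::uint64_t nowMs) {
        for (auto it = lastSeenMs_.begin(); it != lastSeenMs_.end();) {
            if (nowMs - it->second > timeoutMs_) {
                const unsigned int lost = it->first;
                it = lastSeenMs_.erase(it);
                for (const auto& handler : handlers_) {
                    handler(lost);
                }
            } else {
                ++it;
            }
        }
    }

    bool IsConnected(unsigned int appNumber) const {
        return lastSeenMs_.count(appNumber) != 0;
    }

private:
    std::uint64_t timeoutMs_;
    std::map<unsigned int, std::uint64_t> lastSeenMs_;
    std::vector<std::function<void(unsigned int)>> handlers_;
};
```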
Application handle collision
The application handle is read from Application.xml on startup. In Init mode, all applications are connected, and a free handle is selected in the transition from Init to Running.
If the handle was changed during Init, the new handle is saved to Application.xml. If an application trying to connect has a handle that is already connected, the new application is not connected, because the application connector is already occupied.
Collision is detected when another application with a different name but the same handle as this application tries to connect. When a collision is detected, this application and the conflicting application must decide which of them should swap handle. The swap decision is based on name (which is always different and thus ensures a predictable decision), port and IP address. If the decision is to swap this application's handle, a new handle is selected using the same search as in the transition from Init to Running.
After swapping handle, local CDPConnectors and MessengerIOServer are notified, and remote applications are notified by sending the broadcast message CM_APPLICATIONHANDLECHANGED. If remote applications fail to receive CM_APPLICATIONHANDLECHANGED, they will still manage to fix connections and signal routing, but this process will take much longer. When CM_APPLICATIONHANDLECHANGED is received, local connectors and MessengerIOServer in-packet handles are updated. If the remote application name and handle are equal to those of this application, the handle is kept and the remote application is not connected.
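The key property of the swap decision is that both applications reach the same conclusion independently, which works because their names always differ. The sketch below illustrates that property only. The concrete rule (the name that sorts later swaps) and the lowest-free-number search are assumptions made for this example; the actual rule also involves port and IP address and is not specified here.

```cpp
#include <cassert>
#include <set>
#include <string>

// Assumed rule, for illustration only: the application whose name sorts
// later is the one that swaps. Because the names differ, exactly one of the
// two applications gets `true`.
bool ShouldThisAppSwap(const std::string& thisName, const std::string& otherName) {
    return thisName > otherName;
}

// Returns the lowest application number in [1, maxApps] that is not in
// `used`, or 0 if every number is taken (assumed search strategy).
unsigned int FindFreeAppNumber(const std::set<unsigned int>& used, unsigned int maxApps) {
    for (unsigned int n = 1; n <= maxApps; ++n) {
        if (used.count(n) == 0) {
            return n;
        }
    }
    return 0;
}
```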
Application name collision
A remote application with the same name as this application is not connected.
If two or more remote applications have the same name, only the first one detected will be connected. If the remote application name and handle are equal to those of this application, nothing happens: the handle is kept and the remote application is not connected.
Handles explained
32-bit handle = app handle.component handle, app handle = HIWORD, component handle = LOWORD. CDPObject handle includes app handle.
Broadcast destination = 0x00ff0000;
Local destination = 0x0000xxxx;
Handle application part : 0xxxxx0000;
Handle CDPObject part : 0x0000xxxx;
Messenger destination handle for broadcast: 0x00ffxxxx;
Messenger destination handle for local : 0x0000xxxx;
To extract application and component part of handle:
#define HANDLE_APPLICATIONPART(X) (X >> 16)
#define HANDLE_COMPONENTPART(X) (X & 0x0000ffff)
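The same split can be written with small constexpr functions. This is a sketch, not part of the CDP headers: ApplicationPart, ComponentPart and MakeHandle are names invented for this example.

```cpp
#include <cassert>
#include <cstdint>

// Function equivalents of HANDLE_APPLICATIONPART and HANDLE_COMPONENTPART.
constexpr std::uint32_t ApplicationPart(std::uint32_t handle) {
    return handle >> 16;
}

constexpr std::uint32_t ComponentPart(std::uint32_t handle) {
    return handle & 0x0000ffffu;
}

// Combines an application number and a component handle into one 32-bit handle.
constexpr std::uint32_t MakeHandle(std::uint32_t appNumber, std::uint32_t component) {
    return (appNumber << 16) | (component & 0x0000ffffu);
}

static_assert(ApplicationPart(0x00030012u) == 0x0003u, "application part is the high word");
static_assert(ComponentPart(0x00030012u) == 0x0012u, "component part is the low word");
```

Unlike the macros, the function forms evaluate their argument once and are not affected by operator precedence when an expression such as a | b is passed in.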
Parameters
- # kMonitorFilter: Unit=0..1. IIR filter constant 0..1 for monitor signal filtering. 1 == filter disabled.
- # Max retry count: Unit=Count. Maximum UDP send retry count for a message.
- # Network disabled: Unit=Bool. Set == 1 to disable messenger network send and isolate this application from the network.
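To show why a filter constant of 1 disables filtering, here is a sketch of one step of a first-order IIR low-pass filter. The exact filter form used by Messenger for kMonitorFilter is not stated in this documentation, so the formula below is an assumption, and FilterStep is a hypothetical name.

```cpp
#include <cassert>
#include <cmath>

// One step of a first-order IIR low-pass filter (assumed form).
// With k == 1 the output equals the input, so the filter is disabled;
// smaller k values smooth the signal more.
double FilterStep(double previous, double input, double k) {
    return previous + k * (input - previous);
}
```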
Messages
Messenger handles broadcast messages and reroutes many messages to other system components. Some system messages are handled both by messenger and application.
- # CM_APPLICATIONNOTIFY: An application notifies its existence. (MessageApplicationNotify)
- # CM_APPLICATIONSTOPPED: An application has stopped, disconnect it.
- # CM_APPLICATIONPING: Ping request from another application, must reply by sending notify message CM_APPLICATIONNOTIFY.
- # CM_STATUS: Receiving status reply from component. Message is routed to CDPConnection::RouteStatusMessage(...).
- # CM_NOTIFY: Receiving notify reply from component. Message is routed to CDPConnection::RouteNotifyMessage(...).
- # CM_REQUESTHANDLE: CDPObject/CDPComponent handle request from remote, send notify reply if object/component found. Message is routed to engine.SendNotify(...).
- # CM_APPLICATIONHANDLECHANGED: Application changed notify from an application.
See also ApplicationConnector.
Member Function Documentation
[protected]
Messenger::Messenger()
Default constructs an instance of Messenger.
[virtual]
Messenger::~Messenger()
Destroys the instance of Messenger. The destructor is virtual.
[override virtual]
void Messenger::Activate()
Reimplemented from CDPBaseObject::Activate().
Activate the messenger to enable message communication.
[static]
void Messenger::AddGlobalAppConnectedHandler(const std::function<void( unsigned int ) > &myAppConnectedHandler)
Adds an Application Connected Handler function myAppConnectedHandler
Example code if you want to add a function in class MyClass, which will be called after another application is connected by Messenger (otherAppNumber will be in range [1..MESSENGER_MAXNUMBEROFAPPLICATIONS]):
#include <functional>
...
using namespace std::placeholders;
Messenger::AddGlobalAppConnectedHandler( std::bind(&MyClass::AppConnectedHandler, this, _1) );
where AppConnectedHandler is declared as
void AppConnectedHandler(unsigned int otherAppNumber);
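A lambda can be used in place of std::bind, since any callable convertible to std::function<void(unsigned int)> is accepted. The sketch below uses DemoRegistry, a hypothetical stand-in for the handler list, so that it compiles without the CDP headers.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Stand-in for the handler list; the real Messenger keeps its own.
struct DemoRegistry {
    std::vector<std::function<void(unsigned int)>> handlers;

    void Add(const std::function<void(unsigned int)>& handler) {
        handlers.push_back(handler);
    }

    // Calls every registered handler, as would happen on a connect event.
    void Fire(unsigned int appNumber) const {
        for (const auto& handler : handlers) {
            handler(appNumber);
        }
    }
};

class MyClass {
public:
    // Registers a lambda that forwards to the member function.
    // The object must outlive the registration, because `this` is captured.
    void Register(DemoRegistry& registry) {
        registry.Add([this](unsigned int otherAppNumber) {
            AppConnectedHandler(otherAppNumber);
        });
    }

    void AppConnectedHandler(unsigned int otherAppNumber) {
        lastConnected = otherAppNumber;
    }

    unsigned int lastConnected = 0;
};
```

With the real class, the same lambda would be passed to Messenger::AddGlobalAppConnectedHandler instead of DemoRegistry::Add.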
[static]
void Messenger::AddGlobalAppDisconnectedHandler(const std::function<void( unsigned int ) > &myAppDisconnectedHandler)
Adds an Application Disconnected Handler function myAppDisconnectedHandler
Example code if you want to add a function in class MyClass, which will be called after another application is disconnected by Messenger (otherAppNumber will be in range [1..MESSENGER_MAXNUMBEROFAPPLICATIONS]):
#include <functional>
...
using namespace std::placeholders;
Messenger::AddGlobalAppDisconnectedHandler( std::bind(&MyClass::AppDisconnectedHandler, this, _1) );
where AppDisconnectedHandler is declared as
void AppDisconnectedHandler(unsigned int otherAppNumber);
[static]
void Messenger::AddGlobalAppUpdatedHandler(const std::function<void( const std::string & appName, unsigned int oldHandle, unsigned int newHandle ) > &myAppUpdatedHandler)
Adds an Application Updated Handler function myAppUpdatedHandler which is called when the application handle is changed or when any of the ApplicationConnector fields are changed.
Note: The application handle is changed when a duplicate handle is detected. It is recommended to use appName to identify which application was updated instead of the oldHandle.
Example code if you want to add a function in class MyClass, which will be called after another application handle is changed or its fields are updated by Messenger (oldHandle and newHandle will be in range [1..MESSENGER_MAXNUMBEROFAPPLICATIONS]):
#include <functional>
...
using namespace std::placeholders;
Messenger::AddGlobalAppUpdatedHandler( std::bind(&MyClass::AppUpdatedHandler, this, _1, _2, _3) );
where AppUpdatedHandler is declared as
void AppUpdatedHandler(const std::string& appName, unsigned int oldHandle, unsigned int newHandle);
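Following the recommendation to identify applications by appName, a handler could keep a name-to-handle table that is refreshed on every update. AppHandleCache is a hypothetical helper written for this example and is not part of CDP.

```cpp
#include <cassert>
#include <map>
#include <string>

// Keeps a name -> handle table up to date. It is keyed by name because the
// handle of an application can change when a duplicate handle is detected.
class AppHandleCache {
public:
    // Matches the documented handler signature; oldHandle is ignored on purpose.
    void AppUpdatedHandler(const std::string& appName,
                           unsigned int /*oldHandle*/,
                           unsigned int newHandle) {
        handles_[appName] = newHandle;
    }

    // Returns the last known handle, or 0 if the name is unknown.
    unsigned int HandleOf(const std::string& appName) const {
        const auto it = handles_.find(appName);
        return it == handles_.end() ? 0u : it->second;
    }

private:
    std::map<std::string, unsigned int> handles_;
};
```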
unsigned int Messenger::AppHandle()
Returns the application handle of this application.
Note: Lower 16 bits are 0, application number is in upper 16 bits. (AppHandle() == ThisApp() << 16).
unsigned int Messenger::AppNumber()
Returns the high-word handle number of this application (ThisApp() == AppHandle() >> 16).
const ApplicationConnector *Messenger::ApplicationConnectionData() const
ApplicationConnectionData returns a pointer to an array of ApplicationConnector objects with size MESSENGER_MAXNUMBEROFAPPLICATIONS+1.
Position 0 should be ignored as that contains broadcast connector info.
[override virtual]
void Messenger::Configure(const char *xml)
[virtual]
void Messenger::Create(const char *fullName, const IpAddress ipAddress)
Creates the messenger.
[override virtual protected]
void Messenger::CreateModel()
Reimplemented from CDPBaseObject::CreateModel().
Create the Messenger Model
void Messenger::DebugDisplayApplicationList()
Display connected applications on CDPMessages debug interface.
void Messenger::DebugDisplaySendBuffer(bool displayAll)
Executes DebugDisplaySendBuffer in all BufferHandlers. Prints message-info on CDPMessages debug interface.
If displayAll is true, message-info from all positions in the send buffers is printed: valid messages, old messages and unused positions. If displayAll is false, only message-info from occupied positions in the send buffers, or positions which have the send flag set, is printed.
[override virtual]
void Messenger::Destroy()
int Messenger::GetApplicationHandle(const char *application)
Returns application handle for application name.
Returns 0 if name not found.
int Messenger::GetApplicationHandle(unsigned int applicationNumber)
Returns application handle for application number.
Returns 0 if number is not valid or an application with this slot number is not connected.
IpAddress Messenger::GetApplicationIpAddress(unsigned int applicationNumber)
Returns an application's IP-address.
const char *Messenger::GetApplicationName(unsigned int applicationHandle)
Returns the name of an application from its application number or handle.
Automatically adjusts applicationHandle parameter, will interpret as number if 0x0000XXXX, handle if 0xXXXX0000.
Returns "" if not found. Will also work for the local application.
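The automatic adjustment between number and handle can be pictured as follows. This is an assumed reading of the behaviour described above, not the CDP source, and ToAppNumber is a name made up for this sketch.

```cpp
#include <cassert>
#include <cstdint>

// Accepts either an application number (0x0000XXXX) or an application
// handle (0xXXXX0000) and returns the application number in both cases.
constexpr std::uint32_t ToAppNumber(std::uint32_t numberOrHandle) {
    return (numberOrHandle & 0xffff0000u) != 0u ? (numberOrHandle >> 16)
                                                : numberOrHandle;
}
```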
[static]
Messenger *Messenger::GetInstance()
Return the Messenger Singleton instance, create it if it is not yet created.
IpAddress *Messenger::GetIpAddress()
Return the IPAddress that the messenger is using.
[virtual]
unsigned int Messenger::GetNextMessageId()
Returns a new message Id to be used in the header when sending messages.
int Messenger::GetNumberOfApplications()
Returns highest connected application number.
Does not necessarily equal number of applications in the system, slots within this range may be free.
Note: Only remote applications are included, thisApp may be higher.
[virtual]
void Messenger::HandleMessageReceipt(Message *pMsg)
Handles CM_MESSAGERECEIPT messages. Will remove message waiting for receipt from SendBuffer.
bool Messenger::IsCommError()
Check if communication is running.
bool Messenger::IsSendSocketNonBlocking()
Returns the non-blocking mode of the Messenger's send socket.
[protected]
void Messenger::ProcessInit()
Init process sends application ping requests.
Will enter 'Running' after timeout.
[protected]
void Messenger::ProcessReceive(int socketInterfaceNo)
The receive message task.
Receives message from remote applications and delivers local application destination messages to CDPEngine.
Parameter socketInterfaceNo | Must have
|
[protected]
void Messenger::ProcessRetry()
The retry send message process.
[protected]
void Messenger::ProcessRunning()
Running process does periodic maintenance:
- Sends application notify at periodic interval.
- Monitors connected applications for timeout and disconnects lost applications.
- Updates messenger and network system information values.
[protected]
void Messenger::ProcessSend()
The send message process.
Sends all messages to remote applications.
[protected]
void Messenger::ProcessStop()
Sets requestedState to "Init".
[override virtual]
int Messenger::ReceiveMessage(void *message)
Reimplemented from CDPBaseObject::ReceiveMessage().
Handle messages sent to the Messenger.
Returns > 1 if the message was handled.
Parameter message | The message, of format Message. |
[protected]
void Messenger::SendApplicationPing()
Send application notify request broadcast message.
[virtual]
unsigned int Messenger::SendMessage(Message *msg, bool retryEnabled = true)
[virtual]
unsigned int Messenger::SendMessage(const MessagePacketHandle &message)
Send a Message message.
This method is reentrant-safe and may be called at any time from any thread.
Local messages are delivered to CDPEngine immediately. Remote messages are buffered and will be sent from the send message task.
Messages are handled by engine. Engine stores messages in a buffer.
- Set retryEnabled = false to avoid receipt (typically for periodic messages).
- SendMessage() returns a message ID for the message.
- If the message is sent with retry, the calling object's MessageConfirm() or MessageTimeout() is called with a MessageACK message containing the original message and the messageID.
- Note that SendMessage() can return '0' if the message sending fails in an early stage (typically no network or buffer full).
- When SendMessage() returns 0, the calling object's MessageTimeout() function is NOT called.
Parameter msg | The message. Header always of Message format. |
Parameter retryEnabled | Set to false to avoid receipt and send retry for periodic messages. If true: Default value retryCount == 4. Number of retries = retryCount-1. |
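The return-value rule and the locking advice for calls from a separate thread can be combined in one pattern, sketched below. PendingTracker is hypothetical, its std::mutex stands in for the component mutex, and the `send` callback stands in for a call such as Messenger::GetInstance()->SendMessage(...); none of this is CDP API.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <set>

// Tracks the IDs of messages that still await a confirm or a timeout.
class PendingTracker {
public:
    // Holds the mutex across the send and the bookkeeping, so a callback
    // that takes the same mutex on another thread cannot run before the ID
    // has been recorded.
    bool Send(const std::function<unsigned int()>& send) {
        std::lock_guard<std::mutex> lock(mutex_);
        const unsigned int id = send();
        if (id == 0) {
            return false;  // not sent: neither confirm nor timeout will follow
        }
        pending_.insert(id);
        return true;
    }

    // Would be called from MessageConfirm() or MessageTimeout().
    void OnConfirmOrTimeout(unsigned int id) {
        std::lock_guard<std::mutex> lock(mutex_);
        pending_.erase(id);
    }

    std::size_t PendingCount() {
        std::lock_guard<std::mutex> lock(mutex_);
        return pending_.size();
    }

private:
    std::mutex mutex_;
    std::set<unsigned int> pending_;
};
```

Because a return value of 0 produces no later callback, the caller has to handle that failure directly at the call site.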
void Messenger::SetLocalMessageQueueFullAlarm(const std::string &processName)
Set alarm alerting that some Message queue is full
Parameter processName | Name of the process where Queue is full |
[protected]
bool Messenger::SizeAndDomainMatching(MessageApplicationNotify *pMsgAN)
Returns true if parameter-size is OK and domain matches, false if not.
[virtual]
void Messenger::Stop()
Suspend the messenger so that messages are no longer transmitted.
const char *Messenger::ThisAppName()
Returns the name of this application.
[protected]
bool Messenger::TransitionInitToRunning()
Will go from state Init to Running if requestedState is "Running". requestedState is set to "Running" in ProcessInit() after a timeout.
- On entry: Restarts timer to start sending Notify-messages.
- On entry: Find new, free application handle
[protected]
bool Messenger::TransitionNullToStop()
Initial state transition, will always go from Null to Stop.
[protected]
bool Messenger::TransitionRunningToStop()
Will go from state Running to Stop if requestedState is "Stop". requestedState is only set to "Stop" when the Application is stopping.
- On entry: Send CM_APPLICATIONSTOPPED message to all applications.
[protected]
bool Messenger::TransitionStopToInit()
Will go from state Stop to Init if requestedState is "Init". requestedState is set to "Init" in ProcessStop() and in Activate().
- On entry: Start the find-all-computers-in-local-network-segment process.
- On entry: Restarts some timers.
- On entry: Create broadcast connectors if necessary.
- Does not send notify messages from this application now, only ping.
[protected]
void Messenger::UpdateApplication(MessageApplicationNotify *pMsgAN)
Update application connection.
Connects the application if not connected, verifies IP address and port number, and resets the connection timeout.
Parameter pMsgAN | Pointer to the received MessageApplicationNotify. |
void Messenger::UpdateApplicationName()
Update local application name copy from application component.
[virtual]
void Messenger::UpdateApplicationNotifyMsg(MessageApplicationNotify &msgAN)
Updates contents (not header) of a MessageApplicationNotify-msg.
void Messenger::UseNonBlockingSendSocket(bool bUseNonBlockingSocket)
Set if Messenger should use a non-blocking send-socket or not.
[override virtual]
bool Messenger::ValidatePropertyHandler(CDP::StudioAPI::CDPVariantValue &newValue, CDPPropertyBase *property)
bool Messenger::VerifyApplicationConnected(unsigned int applicationHandle)
Return true if this application is connected.
Automatically adjusts applicationHandle parameter, will interpret as number if 0x0000XXXX, handle if 0xXXXX0000.
[static protected]
OSAPITASK Messenger::taskReceive1(OSAPITASK_ARG)
Wrap to Messenger::GetInstance()->ProcessReceive(...).
[static protected]
OSAPITASK Messenger::taskReceive1Broadcast(OSAPITASK_ARG)
Wrap to Messenger::GetInstance()->ProcessReceive(...).
[static protected]
OSAPITASK Messenger::taskRetry(OSAPITASK_ARG)
Wrap to Messenger::GetInstance()->ProcessRetry();
Parameter OSAPITASK_ARG | Argument to task. |
[static protected]
OSAPITASK Messenger::taskSend(OSAPITASK_ARG)
Wrap to Messenger::GetInstance()->ProcessSend();
Parameter OSAPITASK_ARG | Optional parameter. |