Inter process communication with $SYSTEM.Event

intersystemsdev

InterSystems Developer

Posted on February 13, 2024

Inter process communication with $SYSTEM.Event

Hi, developers!

Currently, I'm working on a project that requires highly dynamic event management. In the context of the Java programming language, my first instinct should be to opt for the "Observer Pattern", which is an approach to managing interactions between objects by establishing a notification mechanism. It allows multiple observers to react to changes in the state of a subject autonomously, promoting code flexibility and modularity. If you are not familiar with this design pattern, check out Wikipedia to find more information about it.


While it's natural and commonly used in certain programming languages as Java and C++, in ObjectScript, it's quite a different story.


These languages differ significantly.While Java allows you to start threads, in ObjectScript, you can only launch new processes. This distinction is noticeable in several aspects:

  • Threads: sshare the same memory, which makes it easier to communicate and share data between them. However, this can also lead to concurrency issues if synchronization is not managed properly.

 

  • Processes: are independent executions with their own memory space. They offer complete isolation from each other, meaning that one process cannot directly access another one's memory. Due to their isolation, processes are safer in case an error occurs because a faulty process will not be able to affect other system processes. However, communication between them is more complex than between threads since it requires such inter-process communication (IPC) mechanisms as sockets, pipes, or message queues.


This fundamental difference calls for a different approach. However, is it not conceivable to develop a library fast enough and solve this problem? This is the problem I have faced, and I wish to present my solution to you now. As usual for my articles, you will find the source code on a GitHub repository and a package available on Open Exchange.

The goal is not to replicate the Observer Pattern but to develop a quick fix to facilitate inter-process communication in order to notify processes waiting for an event. Let's take as an example a process that handles patient creation.  In this scenario, we could represent the following sequence of activities:

Image description

Process A informs the "Patient" event manager and is responsible for sending a message to all registered observers.To set this up, it is essential to determine how to adjust inter-process communication. Upon consulting the documentation, I quickly discovered a perfect class for solving this issue: $SYSTEM.Event.

You can perform a brief test by opening two IRIS terminal sessions. The first one will be intended to create the resource and listen to incoming messages, while the second one will be used to send notifications.

Session en écoute Session qui notifie
Do $SYSTEM.Event.Create("MyResource")

Set message = $SYSTEM.Event.WaitMsg("MyResource", 10)

 

Do $SYSTEM.Event.Signal("MyResource","Hello, from PID:"_$Job)



 

 

After sending the signal, the content of the "message" variable will be as follows:

$lb(1,"Hello, this is a message from PID:710")

The first position is a return code:

  • 1: Message received.
  • 0: Timeout (10 seconds in the example above).
  • -1: Error situation "MyResource" resource was removed.


Note: It's not mandatory to create the resource with $SYSTEM.Event.Create; in this case waiting can be done with the next line:

Set message = $SYSTEM.Event.WaitMsg("",10)

The signal must specify the PID (Process ID) of the listening process:

$SYSTEM.Event.Signal(<pid>, "Hello from:"_$Job)


In this example, we transmitted a simple string because it is impossible to pass objects directly. As explained earlier in this article, a process cannot access another process's memory, making it impossible to transmit object references. However, thanks to the %DynamicObject class, we can easily convert objects to JSON strings and vice versa. Therefore, we can do the following:

Do $SYSTEM.Event.Signal("MyResource",{"PID":($JOB),"Msg":"Hello!"}.%ToJSON())

; After receiving a message: 
Set object = {}.%FromJSON($ListGet(message,2))

Furthermore, if you are working with a recent version of IRIS, you have the option to extend your existing classes using %JSON.Adaptor and easily export their instances with the %JSONExportToString method.

By combining the use of $SYSTEM.Event and %DynamicObject, you can establish inter-process communication in just a few lines, sending both simple messages and objects.

It is important to note that while $SYSTEM.Event is very convenient, it will not work on an ECP (Enterprise Cache Protocol) system, as mentioned in the official documentation:

“There is no networking support for these functions - processes can only wait on and awaken resources on the same system.”

It is imperative to exercise great caution when using $SYSTEM.Event because it relies on "SharedMemory," an essential but limited space for system operation. If a process sends a large number of signals, and the destination process cannot deal with them quickly enough, the signal queue will grow, potentially leading to a system crash. Below there is an overview of what you might find in the "messages.log" file:

10/04/23-08:33:47:501 (1057) 1 [Generic.Event] SMH Surrender Stage 1 started.
10/04/23-08:33:47:542 (1057) 1 [Generic.Event] SMH Surrender Stage 2 started.
10/04/23-08:33:48:547 (818) 2 [Utility.Event] ISCLOG: WorkMgr appendError: Error ns=%SYS rtn=%SYS.WorkQueueMgr  data=$lb("rc","0 "_$lb($lb(5002,"<STORE>RunDaemon+167^%SYS.WorkQueueMgr",,,,,,,,$lb(,"%SYS",$lb("D^RunDaemon+167^%SYS.WorkQueueMgr +1","D^StartWorkDaemon+4^STU +1"))))/* ERROR #5002: ObjectScript error: <STORE>RunDaemon+167^%SYS.WorkQueueMgr */,"group",,"stack",$lb("d^RunDaemon+226^%SYS.WorkQueueMgr^1","d^StartWorkDaemon+4^STU^1","d^^^0"),"$zu(56,2)","$Id: //iris/2023.2.0/kernel/common/src/events.c#1 $ 568 4")
10/04/23-08:34:00:202 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHState Alert: Shared Memory Heap state Troubled
10/04/23-08:34:00:978 (1032) 0 [Utility.Event] Task Manager Error from CheckSchedule - Error 0
10/04/23-08:34:51:308 (1063) 2 [Utility.Event] ISCLOG: CSPServer  Error displaying login page $ZE= ns=%SYS rtn=%SYS.cspServer
10/04/23-08:35:00:290 (1033) 2 [Utility.Event] [SYSTEM MONITOR] SMHPercentFull Alert: SMHPercentFull = 100, 100, 100 (Max value is 98).

To prevent such a situation, you can opt for bidirectional communication. When a signal gets sent, the process waits for an “acknowledgement” before proceeding, as illustrated in the following example:

Do $SYSTEM.Event.Create("MyResource")
Write !,"Type < ctrl+c > to stop listening."
Try {
    For  {
        Set result = $SYSTEM.Event.WaitMsg("MyResource",10)
        Set code = $ListGet(result,1)
        Continue:code=0
        Quit:code=-1
        Set event = {}.%FromJSON($ListGet(result,2))
        Write !, "Content : ", event.Content
        Do $SYSTEM.Event.Signal(event.PID) ; Send aknowledgement
    }
} Catch ex {
    If $SYSTEM.Event.Defined("MyResource") {
        Do $SYSTEM.Event.Clear("MyResource")
        Do $SYSTEM.Event.Delete("MyResource")
    }
}

 

For i=1:1:100 {
    Do $SYSTEM.Event.Signal("MyResource",{"PID":($Job),"Content":"Hello!"})
    Set code = $SYSTEM.Event.Wait("",3) ; Wait acknowledgement
    If code < 1 {
        Write "Something went wrong."
        If $SYSTEM.Event.Defined("MyResource") {
            Do $SYSTEM.Event.Clear("MyResource")
            Do $SYSTEM.Event.Delete("MyResource")
        }
        Quit
    }
}

If you adopt this approach, your process's queue will never contain more than one message at a time. Also, make sure that you always perform a "Clear" operation before "Delete" to avoid leaving unprocessed messages in the queue indefinitely.

Now that we have laid the groundwork, we can implement an "abstract" base class with the listener role.

Class dc.ipcutils.ListenerAbstract Extends %RegisteredObject
{

Parameter EVENTTYPE;
Parameter WAITTIMEOUT = 10;
Parameter VERBOSE = 0;
Property ResourceName As %String [ Internal ];
Property Verbose As %Boolean [ InitialExpression = {$Get(%zverbose,..#VERBOSE)}, Internal ];
Property EventType As %String [ InitialExpression = {..#EVENTTYPE} ];
Property Event As %DynamicObject;
/// could be a string or a dynamicobject
Property Data;
Property Context As %DynamicObject;
Property LastState As %Integer [ InitialExpression = 0 ];
Method OnStartListen(Context As %DynamicObject = {}) As %Status
{
    Set ..ResourceName = ..GenResourceName()
    Do $SYSTEM.Event.Create(..ResourceName), ##class(dc.ipcutils.Manager).Subscribe(##this, Context)
    Write:..Verbose !, $zdt($h,3,1), " + Listening ", ..EventType, " with resourcename ", ..ResourceName, " started."
    Quit $$$OK
}

Method Listen() As %Status
{
    Set sc = $$$OK
    $$$QuitOnError(..OnStartListen())
    
    Try {
        For  If ..Wait() = -1 $$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
    } Catch Ex {
        If Ex.Name '[ "<INTERRUPT>" Set sc = Ex.AsStatus()
    }
    
    Quit $$$ADDSC(sc,..OnStopListen())
}

Method Wait(
    TimeOut As %Integer = {..#WAITTIMEOUT},
    sc As %Status = {$$$OK}) As %Integer
{
    Do:..LastState=1 ..SendAck() ; We have to send a ack before waiting a new incoming message.
    Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut)
    Set ..LastState = $ListGet(result,1)

    If ..LastState < 1 Quit ..LastState
    Set ..Event = {}.%FromJSON($ListGet(result,2))
    Set ..Data = ..Event.Data
    Set ..Context = ..Event.Context
    
    Do ..Event.%Remove("Data")
    Do ..Event.%Remove("Context")

    Try {
        Do ..Update(..Event, ..Data, ..Context)
    } Catch (ex) {
        Set sc = ex.AsStatus()
        Set ^Listener.Err("last") = $zdt($h,3,1)_" "_$SYSTEM.Status.GetOneErrorText(sc)
    }

    Quit ..LastState
}

Method SendAck()
{
    Do $SYSTEM.Event.Signal(..Event.PIDSource,..Event.%ToJSON())
}

Method Update(
    EventObject As %DynamicObject,
    Data As %DynamicObject,
    Context As %DynamicObject) As %Status
{
    Quit $$$OK
}

Method WaitEvent(
    Output Event As %DynamicObject,
    TimeOut As %Integer = {..#WAITTIMEOUT}) As %Integer
{
    Set result = $SYSTEM.Event.WaitMsg(..ResourceName, TimeOut), returnCode = $ListGet(result,1), Event = ""
    If returnCode < 1 Quit returnCode
    
    Set ..Event = {}.%FromJSON($ListGet(result,2))
    Set ..Data = Event.Data
    Set ..Context = Event.Context
    Do ..Event.%Remove("Data"), ..Event.%Remove("Context")

    Quit returnCode
}

Method OnStopListen(Context As %DynamicObject = {}) As %Status
{
    Write:..Verbose !, $zdt($h,3,1), " - Listening ", ..EventType, " with resourcename ", ..ResourceName, " has been STOPPED."
    Do:$SYSTEM.Event.Defined(..ResourceName) $SYSTEM.Event.Clear(..ResourceName), $SYSTEM.Event.Delete(..ResourceName)
    Quit ##class(dc.ipcutils.Manager).UnSubscribe(##this, Context)
}

Method GenResourceName() As %String [ CodeMode = expression, Private ]
{
$Translate($SYSTEM.Encryption.Base64Encode($Job_$zcrc(..EventType_$ZDT($H,3,1),7)),"=")
}

}

Listeners need to register with a "Manager." Let's move on to writing this manager, which will handle the functions of subscribing and unsubscribing as well as notifications:

Include ipcutils

Class dc.ipcutils.Manager
{

Parameter ACKTIMEOUT = 3;
ClassMethod Subscribe(
    Observer As dc.ipcutils.ListenerAbstract,
    Context As %DynamicObject = {}) As %Status
{
    Set $$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName) = $ListBuild(Observer.EventType,Observer.ResourceName,$zdt($H,3,1),$Job,Context.%ToJSON())
    Quit $$$OK
}

ClassMethod UnSubscribe(
    Observer As %String,
    Context As %DynamicObject) As %Status
{
    Kill $$$Subscribed(Observer.EventType, $Classname(Observer), Observer.ResourceName)
    Quit $$$OK
}

ClassMethod Notify(
    Event As %String,
    Data As %DynamicObject) As %Status
{
    #def1arg ResourceName(%val)     $QSubscript(%val,3)
    #def1arg Context(%val)          {}.%FromJSON($ListGet(%val,5))
    
    #dim NotifyObject As %DynamicObject = ##class(dc.ipcutils.Manager).GenEventObject(Event)
    
    Set sc = $$$OK
    Set node = $Name($$$Subscribed(NotifyObject.EventType))
    For  {
        Set node = $Query(@node,1,value)
        Quit:node=""||(NotifyObject.EventType'=$QSubscript(node,1))
        
        Set resourceName = $$$ResourceName(node)
        Set NotifyObject.Context = $$$Context(value), NotifyObject.Data = Data

        ; check if the resource exists AND if process that creates the resource still exists!        
        If '$System.Event.Defined(resourceName) || '$Data(^$JOB($ListGet(value, 4))) Do UnSubscribe Continue
        Do $SYSTEM.Event.Signal(resourceName, NotifyObject.%ToJSON())
        Do WaitAck

        Do NotifyObject.%Remove("Context"), NotifyObject.%Remove("Data")
    }
    Quit sc

UnSubscribe
    Do:$SYSTEM.Event.Defined(resourceName) $SYSTEM.Event.Clear(resourceName), $SYSTEM.Event.Delete(resourceName)
    Kill @node
    Quit
WaitAck
    Set start = $zh, match = $$$NO
    Do {
        Set syncResult = $SYSTEM.Event.WaitMsg("", ..#ACKTIMEOUT)
        Set syncStatus = $ListGet(syncResult, 1)
        If syncStatus < 1 Do UnSubscribe Quit
        Set msg = {}.%FromJSON($ListGet(syncResult, 2))
        Set match = msg.MessageID = NotifyObject.MessageID ; Ok this is the expected ack
        Quit:match
    } While (start + ..#ACKTIMEOUT > $zh)

    Do:'match UnSubscribe ; no ACK received -> force Unsubscribe this subscriber
    Quit
}

ClassMethod GenEventObject(Event As %String) As %DynamicObject
{
    Quit {
        "Event":(Event),
        "EventType":($Piece(Event,":",1)),
        "EventName":($Piece(Event,":",2)),
        "PIDSource":($JOB),
        "Timestamp":($ZDateTime($Horolog,3,1)),
        "MessageID":($Increment(^dc.ipcutils.msg)) 
    }
}

ClassMethod HashContext(Context As %DynamicObject) As %String [ CodeMode = expression, Internal, Private ]
{
$ZCRC($Select($IsObject(Context):Context.%ToJSON(),1:Context),7)
}

ClassMethod ShowSubscribed()
{
    Set node = $Name($$$Subscribed)
    For  {
        Set node = $Query(@node,1,value)
        Quit:node=""
        Write !," * Event: ", $QSubscript(node,1), "  ClassName: ", $QSubscript(node,2)
        Write !,"   Date time: ",$Lg(value,3)
        Write !,"   PID: ", $Lg(value,4)
        Write !,"   Context: ",$Lg(value,5)
        If $ListGet(value,6)=1 Write !,"   ResourceName: ",$QSubscript(node,3)
    }
    Quit
}

ClassMethod Kill() [ Internal ]
{
    Kill $$$Subscribed
}

}

With the "Manager" in place, we can now develop a concrete listener:

/// Basic Listener for demo purpose<br/>
/// Open a terminal: <br/>
/// Set listener = ##class(dc.ipcutils.BasicListener).%New()<br/>
/// Do listener.Listen()<br/>
/// or in one line : Do ##class(dc.ipcutils.BasicListener).%New().Listen()<br/>
/// Type ctrl+c to stop.<br/>
/// Open anoter terminal:<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestString","This is string notification")<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo","This is a demo notification")<br/>
/// Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTestObject",{"object":"demo"})<br/>
Class dc.ipcutils.BasicListener Extends dc.ipcutils.ListenerAbstract
{

Parameter EVENTTYPE = "Demo";
Parameter VERBOSE = 1;
Method Listen() As %Status
{
    Set sc = $$$OK
    $$$QuitOnError(..OnStartListen())
    Try {
        Write:..Verbose !,$zdt($h,3,1)," + Type < ctrl+c > to stop listening."
        For  If ..Wait() = -1 $$$ThrowStatus($$$ERROR($$$GeneralError,$$$FormatText("Resource %1 deleted.",..ResourceName)))
    } Catch Ex {
        If Ex.Name '[ "<INTERRUPT>" Set sc = Ex.AsStatus()
        If ..Verbose, $$$ISERR(sc) Do $SYSTEM.Status.DisplayError(sc)
    }
    Quit $$$ADDSC(sc,..OnStopListen())
}

Method Update(
    Event As %DynamicObject,
    Data As %DynamicObject,
    Context As %DynamicObject) As %Status
{
    Set dt = $ZDateTime($Horolog, 3, 1)
    Write:..Verbose !,dt," + Update received!"
    Write !,dt, " = Event : ", !
    Do ##class(%JSON.Formatter).%New().Format(Event)
    Write !,dt, " = Context : ", !
    Do ##class(%JSON.Formatter).%New().Format(Context)
    Write !,dt, " = Data : ", !
    Do ##class(%JSON.Formatter).%New().Format(Data)
    Quit $$$OK
}

ClassMethod Test() As %Status
{
    Quit ..%New().Listen()
}

}

Everything is finally ready for testing. Open a terminal (or several) with an instance of "dc.ipcutils.BasicListener":

Do ##class(dc.ipcutils.BasicListener).Test()

Another instance is also required to notify "Demo" events:

Do ##class(dc.ipcutils.Manager).Notify("Demo:OnTest",{"Message":"My FirstTest"}.%ToJSON())

It will result in the following output in the first terminal

IRISAPP>d ##class(dc.ipcutils.BasicListener).Test()

2023-10-17 19:20:34 + Listening Demo with resourcename MjAxMjIzMzE5NzkwOTg1 started.
2023-10-17 19:20:34 + Type <ctrl+c> to stop listening.
2023-10-17 19:20:44 + Update received!
2023-10-17 19:20:44 = Event: 
{
  "Event":"Demo:OnTestObject",
  "EventType":"Demo",
  "EventName":"OnTestObject",
  "PIDSource":"20165",
  "Timestamp":"2023-10-17 19:20:44",
  "MessageID":2171
}
2023-10-17 19:20:44 = Context: 
{
}
2023-10-17 19:20:44 = Data: 
{
  "object":"demo-1"
}

You ultimately have a functional and relatively simple example of setting up. Just create a class that inherits from "dc.ipcutils.ListenerAbstract" and adapt it to your needs. Although it is not the Observer Pattern, this solution allows us to reach our goals.

See you soon!

💖 💪 🙅 🚩
intersystemsdev
InterSystems Developer

Posted on February 13, 2024

Join Our Newsletter. No Spam, Only the good stuff.

Sign up to receive the latest update from our blog.

Related