Multi-Process Collaborative Real-Time Data Acquisition and Sharing System

xun_wang_6384a403f9817c2

SameX

Posted on November 12, 2024

This article explores the inter-process communication (IPC) mechanisms of the HarmonyOS IPC Kit and walks through the design and development of a multi-process data acquisition and sharing system, based on actual development practice. It is written mainly for developers: it shares the architecture and code of the data acquisition system and uses the case to explain how to handle multi-process data communication efficiently in HarmonyOS.

1. Case Background and Requirement Analysis

In the era of the Internet of Things and big data, real-time data acquisition has become the core function of many applications, such as multi-sensor acquisition in smart homes and status monitoring of industrial equipment. In these systems, the acquisition processes usually run in different process spaces, and the collected sensor data needs to be integrated into one process for real-time processing. Therefore, an efficient data sharing and communication mechanism is required.

Target Requirements:

  • Multi-Process Data Acquisition: Different acquisition processes collect data from multiple sensors.
  • Real-Time Data Sharing: The data acquisition process passes the collected data to the data integration process for real-time update.
  • Efficient Data Transmission: The system performance is not affected when transmitting a large amount of data between processes.
  • Resource Management: When the acquisition process terminates unexpectedly, the resources can be cleaned up and the data recovery operation can be triggered.

Involved Technologies:

  • IPC Kit: Realize multi-process data communication through the Client-Server model.
  • Shared Memory: When transmitting a large amount of data between multiple processes, use shared memory to avoid the overhead of data copying.
  • Asynchronous Invocation and Multithreading Management: Asynchronous invocation prevents the process from being blocked due to data transmission.
  • Process Termination Notification: The remote termination notification mechanism is used to monitor the process status and ensure the safe recovery of resources.

2. System Architecture Design

In this system, the process that collects data (acquisition process) acts as the IPC client, and the process responsible for data integration (integration process) acts as the IPC server. Through shared memory and the IPC Kit, data can be transmitted quickly between processes. The architecture is divided into the following modules:

  1. Acquisition Process Module: Each acquisition process configures an independent IPC communication interface to pass the collected data to the integration process.
  2. Integration Process Module: Integrates the data of all acquisition processes, and manages and processes the data in real time through shared memory.
  3. Remote Object Termination Notification: Monitors the process status through registerDeathRecipient and handles the acquisition processes that exit abnormally.

3. Multithreaded Asynchronous Communication of the Data Acquisition Process

The data acquisition process is responsible for collecting data from sensors and transmitting the data to the integration process asynchronously. Each acquisition process, as an IPC client, needs to implement the following communication process:

  1. Configure the IPC Client Interface: Create a process proxy (Proxy) through the IPC Kit and send data to the integration process.
  2. Asynchronous Invocation to Avoid Blocking: Each acquisition process independently runs asynchronous threads for data acquisition and transmission to improve efficiency.
  3. Data Encapsulation and Transmission: Encapsulate the data into an IPCParcel and send it asynchronously to the integration process.

Sample Code

The following is example code for asynchronous data sending in the acquisition process. The collected data is sent asynchronously to the integration process through SendRequest.
#include <IPCKit/ipc_kit.h>
#include <cstdio>
#include <thread>

class DataCollector {
public:
    explicit DataCollector(OHIPCRemoteProxy* proxy) : proxy_(proxy) {}

    // Packs one sensor reading into a parcel and sends it on a background
    // thread so the acquisition loop is not blocked by IPC.
    // Note: the detached thread captures `this`, so the DataCollector
    // must outlive every thread it starts.
    void CollectAndSendData(int sensorData) {
        std::thread([this, sensorData]() {
            OHIPCParcel* data = OH_IPCParcel_Create();
            if (data != nullptr) {
                OH_IPCParcel_WriteInt32(data, sensorData);  // Write sensor data
                SendDataAsync(data);
                OH_IPCParcel_Destroy(data);
            }
        }).detach();
    }

private:
    void SendDataAsync(OHIPCParcel* data) {
        // Asynchronous mode: the call does not wait for the server's reply
        OH_IPC_MessageOption option = { OH_IPC_REQUEST_MODE_ASYNC, 0 };
        OHIPCParcel* reply = OH_IPCParcel_Create();
        if (reply == nullptr) {
            printf("Failed to create reply parcel!\n");
            return;
        }
        // Request code 1 is the application-defined "sensor data" message
        int result = OH_IPCRemoteProxy_SendRequest(proxy_, 1, data, reply, &option);

        if (result == OH_IPC_SUCCESS) {
            printf("Data sent successfully!\n");
        } else {
            printf("Failed to send data!\n");
        }

        OH_IPCParcel_Destroy(reply);
    }

    OHIPCRemoteProxy* proxy_;
};
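
Starting a detached thread for every sample is simple, but the number of threads is unbounded when samples arrive quickly. As one possible alternative, the sketch below uses a single worker thread that drains a queue. It is a minimal illustration in standard C++, not part of the IPC Kit: `SendQueue`, `Push`, and the `sender` callback are hypothetical names, and in the acquisition process the callback would presumably wrap the parcel creation and the send call shown above.

```cpp
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical helper: one worker thread drains a queue of samples and hands
// each one to a caller-supplied send function (e.g. a wrapper around the IPC send).
class SendQueue {
public:
    explicit SendQueue(std::function<void(int32_t)> sender)
        : sender_(std::move(sender)), worker_([this] { Run(); }) {}

    ~SendQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        cv_.notify_one();
        worker_.join();  // Pending samples are flushed before the thread exits
    }

    // Called from the acquisition loop; returns without waiting for the send.
    void Push(int32_t sample) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push(sample);
        }
        cv_.notify_one();
    }

private:
    void Run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            cv_.wait(lock, [this] { return stopping_ || !pending_.empty(); });
            if (pending_.empty()) return;  // Only reachable once stopping_ is set
            int32_t sample = pending_.front();
            pending_.pop();
            lock.unlock();   // Do not hold the lock while sending
            sender_(sample);
            lock.lock();
        }
    }

    std::function<void(int32_t)> sender_;
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<int32_t> pending_;
    bool stopping_ = false;
    std::thread worker_;  // Declared last so the other members exist when it starts
};
```

Because the worker is joined in the destructor, this variant also avoids the lifetime hazard of a detached thread that outlives the object it captured.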

4. Data Management and Synchronization of the Data Integration Process

The integration process, as the server, needs to receive the data from multiple acquisition processes and perform unified storage and management. To improve the data processing efficiency, the integration process uses shared memory (anonymous shared memory) to store and synchronize the data from each acquisition process. At the same time, to monitor the status of the acquisition processes, the integration process implements the remote object termination notification.

Data Integration Process

  1. Receive Data from Acquisition Processes: The integration process receives the data requests from the acquisition processes through the IPC Kit.
  2. Shared Memory Storage and Data Synchronization: Use shared memory to achieve data sharing and reduce the memory copying overhead.
  3. Register Termination Notification Callback: Set registerDeathRecipient. When the acquisition process exits unexpectedly, clean up the corresponding resources and record the log.

Sample Code

The following code shows how the integration process receives data, stores it in shared memory, and monitors the termination status of the acquisition processes:
#include <IPCKit/ipc_kit.h>
#include <AbilityKit/native_child_process.h>
#include <hilog/log.h>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <vector>

class DataAggregator {
public:
    DataAggregator() {
        remoteStub_ = OH_IPCRemoteStub_Create("DATA_AGGREGATOR", OnRequestReceived, nullptr, this);
        // Set the callback first, then register the recipient exactly once
        RegisterDeathRecipient();
    }

    // Invoked by the IPC framework for each request from an acquisition process
    static int OnRequestReceived(uint32_t code, const OHIPCParcel* data, OHIPCParcel* reply, void* userData) {
        int sensorData = 0;
        if (OH_IPCParcel_ReadInt32(data, &sensorData) == OH_IPC_SUCCESS) {
            auto* aggregator = static_cast<DataAggregator*>(userData);
            aggregator->StoreData(sensorData);
        }
        return OH_IPC_SUCCESS;
    }

    void StoreData(int data) {
        std::lock_guard<std::mutex> lock(dataMutex_);
        // For brevity, a std::vector stands in for the shared memory mapping here
        sharedDataBuffer_.emplace_back(data);
    }

    // Note: the death-recipient field and function names below are kept from
    // the original listing; verify them against the IPC Kit headers in your SDK.
    void RegisterDeathRecipient() {
        deathRecipient_.onRemoteDied = [](void* userData) {
            auto* aggregator = static_cast<DataAggregator*>(userData);
            aggregator->HandleProcessDeath();
        };
        OH_IPCRemoteStub_RegisterDeathRecipient(remoteStub_, &deathRecipient_);
    }

    void HandleProcessDeath() {
        // Handle the resource cleanup operation when the acquisition process terminates
        printf("Acquisition process died; cleaning up resources!\n");
    }

private:
    OHIPCRemoteStub* remoteStub_;
    std::vector<int> sharedDataBuffer_;
    std::mutex dataMutex_;
    OHIPCDeathRecipient deathRecipient_;
};
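
The listing above keeps the samples in a std::vector, which lives in the private heap of the integration process. To show what anonymous shared memory contributes, the following sketch uses plain POSIX calls (`mmap` with `MAP_SHARED | MAP_ANONYMOUS`, plus `fork`) as a stand-in. This is an assumption made for illustration and is not the HarmonyOS shared memory API; `SharedRing`, `kCapacity`, and `RunSharedMemoryDemo` are hypothetical names. A forked child plays the acquisition process and writes samples straight into the mapping, and the parent reads them in place without any copy through IPC.

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <new>
#include <sys/mman.h>
#include <sys/wait.h>
#include <unistd.h>

constexpr std::size_t kCapacity = 64;  // Hypothetical buffer size

// Fixed-layout region placed in shared memory. The writer fills `samples`
// and then publishes `count` with release ordering.
struct SharedRing {
    std::atomic<uint32_t> count;
    int32_t samples[kCapacity];
};

// Maps an anonymous shared region, lets a forked child write n samples into
// it, and returns the sum the parent reads back (or -1 on error).
long RunSharedMemoryDemo(uint32_t n) {
    if (n > kCapacity) return -1;
    void* mem = mmap(nullptr, sizeof(SharedRing), PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED) return -1;
    auto* ring = new (mem) SharedRing{};  // Construct the struct inside the mapping

    pid_t pid = fork();
    if (pid < 0) {
        munmap(mem, sizeof(SharedRing));
        return -1;
    }
    if (pid == 0) {
        // Child (acquisition side): write directly into the shared region
        for (uint32_t i = 0; i < n; ++i) {
            ring->samples[i] = static_cast<int32_t>(i * 10);
        }
        ring->count.store(n, std::memory_order_release);
        _exit(0);
    }

    // Parent (integration side): wait for the writer, then read in place
    int status = 0;
    waitpid(pid, &status, 0);
    long sum = 0;
    uint32_t count = ring->count.load(std::memory_order_acquire);
    for (uint32_t i = 0; i < count; ++i) {
        sum += ring->samples[i];
    }
    munmap(mem, sizeof(SharedRing));
    return sum;
}
```

In a real deployment the two processes are not related by `fork`, so the shared region would have to be handed over some other way, for example by passing a file descriptor through an IPC request; that step is outside this sketch.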

5. Implementation of the Process Termination Notification Mechanism

In the data acquisition system, the termination of an acquisition process interrupts data updates, so the integration process needs to detect the status of the acquisition processes promptly. When an acquisition process terminates unexpectedly, the termination notification mechanism triggers resource cleanup and alarm operations to keep the system stable.

The monitoring of the acquisition processes is realized by registering a DeathRecipient object in DataAggregator. When the termination of the remote acquisition process is detected, the integration process can execute the corresponding cleanup logic and update the system status.
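
The article does not spell out what the cleanup logic looks like, so the following is only a sketch under stated assumptions: each acquisition process is identified by an integer ID, and each one owns resources that can be released by a callback. `CollectorRegistry`, `Register`, `OnCollectorDied`, and `ActiveCount` are hypothetical names; how a death notification is mapped to a collector ID is left open here.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical bookkeeping for the acquisition processes known to the
// integration process.
class CollectorRegistry {
public:
    // Records a collector together with the cleanup action for its resources.
    void Register(int32_t collectorId, std::function<void()> cleanup) {
        std::lock_guard<std::mutex> lock(mutex_);
        cleanups_[collectorId] = std::move(cleanup);
    }

    // Intended to be called from the death-notification callback.
    // Returns true if the collector was known and its entry was removed.
    bool OnCollectorDied(int32_t collectorId) {
        std::function<void()> cleanup;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = cleanups_.find(collectorId);
            if (it == cleanups_.end()) return false;  // Unknown or already handled
            cleanup = std::move(it->second);
            cleanups_.erase(it);
        }
        if (cleanup) cleanup();  // Run outside the lock so cleanup may call back in
        return true;
    }

    std::size_t ActiveCount() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return cleanups_.size();
    }

private:
    mutable std::mutex mutex_;
    std::unordered_map<int32_t, std::function<void()>> cleanups_;
};
```

Erasing the entry before running the cleanup makes a repeated notification for the same collector a no-op, which is useful if the callback can fire more than once.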

6. Summary

This article shows how to develop a multi-process real-time data acquisition and sharing system based on the HarmonyOS IPC Kit, applying its Client-Server communication mechanism, shared memory, and termination notification mechanism. The system can efficiently acquire and integrate sensor data and respond promptly when a process terminates, which keeps the data acquisition system stable.
This case covers the implementation of asynchronous invocation in inter-process communication, shared memory transmission of large data, and remote object termination notification, and provides a practical reference for developing multi-process real-time data sharing systems.
