Southward Driver Development
This chapter is for driver developers, guiding you step by step to build a production-ready Southward Driver that is:
- Dynamically loadable by the Gateway: released as a `cdylib` that satisfies the ABI/version constraints
- Auto-modeled by the UI: configuration forms and Excel import templates are rendered automatically from the Driver Metadata Schema
- Stable and fast: high throughput, low latency, tolerant of weak networks, traceable, and troubleshootable
0. Prerequisites and Hard Constraints
0.1 Key Concepts
In NG Gateway, a "Southward Driver" is not just a pure protocol parsing library, but a component whose lifecycle is hosted by the Gateway:
- Gateway Side - Host is responsible for:
- Dynamically loading/probing driver libraries
- Unified log bridging, dynamic log levels, observability aggregation
- Creating runtime views of Channel/Device/Point/Action based on configuration, and driving collection/write-back
- Driver Side - Driver cdylib is responsible for:
- Implementing "How to connect device + How to collect + How to write back/execute action"
- Providing a Static Metadata Schema (For UI and Excel import), and necessary ABI export symbols
0.2 What You Need to Prepare
- Rust Development Environment: install the latest stable Rust toolchain.
- Gateway Local Environment: follow Local Development to set up the backend and WebUI, and make sure both run.
- Protocol Simulator: prepare a simulator for the protocol you are developing (e.g., a Modbus Slave / TCP server) for local integration testing.
0.3 Hard Constraints
Contracts Must Be Obeyed
- `metadata_fn` must be pure: do not read files, environment variables, or the network. The probe phase must be reproducible and side-effect free.
- `Connector::new(ctx)` must be synchronous and I/O free: all network/file/blocking I/O must be placed in `connect()` / `init()` / `run()`.
- The hot path (`SouthwardHandle`) forbids blocking: `collect_data` / `write_point` must be asynchronous and non-blocking; no long-held locks or synchronous I/O.
- `unwrap()` / `expect()` are strictly forbidden: production-grade code must handle every error and return `Result` with context.
1. Create Plugin Crate
Before writing code, create a new driver crate (the prefix `ng-driver-` is recommended).
cargo new --lib ng-driver-yourproto
cd ng-driver-yourproto

1.1 Cargo.toml Minimal Constraints
Recommend creating driver crate in independent repository, depending on ng-gateway-sdk.
[package]
name = "ng-driver-yourproto"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
ng-gateway-sdk = "0.1"
tokio = { version = "1", features = ["full", "tracing"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
tracing = "0.1"
thiserror = "2"
bytes = "1"

1.2 Build Artifact

cargo build --release

The artifact is located in target/release/; the file extension depends on the platform: *.so (Linux), *.dylib (macOS), *.dll (Windows).
2. Create Plugin Modules
2.1 Recommended Project Structure
Recommend following directory structure to keep responsibilities clear:
ng-driver-yourproto/
Cargo.toml
src/
lib.rs // Export factory
metadata.rs // UI Schema (Pure Static)
types.rs // Runtime Config Structs (serde)
converter.rs // Model -> Runtime Conversion
connector.rs // Connector Implementation
session.rs // Session Implementation
handle.rs // SouthwardHandle Implementation (Hot Path)
codec.rs // Protocol Encoding/Decoding
planner.rs // (Optional) Batch Planning Strategy
protocol/ // (Optional) Complex Protocol Stack Encapsulation
mod.rs
frame.rs
codec.rs
client.rs
        session.rs

2.2 Project Module Boundaries
- Export Layer (`lib.rs`): only the `dong_driver_factory!` macro call.
- Configuration Layer (`metadata.rs`, `types.rs`): define how the UI displays configuration and how configuration deserializes.
- Protocol Layer (`codec.rs` or `protocol/`): handle the conversion between byte streams and protocol frames. Simple protocols go directly in `codec.rs`; for complex protocols (like S7/IEC104), extract a `protocol` module containing frame definitions, the state machine, etc.
- Connection Layer (`connector.rs`, `session.rs`): manage the connection lifecycle, reconnection, and resource initialization.
- Hot Path Layer (`handle.rs`): responsible for high-frequency collection and control command execution.
3. Configuration and Schema
3.1 config.rs/types.rs - Runtime Configuration
Define strongly typed configuration structures for runtime logic.
config.rs
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct YourProtoChannelConfig {
pub ip: String,
pub port: u16,
#[serde(default = "default_timeout")]
pub timeout_ms: u64,
}
fn default_timeout() -> u64 { 1000 }
// Device, Point, Action config similarly...

3.2 metadata.rs - UI/Excel Schema
Define UI form structure, supporting validation and i18n.
metadata.rs
use ng_gateway_sdk::{
ui_text, DriverSchemas, Field, Node, Rules, RuleValue, UiDataType,
};
use serde_json::json;
pub(super) fn build_metadata() -> DriverSchemas {
DriverSchemas {
channel: vec![
Node::Field(Box::new(Field {
path: "ip".into(),
label: ui_text!(en = "IP Address", zh = "IP 地址"),
data_type: UiDataType::String,
rules: Some(Rules {
required: Some(RuleValue::Value(true)),
..Default::default()
}),
..Default::default()
})),
// ... port, timeout fields
],
device: vec![], // Define device level config
point: vec![], // Define point level config
action: vec![], // Define action level config
}
}

Schema Design Points
- Field Path (`path`): must match the serde field name in `config.rs`/`types.rs`.
- Validation First: use `Rules` (min, max, pattern, required) to intercept errors at the UI layer.
- Default Values: provide a reasonable `default_value` for optional items.
- I18n: use the `ui_text!` macro to provide English/Chinese text side by side.
4. Implement the Model Converter
Responsibilities
- Parse/Normalize address (e.g., 0/1 base conversion, string trim)
- Precompile expressions (e.g., mapping/filtering/template)
- Precalculate register span, fixed length, byte order strategy
- Preallocate/Cache Planner static structures (e.g., index grouped by function code)
Doing this work once at conversion time significantly reduces per-collection CPU and allocation overhead.
converter.rs
use ng_gateway_sdk::{
supervision::converter::SouthwardModelConverter, ActionModel, ChannelModel, DeviceModel,
DriverError, DriverResult, PointModel, RuntimeAction, RuntimeChannel, RuntimeDevice,
RuntimePoint,
};
use std::sync::Arc;
use crate::types::{YourProtoChannel, YourProtoDevice, YourProtoPoint, YourProtoChannelConfig};
#[derive(Debug, Clone, Default)]
pub struct YourConverter;
impl SouthwardModelConverter for YourConverter {
fn convert_runtime_channel(
&self,
channel: ChannelModel,
) -> DriverResult<Arc<dyn RuntimeChannel>> {
// 1. Deserialize driver_config to strongly typed config
let config: YourProtoChannelConfig = serde_json::from_value(channel.driver_config.clone())
.map_err(|e| DriverError::ConfigurationError(format!("Invalid channel config: {e}")))?;
// 2. Construct RuntimeChannel (Usually a struct containing common fields + strongly typed config)
Ok(Arc::new(YourProtoChannel {
id: channel.id,
name: channel.name,
// ... copy other common fields (status, collection_type etc.)
config, // Inject strongly typed config
}))
}
fn convert_runtime_device(&self, device: DeviceModel) -> DriverResult<Arc<dyn RuntimeDevice>> {
// Parse device level driver_config
let driver_config = device.driver_config.as_ref().ok_or(DriverError::ConfigurationError(
"Driver config is required".to_string(),
))?;
let slave_id = driver_config.get("slaveId")
.and_then(|v| v.as_u64())
.ok_or(DriverError::ConfigurationError("slaveId is required".into()))? as u8;
Ok(Arc::new(YourProtoDevice {
id: device.id,
// ...
slave_id,
}))
}
fn convert_runtime_point(&self, point: PointModel) -> DriverResult<Arc<dyn RuntimePoint>> {
// Parse point level driver_config, extract key parameters needed for hot path
let address = point.driver_config.get("address")
.and_then(|v| v.as_u64())
.ok_or(DriverError::ConfigurationError("address is required".into()))? as u16;
Ok(Arc::new(YourProtoPoint {
id: point.id,
// ...
address,
}))
}
fn convert_runtime_action(&self, _action: ActionModel) -> DriverResult<Arc<dyn RuntimeAction>> {
// If action supported, parse ActionModel here
Err(DriverError::NotImplemented("Action not supported".into()))
}
}

5. Implement Connector / Session / Handle
5.1 Connector
5.1.1 Connector Responsibilities, Production Advice
Responsibilities
- Save the initialization context (config, runtime view, observer, policy, etc.)
- Implement `connect(ctx)`: establish a `Session`
- Implement `classify_error(phase, err)`: tell the supervision loop whether this error is retryable

Production Advice
- Classify common errors deterministically in `classify_error`:
  - Retryable: transient network fluctuation, timeout, connection reset, serial port temporarily unavailable
  - Fatal/Stop: configuration error (illegal address, illegal certificate path), auth failure (explicitly unrecoverable)
- Provide stable aggregation dimensions via `error_summary` / `error_code` to facilitate alerting and troubleshooting:
  - `error_summary`: a human-readable short sentence (do not stuff in large objects or long stacks)
  - `error_code`: stable and low cardinality (e.g., `tcp_connect_timeout` / `auth_failed` / `config_invalid` / `protocol_decode_error`)
- Do not do any network/blocking I/O here (`Connector::new(ctx)` explicitly forbids I/O; `connect()` is the place to establish the session)
- Leave high-frequency details to `tracing`; leave low-frequency aggregation dimensions to `error_code` / `error_summary`
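The Retryable-vs-Fatal mapping can be sketched with plain `std::io` error kinds. `FailureKind` here is a local stand-in for the SDK's supervision type, and the mapping itself is illustrative — refine it per protocol:

```rust
use std::io;

/// Illustrative stand-in for the SDK's supervision decision type.
#[derive(Debug, PartialEq)]
enum FailureKind {
    Retryable,
    Fatal,
}

/// Classify a transport-level I/O error into a supervision decision.
/// Transient network conditions are retryable; anything that looks like a
/// configuration or permission problem is fatal.
fn classify_io_error(err: &io::Error) -> FailureKind {
    use io::ErrorKind::*;
    match err.kind() {
        ConnectionRefused | ConnectionReset | BrokenPipe | TimedOut
        | WouldBlock | Interrupted | UnexpectedEof => FailureKind::Retryable,
        PermissionDenied | InvalidInput | AddrNotAvailable => FailureKind::Fatal,
        // Unknown kinds: default to retryable so a transient condition
        // is not turned into a permanent stop.
        _ => FailureKind::Retryable,
    }
}

fn main() {
    let timeout = io::Error::new(io::ErrorKind::TimedOut, "connect timed out");
    let denied = io::Error::new(io::ErrorKind::PermissionDenied, "auth failed");
    assert_eq!(classify_io_error(&timeout), FailureKind::Retryable);
    assert_eq!(classify_io_error(&denied), FailureKind::Fatal);
    println!("ok");
}
```

Defaulting unknown kinds to retryable is a deliberate choice: a wrongly retried fatal error wastes backoff cycles, while a wrongly stopped transient error takes the channel down until an operator intervenes.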
connector.rs
use ng_gateway_sdk::supervision::{Connector, Session, SessionContext, FailureKind, FailurePhase};
use ng_gateway_sdk::{DriverError, SouthwardInitContext};
use std::sync::Arc;
use crate::session::YourSession;
use crate::handle::YourHandle;
use crate::types::YourProtoChannelConfig;
pub struct YourConnector {
config: YourProtoChannelConfig,
handle: Arc<YourHandle>,
}
#[async_trait::async_trait]
impl Connector for YourConnector {
type InitContext = SouthwardInitContext;
type Handle = YourHandle;
type Session = YourSession;
fn new(ctx: Self::InitContext) -> Result<Self, DriverError> {
let config = crate::converter::parse_channel_config(&ctx)?;
let handle = Arc::new(YourHandle::new());
Ok(Self { config, handle })
}
async fn connect(&self, ctx: SessionContext) -> Result<Self::Session, DriverError> {
// Do I/O here, establish connection
let stream = tokio::net::TcpStream::connect((self.config.ip.as_str(), self.config.port))
.await
.map_err(|e| DriverError::SessionError(e.to_string()))?;
Ok(YourSession::new(self.handle.clone(), stream))
}
fn classify_error(&self, _phase: FailurePhase, _err: &DriverError) -> FailureKind {
// Simple classification, can refine based on specific err type
FailureKind::Retryable
}
}

5.2 Session

`Session` represents the lifecycle after a successful connection attempt.
5.2.1 Session Responsibilities, Production Advice
Responsibilities
- `handle()`: return the `Arc<Handle>` (hot path interface)
- `init(&ctx)`: complete the initialization that defines Ready (e.g., read device info, subscribe, write initial state)
- `run(ctx)`: drive the session until disconnect, cancellation, or a reconnect request

Production Advice
- Do short, fast handshake validation in `init()` (with a controllable timeout); do not put long loops into `init()`
- Use `tokio::select!` in `run()` to handle simultaneously:
  - cancel (graceful exit)
  - heartbeat/keep-alive
  - joining of uplink/downlink tasks
- Listen for the internal "request reconnect" signal: when the protocol layer detects an unrecoverable transport exception or a long timeout, call `ctx.reconnect.try_request_reconnect(reason)` to trigger a supervised reconnect (do not await it)
Lifecycle Semantics Quick Reference
- `Connector::connect(ctx)`
  - What to do: establish the transport (TCP/UDP/Serial), complete the protocol-layer connect/handshake, and construct the `Session`
  - Must: respect `ctx.cancel`; set a controllable timeout; prefer the SDK's metered transport (so `transport_meter` metering is complete)
  - Do not: publish the handle here (Ready is defined by `Session::init`); do not spawn background tasks without cancellation
- `Session::init(&ctx)`
  - What to do: define the "Ready" boundary; inject the dependencies the data plane needs into `handle()` (e.g., connection pool, subscription manager, reconnect handle)
  - Must: be low cost with a controllable timeout; failures must be aggregatable by `error_summary` / `error_code` (commonly auth, permission, or protocol incompatibility)
  - Do not: start infinite loops (those belong in `run()`); do not do uncontrolled full scans
- `Session::run(ctx)`
  - What to do: drive the session until disconnect, cancellation, or a reconnect request; own the attempt-level resource boundary and cleanup
  - Return value: prefer `Disconnected` or `ReconnectRequested(reason)`; only use `Fatal(FailureReport)` when explicitly unrecoverable
  - Must: release connections, subscriptions, and background tasks on exit; ensure the cancel path is fast and idempotent
session.rs
use ng_gateway_sdk::supervision::{Session, SessionContext, RunOutcome};
use ng_gateway_sdk::DriverError;
use std::sync::Arc;
use tokio::net::TcpStream;
use crate::handle::YourHandle;
pub struct YourSession {
handle: Arc<YourHandle>,
stream: Option<TcpStream>,
}
impl YourSession {
pub fn new(handle: Arc<YourHandle>, stream: TcpStream) -> Self {
Self { handle, stream: Some(stream) }
}
}
#[async_trait::async_trait]
impl Session for YourSession {
type Handle = YourHandle;
type Error = DriverError;
fn handle(&self) -> &Arc<Self::Handle> {
&self.handle
}
async fn init(&mut self, ctx: &SessionContext) -> Result<(), Self::Error> {
// Inject dependencies into Handle (e.g., stream)
if let Some(stream) = self.stream.take() {
self.handle.attach_transport(stream, ctx.reconnect.clone());
}
Ok(())
}
async fn run(self, ctx: SessionContext) -> Result<RunOutcome, Self::Error> {
// Wait for cancel signal
ctx.cancel.cancelled().await;
// Clean up resources
self.handle.detach_transport();
Ok(RunOutcome::Disconnected)
}
}

5.3 Handle
5.3.1 Handle Responsibilities, Production Advice, Hot Path Contract
Responsibilities
- Collection Planning: formulate the optimal protocol request strategy from the `CollectItem`s (e.g., merge adjacent registers, batch requests) to reduce network I/O count.
- Protocol Interaction: acquire a connection, execute protocol requests, and handle timeouts and retries.
- Data Mapping: decode protocol responses (bytes/words) into `NGValue` and wrap them as `NorthwardData`.
- Action / Write Point: handle write requests, perform the necessary encoding conversions, and return the execution result.
- Grouping Strategy: (optional) implement `collection_group_key` to aggregate the collection of multiple logical devices that share one physical connection.
Production Advice
- Do not block the hot path: avoid long-held locks, large allocations, and synchronous I/O; offload slow work to an internal actor/worker if necessary.
- Timeout Control: every I/O operation must have a timeout.
- Failure semantics must be clear: errors like `ServiceUnavailable` / `Timeout` / `ExecutionError` should be diagnosable; on a transport-level exception, trigger `try_request_reconnect(...)` ASAP and let the kernel supervision loop govern backoff and reconnection.
5.3.2 Handle Function List and Details
- `collect_data`: batch-collect data.
- `collection_group_key`: (optional) define how devices are grouped for collection.
When should group collection be used?
Only when the answer to both questions is yes: do these business Devices share the same physical session semantics, and does the protocol side offer usable batch capabilities?
Typical Scenarios:
- Modbus: Points under the same slave (station number) are split into multiple business Devices (Modeled by production line/functional area/sub-device), but physically still "One batch read/write per slave".
- OPC UA: under the same endpoint/channel/session, nodes are split across multiple business Devices for organizational reasons; physically they can be merged into one `Read` (or a few batched Reads).
- EtherNet/IP / MC / S7: multiple business Devices within the same connection (usually bounded by the channel) share session and transport, suitable for merging into group calls to reduce scheduling and protocol request count.
Counter-examples (Do not group):
- Each business Device must exclusively occupy a connection (Different IP/Port/Serial Port), merging will only let slow devices drag down fast devices, and amplify timeout impact.
- Protocol does not support batch, or batch leads to worse stability (Some devices very sensitive to large requests/packets).
How should collection_group_key be defined?
The semantics of CollectionGroupKey must be: "A group of business Devices that can share one physical collection/same session context".
Must obey:
- Stable: no random numbers, no temporary values that change across restart/refresh.
- Low Cardinality: strictly forbid including `device_id` / `point_id` (that amounts to no grouping and leads to an oversized HashMap).
- Allocation-free / Low Overhead: this method runs on the high-frequency path and must be O(1) with zero allocation.
- Expresses Physical Session Semantics: usually derived from a protocol-layer shared boundary, such as slaveId, channelId, or an endpoint/session identity.
Concrete Examples: How to choose key's "Physical Semantics"
Ex 1: Modbus (Group by slaveId)
- Applicable: One slave split into multiple business Devices.
- key: `kind="MODB"` + `payload=slave_id`.
- Ref implementation: `ng-gateway-southward/modbus/src/handle.rs` uses `CollectionGroupKey::from_u64(kind, slave_id as u64)`.
Ex 2: OPC UA (Group by channelId/endpoint session)
- Applicable: Multiple business Devices under same OPC UA connection/session share one batch Read.
- key: `kind="OPCU"` + `payload=channel_id` (or a stable hash prefix of the endpoint).
- Ref implementation: `ng-gateway-southward/opcua/src/handle.rs` groups by `channel_id`.
Ex 3: Multi-session Scenario (Group by "Connection ID + Sub-channel")
- Applicable: One driver internally maintains multiple physical sessions (e.g., multiple targets under same channel, and each target needs independent connection pool).
- key: use `CollectionGroupKey::from_pair_u64(kind, a, b)` to combine two stable ids (note it truncates to 48 bits, suitable for small/medium integer id ranges).
- If the identifier is more complex (host:port, certificate fingerprint, endpoint URL, etc.), do a stable hash (e.g., 128-bit) over it first, then use `from_hash128(kind, hash128)`, which truncates the first 12 bytes as the payload.
Rule of Thumb: it is better to under-group than to group wrongly. Wrong grouping causes protocol semantic errors (e.g., mixing different slaves/endpoints into one batch), which are usually harder to troubleshoot than the slowdown from not grouping.
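To make the stability, low-cardinality, and packing rules concrete, here is a self-contained sketch of a group key. `GroupKey` and its constructors are simplified stand-ins for the SDK's `CollectionGroupKey` (the real 48-bit packing and `from_hash128` layout may differ), and the FNV-1a hash stands in for the suggested stable hash:

```rust
/// Illustrative model of a grouping key: a 4-byte kind tag plus a numeric
/// payload. The real `CollectionGroupKey` lives in the SDK; the layout and
/// helper names here are assumptions for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct GroupKey {
    kind: [u8; 4],
    payload: u64,
}

impl GroupKey {
    /// One stable id, e.g. a Modbus slave id.
    fn from_u64(kind: &[u8; 4], id: u64) -> Self {
        Self { kind: *kind, payload: id }
    }

    /// Pack two ids into one payload, truncating each to 24 bits (48 bits
    /// total) -- mirrors the "truncates to 48-bit" caveat: only suitable
    /// for small/medium integer id ranges.
    fn from_pair_u64(kind: &[u8; 4], a: u64, b: u64) -> Self {
        let payload = ((a & 0xFF_FFFF) << 24) | (b & 0xFF_FFFF);
        Self { kind: *kind, payload }
    }

    /// For complex identifiers (host:port, endpoint URLs), hash to a stable
    /// integer first. FNV-1a 64-bit here as a simple stable hash; the SDK's
    /// `from_hash128` would take a 128-bit hash instead.
    fn from_str_id(kind: &[u8; 4], id: &str) -> Self {
        let mut h: u64 = 0xcbf29ce484222325;
        for b in id.as_bytes() {
            h ^= *b as u64;
            h = h.wrapping_mul(0x100000001b3);
        }
        Self { kind: *kind, payload: h }
    }
}

fn main() {
    // Stable: same inputs always yield the same key across restarts.
    assert_eq!(
        GroupKey::from_str_id(b"OPCU", "opc.tcp://10.0.0.5:4840"),
        GroupKey::from_str_id(b"OPCU", "opc.tcp://10.0.0.5:4840"),
    );
    // Low cardinality: one key per slave, not per point.
    assert_eq!(GroupKey::from_u64(b"MODB", 7), GroupKey::from_u64(b"MODB", 7));
    // Pair packing keeps both ids distinct within the 48-bit budget.
    assert_ne!(
        GroupKey::from_pair_u64(b"MULT", 1, 2),
        GroupKey::from_pair_u64(b"MULT", 2, 1),
    );
    println!("ok");
}
```

Note that every constructor is O(1) (the string hash is O(len) over a short, fixed identifier) and allocation-free, matching the hot-path requirements above.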
- `collector_concurrency_profile(&self) -> CollectorConcurrencyProfile`: (optional) declare collection concurrency capability (cross-group / intra-group / I/O lanes), used to protect the device/bus and let the Collector adapt automatically.
- `write_point`: write a point value.
- `execute_action`: execute an action command.
`write_point` / `execute_action` Semantic Key Points
- Timeout: `write_point(..., timeout_ms)` should be a per-operation limit; avoid infinite waits (commonly `tokio::time::timeout`)
- Reconnect: on a transport error or consecutive timeouts, trigger `try_request_reconnect(reason)` and return a diagnosable error (do not await the reconnect in the hot path)
- Return Value: use `WriteResult` / `ExecuteResult` to express business semantics (Completed / Queued); do not misreport "queued successfully" as "execution completed"
- `apply_runtime_delta(delta) -> DriverResult<()>`: apply runtime model deltas.
apply_runtime_delta(delta) In-depth Explanation
apply_runtime_delta(delta) is used to receive Runtime Model Incremental Change notifications from Host (Gateway) during driver operation, and apply these changes to Driver Internal Long-term State (Cache, Collection Plan, Subscription Set, Action Routing Table, etc.), thereby achieving:
- Make added/deleted/updated devices, points, actions effective without restarting driver/reconnecting device (or minimizing reconnection)
- Especially in `CollectionType::Report` (subscription/event reporting) scenarios, avoid inconsistencies like "point changed but subscription set not updated"
Its semantics are not "change anything arbitrarily", but handling runtime `RuntimeDelta` events. Currently `RuntimeDelta` contains three types (Channel scope, ordered delivery):
- `DevicesChanged`: device add/update/remove, and device `status` changes
- `PointsChanged`: point add/update/remove under a device (including point `driver_config` / transform and other runtime info)
- `ActionsChanged`: action add/update/remove under a device (command definition, parameters, etc.)
When must implement?
- Subscription/Report mode: almost always — you usually maintain a subscription set / callback map / point snapshot / filter rules, and if these are not updated after a point change, the consequences are worse than in collection mode (missed, wrong, or duplicate reports; continuing to report deleted points).
- Collection mode: recommended but not mandatory (built-in polling drivers may start with a no-op). Because `collect_data()` receives the latest `(device, points)` (`CollectItem`) on every call, many pure polling drivers stay functionally correct without implementing `apply_runtime_delta`. But once you introduce a Planner (batch merge strategy), address/codec caches, a device session table / connection pool, or background workers, and want device/point changes to take effect quickly with minimal restarts/reconnects, implementing `apply_runtime_delta` significantly improves consistency, performance, and operability.
How to use?
Design it as Control-plane Entry: Receive changes quickly, update memory structure, notify background task to rebuild local state if necessary.
1) Avoid slow operations / network I/O in `apply_runtime_delta`
   - Only do: update memory structures, write through `ArcSwap`/`RwLock`, send a change message to an internal actor/task
   - If a change can only take effect by rebuilding protocol-side state (e.g., a point update changes the subscription key and requires a re-subscribe, or the device/session lifecycle must be rebuilt), trigger the reconnect/rebuild flow inside the driver rather than blocking here synchronously
2) Maintain Hot Path Concurrency Safety
   - `collect_data` / `write_point` / `execute_action` may run concurrently with `apply_runtime_delta` (they are all handle entries).
   - Recommended pattern: snapshot + atomic replacement
     - Use `ArcSwap` / `watch` to hold the point snapshot / subscription config snapshot / Planner snapshot
     - `apply_runtime_delta` builds a new snapshot and replaces it atomically; the hot path reads the snapshot only, avoiding long-held locks
3) Prefer Incremental Update over Full Rebuild
   - `PointsChanged` already provides added/updated/removed separately; prefer local changes:
     - added: parse and add to the cache/Planner; (subscription/report mode) join the subscription set and fill in the meta info callbacks require
     - updated: update codec/transform/driver_config and refresh the cache/Planner; if the subscription key or filter strategy changed, trigger a re-subscribe or refresh the callback snapshot
     - removed: remove from the cache/Planner/session table and clean up associated state; (subscription/report mode) cancel the subscription to avoid continuing to report deleted points
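The snapshot + atomic replacement and incremental-update patterns above can be sketched with std types only (`RwLock<Arc<…>>` standing in for `ArcSwap`); `PointMeta` and the delta shape are illustrative, not SDK types:

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Minimal illustrative point metadata; the field is an assumption.
#[derive(Debug, Clone)]
struct PointMeta {
    address: u16,
}

/// Snapshot-and-swap: the hot path clones an `Arc` to the current snapshot
/// (cheap, no long-held lock); the control plane builds a new map from the
/// old one and replaces it atomically.
struct PointTable {
    snapshot: RwLock<Arc<HashMap<u64, PointMeta>>>,
}

impl PointTable {
    fn new() -> Self {
        Self { snapshot: RwLock::new(Arc::new(HashMap::new())) }
    }

    /// Hot path: grab a consistent snapshot and release the lock immediately.
    /// (No unwrap, per the doc's rule: lock poisoning recovers the guard.)
    fn load(&self) -> Arc<HashMap<u64, PointMeta>> {
        match self.snapshot.read() {
            Ok(guard) => guard.clone(),
            Err(poisoned) => poisoned.into_inner().clone(),
        }
    }

    /// Control plane: apply an incremental delta (added/updated as upserts,
    /// removed as deletes) and atomically publish the new snapshot.
    fn apply_delta(&self, upserted: Vec<(u64, PointMeta)>, removed: Vec<u64>) {
        let mut next = (*self.load()).clone();
        for (id, meta) in upserted {
            next.insert(id, meta);
        }
        for id in removed {
            next.remove(&id);
        }
        match self.snapshot.write() {
            Ok(mut guard) => *guard = Arc::new(next),
            Err(poisoned) => *poisoned.into_inner() = Arc::new(next),
        }
    }
}

fn main() {
    let table = PointTable::new();
    let before = table.load(); // a hot-path reader holding the old view
    table.apply_delta(vec![(1, PointMeta { address: 100 })], vec![]);
    let mid = table.load();
    table.apply_delta(vec![], vec![1]);
    assert!(before.is_empty());       // earlier snapshot is unchanged
    assert_eq!(mid.len(), 1);         // saw the added point
    assert!(table.load().is_empty()); // point 1 was added, then removed
    println!("ok");
}
```

In a real driver the same shape holds for the Planner snapshot and subscription config snapshot; `ArcSwap` or `tokio::sync::watch` replaces the `RwLock<Arc<…>>` wrapper for fully lock-free reads.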
Key Notes for Subscription/Report Scenario
In Report mode, driver usually has a long-running "Subscription Manager/Callback Thread/Reporting Actor". The core task of apply_runtime_delta is to keep these background components consistent with the latest model:
- Point Add (added): Convert point to protocol side subscription item (e.g., NodeId/IOA/Index), add to subscription manager; and initialize meta info required for reporting (point_id, data type, transform).
- Point Update (updated): At least consider three types of changes:
- Subscription Key Change (e.g., NodeId/Address/Register Range change): Must cancel old subscription first then subscribe new item
- Decode/Transform Change (datatype/scale/offset/negate etc.): Need to update conversion logic of callback path, otherwise "Value correct but semantics wrong"
- Reporting Strategy Change (e.g., change/always, deadband/sampling/filtering rules if carried by driver_config): Need to update filter/aggregator state
- Point Remove (removed): Must cancel subscription and clean up all associated states, avoid continuing to send data of deleted points from callback (This is the most common hidden bug).
Suggest making "Subscription Management" an internal actor, and providing two types of messages:
- `UpdateSubscription { added, updated, removed }`: only adjust the subscription set incrementally (batch where possible)
- `UpdateSnapshot { new_snapshot }`: atomically replace the snapshot the callback path relies on (point metadata / transform / routing)
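A minimal std-only sketch of such a subscription actor, reduced to added/removed (no `updated` or snapshot message): the message and type names are illustrative, and a real driver would issue protocol-level subscribe/unsubscribe calls inside the actor loop:

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

/// Illustrative control-plane messages for an internal subscription actor.
enum SubMsg {
    UpdateSubscription { added: Vec<u64>, removed: Vec<u64> },
    // Shutdown carries a reply channel so the test can inspect final state.
    Shutdown(mpsc::Sender<HashSet<u64>>),
}

/// Spawn the actor: it owns the subscription set, so `apply_runtime_delta`
/// only needs a cheap, non-blocking `send` -- no I/O on the control plane.
fn spawn_subscription_actor() -> mpsc::Sender<SubMsg> {
    let (tx, rx) = mpsc::channel::<SubMsg>();
    thread::spawn(move || {
        let mut subs: HashSet<u64> = HashSet::new();
        for msg in rx {
            match msg {
                SubMsg::UpdateSubscription { added, removed } => {
                    // Batch-apply the delta; a real driver would also issue
                    // protocol subscribe/unsubscribe requests here.
                    subs.extend(added);
                    for id in removed {
                        subs.remove(&id);
                    }
                }
                SubMsg::Shutdown(reply) => {
                    let _ = reply.send(subs);
                    break;
                }
            }
        }
    });
    tx
}

fn main() {
    let actor = spawn_subscription_actor();
    let _ = actor.send(SubMsg::UpdateSubscription { added: vec![1, 2, 3], removed: vec![] });
    let _ = actor.send(SubMsg::UpdateSubscription { added: vec![4], removed: vec![2] });
    let (reply_tx, reply_rx) = mpsc::channel();
    let _ = actor.send(SubMsg::Shutdown(reply_tx));
    let expected: HashSet<u64> = [1, 3, 4].into_iter().collect();
    assert_eq!(reply_rx.recv().unwrap_or_default(), expected);
    println!("ok");
}
```

The key property is that `apply_runtime_delta` returns as soon as the message is queued; any slow protocol work happens on the actor's own thread/task.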
Common Pitfalls
- Only update memory point table, but forget to update subscription set: Manifests as added points not reporting, deleted points still reporting, or point_id not found in callback.
- Doing I/O synchronously in `apply_runtime_delta` (cancelling/creating subscriptions, probing device capabilities, etc.): this turns the control plane into a slow path and can drag the driver down under high-frequency changes or weak networks.
- Protecting all state with one big lock: the hot path gets blocked by every runtime delta; use snapshots, layered locks, or actor messaging to reduce contention.
In a nutshell: `apply_runtime_delta` is the entry point that lets a driver evolve online; in subscription/report mode, it determines whether your subscription/mapping set stays consistent with the point model.
handle.rs
use ng_gateway_sdk::{
SouthwardHandle, DriverResult, CollectItem, NorthwardData, DriverError,
RuntimeDevice, RuntimePoint, NGValue, WriteResult, RuntimeAction, RuntimeParameter, ExecuteResult
};
use ng_gateway_sdk::supervision::ReconnectHandle;
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use std::sync::Arc;
pub struct YourHandle {
// Use Mutex to protect shared resources, pay attention to lock granularity
transport: Arc<Mutex<Option<TcpStream>>>,
reconnect: Arc<Mutex<Option<ReconnectHandle>>>,
}
impl YourHandle {
pub fn new() -> Self {
Self {
transport: Arc::new(Mutex::new(None)),
reconnect: Arc::new(Mutex::new(None)),
}
}
    pub fn attach_transport(&self, stream: TcpStream, reconnect: ReconnectHandle) {
        // Called from async Session::init: blocking_lock() would panic inside
        // the Tokio runtime. try_lock() is safe here because no collection runs
        // before the session is ready, so the locks are uncontended.
        if let Ok(mut t) = self.transport.try_lock() {
            *t = Some(stream);
        }
        if let Ok(mut r) = self.reconnect.try_lock() {
            *r = Some(reconnect);
        }
    }
    pub fn detach_transport(&self) {
        if let Ok(mut t) = self.transport.try_lock() {
            *t = None;
        }
    }
    fn request_reconnect(&self, reason: &str) {
        // try_lock keeps the hot path non-blocking; if the lock is contended,
        // a reconnect request is most likely already in flight.
        if let Ok(guard) = self.reconnect.try_lock() {
            if let Some(h) = guard.as_ref() {
                h.try_request_reconnect(reason);
            }
        }
    }
}
#[async_trait::async_trait]
impl SouthwardHandle for YourHandle {
async fn collect_data(&self, items: &[CollectItem]) -> DriverResult<Vec<NorthwardData>> {
let mut guard = self.transport.lock().await;
let stream = guard.as_mut().ok_or(DriverError::ServiceUnavailable)?;
// 1. Assemble packet
// 2. Send request (With timeout)
// 3. Receive response (With timeout)
// 4. Parse data
// Example: Encounter I/O error
// self.request_reconnect("IO Error");
// return Err(DriverError::SessionError(...));
Ok(vec![])
}
async fn write_point(
&self,
device: Arc<dyn RuntimeDevice>,
point: Arc<dyn RuntimePoint>,
value: &NGValue,
timeout_ms: Option<u64>,
) -> DriverResult<WriteResult> {
// 1. Downcast: Get driver specific Runtime structure
let device = device.downcast_ref::<YourProtoDevice>()
.ok_or(DriverError::ConfigurationError("Invalid device type".into()))?;
let point = point.downcast_ref::<YourProtoPoint>()
.ok_or(DriverError::ConfigurationError("Invalid point type".into()))?;
// 2. Encode: Convert NGValue to protocol raw value/byte stream
// (Suggest implementing in codec module, handling type conversion, byte order etc.)
// let raw_payload = codec::encode_write(value, point.data_type, point.address)?;
// 3. Acquire Transport: Get connection
let mut guard = self.transport.lock().await;
let stream = guard.as_mut().ok_or(DriverError::ServiceUnavailable)?;
// 4. Execute: Execute protocol write request (With timeout)
// stream.write_all(&raw_payload).await?;
// stream.read_response().await?;
Ok(WriteResult {
outcome: ng_gateway_sdk::WriteOutcome::Applied,
applied_value: Some(value.clone()),
})
}
async fn execute_action(
&self,
device: Arc<dyn RuntimeDevice>,
_action: Arc<dyn RuntimeAction>,
params: Vec<(Arc<dyn RuntimeParameter>, NGValue)>,
) -> DriverResult<ExecuteResult> {
// 1. Downcast
let device = device.downcast_ref::<YourProtoDevice>()
.ok_or(DriverError::ConfigurationError("Invalid device type".into()))?;
// 2. Resolve Parameters: Convert generic parameter list to driver strongly typed parameters
// (SDK provides helper function downcast_parameters)
let typed_params = ng_gateway_sdk::downcast_parameters::<YourProtoParameter>(params)?;
// 3. Build Command: Build protocol command based on action definition and parameters
// let cmd_frame = codec::build_action_frame(&typed_params)?;
// 4. Execute
// ... Get connection and send ...
Ok(ExecuteResult {
outcome: ng_gateway_sdk::ExecuteOutcome::Completed,
payload: Some(serde_json::json!({ "status": "ok" })),
})
}
}

5.3.4 Best Practices
Hot Path Performance Checklist
- Zero Copy First: parse on `&[u8]` / `Bytes` where possible; avoid repeated `Vec::new()` in loops
- Preallocation: `Vec::with_capacity(items.len())`, `HashMap::with_capacity(n)`
- Reduce Lock Contention: prefer lock-free reads (e.g., `ArcSwap` / `watch`); keep the critical sections of necessary locks short
- Batching: merge multiple points into as few protocol requests as possible (Planner)
- Concurrency Control:
  - RTU/RS-485: usually must be single-flight (avoid bus conflicts)
  - TCP: connection pools / concurrent in-flight requests are fine, but respect device capability and gateway resources
When developing a driver, abstract the batching/merged-request strategy into a configurable Planner, and choose defaults that are conservative but not too slow.
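As a concrete illustration, here is a minimal Planner-style span merger for register-based protocols: it merges sorted addresses into `(start, count)` spans bounded by a configurable max gap and max span length (e.g., Modbus caps holding-register reads at 125 registers). The function and parameter names are illustrative:

```rust
/// Merge register addresses into batched read spans. `max_gap` is how many
/// unneeded registers we tolerate reading to bridge a hole; `max_len` is the
/// protocol/device limit on one request's register count.
fn plan_spans(mut addrs: Vec<u16>, max_gap: u16, max_len: u16) -> Vec<(u16, u16)> {
    addrs.sort_unstable();
    addrs.dedup();
    let mut spans: Vec<(u16, u16)> = Vec::new(); // (start, count)
    for addr in addrs {
        match spans.last_mut() {
            // Extend the current span if the result stays within max_len and
            // the hole to the last covered register is at most max_gap.
            Some((start, count))
                if addr - *start < max_len
                    && addr - (*start + *count - 1) <= max_gap + 1 =>
            {
                *count = addr - *start + 1;
            }
            _ => spans.push((addr, 1)),
        }
    }
    spans
}

fn main() {
    // Adjacent and near-adjacent registers merge into one request;
    // a large gap starts a new span.
    assert_eq!(plan_spans(vec![0, 1, 2, 5, 100], 3, 125), vec![(0, 6), (100, 1)]);
    // max_gap = 0: only strictly contiguous registers merge.
    assert_eq!(plan_spans(vec![0, 1, 3], 0, 125), vec![(0, 2), (3, 1)]);
    println!("ok");
}
```

`max_gap` is the tuning knob the "conservative but not too slow" advice refers to: a larger gap means fewer requests but more wasted bytes per response, and some devices reject over-wide reads.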
Correctly Handle Timeout, Retry and Backoff
Drivers generally encounter two kinds of retries:
1. Connection lifecycle retry (handled by the Gateway kernel supervision loop), driven by `connection_policy.backoff` (the macro already injects the policy into `SupervisorParams.retry_policy`). What the driver needs to do: classify errors correctly (Retryable vs Fatal).
2. Protocol request level retry (use with caution), e.g., retrying once immediately after a single read/write request times out. Suggested principles:
   - Limit the count (e.g., at most 1-2 retries)
   - Backoff + jitter (avoid retry storms)
   - Only retry explicit transient errors (timeout, connection reset); never retry "illegal response / protocol error"
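These principles can be sketched as a bounded, transient-only retry helper. `ReqError` and the helper names are illustrative; a real driver would await an async sleep on the computed backoff rather than just computing it:

```rust
use std::time::Duration;

/// Illustrative request-level error kinds; only explicit transient
/// failures qualify for a retry.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ReqError {
    Timeout,
    ConnectionReset,
    ProtocolError, // illegal response: never retried
}

fn is_transient(e: ReqError) -> bool {
    matches!(e, ReqError::Timeout | ReqError::ConnectionReset)
}

/// Exponential backoff with xorshift jitter (deterministic seed for testing).
fn backoff_with_jitter(attempt: u32, base_ms: u64, seed: &mut u64) -> Duration {
    let exp = base_ms.saturating_mul(1u64 << attempt.min(6));
    *seed ^= *seed << 13;
    *seed ^= *seed >> 7;
    *seed ^= *seed << 17;
    let jitter = if exp > 1 { *seed % (exp / 2) } else { 0 };
    Duration::from_millis(exp + jitter)
}

/// Retry at most `max_retries` times, and only on transient errors.
fn call_with_retry<F>(mut op: F, max_retries: u32) -> Result<u32, ReqError>
where
    F: FnMut() -> Result<u32, ReqError>,
{
    let mut seed = 0x9E3779B97F4A7C15u64;
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if is_transient(e) && attempt < max_retries => {
                // In a real driver: tokio::time::sleep(backoff).await
                let _backoff = backoff_with_jitter(attempt, 50, &mut seed);
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}

fn main() {
    // A timeout on the first try succeeds on the single allowed retry.
    let mut calls = 0;
    let r = call_with_retry(
        || {
            calls += 1;
            if calls == 1 { Err(ReqError::Timeout) } else { Ok(42) }
        },
        1,
    );
    assert_eq!(r, Ok(42));
    assert_eq!(calls, 2);
    // Protocol errors are never retried.
    let mut calls2 = 0;
    let r2 = call_with_retry(|| { calls2 += 1; Err(ReqError::ProtocolError) }, 2);
    assert_eq!(r2, Err(ReqError::ProtocolError));
    assert_eq!(calls2, 1);
    println!("ok");
}
```

Escalation (consecutive request timeouts becoming a connection-level reconnect) would sit one layer above this helper, counting failures across calls and then invoking `try_request_reconnect(...)`.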
Error Classification & Context (Retryable vs Fatal; includes the "no unwrap/expect" rule)
Drivers are the foundation of gateway stability. Classify errors into Retryable and Fatal so the supervision loop can make the right decision; at the same time, every I/O, parsing, and type-conversion failure must be returned via `Result` — panics from `unwrap()` / `expect()` are strictly forbidden.
Suggest classifying errors into at least three layers (Defined from "System Action" perspective):
**Connection-level Fatal** (fail immediately; wait for a config/environment fix)
- Configuration errors: required field missing, type mismatch, illegal port/address range, illegal point definition
- Auth/authorization unavailable: credentials missing, permission permanently denied (e.g., a 401/403 that is explicitly unrecoverable)
- TLS validation failure: untrusted certificate chain, hostname validation failure, expired certificate (unless you explicitly support hot credential/certificate updates, treat these as Fatal)

**Connection-level Retryable** (hand over to the supervision loop for backoff reconnect)
- Transient network/transport failures: `ConnectionRefused`, `ConnectionReset`, `BrokenPipe`, temporary DNS failure
- I/O timeouts: connection-establishment timeout, read/write timeout (note the distinction from protocol-level timeouts)
- Transient resource shortage: system overload, temporary resource exhaustion (usually governed together with backpressure and rate limiting)

**Request/response-level errors** (most should not trigger an "immediate reconnect storm")
- Protocol timeout / no response: generally retryable once (with backoff + jitter); consecutive failures escalate to connection-level Retryable
- Protocol error / illegal response / decode failure: Fatal by default (for this request); usually should not be retried immediately, and needs enough context output for troubleshooting
- Device-side exception response (e.g., a protocol-defined exception/error code): Fatal by default (for this request); whether a reconnect is needed depends on the protocol and device behavior (in most cases it is not)
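The three layers can be encoded as an explicit error kind so the supervision loop can decide mechanically. A minimal sketch follows; the type and variant names are illustrative, not SDK types.

```rust
/// Illustrative driver error kinds, grouped by what the system should do next.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrKind {
    Config,          // fatal: wait for a config/environment fix
    Auth,            // fatal: credentials missing / permanently rejected
    TlsValidation,   // fatal unless hot credential rotation is supported
    Transport,       // retryable: ConnectionRefused/Reset, BrokenPipe, DNS blip
    IoTimeout,       // retryable: connect/read/write timeout
    Backpressure,    // retryable, but the caller must slow down
    Protocol,        // per-request fatal: illegal response / decode failure
    DeviceException, // per-request fatal: protocol-defined exception code
}

impl ErrKind {
    /// Should the supervision loop back off and reconnect?
    fn is_connection_retryable(self) -> bool {
        matches!(
            self,
            ErrKind::Transport | ErrKind::IoTimeout | ErrKind::Backpressure
        )
    }
}
```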
Implementation suggestions (actionable and auditable):
- Do not use panic to express "unreachable": any failure at the driver boundary should be `Result::Err` with context; convert `Option` values and downcasts via `ok_or(...)` / `ok_or_else(...)`.
- Errors should be both machine-judgable and human-troubleshootable:
  - Machine judgment: use a clear error kind (Retryable/Fatal; Config/Transport/Protocol/Auth/Backpressure, etc.)
  - Human troubleshooting: attach key context to the error (see the list below)
- Recommended error-context fields (include at least):
  - Business location: `channel_id` / `device_id` / `point_id` (if available), driver name, target endpoint (IP:port / serial path / slave id)
  - Protocol location: function code, address, quantity, transaction id / sequence number (if the protocol has one)
  - Timing location: configured timeout value, attempt number, elapsed time
  - Data location: response length, expected length, CRC/checksum info, and optionally a truncated hex dump (only at `trace` level, with a strict length limit)
  - Underlying error chain: io error, timeout error, decode error (keep the original error as the `source`)

Rule of thumb: if the log/error cannot tell you which device, which link, which request, and which protocol segment failed, the error is effectively unobservable.
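The context-field list can be carried on a small struct attached to every error. This is a sketch only; the field names follow the list above and are not an SDK type.

```rust
use std::fmt;

/// Illustrative error context: enough to answer "which device, which link,
/// which request, which protocol segment" from a single log line.
#[derive(Debug)]
struct ErrCtx {
    device_id: String,
    endpoint: String, // "IP:port" / serial path / slave id
    function: u8,     // protocol location: function code
    address: u16,
    attempt: u32,
    elapsed_ms: u64,
}

impl fmt::Display for ErrCtx {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "device={} endpoint={} fn={:#04x} addr={} attempt={} elapsed={}ms",
            self.device_id, self.endpoint, self.function, self.address, self.attempt, self.elapsed_ms
        )
    }
}
```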
Backpressure Boundary: Keep Pressure Outside the Driver

Gateway stability comes from an explicit backpressure boundary. The driver must guarantee that when upstream pressure (collection schedules / write requests) increases, it does not spawn tasks without limit, accumulate memory without bound, or overwhelm the device/link.
Recommended backpressure strategies (from strongest to weakest):

1. **Concurrency limit (hard boundary)**: set a `max_in_flight` per connection/device (typically implemented with a `Semaphore`).
   - RTU/RS-485: usually must be `max_in_flight = 1` (avoid bus conflicts and preserve timing)
   - TCP: can be raised moderately, but must respect device capability and gateway resources (CPU/memory/bandwidth)
2. **Bounded queue (memory boundary)**: collection/write requests must go through a bounded buffer (bounded channel/queue).
   - The policy when the queue is full must be explicit: reject (return Backpressure) / coalesce / drop expired reads; avoid "looks successful but is actually piling up".
3. **Batching and merging (reduce request count)**: the planner should merge multiple points into fewer protocol requests; when backpressure appears, prefer increasing merge intensity over increasing retry counts.
4. **Timeout and cancellation (prevent zombie requests)**:
   - Every I/O must have a timeout; tasks should support cancellation (e.g., a shutdown signal / `select!` branch)
   - When the upstream cancels or times out, the driver must not keep writing back results (avoid backfilling expired data)
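The bounded-queue strategy with an explicit reject-on-full policy can be sketched with a standard-library bounded channel. This is a std-only illustration; a real driver would typically use an async semaphore/bounded channel (e.g., from tokio), and the `SubmitError` name is an assumption.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

#[derive(Debug, PartialEq)]
enum SubmitError {
    Backpressure, // queue full: caller must slow down, not retry immediately
    Closed,       // receiver gone: the connection/session is shutting down
}

/// Submit a request into a bounded queue; never block, never grow unbounded.
fn submit<T>(tx: &SyncSender<T>, req: T) -> Result<(), SubmitError> {
    tx.try_send(req).map_err(|e| match e {
        TrySendError::Full(_) => SubmitError::Backpressure,
        TrySendError::Disconnected(_) => SubmitError::Closed,
    })
}
```

With `sync_channel(capacity)` the queue capacity itself is the memory boundary; a `Backpressure` result tells the upstream to reduce frequency or concurrency instead of retrying immediately.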
Observability suggestions:
- Backpressure must be observable: record the backpressure trigger count, rejection count, and queue length/wait time (avoid high-cardinality fields).
- A backpressure error means "retryable, but slow down": upstream components seeing Backpressure should reduce frequency/concurrency, not launch an immediate retry storm.
TLS / Credentials / Logging: Security and Operability Baseline

Once a driver touches the network (TCP/TLS/HTTP/MQTT bridges, etc.), security and operability are default requirements, not bonus items.

TLS (suggested: based on rustls / the system trust store)
- Certificate validation and hostname validation must be enabled; never disable validation just to "get connected"
- Support custom CAs (enterprise intranet / self-signed) and certificate rotation (updating without a restart is a bonus)
- If using mTLS: a missing or invalid client certificate/private key should produce an explicit error (treated as Fatal in most scenarios)
- TLS error logs should be troubleshootable without leaking secrets: output only the failure reason and a certificate summary (e.g., subject/issuer/validity); never output the private key or the full certificate content
Credentials
- Treat tokens/passwords/private keys as secrets: never write them to logs, never `Debug`-print them, never splice them into error strings
- Redact sensitive fields explicitly: e.g., keep only the first/last 2-4 characters and replace the rest with `***`
- An error message may state the conclusion ("credential missing/invalid"), but must never contain the credential itself
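The redaction suggestion amounts to a single helper applied before any credential can touch a log or error string. A minimal sketch, keeping the first/last two characters as in the example above; the short-secret threshold is an illustrative choice.

```rust
/// Redact a secret, keeping only the first and last 2 characters.
/// Short secrets are fully masked so the redacted form leaks nothing useful.
fn redact(secret: &str) -> String {
    let n = secret.chars().count();
    if n <= 6 {
        return "***".to_string();
    }
    let head: String = secret.chars().take(2).collect();
    let tail: String = {
        let mut last: Vec<char> = secret.chars().rev().take(2).collect();
        last.reverse();
        last.into_iter().collect()
    };
    format!("{head}***{tail}")
}
```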
Logging (tracing)
- Record key events structurally: connection establish/disconnect, reconnect backoff, request timeout, protocol exception, backpressure trigger
- Record enough context (device/channel/endpoint/request), but avoid high cardinality (do not use point values, full payloads, or random ids as fields)
- Raw payloads (hex dump/json) are only allowed at `trace` level, must be length-limited, and must be confirmed free of credentials/private data
- Use `warn` for expected transient errors (e.g., a brief timeout); use `error` for unrecoverable errors that need manual intervention
6. Observability
6.1 Logging (tracing) Best Practices
The NG Gateway kernel does the following when loading a driver:
- Registers the host log sink (`ng_driver_set_log_sink`)
- Initializes driver tracing (`ng_driver_init_tracing`)
- Supports dynamic log-level setting (`ng_driver_set_max_level`)

Therefore the driver side should:
- Use `tracing::info!`, `warn!`, `error!` to record key events; `debug!` / `trace!` are for development debugging and hot-path detail, and are usually disabled in production.
- Keep log fields structured and carry context: `tracing::info!(channel_id=?, device_id=?, ...)`
6.2 Metric (Observer) Usage Principles
- The SDK automatically collects basic metrics such as connection status and collection frequency.
- The driver only needs to care about data-plane byte metering via `transport_meter`:
  - Use the SDK-provided metered connections/wrappers (e.g., `connect_tcp_metered_with_timeout` / `connect_serial_metered` / `MeteredStream`) so reads and writes are metered automatically; do not hand-write byte counters in the business loop (they are easy to miss or miscount, and they pollute the hot path).
7. lib.rs: Export the ABI Factory

```rust
use ng_gateway_sdk::ng_driver_factory;

use crate::connector::YourConnector;
use crate::metadata::build_metadata;
use crate::converter::YourConverter;

ng_driver_factory!(
    name = "YourProto",
    description = "Driver for Your Protocol",
    driver_type = "your-proto", // globally unique identifier
    component = YourConnector,
    metadata_fn = build_metadata,
    model_convert = YourConverter
);
```

8. Testing Strategy
8.1 Unit Tests
- codec: byte order/word order, type conversion, boundary values, tolerance of illegal data
- planner: batch-merge algorithm, span-limit clamping, gap strategy
- model convert: config validity, default values, error semantics for illegal input
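For the codec, a typical unit test pins down byte/word order explicitly, e.g., decoding a 32-bit value from two 16-bit registers in "high word first" vs "swapped word" order. The function names here are illustrative, not part of the SDK.

```rust
/// Decode a u32 from two 16-bit registers, high word first (ABCD order).
fn decode_u32_be(regs: [u16; 2]) -> u32 {
    ((regs[0] as u32) << 16) | regs[1] as u32
}

/// Decode a u32 with the word order swapped (CDAB order), as some devices send it.
fn decode_u32_swapped(regs: [u16; 2]) -> u32 {
    ((regs[1] as u32) << 16) | regs[0] as u32
}
```

Boundary-value and illegal-data cases (all-zero, all-ones, truncated buffers) follow the same pattern.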
8.2 Integration Tests
- Start the simulator
- Write test cases that connect to the simulator through the `Connector` and verify:
  - the normal read/write path
  - timeout, disconnect, and reconnect behavior
  - concurrency pressure (verify backpressure and memory limits)
8.3 Performance Benchmark
The repository already contains ng-gateway-bench (see its Modbus bench entry for reference):
- codec micro-bench (ns/op per decode)
- planner bench (point-scale scaling: 1k/10k points)
- end-to-end bench (collection → northward output)
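The planner's batch-merge and span-clamp behavior exercised by these unit tests and benches boils down to an algorithm like the following. A sketch only: the gap/span parameters and the `(start, quantity)` output shape are illustrative, not the actual planner API.

```rust
/// Merge register addresses into (start, quantity) batches.
/// Addresses closer than `max_gap` are merged into one request (the gap
/// registers are read and discarded); a batch never spans more than
/// `max_span` registers (protocol/device limit).
fn plan_batches(mut addrs: Vec<u16>, max_gap: u16, max_span: u16) -> Vec<(u16, u16)> {
    addrs.sort_unstable();
    addrs.dedup();
    let mut out = Vec::new();
    let mut iter = addrs.into_iter();
    let Some(first) = iter.next() else { return out };
    let (mut start, mut end) = (first, first);
    for a in iter {
        // widened to u32 so the span computation cannot overflow u16
        let span = u32::from(a - start) + 1;
        if a - end <= max_gap && span <= u32::from(max_span) {
            end = a; // extend the current batch
        } else {
            out.push((start, end - start + 1));
            start = a;
            end = a;
        }
    }
    out.push((start, end - start + 1));
    out
}
```

Under backpressure, increasing `max_gap` raises merge intensity (fewer, larger requests), which is the preferred lever over raising retry counts.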
9. Debugging and Release
9.1 Complete Process
1. Start the backend (suggested: debug build + skip the UI build to speed up iteration)
   - See: Local Development
2. Start the WebUI (recommended: dev server integrated with the backend)
3. Complete Driver Install → Probe → Enable in the WebUI
   - Check that the version info, `api_version` / `sdk_version`, architecture, and checksum match expectations
4. Create and configure Channel/Device/Point/Action in the WebUI
   - Get a minimal usable configuration running first (can connect, can collect/write back)
5. Observe and troubleshoot (watch only low-frequency, aggregatable key signals)
   - Is the failure classification stable (`error_code` / `error_summary`)?
   - Are `FailurePhase` and the reconnect reason (`try_request_reconnect(reason)`) low-cardinality and aggregatable?
   - Aggregate throughput/latency by batch/count (never per point)
9.2 Release and Compatibility Checklist
- Multi-platform artifacts: Linux/macOS/Windows extensions differ (`.so` / `.dylib` / `.dll`); make sure the target architecture matches
- WebUI probe must pass: ensure the exported metadata is readable, the type/name/version are correct, and the probe info is displayable
- Custom driver upgrade path: after releasing a new artifact, install over the old one via the WebUI and confirm that `version` / `checksum` are updated on the probe page (files persist in `drivers/custom`)
- ABI/API version: the loader validates that `ng_driver_api_version` matches the host
- SDK version: the current policy is non-strict (a warning on mismatch), but crossing major versions is not recommended
- Config compatibility: keep schema paths and config fields backward compatible; new fields must provide default values
10. Common Pitfalls
- Doing I/O in `Connector::new()`: blocks the startup phase and violates the SDK contract (future versions may reject it outright)
- Parsing string addresses/expressions on the hot path: throughput drops significantly and troubleshooting gets harder
- Error classification that is too coarse: making everything Retryable leads to meaningless retry storms; making everything Fatal means a transient blip hangs the driver
- RTU mis-concurrency: concurrent reads/writes on an RS-485 bus can cause device exceptions or serial-driver chaos
- Missing backpressure strategy: memory balloons rapidly when collection speed exceeds processing speed (even with a bounded channel, data may pile up in a driver-internal buffer)
11. Key Demo Code Explanation
1) Converter: Where do field-level constraints take effect?
Taking a Modbus point as an example, the converter enforces field existence and range validity during the conversion phase:
- `functionCode` must exist, be a number, and map to a legal enum value
- `address` must exist and fit within the `u16` range
- `quantity` defaults to 1 and is forced to be >= 1
This means the hot path does not need to re-validate every point on every collection cycle, which is a significant performance and stability gain.
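A converter enforcing those three constraints might look like the following. This is a sketch over a generic raw-config shape; the real Modbus converter operates on the SDK's config types, and the set of "legal" function codes here is illustrative.

```rust
/// Illustrative raw point config as it might arrive from the UI/Excel import.
struct RawPoint {
    function_code: Option<i64>,
    address: Option<i64>,
    quantity: Option<i64>,
}

/// Validated point used on the hot path; no re-checking per collection cycle.
#[derive(Debug, PartialEq)]
struct Point {
    function_code: u8,
    address: u16,
    quantity: u16,
}

fn convert(raw: &RawPoint) -> Result<Point, String> {
    // functionCode must exist, be a number, and map to a legal value
    let fc = raw.function_code.ok_or("functionCode missing")?;
    let function_code = match fc {
        1 | 2 | 3 | 4 => fc as u8, // illustrative set of read function codes
        _ => return Err(format!("illegal functionCode: {fc}")),
    };
    // address must exist and fit in u16
    let addr = raw.address.ok_or("address missing")?;
    let address = u16::try_from(addr).map_err(|_| format!("address out of u16 range: {addr}"))?;
    // quantity defaults to 1 and is forced >= 1
    let quantity = raw.quantity.unwrap_or(1).clamp(1, i64::from(u16::MAX)) as u16;
    Ok(Point { function_code, address, quantity })
}
```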
2) Connector: how to implement the TCP/RTU "connection pool strategy"?
The Modbus `connect_pool()` does two production-mandatory things:
- TCP: builds the pool from `tcpPoolSize`, clamped to 1..=32 (so a bad config cannot overwhelm the PLC/gateway)
- RTU: forces single flight (pool size = 1) to preserve serial-bus semantics
In addition, the connect process respects `ctx.cancel`, avoiding hangs during shutdown.
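The clamping logic is tiny but worth making explicit. A sketch of the decision described above; the function name and signature are illustrative, not the Modbus driver's actual code.

```rust
/// Decide the connection-pool size from the transport type + configured value.
fn pool_size(is_rtu: bool, configured: u32) -> u32 {
    if is_rtu {
        1 // RS-485: single flight, always, to preserve bus semantics
    } else {
        configured.clamp(1, 32) // keep a bad config from overwhelming the PLC/gateway
    }
}
```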
3) Session: "Ready" should be defined clearly and cheaply
Modbus has no complex handshake, so the session's Ready condition is simply "connection/connection pool established and available". Therefore:
- `Session::init()`: injects the reconnect handle + pool into the data-plane handle (a dependency of the publish handle)
- `Session::run()`: waits for cancellation; on exit, disconnects all contexts (with a timeout)
This is a solid "per-attempt resource boundary" pattern that a new driver can reuse directly.
4) Handle: how do timeouts/transport errors trigger reconnection?
The core of the Modbus handle is `run_op()`:
- Use `tokio::time::timeout` to bound every protocol operation (avoid infinite waits)
- On a transport error / timeout:
  - record a structured `warn`
  - call `try_request_reconnect(...)` (without awaiting it, to avoid blocking the hot path)
  - return a diagnosable error to the caller
This quickly isolates "weak network / occasional device exceptions" from the hot path and lets the supervision loop govern reconnection and backoff uniformly.
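The decision logic inside such a `run_op()` can be modeled as a pure function over the operation's outcome. This is a sketch: the outcome/action names are illustrative, and the real handle additionally performs the async timeout itself with `tokio::time::timeout`.

```rust
/// Outcome of one timed protocol operation.
enum OpOutcome {
    Ok,
    TimedOut,       // the timeout bound elapsed
    TransportError, // broken pipe, connection reset, ...
    ProtocolError,  // illegal response / decode failure (link itself is fine)
}

/// What the handle should do next. A reconnect is *requested*, never awaited,
/// so the hot path is not blocked.
#[derive(Debug, PartialEq)]
enum Action {
    ReturnValue,
    RequestReconnectAndFail, // structured warn + try_request_reconnect(...) + Err
    FailOnly,                // diagnosable Err, no reconnect request
}

fn on_outcome(outcome: OpOutcome) -> Action {
    match outcome {
        OpOutcome::Ok => Action::ReturnValue,
        OpOutcome::TimedOut | OpOutcome::TransportError => Action::RequestReconnectAndFail,
        OpOutcome::ProtocolError => Action::FailOnly,
    }
}
```

Note that a protocol error deliberately does not request a reconnect: per the error-classification rules earlier, it is fatal for the request but says nothing about the health of the link.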
