南向驱动开发
本章面向 驱动开发者,目标是让你能按步骤完成一个 生产级 的南向驱动(Southward Driver):
- 能被网关动态加载:以
cdylib形式发布,满足 ABI/版本约束 - 能被 UI 自动建模:通过 Driver Metadata Schema 自动渲染配置表单与 Excel 导入模板
- 能跑得稳/跑得快:高吞吐、低延迟、弱网容错、可追踪、可排障
0. 前置条件与硬性约束
0.1 关键概念
在 NG Gateway 中,“南向驱动”不是一个单纯的协议解析库,而是一个 被网关托管生命周期 的组件:
- 网关侧 - Host 负责:
- 动态加载/探测驱动库
- 统一日志桥接、动态日志级别、可观测性汇聚
- 按配置创建 Channel/Device/Point/Action 的 runtime 视图,并驱动采集/写回
- 驱动侧 - Driver cdylib 负责:
- 实现“如何连接设备 + 如何采集 + 如何写回/执行动作”
- 提供一份 静态元数据 Schema(UI和excel导入用),以及必要的 ABI 导出符号
0.2 你需要准备什么
- Rust 开发环境:安装最新 stable 工具链。
- 网关本地环境:参考 本地开发 搭建好后端与 WebUI,确保能运行。
- 协议模拟器:准备好你要开发的协议模拟器(如 Modbus Slave / TCP Server),用于本地联调。
0.3 硬性约束
必须遵守的契约
metadata_fn必须纯:不得读文件、环境变量或网络。Probe 阶段必须可复现且零副作用。Connector::new(ctx)必须同步且无 I/O:所有网络/文件/阻塞 I/O 必须放在connect()/init()/run()中。- 热路径(
SouthwardHandle)禁止阻塞:collect_data/write_point必须是异步非阻塞的,禁止长时间持锁或同步 I/O。 - 严禁
unwrap()/expect():生产级代码必须处理所有错误,返回Result并携带上下文。
1. 创建插件 crate
在开始编写代码前,先创建一个新的驱动 crate(建议以 ng-driver- 作为前缀)。
cargo new --lib ng-driver-yourproto
cd ng-driver-yourproto1.1 Cargo.toml 最小约束
推荐在独立仓库创建驱动 crate,依赖 ng-gateway-sdk。
[package]
name = "ng-driver-yourproto"
version = "0.1.0"
edition = "2021"
publish = false
[lib]
crate-type = ["cdylib", "rlib"]
[dependencies]
ng-gateway-sdk = "0.1"
tokio = { version = "1", features = ["full", "tracing"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
async-trait = "0.1"
tracing = "0.1"
thiserror = "2"
bytes = "1"1.2 构建产物
cargo build --release产物位于 target/release/,根据平台不同为 *.so (Linux), *.dylib (macOS), *.dll (Windows)。
2. 创建插件模块
2.1 推荐工程模块
建议采用以下目录结构,保持职责清晰:
ng-driver-yourproto/
Cargo.toml
src/
lib.rs // 导出 factory
metadata.rs // UI Schema (纯静态)
types.rs // 运行时配置结构 (serde)
converter.rs // Model -> Runtime 转换
connector.rs // Connector 实现
session.rs // Session 实现
handle.rs // SouthwardHandle 实现 (热路径)
codec.rs // 协议编解码
planner.rs // (可选) 批量规划策略
protocol/ // (可选) 复杂协议栈封装
mod.rs
frame.rs
codec.rs
client.rs
session.rs2.2 工程模块边界
- 导出层 (
lib.rs):只做ng_driver_factory!宏调用。 - 配置层 (
metadata.rs,types.rs):定义 UI 怎么展示,以及配置怎么反序列化。 - 协议层 (
codec.rs或protocol/):处理字节流与协议帧的转换。简单协议直接写在codec.rs;复杂协议(如 S7/IEC104)建议抽取protocol模块,包含帧定义、状态机等。 - 连接层 (
connector.rs,session.rs):管理连接生命周期、重连、资源初始化。 - 热路径层 (
handle.rs):负责高频的采集与控制指令执行。
3. 配置及 Schema
3.1 config.rs/types.rs - 运行时配置
定义强类型的配置结构,用于运行时逻辑。
config.rs
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct YourProtoChannelConfig {
pub ip: String,
pub port: u16,
#[serde(default = "default_timeout")]
pub timeout_ms: u64,
}
fn default_timeout() -> u64 { 1000 }
// Device, Point, Action 配置同理...3.2 metadata.rs - UI/Excel Schema
定义 UI 表单结构,支持校验与国际化。
metadata.rs
use ng_gateway_sdk::{
ui_text, DriverSchemas, Field, Node, Rules, RuleValue, UiDataType,
};
use serde_json::json;
pub(super) fn build_metadata() -> DriverSchemas {
DriverSchemas {
channel: vec![
Node::Field(Box::new(Field {
path: "ip".into(),
label: ui_text!(en = "IP Address", zh = "IP 地址"),
data_type: UiDataType::String,
rules: Some(Rules {
required: Some(RuleValue::Value(true)),
..Default::default()
}),
..Default::default()
})),
// ... port, timeout 等字段
],
device: vec![], // 定义设备级配置
point: vec![], // 定义点位级配置
action: vec![], // 定义动作级配置
}
}Schema 设计要点
- 字段路径 (path):必须与
config.rs/types.rs中的 serde 字段名一致。 - 校验前置:利用
Rules(min, max, pattern, required) 在 UI 层拦截错误。 - 默认值:为非必填项提供合理的
default_value。 - 国际化:使用
ui_text!宏提供中英文对照。
4. 实现 Model Convert
职责
- 解析/归一化地址(例如 0/1 基换算、字符串 trim)
- 预编译表达式(例如映射/过滤/模板)
- 预计算寄存器 span、固定长度、字节序策略
- 预分配/缓存 Planner 的静态结构(例如按函数码分组的索引)
这样可以显著降低每次采集的 CPU 与分配开销
converter.rs
use ng_gateway_sdk::{
supervision::converter::SouthwardModelConverter, ActionModel, ChannelModel, DeviceModel,
DriverError, DriverResult, PointModel, RuntimeAction, RuntimeChannel, RuntimeDevice,
RuntimePoint,
};
use std::sync::Arc;
use crate::types::{YourProtoChannel, YourProtoDevice, YourProtoPoint, YourProtoChannelConfig};
#[derive(Debug, Clone, Default)]
pub struct YourConverter;
impl SouthwardModelConverter for YourConverter {
fn convert_runtime_channel(
&self,
channel: ChannelModel,
) -> DriverResult<Arc<dyn RuntimeChannel>> {
// 1. 反序列化 driver_config 为强类型配置
let config: YourProtoChannelConfig = serde_json::from_value(channel.driver_config.clone())
.map_err(|e| DriverError::ConfigurationError(format!("Invalid channel config: {e}")))?;
// 2. 构造 RuntimeChannel (通常是一个包含通用字段 + 强类型配置的结构)
Ok(Arc::new(YourProtoChannel {
id: channel.id,
name: channel.name,
// ... 复制其他通用字段 (status, collection_type 等)
config, // 注入强类型配置
}))
}
fn convert_runtime_device(&self, device: DeviceModel) -> DriverResult<Arc<dyn RuntimeDevice>> {
// 解析设备级 driver_config
let driver_config = device.driver_config.as_ref().ok_or(DriverError::ConfigurationError(
"Driver config is required".to_string(),
))?;
let slave_id = driver_config.get("slaveId")
.and_then(|v| v.as_u64())
.ok_or(DriverError::ConfigurationError("slaveId is required".into()))? as u8;
Ok(Arc::new(YourProtoDevice {
id: device.id,
// ...
slave_id,
}))
}
fn convert_runtime_point(&self, point: PointModel) -> DriverResult<Arc<dyn RuntimePoint>> {
// 解析点位级 driver_config,提取热路径需要的关键参数
let address = point.driver_config.get("address")
.and_then(|v| v.as_u64())
.ok_or(DriverError::ConfigurationError("address is required".into()))? as u16;
Ok(Arc::new(YourProtoPoint {
id: point.id,
// ...
address,
}))
}
fn convert_runtime_action(&self, _action: ActionModel) -> DriverResult<Arc<dyn RuntimeAction>> {
// 如果支持动作,在这里解析 ActionModel
Err(DriverError::NotImplemented("Action not supported".into()))
}
}5. 实现 Connector / Session / Handle
5.1 Connector
5.1.1 Connector 职责、生产级建议
职责
- 保存初始化上下文(配置、runtime 视图、observer、策略等)
- 实现
connect(ctx):建立一个Session - 实现
classify_error(phase, err):告诉 supervision loop 这个错误要不要重试
生产级建议
- 在
classify_error里把常见错误做“可控分类”- Retryable:短暂网络波动、超时、连接被重置、串口临时不可用
- Fatal/Stop:配置错误(非法地址/非法证书路径)、鉴权失败(明确不可恢复)
- 给
error_summary/error_code提供稳定聚合维度,便于告警与排障error_summary:面向人类阅读的短句(不要塞大对象/长堆栈)error_code:稳定、低基数(例如tcp_connect_timeout/auth_failed/config_invalid/protocol_decode_error)
- 不要在这里做任何网络/阻塞 I/O(
Connector::new(ctx)明确禁止 I/O;connect()才是建立会话的地方) - 把“高频细节”留给
tracing,把“低频聚合维度”留给error_code/error_summary
connector.rs
use ng_gateway_sdk::supervision::{Connector, Session, SessionContext, FailureKind, FailurePhase};
use ng_gateway_sdk::{DriverError, SouthwardInitContext};
use std::sync::Arc;
use crate::session::YourSession;
use crate::handle::YourHandle;
use crate::types::YourProtoChannelConfig;
pub struct YourConnector {
config: YourProtoChannelConfig,
handle: Arc<YourHandle>,
}
#[async_trait::async_trait]
impl Connector for YourConnector {
type InitContext = SouthwardInitContext;
type Handle = YourHandle;
type Session = YourSession;
fn new(ctx: Self::InitContext) -> Result<Self, DriverError> {
let config = crate::converter::parse_channel_config(&ctx)?;
let handle = Arc::new(YourHandle::new());
Ok(Self { config, handle })
}
async fn connect(&self, ctx: SessionContext) -> Result<Self::Session, DriverError> {
// 在这里做 I/O,建立连接
let stream = tokio::net::TcpStream::connect((self.config.ip.as_str(), self.config.port))
.await
.map_err(|e| DriverError::SessionError(e.to_string()))?;
Ok(YourSession::new(self.handle.clone(), stream))
}
fn classify_error(&self, _phase: FailurePhase, _err: &DriverError) -> FailureKind {
// 简单分类,可根据 err 具体类型细化
FailureKind::Retryable
}
}5.2 Session
Session表示“一次 attempt(连接尝试)成功创建后的生命周期”
5.2.1 Session 职责、生产级建议
职责
handle():返回Arc<Handle>(热路径接口)init(&ctx):完成“定义 Ready 的初始化”(例如读设备信息、订阅、写入初始状态)run(ctx):驱动会话直到断开/取消/请求重连
生产级建议
init()里做“短平快”的握手校验(超时可控),不要把长循环塞进init()run()里使用tokio::select!同时处理:- cancel(优雅退出)
- 心跳/保活
- 上行/下行任务的 join
- 监听内部“请求重连”信号:当协议层检测到不可恢复的 transport 异常/长时间超时,调用
ctx.reconnect.try_request_reconnect(reason)触发监督循环重连(不 await)
生命周期语义速查
Connector::connect(ctx)- 做什么:建立 transport(TCP/UDP/Serial)并完成协议层 connect/握手,构造
Session - 必须:尊重
ctx.cancel;设置可控超时;优先使用 SDK 的 metered transport(确保transport_meter计量完整) - 不要:在这里 publish handle(Ready 由
Session::init定义);不要 spawn 无取消的后台任务
- 做什么:建立 transport(TCP/UDP/Serial)并完成协议层 connect/握手,构造
Session::init(&ctx)- 做什么:定义“Ready”边界,把 data-plane 运行所需依赖注入到
handle()(例如连接池、订阅管理器、reconnect handle) - 必须:低成本、可控超时;失败要能用
error_summary/error_code聚合定位(常见是鉴权/权限/协议不兼容) - 不要:启动无限循环(应该放到
run());不要做不可控的全量扫描
- 做什么:定义“Ready”边界,把 data-plane 运行所需依赖注入到
Session::run(ctx)- 做什么:驱动会话直到断开/取消/请求重连,负责 attempt 级资源边界与清理
- 返回值:优先
Disconnected或ReconnectRequested(reason);只有明确不可恢复时才用Fatal(FailureReport) - 必须:退出时释放连接/订阅/后台任务;确保 cancel 路径快速、幂等
session.rs
use ng_gateway_sdk::supervision::{Session, SessionContext, RunOutcome};
use ng_gateway_sdk::DriverError;
use std::sync::Arc;
use tokio::net::TcpStream;
use crate::handle::YourHandle;
pub struct YourSession {
handle: Arc<YourHandle>,
stream: Option<TcpStream>,
}
impl YourSession {
pub fn new(handle: Arc<YourHandle>, stream: TcpStream) -> Self {
Self { handle, stream: Some(stream) }
}
}
#[async_trait::async_trait]
impl Session for YourSession {
type Handle = YourHandle;
type Error = DriverError;
fn handle(&self) -> &Arc<Self::Handle> {
&self.handle
}
async fn init(&mut self, ctx: &SessionContext) -> Result<(), Self::Error> {
// 注入依赖到 Handle (例如 stream)
if let Some(stream) = self.stream.take() {
self.handle.attach_transport(stream, ctx.reconnect.clone());
}
Ok(())
}
async fn run(self, ctx: SessionContext) -> Result<RunOutcome, Self::Error> {
// 等待取消信号
ctx.cancel.cancelled().await;
// 清理资源
self.handle.detach_transport();
Ok(RunOutcome::Disconnected)
}
}5.3 Handle
5.3.1 Handle 职责、生产级建议、热路径契约
职责
- 采集规划:根据
CollectItem制定最优的协议请求策略(例如合并相邻寄存器、分批请求),减少网络 I/O 次数。 - 协议交互:获取连接,执行协议请求,并处理超时与重试。
- 数据映射:将协议响应(bytes/words)解码为
NGValue,并封装为NorthwardData。 - 动作/写点:处理写请求,执行必要的编码转换,并返回执行结果。
- 分组策略:(可选) 实现
collection_group_key,将同属一个物理连接的多个逻辑设备聚合采集。
生产级建议
- 不要在热路径里阻塞:避免长时间持锁、避免大分配、避免同步 I/O;必要时把慢工作下沉到内部 actor/worker。
- 超时控制:所有 I/O 操作必须有超时。
- 失败语义要清晰:
ServiceUnavailable/Timeout/ExecutionError等错误应可诊断;遇到 transport-level 异常应尽快触发try_request_reconnect(...),让内核 supervision loop 统一治理退避与重连。
5.3.2 Handle 函数清单及详解
collect_data: 批量采集数据。collection_group_key: (可选) 定义如何对设备进行分组采集。
什么时候应该使用 group collection
当且仅当你能回答一句话:这些业务 Device 是否共享同一个“物理会话语义”,并且协议侧存在可用的批量能力?
典型场景:
- Modbus:同一个 slave(站号)下的点位被拆成多个业务 Device(按产线/功能区/子设备建模),但物理上仍是“同一个 slave 一次批读/批写”。
- OPC UA:同一个 endpoint/channel/session 下,为了业务组织把 node 拆到多个业务 Device;物理上可以合并为一次
Read(或少量分批 Read)。 - EtherNet/IP / MC / S7:同一条连接(通常以 channel 为边界)内多个业务 Device 共享会话与 transport,适合合并为组调用以减少调度与协议请求次数。
反例(不要分组):
- 每个业务 Device 必须独占连接(不同 IP/端口/串口),合并只会让慢设备拖累快设备,并放大 timeout 影响面。
- 协议不支持 batch,或 batch 导致稳定性变差(某些设备对大请求/大包非常敏感)。
collection_group_key 应该如何定义?
CollectionGroupKey 的语义必须是:“可以共享一次物理采集/同一会话上下文的一组业务 Device”。
必须遵守:
- 稳定:不能用随机数、不能用会随重启/刷新变化的临时值。
- 低基数:严禁包含
device_id/point_id(那等于不分组且会导致 HashMap 过大)。 - 无分配/低开销:该方法运行在高频路径,必须做到 O(1) 且零分配。
- 表达物理会话语义:通常来自协议层的“共享边界”,如 slaveId、channelId、endpoint/session identity 等。
具体例子:如何选择 key 的“物理语义”
例 1:Modbus(按 slaveId 分组)
- 适用:一个 slave 被拆成多个业务 Device。
- key:
kind="MODB"+payload=slave_id。 - 参考实现:
ng-gateway-southward/modbus/src/handle.rs使用CollectionGroupKey::from_u64(kind, slave_id as u64)。
例 2:OPC UA(按 channelId/endpoint 会话分组)
- 适用:同一个 OPC UA 连接/会话下的多个业务 Device 共享一次批量 Read。
- key:
kind="OPCU"+payload=channel_id(或 endpoint 的稳定 hash 前缀)。 - 参考实现:
ng-gateway-southward/opcua/src/handle.rs以channel_id做分组。
例 3:多会话场景(按 “连接标识 + 子通道” 分组)
- 适用:一个 driver 内部维护多个物理会话(例如同一 channel 下有多个 target,且每个 target 需要独立连接池)。
- key:用
CollectionGroupKey::from_pair_u64(kind, a, b)组合两个稳定 id(注意它会截断为 48-bit,适合中小范围整数 id)。 - 如果标识更复杂(host:port、证书指纹、endpoint URL 等),建议对其做稳定 hash(例如 128-bit),再用
from_hash128(kind, hash128)截取前 12 bytes 作为 payload。
经验法则:宁可少分组,也不要错分组。错分组会导致协议语义错误(例如把不同 slave/不同 endpoint 混到同一批次),通常比“不分组导致慢一些”更难排障。
collector_concurrency_profile(&self) -> CollectorConcurrencyProfile(可选,声明采集并发能力:跨组并发 / 组内并发 / lane 数;用于保护设备/总线并让 Collector 自动适配)write_point: 写点位值。execute_action: 执行动作指令。
write_point/execute_action 语义要点
- 超时:
write_point(..., timeout_ms)应作为“单次操作上限”,避免无限等待(常用tokio::time::timeout) - 重连:遇到 transport error/连续超时,应触发
try_request_reconnect(reason),并返回可诊断错误(不要在热路径里 await 重连) - 返回值:使用
WriteResult/ExecuteResult表达业务语义(Completed/Queued),不要把“排队成功”误报为“执行完成”
apply_runtime_delta(delta) -> DriverResult<()>
apply_runtime_delta(delta) 深入说明
apply_runtime_delta(delta) 用于在驱动运行过程中接收来自 Host(网关)的运行期模型增量变更通知,并把这些变更应用到驱动内部的长期状态里(缓存、采集规划、订阅集、动作路由表等),从而做到:
- 不重启驱动/不重连设备(或尽量减少重连)就能让新增/删除/更新的设备、点位、动作生效
- 特别是在
CollectionType::Report(订阅/事件上报) 场景,避免出现“点位已变更但订阅集没更新”的数据错乱
它的语义不是“随便改一切配置”,而是处理运行期 RuntimeDelta 事件。目前 RuntimeDelta 只包含三类(按 channel 作用域、按序投递):
DevicesChanged:设备新增/更新/移除、以及设备status变化PointsChanged:某个设备下点位新增/更新/移除(包含点位driver_config/ transform 等运行期信息)ActionsChanged:某个设备下动作新增/更新/移除(命令定义、参数等)
什么时候必须实现?
- 订阅/上报(Report)模式几乎必须实现:因为你通常会维护一个“订阅集/回调映射/点位快照(snapshot)/过滤规则”,点位变化后如果不更新,后果通常比采集模式更严重(漏报、错报、重复报、一直报已删除点位)。
- 采集(Collection)模式建议实现(非强制,且内置 polling 驱动也可能先做 no-op):因为
collect_data()每次都会拿到最新的(device, points)(CollectItem),所以很多纯 polling 驱动即使不实现apply_runtime_delta也能保持“功能正确”。但当你引入 Planner(批量合并策略)、地址/编解码缓存、设备会话表/连接池或后台 worker,并希望设备/点位变更能快速生效且尽量减少重启/重连时,实现apply_runtime_delta会显著提升一致性、性能与运维体验。
怎么用?
把它当成控制面(control-plane)入口来设计:快速接收变更、更新内存结构、必要时通知后台任务重建局部状态。
1)避免在
apply_runtime_delta里做慢操作 / 网络 I/O- 建议只做:更新内存结构、写入
ArcSwap/RwLock、向内部 actor/任务发送一条“变更消息” - 如果某类变更必须通过重建协议侧状态才能生效(例如:点位更新导致订阅键变化需要重建订阅;或设备/会话生命周期需要重建),建议在驱动内部触发“重连请求/重建流程”,而不是在这里同步阻塞等待
- 建议只做:更新内存结构、写入
2)保持热路径并发安全
collect_data/write_point/execute_action可能与apply_runtime_delta并发发生(它们都是 handle 的入口)。- 推荐模式:快照 + 原子替换
- 用
ArcSwap/watch持有“点位快照/订阅配置快照/Planner 快照” apply_runtime_delta构建新快照并原子替换;热路径只读快照,避免长时间持锁
- 用
3)遵循“增量更新”而不是“全量重建”
PointsChanged已经把 added/updated/removed 分开给出,优先做局部变更:- added:解析并加入缓存/Planner;(订阅/上报模式)加入订阅集并补齐回调所需元信息
- updated:更新编解码/transform/driver_config,并刷新缓存/Planner;若涉及订阅键/过滤策略变化,则触发重订阅或刷新回调快照
- removed:从缓存/Planner/会话表中移除并清理关联状态;(订阅/上报模式)取消订阅,避免继续上报已删除点位
订阅/上报(Report)场景的关键注意事项
在 Report 模式下,驱动通常会有一个长期运行的“订阅管理器/回调线程/上报 actor”。apply_runtime_delta 的核心任务是让这些后台组件与最新模型保持一致:
- 点位新增(added):把点位转成协议侧订阅项(如 NodeId/IOA/Index 等),加入订阅管理器;并初始化上报所需的元信息(point_id、数据类型、transform)。
- 点位更新(updated):至少要考虑三类变化:
- 订阅键变化(例如 NodeId/地址/寄存器区间变化):必须先取消旧订阅再订阅新项
- 解码/变换变化(datatype/scale/offset/negate 等):需要更新回调路径的转换逻辑,否则会“值对了但语义错”
- 上报策略变化(如 change/always、死区/采样/过滤规则若由 driver_config 承载):需要更新过滤器/聚合器状态
- 点位移除(removed):必须取消订阅并清理所有关联状态,避免继续从回调里发已删除点位的数据(这是最常见的隐藏 bug)。
建议把“订阅管理”做成一个内部 actor,并提供两类消息:
UpdateSubscription { added, updated, removed }:只做订阅集增量调整(尽量批量化)UpdateSnapshot { new_snapshot }:原子替换回调处理所依赖的快照(点位元数据/transform/路由)
常见坑
- 只更新内存点位表,但忘记更新订阅集:表现为新增点不上报、删点仍在报、或回调里找不到 point_id。
- 在
apply_runtime_delta里同步做 I/O(取消/创建订阅、探测设备能力等):会把控制面阻塞成“慢路径”,在高频变更或弱网环境下容易拖垮驱动。 - 用一把大锁保护所有状态:热路径会被 runtime delta 频繁阻塞;建议用快照/分层锁/actor 消息化降低争用。
一句话:apply_runtime_delta 是让驱动“在线演进”的关键入口;在订阅/上报模式下,它决定了你的订阅/映射集与点位模型的始终一致性。
handle.rs
use ng_gateway_sdk::{
SouthwardHandle, DriverResult, CollectItem, NorthwardData, DriverError,
RuntimeDevice, RuntimePoint, NGValue, WriteResult, RuntimeAction, RuntimeParameter, ExecuteResult
};
use ng_gateway_sdk::supervision::ReconnectHandle;
use tokio::sync::Mutex;
use tokio::net::TcpStream;
use std::sync::Arc;
pub struct YourHandle {
// 使用 Mutex 保护共享资源,注意锁的粒度
transport: Arc<Mutex<Option<TcpStream>>>,
reconnect: Arc<Mutex<Option<ReconnectHandle>>>,
}
impl YourHandle {
pub fn new() -> Self {
Self {
transport: Arc::new(Mutex::new(None)),
reconnect: Arc::new(Mutex::new(None)),
}
}
pub fn attach_transport(&self, stream: TcpStream, reconnect: ReconnectHandle) {
*self.transport.blocking_lock() = Some(stream);
*self.reconnect.blocking_lock() = Some(reconnect);
}
pub fn detach_transport(&self) {
*self.transport.blocking_lock() = None;
}
fn request_reconnect(&self, reason: &str) {
if let Some(h) = self.reconnect.blocking_lock().as_ref() {
h.try_request_reconnect(reason);
}
}
}
#[async_trait::async_trait]
impl SouthwardHandle for YourHandle {
async fn collect_data(&self, items: &[CollectItem]) -> DriverResult<Vec<NorthwardData>> {
let mut guard = self.transport.lock().await;
let stream = guard.as_mut().ok_or(DriverError::ServiceUnavailable)?;
// 1. 组装报文
// 2. 发送请求 (带超时)
// 3. 接收响应 (带超时)
// 4. 解析数据
// 示例:遇到 I/O 错误
// self.request_reconnect("IO Error");
// return Err(DriverError::SessionError(...));
Ok(vec![])
}
async fn write_point(
&self,
device: Arc<dyn RuntimeDevice>,
point: Arc<dyn RuntimePoint>,
value: &NGValue,
timeout_ms: Option<u64>,
) -> DriverResult<WriteResult> {
// 1. Downcast:获取驱动特定的 Runtime 结构
let device = device.downcast_ref::<YourProtoDevice>()
.ok_or(DriverError::ConfigurationError("Invalid device type".into()))?;
let point = point.downcast_ref::<YourProtoPoint>()
.ok_or(DriverError::ConfigurationError("Invalid point type".into()))?;
// 2. Encode:将 NGValue 转换为协议原始值/字节流
// (建议在 codec 模块实现,处理类型转换、字节序等)
// let raw_payload = codec::encode_write(value, point.data_type, point.address)?;
// 3. Acquire Transport:获取连接
let mut guard = self.transport.lock().await;
let stream = guard.as_mut().ok_or(DriverError::ServiceUnavailable)?;
// 4. Execute:执行协议写请求 (带超时)
// stream.write_all(&raw_payload).await?;
// stream.read_response().await?;
Ok(WriteResult {
outcome: ng_gateway_sdk::WriteOutcome::Applied,
applied_value: Some(value.clone()),
})
}
async fn execute_action(
&self,
device: Arc<dyn RuntimeDevice>,
_action: Arc<dyn RuntimeAction>,
params: Vec<(Arc<dyn RuntimeParameter>, NGValue)>,
) -> DriverResult<ExecuteResult> {
// 1. Downcast
let device = device.downcast_ref::<YourProtoDevice>()
.ok_or(DriverError::ConfigurationError("Invalid device type".into()))?;
// 2. Resolve Parameters:将通用参数列表转换为驱动强类型参数
// (SDK 提供了辅助函数 downcast_parameters)
let typed_params = ng_gateway_sdk::downcast_parameters::<YourProtoParameter>(params)?;
// 3. Build Command:根据动作定义和参数构建协议指令
// let cmd_frame = codec::build_action_frame(&typed_params)?;
// 4. Execute
// ... 获取连接并发送 ...
Ok(ExecuteResult {
outcome: ng_gateway_sdk::ExecuteOutcome::Completed,
payload: Some(serde_json::json!({ "status": "ok" })),
})
}
}5.3.4 最佳实践
热路径性能清单
- 零拷贝优先:尽量在
&[u8]/Bytes上解析;避免在循环中反复Vec::new() - 预分配:
Vec::with_capacity(items.len())、HashMap::with_capacity(n) - 减少锁争用:优先无锁读(如 ArcSwap / watch),必要锁要缩短临界区
- 批量化:将多个点位合并为尽可能少的协议请求(Planner)
- 并发受控:
- RTU/RS-485:通常必须单飞(避免总线冲突)
- TCP:可用连接池/并发在飞,但要尊重设备能力与网关资源
驱动开发时你需要把批量处理/合并请求这些策略抽象为可配置的 Planner,并把默认值设计成“保守但不太慢”。
正确处理超时、重试与退避
驱动端一般会遇到两类重试:
连接生命周期重试(交给网关内核的 supervision loop)
由connection_policy.backoff驱动(宏已经把 policy 注入到SupervisorParams.retry_policy)。
驱动要做的是:正确分类错误(Retryable vs Fatal)。协议请求级别重试(谨慎使用)
例如单次读写请求超时后立刻重试 1 次。
建议原则:- 限制次数(例如最多 1-2 次)
- 退避 + jitter(避免风暴)
- 只对明确的瞬时错误重试(超时/连接重置),不要对“非法响应/协议错误”重试
错误分类与上下文(Retryable vs Fatal,合并「严禁 unwrap/expect」)
驱动是网关稳定性的底座。你需要把错误分成可重试(Retryable)与不可重试(Fatal),让 supervision loop 能做出正确决策;同时必须保证任何 I/O、解析、类型转换都通过 Result 返回,严禁 unwrap() / expect() 导致 panic。
建议把错误至少分为三层(从“系统动作”视角定义):
连接级别 Fatal(立即失败,等待配置/环境修复)
- 配置错误:必填字段缺失、类型不匹配、非法端口/地址范围、点位定义不合法
- 认证/授权不可用:凭据缺失、权限被永久拒绝(例如 401/403 且明确不可恢复)
- TLS 校验失败:证书链不可信、主机名校验失败、证书过期(除非你明确支持热更新凭据/证书,否则应视为 Fatal)
连接级别 Retryable(交给 supervision loop 退避重连)
- 网络/传输瞬时失败:
ConnectionRefused、ConnectionReset、BrokenPipe、DNS 临时失败 - I/O 超时:连接建立超时、读写超时(注意与协议级超时区分)
- 资源瞬时不足:系统负载过高、临时资源不足(通常配合背压与限流一起治理)
- 网络/传输瞬时失败:
请求/响应级别错误(多数不应触发“立刻重连风暴”)
- 协议超时/无响应:一般可重试 1 次(有退避 + jitter),连续失败再上升为连接级 Retryable
- 协议错误/非法响应/解码失败:默认 Fatal(对该请求),通常不应立即重试;需要输出充分上下文以便排障
- 设备侧异常响应(例如协议定义的 exception/错误码):默认 Fatal(对该请求);是否需要重连取决于协议与设备行为(多数情况下不需要)
实现建议(可操作、可审计):
- 不要用 panic 表达“不可达”:驱动边界上的任何失败都应该是
Result::Err,并携带上下文;Option/downcast 使用ok_or(...)/ok_or_else(...)。 - 错误要“可被机器判断 + 可被人排查”:
- 机器判断:用清晰的 error kind(Retryable/Fatal、Config/Transport/Protocol/Auth/Backpressure 等)
- 人类排查:在错误中补齐关键上下文(下面清单)
- 错误上下文字段清单(推荐至少包含):
- 业务定位:
channel_id / device_id / point_id(如可得)、driver 名称、目标端点(IP:port / serial path / slave id) - 协议定位:function code、address、quantity、transaction id / sequence(如果协议有)
- 时序定位:timeout 配置值、attempt(第几次尝试)、elapsed(耗时)
- 数据定位:响应长度、期望长度、CRC/校验信息、(可选)截断后的 hex dump(仅在
trace级别且必须限长) - 底层错误链:io error、timeout error、decode error(保持原始错误作为 source)
- 业务定位:
经验法则:如果你在日志/错误里看不到“哪台设备、哪条链路、哪次请求、哪段协议”,那这个错误等同于“不可观测”。
背压边界(Backpressure):把压力挡在驱动边界之外
网关的稳定性来自“明确的背压边界”。驱动需要保证:当上游(采集调度/写入请求)变大时,驱动不会无限制创建任务、无限制堆积内存、也不会把设备/链路打爆。
推荐的背压策略(按优先级从强到弱):
并发上限(硬边界):对每个连接/设备设置
max_in_flight(典型实现为Semaphore)。- RTU/RS-485:通常必须
max_in_flight = 1(避免总线冲突并保持时序) - TCP:可适当提高,但必须尊重设备能力与网关资源(CPU/内存/带宽)
- RTU/RS-485:通常必须
有界队列(内存边界):采集/写入请求必须进入有界缓冲(bounded channel/queue)。
- 队列满时的处理策略必须明确:拒绝(返回 Backpressure)/ 合并(coalesce)/ 丢弃过期读,避免“看似成功实际堆积”。
批量与合并(减少请求数):Planner 应优先把多个点位合并为更少的协议请求;当背压出现时优先提升合并力度,而不是增加重试次数。
超时与取消(防止僵尸请求)
- 每个 I/O 必须有超时;任务应支持取消(例如 shutdown 信号 /
select!分支) - 当上游取消或超时后,驱动不得继续把结果写回(避免“过期数据回灌”)
- 每个 I/O 必须有超时;任务应支持取消(例如 shutdown 信号 /
可观测性建议:
- 背压必须可观测:记录 backpressure 的触发次数、拒绝次数、队列长度/等待时间(注意避免高基数字段)。
- 背压错误应该是“可重试但需要降速”:上游看到 Backpressure 应减少频率/并发,而不是立刻重试风暴。
TLS / 凭据 / 日志:安全与可运维性底线
驱动一旦涉及网络(TCP/TLS/HTTP/MQTT 桥接等),安全与可运维性是“默认要求”,不是加分项。
TLS(建议基于 rustls/系统信任库)
- 必须开启证书校验与主机名校验;禁止为了“连上就行”而关闭验证
- 支持自定义 CA(企业内网/自签)、证书轮换(更新后无需重启是加分项)
- 若使用 mTLS:客户端证书/私钥的缺失或无效应明确报错(多数场景视为 Fatal)
- TLS 错误日志要可排障但不泄密:只输出失败原因与证书摘要信息(如 subject/issuer/有效期),不要输出私钥或完整证书内容
凭据(Credentials)
- 把 token/密码/私钥当作 secret:不要写入日志、不要
Debug打印、不要在错误字符串中拼接 - 建议对敏感字段做显式脱敏(redaction):例如只保留前后各 2-4 位,其余用
***代替 - 错误消息中可以包含“凭据缺失/无效”的结论,但不能包含凭据本体
- 把 token/密码/私钥当作 secret:不要写入日志、不要
日志(tracing)
- 结构化记录关键事件:连接建立/断开、重连退避、请求超时、协议异常、背压触发
- 记录足够上下文(device/channel/endpoint/request),但避免高基数(不要把 point value、全量 payload、随机 id 当字段)
- 原始 payload(hex dump/json)只允许在
trace级别、必须限长、并且必须确认不包含凭据/隐私数据 - 对“可预期的瞬时错误”(例如短暂超时)使用
warn;对“不可恢复/需要人工介入”的错误使用error
6. 可观测性
6.1 日志(tracing)最佳实践
NG Gateway内核会在加载驱动时:
- 注册 host log sink(
ng_driver_set_log_sink) - 初始化驱动 tracing(
ng_driver_init_tracing) - 支持动态日志级别设置(
ng_driver_set_max_level)
因此驱动侧应该遵循:
- 使用
tracing::info!,warn!,error!记录关键事件。 - 热
debug!,trace!用于开发调试及热路径,生产环境通常关闭。 - 日志字段尽量结构化且携带上下文:
tracing::info!(channel_id=?, device_id=?, ...)
6.2 指标(Observer)使用原则
- SDK 会自动收集连接状态、采集频率等基础指标。
- 驱动册只需关心数据面字节计量交给
transport_meter- 使用 SDK 提供的 metered 连接/包装器(如
connect_tcp_metered_with_timeout/connect_serial_metered/MeteredStream)让 read/write 自动计量;不要在业务循环里手写 byte counter(容易漏算/误算,也会污染热路径)。
- 使用 SDK 提供的 metered 连接/包装器(如
7. lib.rs 导出 ABI Factory
use ng_gateway_sdk::ng_driver_factory;
use crate::connector::YourConnector;
use crate::metadata::build_metadata;
use crate::converter::YourConverter;
ng_driver_factory!(
name = "YourProto",
description = "Driver for Your Protocol",
driver_type = "your-proto", // 全局唯一标识
component = YourConnector,
metadata_fn = build_metadata,
model_convert = YourConverter
);8. 测试策略
8.1 单元测试
- codec:字节序/字序、类型转换、边界值、非法数据容错
- planner:批量合并算法、span 上限 clamp、gap 策略
- model convert:配置合法性、默认值、非法输入报错语义
8.2 集成测试
- 启动模拟器
- 编写测试用例,通过
Connector连接模拟器,验证:- 正常读写路径
- 超时、断链、重连
- 并发压力(验证背压与内存上限)
8.3 性能基准测试
仓库已有 ng-gateway-bench(可参考其 Modbus bench 入口):
- codec micro-bench(每次 decode 的 ns/op)
- planner bench(点位规模扩展:1k/10k points)
- end-to-end bench(采集→northward 输出)
9. 调试与发布
9.1 完整流程
- 启动后端(建议 debug + 跳过 UI build,加快迭代)
- 详见:
本地开发
- 详见:
- 启动 WebUI(推荐 dev server 联调后端)
- 在 WebUI 完成 驱动安装 → 探测(probe)→ 启用
- 重点关注:版本信息、
api_version/sdk_version、架构与 checksum 是否符合预期
- 重点关注:版本信息、
- 在 WebUI 创建并配置 Channel/Device/Point/Action
- 用最小可用配置先跑通(能连上、能采集/能写回)
- 观测与排障(只看“低频、可聚合”的关键信号)
- 失败分类是否稳定(
error_code/error_summary) FailurePhase与重连原因(try_request_reconnect(reason))是否低基数/可统计- 吞吐/延迟是否按“批/次”聚合(严禁 per-point)
- 失败分类是否稳定(
9.2 发布与兼容性清单
- 多平台产物:Linux/macOS/Windows 的扩展名不同(
.so/.dylib/.dll),并确保目标架构匹配 - WebUI 探测(probe)必须通过:确保导出元数据可读、类型/名称/版本正确,且能展示 Probe 信息
- 自定义驱动升级方式:发布新版本产物后通过 WebUI 安装覆盖,并在 probe 页确认
version/checksum已更新(文件落盘在drivers/custom) - ABI/API 版本:loader 会校验
ng_driver_api_version与 host 一致 - SDK 版本:当前策略为非严格(不一致会 warn),但不建议跨大版本
- 配置兼容:Schema path 与配置字段保持向后兼容;新增字段要提供默认值
10. 常见坑
- 在
Connector::new()里做了 I/O:会导致启动阶段阻塞、并且违反 SDK 契约(未来可能直接拒绝) - 把字符串地址/表达式解析放到热路径:吞吐下降明显,且更难排障
- 错误分类过粗:所有错误都 Retryable 会导致无意义重试风暴;所有错误都 Fatal 会导致短暂波动直接挂死
- RTU 误并发:RS-485 总线并发写/读可能造成设备异常或串口驱动混乱
- 缺少背压策略:采集速度 > 处理速度时内存会快速膨胀(即使有 bounded channel,也可能在驱动内部 buffer 堆积)
11. 关键 Demo 代码详解
1) Converter:字段级约束在哪里生效?
以 Modbus point 为例,converter 会在 runtime 转换阶段强制要求字段存在且范围合法:
functionCode必须存在且是数字,并且能映射到合法枚举address必须存在且在u16范围quantity缺省为 1,并强制 >=1
这保证了热路径不需要在每次采集时做重复校验(性能与稳定性收益都很大)。
关键代码:
点击展开:关键片段
fn extract_point_driver_config(
driver_config: serde_json::Value,
) -> DriverResult<(ModbusFunctionCode, u16, u16)> {
let function_code = driver_config
.get("functionCode")
.ok_or(DriverError::ConfigurationError(
"functionCode is required".to_string(),
))
.and_then(Self::parse_function_code)?;
let address = driver_config
.get("address")
.and_then(|v| v.as_u64())
.ok_or(DriverError::ConfigurationError(
"address is required".to_string(),
))
.and_then(|v| {
u16::try_from(v).map_err(|_| DriverError::ConfigurationError("address out of range".to_string()))
})?;
let quantity = driver_config
.get("quantity")
.and_then(|v| v.as_u64())
.unwrap_or(1);
let quantity = u16::try_from(quantity)
.map_err(|_| DriverError::ConfigurationError("quantity out of range".to_string()))?
.max(1);
Ok((function_code, address, quantity))
}2) Connector:TCP/RTU 的“连接池策略”怎么落地?
Modbus 的 connect_pool() 做了两件生产级必须做的事:
- TCP:按
tcpPoolSize建立 pool,并 clamp 到 1..=32(避免配置把 PLC/网关打爆) - RTU:强制单飞(pool size=1),保证串口总线语义
并且:connect 过程尊重 ctx.cancel,避免 shutdown 卡死。
关键代码:
点击展开:关键片段
async fn connect_pool(
&self,
ctx: &SessionContext,
cfg: &ModbusChannelConfig,
) -> DriverResult<Arc<SessionPool>> {
match &cfg.connection {
ModbusConnection::Tcp { host, port } => {
let addr = format!("{host}:{port}")
.parse::<SocketAddr>()
.map_err(|e| DriverError::ConfigurationError(format!("Invalid socket address: {e}")))?;
let size = cfg.tcp_pool_size.clamp(1, 32) as usize;
let mut contexts = Vec::with_capacity(size);
for _ in 0..size {
let fut = connect_tcp_metered_with_timeout(
addr,
Arc::clone(&self.transport_meter),
self.channel.connection_policy.connect_timeout_ms,
);
let stream = tokio::select! {
_ = ctx.cancel.cancelled() => {
return Err(DriverError::ServiceUnavailable);
}
res = fut => res.map_err(|e| DriverError::SessionError(format!("Modbus TCP connect error: {e}")))?,
};
contexts.push(tcp::attach(stream));
}
Ok(Arc::new(SessionPool::new(contexts)))
}
ModbusConnection::Rtu { port, baud_rate, data_bits, stop_bits, parity } => {
if ctx.cancel.is_cancelled() {
return Err(DriverError::ServiceUnavailable);
}
let stream = connect_serial_metered(
SerialConnectConfig {
port: port.to_string(),
baud_rate: *baud_rate,
data_bits: (*data_bits).into(),
stop_bits: (*stop_bits).into(),
parity: (*parity).into(),
},
Arc::clone(&self.transport_meter),
)
.map_err(|e| DriverError::SessionError(format!("Failed to open serial port {port}: {e}")))?;
Ok(Arc::new(SessionPool::new(vec![rtu::attach(stream)])))
}
}
}3) Session:Ready 的定义要“明确、低成本”
Modbus 没有复杂握手,session 的 Ready 定义就是“连接/连接池已建立且可用”。因此:
Session::init():把 reconnect handle + pool 注入到 data-plane handle(publish handle 的依赖)Session::run():等待 cancel;退出时断开所有 context(带 timeout)
这是一种非常好的“attempt 资源边界”写法,如有需要你的新驱动可以直接复用这种结构。
关键代码:
点击展开:关键片段
async fn init(&mut self, ctx: &SessionContext) -> Result<(), Self::Error> {
self.handle.set_reconnect(ctx.reconnect.clone());
self.handle.attach_pool(Arc::clone(&self.pool));
Ok(())
}
async fn run(self, ctx: SessionContext) -> Result<RunOutcome, Self::Error> {
ctx.cancel.cancelled().await;
if let Some(pool) = self.handle.detach_pool() {
pool.disconnect_all(std::time::Duration::from_secs(2)).await;
}
Ok(RunOutcome::Disconnected)
}4) Handle:超时/传输错误如何触发重连?
Modbus handle 的核心是 run_op():
- 使用
tokio::time::timeout给每次协议操作设置上限(避免无限等待) - 捕获 transport error / timeout 时:
- 记录结构化 warn
try_request_reconnect(...)(不 await,避免阻塞热路径)- 返回可诊断的错误给上层
这能把“弱网/设备偶发异常”从热路径中快速隔离,并让 supervision loop 统一治理重连与退避。
关键代码:
点击展开:关键片段
#[inline]
fn try_request_reconnect(&self, reason: &'static str) {
if let Some(h) = self.reconnect.get() {
let _ = h.try_request_reconnect(reason);
}
}
#[inline]
fn pick_ctx(&self) -> DriverResult<Arc<Mutex<Context>>> {
let pool = self.pool.load_full();
let Some(pool) = pool else {
self.try_request_reconnect("modbus no session pool");
return Err(DriverError::ServiceUnavailable);
};
pool.pick().ok_or_else(|| {
self.try_request_reconnect("modbus empty session pool");
DriverError::ServiceUnavailable
})
}
async fn run_op<T, F, Fut>(
&self,
ctx: Arc<Mutex<Context>>,
op_timeout_ms: u64,
op_label: &'static str,
op: F,
) -> DriverResult<T>
where
F: FnOnce(Arc<Mutex<Context>>) -> Fut + Send + 'static,
Fut: Future<Output = Result<Result<T, ExceptionCode>, tokio_modbus::Error>> + Send + 'static,
T: Send + 'static,
{
let duration = StdDuration::from_millis(op_timeout_ms.max(1));
match timeout(duration, op(Arc::clone(&ctx))).await {
Ok(Ok(inner)) => inner.map_err(|code| {
DriverError::ExecutionError(format!("Modbus exception on {op_label}: {code:?}"))
}),
Ok(Err(e)) => {
let msg = e.to_string();
warn!(op = op_label, err = %msg, "Transport error, request reconnect");
self.try_request_reconnect("modbus transport error");
Err(DriverError::ExecutionError(msg))
}
Err(_) => {
warn!(op = op_label, "Operation timeout, request reconnect");
self.try_request_reconnect("modbus timeout");
Err(DriverError::Timeout(tokio::time::Duration::from_millis(op_timeout_ms.max(1))))
}
}
}