南向是什么
南向负责把“现实世界的设备/总线/控制器”稳定接入网关,并把读写结果标准化为统一的 NorthwardData 进入核心管线与北向。它的目标是可靠、可观测、可扩展、性能优先。
设计原则
南向只解决“如何连接、如何采集、如何编解码、如何容错”;不要在 driver 里绑定北向协议、业务规则或平台差异。
心智模型
Driver:协议适配器(Modbus/S7/OPC UA/IEC104…),负责连接管理、协议编解码、读写语义与容错策略。
Channel(通道):driver 的一个“运行实例”。一个 channel 绑定一个 driver factory + 一份运行时配置(连接策略、采集策略等),其下面挂载多个device。
Channel 是连接与会话的边界:同一协议的不同现场线路/PLC/服务器通常用不同 channel 隔离。Device(设备):channel 下的一个采集对象(站点/PLC/表计/节点)。Device 的
device_type用于 driver 做模型选择/差异化解析。Point(点位):device 下的一个数据点(遥测/属性),携带点位类型、数据类型、访问模式(只读/只写/读写)以及单位/量程/比例等元信息。Point 的
key是对外稳定标识,id是网关内部高频热路径主键。Action(动作):device 下的一个命令/RPC 定义(例如“合闸/分闸/复位/写参数”)。Action 的
command是 driver 识别的命令名,输入参数由Parameter描述(类型/必填/默认值/范围)。
Polling vs Driver Push
当前网关有且仅有两条“南向 → 核心 → 北向”的数据路径;两条路径在进入 core 之后完全复用同一条转发/路由链路。
1) Polling(轮询采集,由 Collector 调度)
- 适用场景:Modbus/S7 等“读寄存器/读变量”为主的协议;或现场要求固定周期采集。
- 触发机制:当且仅当 channel 的
collection_type == Collection时,该 channel 才会被Collector拉起轮询任务;period决定 tick 周期。 - 核心链路:
Collector按 channel tick → 并发遍历 device(buffer_unordered+ 全局Semaphore做有界并发)- 每个 device:调用
driver.collect_data(device, points) - 返回
Vec<NorthwardData>→ 逐条发送到 core 的 bounded mpsc(数据转发队列) Gatewayforwarding task 从队列recv→ 广播给实时监控(realtime hub)→NorthwardManager::route(快照/变化过滤 + 按“北向 app 订阅”路由到各 app)
关键语义
Polling 的“调度者”是 core 的 Collector,driver 只实现“如何高效读点位与解析”。
2) Driver Push(订阅/上报,由 Driver 自己驱动)
- 适用场景:OPC UA subscription、IEC104 主动上送、DNP3 SOE、任何协议的异步事件/变化上报。
- 触发机制:driver 在
start()后自行建立订阅/监听/接收循环,遇到数据时直接 publish。 - 核心链路(按实现):
- 网关在创建 driver 时,会通过
SouthwardInitContext注入一个publisher: Arc<dyn NorthwardPublisher> - driver 在内部任务里调用
publisher.try_publish(Arc<NorthwardData>)(非阻塞,背压通过错误返回) - 数据进入同一条 core 转发队列 → forwarding task →
NorthwardManager::route(与 Polling 完全一致)
- 网关在创建 driver 时,会通过
关键语义
Subscription不是由 Collector 调度;它属于 driver 的“会话层/协议层”职责,core 只提供一个高性能的 publish 入口与后续统一路由。
反向路径
反向路径的共同目标是:在尽可能靠近入口处做“可判定”的校验与限流,避免把非法/高风险/洪峰请求直接打到现场设备。
WritePoint(点位写入)
- 入口:北向插件下行事件
NorthwardEvent::WritePoint。 - core 侧校验:
- NotFound:point_id 不存在。
- NotWriteable:点位
access_mode不是Write/ReadWrite。 - TypeMismatch:写入值与点位
data_type不匹配。 - OutOfRange:仅数值类型;当 min_value 与 max_value 都存在 时,对写入值做区间校验。
- NotConnected:所属 channel 未连接。
- QueueTimeout:同一 channel 的写入串行队列等待超时。
- 串行化与并发模型:
- 同一 channel 内写入严格串行(避免协议/设备不支持并发写导致的乱序与状态撕裂)。
- 不同 channel 之间可并行(网关会把 WritePoint 处理丢进独立 task,充分利用多核与 I/O 并发)。
- 执行:通过
driver.write_point(device, point, value, timeout_ms)进入 driver。 - 响应:写入完成后回传
NorthwardData::WritePointResponse(控制面响应不会被“数据背压”丢弃)。
ExecuteAction(动作/命令)
- 入口:北向插件下行事件
NorthwardEvent::CommandReceived。 - core 侧校验:
- NotFound:action 不存在。
- TypeMismatch:写入值与点位
data_type不匹配。 - OutOfRange:仅数值类型;当 min_value 与 max_value 都存在 时,对写入值做区间校验。
- NotConnected:所属 channel 未连接。
- QueueTimeout:同一 channel 的写入串行队列等待超时。
- 执行:通过
driver.execute_action(device, action, parameters)进入 driver。 - 响应:写入完成后回传
NorthwardData::RpcResponse(控制面响应不会被“数据背压”丢弃)。
通用属性
Channel 通用属性
- name:人类可读名称(日志/监控/诊断的首选标识)。
- driver_id:绑定的 driver 工厂标识。
- collection_type:采集类型
Collection表示该 channel 会被Collector轮询Report表示该 channel 不参与轮询,主要依赖 driver 主动 Push(订阅/上报)。
- report_type:上报策略
Always全量上报;Change由 core 维护 device 快照并做变化过滤(减少北向带宽与计算)。
- period:轮询周期(ms,仅在
collection_type == Collection时生效)。 - status:启用/禁用。禁用 channel 不会被启动/轮询/路由。
- connection_policy:连接策略(由 core 提供统一字段,具体行为由 driver 在连接/读/写处使用)
- connect_timeout_ms:建立连接/会话握手超时(默认 10000ms)。
- read_timeout_ms:协议读超时(默认 10000ms)。
- write_timeout_ms:协议写超时(默认 10000ms)。
- backoff(RetryPolicy):用于重连/重试的统一指数退避策略(driver 与北向插件复用同一模型)
- max_attempts:最大重试次数(
Some(0)表示不重试;None表示无限重试,需谨慎)。 - initial_interval_ms:初始退避间隔(默认 1000ms)。
- max_interval_ms:最大退避上限(默认 30000ms)。
- multiplier:指数倍率(默认 2.0)。
- randomization_factor:抖动系数(默认 0.2,代表 ±20% jitter,避免“重连风暴/惊群”)。
- max_elapsed_time_ms:最大累计重试时长(默认 None,表示不限制;与 max_attempts 同时设置时先到者生效)。
- max_attempts:最大重试次数(
Device 通用属性
- device_name:设备名称(用于 northward 编码与可观测性)。
- device_type:设备类型/机型。
- channel_id:所属 channel ID。
- status:启用/禁用(禁用设备应在 driver 侧与 core 路由侧被跳过)。
Point 通用属性
- id:point 唯一 ID(热路径主键,变化检测/快照索引优先用它)。
- device_id:所属 device ID。
- name:点位名称(人类可读)。
- key:点位稳定 key(对外引用/写回/主题路由的首选标识)。
- type:点位类别(Telemetry / Attribute)。
- data_type:值类型(bool/i32/f64/string/…)。
- access_mode:访问模式(Read / Write / ReadWrite)
- 何时使用:
- 采集侧:core 会按
access_mode过滤可读点位(Read/ReadWrite)用于采集;过滤可写点位(Write/ReadWrite)用于写入能力展示/路由。 - 写入侧:WritePoint 入口会用它做强校验;非
Write/ReadWrite会被直接拒绝并返回NotWriteable。
- 采集侧:core 会按
- 注意事项:
Read并不代表“协议不支持写”,而是产品/现场层面的安全边界;应在建模阶段正确配置,避免误写关键点位。ReadWrite的点位,driver 必须保证读写路径对同一地址/变量的语义一致(单位/比例/编码)。
- 何时使用:
- unit:展示单位(如 ℃、kPa、A),通常用于 UI 与北向展示,不建议在热路径字符串拼接。
- min_value / max_value:
- 何时使用:反向入口对数值类型做区间校验(仅当 min 与 max 同时存在时生效),超出范围返回
OutOfRange,避免危险写入落到设备侧。 - 注意事项:当前 core 的范围校验直接比较写入值与
[min,max],不会自动应用scale;因此 min/max 必须与外部写入值处于同一“值域”(原始值或工程值)。
- 何时使用:反向入口对数值类型做区间校验(仅当 min 与 max 同时存在时生效),超出范围返回
- scale(比例/换算因子):
- 使用场景:常用于“协议原始值 ↔ 工程值”的换算(例如寄存器值 1234 表示 12.34℃,scale=0.01)。
- 注意事项:在当前实现中,scale 主要作为元数据透传至driver;是否在读写时应用 scale 由 driver 决定。若 driver 选择对外暴露工程值,则应同时保证:
- 上行采集输出与下行写入输入都使用同一尺度;
- min/max 也按工程尺度配置,才能与 core 的
OutOfRange校验对齐。
Action & Parameter 通用属性
Action.id:动作唯一 ID。
Action.name:动作名称(人类可读)。
Action.device_id:所属 device ID。
Action.command:driver 识别的命令名(协议/实现相关,但对外稳定)
- 何时使用:core 在执行动作时通过
command定位要执行的 action(因此 command 应稳定且同设备内唯一)。 - 注意事项:建议把 “协议动作名/功能码/对象地址” 等编码细节封装在 driver 内部,不直接暴露为不可读字符串;对外提供稳定 command,同时在文档里解释其含义。
- 何时使用:core 在执行动作时通过
Action.input_parameters:输入参数定义列表(
Parameter)。Parameter.name/key:参数展示名/稳定 key。
Parameter.data_type:参数类型。
Parameter.required:是否必填。
Parameter.default_value:默认值(如有)。
Parameter.min_value / max_value:范围约束(如有)。
Parameter 在动作执行时会被 core 统一校验与解析,关键语义如下:
- 参数结构:
- 多参数动作:
params必须是 JSON Object(按key取值)。 - 单参数动作:允许直接给标量(scalar),也允许给
{key: value}。
- 多参数动作:
- 必填与默认值:
required=true:必须提供(否则报错)。required=false:允许不提供,但必须有default_value(否则报错)。
- 类型转换(尽量宽容,但可预测):会把 JSON scalar 尝试转换成目标
data_type(包含数字字符串、bool 字符串、时间戳、binary 的 base64/hex 等常见形式)。 - 范围校验:当 Parameter 声明了
min_value/max_value时,会对数值型输入做区间校验,并汇总成可读的错误信息返回。
最佳实践
背压与队列
- publisher.try_publish 是非阻塞的:当 core 转发队列满时会返回
QueueFull(背压信号)。driver 必须决定策略:丢弃、聚合、降采样、重试(带退避),而不是在热路径里无界堆积。 - 批量优先:Polling 场景下,driver 应尽量将一次采集结果组成少量
NorthwardData(例如按 device 分组),减少发送次数与调度开销。
轮询采集(Polling)
- 避免每设备
tokio::spawn:当前Collector已采用buffer_unordered+Semaphore,driver 侧也应避免在单次采集里产生大量短生命周期 task。 - 超时/重试/退避要可配置:使用
connection_policy提供的超时与 backoff;连续失败要指数退避,避免重连风暴。 - 批量读取策略:按协议能力将点位拆成批次(上限/对齐/地址连续性),并将批大小、并发度、超时做成可调参数。
订阅/上报(Subscription/Push)
- 订阅循环要可取消:在
stop()时保证能快速退出(配合取消令牌/会话生命周期)。 - 噪声与抖动隔离:对频繁变化点位要有采样/节流,避免把北向/核心队列打满。
解析容错
- 解析失败不 panic:坏帧/半包/CRC 错/乱序都必须返回可操作的错误语义,并携带足够上下文(channel/device/地址/计数器)。
- 可恢复同步:出现坏帧应丢弃并重新同步帧头;出现短暂超时应可重试;出现认证失败/连接断开应触发重连路径。
- 错误分级:区分“可重试/需重连/不可重试/需降级”,把“现场噪声”限制在本 channel 内,不扩散到全局。
运行时变更(RuntimeDelta)
网关支持在运行中对 device/point/action 做增删改并通知 driver(RuntimeDelta)。driver 的实现应:
- 增量更新本地索引:避免全量重建;
- 保证顺序与幂等:同一 channel 内 delta 需要按序处理;
- 不要在持锁状态 await:更新结构应尽量快,I/O 放到后台任务。
