Connector 实现指南
架构总览
- 运行时通过
connectors/registry维护 Source/Sink Factory 的注册表,利用OnceCell + RwLock管理工厂实例,并在register_*时记录调用位置,方便诊断(src/connectors/registry.rs:1-99)。 - 应用启动时统一调用
connectors/startup::init_runtime_registries:它一次性注册内置 Sink(file/syslog/tcp/test_rescue/blackhole)与 Source(syslog/tcp/file),随后打印最终的 kind 列表,确保外部动态工厂也可追踪(src/connectors/startup.rs:1-42)。 - 若还需桥接
ConnectorKindAdapter,使用connectors/adapter.rs中的注册表;在 engine 内注册后,后续组件都能通过 kind 查到各自的 adapter(src/connectors/adapter.rs:1-43)。
┌────────────────────────────────────┐
│ wp_connector_api (Traits) │
│ ┌───────────────────────────────┐ │
│ │ SourceFactory / SinkFactory │ │
│ │ DataSource / Async* traits │ │
│ └───────────────────────────────┘ │
└────────────────────────────────────┘
│ 实现
▼
┌─────────────────────────────────────────────────────────────┐
│ 具体 Source / Sink 实现 (wp-engine) │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ FileSourceSpec ──► FileSourceFactory ──► FileSource │ │
│ │ TcpSourceSpec ──► TcpSourceFactory ──► TcpSource │ │
│ │ SyslogSourceSpec ─► SyslogFactory ─► Tcp/Udp Source │ │
│ │ TcpSinkSpec ──► TcpFactory ─► TcpSink │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ 注册
▼
┌───────────────────────────────────┐
│ connectors::registry / startup │
│ • register_*_factory(...) │
│ • log_registered_kinds() │
└───────────────────────────────────┘
│ 提供统一入口
▼
┌───────────────────────────────────┐
│ runtime / orchestrator │
│ 读取 kind → 获取 Factory → Build │
└───────────────────────────────────┘
必须实现的 Trait
- SourceFactory/SinkFactory:来自
wp_connector_api。工厂需实现kind(&self),validate_spec(&self, &Resolved*Spec)与build(&self, &Resolved*Spec, &*Ctx)。示例参见src/sources/file/factory.rs:65-123与src/sinks/backends/tcp.rs:240-259。 - DataSource:Source 运行时实现
DataSourcetrait,提供receive/try_receive/can_try_receive/identifier等接口;FileSource在src/sources/file/source.rs展示了如何在receive中返回批次并在stop时清理。 - AsyncCtrl/AsyncRecordSink/AsyncRawDataSink:Sink 运行时需实现这些异步 trait 以接收结构化记录和原始字符串;
TcpSink在src/sinks/backends/tcp.rs:62-215给出了完整实现(含批量方法)。 - 可选:ConnectorKindAdapter:若需要在运行时动态选择不同工厂组合,实现
wp_connector_api::ConnectorKindAdapter并通过connectors/adapter.rs注册;适用于同一 kind 在不同部署模式下映射到不同底层实现。 - 工程工具(wp-proj)一致性:
crates/wp-proj中的Sources、Sinks、Wpl、Oml管理器会读取EngineConfig(即conf/wparse.toml)中的src_root/sink_root/rule_root/oml_root,因此 connector 初始化必须保证这些路径正确。wp-proj 的 CLI 在调用init/check时直接依赖这些路径,无需再手工推断目录。
Source/Sink 实现步骤
- 先建立静态 Spec:
- 在 Source 端,以
FileSourceSpec为例,它负责从ResolvedSourceSpec中提取路径/编码/实例数并完成参数校验;validate_spec与build都仅调用from_resolved,防止重复解析(src/sources/file/factory.rs:15-123)。 - 在 Sink 端同理,
TcpSinkSpec负责提取地址/端口/分帧信息并校验布尔开关;后续连接逻辑只接收TcpSinkSpec,避免直接访问动态 Map(src/sinks/backends/tcp.rs:18-105)。
- 在 Source 端,以
- 在
validate_spec中仅做转换:始终让validate_spec里只调用一次Spec::from_*,把所有错误统一转成SourceReason::from_conf或SinkError,确保 CLI/控制面可以直接提示参数问题(src/sources/file/factory.rs:73-82、src/sinks/backends/tcp.rs:240-259)。 - 在
build中复用静态 Spec:build里禁止再次从params中取值,直接使用 Spec 产物,并在需要时注入上下文(如SourceBuildCtx的路径/副本信息或SinkBuildCtx的限速值)。 - 注册工厂:实现完成后,在相应模块提供
register_*函数并在connectors/startup中调用。FileSource 通过register_factory_only注册到全局表,是最简示例(src/sources/file/factory.rs:126-129)。 - 保持日志可读:网络类实现应在首次连接、首个包、错误等关键点打印
info!/warn!(可参考TcpSink::connect与SyslogSourceFactory中的日志调用)。 - 确保 EngineConfig 可解析:因为 wp-proj 的管理命令和 CLI 检查都直接加载
conf/wparse.toml来定位topology/models路径,connector 的默认模板、示例配置必须与 EngineConfig 中的路径保持一致。一旦添加新的默认目录或模板,需要同步更新wparse.toml示例和EngineConfig::init的默认字段。
参数校验与 Spec 转换建议
- 一次性检查:尽量用
anyhow::ensure!或模式匹配在 Spec 构造时完成所有合法性检查,再把anyhow::Error映射回连接器的Reason(示例:TcpSourceSpec::from_params对端口/缓冲区/帧模式/实例数的校验,见src/sources/tcp/config.rs:4-63)。 - 集中处理布尔或枚举参数:布尔开关使用
as_bool()并在 Spec 层给出默认值;枚举按to_ascii_lowercase()匹配,防止大小写问题(示例:SyslogSourceSpec中的protocol和header_mode,见src/sources/syslog/config.rs:7-74)。 - 先校验标签:Source 实现通常需要在
validate_spec开头调用wp_data_model::tags::validate_tags,并在build时通过parse_tags生成TagSet(src/sources/file/factory.rs:73-123)。
启动与诊断
- 集中注册:确保新工厂的
register_*在connectors/startup::init_runtime_registries中被调用,否则 CLI 虽能解析配置,但运行期无法找到对应 kind。 - 列出注册结果:通过
connectors/startup::log_registered_kinds可以快速查看当前进程加载的 Source/Sink,若出现找不到的 kind,优先检查是否忘记注册或重复注册(src/connectors/startup.rs:25-42)。 - 适配器使用场景:如果需要把同一种 connector kind 映射到多个 factory(比如企业版扩展),在
adapter.rs注册ConnectorKindAdapter,再由业务层读取list_kinds()决定要启用的适配路径(src/connectors/adapter.rs:1-43)。
测试策略
- 工厂级单元测试:所有新工厂都应像 File/Tcp/Syslog 一样包含
#[cfg(test)]模块,验证参数校验、实例数量、Tag 注入等关键路径。例如file::factory中的build_spec_with_instances、compute_file_ranges_aligns_to_line_boundaries等用例(src/sources/file/factory.rs:188-266)。 - 端到端验证:网络类 Source/Sink 建议提供受控的 e2e 测试,配合条件变量(如
WP_NET_TESTS)运行真实 TCP/UDP 循环,参考src/sinks/backends/tcp.rs:287-356与src/sources/tcp/conn/connection.rs:500-552的用例。 - 保持幂等:测试/工具函数不应依赖全局状态,使用
register_*时若会污染全局 registry,要在测试结束后清理或使用隔离的 runner。
提交流程提示
- 文档更新:当新 connector 引入新的 CLI/配置参数,需同步更新
docs、wpgen模板以及任何 CLI 帮助文本。 - 代码规范:遵守 Rustfmt、Clippy 以及仓库指引(宏/特性集中定义、错误提示使用
SourceReason/SinkReason)。 - 日志与可观测性:一旦连接建立、首包发送或异常发生应输出
info!/warn!,便于排查跨机问题。 - 注册核查:PR 提交前检查
connectors/startup.rs是否包含新工厂的注册逻辑,并在日志里确认可见。
遵循以上步骤,新 connector 可以快速接入 engine,并保持配置、诊断与回归测试的统一体验。