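Sink Basics
This document introduces the basic concepts and configuration structure of sinks (data outputs) in the warpparse system.
Overview
A sink is the data output of the warpparse system: it delivers processed data to target systems. Several output types are supported, including file, Syslog, and Prometheus.
Core Concepts
1. Configuration Hierarchy
warpparse uses a layered configuration architecture: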
Global defaults (defaults.toml)
↓
Route group configuration (business.d/**/*.toml, infra.d/**/*.toml)
↓
Connector definitions (connectors/sink.d/*.toml)
↓
Resolved sink instance (ResolvedSinkSpec)
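The last step of this chain can be pictured as a lookup plus a merge. The Python sketch below is a minimal illustration that assumes every layer has already been parsed into plain dicts. The `ResolvedSinkSpec` dataclass fields and the `resolve_sink` helper are hypothetical and are not warpparse's actual types. The sketch only shows that a sink's `connect` value selects a connector by `id` and that sink-level `params` are layered over the connector's defaults. Tags and expectations are left out.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ResolvedSinkSpec:
    # Hypothetical field set, for illustration only.
    group: str
    name: Optional[str]
    connector_id: str
    kind: str
    params: dict = field(default_factory=dict)


def resolve_sink(group: dict, sink: dict, connectors: dict) -> ResolvedSinkSpec:
    """Link one [[sink_group.sinks]] entry to the connector it references."""
    connector = connectors[sink["connect"]]  # KeyError if the id is unknown
    params = dict(connector.get("params", {}))  # start from connector defaults
    params.update(sink.get("params", {}))  # sink-level values take precedence
    return ResolvedSinkSpec(
        group=group["name"],
        name=sink.get("name"),  # some examples omit a sink name
        connector_id=connector["id"],
        kind=connector["type"],
        params=params,
    )


connectors = {
    "file_raw_sink": {
        "id": "file_raw_sink",
        "type": "file",
        "allow_override": ["base", "file", "fmt"],
        "params": {"base": "./data/out_dat", "file": "default.dat", "fmt": "json"},
    }
}
spec = resolve_sink(
    {"name": "/sink/example"},
    {"name": "example_sink", "connect": "file_raw_sink",
     "params": {"base": "./output", "file": "example.dat"}},
    connectors,
)
print(spec.params)  # {'base': './output', 'file': 'example.dat', 'fmt': 'json'}
```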
2. Core Data Structures
Configuration File Structure
1. Connector Definition (connectors/sink.d/*.toml)
# connectors/sink.d/file_raw_sink.toml
[[connectors]]
id = "file_raw_sink"
type = "file"
allow_override = ["base", "file", "fmt"]
[connectors.params]
base = "./data/out_dat"
file = "default.dat"
fmt = "json"
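Key fields:
- id: unique identifier of the connector
- type: connector type (file, syslog, prometheus, etc.)
- allow_override: list of parameters that a referencing sink configuration is allowed to override
- params: the connector's default parameters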
2. Route Configuration (business.d/**/*.toml, infra.d/**/*.toml)
# business.d/example.toml
version = "2.0"
[sink_group]
name = "/sink/example"
oml = ["example_pattern"]
parallel = 2
tags = ["env:production"]
[[sink_group.sinks]]
name = "example_sink"
connect = "file_raw_sink"
params = { base = "./output", file = "example.dat" }
filter = "./filter.wpl"
tags = ["type:example"]
[sink_group.sinks.expect]
ratio = 1.0
tol = 0.01
3. Global Defaults (defaults.toml)
# defaults.toml
version = "2.0"
[defaults]
tags = ["env:default"]
[defaults.expect]
basis = "total_input"
min_samples = 100
mode = "error"
Basic Configuration Examples
1. Simple File Output
# infra.d/simple_file.toml
version = "2.0"
[sink_group]
name = "simple_output"
oml = []
[[sink_group.sinks]]
connect = "file_raw_sink"
params = { file = "simple.log" }
2. Output with a Filter
# business.d/filtered_output.toml
version = "2.0"
[sink_group]
name = "/sink/filtered"
oml = ["/oml/logs/*"]
[[sink_group.sinks]]
name = "all_logs"
connect = "file_json_sink"
params = { file = "all_logs.json" }
[[sink_group.sinks]]
name = "error_logs"
connect = "file_json_sink"
filter = "./error_filter.wpl"
params = { file = "error_logs.json" }
[sink_group.sinks.expect]
ratio = 0.1
tol = 0.02
3. Parallel Output (Business Groups Only)
# business.d/parallel_output.toml
version = "2.0"
[sink_group]
name = "/sink/parallel"
oml = ["high_volume"]
parallel = 4
tags = ["type:parallel"]
[[sink_group.sinks]]
name = "output_1"
connect = "file_proto_sink"
params = { file = "output_1.dat" }
[[sink_group.sinks]]
name = "output_2"
connect = "file_proto_sink"
params = { file = "output_2.dat" }
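Note: infrastructure groups (infra.d) support neither parallel nor file sharding. To increase throughput or shard output files, configure a business group (business.d) instead.
Tag System
1. Tag Inheritance Levels
Tags are inherited from three levels: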
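The infra/business distinction can be enforced with a simple path-based check. This is a hypothetical sketch, and `check_group` is not a warpparse function. It decides the group kind from the top-level directory (`infra.d` or `business.d`). It treats any explicit `parallel` key in an infra group as an error, which is an assumption: the real loader might ignore the key or accept `parallel = 1`. File sharding is not checked because its configuration keys are not described here.

```python
from pathlib import PurePosixPath


def check_group(path: str, group: dict) -> list:
    """Return error messages for one [sink_group] loaded from `path` (hypothetical)."""
    errors = []
    top = PurePosixPath(path).parts[0]  # "infra.d" or "business.d"
    # ASSUMPTION: any explicit parallel key in an infra group is rejected.
    if top == "infra.d" and "parallel" in group:
        errors.append(f"{path}: 'parallel' is only supported in business.d groups")
    return errors


print(check_group("infra.d/simple_file.toml", {"name": "simple_output", "oml": []}))
print(check_group("infra.d/fast.toml", {"name": "fast", "parallel": 4}))
print(check_group("business.d/parallel_output.toml", {"name": "/sink/parallel", "parallel": 4}))
```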
- Default tags (from defaults.toml)
- Group-level tags (from sink_group)
- Sink-level tags (from the individual sink)
2. Tag Configuration Example
# defaults.toml
[defaults]
tags = ["env:production", "service:warpflow"]
# business.d/example.toml
[sink_group]
tags = ["region:us-west", "tier:processing"]
[[sink_group.sinks]]
tags = ["output:file", "compression:gzip"]
Resulting merged tags:
["env:production", "service:warpflow", "region:us-west", "tier:processing", "output:file", "compression:gzip"]
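The merged result above is consistent with plain concatenation in inheritance order. Below is a minimal Python sketch that assumes that ordering. The document does not say whether duplicate tags or conflicting keys (for example two different `env:` values) are deduplicated or overridden, so this sketch keeps every tag as-is. `merge_tags` is a hypothetical helper.

```python
def merge_tags(*layers):
    """Concatenate tag lists in inheritance order: defaults, group, sink."""
    merged = []
    for layer in layers:
        merged.extend(layer or [])  # a missing layer contributes nothing
    return merged


tags = merge_tags(
    ["env:production", "service:warpflow"],  # defaults.toml
    ["region:us-west", "tier:processing"],   # sink_group
    ["output:file", "compression:gzip"],     # [[sink_group.sinks]]
)
print(tags)
```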
Expectation Configuration (Expect)
1. Ratio Mode
[sink_group.sinks.expect]
ratio = 1.0 # expected share: 100%
tol = 0.01 # allowed deviation: ±1%
2. Range Mode
[sink_group.sinks.expect]
min = 0.001 # minimum share: 0.1%
max = 2.0 # maximum share: 200%
3. Global Default Expectations
[defaults.expect]
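basis = "total_input" # basis the share is computed against
min_samples = 100 # minimum number of samples
mode = "error" # how violations are handled
Filter Configuration
1. Filter File
Filter files use WPL (Warp Processing Language) syntax: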
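Here is one plausible reading of these fields, written as a hypothetical Python check. The observed share is the sink's record count divided by the `total_input` basis. Ratio mode passes when the share lies within `ratio ± tol`, and range mode passes when it lies within `[min, max]`. Nothing is judged until `min_samples` inputs have been seen. These semantics are assumptions, including treating `tol` as an absolute deviation. What `mode = "error"` triggers on failure is not described, so `check_expect` only returns a verdict.

```python
from typing import Optional


def check_expect(expect: dict, sink_count: int, total_input: int,
                 min_samples: int = 100) -> Optional[bool]:
    """Hypothetical verdict: True = pass, False = violation, None = too few samples."""
    if total_input < min_samples:
        return None  # basis too small to judge
    actual = sink_count / total_input  # basis = "total_input"
    if "ratio" in expect:  # ratio mode; tol ASSUMED to be an absolute deviation
        return abs(actual - expect["ratio"]) <= expect.get("tol", 0.0)
    low = expect.get("min", float("-inf"))  # range mode
    high = expect.get("max", float("inf"))
    return low <= actual <= high


print(check_expect({"ratio": 1.0, "tol": 0.01}, sink_count=995, total_input=1000))
print(check_expect({"ratio": 0.1, "tol": 0.02}, sink_count=150, total_input=1000))
print(check_expect({"min": 0.001, "max": 2.0}, sink_count=0, total_input=1000))
print(check_expect({"ratio": 1.0, "tol": 0.01}, sink_count=10, total_input=50))
```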
# filter.wpl
# Only process error-level logs
level == "ERROR" || level == "FATAL"
# Or a compound condition
(level == "ERROR" && source == "auth") ||
(level == "WARN" && message ~= "timeout")
2. Applying a Filter
[[sink_group.sinks]]
name = "filtered_output"
connect = "file_json_sink"
filter = "./error_filter.wpl" # apply the filter
params = { file = "errors.json" }
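The sketch below models the filter examples as Python predicates to make their semantics concrete. Each record is fanned out to every sink whose filter accepts it, and a sink without `filter` receives everything, which matches the `all_logs` / `error_logs` example. This is an illustration under assumptions, not a WPL implementation. The predicates are hand-translated, `~=` is assumed to mean a regular-expression search, and `route` is a hypothetical helper.

```python
import re
from typing import Callable, Dict, Iterable, List, Optional

Record = dict
Predicate = Callable[[Record], bool]


def error_filter(rec: Record) -> bool:
    # Hand translation of: level == "ERROR" || level == "FATAL"
    return rec.get("level") in ("ERROR", "FATAL")


def compound_filter(rec: Record) -> bool:
    # Hand translation of the compound example; `~=` ASSUMED to be a regex search.
    level = rec.get("level")
    return (level == "ERROR" and rec.get("source") == "auth") or (
        level == "WARN" and re.search("timeout", rec.get("message", "")) is not None
    )


def route(records: Iterable[Record],
          sinks: Dict[str, Optional[Predicate]]) -> Dict[str, List[Record]]:
    """Deliver each record to every sink whose filter accepts it (None = no filter)."""
    out: Dict[str, List[Record]] = {name: [] for name in sinks}
    for rec in records:
        for name, accepts in sinks.items():
            if accepts is None or accepts(rec):
                out[name].append(rec)
    return out


records = [
    {"level": "INFO", "message": "started"},
    {"level": "ERROR", "source": "auth", "message": "bad password"},
    {"level": "WARN", "source": "db", "message": "query timeout"},
]
result = route(records, {"all_logs": None, "error_logs": error_filter,
                         "compound": compound_filter})
print({name: len(recs) for name, recs in result.items()})
```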
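Configuration Validation
1. Parameter Override Validation
Parameter overrides are strictly validated:
- Only parameters listed in allow_override can be overridden
- Overriding nested table structures is not supported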
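The two override rules can be expressed as a small validation step before the merge. Below is a minimal sketch with a hypothetical `apply_overrides` helper. It assumes "nested table" means a value that is itself a TOML table, which becomes a dict after parsing. It raises `ValueError` rather than whatever error type warpparse actually reports. The `compress` key is a made-up example of a parameter that is not in `allow_override`.

```python
def apply_overrides(connector: dict, overrides: dict) -> dict:
    """Validate sink-level params against allow_override, then merge them."""
    allowed = set(connector.get("allow_override", []))
    for key, value in overrides.items():
        if key not in allowed:
            raise ValueError(
                f"'{key}' is not in allow_override of connector '{connector['id']}'")
        if isinstance(value, dict):  # a nested table after TOML parsing
            raise ValueError(f"'{key}': nested tables cannot be overridden")
    merged = dict(connector.get("params", {}))
    merged.update(overrides)
    return merged


connector = {
    "id": "file_raw_sink",
    "allow_override": ["base", "file", "fmt"],
    "params": {"base": "./data/out_dat", "file": "default.dat", "fmt": "json"},
}
print(apply_overrides(connector, {"file": "simple.log"}))
# Hypothetical bad inputs: an unlisted key, then a nested table.
for bad in ({"compress": "gzip"}, {"file": {"name": "x.log"}}):
    try:
        apply_overrides(connector, bad)
    except ValueError as exc:
        print("rejected:", exc)
```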
2. Uniqueness Validation
- Sink names must be unique within a sink_group
- Connector IDs must be globally unique
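Both rules amount to duplicate detection over names. The following is a hypothetical sketch, and `check_unique` is not a warpparse API. It collects connector ids across all loaded connector files and checks sink names within each group. It skips sinks that have no `name`, which is an assumption about how unnamed sinks are treated.

```python
from collections import Counter


def duplicates(names):
    """Names that occur more than once, in first-seen order."""
    counts = Counter(names)
    return [name for name, n in counts.items() if n > 1]


def check_unique(connectors: list, groups: list) -> list:
    """Hypothetical check: global connector ids, per-group sink names."""
    errors = [f"duplicate connector id: {cid}"
              for cid in duplicates(c["id"] for c in connectors)]
    for group in groups:
        # ASSUMPTION: sinks without a name are not part of the check.
        names = [s["name"] for s in group.get("sinks", []) if "name" in s]
        errors += [f"{group['name']}: duplicate sink name: {n}" for n in duplicates(names)]
    return errors


connectors = [{"id": "file_raw_sink"}, {"id": "file_json_sink"}, {"id": "file_raw_sink"}]
groups = [{"name": "/sink/filtered",
           "sinks": [{"name": "all_logs"}, {"name": "error_logs"}, {"name": "all_logs"},
                     {"connect": "file_raw_sink"}]}]
print(check_unique(connectors, groups))
```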
3. File Existence Validation
- Filter files must exist and be syntactically valid
- File path parameters must be valid
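Below is a sketch of the existence half of the filter check. It assumes that relative `filter` paths are resolved against the directory of the route file that declares them; the document does not say what they are relative to. WPL syntax checking is out of scope here, and `check_filter_files` is a hypothetical helper.

```python
import tempfile
from pathlib import Path


def check_filter_files(config_path: str, group: dict) -> list:
    """Return an error for each sink whose filter file is missing."""
    base = Path(config_path).parent  # ASSUMPTION: relative to the route file
    errors = []
    for sink in group.get("sinks", []):
        flt = sink.get("filter")
        if flt is None:
            continue  # sinks without a filter need no file
        path = base / flt  # an absolute `flt` replaces `base` entirely
        if not path.is_file():
            label = sink.get("name", sink.get("connect"))
            errors.append(f"{label}: filter file not found: {path}")
    return errors


with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "filter.wpl").write_text('level == "ERROR"\n')
    group = {"sinks": [
        {"name": "example_sink", "filter": "./filter.wpl"},
        {"name": "error_logs", "filter": "./error_filter.wpl"},
        {"name": "all_logs"},
    ]}
    errors = check_filter_files(str(Path(tmp) / "example.toml"), group)
print(errors)
```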