Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Sink 基础

本文档介绍 warpparse 系统中数据输出端 (Sink) 的基础概念和配置结构。

概述

Sink 是 warpparse 系统的数据输出端,负责将处理后的数据发送到各种目标系统。系统支持多种输出类型,包括文件、Syslog、Prometheus 等。

核心概念

1. 配置层次结构

warpparse 系统采用分层配置架构:

全局默认配置 (defaults.toml)
    ↓
路由组配置 (business.d/**/*.toml, infra.d/**/*.toml)
    ↓
连接器定义 (connectors/sink.d/*.toml)
    ↓
解析后的 Sink 实例 (ResolvedSinkSpec)

2. 核心数据结构

配置文件结构

1. 连接器定义 (connectors.toml)

# connectors/sink.d/file_raw_sink.toml
[[connectors]]
id = "file_raw_sink"
type = "file"
allow_override = ["base", "file", "fmt"]

[connectors.params]
base = "./data/out_dat"
file = "default.dat"
fmt = "json"

关键字段说明

  • id: 连接器唯一标识符
  • type: 连接器类型 (file, syslog, prometheus 等)
  • allow_override: 允许源配置覆盖的参数列表
  • params: 连接器默认参数

2. 路由配置 (business.d//*.toml, infra.d//*.toml)

# business.d/example.toml
version = "2.0"

[sink_group]
name = "/sink/example"
oml = ["example_pattern"]
parallel = 2
tags = ["env:production"]

[[sink_group.sinks]]
name = "example_sink"
connect = "file_raw_sink"
params = {
    base = "./output",
    file = "example.dat"
}
filter = "./filter.wpl"
tags = ["type:example"]

[sink_group.sinks.expect]
ratio = 1.0
tol = 0.01

3. 全局默认配置 (defaults.toml)

# defaults.toml
version = "2.0"

[defaults]
tags = ["env:default"]

[defaults.expect]
basis = "total_input"
min_samples = 100
mode = "error"

基础配置示例

1. 简单文件输出

# infra.d/simple_file.toml
version = "2.0"

[sink_group]
name = "simple_output"
oml = []
[[sink_group.sinks]]
connect = "file_raw_sink"
params = { file = "simple.log" }

2. 带过滤器的输出

# business.d/filtered_output.toml
version = "2.0"

[sink_group]
name = "/sink/filtered"
oml = ["/oml/logs/*"]

[[sink_group.sinks]]
name = "all_logs"
connect = "file_json_sink"
params = { file = "all_logs.json" }

[[sink_group.sinks]]
name = "error_logs"
connect = "file_json_sink"
filter = "./error_filter.wpl"
params = { file = "error_logs.json" }
[sink_group.sinks.expect]
ratio = 0.1
tol = 0.02

3. 并行输出配置(仅业务组)

# business.d/parallel_output.toml
version = "2.0"

[sink_group]
name = "/sink/parallel"
oml = ["high_volume"]
parallel = 4
tags = ["type:parallel"]

[[sink_group.sinks]]
name = "output_1"
connect = "file_proto_sink"
params = { file = "output_1.dat" }

[[sink_group.sinks]]
name = "output_2"
connect = "file_proto_sink"
params = { file = "output_2.dat" }

注:基础组(infra.d)不支持 parallel 与文件分片;如需提升吞吐与分片,请在业务组配置。

标签系统

1. 标签继承层次

标签系统支持三层继承:

  1. 默认标签 (来自 defaults.toml)
  2. 组级标签 (来自 sink_group)
  3. Sink 级标签 (来自具体 sink)

2. 标签配置示例

# defaults.toml
[defaults]
tags = ["env:production", "service:warpflow"]

# business.d/example.toml
[sink_group]
tags = ["region:us-west", "tier:processing"]

[[sink_group.sinks]]
tags = ["output:file", "compression:gzip"]

最终标签合并结果

["env:production", "service:warpflow", "region:us-west", "tier:processing", "output:file", "compression:gzip"]

期望值配置 (Expect)

1. 比例模式

[sink_group.sinks.expect]
ratio = 1.0    # 期望占比 100%
tol = 0.01     # 允许偏差 ±1%

2. 范围模式

[sink_group.sinks.expect]
min = 0.001    # 最小占比 0.1%
max = 2.0      # 最大占比 200%

3. 全局默认期望值

[defaults.expect]
basis = "total_input"  # 计算基准
min_samples = 100      # 最小样本数
mode = "error"         # 违规时处理模式

过滤器配置

1. 过滤器文件

过滤器文件使用 WPL (Warp Processing Language) 语法:

# filter.wpl
# 只处理错误级别的日志
level == "ERROR" || level == "FATAL"

# 或者复杂条件
(level == "ERROR" && source == "auth") ||
(level == "WARN" && message ~= "timeout")

2. 过滤器应用

[[sink_group.sinks]]
name = "filtered_output"
connect = "file_json_sink"
filter = "./error_filter.wpl"    # 应用过滤器
params = { file = "errors.json" }

配置验证

1. 参数覆盖验证

系统严格验证参数覆盖:

  • 只能覆盖 allow_override 中指定的参数
  • 不支持嵌套表结构覆盖

2. 唯一性验证

  • 同一 sink_group 内 sink 名称必须唯一
  • 连接器 ID 必须全局唯一

3. 文件存在性验证

  • 过滤器文件必须存在且语法正确
  • 文件路径参数必须有效