MySQL Sink
MySQL sink 用于将解析后的记录写入 MySQL 表。它会根据 columns 生成 INSERT IGNORE 语句,适合幂等重试场景;仅接受 Record 数据(不支持 raw 输入)。
连接器定义
推荐使用仓库自带模板(位于 connectors/sink.d/50-mysql.toml):
[[connectors]]
id = "mysql_sink"
type = "mysql"
allow_override = ["endpoint", "username", "password", "database", "table", "columns", "batch_size"]
[connectors.params]
endpoint = "localhost:3306"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
可用参数
| 参数 | 类型 | 说明 |
|---|---|---|
endpoint | string | MySQL 地址(host:port,必填) |
username | string | 用户名(可选,默认 root) |
password | string | 密码(可选) |
database | string | 目标数据库(必填) |
table | string | 目标表名(必填) |
columns | array | 列名列表,决定写入字段顺序(必填)) |
batch_size | int | 批量写入条数(可选) |
配置示例
基础用法
version = "2.0"
[sink_group]
name = "all"
oml = ["/*"]
batch_timeout_ms=5000 # 当数据在设置的这个时间范围类不满足批量插入的数量时自动插入
parallel = 8
[[sink_group.sinks]]
name = "main"
connect = "mysql_sink"
[sink_group.sinks.params]
endpoint = "localhost:3306"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024
注意事项
columns中的字段名需与 OML 输出字段一致;缺失的表字段会以NULL写入。- 可通过环境变量
MYSQL_URL覆盖连接串(格式:mysql://user:pass@host:port/db)。 - 端到端示例可参考
wp-examples/extensions/tcp_mysql/README.md。