Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

MySQL Sink

MySQL sink 用于将解析后的记录写入 MySQL 表。它会根据 columns 生成 INSERT IGNORE 语句,适合幂等重试场景;仅接受 Record 数据(不支持 raw 输入)。

连接器定义

推荐使用仓库自带模板(位于 connectors/sink.d/50-mysql.toml):

[[connectors]]
id = "mysql_sink"
type = "mysql"
allow_override = ["endpoint", "username", "password", "database", "table", "columns", "batch_size"]

[connectors.params]
endpoint = "localhost:3306"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024

可用参数

参数类型说明
endpointstringMySQL 地址(host:port,必填)
usernamestring用户名(可选,默认 root
passwordstring密码(可选)
databasestring目标数据库(必填)
tablestring目标表名(必填)
columnsarray列名列表,决定写入字段顺序(必填))
batch_sizeint批量写入条数(可选)

配置示例

基础用法

version = "2.0"

[sink_group]
name = "all"
oml = ["/*"]
batch_timeout_ms=5000   # 当数据在设置的这个时间范围类不满足批量插入的数量时自动插入
parallel = 8

[[sink_group.sinks]]
name = "main"
connect = "mysql_sink"

[sink_group.sinks.params]
endpoint = "localhost:3306"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024   

注意事项

  • columns 中的字段名需与 OML 输出字段一致;缺失的表字段会以 NULL 写入。
  • 可通过环境变量 MYSQL_URL 覆盖连接串(格式:mysql://user:pass@host:port/db)。
  • 端到端示例可参考 wp-examples/extensions/tcp_mysql/README.md