Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

PostgreSQL Sink

PostgreSQL sink 用于将解析后的记录写入 PostgreSQL 表。它会根据 columns 生成 INSERT IGNORE 语句,适合幂等重试场景;仅接受 Record 数据(不支持 raw 输入)。

连接器定义

推荐使用仓库自带模板(位于 connectors/sink.d/50-mysql.toml,配置项与 MySQL 保持一致):

[[connectors]]
id = "postgresql_sink"
type = "postgresql"
allow_override = ["endpoint", "username", "password", "database", "table", "columns", "batch_size"]

[connectors.params]
endpoint = "localhost:5432"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024

可用参数

参数类型说明
endpointstringPostgreSQL 地址(host:port,必填)
usernamestring用户名(可选,默认 postgres
passwordstring密码(可选)
databasestring目标数据库(必填)
tablestring目标表名(必填)
columnsarray列名列表,决定写入字段顺序(必填)
batch_sizeint批量写入条数(可选)

配置示例

基础用法

version = "2.0"

[sink_group]
name = "all"
oml = ["/*"]
batch_timeout_ms=5000   # 当数据在设置的这个时间范围内不满足批量插入数量时自动插入
parallel = 8

[[sink_group.sinks]]
name = "main"
connect = "postgresql_sink"

[sink_group.sinks.params]
endpoint = "localhost:5432"
username = "${SEC_USERNAME}"
password = "${SEC_PASSWORD}"
database = "wparse"
table = "nginx_logs"
columns = ["sip", "timestamp", "http/request", "status", "size", "referer", "http/agent", "wp_event_id"]
batch_size = 1024

注意事项

  • columns 中的字段名需与 OML 输出字段一致;缺失的表字段会以 NULL 写入。
  • 可通过环境变量 POSTGRESQL_URL 覆盖连接串(格式:postgresql://user:pass@host:port/db)。
  • PostgreSQL 的配置项与 MySQL 完全一致,仅连接器 type、端口与连接信息不同。