pgsql binlog监听功能点解析
目录
- 引言
- 功能点
- 机器宕机,能支持断点续接
- 如果监听的表 长时间没有数据变动,delay_size 会变大
- 每次binlog传的size 太多,导致服务器处理不过来
- 对多个表的监听,应该只有一个流进行监听
- 希望磁盘持久化offset,保持数据的正确性
- 小工具
引言
监听mysql binlog 大家都知道canal,但是如果是pglog呢,先百度
也就这个靠点谱,文章 没有我想要的demo
去官网看看debezium.io/
其中这个 网址给了demo 但是不能直接用于生产
功能点
首先 pglog binlog监听需要满足哪些功能点
机器宕机,能支持断点续接
进行磁盘持久化
如果监听的表 长时间没有数据变动,delay_size 会变大
//设置心跳时间,就算没有数据 也会保持心跳 props.setProperty("heartbeat.interval.ms", "20000");
for (ChangeEvent<String, String> r : records) { try { if (log.isDebugEnabled()) { log.debug("{}\n{}", r.key(), r.value()); } if (r.value() != null && r.value().startsWith("{"ts_ms")) { continue; } xxx 具体数据处理 } catch (Exception e) { log.error("PGLog-binlog param:[{}]", r, e); } }
心跳这个是 当时上生产的时候,突然发现没有数据变更的时候 ,有报警,说delay了。。。这顿害怕
大概意思
数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。
由于WAL由所有数据库共享,因此使用的WAL数量趋于增长,直到Debezium为其捕获更改的数据库触发事件。为了克服这一点,有必要:使用heartbeat.interval.ms连接器配置属性启用周期性心跳记录生成。定期从Debezium正在捕捉变化的数据库中发出更改事件。
其中
if (r.value() != null && r.value().startsWith("{"ts_ms")) { continue; }
这是因为 如果没有数据来的话,会是ts_ms 开头的,代表,没有新数据
每次binlog传的size 太多,导致服务器处理不过来
props.setProperty("max.batch.size", "200");
对多个表的监听,应该只有一个流进行监听
props.setProperty("table.include.list", schs.stream().map(BinlogConfig::getSch).map(a -> tables.stream().map(b -> a + "." + b).map(String::valueOf).collect(Collectors.joining(","))).map(String::valueOf).collect(Collectors.joining(",")));
希望磁盘持久化offset,保持数据的正确性
props.setProperty("snapshot.mode", "never");
小工具
查询数据库 offset推迟多少
select pg_replication_slots.*, pg_current_wal_lsn(), pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), restart_lsn)) as delay_size from pg_replication_slots;
这个工具在生产已经实践稳定,可以直接使用,有问题 可以评论
代码 github.com/a25017012/y…