pgsql binlog监听功能点解析


    目录
  • 引言
  • 功能点
  • 机器宕机,能支持断点续接
    • 如果监听的表 长时间没有数据变动,delay_size 会变大
  • 每次binlog传的size 太多,导致服务器处理不过来
  • 对多个表的监听,应该只有一个流进行监听
  • 希望磁盘持久化offset,保持数据的正确性
  • 小工具

    引言
    

    监听mysql binlog 大家都知道canal,但是如果是pglog呢,先百度
    
    也就这个靠点谱,文章 没有我想要的demo
    去官网看看debezium.io/
    其中这个 网址给了demo 但是不能直接用于生产
    功能点
    首先 pglog binlog监听需要满足哪些功能点
    机器宕机,能支持断点续接
    进行磁盘持久化
    如果监听的表 长时间没有数据变动,delay_size 会变大
    
//设置心跳时间,就算没有数据 也会保持心跳
props.setProperty("heartbeat.interval.ms", "20000");

    
for (ChangeEvent<String, String> r : records) {
    try {
        if (log.isDebugEnabled()) {
            log.debug("{}\n{}", r.key(), r.value());
        }
        if (r.value() != null && r.value().startsWith("{"ts_ms")) {
            continue;
        }
        xxx 具体数据处理
    } catch (Exception e) {
        log.error("PGLog-binlog param:[{}]", r, e);
    }
}

    心跳这个是 当时上生产的时候,突然发现没有数据变更的时候 ,有报警,说delay了。。。这顿害怕
    
    大概意思
    数据库中有许多更新正在被跟踪,但只有极少数更新与连接器正在为其捕获更改的表和模式相关。这种情况可以通过周期性的心跳事件轻松解决。设置heartbeat.interval.ms连接器配置属性。
    由于WAL由所有数据库共享,因此使用的WAL数量趋于增长,直到Debezium为其捕获更改的数据库触发事件。为了克服这一点,有必要:使用heartbeat.interval.ms连接器配置属性启用周期性心跳记录生成。定期从Debezium正在捕捉变化的数据库中发出更改事件。
    其中
    
if (r.value() != null && r.value().startsWith("{"ts_ms")) {
    continue;
}

    这是因为 如果没有数据来的话,会是ts_ms 开头的,代表,没有新数据
    每次binlog传的size 太多,导致服务器处理不过来
    
props.setProperty("max.batch.size", "200");

    对多个表的监听,应该只有一个流进行监听
    
props.setProperty("table.include.list", schs.stream().map(BinlogConfig::getSch).map(a -> tables.stream().map(b -> a + "." + b).map(String::valueOf).collect(Collectors.joining(","))).map(String::valueOf).collect(Collectors.joining(",")));

    希望磁盘持久化offset,保持数据的正确性
    
props.setProperty("snapshot.mode", "never");

    小工具
    查询数据库 offset推迟多少
    
select pg_replication_slots.*, 
pg_current_wal_lsn(), 
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_insert_lsn(), 
restart_lsn)) as delay_size 
from pg_replication_slots;

    这个工具在生产已经实践稳定,可以直接使用,有问题 可以评论
    代码 github.com/a25017012/y…