文章

PgSQL槽位创建

创建槽位

1
2
3
4
5
6
7
8
9
-- 1️⃣ 复制标识(必须最先)(一次)
SELECT set_replica_identity_full_for_partition('spu');   -- 主表+分区全部设 FULL  

-- 2️⃣ 创建发布(此时无槽,仅元数据)
CREATE PUBLICATION spu_pub FOR TABLE spu;          -- 主表
ALTER  PUBLICATION spu_pub ADD TABLE spu_p2023, spu_p2024, ...;  -- 把分区也加进去 pg 13+:可以省略

-- 3️⃣ 最后创建槽(一经创建,WAL 保留点开始推进)
SELECT pg_create_logical_replication_slot('spu_goods_center_notify_new', 'pgoutput');

函数(设置表复制标识,确保能拿到完整旧值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
CREATE OR REPLACE FUNCTION set_replica_identity_full_for_partition(base_table_name text)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    partition_table_name text;
BEGIN
    -- 1. 处理所有分区表(public 模式下,名字 like 主表_p%)
    FOR partition_table_name IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'public'
          AND tablename LIKE base_table_name || '_p%'
        ORDER BY tablename
    LOOP
        EXECUTE format('ALTER TABLE public.%I REPLICA IDENTITY FULL', partition_table_name);
        RAISE NOTICE 'Set REPLICA IDENTITY FULL for partition: %', partition_table_name;
    END LOOP;

    -- 2. 处理主表本身
    EXECUTE format('ALTER TABLE public.%I REPLICA IDENTITY FULL', base_table_name);
    RAISE NOTICE 'Set REPLICA IDENTITY FULL for base table: %', base_table_name;
END; 
$$; 

删除槽位

1
SELECT pg_drop_replication_slot('your_slot');-- 若返回空,表示已删除

槽位堆积查询

1
2
3
4
 SELECT slot_name,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal_size
FROM pg_replication_slots
WHERE slot_name = 'sku_goods_center_notify_new';

image-20251109140001734

槽位使用查询

1
 SELECT * FROM pg_replication_slots WHERE slot_name = 'spu_goods_center_notify_new';

image-20251109140401149

pg_publication

PostgreSQL 逻辑复制(Logical Replication)中 Publication(发布)的表范围指定功能

1
2
3
4
5
6
7
8
9
10
-- 你的版本:复制 spu 表的全部数据, 默认 INSERT + UPDATE + DELETE + TRUNCATE
CREATE PUBLICATION spu_pub FOR TABLE spu;

-- PostgreSQL 15+ 带行过滤:只复制符合条件的行
CREATE PUBLICATION spu_pub_filtered 
FOR TABLE spu WHERE (status = 'active');

-- PostgreSQL 15+ 带列过滤:只复制指定列
CREATE PUBLICATION spu_pub_columns 
FOR TABLE spu (id, name, price);

带行过滤(Row Filter)的发布

1. INSERT

1
2
3
-- 主库执行
INSERT INTO spu VALUES (1, '商品A', 'active');    -- ✅ 会复制到订阅端
INSERT INTO spu VALUES (2, '商品B', 'inactive');  -- ❌ 不会复制

2. UPDATE(关键行为)

1
2
3
4
5
6
7
-- 场景:把 inactive 改为 active
UPDATE spu SET status = 'active' WHERE id = 2;  
-- 结果:订阅端会执行 INSERT(新增这条数据)

-- 场景:把 active 改为 inactive  
UPDATE spu SET status = 'inactive' WHERE id = 1;
-- 结果:订阅端会执行 DELETE(删除这条数据)

3. DELETE

1
2
DELETE FROM spu WHERE id = 1;
-- 只有原本满足 status='active' 的行被删除时,才会同步删除

常见使用场景

1
2
3
4
5
6
7
8
9
10
11
-- 1. 只同步上架商品(排除下架/草稿)
CREATE PUBLICATION online_spu 
FOR TABLE spu WHERE (status = 'active' AND is_deleted = false);

-- 2. 只同步特定品类
CREATE PUBLICATION electronics_pub 
FOR TABLE spu WHERE (category_id = 3);

-- 3. 只同步近期数据
CREATE PUBLICATION recent_spu 
FOR TABLE spu WHERE (created_at > '2024-01-01');

© 2024- lfj