Skip to content

PostgreSQL 审计方案

背景与目标

当前系统使用 PostgreSQL 数据库并计划实施完整的数据审计方案。核心诉求是:

  • INSERT:只需要记录新值或基本信息,不需要旧值;
  • DELETE:只需要记录关键标识(主键等),不需要旧值;
  • UPDATE:必须记录被修改行的 旧值(至少是被修改的那些字段),以满足审计和追责需求;
  • 同时,要避免审计表无限膨胀、影响业务写入与查询性能。

本文设计一套 三层核心审计方案 + 一套备选审计方案,各方案互不依赖,可根据需求独立实施:

审计架构规划

核心层审计架构(独立部署、职责分离)

层级名称职责特点实现方式核心存储对象
第一层数据库触发器审计记录业务表的 INSERT/UPDATE/DELETE 操作行级数据变更追溯,记录UPDATE旧值/新值PostgreSQL 触发器 + SECURITY DEFINER 函数audit.audit_log(分区表)
第二层应用层审计(框架)记录应用层的业务操作日志覆盖登录、请求、数据变更、第三方调用等业务行为应用代码(beyondsoft-log-spring3-starter)-(落地表见第三层)

备选方案:pgaudit 插件审计(SQL 级别审计)

  • 职责:记录数据库层面的安全事件和 SQL 语句
  • 特点:记录登录、权限变更、DDL、敏感 SQL 等,适合安全审计与合规留痕
  • 实现:通过 pgaudit 扩展
  • 日志输出:PostgreSQL 日志文件

方案优势

  • 层级独立:核心二层+备选方案互不依赖,可根据需求选择性实施
  • 职责分离:各层审计职责清晰,行级数据、业务行为、SQL级审计互为补充
  • 灵活部署:可分阶段逐步落地(如先部署第一层触发器审计,再落地应用层审计)
  • 性能可控:审计表分区化管理,对主业务表写入性能影响可控
  • 数据可控:审计表可控增长、可快速查询近期数据

数据库与 Schema 规划

  • 数据库: system
  • Schema 划分:
    • dev: 开发环境业务表所在 schema(现有,包含合同、工单、用户、权限等表)
    • audit: 审计表专用 schema(现有,用于存放所有审计日志表及审计触发器函数)
    • prod: 生产环境 schema(预留,暂不使用)
    • public: 默认 schema(暂不使用)

关于 partman schema:pg_partman 扩展默认安装在 public schema 或指定的 schema(如 partman)中,只是存放扩展自身的函数和配置表,不影响业务表和审计表的使用。可以保留但不是必须的,后文统一使用 partman.create_parent() 等函数调用。


第一层:数据库触发器审计(行级数据变更审计)

方案概述

适用场景:需要记录业务表的详细数据变更历史,包括 UPDATE 操作的旧值和新值。

核心特性

  • 记录 INSERT/UPDATE/DELETE 操作的行级数据变更
  • UPDATE 操作记录变更列的旧值和新值
  • 通过触发器自动记录,对业务代码无侵入
  • 使用 SECURITY DEFINER 实现权限隔离
  • 支持动态主键识别和复合主键
  • 支持应用层透传业务用户信息

数据流向

业务操作 (business_user)

dev.business_table INSERT/UPDATE/DELETE

触发器: audit.audit_trigger_func()

SECURITY DEFINER 函数: audit.log_data_change()

写入: audit.audit_log (以 audit_user 身份写入)

1. 数据库设计

1.1 创建审计 schema 与分区主表

sql
-- 1.1 创建审计 schema
CREATE SCHEMA IF NOT EXISTS audit;

-- 1.2 审计主表(分区表)
CREATE TABLE IF NOT EXISTS audit.audit_log (
    id              BIGSERIAL,
    change_time     TIMESTAMPTZ NOT NULL DEFAULT now(),   -- 变更时间
    table_schema    TEXT        NOT NULL,                 -- 业务表 schema
    table_name      TEXT        NOT NULL,                 -- 业务表名称
    operation       CHAR(1)     NOT NULL,                 -- I / U / D
    row_pk          JSONB,                                -- 主键信息(如 {"id": 123})
    old_data        JSONB,                                -- 旧值(主要用于 UPDATE)
    new_data        JSONB,                                -- 新值(INSERT/UPDATE 可选)
    changed_columns TEXT[],                               -- 发生变更的列名
    app_user        TEXT,                                 -- 业务用户(应用层透传)
    db_user         TEXT        NOT NULL DEFAULT current_user,
    client_ip       INET        DEFAULT inet_client_addr(),
    tx_id           BIGINT      NOT NULL DEFAULT txid_current(),
    PRIMARY KEY (id, change_time)                         -- 分区表主键必须包含分区键
) PARTITION BY RANGE (change_time);

-- 1.3 创建自动分区管理函数
-- 创建下个月分区的函数
CREATE OR REPLACE FUNCTION audit.create_next_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_start date;
    v_end   date;
    v_table text;
BEGIN
    -- 计算下一个月的起止日期
    v_start := date_trunc('month', now())::date + INTERVAL '1 month';
    v_end   := (v_start + INTERVAL '1 month');
    v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));

    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
         FOR VALUES FROM (%L) TO (%L);',
        v_table,
        v_start::timestamptz,
        v_end::timestamptz
    );
    
    RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;

-- 创建当前月分区的函数(初始化用)
CREATE OR REPLACE FUNCTION audit.create_current_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_start date;
    v_end   date;
    v_table text;
BEGIN
    v_start := date_trunc('month', now())::date;
    v_end   := (v_start + INTERVAL '1 month');
    v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));

    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
         FOR VALUES FROM (%L) TO (%L);',
        v_table,
        v_start::timestamptz,
        v_end::timestamptz
    );
    
    RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;

-- 创建删除老分区的函数(默认保留 24 个月)
CREATE OR REPLACE FUNCTION audit.drop_old_partitions(p_retention_months INT DEFAULT 24)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_cutoff date;
    v_partition record;
BEGIN
    v_cutoff := date_trunc('month', now())::date - (p_retention_months || ' months')::interval;
    
    FOR v_partition IN
        SELECT tablename
        FROM pg_tables
        WHERE schemaname = 'audit'
          AND tablename LIKE 'audit_log_%'
          AND tablename ~ '^audit_log_\d{4}_\d{2}$'
    LOOP
        -- 提取分区表名中的日期
        DECLARE
            v_partition_date date;
        BEGIN
            v_partition_date := to_date(substring(v_partition.tablename from 11), 'YYYY_MM');
            
            IF v_partition_date < v_cutoff THEN
                EXECUTE format('DROP TABLE IF EXISTS audit.%I;', v_partition.tablename);
                RAISE NOTICE 'Dropped old partition audit.%', v_partition.tablename;
            END IF;
        EXCEPTION WHEN OTHERS THEN
            RAISE NOTICE 'Skip invalid partition name: %', v_partition.tablename;
        END;
    END LOOP;
END;
$$;

-- 1.4 初始化:创建当前月和未来分区
SELECT audit.create_current_month_partition();
SELECT audit.create_next_month_partition();

-- 验证分区是否创建成功
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit' 
  AND tablename LIKE 'audit_log%'
ORDER BY tablename;

说明

  • 分区管理函数已内置,无需安装 pg_partman 扩展
  • 使用系统 cron 定时执行分区维护(见后文 3.1 节)
  • 默认保留 24 个月的审计数据,可根据需求调整

1.2 权限隔离方案(SECURITY DEFINER)

1.2.1 用户权限规划

为了实现审计日志与业务数据的权限隔离,采用 SECURITY DEFINER 函数 方案:

用户角色划分

  • business_user(业务用户):只能操作 dev schema 的业务表,无法直接访问 audit schema 的审计表
  • audit_user(审计用户):只能写入 audit schema 的审计表,无法访问 dev schema 的业务表
  • 触发器:通过 SECURITY DEFINER 函数以审计用户身份写入,对业务用户透明

1.2.2 创建用户并设置权限

sql
-- 创建业务用户(如果尚未创建)
CREATE USER business_user WITH PASSWORD 'your_business_password';

-- 创建专用的审计写入用户(如果尚未创建)
CREATE USER audit_user WITH PASSWORD 'your_audit_password';

-- 1. 模式使用和创建权限(business_user)
GRANT USAGE, CREATE ON SCHEMA dev TO business_user;

-- 2. 现有表的DML权限(business_user)
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA dev TO business_user;

-- 3. 序列权限(business_user)
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA dev TO business_user;

-- 4. 未来表的默认权限(business_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA dev 
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO business_user;

ALTER DEFAULT PRIVILEGES IN SCHEMA dev 
GRANT USAGE, SELECT ON SEQUENCES TO business_user;

-- 5. 授予审计用户对 audit schema 的权限(audit_user)
GRANT USAGE ON SCHEMA audit TO audit_user;

-- 6. 授予审计用户对 audit_log 表的权限(数据库层面触发器审计表)(audit_user)
GRANT INSERT, SELECT ON audit.audit_log TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.audit_log_id_seq TO audit_user;

-- 如果不确定序列名称,可以使用以下命令查看实际序列名称:
-- SELECT sequencename FROM pg_sequences WHERE schemaname = 'audit';

-- 7. 为未来创建的审计表自动授权(audit_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT INSERT, SELECT ON TABLES TO audit_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT USAGE, SELECT ON SEQUENCES TO audit_user;

-- 7.1 【可选】批量授权所有现有的 audit_log 分区表
-- 如果分区已存在但权限缺失,执行以下批量授权脚本
DO $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE schemaname = 'audit' 
          AND tablename LIKE 'audit_log_%'
    LOOP
        EXECUTE format('GRANT INSERT, SELECT ON audit.%I TO audit_user;', r.tablename);
        RAISE NOTICE 'Granted permissions on audit.%', r.tablename;
    END LOOP;
END $$;

-- 8. 确保业务用户无法直接访问 audit schema(audit_user)
REVOKE ALL ON SCHEMA audit FROM business_user;
REVOKE ALL ON ALL TABLES IN SCHEMA audit FROM business_user;

-- 9. 确保审计用户无法访问 dev schema(audit_user)
REVOKE ALL ON SCHEMA dev FROM audit_user;
REVOKE ALL ON ALL TABLES IN SCHEMA dev FROM audit_user;

-- 10. 授予业务用户访问 audit schema 的最小权限(仅用于调用函数)(audit_user)
GRANT USAGE ON SCHEMA audit TO business_user;

1.2.3 创建权限提升的审计写入函数

使用 SECURITY DEFINER 创建审计写入函数,该函数以其所有者(audit_user)的权限执行:

sql
-- 创建权限提升的审计写入函数
CREATE OR REPLACE FUNCTION audit.log_data_change(
    p_table_schema TEXT,
    p_table_name TEXT,
    p_operation TEXT,
    p_row_pk JSONB,
    p_old_data JSONB,
    p_new_data JSONB,
    p_changed_columns TEXT[],
    p_app_user TEXT
) RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER  -- 关键:以函数所有者(audit_user)权限执行
SET search_path = audit, pg_temp  -- 防止搜索路径劫持攻击
AS $$
BEGIN
    INSERT INTO audit.audit_log (
        change_time,
        table_schema,
        table_name,
        operation,
        row_pk,
        old_data,
        new_data,
        changed_columns,
        app_user,
        db_user,
        client_ip,
        tx_id
    ) VALUES (
        now(),
        p_table_schema,
        p_table_name,
        p_operation,
        p_row_pk,
        p_old_data,
        p_new_data,
        p_changed_columns,
        p_app_user,
        current_user,
        inet_client_addr(),
        txid_current()
    );
END;
$$;

-- 修改函数所有者为 audit_user
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;

-- 授予业务用户执行此函数的权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;

安全说明

  • SECURITY DEFINER:函数以其所有者(audit_user)的权限执行,而不是调用者(business_user)的权限
  • SET search_path = audit, pg_temp:防止搜索路径劫持攻击,确保函数只访问 audit schema
  • 业务用户 business_user 只需要 EXECUTE 权限,无需直接访问 audit.audit_log

1.3 通用审计函数(触发器函数)

说明

  • 该函数自动识别表的主键列,支持不同表使用不同主键名称(如 idrole_iduser_id 等),也支持复合主键。
  • UPDATE:记录变更的列名、这些列的旧值和新值;
  • INSERT/DELETE:记录必要信息,包括主键和完整行数据。
  • 通过调用 audit.log_data_change() 函数实现权限提升,自动以 audit_user 身份写入审计日志。
sql
CREATE OR REPLACE FUNCTION audit.audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    v_app_user      TEXT;
    v_row_pk        JSONB := '{}'::jsonb;
    v_old_row       JSONB;
    v_new_row       JSONB;
    v_old_changed   JSONB := '{}'::jsonb;
    v_new_changed   JSONB := '{}'::jsonb;
    v_changed_cols  TEXT[] := ARRAY[]::TEXT[];
    col             TEXT;
    pk_col          TEXT;
BEGIN
    -- 从会话变量中获取业务用户(应用层设置),允许为空
    BEGIN
        v_app_user := current_setting('audit.user_id', true);
    EXCEPTION WHEN others THEN
        v_app_user := NULL;
    END;

    -- UPDATE: 记录变更列的旧值和新值
    IF TG_OP = 'UPDATE' THEN
        v_old_row := to_jsonb(OLD);
        v_new_row := to_jsonb(NEW);

        -- 动态构建主键(支持多列主键)
        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
        END LOOP;

        -- 找出变更的列
        SELECT array_agg(key)
        INTO v_changed_cols
        FROM jsonb_each(v_new_row) AS e(key, value)
        WHERE (v_old_row -> key) IS DISTINCT FROM value;

        IF v_changed_cols IS NULL OR array_length(v_changed_cols, 1) IS NULL THEN
            RETURN NEW;
        END IF;

        -- 只保留变更列的旧值和新值
        FOREACH col IN ARRAY v_changed_cols LOOP
            v_old_changed := v_old_changed || jsonb_build_object(col, v_old_row -> col);
            v_new_changed := v_new_changed || jsonb_build_object(col, v_new_row -> col);
        END LOOP;

        -- 调用 SECURITY DEFINER 函数写入审计日志(权限自动提升为 audit_user)
        PERFORM audit.log_data_change(
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'U',
            v_row_pk,
            v_old_changed,
            v_new_changed,
            v_changed_cols,
            v_app_user
        );
        RETURN NEW;

    -- INSERT: 只记录新值
    ELSIF TG_OP = 'INSERT' THEN
        v_new_row := to_jsonb(NEW);

        -- 动态构建主键
        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_new_row -> pk_col);
        END LOOP;

        PERFORM audit.log_data_change(
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'I',
            v_row_pk,
            NULL,
            v_new_row,
            NULL,
            v_app_user
        );
        RETURN NEW;

    -- DELETE: 只记录旧值
    ELSIF TG_OP = 'DELETE' THEN
        v_old_row := to_jsonb(OLD);

        -- 动态构建主键
        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
        END LOOP;

        PERFORM audit.log_data_change(
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'D',
            v_row_pk,
            v_old_row,
            NULL,
            NULL,
            v_app_user
        );
        RETURN OLD;

    END IF;

    RETURN NULL;
END;
$$;

-- 重要:修改触发器函数的所有者为 audit_user
-- 这样保证两个函数的所有者一致,避免权限混乱
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;

优势

  • 动态主键识别:无需修改代码即可适配任何表(idrole_iduser_id 等)。
  • 支持复合主键:如果表有多列主键,全部记录到 row_pk JSONB 字段中。
  • 通用性强:一个函数适用于 dev schema 下所有表。
  • 权限隔离:业务用户 business_user 通过触发器调用 SECURITY DEFINER 函数,自动以 audit_user 权限写入审计日志,无需直接访问审计表。

1.4 权限验证

执行以下 SQL 验证权限设置是否正确:

sql
-- 1. 验证 business_user 无法直接访问 audit.audit_log
SET ROLE business_user;
SELECT * FROM audit.audit_log LIMIT 1;  -- 应该报错:permission denied for table audit_log

-- 2. 验证 business_user 可以通过触发器写入审计日志
SET ROLE business_user;
INSERT INTO dev.your_table (id, name) VALUES (1, 'Test');  -- 应该成功,且触发器自动写入审计日志

-- 3. 验证 audit_user 可以写入 audit_log 表
SET ROLE audit_user;
SELECT * FROM audit.audit_log LIMIT 1;  -- 应该成功

-- 4. 验证 audit_user 无法访问业务表
SET ROLE audit_user;
SELECT * FROM dev.your_table LIMIT 1;  -- 应该报错:permission denied for schema dev

-- 5. 重置角色
RESET ROLE;

1.5 触发器问题诊断与修复

1.5.1 常见问题:触发器创建后审计表无数据

症状

  • 触发器已创建,但 audit.audit_log 表中没有数据
  • 业务操作(INSERT/UPDATE/DELETE)执行成功,但没有触发审计记录

可能原因

  1. 触发器函数所有者权限不正确
  2. 触发器是在权限配置之前创建的
  3. SECURITY DEFINER 函数所有者不是 audit_user

1.5.2 诊断步骤

sql
-- 1. 检查 dev schema 下的表和触发器状态
SELECT 
    t.tablename,
    trg.tgname AS trigger_name,
    p.proname AS function_name,
    pg_get_userbyid(p.proowner) AS function_owner,
    CASE WHEN p.prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_tables t
LEFT JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
LEFT JOIN pg_proc p ON p.oid = trg.tgfoid
WHERE t.schemaname = 'dev'
ORDER BY t.tablename;

-- 2. 检查两个审计函数的所有者和安全类型
SELECT 
    proname, 
    pg_get_userbyid(proowner) AS owner,
    CASE WHEN prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_proc 
WHERE pronamespace = 'audit'::regnamespace 
  AND proname IN ('audit_trigger_func', 'log_data_change');

-- 3. 检查 business_user 对审计函数的执行权限
SELECT 
    p.proname,
    pg_get_userbyid(p.proowner) AS owner,
    has_function_privilege('business_user', p.oid, 'EXECUTE') AS has_execute_privilege
FROM pg_proc p
WHERE p.pronamespace = 'audit'::regnamespace 
  AND p.proname = 'log_data_change';

预期结果

  • audit_trigger_funclog_data_change 的所有者都应该是 audit_user
  • log_data_change 应该是 SECURITY DEFINER
  • business_userlog_data_change 应该有 EXECUTE 权限

1.5.3 修复方案:重新创建触发器

如果发现函数所有者不正确,或触发器无法正常工作,执行以下修复脚本:

sql
-- ========================================
-- 步骤 1: 删除所有旧触发器
-- ========================================
DO $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN 
        SELECT t.schemaname, t.tablename, trg.tgname
        FROM pg_tables t
        JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
        WHERE t.schemaname = 'dev' 
          AND trg.tgname LIKE 'audit_%'
    LOOP
        EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I.%I;', 
                      r.tgname, r.schemaname, r.tablename);
        RAISE NOTICE 'Dropped trigger % on %.%', r.tgname, r.schemaname, r.tablename;
    END LOOP;
END $$;

-- ========================================
-- 步骤 2: 确保函数所有者正确
-- ========================================
-- 修改触发器函数所有者
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;

-- 修改 SECURITY DEFINER 函数所有者
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;

-- 确保 business_user 有执行权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;

-- ========================================
-- 步骤 3: 重新创建触发器
-- ========================================
DO $$
DECLARE
    r RECORD;
    trigger_name TEXT;
BEGIN
    FOR r IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE schemaname = 'dev'
    LOOP
        trigger_name := 'audit_' || r.tablename;
        
        EXECUTE format(
            'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
            'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
            trigger_name, r.tablename
        );
        RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
    END LOOP;
END $$;

-- ========================================
-- 步骤 4: 验证修复结果
-- ========================================
-- 查看函数所有者(应该都是 audit_user)
SELECT 
    proname, 
    pg_get_userbyid(proowner) AS owner
FROM pg_proc 
WHERE pronamespace = 'audit'::regnamespace 
  AND proname IN ('audit_trigger_func', 'log_data_change');

-- 查看触发器数量
SELECT 
    schemaname,
    COUNT(*) AS trigger_count
FROM pg_tables t
JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
WHERE t.schemaname = 'dev' 
  AND trg.tgname LIKE 'audit_%'
GROUP BY schemaname;

1.5.4 测试触发器是否正常工作

sql
-- 以 business_user 身份测试
SET ROLE business_user;

-- 在某个表中插入测试数据(替换为实际的表名和列)
INSERT INTO dev.your_test_table (id, name, description) 
VALUES (999, 'audit_test', '测试审计功能');

-- 更新测试数据
UPDATE dev.your_test_table 
SET description = '审计功能测试成功' 
WHERE id = 999;

-- 删除测试数据
DELETE FROM dev.your_test_table WHERE id = 999;

-- 切换回超级用户查看审计日志
RESET ROLE;

-- 查看刚才的操作是否被记录
SELECT 
    id,
    change_time,
    table_name,
    operation,
    row_pk,
    old_data,
    new_data,
    changed_columns,
    db_user
FROM audit.audit_log 
WHERE table_name = 'your_test_table' 
  AND (row_pk->>'id')::int = 999
ORDER BY change_time DESC;

预期结果

  • 应该看到 3 条审计记录(INSERT、UPDATE、DELETE)
  • INSERT 记录有 new_data
  • UPDATE 记录有 old_datanew_datachanged_columns
  • DELETE 记录有 old_data
  • 所有记录的 db_user 应该显示为 business_user

1.5.5 故障排查清单

如果修复后仍然没有审计数据,按以下顺序检查:

sql
-- ✅ 检查 1: 函数是否存在
SELECT COUNT(*) FROM pg_proc 
WHERE pronamespace = 'audit'::regnamespace 
  AND proname IN ('audit_trigger_func', 'log_data_change');
-- 预期结果: 2

-- ✅ 检查 2: 触发器是否存在
SELECT COUNT(*) FROM pg_trigger 
WHERE tgname LIKE 'audit_%' 
  AND tgrelid IN (SELECT oid FROM pg_class WHERE relnamespace = 'dev'::regnamespace);
-- 预期结果: > 0

-- ✅ 检查 3: 触发器是否启用
SELECT 
    tgname,
    tgenabled,
    CASE tgenabled 
        WHEN 'O' THEN 'enabled'
        WHEN 'D' THEN 'disabled'
        WHEN 'R' THEN 'replica'
        WHEN 'A' THEN 'always'
    END AS status
FROM pg_trigger 
WHERE tgname LIKE 'audit_%'
LIMIT 5;
-- 预期结果: tgenabled = 'O' (enabled)

-- ✅ 检查 4: audit_log 表是否有分区
SELECT 
    schemaname,
    tablename,
    pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit' 
  AND tablename LIKE 'audit_log%';
-- 如果只有主表没有分区,需要先创建当前月份的分区

-- ✅ 检查 5: 创建分区(如果缺失)
CREATE TABLE IF NOT EXISTS audit.audit_log_2025_01 PARTITION OF audit.audit_log
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
-- 根据实际月份调整

重要提示:如果审计表是分区表但没有对应当前时间的分区,INSERT 会失败。请确保已创建当前月份的分区,或配置了自动分区管理(参考本文档的分区管理章节)。

1.6 权限矩阵(第一层)

用户/角色dev schema (业务表)audit schema (访问)audit_log (SELECT/INSERT)audit.log_data_change() (执行)触发器执行
business_user (业务用户)✅ SELECT/INSERT/UPDATE/DELETE✅ USAGE only✅ EXECUTE✅ 自动调用
audit_user (审计用户)✅ USAGE✅ SELECT/INSERT✅ OWNERN/A
dba/superuser

2. 为业务表创建审计触发器

2.1 为关键业务表创建触发器

建议:只在关键业务表上开启审计(例如合同、工单、用户、权限相关表),降低整体写入压力。如果需要对 dev schema 下所有表开启审计,可以使用后文的批量脚本。

假设 dev schema 下有表 contractworkordersys_usersys_role 等:

sql
-- dev.contract 合同表
CREATE TRIGGER audit_contract
AFTER INSERT OR UPDATE OR DELETE ON dev.contract
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();

-- dev.workorder 工单表
CREATE TRIGGER audit_workorder
AFTER INSERT OR UPDATE OR DELETE ON dev.workorder
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();

-- dev.sys_user 用户表
CREATE TRIGGER audit_sys_user
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_user
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();

-- dev.sys_role 角色表
CREATE TRIGGER audit_sys_role
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_role
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();

2.2 批量为 dev schema 下所有表创建审计触发器

如果需要对 dev schema 下的所有表统一开启审计,可以使用以下脚本:

sql
DO $$
DECLARE
    r RECORD;
    trigger_name TEXT;
BEGIN
    FOR r IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE schemaname = 'dev'
    LOOP
        trigger_name := 'audit_' || r.tablename;
        
        -- 检查触发器是否已存在,避免重复创建
        IF NOT EXISTS (
            SELECT 1 FROM pg_trigger 
            WHERE tgname = trigger_name
        ) THEN
            EXECUTE format(
                'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
                'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
                trigger_name, r.tablename
            );
            RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
        END IF;
    END LOOP;
END $$;

3. 审计表性能优化

3.1 分区 + 保留策略(核心手段)

  • 审计表使用 按月份分区PARTITION BY RANGE(change_time)),避免单表行数累积到几千万/几亿;
  • 对于过旧的数据(如 6 个月/1 年前),通过删除或归档整个月的分区表进行"粗粒度清理",而不是在大表里 DELETE
  • 分区的创建和清理必须完全自动化,不依赖人工执行 SQL。

3.1.1 配置定时维护任务

分区管理函数已在前文 1.1 节创建完成,现在只需配置系统 cron 定时执行即可。

使用系统 cron(推荐,适用于所有平台和所有 PostgreSQL 版本)

bash
# 创建日志目录
sudo mkdir -p /var/log/postgresql
sudo chown postgres:postgres /var/log/postgresql

# 编辑 postgres 用户的 crontab(重要:必须使用 postgres 用户)
sudo crontab -u postgres -e

# 添加以下两行
# 每天凌晨 2 点创建下个月分区
0 2 * * * psql -d postgres -c "SELECT audit.create_next_month_partition();" >> /var/log/postgresql/partition_create.log 2>&1

# 每天凌晨 3 点清理老分区(保留 24 个月)
0 3 * * * psql -d postgres -c "SELECT audit.drop_old_partitions(24);" >> /var/log/postgresql/partition_cleanup.log 2>&1

第二层:应用层审计(业务日志审计)

方案概述

适用场景:需要记录应用层的全量业务行为,包括用户登录、接口请求、数据变更、第三方调用等,补充数据库层审计的视角盲区。

核心特性

  • 与业务代码解耦(通过 beyondsoft-log-spring3-starter 实现)
  • 记录更丰富的业务上下文(用户IP、请求参数、耗时、trace_id等)
  • 支持结构化存储(JSONB)和高效查询
  • 审计数据落地到 PostgreSQL 专用表(见第三层)
  • 对应用代码入侵性低(基于注解/拦截器实现)

数据流向

用户操作/接口调用

应用层拦截器/注解(beyondsoft-log-spring3-starter)

业务日志格式化处理

写入: audit 下4张应用层审计表(以 audit_user 身份写入)

核心职责

审计类型记录内容价值
登录审计用户名、IP、登录位置、终端信息、登录状态溯源用户登录行为,排查异常登录
请求审计请求URL、参数、耗时、响应、错误信息、操作用户全量接口调用追溯,性能问题定位
数据变更审计操作人、变更内容、业务ID、新旧值业务数据变更的应用层溯源
第三方调用审计调用地址、参数、耗时、响应第三方接口问题定位,责任界定

实现方式

  1. 引入 beyondsoft-log-spring3-starter 依赖
  2. 配置日志输出目标为 PostgreSQL(指向 audit 下的 4 张表)
  3. 通过注解/配置指定需要审计的接口/方法
  4. 配置 audit_user 数据库账号用于日志写入
  5. 配置日志保留策略(与第三层表分区策略对齐)

1. 应用层审计表设计

🚧 以下四张表是使用 PostgreSQL 替代 MongoDB 后落地应用层审计的核心表,需在 audit schema 下创建:

  • 📌 beyond_soft_login_log:登录日志表
  • 📌 beyond_soft_request_log:请求日志表
  • 📌 beyond_soft_data_audit_log:数据变更审计表
  • 📌 beyond_soft_third_part_log:第三方调用日志表

1.1 创建应用层审计表(分区表)

sql
-- 在audit schema下执行建表语句
-- ==========================================
-- 1. 登陆日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_login_log (
    id BIGSERIAL,
    username VARCHAR(100),
    ip VARCHAR(50),
    location VARCHAR(200),
    platform VARCHAR(50),
    browser VARCHAR(100),
    os VARCHAR(100),
    status VARCHAR(50) DEFAULT '登录成功',
    response TEXT,
    request_time TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);

-- 创建索引
CREATE INDEX IF NOT EXISTS idx_login_log_username ON audit.beyond_soft_login_log(username);
CREATE INDEX IF NOT EXISTS idx_login_log_request_time ON audit.beyond_soft_login_log(request_time);
CREATE INDEX IF NOT EXISTS idx_login_log_ip ON audit.beyond_soft_login_log(ip);

COMMENT ON TABLE audit.beyond_soft_login_log IS '登录日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_login_log.request_time IS '分区键:按此字段进行分区';

-- ==========================================
-- 2. 请求日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_request_log (
    id BIGSERIAL,
    ip VARCHAR(50),
    location VARCHAR(200),
    request_type VARCHAR(20),
    url VARCHAR(1000),
    request_params TEXT,
    content_type VARCHAR(100),
    operator VARCHAR(100),
    user_agent TEXT,
    start_time TIMESTAMP,
    end_time TIMESTAMP,
    spend_time BIGINT,
    status INTEGER,
    response TEXT,
    error TEXT,
    trace_id VARCHAR(100),
    request_time TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);

-- 创建索引
CREATE INDEX IF NOT EXISTS idx_request_log_url ON audit.beyond_soft_request_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_request_log_trace_id ON audit.beyond_soft_request_log(trace_id);
CREATE INDEX IF NOT EXISTS idx_request_log_request_time ON audit.beyond_soft_request_log(request_time);
CREATE INDEX IF NOT EXISTS idx_request_log_operator ON audit.beyond_soft_request_log(operator);

COMMENT ON TABLE audit.beyond_soft_request_log IS '请求日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_request_log.request_time IS '分区键:按此字段进行分区';

-- ==========================================
-- 3. 审计日志表(支持JSONB)
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_data_audit_log (
    id BIGSERIAL,
    operator VARCHAR(100),
    modify_date TIMESTAMP,
    operation VARCHAR(50),
    handle_name VARCHAR(200),
    modify_content TEXT,
    modifier_ip VARCHAR(50),
    modifier_location VARCHAR(200),
    business_id VARCHAR(100),
    business_data_status VARCHAR(50),
    old_object JSONB,
    new_object JSONB,
    response TEXT,
    request_time TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);

-- 创建索引
CREATE INDEX IF NOT EXISTS idx_audit_log_operator ON audit.beyond_soft_data_audit_log(operator);
CREATE INDEX IF NOT EXISTS idx_audit_log_business_id ON audit.beyond_soft_data_audit_log(business_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_request_time ON audit.beyond_soft_data_audit_log(request_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_operation ON audit.beyond_soft_data_audit_log(operation);

-- JSONB 字段的 GIN 索引(支持高效 JSON 查询)
CREATE INDEX IF NOT EXISTS idx_audit_log_old_object_gin ON audit.beyond_soft_data_audit_log USING gin(old_object);
CREATE INDEX IF NOT EXISTS idx_audit_log_new_object_gin ON audit.beyond_soft_data_audit_log USING gin(new_object);

COMMENT ON TABLE audit.beyond_soft_data_audit_log IS '审计日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.old_object IS 'JSONB格式:操作前的对象';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.new_object IS 'JSONB格式:操作后的对象';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.request_time IS '分区键:按此字段进行分区';

-- ==========================================
-- 4. 第三方调用日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_third_part_log (
    id BIGSERIAL,
    request_id VARCHAR(100),
    protocols VARCHAR(20),
    url VARCHAR(1000),
    query_params TEXT,
    body TEXT,
    call_source VARCHAR(200),
    method_name VARCHAR(200),
    headers JSONB,
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP,
    spend_time BIGINT,
    response TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, start_time)
) PARTITION BY RANGE (start_time);

-- 创建索引
CREATE INDEX IF NOT EXISTS idx_third_part_log_request_id ON audit.beyond_soft_third_part_log(request_id);
CREATE INDEX IF NOT EXISTS idx_third_part_log_url ON audit.beyond_soft_third_part_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_third_part_log_call_source ON audit.beyond_soft_third_part_log(call_source);
CREATE INDEX IF NOT EXISTS idx_third_part_log_start_time ON audit.beyond_soft_third_part_log(start_time);

COMMENT ON TABLE audit.beyond_soft_third_part_log IS '第三方调用日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_third_part_log.headers IS 'JSONB格式:请求头信息';
COMMENT ON COLUMN audit.beyond_soft_third_part_log.start_time IS '分区键:按此字段进行分区';

1.2 应用层审计表权限配置

sql
-- 1. 授予审计用户对应用层四个日志表的权限
GRANT INSERT, SELECT ON audit.beyond_soft_data_audit_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_login_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_request_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_third_part_log TO audit_user;

-- 2. 授予审计用户对日志表序列的权限
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_data_audit_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_login_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_request_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_third_part_log_id_seq TO audit_user;

-- 3. 【重要】为未来创建的分区子表自动授权
-- 确保动态创建的分区子表自动继承权限
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT INSERT, SELECT ON TABLES TO audit_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT USAGE, SELECT ON SEQUENCES TO audit_user;

-- 4. 【可选】如果分区已经存在但权限缺失,批量授权所有现有分区
-- 对所有现有的应用层审计日志分区表授权
DO $$
DECLARE
    r RECORD;
BEGIN
    FOR r IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE schemaname = 'audit' 
          AND (tablename LIKE 'beyond_soft_data_audit_log_%'
            OR tablename LIKE 'beyond_soft_login_log_%'
            OR tablename LIKE 'beyond_soft_request_log_%'
            OR tablename LIKE 'beyond_soft_third_part_log_%')
    LOOP
        EXECUTE format('GRANT INSERT, SELECT ON audit.%I TO audit_user;', r.tablename);
        RAISE NOTICE 'Granted permissions on audit.%', r.tablename;
    END LOOP;
END $$;

1.3 应用层审计表分区管理

说明:应用层审计表按分区(高频写入场景),分区管理通过beyondsoft-log-spring3-starter实现:

1.4 权限矩阵(第三层)

用户/角色dev schema (业务表)audit schema (访问)4张应用层审计表 (SELECT/INSERT)序列权限
business_user (业务用户)✅ USAGE only
audit_user (审计用户)✅ USAGE✅ SELECT/INSERT✅ USAGE/SELECT
dba/superuser

备选方案:pgaudit 插件审计(SQL 级别审计)

方案概述

适用场景:需要满足合规要求,记录数据库层面的所有 SQL 操作、权限变更、DDL 操作等安全事件。

核心特性

  • 记录所有数据库层面的 SQL 执行(SELECT/INSERT/UPDATE/DELETE/DDL/DCL)
  • 按角色/用户/表粒度过滤审计内容
  • 日志输出到 PostgreSQL 日志文件,便于集中收集(如 ELK)
  • 低性能损耗(相比触发器审计)
  • 无需修改业务代码/数据库结构

1. 安装与配置

1.1 安装 pgaudit 扩展

bash
# CentOS/RHEL
sudo yum install postgresql14-pgaudit

# Debian/Ubuntu
sudo apt-get install postgresql-14-pgaudit

# 重启PostgreSQL
sudo systemctl restart postgresql-14

1.2 启用 pgaudit 扩展

sql
-- 进入目标数据库
psql -U postgres -d system

-- 创建扩展
CREATE EXTENSION IF NOT EXISTS pgaudit;

1.3 配置 postgresql.conf

ini
# 修改 postgresql.conf(通常在 /var/lib/pgsql/14/data/ 目录)
shared_preload_libraries = 'pgaudit'  # 添加 pgaudit
pgaudit.log = 'ddl, write, function, role'  # 审计类型:DDL、写入操作、函数调用、角色变更
pgaudit.log_catalog = off  # 不审计系统表
pgaudit.log_level = 'notice'  # 日志级别
pgaudit.log_parameter = on  # 记录SQL参数
pgaudit.log_relation = on  # 记录表级操作
pgaudit.log_statement_once = off  # 每次执行都记录

# 日志输出配置
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_rotation_age = 1d
log_rotation_size = 100MB
log_line_prefix = '%t [%p]: [%c-%l] user=%u,db=%d,app=%a,client=%h '
log_statement = 'none'  # 关闭默认SQL记录,由pgaudit接管

1.4 重启生效

bash
sudo systemctl restart postgresql-14

2. 细粒度审计配置

2.1 按表配置审计

sql
-- 仅审计 dev schema 下的关键表
ALTER ROLE business_user SET pgaudit.log_relation = 'on';
ALTER TABLE dev.contract SET (pgaudit.log = 'write');
ALTER TABLE dev.sys_user SET (pgaudit.log = 'all');

2.2 审计结果查看

bash
# 查看审计日志
tail -f /var/lib/pgsql/14/data/pg_log/postgresql-2025-01-01_000000.log

# 示例日志内容
# 2025-01-01 10:00:00 UTC [1234]: [1-1] user=business_user,db=system,app=psql,client=192.168.1.100 LOG:  AUDIT: OBJECT,1,WRITE,TABLE,dev.contract,INSERT,public,"INSERT INTO dev.contract (id, name) VALUES (1, 'Test')",<none>

3. 方案对比

维度触发器审计(第一层)应用层审计(第二层)pgaudit 插件审计(备选)
审计粒度行级(数据变更)业务行为级(登录/请求)SQL语句级(数据库操作)
性能影响低(行级触发器)低(应用层异步写入)极低(内核级审计)
数据存储PostgreSQL 分区表PostgreSQL 分区表日志文件
可查询性高(SQL/JSONB查询)高(结构化查询)中(日志解析)
合规支持业务数据追溯业务行为追溯数据库安全合规
入侵性低(触发器)低(starter插件)无(扩展)

4. 部署建议

  • 核心审计:优先部署第一层(触发器审计)+ 第二层(应用层审计),覆盖业务数据和行为追溯;
  • 合规补充:在生产环境部署 pgaudit 插件审计,满足安全合规要求;
  • 性能优化:所有审计表均采用分区表,配置自动分区清理,避免数据膨胀;
  • 权限管控:严格遵循 "审计用户仅可写入审计表,不可访问业务表" 的原则。

跨库审计方案(业务库与审计库分离)

方案概述

适用场景:业务数据库与审计数据库物理隔离,触发器无法直接跨库写入审计日志。

核心问题

  • PostgreSQL 不支持触发器函数跨数据库调用
  • 触发器无法直接 INSERT 到另一个数据库的表
  • 需要在保证审计完整性的前提下实现数据库级别的隔离

解决思路: 采用 外部数据包装器(Foreign Data Wrapper, FDW) + 触发器 方案,将跨库写入转化为本地写入外部表。

架构对比

方案适用场景数据流向优势劣势
同库不同Schema业务表和审计表在同一数据库触发器 → 本地 audit schema简单、高性能、事务一致数据库压力集中
跨库 FDW 方案业务库和审计库完全分离触发器 → FDW外部表 → 远程审计库数据库隔离、故障隔离配置复杂、轻微性能损耗

1. 跨库审计架构设计

1.1 数据流向

业务操作 (business_user)

business_db.dev.business_table INSERT/UPDATE/DELETE

触发器: audit_trigger_func()

FDW 外部表: business_db.audit_fdw.audit_log_remote

postgres_fdw 连接

远程审计库: audit_db.audit.audit_log

1.2 数据库规划

数据库Schema用途主要对象
business_dbdev业务表contract, workorder, sys_user 等
business_dbaudit_fdw外部表(指向审计库)audit_log_remote(外部表)
audit_dbaudit审计日志存储audit_log(分区表)

2. 实施步骤

2.1 审计库(audit_db)准备

2.1.1 创建审计数据库和表结构

sql
-- 在 PostgreSQL 实例上创建审计数据库
CREATE DATABASE audit_db;

-- 切换到审计数据库
\c audit_db

-- 创建审计 schema
CREATE SCHEMA IF NOT EXISTS audit;

-- 创建审计分区主表(与前文第一层方案相同)
CREATE TABLE IF NOT EXISTS audit.audit_log (
    id              BIGSERIAL,
    change_time     TIMESTAMPTZ NOT NULL DEFAULT now(),
    table_schema    TEXT        NOT NULL,
    table_name      TEXT        NOT NULL,
    operation       CHAR(1)     NOT NULL,
    row_pk          JSONB,
    old_data        JSONB,
    new_data        JSONB,
    changed_columns TEXT[],
    app_user        TEXT,
    db_user         TEXT        NOT NULL DEFAULT current_user,
    client_ip       INET        DEFAULT inet_client_addr(),
    tx_id           BIGINT      NOT NULL DEFAULT txid_current(),
    source_db       TEXT,                                 -- 新增:记录来源数据库
    PRIMARY KEY (id, change_time)
) PARTITION BY RANGE (change_time);

-- 创建当前月和下月分区(使用前文的分区管理函数)
CREATE OR REPLACE FUNCTION audit.create_current_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    v_start date;
    v_end   date;
    v_table text;
BEGIN
    v_start := date_trunc('month', now())::date;
    v_end   := (v_start + INTERVAL '1 month');
    v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));

    EXECUTE format(
        'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
         FOR VALUES FROM (%L) TO (%L);',
        v_table,
        v_start::timestamptz,
        v_end::timestamptz
    );
    
    RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;

-- 初始化分区
SELECT audit.create_current_month_partition();

2.1.2 创建审计用户并授权

sql
-- 在审计库创建专用用户
CREATE USER audit_writer WITH PASSWORD 'your_secure_password';

-- 授予审计 schema 权限
GRANT USAGE ON SCHEMA audit TO audit_writer;

-- 授予审计表写入权限
GRANT INSERT, SELECT ON audit.audit_log TO audit_writer;
GRANT USAGE, SELECT ON SEQUENCE audit.audit_log_id_seq TO audit_writer;

-- 为未来分区自动授权
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT INSERT, SELECT ON TABLES TO audit_writer;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
    GRANT USAGE, SELECT ON SEQUENCES TO audit_writer;

2.2 业务库(business_db)配置 FDW

2.2.1 安装并启用 postgres_fdw 扩展

sql
-- 切换到业务数据库
\c business_db

-- 创建外部数据包装器扩展
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- 创建 FDW schema(用于存放外部表)
CREATE SCHEMA IF NOT EXISTS audit_fdw;

2.2.2 创建外部服务器连接

sql
-- 创建指向审计库的外部服务器
CREATE SERVER audit_db_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (
        host 'localhost',           -- 审计库主机地址(如果在不同机器上需修改)
        port '5432',                -- 审计库端口
        dbname 'audit_db'           -- 审计数据库名
    );

-- 创建用户映射(业务库用户映射到审计库的 audit_writer)
CREATE USER MAPPING FOR business_user
    SERVER audit_db_server
    OPTIONS (
        user 'audit_writer',
        password 'your_secure_password'
    );

-- 如果有多个业务用户,需要为每个用户创建映射
-- 或者创建 PUBLIC 映射(所有用户都使用相同的远程账号)
CREATE USER MAPPING FOR PUBLIC
    SERVER audit_db_server
    OPTIONS (
        user 'audit_writer',
        password 'your_secure_password'
    );

2.2.3 创建外部表(映射到审计库的 audit_log)

sql
-- 方式一:手动创建外部表
CREATE FOREIGN TABLE audit_fdw.audit_log_remote (
    id              BIGINT,
    change_time     TIMESTAMPTZ,
    table_schema    TEXT,
    table_name      TEXT,
    operation       CHAR(1),
    row_pk          JSONB,
    old_data        JSONB,
    new_data        JSONB,
    changed_columns TEXT[],
    app_user        TEXT,
    db_user         TEXT,
    client_ip       INET,
    tx_id           BIGINT,
    source_db       TEXT
)
SERVER audit_db_server
OPTIONS (schema_name 'audit', table_name 'audit_log');

-- 方式二:自动导入审计库的表结构(推荐)
IMPORT FOREIGN SCHEMA audit
    LIMIT TO (audit_log)
    FROM SERVER audit_db_server
    INTO audit_fdw;

-- 重命名外部表(可选)
ALTER FOREIGN TABLE audit_fdw.audit_log RENAME TO audit_log_remote;

2.2.4 授权业务用户访问外部表

sql
-- 授予 business_user 使用 audit_fdw schema 的权限
GRANT USAGE ON SCHEMA audit_fdw TO business_user;

-- 授予外部表的 INSERT 权限
GRANT INSERT ON audit_fdw.audit_log_remote TO business_user;

2.3 修改触发器函数(写入外部表)

2.3.1 创建跨库审计触发器函数

sql
-- 在业务库创建触发器函数(写入外部表)
CREATE OR REPLACE FUNCTION audit_fdw.audit_trigger_func_fdw()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    v_app_user      TEXT;
    v_row_pk        JSONB := '{}'::jsonb;
    v_old_row       JSONB;
    v_new_row       JSONB;
    v_old_changed   JSONB := '{}'::jsonb;
    v_new_changed   JSONB := '{}'::jsonb;
    v_changed_cols  TEXT[] := ARRAY[]::TEXT[];
    col             TEXT;
    pk_col          TEXT;
BEGIN
    -- 从会话变量中获取业务用户
    BEGIN
        v_app_user := current_setting('audit.user_id', true);
    EXCEPTION WHEN others THEN
        v_app_user := NULL;
    END;

    -- UPDATE: 记录变更列的旧值和新值
    IF TG_OP = 'UPDATE' THEN
        v_old_row := to_jsonb(OLD);
        v_new_row := to_jsonb(NEW);

        -- 动态构建主键
        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
        END LOOP;

        -- 找出变更的列
        SELECT array_agg(key)
        INTO v_changed_cols
        FROM jsonb_each(v_new_row) AS e(key, value)
        WHERE (v_old_row -> key) IS DISTINCT FROM value;

        IF v_changed_cols IS NULL OR array_length(v_changed_cols, 1) IS NULL THEN
            RETURN NEW;
        END IF;

        -- 只保留变更列的旧值和新值
        FOREACH col IN ARRAY v_changed_cols LOOP
            v_old_changed := v_old_changed || jsonb_build_object(col, v_old_row -> col);
            v_new_changed := v_new_changed || jsonb_build_object(col, v_new_row -> col);
        END LOOP;

        -- 写入外部表(跨库写入)
        INSERT INTO audit_fdw.audit_log_remote (
            change_time, table_schema, table_name, operation,
            row_pk, old_data, new_data, changed_columns,
            app_user, db_user, client_ip, tx_id, source_db
        ) VALUES (
            now(),
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'U',
            v_row_pk,
            v_old_changed,
            v_new_changed,
            v_changed_cols,
            v_app_user,
            current_user,
            inet_client_addr(),
            txid_current(),
            current_database()  -- 记录来源数据库
        );
        RETURN NEW;

    -- INSERT: 只记录新值
    ELSIF TG_OP = 'INSERT' THEN
        v_new_row := to_jsonb(NEW);

        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_new_row -> pk_col);
        END LOOP;

        INSERT INTO audit_fdw.audit_log_remote (
            change_time, table_schema, table_name, operation,
            row_pk, old_data, new_data, changed_columns,
            app_user, db_user, client_ip, tx_id, source_db
        ) VALUES (
            now(),
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'I',
            v_row_pk,
            NULL,
            v_new_row,
            NULL,
            v_app_user,
            current_user,
            inet_client_addr(),
            txid_current(),
            current_database()
        );
        RETURN NEW;

    -- DELETE: 只记录旧值
    ELSIF TG_OP = 'DELETE' THEN
        v_old_row := to_jsonb(OLD);

        FOR pk_col IN
            SELECT a.attname
            FROM pg_index i
            JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
            WHERE i.indrelid = TG_RELID AND i.indisprimary
        LOOP
            v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
        END LOOP;

        INSERT INTO audit_fdw.audit_log_remote (
            change_time, table_schema, table_name, operation,
            row_pk, old_data, new_data, changed_columns,
            app_user, db_user, client_ip, tx_id, source_db
        ) VALUES (
            now(),
            TG_TABLE_SCHEMA::TEXT,
            TG_TABLE_NAME::TEXT,
            'D',
            v_row_pk,
            v_old_row,
            NULL,
            NULL,
            v_app_user,
            current_user,
            inet_client_addr(),
            txid_current(),
            current_database()
        );
        RETURN OLD;

    END IF;

    RETURN NULL;
END;
$$;

2.3.2 为业务表创建跨库审计触发器

sql
-- 批量为 dev schema 下所有表创建跨库审计触发器
DO $$
DECLARE
    r RECORD;
    trigger_name TEXT;
BEGIN
    FOR r IN 
        SELECT tablename 
        FROM pg_tables 
        WHERE schemaname = 'dev'
    LOOP
        trigger_name := 'audit_fdw_' || r.tablename;
        
        -- 先删除旧触发器(如果存在)
        EXECUTE format('DROP TRIGGER IF EXISTS %I ON dev.%I;', trigger_name, r.tablename);
        
        -- 创建新触发器(使用 FDW 函数)
        EXECUTE format(
            'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
            'FOR EACH ROW EXECUTE FUNCTION audit_fdw.audit_trigger_func_fdw();',
            trigger_name, r.tablename
        );
        RAISE NOTICE 'Created FDW trigger % on dev.%', trigger_name, r.tablename;
    END LOOP;
END $$;

2.4 测试跨库审计

sql
-- 切换到业务库
\c business_db

-- 以 business_user 身份执行业务操作
SET ROLE business_user;

-- 插入测试数据
INSERT INTO dev.contract (id, name, amount) VALUES (9999, 'FDW审计测试', 100000);

-- 更新测试数据
UPDATE dev.contract SET amount = 200000 WHERE id = 9999;

-- 删除测试数据
DELETE FROM dev.contract WHERE id = 9999;

-- 重置角色
RESET ROLE;

-- 切换到审计库查看审计日志
\c audit_db

SELECT 
    id,
    change_time,
    source_db,
    table_name,
    operation,
    row_pk,
    old_data,
    new_data,
    changed_columns,
    db_user
FROM audit.audit_log 
WHERE table_name = 'contract' 
  AND (row_pk->>'id')::int = 9999
ORDER BY change_time DESC;

预期结果

  • 应该看到 3 条审计记录(INSERT、UPDATE、DELETE)
  • source_db 字段显示为 business_db
  • 所有操作都成功记录到审计库的 audit.audit_log 表中

3. 性能与事务一致性

3.1 性能影响

维度同库方案跨库 FDW 方案
网络开销有(本地网络约 0.1-1ms)
事务开销中(FDW 自动管理分布式事务)
写入延迟~1ms~2-5ms
适用场景单库高并发多库隔离、合规要求

3.2 事务一致性保证

FDW 方案下的事务行为:

sql
-- 示例:业务操作和审计写入在同一事务中
BEGIN;
    INSERT INTO dev.contract (id, name) VALUES (1, 'Test');  -- 业务库写入
    -- 触发器自动通过 FDW 写入审计库
COMMIT;  -- 两个库的操作要么全部成功,要么全部回滚

-- 如果任意一方失败,整个事务回滚
BEGIN;
    INSERT INTO dev.contract (id, name) VALUES (2, 'Test2');
    -- 假设审计库连接失败或分区不存在
ROLLBACK;  -- 业务操作也会回滚

注意事项

  • FDW 使用 两阶段提交(2PC) 保证跨库事务一致性
  • 如果审计库不可用,业务操作会失败(可以通过异常处理优化)

3.3 高可用优化(可选)

如果希望审计库故障时不影响业务操作,可以使用异常捕获:

sql
CREATE OR REPLACE FUNCTION audit_fdw.audit_trigger_func_fdw_safe()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
    -- ... 变量声明同前 ...
BEGIN
    -- ... 数据处理逻辑同前 ...

    -- 在写入外部表时捕获异常
    BEGIN
        INSERT INTO audit_fdw.audit_log_remote (...) VALUES (...);
    EXCEPTION WHEN OTHERS THEN
        -- 记录到本地日志,但不影响业务事务
        RAISE WARNING 'Audit log write failed: %', SQLERRM;
        -- 可选:写入本地备份表
    END;

    RETURN NEW;  -- 或 OLD(DELETE 操作)
END;
$$;

⚠️ 警告:此方案会降低审计完整性,仅在对可用性要求极高的场景下使用。

4. 方案选择指南

场景推荐方案原因
业务库和审计库在同一数据库实例同库不同 Schema 方案(第一层)简单、高性能、事务一致性强
业务库和审计库需要物理隔离跨库 FDW 方案数据库级隔离、故障隔离
多个业务库共享一个审计库跨库 FDW 方案集中式审计管理
对审计完整性要求极高同库 + 应用层双写双重保障
审计库需要部署在不同机器跨库 FDW 方案支持跨主机连接

5. 安全加固

5.1 FDW 连接加密

sql
-- 修改外部服务器配置,启用 SSL 连接
ALTER SERVER audit_db_server OPTIONS (ADD sslmode 'require');

-- 在审计库的 pg_hba.conf 中限制连接来源
-- hostssl  audit_db  audit_writer  192.168.1.0/24  md5

5.2 密码管理

sql
-- 使用 .pgpass 文件存储密码(避免在 SQL 中明文密码)
-- ~/.pgpass 内容示例:
-- localhost:5432:audit_db:audit_writer:your_secure_password

-- 创建用户映射时不指定密码(自动从 .pgpass 读取)
CREATE USER MAPPING FOR business_user
    SERVER audit_db_server
    OPTIONS (user 'audit_writer');

5.3 权限最小化

sql
-- 在审计库限制 audit_writer 只能 INSERT
REVOKE UPDATE, DELETE ON audit.audit_log FROM audit_writer;

-- 在业务库限制 business_user 只能 INSERT 外部表
REVOKE SELECT, UPDATE, DELETE ON audit_fdw.audit_log_remote FROM business_user;
GRANT INSERT ON audit_fdw.audit_log_remote TO business_user;

6. 故障排查

6.1 常见问题

问题 1:触发器执行失败,提示 "foreign table not found"

sql
-- 解决:检查外部表是否存在
SELECT * FROM information_schema.foreign_tables 
WHERE foreign_table_schema = 'audit_fdw';

-- 如果不存在,重新创建外部表
IMPORT FOREIGN SCHEMA audit
    LIMIT TO (audit_log)
    FROM SERVER audit_db_server
    INTO audit_fdw;

问题 2:提示 "connection to server failed"

sql
-- 解决:测试 FDW 连接
SELECT * FROM audit_fdw.audit_log_remote LIMIT 1;

-- 检查服务器配置
SELECT srvname, srvoptions FROM pg_foreign_server WHERE srvname = 'audit_db_server';

-- 检查用户映射
SELECT * FROM pg_user_mappings WHERE srvname = 'audit_db_server';

问题 3:审计数据未写入

sql
-- 1. 检查触发器是否存在
SELECT tgname, tgrelid::regclass, tgenabled 
FROM pg_trigger 
WHERE tgname LIKE 'audit_fdw_%';

-- 2. 手动测试外部表写入
INSERT INTO audit_fdw.audit_log_remote (
    change_time, table_schema, table_name, operation, db_user, tx_id, source_db
) VALUES (
    now(), 'dev', 'test_table', 'I', current_user, txid_current(), current_database()
);

-- 3. 到审计库查看是否有数据
\c audit_db
SELECT * FROM audit.audit_log ORDER BY change_time DESC LIMIT 5;

6.2 性能监控

sql
-- 在业务库监控 FDW 查询统计
SELECT * FROM pg_stat_user_tables WHERE schemaname = 'audit_fdw';

-- 查看慢查询(FDW 写入耗时)
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
WHERE query LIKE '%audit_log_remote%'
ORDER BY mean_exec_time DESC;

7. 迁移指南

7.1 从同库方案迁移到跨库方案

sql
-- 步骤 1: 在审计库创建表和用户(见 2.1 节)

-- 步骤 2: 在业务库配置 FDW(见 2.2 节)

-- 步骤 3: 迁移历史审计数据(可选)
-- 在业务库执行:
INSERT INTO audit_fdw.audit_log_remote
SELECT * FROM audit.audit_log
WHERE change_time >= '2025-01-01';  -- 指定需要迁移的时间范围

-- 步骤 4: 删除旧触发器,创建新触发器(见 2.3.2 节)

-- 步骤 5: 验证新触发器工作正常后,删除业务库的本地审计表
DROP TABLE IF EXISTS audit.audit_log CASCADE;
DROP SCHEMA IF EXISTS audit CASCADE;

7.2 多业务库接入统一审计库

sql
-- 在每个业务库(business_db1, business_db2, ...)执行:

-- 1. 安装 postgres_fdw 扩展
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- 2. 创建指向同一审计库的外部服务器
CREATE SERVER audit_db_server
    FOREIGN DATA WRAPPER postgres_fdw
    OPTIONS (host 'audit-db-host', port '5432', dbname 'audit_db');

-- 3. 创建用户映射和外部表(同前)

-- 4. 创建触发器(同前)

-- 最终效果:多个业务库的审计数据汇总到同一个审计库

8. 完整部署检查清单

  • [ ] 审计库(audit_db)已创建并完成分区表初始化
  • [ ] 审计用户(audit_writer)已创建并授予最小权限
  • [ ] 业务库已安装 postgres_fdw 扩展
  • [ ] 外部服务器(audit_db_server)已创建并测试连接
  • [ ] 用户映射已创建(business_user → audit_writer)
  • [ ] 外部表(audit_log_remote)已创建并可写入
  • [ ] 触发器函数(audit_trigger_func_fdw)已创建
  • [ ] 所有业务表的触发器已创建并启用
  • [ ] 测试插入/更新/删除操作,审计数据正常写入审计库
  • [ ] 配置自动分区管理(审计库)
  • [ ] 配置监控和告警(FDW 连接状态、写入延迟)
  • [ ] 文档化密码管理方式(.pgpass 或密钥管理服务)

总结:三种审计方案对比

方案部署复杂度性能影响隔离性事务一致性适用场景
同库不同 Schema⭐ 简单⭐⭐⭐ 极低⭐ Schema 级⭐⭐⭐ 强单库、高并发
跨库 FDW⭐⭐ 中等⭐⭐ 低⭐⭐⭐ 数据库级⭐⭐ 中(2PC)多库、合规隔离
应用层双写⭐⭐⭐ 复杂⭐ 中⭐⭐⭐ 数据库级⭐ 弱(最终一致)异步审计、高可用

推荐组合

  • 生产环境(物理隔离):跨库 FDW 方案(数据库触发器) + 应用层审计(beyondsoft-log-spring3-starter)
  • 开发/测试环境:同库不同 Schema 方案(简单高效)
  • 合规审计:以上任一方案 + pgaudit 插件审计

Copyright © 2025-present | 网站备案号:豫ICP备19038229号-1