PostgreSQL 审计方案
背景与目标
当前系统使用 PostgreSQL 数据库并计划实施完整的数据审计方案。核心诉求是:
- INSERT:只需要记录新值或基本信息,不需要旧值;
- DELETE:只需要记录关键标识(主键等),不需要旧值;
- UPDATE:必须记录被修改行的 旧值(至少是被修改的那些字段),以满足审计和追责需求;
- 同时,要避免审计表无限膨胀、影响业务写入与查询性能。
本文设计一套 三层核心审计方案 + 一套备选审计方案,各方案互不依赖,可根据需求独立实施:
审计架构规划
核心层审计架构(独立部署、职责分离)
| 层级 | 名称 | 职责 | 特点 | 实现方式 | 核心存储对象 |
|---|---|---|---|---|---|
| 第一层 | 数据库触发器审计 | 记录业务表的 INSERT/UPDATE/DELETE 操作 | 行级数据变更追溯,记录UPDATE旧值/新值 | PostgreSQL 触发器 + SECURITY DEFINER 函数 | audit.audit_log(分区表) |
| 第二层 | 应用层审计(框架) | 记录应用层的业务操作日志 | 覆盖登录、请求、数据变更、第三方调用等业务行为 | 应用代码(beyondsoft-log-spring3-starter) | -(落地表见第三层) |
备选方案:pgaudit 插件审计(SQL 级别审计)
- 职责:记录数据库层面的安全事件和 SQL 语句
- 特点:记录登录、权限变更、DDL、敏感 SQL 等,适合安全审计与合规留痕
- 实现:通过 pgaudit 扩展
- 日志输出:PostgreSQL 日志文件
方案优势
- 层级独立:核心二层+备选方案互不依赖,可根据需求选择性实施
- 职责分离:各层审计职责清晰,行级数据、业务行为、SQL级审计互为补充
- 灵活部署:可分阶段逐步落地(如先部署第一层触发器审计,再落地应用层审计)
- 性能可控:审计表分区化管理,对主业务表写入性能影响可控
- 数据可控:审计表可控增长、可快速查询近期数据
数据库与 Schema 规划
- 数据库:
system - Schema 划分:
dev: 开发环境业务表所在 schema(现有,包含合同、工单、用户、权限等表)audit: 审计表专用 schema(现有,用于存放所有审计日志表及审计触发器函数)prod: 生产环境 schema(预留,暂不使用)public: 默认 schema(暂不使用)
关于 partman schema:pg_partman 扩展默认安装在
publicschema 或指定的 schema(如partman)中,只是存放扩展自身的函数和配置表,不影响业务表和审计表的使用。可以保留但不是必须的,后文统一使用partman.create_parent()等函数调用。
第一层:数据库触发器审计(行级数据变更审计)
方案概述
适用场景:需要记录业务表的详细数据变更历史,包括 UPDATE 操作的旧值和新值。
核心特性:
- 记录 INSERT/UPDATE/DELETE 操作的行级数据变更
- UPDATE 操作记录变更列的旧值和新值
- 通过触发器自动记录,对业务代码无侵入
- 使用 SECURITY DEFINER 实现权限隔离
- 支持动态主键识别和复合主键
- 支持应用层透传业务用户信息
数据流向:
业务操作 (business_user)
↓
dev.business_table INSERT/UPDATE/DELETE
↓
触发器: audit.audit_trigger_func()
↓
SECURITY DEFINER 函数: audit.log_data_change()
↓
写入: audit.audit_log (以 audit_user 身份写入)1. 数据库设计
1.1 创建审计 schema 与分区主表
-- 1.1 创建审计 schema
CREATE SCHEMA IF NOT EXISTS audit;
-- 1.2 审计主表(分区表)
CREATE TABLE IF NOT EXISTS audit.audit_log (
id BIGSERIAL,
change_time TIMESTAMPTZ NOT NULL DEFAULT now(), -- 变更时间
table_schema TEXT NOT NULL, -- 业务表 schema
table_name TEXT NOT NULL, -- 业务表名称
operation CHAR(1) NOT NULL, -- I / U / D
row_pk JSONB, -- 主键信息(如 {"id": 123})
old_data JSONB, -- 旧值(主要用于 UPDATE)
new_data JSONB, -- 新值(INSERT/UPDATE 可选)
changed_columns TEXT[], -- 发生变更的列名
app_user TEXT, -- 业务用户(应用层透传)
db_user TEXT NOT NULL DEFAULT current_user,
client_ip INET DEFAULT inet_client_addr(),
tx_id BIGINT NOT NULL DEFAULT txid_current(),
PRIMARY KEY (id, change_time) -- 分区表主键必须包含分区键
) PARTITION BY RANGE (change_time);
-- 1.3 创建自动分区管理函数
-- 创建下个月分区的函数
CREATE OR REPLACE FUNCTION audit.create_next_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_start date;
v_end date;
v_table text;
BEGIN
-- 计算下一个月的起止日期
v_start := date_trunc('month', now())::date + INTERVAL '1 month';
v_end := (v_start + INTERVAL '1 month');
v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
FOR VALUES FROM (%L) TO (%L);',
v_table,
v_start::timestamptz,
v_end::timestamptz
);
RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;
-- 创建当前月分区的函数(初始化用)
CREATE OR REPLACE FUNCTION audit.create_current_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_start date;
v_end date;
v_table text;
BEGIN
v_start := date_trunc('month', now())::date;
v_end := (v_start + INTERVAL '1 month');
v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
FOR VALUES FROM (%L) TO (%L);',
v_table,
v_start::timestamptz,
v_end::timestamptz
);
RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;
-- 创建删除老分区的函数(默认保留 24 个月)
CREATE OR REPLACE FUNCTION audit.drop_old_partitions(p_retention_months INT DEFAULT 24)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_cutoff date;
v_partition record;
BEGIN
v_cutoff := date_trunc('month', now())::date - (p_retention_months || ' months')::interval;
FOR v_partition IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log_%'
AND tablename ~ '^audit_log_\d{4}_\d{2}$'
LOOP
-- 提取分区表名中的日期
DECLARE
v_partition_date date;
BEGIN
v_partition_date := to_date(substring(v_partition.tablename from 11), 'YYYY_MM');
IF v_partition_date < v_cutoff THEN
EXECUTE format('DROP TABLE IF EXISTS audit.%I;', v_partition.tablename);
RAISE NOTICE 'Dropped old partition audit.%', v_partition.tablename;
END IF;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Skip invalid partition name: %', v_partition.tablename;
END;
END LOOP;
END;
$$;
-- 1.4 初始化:创建当前月和未来分区
SELECT audit.create_current_month_partition();
SELECT audit.create_next_month_partition();
-- 验证分区是否创建成功
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log%'
ORDER BY tablename;说明:
- 分区管理函数已内置,无需安装 pg_partman 扩展
- 使用系统 cron 定时执行分区维护(见后文 3.1 节)
- 默认保留 24 个月的审计数据,可根据需求调整
1.2 权限隔离方案(SECURITY DEFINER)
1.2.1 用户权限规划
为了实现审计日志与业务数据的权限隔离,采用 SECURITY DEFINER 函数 方案:
用户角色划分:
business_user(业务用户):只能操作devschema 的业务表,无法直接访问auditschema 的审计表audit_user(审计用户):只能写入auditschema 的审计表,无法访问devschema 的业务表- 触发器:通过 SECURITY DEFINER 函数以审计用户身份写入,对业务用户透明
1.2.2 创建用户并设置权限
-- 创建业务用户(如果尚未创建)
CREATE USER business_user WITH PASSWORD 'your_business_password';
-- 创建专用的审计写入用户(如果尚未创建)
CREATE USER audit_user WITH PASSWORD 'your_audit_password';
-- 1. 模式使用和创建权限(business_user)
GRANT USAGE, CREATE ON SCHEMA dev TO business_user;
-- 2. 现有表的DML权限(business_user)
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA dev TO business_user;
-- 3. 序列权限(business_user)
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA dev TO business_user;
-- 4. 未来表的默认权限(business_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA dev
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO business_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA dev
GRANT USAGE, SELECT ON SEQUENCES TO business_user;
-- 5. 授予审计用户对 audit schema 的权限(audit_user)
GRANT USAGE ON SCHEMA audit TO audit_user;
-- 6. 授予审计用户对 audit_log 表的权限(数据库层面触发器审计表)(audit_user)
GRANT INSERT, SELECT ON audit.audit_log TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.audit_log_id_seq TO audit_user;
-- 如果不确定序列名称,可以使用以下命令查看实际序列名称:
-- SELECT sequencename FROM pg_sequences WHERE schemaname = 'audit';
-- 7. 为未来创建的审计表自动授权(audit_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT INSERT, SELECT ON TABLES TO audit_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT USAGE, SELECT ON SEQUENCES TO audit_user;
-- 7.1 【可选】批量授权所有现有的 audit_log 分区表
-- 如果分区已存在但权限缺失,执行以下批量授权脚本
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log_%'
LOOP
EXECUTE format('GRANT INSERT, SELECT ON audit.%I TO audit_user;', r.tablename);
RAISE NOTICE 'Granted permissions on audit.%', r.tablename;
END LOOP;
END $$;
-- 8. 确保业务用户无法直接访问 audit schema(audit_user)
REVOKE ALL ON SCHEMA audit FROM business_user;
REVOKE ALL ON ALL TABLES IN SCHEMA audit FROM business_user;
-- 9. 确保审计用户无法访问 dev schema(audit_user)
REVOKE ALL ON SCHEMA dev FROM audit_user;
REVOKE ALL ON ALL TABLES IN SCHEMA dev FROM audit_user;
-- 10. 授予业务用户访问 audit schema 的最小权限(仅用于调用函数)(audit_user)
GRANT USAGE ON SCHEMA audit TO business_user;1.2.3 创建权限提升的审计写入函数
使用 SECURITY DEFINER 创建审计写入函数,该函数以其所有者(audit_user)的权限执行:
-- 创建权限提升的审计写入函数
CREATE OR REPLACE FUNCTION audit.log_data_change(
p_table_schema TEXT,
p_table_name TEXT,
p_operation TEXT,
p_row_pk JSONB,
p_old_data JSONB,
p_new_data JSONB,
p_changed_columns TEXT[],
p_app_user TEXT
) RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER -- 关键:以函数所有者(audit_user)权限执行
SET search_path = audit, pg_temp -- 防止搜索路径劫持攻击
AS $$
BEGIN
INSERT INTO audit.audit_log (
change_time,
table_schema,
table_name,
operation,
row_pk,
old_data,
new_data,
changed_columns,
app_user,
db_user,
client_ip,
tx_id
) VALUES (
now(),
p_table_schema,
p_table_name,
p_operation,
p_row_pk,
p_old_data,
p_new_data,
p_changed_columns,
p_app_user,
current_user,
inet_client_addr(),
txid_current()
);
END;
$$;
-- 修改函数所有者为 audit_user
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;
-- 授予业务用户执行此函数的权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;安全说明:
SECURITY DEFINER:函数以其所有者(audit_user)的权限执行,而不是调用者(business_user)的权限SET search_path = audit, pg_temp:防止搜索路径劫持攻击,确保函数只访问auditschema- 业务用户
business_user只需要EXECUTE权限,无需直接访问audit.audit_log表
1.3 通用审计函数(触发器函数)
说明:
- 该函数自动识别表的主键列,支持不同表使用不同主键名称(如
id、role_id、user_id等),也支持复合主键。- 对 UPDATE:记录变更的列名、这些列的旧值和新值;
- 对 INSERT/DELETE:记录必要信息,包括主键和完整行数据。
- 通过调用
audit.log_data_change()函数实现权限提升,自动以audit_user身份写入审计日志。
CREATE OR REPLACE FUNCTION audit.audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
v_app_user TEXT;
v_row_pk JSONB := '{}'::jsonb;
v_old_row JSONB;
v_new_row JSONB;
v_old_changed JSONB := '{}'::jsonb;
v_new_changed JSONB := '{}'::jsonb;
v_changed_cols TEXT[] := ARRAY[]::TEXT[];
col TEXT;
pk_col TEXT;
BEGIN
-- 从会话变量中获取业务用户(应用层设置),允许为空
BEGIN
v_app_user := current_setting('audit.user_id', true);
EXCEPTION WHEN others THEN
v_app_user := NULL;
END;
-- UPDATE: 记录变更列的旧值和新值
IF TG_OP = 'UPDATE' THEN
v_old_row := to_jsonb(OLD);
v_new_row := to_jsonb(NEW);
-- 动态构建主键(支持多列主键)
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
-- 找出变更的列
SELECT array_agg(key)
INTO v_changed_cols
FROM jsonb_each(v_new_row) AS e(key, value)
WHERE (v_old_row -> key) IS DISTINCT FROM value;
IF v_changed_cols IS NULL OR array_length(v_changed_cols, 1) IS NULL THEN
RETURN NEW;
END IF;
-- 只保留变更列的旧值和新值
FOREACH col IN ARRAY v_changed_cols LOOP
v_old_changed := v_old_changed || jsonb_build_object(col, v_old_row -> col);
v_new_changed := v_new_changed || jsonb_build_object(col, v_new_row -> col);
END LOOP;
-- 调用 SECURITY DEFINER 函数写入审计日志(权限自动提升为 audit_user)
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'U',
v_row_pk,
v_old_changed,
v_new_changed,
v_changed_cols,
v_app_user
);
RETURN NEW;
-- INSERT: 只记录新值
ELSIF TG_OP = 'INSERT' THEN
v_new_row := to_jsonb(NEW);
-- 动态构建主键
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_new_row -> pk_col);
END LOOP;
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'I',
v_row_pk,
NULL,
v_new_row,
NULL,
v_app_user
);
RETURN NEW;
-- DELETE: 只记录旧值
ELSIF TG_OP = 'DELETE' THEN
v_old_row := to_jsonb(OLD);
-- 动态构建主键
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'D',
v_row_pk,
v_old_row,
NULL,
NULL,
v_app_user
);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- 重要:修改触发器函数的所有者为 audit_user
-- 这样保证两个函数的所有者一致,避免权限混乱
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;优势:
- 动态主键识别:无需修改代码即可适配任何表(
id、role_id、user_id等)。- 支持复合主键:如果表有多列主键,全部记录到
row_pkJSONB 字段中。- 通用性强:一个函数适用于
devschema 下所有表。- 权限隔离:业务用户
business_user通过触发器调用 SECURITY DEFINER 函数,自动以audit_user权限写入审计日志,无需直接访问审计表。
1.4 权限验证
执行以下 SQL 验证权限设置是否正确:
-- 1. 验证 business_user 无法直接访问 audit.audit_log
SET ROLE business_user;
SELECT * FROM audit.audit_log LIMIT 1; -- 应该报错:permission denied for table audit_log
-- 2. 验证 business_user 可以通过触发器写入审计日志
SET ROLE business_user;
INSERT INTO dev.your_table (id, name) VALUES (1, 'Test'); -- 应该成功,且触发器自动写入审计日志
-- 3. 验证 audit_user 可以写入 audit_log 表
SET ROLE audit_user;
SELECT * FROM audit.audit_log LIMIT 1; -- 应该成功
-- 4. 验证 audit_user 无法访问业务表
SET ROLE audit_user;
SELECT * FROM dev.your_table LIMIT 1; -- 应该报错:permission denied for schema dev
-- 5. 重置角色
RESET ROLE;1.5 触发器问题诊断与修复
1.5.1 常见问题:触发器创建后审计表无数据
症状:
- 触发器已创建,但
audit.audit_log表中没有数据 - 业务操作(INSERT/UPDATE/DELETE)执行成功,但没有触发审计记录
可能原因:
- 触发器函数所有者权限不正确
- 触发器是在权限配置之前创建的
- SECURITY DEFINER 函数所有者不是
audit_user
1.5.2 诊断步骤
-- 1. 检查 dev schema 下的表和触发器状态
SELECT
t.tablename,
trg.tgname AS trigger_name,
p.proname AS function_name,
pg_get_userbyid(p.proowner) AS function_owner,
CASE WHEN p.prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_tables t
LEFT JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
LEFT JOIN pg_proc p ON p.oid = trg.tgfoid
WHERE t.schemaname = 'dev'
ORDER BY t.tablename;
-- 2. 检查两个审计函数的所有者和安全类型
SELECT
proname,
pg_get_userbyid(proowner) AS owner,
CASE WHEN prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 3. 检查 business_user 对审计函数的执行权限
SELECT
p.proname,
pg_get_userbyid(p.proowner) AS owner,
has_function_privilege('business_user', p.oid, 'EXECUTE') AS has_execute_privilege
FROM pg_proc p
WHERE p.pronamespace = 'audit'::regnamespace
AND p.proname = 'log_data_change';预期结果:
audit_trigger_func和log_data_change的所有者都应该是audit_userlog_data_change应该是SECURITY DEFINERbusiness_user对log_data_change应该有EXECUTE权限
1.5.3 修复方案:重新创建触发器
如果发现函数所有者不正确,或触发器无法正常工作,执行以下修复脚本:
-- ========================================
-- 步骤 1: 删除所有旧触发器
-- ========================================
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT t.schemaname, t.tablename, trg.tgname
FROM pg_tables t
JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
WHERE t.schemaname = 'dev'
AND trg.tgname LIKE 'audit_%'
LOOP
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I.%I;',
r.tgname, r.schemaname, r.tablename);
RAISE NOTICE 'Dropped trigger % on %.%', r.tgname, r.schemaname, r.tablename;
END LOOP;
END $$;
-- ========================================
-- 步骤 2: 确保函数所有者正确
-- ========================================
-- 修改触发器函数所有者
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;
-- 修改 SECURITY DEFINER 函数所有者
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;
-- 确保 business_user 有执行权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;
-- ========================================
-- 步骤 3: 重新创建触发器
-- ========================================
DO $$
DECLARE
r RECORD;
trigger_name TEXT;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'dev'
LOOP
trigger_name := 'audit_' || r.tablename;
EXECUTE format(
'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
trigger_name, r.tablename
);
RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
END LOOP;
END $$;
-- ========================================
-- 步骤 4: 验证修复结果
-- ========================================
-- 查看函数所有者(应该都是 audit_user)
SELECT
proname,
pg_get_userbyid(proowner) AS owner
FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 查看触发器数量
SELECT
schemaname,
COUNT(*) AS trigger_count
FROM pg_tables t
JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
WHERE t.schemaname = 'dev'
AND trg.tgname LIKE 'audit_%'
GROUP BY schemaname;1.5.4 测试触发器是否正常工作
-- 以 business_user 身份测试
SET ROLE business_user;
-- 在某个表中插入测试数据(替换为实际的表名和列)
INSERT INTO dev.your_test_table (id, name, description)
VALUES (999, 'audit_test', '测试审计功能');
-- 更新测试数据
UPDATE dev.your_test_table
SET description = '审计功能测试成功'
WHERE id = 999;
-- 删除测试数据
DELETE FROM dev.your_test_table WHERE id = 999;
-- 切换回超级用户查看审计日志
RESET ROLE;
-- 查看刚才的操作是否被记录
SELECT
id,
change_time,
table_name,
operation,
row_pk,
old_data,
new_data,
changed_columns,
db_user
FROM audit.audit_log
WHERE table_name = 'your_test_table'
AND (row_pk->>'id')::int = 999
ORDER BY change_time DESC;预期结果:
- 应该看到 3 条审计记录(INSERT、UPDATE、DELETE)
- INSERT 记录有
new_data - UPDATE 记录有
old_data、new_data和changed_columns - DELETE 记录有
old_data - 所有记录的
db_user应该显示为business_user
1.5.5 故障排查清单
如果修复后仍然没有审计数据,按以下顺序检查:
-- ✅ 检查 1: 函数是否存在
SELECT COUNT(*) FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 预期结果: 2
-- ✅ 检查 2: 触发器是否存在
SELECT COUNT(*) FROM pg_trigger
WHERE tgname LIKE 'audit_%'
AND tgrelid IN (SELECT oid FROM pg_class WHERE relnamespace = 'dev'::regnamespace);
-- 预期结果: > 0
-- ✅ 检查 3: 触发器是否启用
SELECT
tgname,
tgenabled,
CASE tgenabled
WHEN 'O' THEN 'enabled'
WHEN 'D' THEN 'disabled'
WHEN 'R' THEN 'replica'
WHEN 'A' THEN 'always'
END AS status
FROM pg_trigger
WHERE tgname LIKE 'audit_%'
LIMIT 5;
-- 预期结果: tgenabled = 'O' (enabled)
-- ✅ 检查 4: audit_log 表是否有分区
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log%';
-- 如果只有主表没有分区,需要先创建当前月份的分区
-- ✅ 检查 5: 创建分区(如果缺失)
CREATE TABLE IF NOT EXISTS audit.audit_log_2025_01 PARTITION OF audit.audit_log
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
-- 根据实际月份调整重要提示:如果审计表是分区表但没有对应当前时间的分区,INSERT 会失败。请确保已创建当前月份的分区,或配置了自动分区管理(参考本文档的分区管理章节)。
1.6 权限矩阵(第一层)
| 用户/角色 | dev schema (业务表) | audit schema (访问) | audit_log (SELECT/INSERT) | audit.log_data_change() (执行) | 触发器执行 |
|---|---|---|---|---|---|
| business_user (业务用户) | ✅ SELECT/INSERT/UPDATE/DELETE | ✅ USAGE only | ❌ | ✅ EXECUTE | ✅ 自动调用 |
| audit_user (审计用户) | ❌ | ✅ USAGE | ✅ SELECT/INSERT | ✅ OWNER | N/A |
| dba/superuser | ✅ | ✅ | ✅ | ✅ | ✅ |
2. 为业务表创建审计触发器
2.1 为关键业务表创建触发器
建议:只在关键业务表上开启审计(例如合同、工单、用户、权限相关表),降低整体写入压力。如果需要对
devschema 下所有表开启审计,可以使用后文的批量脚本。
假设 dev schema 下有表 contract、workorder、sys_user、sys_role 等:
-- dev.contract 合同表
CREATE TRIGGER audit_contract
AFTER INSERT OR UPDATE OR DELETE ON dev.contract
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.workorder 工单表
CREATE TRIGGER audit_workorder
AFTER INSERT OR UPDATE OR DELETE ON dev.workorder
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.sys_user 用户表
CREATE TRIGGER audit_sys_user
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_user
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.sys_role 角色表
CREATE TRIGGER audit_sys_role
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_role
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();2.2 批量为 dev schema 下所有表创建审计触发器
如果需要对 dev schema 下的所有表统一开启审计,可以使用以下脚本:
DO $$
DECLARE
r RECORD;
trigger_name TEXT;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'dev'
LOOP
trigger_name := 'audit_' || r.tablename;
-- 检查触发器是否已存在,避免重复创建
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = trigger_name
) THEN
EXECUTE format(
'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
trigger_name, r.tablename
);
RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
END IF;
END LOOP;
END $$;3. 审计表性能优化
3.1 分区 + 保留策略(核心手段)
- 审计表使用 按月份分区(
PARTITION BY RANGE(change_time)),避免单表行数累积到几千万/几亿; - 对于过旧的数据(如 6 个月/1 年前),通过删除或归档整个月的分区表进行"粗粒度清理",而不是在大表里
DELETE; - 分区的创建和清理必须完全自动化,不依赖人工执行 SQL。
3.1.1 配置定时维护任务
分区管理函数已在前文 1.1 节创建完成,现在只需配置系统 cron 定时执行即可。
使用系统 cron(推荐,适用于所有平台和所有 PostgreSQL 版本)
# 创建日志目录
sudo mkdir -p /var/log/postgresql
sudo chown postgres:postgres /var/log/postgresql
# 编辑 postgres 用户的 crontab(重要:必须使用 postgres 用户)
sudo crontab -u postgres -e
# 添加以下两行
# 每天凌晨 2 点创建下个月分区
0 2 * * * psql -d postgres -c "SELECT audit.create_next_month_partition();" >> /var/log/postgresql/partition_create.log 2>&1
# 每天凌晨 3 点清理老分区(保留 24 个月)
0 3 * * * psql -d postgres -c "SELECT audit.drop_old_partitions(24);" >> /var/log/postgresql/partition_cleanup.log 2>&1第二层:应用层审计(业务日志审计)
方案概述
适用场景:需要记录应用层的全量业务行为,包括用户登录、接口请求、数据变更、第三方调用等,补充数据库层审计的视角盲区。
核心特性:
- 与业务代码解耦(通过 beyondsoft-log-spring3-starter 实现)
- 记录更丰富的业务上下文(用户IP、请求参数、耗时、trace_id等)
- 支持结构化存储(JSONB)和高效查询
- 审计数据落地到 PostgreSQL 专用表(见第三层)
- 对应用代码入侵性低(基于注解/拦截器实现)
数据流向:
用户操作/接口调用
↓
应用层拦截器/注解(beyondsoft-log-spring3-starter)
↓
业务日志格式化处理
↓
写入: audit 下4张应用层审计表(以 audit_user 身份写入)核心职责
| 审计类型 | 记录内容 | 价值 |
|---|---|---|
| 登录审计 | 用户名、IP、登录位置、终端信息、登录状态 | 溯源用户登录行为,排查异常登录 |
| 请求审计 | 请求URL、参数、耗时、响应、错误信息、操作用户 | 全量接口调用追溯,性能问题定位 |
| 数据变更审计 | 操作人、变更内容、业务ID、新旧值 | 业务数据变更的应用层溯源 |
| 第三方调用审计 | 调用地址、参数、耗时、响应 | 第三方接口问题定位,责任界定 |
实现方式
- 引入
beyondsoft-log-spring3-starter依赖 - 配置日志输出目标为 PostgreSQL(指向 audit 下的 4 张表)
- 通过注解/配置指定需要审计的接口/方法
- 配置
audit_user数据库账号用于日志写入 - 配置日志保留策略(与第三层表分区策略对齐)
1. 应用层审计表设计
🚧 以下四张表是使用 PostgreSQL 替代 MongoDB 后落地应用层审计的核心表,需在
auditschema 下创建:
- 📌 beyond_soft_login_log:登录日志表
- 📌 beyond_soft_request_log:请求日志表
- 📌 beyond_soft_data_audit_log:数据变更审计表
- 📌 beyond_soft_third_part_log:第三方调用日志表
1.1 创建应用层审计表(分区表)
-- 在audit schema下执行建表语句
-- ==========================================
-- 1. 登陆日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_login_log (
id BIGSERIAL,
username VARCHAR(100),
ip VARCHAR(50),
location VARCHAR(200),
platform VARCHAR(50),
browser VARCHAR(100),
os VARCHAR(100),
status VARCHAR(50) DEFAULT '登录成功',
response TEXT,
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_login_log_username ON audit.beyond_soft_login_log(username);
CREATE INDEX IF NOT EXISTS idx_login_log_request_time ON audit.beyond_soft_login_log(request_time);
CREATE INDEX IF NOT EXISTS idx_login_log_ip ON audit.beyond_soft_login_log(ip);
COMMENT ON TABLE audit.beyond_soft_login_log IS '登录日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_login_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 2. 请求日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_request_log (
id BIGSERIAL,
ip VARCHAR(50),
location VARCHAR(200),
request_type VARCHAR(20),
url VARCHAR(1000),
request_params TEXT,
content_type VARCHAR(100),
operator VARCHAR(100),
user_agent TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
spend_time BIGINT,
status INTEGER,
response TEXT,
error TEXT,
trace_id VARCHAR(100),
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_request_log_url ON audit.beyond_soft_request_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_request_log_trace_id ON audit.beyond_soft_request_log(trace_id);
CREATE INDEX IF NOT EXISTS idx_request_log_request_time ON audit.beyond_soft_request_log(request_time);
CREATE INDEX IF NOT EXISTS idx_request_log_operator ON audit.beyond_soft_request_log(operator);
COMMENT ON TABLE audit.beyond_soft_request_log IS '请求日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_request_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 3. 审计日志表(支持JSONB)
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_data_audit_log (
id BIGSERIAL,
operator VARCHAR(100),
modify_date TIMESTAMP,
operation VARCHAR(50),
handle_name VARCHAR(200),
modify_content TEXT,
modifier_ip VARCHAR(50),
modifier_location VARCHAR(200),
business_id VARCHAR(100),
business_data_status VARCHAR(50),
old_object JSONB,
new_object JSONB,
response TEXT,
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_audit_log_operator ON audit.beyond_soft_data_audit_log(operator);
CREATE INDEX IF NOT EXISTS idx_audit_log_business_id ON audit.beyond_soft_data_audit_log(business_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_request_time ON audit.beyond_soft_data_audit_log(request_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_operation ON audit.beyond_soft_data_audit_log(operation);
-- JSONB 字段的 GIN 索引(支持高效 JSON 查询)
CREATE INDEX IF NOT EXISTS idx_audit_log_old_object_gin ON audit.beyond_soft_data_audit_log USING gin(old_object);
CREATE INDEX IF NOT EXISTS idx_audit_log_new_object_gin ON audit.beyond_soft_data_audit_log USING gin(new_object);
COMMENT ON TABLE audit.beyond_soft_data_audit_log IS '审计日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.old_object IS 'JSONB格式:操作前的对象';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.new_object IS 'JSONB格式:操作后的对象';
COMMENT ON COLUMN audit.beyond_soft_data_audit_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 4. 第三方调用日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS audit.beyond_soft_third_part_log (
id BIGSERIAL,
request_id VARCHAR(100),
protocols VARCHAR(20),
url VARCHAR(1000),
query_params TEXT,
body TEXT,
call_source VARCHAR(200),
method_name VARCHAR(200),
headers JSONB,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
spend_time BIGINT,
response TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, start_time)
) PARTITION BY RANGE (start_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_third_part_log_request_id ON audit.beyond_soft_third_part_log(request_id);
CREATE INDEX IF NOT EXISTS idx_third_part_log_url ON audit.beyond_soft_third_part_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_third_part_log_call_source ON audit.beyond_soft_third_part_log(call_source);
CREATE INDEX IF NOT EXISTS idx_third_part_log_start_time ON audit.beyond_soft_third_part_log(start_time);
COMMENT ON TABLE audit.beyond_soft_third_part_log IS '第三方调用日志表(按天分区)';
COMMENT ON COLUMN audit.beyond_soft_third_part_log.headers IS 'JSONB格式:请求头信息';
COMMENT ON COLUMN audit.beyond_soft_third_part_log.start_time IS '分区键:按此字段进行分区';1.2 应用层审计表权限配置
-- 1. 授予审计用户对应用层四个日志表的权限
GRANT INSERT, SELECT ON audit.beyond_soft_data_audit_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_login_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_request_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_third_part_log TO audit_user;
-- 2. 授予审计用户对日志表序列的权限
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_data_audit_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_login_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_request_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_third_part_log_id_seq TO audit_user;
-- 3. 【重要】为未来创建的分区子表自动授权
-- 确保动态创建的分区子表自动继承权限
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT INSERT, SELECT ON TABLES TO audit_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT USAGE, SELECT ON SEQUENCES TO audit_user;
-- 4. 【可选】如果分区已经存在但权限缺失,批量授权所有现有分区
-- 对所有现有的应用层审计日志分区表授权
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'audit'
AND (tablename LIKE 'beyond_soft_data_audit_log_%'
OR tablename LIKE 'beyond_soft_login_log_%'
OR tablename LIKE 'beyond_soft_request_log_%'
OR tablename LIKE 'beyond_soft_third_part_log_%')
LOOP
EXECUTE format('GRANT INSERT, SELECT ON audit.%I TO audit_user;', r.tablename);
RAISE NOTICE 'Granted permissions on audit.%', r.tablename;
END LOOP;
END $$;1.3 应用层审计表分区管理
说明:应用层审计表按天分区(高频写入场景),分区管理通过beyondsoft-log-spring3-starter实现:
1.4 权限矩阵(第三层)
| 用户/角色 | dev schema (业务表) | audit schema (访问) | 4张应用层审计表 (SELECT/INSERT) | 序列权限 |
|---|---|---|---|---|
| business_user (业务用户) | ✅ | ✅ USAGE only | ❌ | ❌ |
| audit_user (审计用户) | ❌ | ✅ USAGE | ✅ SELECT/INSERT | ✅ USAGE/SELECT |
| dba/superuser | ✅ | ✅ | ✅ | ✅ |
备选方案:pgaudit 插件审计(SQL 级别审计)
方案概述
适用场景:需要满足合规要求,记录数据库层面的所有 SQL 操作、权限变更、DDL 操作等安全事件。
核心特性:
- 记录所有数据库层面的 SQL 执行(SELECT/INSERT/UPDATE/DELETE/DDL/DCL)
- 按角色/用户/表粒度过滤审计内容
- 日志输出到 PostgreSQL 日志文件,便于集中收集(如 ELK)
- 低性能损耗(相比触发器审计)
- 无需修改业务代码/数据库结构
1. 安装与配置
1.1 安装 pgaudit 扩展
# CentOS/RHEL
sudo yum install postgresql14-pgaudit
# Debian/Ubuntu
sudo apt-get install postgresql-14-pgaudit
# 重启PostgreSQL
sudo systemctl restart postgresql-141.2 启用 pgaudit 扩展
-- 进入目标数据库
psql -U postgres -d system
-- 创建扩展
CREATE EXTENSION IF NOT EXISTS pgaudit;1.3 配置 postgresql.conf
# 修改 postgresql.conf(通常在 /var/lib/pgsql/14/data/ 目录)
shared_preload_libraries = 'pgaudit' # 添加 pgaudit
pgaudit.log = 'ddl, write, function, role' # 审计类型:DDL、写入操作、函数调用、角色变更
pgaudit.log_catalog = off # 不审计系统表
pgaudit.log_level = 'notice' # 日志级别
pgaudit.log_parameter = on # 记录SQL参数
pgaudit.log_relation = on # 记录表级操作
pgaudit.log_statement_once = off # 每次执行都记录
# 日志输出配置
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_rotation_age = 1d
log_rotation_size = 100MB
log_line_prefix = '%t [%p]: [%c-%l] user=%u,db=%d,app=%a,client=%h '
log_statement = 'none' # 关闭默认SQL记录,由pgaudit接管1.4 重启生效
sudo systemctl restart postgresql-142. 细粒度审计配置
2.1 按表配置审计
-- 仅审计 dev schema 下的关键表
ALTER ROLE business_user SET pgaudit.log_relation = 'on';
ALTER TABLE dev.contract SET (pgaudit.log = 'write');
ALTER TABLE dev.sys_user SET (pgaudit.log = 'all');2.2 审计结果查看
# 查看审计日志
tail -f /var/lib/pgsql/14/data/pg_log/postgresql-2025-01-01_000000.log
# 示例日志内容
# 2025-01-01 10:00:00 UTC [1234]: [1-1] user=business_user,db=system,app=psql,client=192.168.1.100 LOG: AUDIT: OBJECT,1,WRITE,TABLE,dev.contract,INSERT,public,"INSERT INTO dev.contract (id, name) VALUES (1, 'Test')",<none>3. 方案对比
| 维度 | 触发器审计(第一层) | 应用层审计(第二层) | pgaudit 插件审计(备选) |
|---|---|---|---|
| 审计粒度 | 行级(数据变更) | 业务行为级(登录/请求) | SQL语句级(数据库操作) |
| 性能影响 | 低(行级触发器) | 低(应用层异步写入) | 极低(内核级审计) |
| 数据存储 | PostgreSQL 分区表 | PostgreSQL 分区表 | 日志文件 |
| 可查询性 | 高(SQL/JSONB查询) | 高(结构化查询) | 中(日志解析) |
| 合规支持 | 业务数据追溯 | 业务行为追溯 | 数据库安全合规 |
| 入侵性 | 低(触发器) | 低(starter插件) | 无(扩展) |
4. 部署建议
- 核心审计:优先部署第一层(触发器审计)+ 第二层(应用层审计),覆盖业务数据和行为追溯;
- 合规补充:在生产环境部署 pgaudit 插件审计,满足安全合规要求;
- 性能优化:所有审计表均采用分区表,配置自动分区清理,避免数据膨胀;
- 权限管控:严格遵循 "审计用户仅可写入审计表,不可访问业务表" 的原则。
跨库审计方案(业务库与审计库分离)
方案概述
适用场景:业务数据库与审计数据库物理隔离,触发器无法直接跨库写入审计日志。
核心问题:
- PostgreSQL 不支持触发器函数跨数据库调用
- 触发器无法直接 INSERT 到另一个数据库的表
- 需要在保证审计完整性的前提下实现数据库级别的隔离
解决思路: 采用 外部数据包装器(Foreign Data Wrapper, FDW) + 触发器 方案,将跨库写入转化为本地写入外部表。
架构对比
| 方案 | 适用场景 | 数据流向 | 优势 | 劣势 |
|---|---|---|---|---|
| 同库不同Schema | 业务表和审计表在同一数据库 | 触发器 → 本地 audit schema | 简单、高性能、事务一致 | 数据库压力集中 |
| 跨库 FDW 方案 | 业务库和审计库完全分离 | 触发器 → FDW外部表 → 远程审计库 | 数据库隔离、故障隔离 | 配置复杂、轻微性能损耗 |
1. 跨库审计架构设计
1.1 数据流向
业务操作 (business_user)
↓
business_db.dev.business_table INSERT/UPDATE/DELETE
↓
触发器: audit_trigger_func()
↓
FDW 外部表: business_db.audit_fdw.audit_log_remote
↓
postgres_fdw 连接
↓
远程审计库: audit_db.audit.audit_log1.2 数据库规划
| 数据库 | Schema | 用途 | 主要对象 |
|---|---|---|---|
| business_db | dev | 业务表 | contract, workorder, sys_user 等 |
| business_db | audit_fdw | 外部表(指向审计库) | audit_log_remote(外部表) |
| audit_db | audit | 审计日志存储 | audit_log(分区表) |
2. 实施步骤
2.1 审计库(audit_db)准备
2.1.1 创建审计数据库和表结构
-- 在 PostgreSQL 实例上创建审计数据库
CREATE DATABASE audit_db;
-- 切换到审计数据库
\c audit_db
-- 创建审计 schema
CREATE SCHEMA IF NOT EXISTS audit;
-- 创建审计分区主表(与前文第一层方案相同)
CREATE TABLE IF NOT EXISTS audit.audit_log (
id BIGSERIAL,
change_time TIMESTAMPTZ NOT NULL DEFAULT now(),
table_schema TEXT NOT NULL,
table_name TEXT NOT NULL,
operation CHAR(1) NOT NULL,
row_pk JSONB,
old_data JSONB,
new_data JSONB,
changed_columns TEXT[],
app_user TEXT,
db_user TEXT NOT NULL DEFAULT current_user,
client_ip INET DEFAULT inet_client_addr(),
tx_id BIGINT NOT NULL DEFAULT txid_current(),
source_db TEXT, -- 新增:记录来源数据库
PRIMARY KEY (id, change_time)
) PARTITION BY RANGE (change_time);
-- 创建当前月和下月分区(使用前文的分区管理函数)
CREATE OR REPLACE FUNCTION audit.create_current_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_start date;
v_end date;
v_table text;
BEGIN
v_start := date_trunc('month', now())::date;
v_end := (v_start + INTERVAL '1 month');
v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
FOR VALUES FROM (%L) TO (%L);',
v_table,
v_start::timestamptz,
v_end::timestamptz
);
RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;
-- 初始化分区
SELECT audit.create_current_month_partition();2.1.2 创建审计用户并授权
-- 在审计库创建专用用户
CREATE USER audit_writer WITH PASSWORD 'your_secure_password';
-- 授予审计 schema 权限
GRANT USAGE ON SCHEMA audit TO audit_writer;
-- 授予审计表写入权限
GRANT INSERT, SELECT ON audit.audit_log TO audit_writer;
GRANT USAGE, SELECT ON SEQUENCE audit.audit_log_id_seq TO audit_writer;
-- 为未来分区自动授权
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT INSERT, SELECT ON TABLES TO audit_writer;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT USAGE, SELECT ON SEQUENCES TO audit_writer;2.2 业务库(business_db)配置 FDW
2.2.1 安装并启用 postgres_fdw 扩展
-- 切换到业务数据库
\c business_db
-- 创建外部数据包装器扩展
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
-- 创建 FDW schema(用于存放外部表)
CREATE SCHEMA IF NOT EXISTS audit_fdw;2.2.2 创建外部服务器连接
-- 创建指向审计库的外部服务器
CREATE SERVER audit_db_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (
host 'localhost', -- 审计库主机地址(如果在不同机器上需修改)
port '5432', -- 审计库端口
dbname 'audit_db' -- 审计数据库名
);
-- 创建用户映射(业务库用户映射到审计库的 audit_writer)
CREATE USER MAPPING FOR business_user
SERVER audit_db_server
OPTIONS (
user 'audit_writer',
password 'your_secure_password'
);
-- 如果有多个业务用户,需要为每个用户创建映射
-- 或者创建 PUBLIC 映射(所有用户都使用相同的远程账号)
CREATE USER MAPPING FOR PUBLIC
SERVER audit_db_server
OPTIONS (
user 'audit_writer',
password 'your_secure_password'
);2.2.3 创建外部表(映射到审计库的 audit_log)
-- 方式一:手动创建外部表
CREATE FOREIGN TABLE audit_fdw.audit_log_remote (
id BIGINT,
change_time TIMESTAMPTZ,
table_schema TEXT,
table_name TEXT,
operation CHAR(1),
row_pk JSONB,
old_data JSONB,
new_data JSONB,
changed_columns TEXT[],
app_user TEXT,
db_user TEXT,
client_ip INET,
tx_id BIGINT,
source_db TEXT
)
SERVER audit_db_server
OPTIONS (schema_name 'audit', table_name 'audit_log');
-- 方式二:自动导入审计库的表结构(推荐)
IMPORT FOREIGN SCHEMA audit
LIMIT TO (audit_log)
FROM SERVER audit_db_server
INTO audit_fdw;
-- 重命名外部表(可选)
ALTER FOREIGN TABLE audit_fdw.audit_log RENAME TO audit_log_remote;2.2.4 授权业务用户访问外部表
-- 授予 business_user 使用 audit_fdw schema 的权限
GRANT USAGE ON SCHEMA audit_fdw TO business_user;
-- 授予外部表的 INSERT 权限
GRANT INSERT ON audit_fdw.audit_log_remote TO business_user;2.3 修改触发器函数(写入外部表)
2.3.1 创建跨库审计触发器函数
-- 在业务库创建触发器函数(写入外部表)
CREATE OR REPLACE FUNCTION audit_fdw.audit_trigger_func_fdw()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
v_app_user TEXT;
v_row_pk JSONB := '{}'::jsonb;
v_old_row JSONB;
v_new_row JSONB;
v_old_changed JSONB := '{}'::jsonb;
v_new_changed JSONB := '{}'::jsonb;
v_changed_cols TEXT[] := ARRAY[]::TEXT[];
col TEXT;
pk_col TEXT;
BEGIN
-- 从会话变量中获取业务用户
BEGIN
v_app_user := current_setting('audit.user_id', true);
EXCEPTION WHEN others THEN
v_app_user := NULL;
END;
-- UPDATE: 记录变更列的旧值和新值
IF TG_OP = 'UPDATE' THEN
v_old_row := to_jsonb(OLD);
v_new_row := to_jsonb(NEW);
-- 动态构建主键
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
-- 找出变更的列
SELECT array_agg(key)
INTO v_changed_cols
FROM jsonb_each(v_new_row) AS e(key, value)
WHERE (v_old_row -> key) IS DISTINCT FROM value;
IF v_changed_cols IS NULL OR array_length(v_changed_cols, 1) IS NULL THEN
RETURN NEW;
END IF;
-- 只保留变更列的旧值和新值
FOREACH col IN ARRAY v_changed_cols LOOP
v_old_changed := v_old_changed || jsonb_build_object(col, v_old_row -> col);
v_new_changed := v_new_changed || jsonb_build_object(col, v_new_row -> col);
END LOOP;
-- 写入外部表(跨库写入)
INSERT INTO audit_fdw.audit_log_remote (
change_time, table_schema, table_name, operation,
row_pk, old_data, new_data, changed_columns,
app_user, db_user, client_ip, tx_id, source_db
) VALUES (
now(),
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'U',
v_row_pk,
v_old_changed,
v_new_changed,
v_changed_cols,
v_app_user,
current_user,
inet_client_addr(),
txid_current(),
current_database() -- 记录来源数据库
);
RETURN NEW;
-- INSERT: 只记录新值
ELSIF TG_OP = 'INSERT' THEN
v_new_row := to_jsonb(NEW);
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_new_row -> pk_col);
END LOOP;
INSERT INTO audit_fdw.audit_log_remote (
change_time, table_schema, table_name, operation,
row_pk, old_data, new_data, changed_columns,
app_user, db_user, client_ip, tx_id, source_db
) VALUES (
now(),
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'I',
v_row_pk,
NULL,
v_new_row,
NULL,
v_app_user,
current_user,
inet_client_addr(),
txid_current(),
current_database()
);
RETURN NEW;
-- DELETE: 只记录旧值
ELSIF TG_OP = 'DELETE' THEN
v_old_row := to_jsonb(OLD);
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
INSERT INTO audit_fdw.audit_log_remote (
change_time, table_schema, table_name, operation,
row_pk, old_data, new_data, changed_columns,
app_user, db_user, client_ip, tx_id, source_db
) VALUES (
now(),
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'D',
v_row_pk,
v_old_row,
NULL,
NULL,
v_app_user,
current_user,
inet_client_addr(),
txid_current(),
current_database()
);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;2.3.2 为业务表创建跨库审计触发器
-- 批量为 dev schema 下所有表创建跨库审计触发器
DO $$
DECLARE
r RECORD;
trigger_name TEXT;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'dev'
LOOP
trigger_name := 'audit_fdw_' || r.tablename;
-- 先删除旧触发器(如果存在)
EXECUTE format('DROP TRIGGER IF EXISTS %I ON dev.%I;', trigger_name, r.tablename);
-- 创建新触发器(使用 FDW 函数)
EXECUTE format(
'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
'FOR EACH ROW EXECUTE FUNCTION audit_fdw.audit_trigger_func_fdw();',
trigger_name, r.tablename
);
RAISE NOTICE 'Created FDW trigger % on dev.%', trigger_name, r.tablename;
END LOOP;
END $$;2.4 测试跨库审计
-- 切换到业务库
\c business_db
-- 以 business_user 身份执行业务操作
SET ROLE business_user;
-- 插入测试数据
INSERT INTO dev.contract (id, name, amount) VALUES (9999, 'FDW审计测试', 100000);
-- 更新测试数据
UPDATE dev.contract SET amount = 200000 WHERE id = 9999;
-- 删除测试数据
DELETE FROM dev.contract WHERE id = 9999;
-- 重置角色
RESET ROLE;
-- 切换到审计库查看审计日志
\c audit_db
SELECT
id,
change_time,
source_db,
table_name,
operation,
row_pk,
old_data,
new_data,
changed_columns,
db_user
FROM audit.audit_log
WHERE table_name = 'contract'
AND (row_pk->>'id')::int = 9999
ORDER BY change_time DESC;预期结果:
- 应该看到 3 条审计记录(INSERT、UPDATE、DELETE)
source_db字段显示为business_db- 所有操作都成功记录到审计库的
audit.audit_log表中
3. 性能与事务一致性
3.1 性能影响
| 维度 | 同库方案 | 跨库 FDW 方案 |
|---|---|---|
| 网络开销 | 无 | 有(本地网络约 0.1-1ms) |
| 事务开销 | 低 | 中(FDW 自动管理分布式事务) |
| 写入延迟 | ~1ms | ~2-5ms |
| 适用场景 | 单库高并发 | 多库隔离、合规要求 |
3.2 事务一致性保证
FDW 方案下的事务行为:
-- 示例:业务操作和审计写入在同一事务中
BEGIN;
INSERT INTO dev.contract (id, name) VALUES (1, 'Test'); -- 业务库写入
-- 触发器自动通过 FDW 写入审计库
COMMIT; -- 两个库的操作要么全部成功,要么全部回滚
-- 如果任意一方失败,整个事务回滚
BEGIN;
INSERT INTO dev.contract (id, name) VALUES (2, 'Test2');
-- 假设审计库连接失败或分区不存在
ROLLBACK; -- 业务操作也会回滚注意事项:
- FDW 使用 两阶段提交(2PC) 保证跨库事务一致性
- 如果审计库不可用,业务操作会失败(可以通过异常处理优化)
3.3 高可用优化(可选)
如果希望审计库故障时不影响业务操作,可以使用异常捕获:
CREATE OR REPLACE FUNCTION audit_fdw.audit_trigger_func_fdw_safe()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
-- ... 变量声明同前 ...
BEGIN
-- ... 数据处理逻辑同前 ...
-- 在写入外部表时捕获异常
BEGIN
INSERT INTO audit_fdw.audit_log_remote (...) VALUES (...);
EXCEPTION WHEN OTHERS THEN
-- 记录到本地日志,但不影响业务事务
RAISE WARNING 'Audit log write failed: %', SQLERRM;
-- 可选:写入本地备份表
END;
RETURN NEW; -- 或 OLD(DELETE 操作)
END;
$$;⚠️ 警告:此方案会降低审计完整性,仅在对可用性要求极高的场景下使用。
4. 方案选择指南
| 场景 | 推荐方案 | 原因 |
|---|---|---|
| 业务库和审计库在同一数据库实例 | 同库不同 Schema 方案(第一层) | 简单、高性能、事务一致性强 |
| 业务库和审计库需要物理隔离 | 跨库 FDW 方案 | 数据库级隔离、故障隔离 |
| 多个业务库共享一个审计库 | 跨库 FDW 方案 | 集中式审计管理 |
| 对审计完整性要求极高 | 同库 + 应用层双写 | 双重保障 |
| 审计库需要部署在不同机器 | 跨库 FDW 方案 | 支持跨主机连接 |
5. 安全加固
5.1 FDW 连接加密
-- 修改外部服务器配置,启用 SSL 连接
ALTER SERVER audit_db_server OPTIONS (ADD sslmode 'require');
-- 在审计库的 pg_hba.conf 中限制连接来源
-- hostssl audit_db audit_writer 192.168.1.0/24 md55.2 密码管理
-- 使用 .pgpass 文件存储密码(避免在 SQL 中明文密码)
-- ~/.pgpass 内容示例:
-- localhost:5432:audit_db:audit_writer:your_secure_password
-- 创建用户映射时不指定密码(自动从 .pgpass 读取)
CREATE USER MAPPING FOR business_user
SERVER audit_db_server
OPTIONS (user 'audit_writer');5.3 权限最小化
-- 在审计库限制 audit_writer 只能 INSERT
REVOKE UPDATE, DELETE ON audit.audit_log FROM audit_writer;
-- 在业务库限制 business_user 只能 INSERT 外部表
REVOKE SELECT, UPDATE, DELETE ON audit_fdw.audit_log_remote FROM business_user;
GRANT INSERT ON audit_fdw.audit_log_remote TO business_user;6. 故障排查
6.1 常见问题
问题 1:触发器执行失败,提示 "foreign table not found"
-- 解决:检查外部表是否存在
SELECT * FROM information_schema.foreign_tables
WHERE foreign_table_schema = 'audit_fdw';
-- 如果不存在,重新创建外部表
IMPORT FOREIGN SCHEMA audit
LIMIT TO (audit_log)
FROM SERVER audit_db_server
INTO audit_fdw;问题 2:提示 "connection to server failed"
-- 解决:测试 FDW 连接
SELECT * FROM audit_fdw.audit_log_remote LIMIT 1;
-- 检查服务器配置
SELECT srvname, srvoptions FROM pg_foreign_server WHERE srvname = 'audit_db_server';
-- 检查用户映射
SELECT * FROM pg_user_mappings WHERE srvname = 'audit_db_server';问题 3:审计数据未写入
-- 1. 检查触发器是否存在
SELECT tgname, tgrelid::regclass, tgenabled
FROM pg_trigger
WHERE tgname LIKE 'audit_fdw_%';
-- 2. 手动测试外部表写入
INSERT INTO audit_fdw.audit_log_remote (
change_time, table_schema, table_name, operation, db_user, tx_id, source_db
) VALUES (
now(), 'dev', 'test_table', 'I', current_user, txid_current(), current_database()
);
-- 3. 到审计库查看是否有数据
\c audit_db
SELECT * FROM audit.audit_log ORDER BY change_time DESC LIMIT 5;6.2 性能监控
-- 在业务库监控 FDW 查询统计
SELECT * FROM pg_stat_user_tables WHERE schemaname = 'audit_fdw';
-- 查看慢查询(FDW 写入耗时)
SELECT query, mean_exec_time, calls
FROM pg_stat_statements
WHERE query LIKE '%audit_log_remote%'
ORDER BY mean_exec_time DESC;7. 迁移指南
7.1 从同库方案迁移到跨库方案
-- 步骤 1: 在审计库创建表和用户(见 2.1 节)
-- 步骤 2: 在业务库配置 FDW(见 2.2 节)
-- 步骤 3: 迁移历史审计数据(可选)
-- 在业务库执行:
INSERT INTO audit_fdw.audit_log_remote
SELECT * FROM audit.audit_log
WHERE change_time >= '2025-01-01'; -- 指定需要迁移的时间范围
-- 步骤 4: 删除旧触发器,创建新触发器(见 2.3.2 节)
-- 步骤 5: 验证新触发器工作正常后,删除业务库的本地审计表
DROP TABLE IF EXISTS audit.audit_log CASCADE;
DROP SCHEMA IF EXISTS audit CASCADE;7.2 多业务库接入统一审计库
-- 在每个业务库(business_db1, business_db2, ...)执行:
-- 1. 安装 postgres_fdw 扩展
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
-- 2. 创建指向同一审计库的外部服务器
CREATE SERVER audit_db_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'audit-db-host', port '5432', dbname 'audit_db');
-- 3. 创建用户映射和外部表(同前)
-- 4. 创建触发器(同前)
-- 最终效果:多个业务库的审计数据汇总到同一个审计库8. 完整部署检查清单
- [ ] 审计库(audit_db)已创建并完成分区表初始化
- [ ] 审计用户(audit_writer)已创建并授予最小权限
- [ ] 业务库已安装 postgres_fdw 扩展
- [ ] 外部服务器(audit_db_server)已创建并测试连接
- [ ] 用户映射已创建(business_user → audit_writer)
- [ ] 外部表(audit_log_remote)已创建并可写入
- [ ] 触发器函数(audit_trigger_func_fdw)已创建
- [ ] 所有业务表的触发器已创建并启用
- [ ] 测试插入/更新/删除操作,审计数据正常写入审计库
- [ ] 配置自动分区管理(审计库)
- [ ] 配置监控和告警(FDW 连接状态、写入延迟)
- [ ] 文档化密码管理方式(.pgpass 或密钥管理服务)
总结:三种审计方案对比
| 方案 | 部署复杂度 | 性能影响 | 隔离性 | 事务一致性 | 适用场景 |
|---|---|---|---|---|---|
| 同库不同 Schema | ⭐ 简单 | ⭐⭐⭐ 极低 | ⭐ Schema 级 | ⭐⭐⭐ 强 | 单库、高并发 |
| 跨库 FDW | ⭐⭐ 中等 | ⭐⭐ 低 | ⭐⭐⭐ 数据库级 | ⭐⭐ 中(2PC) | 多库、合规隔离 |
| 应用层双写 | ⭐⭐⭐ 复杂 | ⭐ 中 | ⭐⭐⭐ 数据库级 | ⭐ 弱(最终一致) | 异步审计、高可用 |
推荐组合:
- 生产环境(物理隔离):跨库 FDW 方案(数据库触发器) + 应用层审计(beyondsoft-log-spring3-starter)
- 开发/测试环境:同库不同 Schema 方案(简单高效)
- 合规审计:以上任一方案 + pgaudit 插件审计