PostgreSQL 审计方案
背景与目标
当前系统使用 PostgreSQL 数据库并计划实施完整的数据审计方案。核心诉求是:
- INSERT:只需要记录新值或基本信息,不需要旧值;
- DELETE:只需要记录关键标识(主键等),不需要旧值;
- UPDATE:必须记录被修改行的 旧值(至少是被修改的那些字段),以满足审计和追责需求;
- 同时,要避免审计表无限膨胀、影响业务写入与查询性能。
本文设计一套 三层核心审计方案 + 一套备选审计方案,各方案互不依赖,可根据需求独立实施:
审计架构规划
核心层审计架构(独立部署、职责分离)
| 层级 | 名称 | 职责 | 特点 | 实现方式 | 核心存储对象 |
|---|---|---|---|---|---|
| 第一层 | 数据库触发器审计 | 记录业务表的 INSERT/UPDATE/DELETE 操作 | 行级数据变更追溯,记录UPDATE旧值/新值 | PostgreSQL 触发器 + SECURITY DEFINER 函数 | audit.audit_log(分区表) |
| 第二层 | 应用层审计(框架) | 记录应用层的业务操作日志 | 覆盖登录、请求、数据变更、第三方调用等业务行为 | 应用代码(beyondsoft-log-spring3-starter) | -(落地表见第三层) |
备选方案:pgaudit 插件审计(SQL 级别审计)
- 职责:记录数据库层面的安全事件和 SQL 语句
- 特点:记录登录、权限变更、DDL、敏感 SQL 等,适合安全审计与合规留痕
- 实现:通过 pgaudit 扩展
- 日志输出:PostgreSQL 日志文件
方案优势
- 层级独立:核心二层+备选方案互不依赖,可根据需求选择性实施
- 职责分离:各层审计职责清晰,行级数据、业务行为、SQL级审计互为补充
- 灵活部署:可分阶段逐步落地(如先部署第一层触发器审计,再落地应用层审计)
- 性能可控:审计表分区化管理,对主业务表写入性能影响可控
- 数据可控:审计表可控增长、可快速查询近期数据
数据库与 Schema 规划
- 数据库:
system - Schema 划分:
dev: 开发环境业务表所在 schema(现有,包含合同、工单、用户、权限等表)audit: 审计表专用 schema(现有,用于存放所有审计日志表及审计触发器函数)prod: 生产环境 schema(预留,暂不使用)public: 默认 schema(暂不使用)
关于 partman schema:pg_partman 扩展默认安装在
publicschema 或指定的 schema(如partman)中,只是存放扩展自身的函数和配置表,不影响业务表和审计表的使用。可以保留但不是必须的,后文统一使用partman.create_parent()等函数调用。
第一层:数据库触发器审计(行级数据变更审计)
方案概述
适用场景:需要记录业务表的详细数据变更历史,包括 UPDATE 操作的旧值和新值。
核心特性:
- 记录 INSERT/UPDATE/DELETE 操作的行级数据变更
- UPDATE 操作记录变更列的旧值和新值
- 通过触发器自动记录,对业务代码无侵入
- 使用 SECURITY DEFINER 实现权限隔离
- 支持动态主键识别和复合主键
- 支持应用层透传业务用户信息
数据流向:
业务操作 (business_user)
↓
dev.business_table INSERT/UPDATE/DELETE
↓
触发器: audit.audit_trigger_func()
↓
SECURITY DEFINER 函数: audit.log_data_change()
↓
写入: audit.audit_log (以 audit_user 身份写入)1. 数据库设计
1.1 创建审计 schema 与分区主表
-- 1.1 创建审计 schema
CREATE SCHEMA IF NOT EXISTS audit;
-- 1.2 审计主表(分区表)
CREATE TABLE IF NOT EXISTS audit.audit_log (
id BIGSERIAL,
change_time TIMESTAMPTZ NOT NULL DEFAULT now(), -- 变更时间
table_schema TEXT NOT NULL, -- 业务表 schema
table_name TEXT NOT NULL, -- 业务表名称
operation CHAR(1) NOT NULL, -- I / U / D
row_pk JSONB, -- 主键信息(如 {"id": 123})
old_data JSONB, -- 旧值(主要用于 UPDATE)
new_data JSONB, -- 新值(INSERT/UPDATE 可选)
changed_columns TEXT[], -- 发生变更的列名
app_user TEXT, -- 业务用户(应用层透传)
db_user TEXT NOT NULL DEFAULT current_user,
client_ip INET DEFAULT inet_client_addr(),
tx_id BIGINT NOT NULL DEFAULT txid_current(),
PRIMARY KEY (id, change_time) -- 分区表主键必须包含分区键
) PARTITION BY RANGE (change_time);
-- 1.3 创建自动分区管理函数
-- 创建下个月分区的函数
CREATE OR REPLACE FUNCTION audit.create_next_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_start date;
v_end date;
v_table text;
BEGIN
-- 计算下一个月的起止日期
v_start := date_trunc('month', now())::date + INTERVAL '1 month';
v_end := (v_start + INTERVAL '1 month');
v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
FOR VALUES FROM (%L) TO (%L);',
v_table,
v_start::timestamptz,
v_end::timestamptz
);
RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;
-- 创建当前月分区的函数(初始化用)
CREATE OR REPLACE FUNCTION audit.create_current_month_partition()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_start date;
v_end date;
v_table text;
BEGIN
v_start := date_trunc('month', now())::date;
v_end := (v_start + INTERVAL '1 month');
v_table := format('audit_log_%s', to_char(v_start, 'YYYY_MM'));
EXECUTE format(
'CREATE TABLE IF NOT EXISTS audit.%I PARTITION OF audit.audit_log
FOR VALUES FROM (%L) TO (%L);',
v_table,
v_start::timestamptz,
v_end::timestamptz
);
RAISE NOTICE 'Created partition audit.% for period % to %', v_table, v_start, v_end;
END;
$$;
-- 创建删除老分区的函数(默认保留 24 个月)
CREATE OR REPLACE FUNCTION audit.drop_old_partitions(p_retention_months INT DEFAULT 24)
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
v_cutoff date;
v_partition record;
BEGIN
v_cutoff := date_trunc('month', now())::date - (p_retention_months || ' months')::interval;
FOR v_partition IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log_%'
AND tablename ~ '^audit_log_\d{4}_\d{2}$'
LOOP
-- 提取分区表名中的日期
DECLARE
v_partition_date date;
BEGIN
v_partition_date := to_date(substring(v_partition.tablename from 11), 'YYYY_MM');
IF v_partition_date < v_cutoff THEN
EXECUTE format('DROP TABLE IF EXISTS audit.%I;', v_partition.tablename);
RAISE NOTICE 'Dropped old partition audit.%', v_partition.tablename;
END IF;
EXCEPTION WHEN OTHERS THEN
RAISE NOTICE 'Skip invalid partition name: %', v_partition.tablename;
END;
END LOOP;
END;
$$;
-- 1.4 初始化:创建当前月和未来分区
SELECT audit.create_current_month_partition();
SELECT audit.create_next_month_partition();
-- 验证分区是否创建成功
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log%'
ORDER BY tablename;说明:
- 分区管理函数已内置,无需安装 pg_partman 扩展
- 使用系统 cron 定时执行分区维护(见后文 3.1 节)
- 默认保留 24 个月的审计数据,可根据需求调整
1.2 权限隔离方案(SECURITY DEFINER)
1.2.1 用户权限规划
为了实现审计日志与业务数据的权限隔离,采用 SECURITY DEFINER 函数 方案:
用户角色划分:
business_user(业务用户):只能操作devschema 的业务表,无法直接访问auditschema 的审计表audit_user(审计用户):只能写入auditschema 的审计表,无法访问devschema 的业务表- 触发器:通过 SECURITY DEFINER 函数以审计用户身份写入,对业务用户透明
1.2.2 创建用户并设置权限
-- 创建业务用户(如果尚未创建)
CREATE USER business_user WITH PASSWORD 'your_business_password';
-- 创建专用的审计写入用户(如果尚未创建)
CREATE USER audit_user WITH PASSWORD 'your_audit_password';
-- 1. 模式使用和创建权限(business_user)
GRANT USAGE, CREATE ON SCHEMA dev TO business_user;
-- 2. 现有表的DML权限(business_user)
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA dev TO business_user;
-- 3. 序列权限(business_user)
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA dev TO business_user;
-- 4. 未来表的默认权限(business_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA dev
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO business_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA dev
GRANT USAGE, SELECT ON SEQUENCES TO business_user;
-- 5. 授予审计用户对 audit schema 的权限(audit_user)
GRANT USAGE ON SCHEMA audit TO audit_user;
-- 6. 授予审计用户对 audit_log 表的权限(数据库层面触发器审计表)(audit_user)
GRANT INSERT, SELECT ON audit.audit_log TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.audit_log_id_seq TO audit_user;
-- 如果不确定序列名称,可以使用以下命令查看实际序列名称:
-- SELECT sequencename FROM pg_sequences WHERE schemaname = 'audit';
-- 7. 为未来创建的审计表自动授权(audit_user)
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT INSERT, SELECT ON TABLES TO audit_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA audit
GRANT USAGE, SELECT ON SEQUENCES TO audit_user;
-- 8. 确保业务用户无法直接访问 audit schema(audit_user)
REVOKE ALL ON SCHEMA audit FROM business_user;
REVOKE ALL ON ALL TABLES IN SCHEMA audit FROM business_user;
-- 9. 确保审计用户无法访问 dev schema(audit_user)
REVOKE ALL ON SCHEMA dev FROM audit_user;
REVOKE ALL ON ALL TABLES IN SCHEMA dev FROM audit_user;
-- 10. 授予业务用户访问 audit schema 的最小权限(仅用于调用函数)(audit_user)
GRANT USAGE ON SCHEMA audit TO business_user;1.2.3 创建权限提升的审计写入函数
使用 SECURITY DEFINER 创建审计写入函数,该函数以其所有者(audit_user)的权限执行:
-- 创建权限提升的审计写入函数
CREATE OR REPLACE FUNCTION audit.log_data_change(
p_table_schema TEXT,
p_table_name TEXT,
p_operation TEXT,
p_row_pk JSONB,
p_old_data JSONB,
p_new_data JSONB,
p_changed_columns TEXT[],
p_app_user TEXT
) RETURNS VOID
LANGUAGE plpgsql
SECURITY DEFINER -- 关键:以函数所有者(audit_user)权限执行
SET search_path = audit, pg_temp -- 防止搜索路径劫持攻击
AS $$
BEGIN
INSERT INTO audit.audit_log (
change_time,
table_schema,
table_name,
operation,
row_pk,
old_data,
new_data,
changed_columns,
app_user,
db_user,
client_ip,
tx_id
) VALUES (
now(),
p_table_schema,
p_table_name,
p_operation,
p_row_pk,
p_old_data,
p_new_data,
p_changed_columns,
p_app_user,
current_user,
inet_client_addr(),
txid_current()
);
END;
$$;
-- 修改函数所有者为 audit_user
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;
-- 授予业务用户执行此函数的权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;安全说明:
SECURITY DEFINER:函数以其所有者(audit_user)的权限执行,而不是调用者(business_user)的权限SET search_path = audit, pg_temp:防止搜索路径劫持攻击,确保函数只访问auditschema- 业务用户
business_user只需要EXECUTE权限,无需直接访问audit.audit_log表
1.3 通用审计函数(触发器函数)
说明:
- 该函数自动识别表的主键列,支持不同表使用不同主键名称(如
id、role_id、user_id等),也支持复合主键。- 对 UPDATE:记录变更的列名、这些列的旧值和新值;
- 对 INSERT/DELETE:记录必要信息,包括主键和完整行数据。
- 通过调用
audit.log_data_change()函数实现权限提升,自动以audit_user身份写入审计日志。
CREATE OR REPLACE FUNCTION audit.audit_trigger_func()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
DECLARE
v_app_user TEXT;
v_row_pk JSONB := '{}'::jsonb;
v_old_row JSONB;
v_new_row JSONB;
v_old_changed JSONB := '{}'::jsonb;
v_new_changed JSONB := '{}'::jsonb;
v_changed_cols TEXT[] := ARRAY[]::TEXT[];
col TEXT;
pk_col TEXT;
BEGIN
-- 从会话变量中获取业务用户(应用层设置),允许为空
BEGIN
v_app_user := current_setting('audit.user_id', true);
EXCEPTION WHEN others THEN
v_app_user := NULL;
END;
-- UPDATE: 记录变更列的旧值和新值
IF TG_OP = 'UPDATE' THEN
v_old_row := to_jsonb(OLD);
v_new_row := to_jsonb(NEW);
-- 动态构建主键(支持多列主键)
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
-- 找出变更的列
SELECT array_agg(key)
INTO v_changed_cols
FROM jsonb_each(v_new_row) AS e(key, value)
WHERE (v_old_row -> key) IS DISTINCT FROM value;
IF v_changed_cols IS NULL OR array_length(v_changed_cols, 1) IS NULL THEN
RETURN NEW;
END IF;
-- 只保留变更列的旧值和新值
FOREACH col IN ARRAY v_changed_cols LOOP
v_old_changed := v_old_changed || jsonb_build_object(col, v_old_row -> col);
v_new_changed := v_new_changed || jsonb_build_object(col, v_new_row -> col);
END LOOP;
-- 调用 SECURITY DEFINER 函数写入审计日志(权限自动提升为 audit_user)
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'U',
v_row_pk,
v_old_changed,
v_new_changed,
v_changed_cols,
v_app_user
);
RETURN NEW;
-- INSERT: 只记录新值
ELSIF TG_OP = 'INSERT' THEN
v_new_row := to_jsonb(NEW);
-- 动态构建主键
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_new_row -> pk_col);
END LOOP;
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'I',
v_row_pk,
NULL,
v_new_row,
NULL,
v_app_user
);
RETURN NEW;
-- DELETE: 只记录旧值
ELSIF TG_OP = 'DELETE' THEN
v_old_row := to_jsonb(OLD);
-- 动态构建主键
FOR pk_col IN
SELECT a.attname
FROM pg_index i
JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE i.indrelid = TG_RELID AND i.indisprimary
LOOP
v_row_pk := v_row_pk || jsonb_build_object(pk_col, v_old_row -> pk_col);
END LOOP;
PERFORM audit.log_data_change(
TG_TABLE_SCHEMA::TEXT,
TG_TABLE_NAME::TEXT,
'D',
v_row_pk,
v_old_row,
NULL,
NULL,
v_app_user
);
RETURN OLD;
END IF;
RETURN NULL;
END;
$$;
-- 重要:修改触发器函数的所有者为 audit_user
-- 这样保证两个函数的所有者一致,避免权限混乱
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;优势:
- 动态主键识别:无需修改代码即可适配任何表(
id、role_id、user_id等)。- 支持复合主键:如果表有多列主键,全部记录到
row_pkJSONB 字段中。- 通用性强:一个函数适用于
devschema 下所有表。- 权限隔离:业务用户
business_user通过触发器调用 SECURITY DEFINER 函数,自动以audit_user权限写入审计日志,无需直接访问审计表。
1.4 权限验证
执行以下 SQL 验证权限设置是否正确:
-- 1. 验证 business_user 无法直接访问 audit.audit_log
SET ROLE business_user;
SELECT * FROM audit.audit_log LIMIT 1; -- 应该报错:permission denied for table audit_log
-- 2. 验证 business_user 可以通过触发器写入审计日志
SET ROLE business_user;
INSERT INTO dev.your_table (id, name) VALUES (1, 'Test'); -- 应该成功,且触发器自动写入审计日志
-- 3. 验证 audit_user 可以写入 audit_log 表
SET ROLE audit_user;
SELECT * FROM audit.audit_log LIMIT 1; -- 应该成功
-- 4. 验证 audit_user 无法访问业务表
SET ROLE audit_user;
SELECT * FROM dev.your_table LIMIT 1; -- 应该报错:permission denied for schema dev
-- 5. 重置角色
RESET ROLE;1.5 触发器问题诊断与修复
1.5.1 常见问题:触发器创建后审计表无数据
症状:
- 触发器已创建,但
audit.audit_log表中没有数据 - 业务操作(INSERT/UPDATE/DELETE)执行成功,但没有触发审计记录
可能原因:
- 触发器函数所有者权限不正确
- 触发器是在权限配置之前创建的
- SECURITY DEFINER 函数所有者不是
audit_user
1.5.2 诊断步骤
-- 1. 检查 dev schema 下的表和触发器状态
SELECT
t.tablename,
trg.tgname AS trigger_name,
p.proname AS function_name,
pg_get_userbyid(p.proowner) AS function_owner,
CASE WHEN p.prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_tables t
LEFT JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
LEFT JOIN pg_proc p ON p.oid = trg.tgfoid
WHERE t.schemaname = 'dev'
ORDER BY t.tablename;
-- 2. 检查两个审计函数的所有者和安全类型
SELECT
proname,
pg_get_userbyid(proowner) AS owner,
CASE WHEN prosecdef THEN 'SECURITY DEFINER' ELSE 'SECURITY INVOKER' END AS security_type
FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 3. 检查 business_user 对审计函数的执行权限
SELECT
p.proname,
pg_get_userbyid(p.proowner) AS owner,
has_function_privilege('business_user', p.oid, 'EXECUTE') AS has_execute_privilege
FROM pg_proc p
WHERE p.pronamespace = 'audit'::regnamespace
AND p.proname = 'log_data_change';预期结果:
audit_trigger_func和log_data_change的所有者都应该是audit_userlog_data_change应该是SECURITY DEFINERbusiness_user对log_data_change应该有EXECUTE权限
1.5.3 修复方案:重新创建触发器
如果发现函数所有者不正确,或触发器无法正常工作,执行以下修复脚本:
-- ========================================
-- 步骤 1: 删除所有旧触发器
-- ========================================
DO $$
DECLARE
r RECORD;
BEGIN
FOR r IN
SELECT t.schemaname, t.tablename, trg.tgname
FROM pg_tables t
JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
WHERE t.schemaname = 'dev'
AND trg.tgname LIKE 'audit_%'
LOOP
EXECUTE format('DROP TRIGGER IF EXISTS %I ON %I.%I;',
r.tgname, r.schemaname, r.tablename);
RAISE NOTICE 'Dropped trigger % on %.%', r.tgname, r.schemaname, r.tablename;
END LOOP;
END $$;
-- ========================================
-- 步骤 2: 确保函数所有者正确
-- ========================================
-- 修改触发器函数所有者
ALTER FUNCTION audit.audit_trigger_func() OWNER TO audit_user;
-- 修改 SECURITY DEFINER 函数所有者
ALTER FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) OWNER TO audit_user;
-- 确保 business_user 有执行权限
GRANT EXECUTE ON FUNCTION audit.log_data_change(TEXT, TEXT, TEXT, JSONB, JSONB, JSONB, TEXT[], TEXT) TO business_user;
-- ========================================
-- 步骤 3: 重新创建触发器
-- ========================================
DO $$
DECLARE
r RECORD;
trigger_name TEXT;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'dev'
LOOP
trigger_name := 'audit_' || r.tablename;
EXECUTE format(
'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
trigger_name, r.tablename
);
RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
END LOOP;
END $$;
-- ========================================
-- 步骤 4: 验证修复结果
-- ========================================
-- 查看函数所有者(应该都是 audit_user)
SELECT
proname,
pg_get_userbyid(proowner) AS owner
FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 查看触发器数量
SELECT
schemaname,
COUNT(*) AS trigger_count
FROM pg_tables t
JOIN pg_trigger trg ON trg.tgrelid = (t.schemaname || '.' || t.tablename)::regclass
WHERE t.schemaname = 'dev'
AND trg.tgname LIKE 'audit_%'
GROUP BY schemaname;1.5.4 测试触发器是否正常工作
-- 以 business_user 身份测试
SET ROLE business_user;
-- 在某个表中插入测试数据(替换为实际的表名和列)
INSERT INTO dev.your_test_table (id, name, description)
VALUES (999, 'audit_test', '测试审计功能');
-- 更新测试数据
UPDATE dev.your_test_table
SET description = '审计功能测试成功'
WHERE id = 999;
-- 删除测试数据
DELETE FROM dev.your_test_table WHERE id = 999;
-- 切换回超级用户查看审计日志
RESET ROLE;
-- 查看刚才的操作是否被记录
SELECT
id,
change_time,
table_name,
operation,
row_pk,
old_data,
new_data,
changed_columns,
db_user
FROM audit.audit_log
WHERE table_name = 'your_test_table'
AND (row_pk->>'id')::int = 999
ORDER BY change_time DESC;预期结果:
- 应该看到 3 条审计记录(INSERT、UPDATE、DELETE)
- INSERT 记录有
new_data - UPDATE 记录有
old_data、new_data和changed_columns - DELETE 记录有
old_data - 所有记录的
db_user应该显示为business_user
1.5.5 故障排查清单
如果修复后仍然没有审计数据,按以下顺序检查:
-- ✅ 检查 1: 函数是否存在
SELECT COUNT(*) FROM pg_proc
WHERE pronamespace = 'audit'::regnamespace
AND proname IN ('audit_trigger_func', 'log_data_change');
-- 预期结果: 2
-- ✅ 检查 2: 触发器是否存在
SELECT COUNT(*) FROM pg_trigger
WHERE tgname LIKE 'audit_%'
AND tgrelid IN (SELECT oid FROM pg_class WHERE relnamespace = 'dev'::regnamespace);
-- 预期结果: > 0
-- ✅ 检查 3: 触发器是否启用
SELECT
tgname,
tgenabled,
CASE tgenabled
WHEN 'O' THEN 'enabled'
WHEN 'D' THEN 'disabled'
WHEN 'R' THEN 'replica'
WHEN 'A' THEN 'always'
END AS status
FROM pg_trigger
WHERE tgname LIKE 'audit_%'
LIMIT 5;
-- 预期结果: tgenabled = 'O' (enabled)
-- ✅ 检查 4: audit_log 表是否有分区
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size
FROM pg_tables
WHERE schemaname = 'audit'
AND tablename LIKE 'audit_log%';
-- 如果只有主表没有分区,需要先创建当前月份的分区
-- ✅ 检查 5: 创建分区(如果缺失)
CREATE TABLE IF NOT EXISTS audit.audit_log_2025_01 PARTITION OF audit.audit_log
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
-- 根据实际月份调整重要提示:如果审计表是分区表但没有对应当前时间的分区,INSERT 会失败。请确保已创建当前月份的分区,或配置了自动分区管理(参考本文档的分区管理章节)。
1.6 权限矩阵(第一层)
| 用户/角色 | dev schema (业务表) | audit schema (访问) | audit_log (SELECT/INSERT) | audit.log_data_change() (执行) | 触发器执行 |
|---|---|---|---|---|---|
| business_user (业务用户) | ✅ SELECT/INSERT/UPDATE/DELETE | ✅ USAGE only | ❌ | ✅ EXECUTE | ✅ 自动调用 |
| audit_user (审计用户) | ❌ | ✅ USAGE | ✅ SELECT/INSERT | ✅ OWNER | N/A |
| dba/superuser | ✅ | ✅ | ✅ | ✅ | ✅ |
2. 为业务表创建审计触发器
2.1 为关键业务表创建触发器
建议:只在关键业务表上开启审计(例如合同、工单、用户、权限相关表),降低整体写入压力。如果需要对
devschema 下所有表开启审计,可以使用后文的批量脚本。
假设 dev schema 下有表 contract、workorder、sys_user、sys_role 等:
-- dev.contract 合同表
CREATE TRIGGER audit_contract
AFTER INSERT OR UPDATE OR DELETE ON dev.contract
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.workorder 工单表
CREATE TRIGGER audit_workorder
AFTER INSERT OR UPDATE OR DELETE ON dev.workorder
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.sys_user 用户表
CREATE TRIGGER audit_sys_user
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_user
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();
-- dev.sys_role 角色表
CREATE TRIGGER audit_sys_role
AFTER INSERT OR UPDATE OR DELETE ON dev.sys_role
FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();2.2 批量为 dev schema 下所有表创建审计触发器
如果需要对 dev schema 下的所有表统一开启审计,可以使用以下脚本:
DO $$
DECLARE
r RECORD;
trigger_name TEXT;
BEGIN
FOR r IN
SELECT tablename
FROM pg_tables
WHERE schemaname = 'dev'
LOOP
trigger_name := 'audit_' || r.tablename;
-- 检查触发器是否已存在,避免重复创建
IF NOT EXISTS (
SELECT 1 FROM pg_trigger
WHERE tgname = trigger_name
) THEN
EXECUTE format(
'CREATE TRIGGER %I AFTER INSERT OR UPDATE OR DELETE ON dev.%I '
'FOR EACH ROW EXECUTE FUNCTION audit.audit_trigger_func();',
trigger_name, r.tablename
);
RAISE NOTICE 'Created trigger % on dev.%', trigger_name, r.tablename;
END IF;
END LOOP;
END $$;3. 审计表性能优化
3.1 分区 + 保留策略(核心手段)
- 审计表使用 按月份分区(
PARTITION BY RANGE(change_time)),避免单表行数累积到几千万/几亿; - 对于过旧的数据(如 6 个月/1 年前),通过删除或归档整个月的分区表进行"粗粒度清理",而不是在大表里
DELETE; - 分区的创建和清理必须完全自动化,不依赖人工执行 SQL。
3.1.1 配置定时维护任务
分区管理函数已在前文 1.1 节创建完成,现在只需配置系统 cron 定时执行即可。
使用系统 cron(推荐,适用于所有平台和所有 PostgreSQL 版本)
# 创建日志目录
sudo mkdir -p /var/log/postgresql
sudo chown postgres:postgres /var/log/postgresql
# 编辑 postgres 用户的 crontab(重要:必须使用 postgres 用户)
sudo crontab -u postgres -e
# 添加以下两行
# 每天凌晨 2 点创建下个月分区
0 2 * * * psql -d postgres -c "SELECT audit.create_next_month_partition();" >> /var/log/postgresql/partition_create.log 2>&1
# 每天凌晨 3 点清理老分区(保留 24 个月)
0 3 * * * psql -d postgres -c "SELECT audit.drop_old_partitions(24);" >> /var/log/postgresql/partition_cleanup.log 2>&1第二层:应用层审计(业务日志审计)
方案概述
适用场景:需要记录应用层的全量业务行为,包括用户登录、接口请求、数据变更、第三方调用等,补充数据库层审计的视角盲区。
核心特性:
- 与业务代码解耦(通过 beyondsoft-log-spring3-starter 实现)
- 记录更丰富的业务上下文(用户IP、请求参数、耗时、trace_id等)
- 支持结构化存储(JSONB)和高效查询
- 审计数据落地到 PostgreSQL 专用表(见第三层)
- 对应用代码入侵性低(基于注解/拦截器实现)
数据流向:
用户操作/接口调用
↓
应用层拦截器/注解(beyondsoft-log-spring3-starter)
↓
业务日志格式化处理
↓
写入: audit 下4张应用层审计表(以 audit_user 身份写入)核心职责
| 审计类型 | 记录内容 | 价值 |
|---|---|---|
| 登录审计 | 用户名、IP、登录位置、终端信息、登录状态 | 溯源用户登录行为,排查异常登录 |
| 请求审计 | 请求URL、参数、耗时、响应、错误信息、操作用户 | 全量接口调用追溯,性能问题定位 |
| 数据变更审计 | 操作人、变更内容、业务ID、新旧值 | 业务数据变更的应用层溯源 |
| 第三方调用审计 | 调用地址、参数、耗时、响应 | 第三方接口问题定位,责任界定 |
实现方式
- 引入
beyondsoft-log-spring3-starter依赖 - 配置日志输出目标为 PostgreSQL(指向 audit 下的 4 张表)
- 通过注解/配置指定需要审计的接口/方法
- 配置
audit_user数据库账号用于日志写入 - 配置日志保留策略(与第三层表分区策略对齐)
1. 应用层审计表设计
🚧 以下四张表是使用 PostgreSQL 替代 MongoDB 后落地应用层审计的核心表,需在
auditschema 下创建:
- 📌 beyond_soft_login_log:登录日志表
- 📌 beyond_soft_request_log:请求日志表
- 📌 beyond_soft_data_audit_log:数据变更审计表
- 📌 beyond_soft_third_part_log:第三方调用日志表
1.1 创建应用层审计表(分区表)
-- ==========================================
-- Beyond Soft 日志系统 - PostgreSQL 按天分区表
-- ==========================================
-- 功能说明:
-- 1. 创建4个日志主表(支持按天自动分区)
-- 2. 应用层通过 BeyondSoftLogServiceImpl 自动管理分区
-- 3. 使用 Caffeine Cache 缓存分区状态,避免重复检查
--
-- 重要说明:
-- - 此SQL默认在 public schema 中创建表
-- - 如果需要在其他schema,请:
-- 1. 先创建 schema:CREATE SCHEMA IF NOT EXISTS your_schema;
-- 2. 设置搜索路径:SET search_path TO your_schema, public;
-- 3. 配置文件中指定:beyond-soft.log.schema=your_schema
-- ==========================================
-- ==========================================
-- 1. 登录日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS beyond_soft_login_log (
id BIGSERIAL,
username VARCHAR(100),
ip VARCHAR(50),
ip_chain TEXT,
location VARCHAR(200),
platform VARCHAR(50),
browser VARCHAR(100),
os VARCHAR(100),
status VARCHAR(50) DEFAULT '登录成功',
response TEXT,
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_login_log_username ON beyond_soft_login_log(username);
CREATE INDEX IF NOT EXISTS idx_login_log_request_time ON beyond_soft_login_log(request_time);
CREATE INDEX IF NOT EXISTS idx_login_log_ip ON beyond_soft_login_log(ip);
COMMENT ON TABLE beyond_soft_login_log IS '登录日志表(按天分区)';
COMMENT ON COLUMN beyond_soft_login_log.ip_chain IS 'IP转发链路描述,用于多层代理环境的IP溯源';
COMMENT ON COLUMN beyond_soft_login_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 2. 请求日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS beyond_soft_request_log (
id BIGSERIAL,
ip VARCHAR(50),
ip_chain TEXT,
location VARCHAR(200),
request_type VARCHAR(20),
url VARCHAR(1000),
request_params TEXT,
content_type VARCHAR(100),
operator VARCHAR(100),
auth_type VARCHAR(50),
user_agent TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
spend_time BIGINT,
status INTEGER,
response TEXT,
error TEXT,
trace_id VARCHAR(100),
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_request_log_url ON beyond_soft_request_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_request_log_trace_id ON beyond_soft_request_log(trace_id);
CREATE INDEX IF NOT EXISTS idx_request_log_request_time ON beyond_soft_request_log(request_time);
CREATE INDEX IF NOT EXISTS idx_request_log_operator ON beyond_soft_request_log(operator);
CREATE INDEX IF NOT EXISTS idx_request_log_auth_type ON beyond_soft_request_log(auth_type);
COMMENT ON TABLE beyond_soft_request_log IS '请求日志表(按天分区)';
COMMENT ON COLUMN beyond_soft_request_log.ip_chain IS 'IP转发链路描述,用于多层代理环境的IP溯源';
COMMENT ON COLUMN beyond_soft_request_log.auth_type IS '授权方式: SA-TOKEN / THIRD-PARTY / ANONYMOUS';
COMMENT ON COLUMN beyond_soft_request_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 3. 审计日志表(支持JSONB)
-- ==========================================
CREATE TABLE IF NOT EXISTS beyond_soft_data_audit_log (
id BIGSERIAL,
operator VARCHAR(100),
modify_date TIMESTAMP,
operation VARCHAR(50),
handle_name VARCHAR(200),
modify_content TEXT,
modifier_ip VARCHAR(50),
modifier_ip_chain TEXT,
modifier_location VARCHAR(200),
business_id VARCHAR(100),
business_data_status VARCHAR(50),
old_object JSONB,
new_object JSONB,
response TEXT,
request_time TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, request_time)
) PARTITION BY RANGE (request_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_audit_log_operator ON beyond_soft_data_audit_log(operator);
CREATE INDEX IF NOT EXISTS idx_audit_log_business_id ON beyond_soft_data_audit_log(business_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_request_time ON beyond_soft_data_audit_log(request_time);
CREATE INDEX IF NOT EXISTS idx_audit_log_operation ON beyond_soft_data_audit_log(operation);
-- JSONB 字段的 GIN 索引(支持高效 JSON 查询)
CREATE INDEX IF NOT EXISTS idx_audit_log_old_object_gin ON beyond_soft_data_audit_log USING gin(old_object);
CREATE INDEX IF NOT EXISTS idx_audit_log_new_object_gin ON beyond_soft_data_audit_log USING gin(new_object);
COMMENT ON TABLE beyond_soft_data_audit_log IS '审计日志表(按天分区)';
COMMENT ON COLUMN beyond_soft_data_audit_log.modifier_ip_chain IS 'IP转发链路描述,用于多层代理环境的IP溯源';
COMMENT ON COLUMN beyond_soft_data_audit_log.old_object IS 'JSONB格式:操作前的对象';
COMMENT ON COLUMN beyond_soft_data_audit_log.new_object IS 'JSONB格式:操作后的对象';
COMMENT ON COLUMN beyond_soft_data_audit_log.request_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 4. 第三方调用日志表
-- ==========================================
CREATE TABLE IF NOT EXISTS beyond_soft_third_part_log (
id BIGSERIAL,
request_id VARCHAR(100),
protocols VARCHAR(20),
url VARCHAR(1000),
query_params TEXT,
body TEXT,
call_source VARCHAR(200),
method_name VARCHAR(200),
headers JSONB,
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
spend_time BIGINT,
response TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id, start_time)
) PARTITION BY RANGE (start_time);
-- 创建索引
CREATE INDEX IF NOT EXISTS idx_third_part_log_request_id ON beyond_soft_third_part_log(request_id);
CREATE INDEX IF NOT EXISTS idx_third_part_log_url ON beyond_soft_third_part_log USING hash(url);
CREATE INDEX IF NOT EXISTS idx_third_part_log_call_source ON beyond_soft_third_part_log(call_source);
CREATE INDEX IF NOT EXISTS idx_third_part_log_start_time ON beyond_soft_third_part_log(start_time);
COMMENT ON TABLE beyond_soft_third_part_log IS '第三方调用日志表(按天分区)';
COMMENT ON COLUMN beyond_soft_third_part_log.headers IS 'JSONB格式:请求头信息';
COMMENT ON COLUMN beyond_soft_third_part_log.start_time IS '分区键:按此字段进行分区';
-- ==========================================
-- 分区管理说明
-- ==========================================
/*
🔧 自动分区管理由应用层实现:
- BeyondSoftLogServiceImpl 在插入日志前自动检查分区
- 使用 Caffeine Cache 缓存分区状态(24小时过期)
- 首次插入时自动创建未来7天的分区
- 无需手动维护分区,应用层自动管理
📊 分区命名规则:
- 登录日志:beyond_soft_login_log_2025_01_15
- 请求日志:beyond_soft_request_log_2025_01_15
- 审计日志:beyond_soft_data_audit_log_2025_01_15
- 第三方日志:beyond_soft_third_part_log_2025_01_15
🗑️ 历史数据清理:
- 建议通过定时任务每周清理90天前的分区
- 示例SQL:
DROP TABLE beyond_soft_login_log_2024_10_15;
*/1.2 应用层审计表权限配置
GRANT CREATE, USAGE ON SCHEMA audit TO audit_user;
-- 授予表的分区管理权限
ALTER TABLE audit.beyond_soft_data_audit_log OWNER TO audit_user;
ALTER TABLE audit.beyond_soft_login_log OWNER TO audit_user;
ALTER TABLE audit.beyond_soft_request_log OWNER TO audit_user;
ALTER TABLE audit.beyond_soft_third_part_log OWNER TO audit_user;
-- 保留原有的读写权限
GRANT INSERT, SELECT ON audit.beyond_soft_data_audit_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_login_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_request_log TO audit_user;
GRANT INSERT, SELECT ON audit.beyond_soft_third_part_log TO audit_user;
-- 序列权限保持不变
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_data_audit_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_login_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_request_log_id_seq TO audit_user;
GRANT USAGE, SELECT ON SEQUENCE audit.beyond_soft_third_part_log_id_seq TO audit_user;
-- 查询所有分区表(父表)
SELECT
nmsp_parent.nspname AS parent_schema,
parent.relname AS parent_table,
nmsp_child.nspname AS child_schema,
child.relname AS child_table
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p' -- 父表是分区表
ORDER BY parent_schema, parent_table;
-- 生成删除分区子表的语句(不删除父表)
SELECT
'DROP TABLE IF EXISTS "' || nmsp_child.nspname || '"."' || child.relname || '" CASCADE;' AS drop_statement,
nmsp_child.nspname AS child_schema,
child.relname AS child_table
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relkind = 'p'
-- 可以添加过滤条件,例如:
-- AND nmsp_child.nspname = 'public'
-- AND parent.relname LIKE 'logs_%'
ORDER BY child_schema, child_table;1.3 应用层审计表分区管理
说明:应用层审计表按天分区(高频写入场景),分区管理通过beyondsoft-log-spring3-starter实现:
1.4 权限矩阵(第三层)
| 用户/角色 | dev schema (业务表) | audit schema (访问) | 4张应用层审计表 (SELECT/INSERT) | 序列权限 |
|---|---|---|---|---|
| business_user (业务用户) | ✅ | ✅ USAGE only | ❌ | ❌ |
| audit_user (审计用户) | ❌ | ✅ USAGE | ✅ SELECT/INSERT | ✅ USAGE/SELECT |
| dba/superuser | ✅ | ✅ | ✅ | ✅ |
备选方案:pgaudit 插件审计(SQL 级别审计)
方案概述
适用场景:需要满足合规要求,记录数据库层面的所有 SQL 操作、权限变更、DDL 操作等安全事件。
核心特性:
- 记录所有数据库层面的 SQL 执行(SELECT/INSERT/UPDATE/DELETE/DDL/DCL)
- 按角色/用户/表粒度过滤审计内容
- 日志输出到 PostgreSQL 日志文件,便于集中收集(如 ELK)
- 低性能损耗(相比触发器审计)
- 无需修改业务代码/数据库结构
1. 安装与配置
1.1 安装 pgaudit 扩展
# CentOS/RHEL
sudo yum install postgresql14-pgaudit
# Debian/Ubuntu
sudo apt-get install postgresql-14-pgaudit
# 重启PostgreSQL
sudo systemctl restart postgresql-141.2 启用 pgaudit 扩展
-- 进入目标数据库
psql -U postgres -d system
-- 创建扩展
CREATE EXTENSION IF NOT EXISTS pgaudit;1.3 配置 postgresql.conf
# 修改 postgresql.conf(通常在 /var/lib/pgsql/14/data/ 目录)
shared_preload_libraries = 'pgaudit' # 添加 pgaudit
pgaudit.log = 'ddl, write, function, role' # 审计类型:DDL、写入操作、函数调用、角色变更
pgaudit.log_catalog = off # 不审计系统表
pgaudit.log_level = 'notice' # 日志级别
pgaudit.log_parameter = on # 记录SQL参数
pgaudit.log_relation = on # 记录表级操作
pgaudit.log_statement_once = off # 每次执行都记录
# 日志输出配置
logging_collector = on
log_directory = 'pg_log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_rotation_age = 1d
log_rotation_size = 100MB
log_line_prefix = '%t [%p]: [%c-%l] user=%u,db=%d,app=%a,client=%h '
log_statement = 'none' # 关闭默认SQL记录,由pgaudit接管1.4 重启生效
sudo systemctl restart postgresql-142. 细粒度审计配置
2.1 按表配置审计
-- 仅审计 dev schema 下的关键表
ALTER ROLE business_user SET pgaudit.log_relation = 'on';
ALTER TABLE dev.contract SET (pgaudit.log = 'write');
ALTER TABLE dev.sys_user SET (pgaudit.log = 'all');2.2 审计结果查看
# 查看审计日志
tail -f /var/lib/pgsql/14/data/pg_log/postgresql-2025-01-01_000000.log
# 示例日志内容
# 2025-01-01 10:00:00 UTC [1234]: [1-1] user=business_user,db=system,app=psql,client=192.168.1.100 LOG: AUDIT: OBJECT,1,WRITE,TABLE,dev.contract,INSERT,public,"INSERT INTO dev.contract (id, name) VALUES (1, 'Test')",<none>3. 方案对比
| 维度 | 触发器审计(第一层) | 应用层审计(第二层) | pgaudit 插件审计(备选) |
|---|---|---|---|
| 审计粒度 | 行级(数据变更) | 业务行为级(登录/请求) | SQL语句级(数据库操作) |
| 性能影响 | 低(行级触发器) | 低(应用层异步写入) | 极低(内核级审计) |
| 数据存储 | PostgreSQL 分区表 | PostgreSQL 分区表 | 日志文件 |
| 可查询性 | 高(SQL/JSONB查询) | 高(结构化查询) | 中(日志解析) |
| 合规支持 | 业务数据追溯 | 业务行为追溯 | 数据库安全合规 |
| 入侵性 | 低(触发器) | 低(starter插件) | 无(扩展) |
4. 部署建议
- 核心审计:优先部署第一层(触发器审计)+ 第二层(应用层审计),覆盖业务数据和行为追溯;
- 合规补充:在生产环境部署 pgaudit 插件审计,满足安全合规要求;
- 性能优化:所有审计表均采用分区表,配置自动分区清理,避免数据膨胀;
- 权限管控:严格遵循 "审计用户仅可写入审计表,不可访问业务表" 的原则。