pg_incremental
概览
| 扩展包名 | 版本 | 分类 | 许可证 | 语言 |
|---|---|---|---|---|
pg_incremental | 1.5.0 | FEAT | PostgreSQL | C |
| ID | 扩展名 | Bin | Lib | Load | Create | Trust | Reloc | 模式 |
|---|---|---|---|---|---|---|---|---|
| 2850 | pg_incremental | 否 | 是 | 否 | 是 | 否 | 否 | pg_catalog |
| 相关扩展 | age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan |
|---|
pg_cron is optional since v1.3 and only required for scheduled pipelines.
版本
| 类型 | 仓库 | 版本 | PG 大版本 | 包名 | 依赖 |
|---|---|---|---|---|---|
| EXT | PIGSTY | 1.5.0 | 1817161514 | pg_incremental | - |
| RPM | PIGSTY | 1.5.0 | 1817161514 | pg_incremental_$v | - |
| DEB | PIGSTY | 1.5.0 | 1817161514 | postgresql-$v-pg-incremental | - |
构建
您可以使用 pig build 命令构建 pg_incremental 扩展的 RPM / DEB 包:
pig build pkg pg_incremental # 构建 RPM / DEB 包
安装
您可以直接安装 pg_incremental 扩展包的预置二进制包,首先确保 PGDG 和 PIGSTY 仓库已经添加并启用:
pig repo add pgsql -u # 添加仓库并更新缓存
使用 pig 或者是 apt/yum/dnf 安装扩展:
pig install pg_incremental; # 当前活跃 PG 版本安装
pig ext install -y pg_incremental -v 18 # PG 18
pig ext install -y pg_incremental -v 17 # PG 17
pig ext install -y pg_incremental -v 16 # PG 16
dnf install -y pg_incremental_18 # PG 18
dnf install -y pg_incremental_17 # PG 17
dnf install -y pg_incremental_16 # PG 16
apt install -y postgresql-18-pg-incremental # PG 18
apt install -y postgresql-17-pg-incremental # PG 17
apt install -y postgresql-16-pg-incremental # PG 16
创建扩展:
CREATE EXTENSION pg_incremental;
用法
pg_incremental 为 append-only table 和 file feed 定义 exactly-once 增量流水线。上游文档记录了三类 pipeline:sequence、time-interval 和 file-list。
安装与调度模型
上游 README 仍然使用基于 pg_cron 的调度模型,并通过下面的方式安装:
CREATE EXTENSION pg_incremental CASCADE;
除非显式指定 execute_immediately := false,否则 pipeline 会在创建时立刻运行一次,之后继续按 pg_cron 调度执行。README 还说明,即使没有新数据,每次调度执行也会出现在 cron.job_run_details 中。
Sequence Pipelines
sequence pipeline 用于处理可安全消费的序列值范围:
SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
INSERT INTO events_agg
SELECT date_trunc('day', event_time), count(*)
FROM events
WHERE event_id BETWEEN $1 AND $2
GROUP BY 1
ON CONFLICT (day) DO UPDATE
SET event_count = events_agg.event_count + excluded.event_count
$$);
README 记录了 max_batch_size,可用于限制每次运行处理的 sequence ID 数量。
Time-Interval Pipelines
当命令希望把 $1 和 $2 作为时间区间边界接收时,可以使用时间窗口:
SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
INSERT INTO events_agg
SELECT event_time::date, count(DISTINCT event_id)
FROM events
WHERE event_time >= $1 AND event_time < $2
GROUP BY 1
$$);
对于导出类任务,README 记录了 batched := false,这样每个时间区间都会单独执行。
File-List Pipelines
file-list pipeline 用于处理新发现的文件:
SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
SELECT import_events($1)
$$);
v1.5.0 release 为 file-list pipeline 增加了 max_batches_per_run。README 还记录了 incremental.skip_file(),可将坏文件永久标记为已处理。
运维与监控
README 记录了以下接口:
CALL incremental.execute_pipeline(name):若存在新工作则执行一次。SELECT incremental.reset_pipeline(name):重置进度。SELECT incremental.drop_pipeline(name):删除 pipeline。incremental.sequence_pipelines、incremental.time_interval_pipelines、incremental.file_list_pipelines与incremental.processed_files等视图和表。
v1.5.0 release note 还提到修复了在未安装 pg_cron 环境下的 DROP EXTENSION 问题。