pg_incremental

增量处理流式事件

概览

扩展包名版本分类许可证语言
pg_incremental1.5.0FEATPostgreSQLC
ID扩展名BinLibLoadCreateTrustReloc模式
2850pg_incrementalpg_catalog
相关扩展age hll rum pg_graphql pg_jsonschema jsquery pg_hint_plan

pg_cron is optional since v1.3 and only required for scheduled pipelines.

版本

类型仓库版本PG 大版本包名依赖
EXTPIGSTY1.5.01817161514pg_incremental-
RPMPIGSTY1.5.01817161514pg_incremental_$v-
DEBPIGSTY1.5.01817161514postgresql-$v-pg-incremental-
OS / PGPG18PG17PG16PG15PG14
el8.x86_64PIGSTY MISSPIGSTY MISS
el8.aarch64PIGSTY MISSPIGSTY MISS
el9.x86_64PIGSTY MISSPIGSTY MISS
el9.aarch64PIGSTY MISSPIGSTY MISS
el10.x86_64PIGSTY MISSPIGSTY MISS
el10.aarch64PIGSTY MISSPIGSTY MISS
d12.x86_64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
d12.aarch64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
d13.x86_64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
d13.aarch64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u22.x86_64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u22.aarch64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u24.x86_64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u24.aarch64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u26.x86_64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS
u26.aarch64
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY 1.5.0
PIGSTY MISSPIGSTY MISS

构建

您可以使用 pig build 命令构建 pg_incremental 扩展的 RPM / DEB 包:

pig build pkg pg_incremental         # 构建 RPM / DEB 包

安装

您可以直接安装 pg_incremental 扩展包的预置二进制包,首先确保 PGDGPIGSTY 仓库已经添加并启用:

pig repo add pgsql -u          # 添加仓库并更新缓存

使用 pig 或者是 apt/yum/dnf 安装扩展:

pig install pg_incremental;          # 当前活跃 PG 版本安装
pig ext install -y pg_incremental -v 18  # PG 18
pig ext install -y pg_incremental -v 17  # PG 17
pig ext install -y pg_incremental -v 16  # PG 16
dnf install -y pg_incremental_18       # PG 18
dnf install -y pg_incremental_17       # PG 17
dnf install -y pg_incremental_16       # PG 16
apt install -y postgresql-18-pg-incremental   # PG 18
apt install -y postgresql-17-pg-incremental   # PG 17
apt install -y postgresql-16-pg-incremental   # PG 16

创建扩展

CREATE EXTENSION pg_incremental;

用法

pg_incremental 为 append-only table 和 file feed 定义 exactly-once 增量流水线。上游文档记录了三类 pipeline:sequence、time-interval 和 file-list。

安装与调度模型

上游 README 仍然使用基于 pg_cron 的调度模型,并通过下面的方式安装:

CREATE EXTENSION pg_incremental CASCADE;

除非显式指定 execute_immediately := false,否则 pipeline 会在创建时立刻运行一次,之后继续按 pg_cron 调度执行。README 还说明,即使没有新数据,每次调度执行也会出现在 cron.job_run_details 中。

Sequence Pipelines

sequence pipeline 用于处理可安全消费的序列值范围:

SELECT incremental.create_sequence_pipeline('event-aggregation', 'events', $$
  INSERT INTO events_agg
  SELECT date_trunc('day', event_time), count(*)
  FROM events
  WHERE event_id BETWEEN $1 AND $2
  GROUP BY 1
  ON CONFLICT (day) DO UPDATE
  SET event_count = events_agg.event_count + excluded.event_count
$$);

README 记录了 max_batch_size,可用于限制每次运行处理的 sequence ID 数量。

Time-Interval Pipelines

当命令希望把 $1$2 作为时间区间边界接收时,可以使用时间窗口:

SELECT incremental.create_time_interval_pipeline('event-aggregation', '1 day', $$
  INSERT INTO events_agg
  SELECT event_time::date, count(DISTINCT event_id)
  FROM events
  WHERE event_time >= $1 AND event_time < $2
  GROUP BY 1
$$);

对于导出类任务,README 记录了 batched := false,这样每个时间区间都会单独执行。

File-List Pipelines

file-list pipeline 用于处理新发现的文件:

SELECT incremental.create_file_list_pipeline('event-import', 's3://mybucket/events/*.csv', $$
  SELECT import_events($1)
$$);

v1.5.0 release 为 file-list pipeline 增加了 max_batches_per_run。README 还记录了 incremental.skip_file(),可将坏文件永久标记为已处理。

运维与监控

README 记录了以下接口:

  • CALL incremental.execute_pipeline(name):若存在新工作则执行一次。
  • SELECT incremental.reset_pipeline(name):重置进度。
  • SELECT incremental.drop_pipeline(name):删除 pipeline。
  • incremental.sequence_pipelinesincremental.time_interval_pipelinesincremental.file_list_pipelinesincremental.processed_files 等视图和表。

v1.5.0 release note 还提到修复了在未安装 pg_cron 环境下的 DROP EXTENSION 问题。


最后修改 2026-05-01: update extension data (e399d22)