pg_replicatepg_replicate 是一个用于快速构建 Postgres 复制解决方案的 Rust crate。它提供了构建数据管道的基础模块,可以持续地将数据从 Postgres 复制到其他系统。它在 Postgres 的逻辑流复制协议之上构建抽象,引导用户走向成功之路,而无需担心协议的底层细节。
要快速尝试 pg_replicate,您可以运行 stdout 示例,它会将数据复制到标准输出。首先,在 Postgres 中创建一个包含您想要复制的表的发布:
create publication my_publication
for table table1, table2;
然后运行 stdout 示例:
cargo run --example stdout -- --db-host localhost --db-port 5432 --db-name postgres --db-username postgres --db-password password cdc my_publication stdout_slot
在上面的示例中,pg_replicate 连接到运行在 localhost:5432 上的名为 postgres 的 Postgres 数据库,使用用户名 postgres 和密码 password。槽名 stdout_slot 将由 pg_replicate 自动创建。
参考示例文件夹以运行除 stdout 之外的其他接收器的示例(目前仅支持 bigquery 和 duckdb)。小提示:要查看所有命令行选项,可以不指定任何选项运行示例,例如 cargo run --example bigquery 将打印 bigquery 接收器的详细使用说明。
要在您的 Rust 项目中使用 pg_replicate,请通过 Cargo.toml 中的 git 依赖添加它:
[dependencies] pg_replicate = { git = "https://github.com/supabase/pg_replicate" }
目前需要 git 依赖,因为 pg_replicate 尚未发布在 crates.io 上。您还需要添加 tokio 的依赖:
[dependencies] ... tokio = { version = "1.38" }
现在您的 main.rs 可以包含如下代码:
use std::error::Error; use pg_replicate::pipeline::{ data_pipeline::DataPipeline, sinks::stdout::StdoutSink, sources::postgres::{PostgresSource, TableNamesFrom}, PipelineAction, }; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { let host = "localhost"; let port = 5432; let database = "postgres"; let username = "postgres"; let password = Some("password".to_string()); let slot_name = Some("my_slot".to_string()); let table_names = TableNamesFrom::Publication("my_publication".to_string()); // 创建 PostgresSource let postgres_source = PostgresSource::new( host, port, database, username, password, slot_name, table_names, ) .await?; // 创建 StdoutSink。这个接收器只是将接收到的事件打印到标准输出 let stdout_sink = StdoutSink; // 创建 `DataPipeline` 以连接源和接收器 let mut pipeline = DataPipeline::new(postgres_source, stdout_sink, PipelineAction::Both); // 启动 `DataPipeline` 开始从 Postgres 复制数据到标准输出 pipeline.start().await?; Ok(()) }
更多示例请参考源代码中的示例文件夹。
该仓库是一个 cargo 工作空间。每个子文件夹都是工作空间中的一个 crate。以下是每个 crate 的简要说明:
api - 用于在云环境中托管 pg_replicate 的 REST API。pg_replicate - 包含核心逻辑的主要库 crate。replicator - 使用 pg_replicate 的二进制 crate。打包为 Docker 容器以用于云托管。pg_replicate 仍在积极开发中,所以可能会有 bug 和小问题,但随着时间的推移,我们计划添加以下接收器:
注意:DuckDb 和 MotherDuck 接收器不使用批处理管道,因此目前性能较差。计划开发这些接收器的批处理管道版本。
查看未解决的问题以获取提议功能的完整列表(以及已知问题)。
根据 Apache-2.0 许可证分发。有关更多信息,请参阅 LICENSE。
要创建 replicator 的 Docker 镜像,请在仓库根目录运行 docker build -f ./replicator/Dockerfile .。同样,要创建 api 的 Docker 镜像,请运行 docker build -f ./api/Dockerfile .。
应用程序可以使用 pg_replicate 中的数据源和接收器来构建数据管道,以持续从源复制数据到接收器。例如,一个从 Postgres 复制数据到 DuckDB 的数据管道只需要约 100 行 Rust 代码。
数据管道中有三个组件:
数据源是数据将被复制的对象。数据接收器是数据将被复制到的对象。管道是驱动从源到接收器的数据复制操作的对象。
+----------+ +----------+
| | | |
| 数据源 |---- 数据管道 ----> | 接收器 |
| | | |
+----------+ +----------+
所以大致上你写的代码是这样的:
let postgres_source = PostgresSource::new(...); let duckdb_sink = DuckDbSink::new(..); let pipeline = DataPipeline(postgres_source, duckdb_sink); pipeline.start();
当然,实际代码不止这四行,但这是基本思路。完整示例请查看 duckdb 示例。
数据源是管道将复制到数据接收器的数据来源。目前,仓库只有一个数据源:PostgresSource。PostgresSource 是主要数据源;任何其他源或接收器中的数据都源自它。
数据接收器是数据源的数据被复制到的地方。有两种类型的数据接收器。一种保留从 PostgresSource 输出的数据本质特性,另一种则不保留。前者可以在未来充当数据源,后者不能充当数据源,是数据的最终存放地。
例如,DuckDbSink 确保从源进来的变更数据捕获(CDC)流被物化为 DuckDB 数据库中的表。一旦完成这种有损数据转换,它就不能再用作 CDC 流。
相比之下,潜在的未来接收器 S3Sink 或 KafkaSink 只是按原样复制 CDC 流。存放在接收器中的数据稍后可以像直接从 Postgres 来的一样使用。
数据管道封装了从源到接收器复制数据的业务逻辑。它还协调从上次停止的确切位置恢复 CDC 流。数据接收器通过持久化恢复状态并在重启时将其返回给管道来参与这个过程。
如果数据接收器不是事务性的(例如 S3Sink),就 不总是可能保持 CDC 流和恢复状态之间的一致性。这可能导致这些非事务性接收器有重复的 CDC 流部分。当数据被复制到像 DuckDB 这样的事务性存储时,数据管道有助于对这些重复的 CDC 事件进行去重。
最后,数据管道将 CDC 流已复制到接收器的日志序列号(LSN)报告回 PostgresSource。这允许 Postgres 数据库通过移除数据接收器不再需要的 WAL 段文件来回收磁盘空间。
+----------+ +----------+
| | | |
| 数据源 |<---- LSN 编号 -----| 接收器 |
| | | |
+----------+ +----------+
CDC 流不是数据管道执行的唯一数据类型。还有全表复制,也称为回填。这两种类型可以一起执行,也可以分开执行。例如,一次性数据复制可以使用回填。但如果你想定期从 Postgres 复制数据到 OLAP 数据库,就应该同时使用回填和 CDC 流。回填用于获取数据的初始副本,CDC 流用于保持这些副本随着 Postgres 中复制表的变化而更新。
目前,数据源和接收器一次复制一个表行和 CDC 事件。这预计会很慢。批处理和其他策略可能会大大提高性能。但在这个早期阶段,重点是正确性而非性能。目前还没有任何基准测试,所以关于性能的评论更接近推测而非现实。


免费创建高清无水印Sora视频
Vora是一个免费创建高清无水印Sora视频的AI工具


最适合小白的AI自动化工作流平台
无需编码,轻松生成可复用、可变现的AI自动化工作流

大模型驱动的Excel数据处理工具
基于大模型交互的表格处理系统,允许用户通过对话方式完成数据整理和可视化分析。系统采用机器学习算法解析用户指令,自动执行排序、公式计算和数据透视等操作,支持多种文件格式导入导出。数据处理响应速度保持在0.8秒以内,支持超过100万行数据的即时分析。


AI辅助编程,代码自动修复
Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。


AI论文写作指导平台
AIWritePaper论文写作是一站式AI论文写作辅助工具,简化了选题、文献检索至论文撰写的整个过程。通过简单设定,平台可快速生成高质量论文大纲和全文, 配合图表、参考文献等一应俱全,同时提供开题报告和答辩PPT等增值服务,保障数据安全,有效提升写作效率和论文质量。


AI一键生成PPT,就用博思AIPPT!
博思AIPPT,新一代的AI生成PPT平台,支持智能生成PPT、AI美化PPT、文本&链接生成PPT、导入Word/PDF/Markdown文档生成PPT等,内置海量精美PPT模板,涵盖商务、教育、科技等不同风格,同时针对每个页面提供多种版式,一键自适应切换,完美适配各种办公场景。


AI赋能电商视觉革命,一站式智能商拍平台
潮际好麦深耕服装行业,是国内AI试衣效果最好的软件。使用先进AIGC能力为电商卖家批量提供优质的、低成本的商拍图。合作品牌有Shein、Lazada、安踏、百丽等65个国内外头部品牌,以及国内10万+淘宝、天猫、京东等主流平台的品牌商家,为卖家节省将近85%的出图成本,提升约3倍出图效率,让品牌能够快速上架。


企业专属的AI法律顾问
iTerms是法大大集团旗下法律子品牌,基于最先进的大语言模型(LLM)、专业的法律知识库和强大的智能体架构,帮助企业扫清合规障碍,筑牢风控防线,成为您企业专属的AI法律顾问。


稳定高效的流量提升解决方案,助力品牌曝光
稳定高效的流量提升解决方案,助力品牌曝光


最新版Sora2模型免费使用,一键生成无水印视频
最新版Sora2模型免费使用,一键生成无水印视频