pg_replicate
pg_replicate
是一个用于快速构建 Postgres 复制解决方案的 Rust crate。它提供了构建数据管道的基础模块,可以持续地将数据从 Postgres 复制到其他系统。它在 Postgres 的逻辑流复制协议之上构建抽象,引导用户走向成功之路,而无需担心协议的底层细节。
要快速尝试 pg_replicate
,您可以运行 stdout
示例,它会将数据复制到标准输出。首先,在 Postgres 中创建一个包含您想要复制的表的发布:
create publication my_publication
for table table1, table2;
然后运行 stdout
示例:
cargo run --example stdout -- --db-host localhost --db-port 5432 --db-name postgres --db-username postgres --db-password password cdc my_publication stdout_slot
在上面的示例中,pg_replicate
连接到运行在 localhost:5432
上的名为 postgres
的 Postgres 数据库,使用用户名 postgres
和密码 password
。槽名 stdout_slot
将由 pg_replicate
自动创建。
参考示例文件夹以运行除 stdout
之外的其他接收器的示例(目前仅支持 bigquery
和 duckdb
)。小提示:要查看所有命令行选项,可以不指定任何选项运行示例,例如 cargo run --example bigquery
将打印 bigquery
接收器的详细使用说明。
要在您的 Rust 项目中使用 pg_replicate
,请通过 Cargo.toml
中的 git 依赖添加它:
[dependencies] pg_replicate = { git = "https://github.com/supabase/pg_replicate" }
目前需要 git 依赖,因为 pg_replicate
尚未发布在 crates.io 上。您还需要添加 tokio 的依赖:
[dependencies] ... tokio = { version = "1.38" }
现在您的 main.rs
可以包含如下代码:
use std::error::Error; use pg_replicate::pipeline::{ data_pipeline::DataPipeline, sinks::stdout::StdoutSink, sources::postgres::{PostgresSource, TableNamesFrom}, PipelineAction, }; #[tokio::main] async fn main() -> Result<(), Box<dyn Error>> { let host = "localhost"; let port = 5432; let database = "postgres"; let username = "postgres"; let password = Some("password".to_string()); let slot_name = Some("my_slot".to_string()); let table_names = TableNamesFrom::Publication("my_publication".to_string()); // 创建 PostgresSource let postgres_source = PostgresSource::new( host, port, database, username, password, slot_name, table_names, ) .await?; // 创建 StdoutSink。这个接收器只是将接收到的事件打印到标准输出 let stdout_sink = StdoutSink; // 创建 `DataPipeline` 以连接源和接收器 let mut pipeline = DataPipeline::new(postgres_source, stdout_sink, PipelineAction::Both); // 启动 `DataPipeline` 开始从 Postgres 复制数据到标准输出 pipeline.start().await?; Ok(()) }
更多示例请参考源代码中的示例文件夹。
该仓库是一个 cargo 工作空间。每个子文件夹都是工作空间中的一个 crate。以下是每个 crate 的简要说明:
api
- 用于在云环境中托管 pg_replicate
的 REST API。pg_replicate
- 包含核心逻辑的主要库 crate。replicator
- 使用 pg_replicate
的二进制 crate。打包为 Docker 容器以用于云托管。pg_replicate
仍在积极开发中,所以可能会有 bug 和小问题,但随着时间的推移,我们计划添加以下接收器:
注意:DuckDb 和 MotherDuck 接收器不使用批处理管道,因此目前性能较差。计划开发这些接收器的批处理管道版本。
查看未解决的问题以获取提议功能的完整列表(以及已知问题)。
根据 Apache-2.0 许可证分发。有关更多信息,请参阅 LICENSE
。
要创建 replicator
的 Docker 镜像,请在仓库根目录运行 docker build -f ./replicator/Dockerfile .
。同样,要创建 api
的 Docker 镜像,请运行 docker build -f ./api/Dockerfile .
。
应用程序可以使用 pg_replicate
中的数据源和接收器来构建数据管道,以持续从源复制数据到接收器。例如,一个从 Postgres 复制数据到 DuckDB 的数据管道只需要约 100 行 Rust 代码。
数据管道中有三个组件:
数据源是数据将被复制的对象。数据接收器是数据将被复制到的对象。管道是驱动从源到接收器的数据复制操作的对象。
+----------+ +----------+
| | | |
| 数据源 |---- 数据管道 ----> | 接收器 |
| | | |
+----------+ +----------+
所以大致上你写的代码是这样的:
let postgres_source = PostgresSource::new(...); let duckdb_sink = DuckDbSink::new(..); let pipeline = DataPipeline(postgres_source, duckdb_sink); pipeline.start();
当然,实际代码不止这四行,但这是基本思路。完整示例请查看 duckdb 示例。
数据源是管道将复制到数据接收器的数据来源。目前,仓库只有一个数据源:PostgresSource
。PostgresSource
是主要数据源;任何其他源或接收器中的数据都源自它。
数据接收器是数据源的数据被复制到的地方。有两种类型的数据接收器。一种保留从 PostgresSource
输出的数据本质特性,另一种则不保留。前者可以在未来充当数据源,后者不能充当数据源,是数据的最终存放地。
例如,DuckDbSink
确保从源进来的变更数据捕获(CDC)流被物化为 DuckDB 数据库中的表。一旦完成这种有损数据转换,它就不能再用作 CDC 流。
相比之下,潜在的未来接收器 S3Sink
或 KafkaSink
只是按原样复制 CDC 流。存放在接收器中的数据稍后可以像直接从 Postgres 来的一样使用。
数据管道封装了从源到接收器复制数据的业务逻辑。它还协调从上次停止的确切位置恢复 CDC 流。数据接收器通过持久化恢复状态并在重启时将其返回给管道来参与这个过程。
如果数据接收器不是事务性的(例如 S3Sink
),就 不总是可能保持 CDC 流和恢复状态之间的一致性。这可能导致这些非事务性接收器有重复的 CDC 流部分。当数据被复制到像 DuckDB 这样的事务性存储时,数据管道有助于对这些重复的 CDC 事件进行去重。
最后,数据管道将 CDC 流已复制到接收器的日志序列号(LSN)报告回 PostgresSource
。这允许 Postgres 数据库通过移除数据接收器不再需要的 WAL 段文件来回收磁盘空间。
+----------+ +----------+
| | | |
| 数据源 |<---- LSN 编号 -----| 接收器 |
| | | |
+----------+ +----------+
CDC 流不是数据管道执行的唯一数据类型。还有全表复制,也称为回填。这两种类型可以一起执行,也可以分开执行。例如,一次性数据复制可以使用回填。但如果你想定期从 Postgres 复制数据到 OLAP 数据库,就应该同时使用回填和 CDC 流。回填用于获取数据的初始副本,CDC 流用于保持这些副本随着 Postgres 中复制表的变化而更新。
目前,数据源和接收器一次复制一个表行和 CDC 事件。这预计会很慢。批处理和其他策略可能会大大提高性能。但在这个早期阶段,重点是正确性而非性能。目前还没有任何基准测试,所以关于性能的评论更接近推测而非现实。
字节跳动发布的AI编程神器IDE
Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。
全能AI智能助手,随时解答生活与工作的多样问题
问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。
实时语音翻译/同声传译工具
Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。
一键生成PPT和Word,让学习生活更轻松
讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。
深度推理能力全新升级,全面对标OpenAI o1
科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。
一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型
Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项 目适用于多种场景,如有声读物制作、智能语音助手开发等。
AI助力,做PPT更简单!
咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。
选题、配图、成文,一站式创作,让内容运营更高效
讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。
专业的AI公文写作平台,公文写作神器
AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。
OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。
openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。
最新AI工具、AI资讯
独家AI资源、AI项目落地
微信扫一扫关注公众号