kafka-go

kafka-go

Go语言开发的高性能Kafka客户端库

kafka-go是一款Go语言开发的高性能Kafka客户端库,提供低级和高级API与Kafka交互。该库实现Go标准库接口,易于使用和集成,支持生产者、消费者和消费者组功能。它具备自动重试、重连和偏移量管理等特性,适用于Kafka 0.10.1.0至2.7.1版本,需要Go 1.15或更高版本。

kafka-goKafkaGo消息队列分布式系统Github开源项目

kafka-go CircleCI Go Report Card GoDoc

动机

在Segment,我们大量依赖Go和Kafka。不幸的是,在撰写本文时,Go的Kafka客户端库的状态并不理想。可用的选择有:

  • sarama,是目前最受欢迎的库,但使用起来相当困难。文档不足,API暴露了Kafka协议的低级概念,并且不支持Go的最新特性,如contexts。它还将所有值作为指针传递,这会导致大量动态内存分配、更频繁的垃圾收集和更高的内存使用。

  • confluent-kafka-go是基于librdkafka的cgo包装器,这意味着它为所有使用该包的Go代码引入了对C库的依赖。它的文档比sarama好得多,但仍然缺乏对Go contexts的支持。

  • goka是一个较新的Go Kafka客户端,专注于特定的使用模式。它提供了将Kafka用作服务之间的消息传递总线而不是有序事件日志的抽象,但这不是我们在Segment典型的Kafka使用场景。该包还依赖sarama进行所有与Kafka的交互。

这就是kafka-go发挥作用的地方。它提供了与Kafka交互的低级和高级API,反映了Go标准库的概念并实现了接口,使其易于使用和与现有软件集成。

注意:

为了更好地符合我们新采用的行为准则,kafka-go项目已将默认分支重命名为main。有关行为准则的完整详细信息,请参阅此文档

Kafka版本

kafka-go目前已通过Kafka 0.10.1.0至2.7.1版本的测试。虽然它应该与更高版本兼容,但Kafka API中可用的较新功能可能尚未在客户端中实现。

Go版本

kafka-go需要Go 1.15或更高版本。

连接 GoDoc

Conn类型是kafka-go包的核心。它封装了与Kafka服务器的原始网络连接,以提供低级API。

以下是展示连接对象典型用法的一些示例:

// 生产消息 topic := "my-topic" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("连接leader失败:", err) } conn.SetWriteDeadline(time.Now().Add(10*time.Second)) _, err = conn.WriteMessages( kafka.Message{Value: []byte("一!")}, kafka.Message{Value: []byte("二!")}, kafka.Message{Value: []byte("三!")}, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := conn.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }
// 消费消息 topic := "my-topic" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("连接leader失败:", err) } conn.SetReadDeadline(time.Now().Add(10*time.Second)) batch := conn.ReadBatch(10e3, 1e6) // 获取最小10KB,最大1MB b := make([]byte, 10e3) // 每条消息最大10KB for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) } if err := batch.Close(); err != nil { log.Fatal("关闭批处理失败:", err) } if err := conn.Close(); err != nil { log.Fatal("关闭连接失败:", err) }

创建主题

默认情况下,Kafka的auto.create.topics.enable设置为'true'(在bitnami/kafka的Kafka Docker镜像中为KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true')。如果此值设置为'true',则主题将作为kafka.DialLeader的副作用被创建,如下所示:

// 当auto.create.topics.enable='true'时创建主题 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0) if err != nil { panic(err.Error()) }

如果auto.create.topics.enable='false',则需要显式创建主题,如下所示:

// 当auto.create.topics.enable='false'时创建主题 topic := "my-topic" conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var controllerConn *kafka.Conn controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer controllerConn.Close() topicConfigs := []kafka.TopicConfig{ { Topic: topic, NumPartitions: 1, ReplicationFactor: 1, }, } err = controllerConn.CreateTopics(topicConfigs...) if err != nil { panic(err.Error()) }

通过非leader连接连接到leader

// 通过现有的非leader连接连接到Kafka leader,而不是使用DialLeader conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var connLeader *kafka.Conn connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer connLeader.Close()

列出主题

conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() partitions, err := conn.ReadPartitions() if err != nil { panic(err.Error()) } m := map[string]struct{}{} for _, p := range partitions { m[p.Topic] = struct{}{} } for k := range m { fmt.Println(k) }

由于它是低级的,Conn类型成为构建更高级抽象的绝佳基础,例如Reader

Reader GoDoc

Readerkafka-go包暴露的另一个概念,旨在简化从单个主题-分区对消费的典型用例。Reader还自动处理重新连接和偏移量管理,并暴露一个支持使用Go contexts进行异步取消和超时的API。

请注意,在进程退出时调用ReaderClose()方法很重要。Kafka服务器需要优雅断开连接,以停止继续尝试向已连接的客户端发送消息。如果进程被SIGINT(在shell中按ctrl-c)或SIGTERM(如docker stop或kubernetes重启所做的)终止,给定的示例将不会调用Close()。这可能会导致新的reader连接到同一主题时出现延迟(例如,启动新进程或运行新容器)。使用signal.Notify处理程序在进程关闭时关闭reader。

// 创建一个新的读取器,从主题A的分区0的偏移量42开始消费 r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, Topic: "topic-A", Partition: 0, MaxBytes: 10e6, // 10MB }) r.SetOffset(42) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("偏移量 %d 的消息: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("关闭读取器失败:", err) }

消费者组

kafka-go 也支持Kafka消费者组,包括由代理管理的偏移量。 要启用消费者组,只需在ReaderConfig中指定GroupID。

使用消费者组时,ReadMessage会自动提交偏移量。

// 创建一个新的读取器,消费主题A r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MaxBytes: 10e6, // 10MB }) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("关闭读取器失败:", err) }

使用消费者组时有一些限制:

  • 当设置GroupID时,(*Reader).SetOffset 将返回错误
  • 当设置GroupID时,(*Reader).Offset 将始终返回 -1
  • 当设置GroupID时,(*Reader).Lag 将始终返回 -1
  • 当设置GroupID时,(*Reader).ReadLag 将返回错误
  • 当设置GroupID时,(*Reader).Stats 将返回分区 -1

显式提交

kafka-go 也支持显式提交。不调用 ReadMessage, 而是调用 FetchMessage 然后调用 CommitMessages

ctx := context.Background() for { m, err := r.FetchMessage(ctx) if err != nil { break } fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) if err := r.CommitMessages(ctx, m); err != nil { log.Fatal("提交消息失败:", err) } }

在消费者组中提交消息时,给定主题/分区中偏移量最高的消息 决定了该分区提交的偏移量值。例如,如果通过调用 FetchMessage 检索到单个分区的偏移量为1、2和3的消息,那么用偏移量为3的消息 调用 CommitMessages 也会导致该分区的偏移量1和2的消息被提交。

管理提交

默认情况下,CommitMessages会同步地将偏移量提交到Kafka。为了 提高性能,你可以通过在ReaderConfig上设置CommitInterval来 定期将偏移量提交到Kafka。

// 创建一个新的读取器,消费主题A r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MaxBytes: 10e6, // 10MB CommitInterval: time.Second, // 每秒向Kafka刷新提交 })

写入器 GoDoc

要向Kafka生产消息,程序可以使用低级别的 Conn API,但 该包还提供了更高级别的 Writer 类型,在大多数情况下更适合 使用,因为它提供了额外的功能:

  • 自动重试和在错误时重新连接。
  • 可配置的消息在可用分区间的分布。
  • 同步或异步向Kafka写入消息。
  • 使用上下文进行异步取消。
  • 关闭时刷新待处理的消息以支持优雅关闭。
  • 在发布消息之前创建缺失的主题。注意! 这是直到版本 v0.4.30 的默认行为。
// 创建一个写入器,生产到主题A,使用最少字节分布 w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }

在发布之前创建缺失的主题

// 创建一个写入器,发布消息到主题A。 // 如果主题不存在,将会被创建。 w := &Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", AllowAutoTopicCreation: true, } messages := []kafka.Message{ { Key: []byte("Key-A"), Value: []byte("Hello World!"), }, { Key: []byte("Key-B"), Value: []byte("One!"), }, { Key: []byte("Key-C"), Value: []byte("Two!"), }, } var err error const retries = 3 for i := 0; i < retries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 在发布消息之前尝试创建主题 err = w.WriteMessages(ctx, messages...) if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } if err != nil { log.Fatalf("意外错误 %v", err) } break } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }

写入多个主题

通常,WriterConfig.Topic 用于初始化单一主题的写入器。 通过排除该特定配置,你可以通过设置 Message.Topic 来 定义每条消息的主题。

w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // 注意:当这里没有定义Topic时,每条Message必须定义它。 Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), // 注意:每条Message都定义了Topic,否则将返回错误。 kafka.Message{ Topic: "topic-A", Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Topic: "topic-B", Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Topic: "topic-C", Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }

注意: 这两种模式是互斥的,如果你设置了 Writer.Topic, 你就不能在写入的消息上显式定义 Message.Topic。当你没有为写入器 定义主题时,相反的情况也适用。如果 Writer 检测到这种歧义, 将返回错误。

与其他客户端的兼容性

Sarama

如果你从Sarama切换过来,并且需要/想要使用相同的消息分区算法,你可以使用 kafka.Hash 平衡器或 kafka.ReferenceHash 平衡器:

  • kafka.Hash = sarama.NewHashPartitioner
  • kafka.ReferenceHash = sarama.NewReferenceHashPartitioner kafka.Hashkafka.ReferenceHash均衡器会将消息路由到与前述两个Sarama分区器相同的分区。
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, }

librdkafka和confluent-kafka-go

使用kafka.CRC32Balancer均衡器可以获得与librdkafka的默认consistent_random分区策略相同的行为。

w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.CRC32Balancer{}, }

Java

使用kafka.Murmur2Balancer均衡器可以获得与标准Java客户端默认分区器相同的行为。注意:Java类允许直接指定分区,这在这里是不允许的。

w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.Murmur2Balancer{}, }

压缩

可以通过设置Compression字段在Writer上启用压缩:

w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Compression: kafka.Snappy, }

Reader会通过检查消息属性来确定消费的消息是否被压缩。但是,必须导入所有预期编解码器的包,以确保它们正确加载。

注意:在0.4版本之前,程序必须导入压缩包以安装编解码器并支持从kafka读取压缩消息。现在不再需要这样做,压缩包的导入现在是无操作的。

TLS支持

对于基本的Conn类型或在Reader/Writer配置中,你可以指定一个dialer选项来支持TLS。如果TLS字段为nil,它将不会使用TLS连接。 *注意:*如果在Conn/Reader/Writer上未配置TLS而连接到启用了TLS的Kafka集群,可能会导致不透明的io.ErrUnexpectedEOF错误。

连接

dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, })

Writer

直接创建Writer

w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: &kafka.Transport{ TLS: &tls.Config{}, }, }

使用kafka.NewWriter

dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "topic-A", Balancer: &kafka.Hash{}, Dialer: dialer, })

注意kafka.NewWriterkafka.WriterConfig已被弃用,将在未来版本中移除。

SASL支持

你可以在Dialer上指定一个选项来使用SASL认证。Dialer可以直接用于打开Conn,也可以通过各自的配置传递给ReaderWriter。如果SASLMechanism字段为nil,它将不会使用SASL进行认证。

SASL认证类型

Plain

mechanism := plain.Mechanism{ Username: "username", Password: "password", }

SCRAM

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) }

连接

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")

Reader

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, })

Writer

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } // Transport负责管理连接池和其他资源, // 通常最好创建几个并在应用程序中共享它们。 sharedTransport := &kafka.Transport{ SASL: mechanism, } w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: sharedTransport, }

Client

mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } // Transport负责管理连接池和其他资源, // 通常最好创建几个并在应用程序中共享它们。 sharedTransport := &kafka.Transport{ SASL: mechanism, } client := &kafka.Client{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Timeout: 10 * time.Second, Transport: sharedTransport, }

读取指定时间范围内的所有消息

startTime := time.Now().Add(-time.Hour) endTime := time.Now() batchSize := int(10e6) // 10MB r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, MaxBytes: batchSize, }) r.SetOffsetAt(context.Background(), startTime) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } if m.Time.After(endTime) { break } // TODO: 处理消息 fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("failed to close reader:", err) }

日志记录

为了查看Reader/Writer类型的操作,在创建时配置一个日志记录器。

Reader

func logf(msg string, a ...interface{}) { fmt.Printf(msg, a...) fmt.Println() } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, Logger: kafka.LoggerFunc(logf), ErrorLogger: kafka.LoggerFunc(logf), })

Writer

func logf(msg string, a ...interface{}) { fmt.Printf(msg, a...) fmt.Println() } w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "topic", Logger: kafka.LoggerFunc(logf), ErrorLogger: kafka.LoggerFunc(logf), }

测试

较新版本的Kafka中微妙的行为变化导致一些历史测试失败,如果您正在运行Kafka 2.3.1或更高版本,导出KAFKA_SKIP_NETTEST=1环境变量将跳过这些测试。

在Docker中本地运行Kafka

docker-compose up -d

运行测试

KAFKA_VERSION=2.3.1 \ KAFKA_SKIP_NETTEST=1 \ go test -race ./...

(或者)清理缓存的测试结果并运行测试:

go clean -cache && make test

编辑推荐精选

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

热门AI工具AI办公办公工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

模型训练热门AI工具内容创作智能问答AI开发讯飞星火大模型多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

Trae

Trae

字节跳动发布的AI编程神器IDE

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

热门AI工具生产力协作转型TraeAI IDE
咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

AI助手热门AI工具AI创作AI辅助写作讯飞绘文内容运营个性化文章多平台分发
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

Hunyuan3D-2

Hunyuan3D-2

高分辨率纹理 3D 资产生成

Hunyuan3D-2 是腾讯开发的用于 3D 资产生成的强大工具,支持从文本描述、单张图片或多视角图片生成 3D 模型,具备快速形状生成能力,可生成带纹理的高质量 3D 模型,适用于多个领域,为 3D 创作提供了高效解决方案。

3FS

3FS

一个具备存储、管理和客户端操作等多种功能的分布式文件系统相关项目。

3FS 是一个功能强大的分布式文件系统项目,涵盖了存储引擎、元数据管理、客户端工具等多个模块。它支持多种文件操作,如创建文件和目录、设置布局等,同时具备高效的事件循环、节点选择和协程池管理等特性。适用于需要大规模数据存储和管理的场景,能够提高系统的性能和可靠性,是分布式存储领域的优质解决方案。

下拉加载更多