在Segment,我们大量依赖Go和Kafka。不幸的是,在撰写本文时,Go的Kafka客户端库的状态并不理想。可用的选择有:
sarama,是目前最受欢迎的库,但使用起来相当困难。文档不足,API暴露了Kafka协议的低级概念,并且不支持Go的最新特性,如contexts。它还将所有值作为指针传递,这会导致大量动态内存分配、更频繁的垃圾收集和更高的内存使用。
confluent-kafka-go是基于librdkafka的cgo包装器,这意味着它为所有使用该包的Go代码引入了对C库的依赖。它的文档比sarama好得多,但仍然缺乏对Go contexts的支持。
goka是一个较新的Go Kafka客户端,专注于特定的使用模式。它提供了将Kafka用作服务之间的消息传递总线而不是有序事件日志的抽象,但这不是我们在Segment典型的Kafka使用场景。该包还依赖sarama进行所有与Kafka的交互。
这就是kafka-go
发挥作用的地方。它提供了与Kafka交互的低级和高级API,反映了Go标准库的概念并实现了接口,使其易于使用和与现有软件集成。
为了更好 地符合我们新采用的行为准则,kafka-go项目已将默认分支重命名为main
。有关行为准则的完整详细信息,请参阅此文档。
kafka-go
目前已通过Kafka 0.10.1.0至2.7.1版本的测试。虽然它应该与更高版本兼容,但Kafka API中可用的较新功能可能尚未在客户端中实现。
kafka-go
需要Go 1.15或更高版本。
Conn
类型是kafka-go
包的核心。它封装了与Kafka服务器的原始网络连接,以提供低级API。
以下是展示连接对象典型用法的一些示例:
// 生产消息 topic := "my-topic" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("连接leader失败:", err) } conn.SetWriteDeadline(time.Now().Add(10*time.Second)) _, err = conn.WriteMessages( kafka.Message{Value: []byte("一!")}, kafka.Message{Value: []byte("二!")}, kafka.Message{Value: []byte("三!")}, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := conn.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }
// 消费消息 topic := "my-topic" partition := 0 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition) if err != nil { log.Fatal("连接leader失败:", err) } conn.SetReadDeadline(time.Now().Add(10*time.Second)) batch := conn.ReadBatch(10e3, 1e6) // 获取最小10KB,最大1MB b := make([]byte, 10e3) // 每条消息最大10KB for { n, err := batch.Read(b) if err != nil { break } fmt.Println(string(b[:n])) } if err := batch.Close(); err != nil { log.Fatal("关闭批处理失败:", err) } if err := conn.Close(); err != nil { log.Fatal("关闭连接失败:", err) }
默认情况下,Kafka的auto.create.topics.enable
设置为'true'
(在bitnami/kafka的Kafka Docker镜像中为KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE='true'
)。如果此值设置为'true'
,则主题将作为kafka.DialLeader
的副作用被创建,如下所示:
// 当auto.create.topics.enable='true'时创建主题 conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", "my-topic", 0) if err != nil { panic(err.Error()) }
如果auto.create.topics.enable='false'
,则需要显式创建主题,如下所示:
// 当auto.create.topics.enable='false'时创建主题 topic := "my-topic" conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var controllerConn *kafka.Conn controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer controllerConn.Close() topicConfigs := []kafka.TopicConfig{ { Topic: topic, NumPartitions: 1, ReplicationFactor: 1, }, } err = controllerConn.CreateTopics(topicConfigs...) if err != nil { panic(err.Error()) }
// 通过现有的非leader连接连接到Kafka leader,而不是使用DialLeader conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() controller, err := conn.Controller() if err != nil { panic(err.Error()) } var connLeader *kafka.Conn connLeader, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) if err != nil { panic(err.Error()) } defer connLeader.Close()
conn, err := kafka.Dial("tcp", "localhost:9092") if err != nil { panic(err.Error()) } defer conn.Close() partitions, err := conn.ReadPartitions() if err != nil { panic(err.Error()) } m := map[string]struct{}{} for _, p := range partitions { m[p.Topic] = struct{}{} } for k := range m { fmt.Println(k) }
由于它是低级的,Conn
类型成为构建更高级抽象的绝佳基础,例如Reader
。
Reader
是kafka-go
包暴露的另一个概念,旨在简化从单个主题-分区对消费的典型用例。Reader
还自动处理重新连接和偏移量管理,并暴露一个支持使用Go contexts进行异步取消和超时的API。
请注意,在进程退出时调用Reader
的Close()
方法很重要。Kafka服务器需要优雅断开连接,以停止继续尝试向已连接的客户端发送消息。如果进程被SIGINT(在shell中按ctrl-c)或SIGTERM(如docker stop或kubernetes重启所做的)终止,给定的示例将不会调用Close()
。这可能会导致新的reader连接到同一主题时出现延迟(例如,启动新进程或运行新容器)。使用signal.Notify
处理程序在进程关闭时关闭reader。
// 创建一个新的读取器,从主题A的分区0的偏移量42开始消费 r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, Topic: "topic-A", Partition: 0, MaxBytes: 10e6, // 10MB }) r.SetOffset(42) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("偏移量 %d 的消息: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("关闭读取器失败:", err) }
kafka-go
也支持Kafka消费者组,包括由代理管理的偏移量。
要启用消费者组,只需在ReaderConfig中指定GroupID。
使用消费者组时,ReadMessage会自动提交偏移量。
// 创建一个新的读取器,消费主题A r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MaxBytes: 10e6, // 10MB }) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("关闭读取器失败:", err) }
使用消费者组时有一些限制:
(*Reader).SetOffset
将返回错误(*Reader).Offset
将始终返回 -1
(*Reader).Lag
将始终返回 -1
(*Reader).ReadLag
将返回错误(*Reader).Stats
将返回分区 -1
kafka-go
也支持显式提交。不调用 ReadMessage
,
而是调用 FetchMessage
然后调用 CommitMessages
。
ctx := context.Background() for { m, err := r.FetchMessage(ctx) if err != nil { break } fmt.Printf("主题/分区/偏移量 %v/%v/%v 的消息: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value)) if err := r.CommitMessages(ctx, m); err != nil { log.Fatal("提交消息失败:", err) } }
在消费者组中提交消息时,给定主题/分区中偏移量最高的消息
决定了该分区提交的偏移量值。例如,如果通过调用 FetchMessage
检索到单个分区的偏移量为1、2和3的消息,那么用偏移量为3的消息
调用 CommitMessages
也会导致该分区的偏移量1和2的消息被提交。
默认情况下,CommitMessages会同步地将偏移量提交到Kafka。为了 提高性能,你可以通过在ReaderConfig上设置CommitInterval来 定期将偏移量提交到Kafka。
// 创建一个新的读取器, 消费主题A r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", MaxBytes: 10e6, // 10MB CommitInterval: time.Second, // 每秒向Kafka刷新提交 })
要向Kafka生产消息,程序可以使用低级别的 Conn
API,但
该包还提供了更高级别的 Writer
类型,在大多数情况下更适合
使用,因为它提供了额外的功能:
v0.4.30
的默认行为。// 创建一个写入器,生产到主题A,使用最少字节分布 w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), kafka.Message{ Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }
// 创建一个写入器,发布消息到主题A。 // 如果主题不存在,将会被创建。 w := &Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", AllowAutoTopicCreation: true, } messages := []kafka.Message{ { Key: []byte("Key-A"), Value: []byte("Hello World!"), }, { Key: []byte("Key-B"), Value: []byte("One!"), }, { Key: []byte("Key-C"), Value: []byte("Two!"), }, } var err error const retries = 3 for i := 0; i < retries; i++ { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 在发布消息之前尝试创建主题 err = w.WriteMessages(ctx, messages...) if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { time.Sleep(time.Millisecond * 250) continue } if err != nil { log.Fatalf("意外错误 %v", err) } break } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }
通常,WriterConfig.Topic
用于初始化单一主题的写入器。
通过排除该特定配置,你可以通过设置 Message.Topic
来
定义每条消息的主题。
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), // 注意:当这里没有定义Topic时,每条Message必须定义它。 Balancer: &kafka.LeastBytes{}, } err := w.WriteMessages(context.Background(), // 注意:每条Message都定义了Topic,否则将返回错误。 kafka.Message{ Topic: "topic-A", Key: []byte("Key-A"), Value: []byte("Hello World!"), }, kafka.Message{ Topic: "topic-B", Key: []byte("Key-B"), Value: []byte("One!"), }, kafka.Message{ Topic: "topic-C", Key: []byte("Key-C"), Value: []byte("Two!"), }, ) if err != nil { log.Fatal("写入消息失败:", err) } if err := w.Close(); err != nil { log.Fatal("关闭写入器失败:", err) }
注意: 这两种模式是互斥的,如果你设置了 Writer.Topic
,
你就不能在写入的消息上显式定义 Message.Topic
。当你没有为写入器
定义主题时,相反的情况也适用。如果 Writer
检测到这种歧义,
将返回错误。
如果你从Sarama切换过来,并且需要/想要使用相同的消息分区算法,你可以使用
kafka.Hash
平衡器或 kafka.ReferenceHash
平衡器:
kafka.Hash
= sarama.NewHashPartitioner
kafka.ReferenceHash
= sarama.NewReferenceHashPartitioner
kafka.Hash
和kafka.ReferenceHash
均衡器会将消息路由到与前述两个Sarama分区器相同的分区。w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, }
使用kafka.CRC32Balancer
均衡器可以获得与librdkafka的默认consistent_random
分区策略相同的行为。
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.CRC32Balancer{}, }
使用kafka.Murmur2Balancer
均衡器可以获得与标准Java客户端默认分区器相同的行为。注意:Java类允许直接指定分区,这在这里是不允许的。
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: kafka.Murmur2Balancer{}, }
可以通过设置Compression
字段在Writer
上启用压缩:
w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Compression: kafka.Snappy, }
Reader
会通过检查消息属性来确定消费的消息是否被压缩。但是,必须导入所有预期编解码器的包,以确保它们正确加载。
注意:在0.4版本之前,程序必须导入压缩包以安装编解码器并支持从kafka读取压缩消息。现在不再需要这样做,压缩包的导入现在是无操作的。
对于基本的Conn类型或在Reader/Writer配置中,你可以指定一个dialer选项来支持TLS。如果TLS字段为nil,它将不会使用TLS连接。 *注意:*如果在Conn/Reader/Writer上未配置TLS而连接到启用了TLS的Kafka集群,可能会导致不透明的io.ErrUnexpectedEOF错误。
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, })
直接创建Writer
w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: &kafka.Transport{ TLS: &tls.Config{}, }, }
使用kafka.NewWriter
dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, TLS: &tls.Config{...tls配置...}, } w := kafka.NewWriter(kafka.WriterConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "topic-A", Balancer: &kafka.Hash{}, Dialer: dialer, })
注意kafka.NewWriter
和kafka.WriterConfig
已被弃用,将在未来版本中移除。
你可以在Dialer
上指定一个选项来使用SASL认证。Dialer
可以直接用于打开Conn
,也可以通过各自的配置传递给Reader
或Writer
。如果SASLMechanism
字段为nil
,它将不会使用SASL进行认证。
mechanism := plain.Mechanism{ Username: "username", Password: "password", }
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) }
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } conn, err := dialer.DialContext(ctx, "tcp", "localhost:9093")
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } dialer := &kafka.Dialer{ Timeout: 10 * time.Second, DualStack: true, SASLMechanism: mechanism, } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092","localhost:9093", "localhost:9094"}, GroupID: "consumer-group-id", Topic: "topic-A", Dialer: dialer, })
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } // Transport负责管理连接池和其他资源, // 通常最好创建几个并在应用程序中共享它们。 sharedTransport := &kafka.Transport{ SASL: mechanism, } w := kafka.Writer{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Topic: "topic-A", Balancer: &kafka.Hash{}, Transport: sharedTransport, }
mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") if err != nil { panic(err) } // Transport负责管理连接池和其他资源, // 通常最好创建几个并在应用程序中共享它们。 sharedTransport := &kafka.Transport{ SASL: mechanism, } client := &kafka.Client{ Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"), Timeout: 10 * time.Second, Transport: sharedTransport, }
startTime := time.Now().Add(-time.Hour) endTime := time.Now() batchSize := int(10e6) // 10MB r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, MaxBytes: batchSize, }) r.SetOffsetAt(context.Background(), startTime) for { m, err := r.ReadMessage(context.Background()) if err != nil { break } if m.Time.After(endTime) { break } // TODO: 处理消息 fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } if err := r.Close(); err != nil { log.Fatal("failed to close reader:", err) }
为了查看Reader/Writer类型的操作,在创建时配置一个日志记录器。
func logf(msg string, a ...interface{}) { fmt.Printf(msg, a...) fmt.Println() } r := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"}, Topic: "my-topic1", Partition: 0, Logger: kafka.LoggerFunc(logf), ErrorLogger: kafka.LoggerFunc(logf), })
func logf(msg string, a ...interface{}) { fmt.Printf(msg, a...) fmt.Println() } w := &kafka.Writer{ Addr: kafka.TCP("localhost:9092"), Topic: "topic", Logger: kafka.LoggerFunc(logf), ErrorLogger: kafka.LoggerFunc(logf), }
较新版本的Kafka中微妙的行为变化导致一些历史测试失败,如果您正在运行Kafka 2.3.1或更高版本,导出KAFKA_SKIP_NETTEST=1
环境变量将跳过这些测试。
在Docker中本地运行Kafka
docker-compose up -d
运行测试
KAFKA_VERSION=2.3.1 \ KAFKA_SKIP_NETTEST=1 \ go test -race ./...
(或者)清理缓存的测试结果并运行测试:
go clean -cache && make test
一键生成PPT和Word,让学习生活更轻松
讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。
深度推理能力全新升级,全面对标OpenAI o1
科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。
一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型
Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能, 可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。
字节跳动发布的AI编程神器IDE
Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。
AI助力,做PPT更简单!
咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场 景演示需求。
选题、配图、成文,一站式创作,让内容运营更高效
讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。
专业的AI公文写作平台,公文写作神器
AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。
OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。
openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。
高分辨率纹理 3D 资产生成
Hunyuan3D-2 是腾讯开发的用于 3D 资产生成的强大工具,支持从文本描述、单张图片或多视角图片生成 3D 模型,具备快速形状生成能力,可生成带纹理的高质量 3D 模型,适用于多个领域,为 3D 创作提供了高效解决方案。
一个具备存储、管理和客户端操作等多种功能的分布式文件系统相关项目。
3FS 是一个功能强大的分布式文件系统项目,涵盖了存储引擎、元数据管理、客户端工具等多个模 块。它支持多种文件操作,如创建文件和目录、设置布局等,同时具备高效的事件循环、节点选择和协程池管理等特性。适用于需要大规模数据存储和管理的场景,能够提高系统的性能和可靠性,是分布式存储领域的优质解决方案。
最新AI工具、AI资讯
独家AI资源、AI项目落地
微信扫一扫关注公众号