一个快速的 Golang Redis 客户端,具有自动流水线功能并支持服务器辅助的客户端缓存。
package main import ( "context" "github.com/redis/rueidis" ) func main() { client, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}}) if err != nil { panic(err) } defer client.Close() ctx := context.Background() // SET key val NX err = client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() // HGETALL hm hm, err := client.Do(ctx, client.B().Hgetall().Key("hm").Build()).AsStrMap() }
查看更多示例:命令响应速查表
client.B()
是构建 Redis 命令的入口点:
<sub>由 @FZambia 录制 使用 Rueidis Go 库提高 Centrifugo Redis 引擎的吞吐量和分配效率
</sub>
构建命令后,使用 client.Do()
或 client.DoMulti()
将其发送到 Redis。
你❗️不应该❗️在另一个 client.Do()
或 client.DoMulti()
调用中重复使用命令,因为默认情况下它已被回收到底层的 sync.Pool
中。
要重复使用命令,请在 Build()
后使用 Pin()
,这将防止命令被回收。
所有并发的非阻塞 Redis 命令(如 GET
、SET
)都会自动进行流水线处理,这减少了整体的往返次数和系统调用,从而获得更高的吞吐量。你可以通过从多个 goroutine 并发调用 client.Do()
来轻松获得流水线技术的好处。例如:
func BenchmarkPipelining(b *testing.B, client rueidis.Client) { // 以下 client.Do() 操作将从多个 goroutine 发出, // 因此会自动进行流水线处理。 b.RunParallel(func(pb *testing.PB) { for pb.Next() { client.Do(context.Background(), client.B().Get().Key("k").Build()).ToString() } }) }
与 go-redis 相比,Rueidis 在 1、8 和 64 并行度设置下都有更高的吞吐量。
在 Macbook Pro 16" M1 Pro 2021 的本地基准测试中,它甚至能够达到比 go-redis 高出 ~14 倍 的吞吐量。(参见 parallelism(64)-key(16)-value(64)-10
)
基准测试源代码:https://github.com/rueian/rueidis-benchmark
在两台 GCP n2-highcpu-2 机器上进行的基准测试结果也显示,rueidis 可以在更低延迟的情况下实现更高的吞吐量:https://github.com/redis/rueidis/pull/93
除了自动流水线外,你还可以使用 DoMulti()
手动进行流水线处理:
cmds := make(rueidis.Commands, 0, 10) for i := 0; i < 10; i++ { cmds = append(cmds, client.B().Set().Key("key").Value("value").Build()) } for _, resp := range client.DoMulti(ctx, cmds...) { if err := resp.Error(); err != nil { panic(err) } }
默认启用的服务器辅助客户端缓存的选择加入模式可以通过调用 DoCache()
或 DoMultiCache()
并指定客户端 TTL 来使用。
client.DoCache(ctx, client.B().Hmget().Key("mk").Field("1", "2").Cache(), time.Minute).ToArray() client.DoMultiCache(ctx, rueidis.CT(client.B().Get().Key("k1").Cache(), 1*time.Minute), rueidis.CT(client.B().Get().Key("k2").Cache(), 2*time.Minute))
缓存的响应,包括 Redis Nil,将在收到 Redis 服务器的通知或达到客户端 TTL 时被失效。有关更多详细信息,请参阅 https://github.com/redis/rueidis/issues/534。
服务器辅助的客户端缓存可以大幅提高延迟和吞吐量,就像在应用程序内部有一个 Redis 副本一样。例如:
基准测试源代码:https://github.com/rueian/rueidis-benchmark
使用 CacheTTL()
检查剩余的客户端缓存 TTL(以秒为单位):
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).CacheTTL() == 60
使用 IsCacheHit()
验证响应是否来自客户端内存:
client.DoCache(ctx, client.B().Get().Key("k1").Cache(), time.Minute).IsCacheHit() == true
如果通过 rueidisotel.NewClient(option)
启用了 OpenTelemetry,则还会有两个指标被测量:
rueidis.MGetCache
和 rueidis.JsonMGetCache
是通过客户端缓存获取跨不同槽位的多个键的便捷辅助函数。它们首先按槽位分组键以构建 MGET
或 JSON.MGET
命令,然后仅将缓存未命中的键发送到 Redis 节点。
尽管默认是选择加入模式,但你可以通过在 ClientOption.ClientTrackingOptions
中指定前缀来使用广播模式:
client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, ClientTrackingOptions: []string{"PREFIX", "prefix1:", "PREFIX", "prefix2:", "BCAST"}, }) if err != nil { panic(err) } client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == false client.DoCache(ctx, client.B().Get().Key("prefix1:1").Cache(), time.Minute).IsCacheHit() == true
请确保传递给 DoCache()
和 DoMultiCache()
的命令被你的前缀覆盖。否则,它们的客户端缓存将不会被 Redis 失效。
缓存旁路是一种广泛使用的缓存策略。 rueidisaside可以帮助你将数据缓存到由Redis支持的客户端缓存中。例如:
client, err := rueidisaside.NewClient(rueidisaside.ClientOption{ ClientOption: rueidis.ClientOption{InitAddress: []string{"127.0.0.1:6379"}}, }) if err != nil { panic(err) } val, err := client.Get(context.Background(), time.Minute, "mykey", func(ctx context.Context, key string) (val string, err error) { if err = db.QueryRowContext(ctx, "SELECT val FROM mytab WHERE id = ?", key).Scan(&val); err == sql.ErrNoRows { val = "_nil_" // 缓存nil以避免穿透。 err = nil // 在sql.ErrNoRows的情况下清除err。 } return }) // ...
请参考rueidisaside中的完整示例。
某些Redis提供商不支持客户端缓存,例如Google Cloud Memorystore。
你可以通过将ClientOption.DisableCache
设置为true
来禁用客户端缓存。
这也会将client.DoCache()
和client.DoMultiCache()
回退到client.Do()
和client.DoMulti()
。
如果上下文被取消或达到截止时间,client.Do()
、client.DoMulti()
、client.DoCache()
和client.DoMultiCache()
可以提前返回。
ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() client.Do(ctx, client.B().Set().Key("key").Value("val").Nx().Build()).Error() == context.DeadlineExceeded
请注意,虽然操作可以提前返回,但命令可能已经发送。
要接收来自频道的消息,应使用client.Receive()
。它支持SUBSCRIBE
、PSUBSCRIBE
和Redis 7.0的SSUBSCRIBE
:
err = client.Receive(context.Background(), client.B().Subscribe().Channel("ch1", "ch2").Build(), func(msg rueidis.PubSubMessage) { // 处理消息 })
提供的处理程序将使用接收到的消息进行调用。
重要的是要注意,client.Receive()
将在以下情况下一直阻塞直到返回一个值:
subscribe
命令相关的任何取消订阅/取消模式订阅消息时,返回nil
。rueidis.ErrClosing
。ctx
完成时,返回ctx.Err()
。subscribe
命令失败时,返回非nil的err
。虽然client.Receive()
调用是阻塞的,但Client
仍然能够接受其他并发请求,
它们共享同一个TCP连接。如果你的消息处理程序可能需要一些时间才能完成,建议
在client.Dedicated()
内使用client.Receive()
,以免阻塞其他并发请求。
client.Receive()
要求用户提前提供订阅命令。
还有一个替代方案Dedicatedclient.SetPubSubHooks()
,允许用户 稍后订阅/取消订阅频道。
c, cancel := client.Dedicate() defer cancel() wait := c.SetPubSubHooks(rueidis.PubSubHooks{ OnMessage: func(m rueidis.PubSubMessage) { // 处理消息。这个回调将在另一个goroutine中按顺序调用。 } }) c.Do(ctx, c.B().Subscribe().Channel("ch").Build()) err := <-wait // 断开连接并返回err
如果钩子不为nil,上面的wait
通道保证在钩子不再被调用时关闭,
并最多产生一个描述原因的错误。用户可以使用此通道检测断开连接。
要执行CAS事务(WATCH
+ MULTI
+ EXEC
),应该使用专用连接,因为在WATCH
和EXEC
之间不应有无意的写命令。否则,EXEC
可能不会按预期失败。
client.Dedicated(func(c rueidis.DedicatedClient) error { // 首先监视键 c.Do(ctx, c.B().Watch().Key("k1", "k2").Build()) // 在这里执行读取 c.Do(ctx, c.B().Mget().Key("k1", "k2").Build()) // 使用MULTI EXEC执行写入 c.DoMulti( ctx, c.B().Multi().Build(), c.B().Set().Key("k1").Value("1").Build(), c.B().Set().Key("k2").Value("2").Build(), c.B().Exec().Build(), ) return nil })
或者使用Dedicate()
并在完成时调用cancel()
将连接放回池中。
c, cancel := client.Dedicate() defer cancel() c.Do(ctx, c.B().Watch().Key("k1", "k2").Build()) // 使用占用连接的`client`执行剩余的CAS操作
然而,占用连接对吞吐量不利。最好使用Lua脚本来执行乐观锁定。
NewLuaScript
或NewLuaScriptReadOnly
将创建一个可安全并发使用的脚本。
调用script.Exec
时,它会首先尝试发送EVALSHA
,如果服务器返回NOSCRIPT
,则回退到EVAL
。
script := rueidis.NewLuaScript("return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}") // script.Exec可以安全地并发调用 list, err := script.Exec(ctx, client, []string{"k1", "k2"}, []string{"a1", "a2"}).ToArray()
client.DoStream()
和client.DoMultiStream()
可用于将大型Redis响应直接发送到io.Writer
,
而无需将它们分配到内存中。它们的工作原理是首先将命令发送到从池中获取的专用连接,
然后直接将响应值复制到给定的io.Writer
,最后回收连接。
s := client.DoMultiStream(ctx, client.B().Get().Key("a{slot1}").Build(), client.B().Get().Key("b{slot1}").Build()) for s.HasNext() { n, err := s.WriteTo(io.Discard) if rueidis.IsRedisNil(err) { // ... } }
注意,这两种方法在所有响应写入给定的io.Writer
之前会占用连接。
这可能需要很长时间并影响性能。除非你想避免为大型Redis响应分配内存,否则请使用普通的Do()
和DoMulti()
。
还要注意,这两种方法只适用于string
、integer
和float
类型的Redis响应。目前,DoMultiStream
在连接到Redis集群时不支持跨多个槽位进行键的流水线处理。
rueidis中的每个底层连接都为流水线分配一个环形缓冲区。
其大小由ClientOption.RingScaleEachConn
控制,默认值为10,结果是每个环的大小为2^10。
如果你有许多rueidis连接,你可能会发现它们占用了相当多的内存。
在这种情况下,你可以考虑将ClientOption.RingScaleEachConn
减少到8或9,但可能会导致潜在的吞吐量下降。
你也可以考虑将ClientOption.PipelineMultiplex
的值设置为-1
,这将让rueidis对每个Redis节点只使用1个连接进行流水线处理。
你可以使用NewClient
创建一个新的Redis客户端,并提供多个选项。
// 连接到单个Redis节点: client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:6379"}, }) // 连接到Redis集群 client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, ShuffleInit: true, }) // 连接到Redis集群并使用副本进行读操作 client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:7001", "127.0.0.1:7002", "127.0.0.1:7003"}, SendToReplicas: func(cmd rueidis.Completed) bool { return cmd.IsReadOnly() }, }) // 连接到哨兵 client, err := rueidis.NewClient(rueidis.ClientOption{ InitAddress: []string{"127.0.0.1:26379", "127.0.0.1:26380", "127.0.0.1:26381"}, Sentinel: rueidis.SentinelOption{ MasterSet: "my_master", }, })
你可以使用 ParseURL
或 MustParseURL
来构造一个 ClientOption
。
提供的 URL 必须以 redis://
、rediss://
或 unix://
开头。
当前支持的 URL 参数有 db
、dial_timeout
、write_timeout
、addr
、protocol
、client_cache
、client_name
、max_retries
和 master_set
。
// 连接到 Redis 集群 client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:7001?addr=127.0.0.1:7002&addr=127.0.0.1:7003")) // 连接到 Redis 节点 client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:6379/0")) // 连接到 Redis 哨兵 client, err = rueidis.NewClient(rueidis.MustParseURL("redis://127.0.0.1:26379/0?master_set=my_master"))
如果你想构造命令构建器中没有的命令,可以使用 client.B().Arbitrary()
:
// 这将生成 [ANY CMD k1 k2 a1 a2] client.B().Arbitrary("ANY", "CMD").Keys("k1", "k2").Args("a1", "a2").Build()
[]byte
和向量相似度搜索命令构建器将所有参数视为 Redis 字符串,这些字符串是二进制安全的。这意味着用户可以直接将 []byte
存储到 Redis 中而无需转换。rueidis.BinaryString
辅助函数可以将 []byte
转换为 string
而无需复制。例如:
client.B().Set().Key("b").Value(rueidis.BinaryString([]byte{...})).Build()
将所有参数视为 Redis 字符串也意味着命令构建器不会自动为用户进行任何引用或转换。
在使用 RedisJSON 时,用户经常需要在 Redis 字符串中准备 JSON 字符串。rueidis.JSON
可以帮助实现这一点:
client.B().JsonSet().Key("j").Path("$.myStrField").Value(rueidis.JSON("str")).Build() // 等价于 client.B().JsonSet().Key("j").Path("$.myStrField").Value(`"str"`).Build()
在进行向量相似度搜索时,用户可以使用 rueidis.VectorString32
和 rueidis.VectorString64
来构建查询:
cmd := client.B().FtSearch().Index("idx").Query("*=>[KNN 5 @vec $V]"). Params().Nargs(2).NameValue().NameValue("V", rueidis.VectorString64([]float64{...})). Dialect(2).Build() n, resp, err := client.Do(ctx, cmd).AsFtSearch()
虽然命令构建器对开发者友好,但响应解析器稍显不友好。开发者必须预先知道服务器将返回什么类型的 Redis 响应,以及应该使用哪个解析器。
错误处理: 如果选择了不正确的解析器函数,将返回 errParse。以下是使用 ToArray 演示这种情况的示例:
// 尝试解析响应。如果发生解析错误,检查错误是否为解析错误并处理它。 // 通常,你应该通过选择正确的解析器函数来修复代码。 // 例如,如果预期响应是字符串,则使用 ToString(),如果预期响应是数组,则使用 ToArray(),如下所示: if err := client.Do(ctx, client.B().Get().Key("k").Build()).ToArray(); IsParseErr(err) { fmt.Println("解析错误:", err) }
要记住将返回什么类型的消息以及使用哪种解析方式是很困难的。因此,这里列出了一些常见的示例:
// GET client.Do(ctx, client.B().Get().Key("k").Build()).ToString() client.Do(ctx, client.B().Get().Key("k").Build()).AsInt64() // MGET client.Do(ctx, client.B().Mget().Key("k1", "k2").Build()).ToArray() // SET client.Do(ctx, client.B().Set().Key("k").Value("v").Build()).Error() // INCR client.Do(ctx, client.B().Incr().Key("k").Build()).AsInt64() // HGET client.Do(ctx, client.B().Hget().Key("k").Field("f").Build()).ToString() // HMGET client.Do(ctx, client.B().Hmget().Key("h").Field("a", "b").Build()).ToArray() // HGETALL client.Do(ctx, client.B().Hgetall().Key("h").Build()).AsStrMap() // EXPIRE client.Do(ctx, client.B().Expire().Key("k").Seconds(1).Build()).AsInt64() // HEXPIRE client.Do(ctx, client.B().Hexpire().Key("h").Seconds(1).Fields().Numfields(2).Field("f1", "f2").Build()).AsIntSlice() // ZRANGE client.Do(ctx, client.B().Zrange().Key("k").Min("1").Max("2").Build()).AsStrSlice() // ZRANK client.Do(ctx, client.B().Zrank().Key("k").Member("m").Build()).AsInt64() // ZSCORE client.Do(ctx, client.B().Zscore().Key("k").Member("m").Build()).AsFloat64() // ZRANGE client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Build()).AsStrSlice() client.Do(ctx, client.B().Zrange().Key("k").Min("0").Max("-1").Withscores().Build()).AsZScores() // ZPOPMIN client.Do(ctx, client.B().Zpopmin().Key("k").Build()).AsZScore() client.Do(ctx, client.B().Zpopmin().Key("myzset").Count(2).Build()).AsZScores() // SCARD client.Do(ctx, client.B().Scard().Key("k").Build()).AsInt64() // SMEMBERS client.Do(ctx, client.B().Smembers().Key("k").Build()).AsStrSlice() // LINDEX client.Do(ctx, client.B().Lindex().Key("k").Index(0).Build()).ToString() // LPOP client.Do(ctx, client.B().Lpop().Key("k").Build()).ToString() client.Do(ctx, client.B().Lpop().Key("k").Count(2).Build()).AsStrSlice() // SCAN client.Do(ctx, client.B().Scan().Cursor(0).Build()).AsScanEntry() // FT.SEARCH client.Do(ctx, client.B().FtSearch().Index("idx").Query("@f:v").Build()).AsFtSearch() // GEOSEARCH client.Do(ctx, client.B().Geosearch().Key("k").Fromlonlat(1, 1).Bybox(1).Height(1).Km().Build()).AsGeosearch()
当你想将数组结果扫描到特定结构体的切片中时,DecodeSliceOfJSON 非常有用。
type User struct { Name string `json:"name"` } // 设置一些值 if err = client.Do(ctx, client.B().Set().Key("user1").Value(`{"name": "name1"}`).Build()).Error(); err != nil { return err } if err = client.Do(ctx, client.B().Set().Key("user2").Value(`{"name": "name2"}`).Build()).Error(); err != nil { return err } // 将 MGET 结果扫描到 []*User 中 var users []*User // 或者 []User 也可以扫描 if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1", "user2").Build()), &users); err != nil { return err } for _, user := range users { fmt.Printf("%+v\n", user) } /* &{name:name1} &{name:name2} */
请确保结果中的所有值都具有相同的 JSON 结构。
// 设置一个纯字符 串值 if err = client.Do(ctx, client.B().Set().Key("user1").Value("userName1").Build()).Error(); err != nil { return err } // 错误做法 users := make([]*User, 0) if err := rueidis.DecodeSliceOfJSON(client.Do(ctx, client.B().Mget().Key("user1").Build()), &users); err != nil { return err } // -> 错误:在寻找值的开头时遇到无效字符 'u' // 在这种情况下,使用 client.Do(ctx, client.B().Mget().Key("user1").Build()).AsStrSlice()
欢迎贡献,包括问题、拉取请求和讨论。 贡献对我们意义重大,有助于改进这个库和社区!
命令构建器是基于 ./hack/cmds 中的定义通过运行以下命令生成的:
go generate
请使用./dockertest.sh脚本在本地运行测试用例。 并请尽最大努力确保代码更改的测试覆盖率达到100%。
AI小说写作助手,一站式润色、改写、扩写
蛙蛙写作—国内先进的AI写作平台,涵盖小说、学术、社交媒体等多场景。提供续写、改写、润色等功能,助力创作者高效优化写作流程。界面简洁,功能全面,适合各类写作者提升内容品质和工作效率。
字节跳动发布的AI编程神器IDE
Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。
全能AI智能助手,随时解答生活与工作的多样问题
问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。
实时语音翻译/同声传译工具
Transly是一个多场景的AI大语言模型 驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。
一键生成PPT和Word,让学习生活更轻松
讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。
深度推理能力全新升级,全面对标OpenAI o1
科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。
一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型
Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。
AI助力,做PPT更简单!
咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。
选题、配图、成文,一站式创作,让内容运营更高效
讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。
专业的AI公文写作平台,公文写作神器
AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提 供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。
最新AI工具、AI资讯
独家AI资源、AI项目落地
微信扫一扫关注公众号