scalable-concurrent-containers

scalable-concurrent-containers

Rust并发编程高性能容器和工具库

scalable-concurrent-containers是一个Rust实现的高性能并发编程库。它提供了HashMap、HashSet、TreeIndex等并发容器,以及Queue、Stack等工具。这些组件支持异步操作,具有近线性扩展性,无自旋锁和忙等待。该库还支持Loom和Serde集成,并利用SIMD指令集优化查找性能。适用于要求高并发吞吐量的Rust项目。

并发容器异步编程高性能无锁线性扩展Github开源项目

可扩展并发容器

Cargo Crates.io GitHub Workflow Status

一个用于并发和异步编程的高性能容器和实用工具集合。

特性

  • 阻塞和同步方法的异步对应版本。
  • 支持LoomSerdefeatures = ["loom", "serde"]
  • 接近线性的可扩展性。
  • 无自旋锁和忙等循环。
  • 使用SIMD查找并行扫描多个条目1

并发和异步容器

  • HashMap是一个并发和异步哈希映射。
  • HashSet是一个并发和异步哈希集合。
  • HashIndex是一个针对读取优化的并发和异步哈希映射。
  • HashCache是一个由HashMap支持的32路关联缓存。
  • TreeIndex是一个针对读取优化的并发和异步B+树。

并发编程实用工具

  • LinkedList是一个实现无锁并发单向链表的类型特征。
  • Queue是一个并发无锁先进先出容器。
  • Stack是一个并发无锁后进先出容器。
  • Bag是一个并发无锁无序不透明容器。

HashMap

HashMap是一个并发哈希映射,针对高度并行的写入密集型工作负载进行了优化。HashMap的结构是一个无锁的条目桶数组栈。条目桶数组由sdd管理,从而实现无锁访问和非阻塞容器大小调整。每个桶是一个固定大小的条目数组,由一个特殊的读写锁保护,该锁提供阻塞和异步方法。

锁定行为

条目访问:细粒度锁定

对条目的读/写访问由包含该条目的桶中的读写锁序列化。没有容器级锁,因此,容器越大,桶级锁发生争用的机会就越小。

大小调整:无锁

HashMap的大小调整完全是非阻塞和无锁的;大小调整不会阻塞对容器的任何其他读/写访问或大小调整尝试。大小调整类似于将新的桶数组推入无锁栈。旧桶数组中的每个条目将在未来访问容器时逐步重新定位到新的桶数组,旧桶数组最终会在变空后被丢弃。

示例

如果键是唯一的,可以插入一个条目。插入的条目可以同步或异步地更新、读取和删除。

use scc::HashMap; let hashmap: HashMap<u64, u32> = HashMap::default(); assert!(hashmap.insert(1, 0).is_ok()); assert!(hashmap.insert(1, 1).is_err()); assert_eq!(hashmap.upsert(1, 1).unwrap(), 0); assert_eq!(hashmap.update(&1, |_, v| { *v = 3; *v }).unwrap(), 3); assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 3); assert_eq!(hashmap.remove(&1).unwrap(), (1, 3)); hashmap.entry(7).or_insert(17); assert_eq!(hashmap.read(&7, |_, v| *v).unwrap(), 17); let future_insert = hashmap.insert_async(2, 1); let future_remove = hashmap.remove_async(&1);

如果工作流程比较复杂,HashMapEntry API会很有用。

use scc::HashMap; let hashmap: HashMap<u64, u32> = HashMap::default(); hashmap.entry(3).or_insert(7); assert_eq!(hashmap.read(&3, |_, v| *v), Some(7)); hashmap.entry(4).and_modify(|v| { *v += 1 }).or_insert(5); assert_eq!(hashmap.read(&4, |_, v| *v), Some(5)); let future_entry = hashmap.entry_async(3);

HashMap不提供Iterator,因为不可能将Iterator::Item的生命周期限制在Iterator内。这个限制可以通过依赖内部可变性来规避,例如让返回的引用持有一个锁,但如果使用不当很容易导致死锁,而频繁获取锁可能会影响性能。因此,没有实现Iterator,相反,HashMap提供了一些方法来同步或异步地迭代条目:anyany_asyncpruneprune_asyncretainretain_asyncscanscan_asyncOccupiedEntry::nextOccupiedEntry::next_async

use scc::HashMap; let hashmap: HashMap<u64, u32> = HashMap::default(); assert!(hashmap.insert(1, 0).is_ok()); assert!(hashmap.insert(2, 1).is_ok()); // 可以通过`retain`修改或删除条目。 let mut acc = 0; hashmap.retain(|k, v_mut| { acc += *k; *v_mut = 2; true }); assert_eq!(acc, 3); assert_eq!(hashmap.read(&1, |_, v| *v).unwrap(), 2); assert_eq!(hashmap.read(&2, |_, v| *v).unwrap(), 2); // 一旦找到满足谓词的条目,`any`就会返回`true`。 assert!(hashmap.insert(3, 2).is_ok()); assert!(hashmap.any(|k, _| *k == 3)); // 可以通过`retain`删除多个条目。 hashmap.retain(|k, v| *k == 1 && *v == 2); // `hash_map::OccupiedEntry`也可以返回下一个最近的已占用条目。 let first_entry = hashmap.first_entry(); assert!(first_entry.is_some()); let second_entry = first_entry.and_then(|e| e.next()); assert!(second_entry.is_none()); // 使用`scan_async`异步迭代条目。 let future_scan = hashmap.scan_async(|k, v| println!("{k} {v}"));

HashSet

HashSetHashMap的一个特殊版本,其中值类型为()

示例

大多数HashSet方法与HashMap相同,只是它们不接收值参数,并且一些用于修改值的HashMap方法在HashSet中没有实现。

use scc::HashSet; let hashset: HashSet<u64> = HashSet::default(); assert!(hashset.read(&1, |_| true).is_none()); assert!(hashset.insert(1).is_ok()); assert!(hashset.read(&1, |_| true).unwrap()); let future_insert = hashset.insert_async(2); let future_remove = hashset.remove_async(&1);

HashIndex

HashIndexHashMap的一个针对读取优化的版本。在HashIndex中,不仅桶数组的内存由sdd管理,条目桶的内存也受sdd保护,从而实现对单个条目的无锁读取访问。

条目生命周期

HashIndex不会立即删除被移除的条目,而是在满足以下条件之一时才删除:

  1. 自上次在桶中移除条目以来,Epoch达到下一代,且桶被写入访问。
  2. HashIndex被清空或调整大小。
  3. 充满已移除条目的桶占用了容量的50%。

这些条件不保证被移除的条目会在确定的时间内被删除,因此如果工作负载以写入为主且条目大小较大,HashIndex可能不是最佳选择。

示例

peekpeek_with方法完全无锁。

use scc::HashIndex; let hashindex: HashIndex<u64, u32> = HashIndex::default(); assert!(hashindex.insert(1, 0).is_ok()); // `peek`和`peek_with`是无锁的。 assert_eq!(hashindex.peek_with(&1, |_, v| *v).unwrap(), 0); let future_insert = hashindex.insert_async(2, 1); let future_remove = hashindex.remove_if_async(&1, |_| true);

HashIndexEntry API可用于原地更新条目。

use scc::HashIndex; let hashindex: HashIndex<u64, u32> = HashIndex::default(); assert!(hashindex.insert(1, 1).is_ok()); if let Some(mut o) = hashindex.get(&1) { // 创建条目的新版本。 o.update(2); }; if let Some(mut o) = hashindex.get(&1) { // 原地更新条目。 unsafe { *o.get_mut() = 3; } };

HashIndex实现了Iterator,因为任何衍生的引用都可以在关联的ebr::Guard存在期间存活。

use scc::ebr::Guard; use scc::HashIndex; let hashindex: HashIndex<u64, u32> = HashIndex::default(); assert!(hashindex.insert(1, 0).is_ok()); // 现有值可以被新值替换。 hashindex.get(&1).unwrap().update(1); let guard = Guard::new(); // 必须为`iter`提供一个`Guard`。 let mut iter = hashindex.iter(&guard); // 衍生的引用可以与`guard`同存。 let entry_ref = iter.next().unwrap(); assert_eq!(iter.next(), None); drop(hashindex); // 在`hashindex`被丢弃后仍可读取条目。 assert_eq!(entry_ref, (&1, &1));

HashCache

HashCache是一个基于HashMap实现的32路组相联并发缓存。HashCache不跟踪整个缓存中最近最少使用的条目,而是每个桶维护一个已占用条目的双向链表,该链表在访问条目时更新,以跟踪桶内最近最少使用的条目。

示例

当插入新条目且桶已满时,会驱逐桶中最近最少使用的条目。

use scc::HashCache; let hashcache: HashCache<u64, u32> = HashCache::with_capacity(100, 2000); /// 容量不能超过最大容量。 assert_eq!(hashcache.capacity_range(), 128..=2048); /// 如果对应`1`或`2`的桶已满,最近最少使用的条目将被驱逐。 assert!(hashcache.put(1, 0).is_ok()); assert!(hashcache.put(2, 0).is_ok()); /// `1`成为桶中最近访问的条目。 assert!(hashcache.get(&1).is_some()); /// 可以正常移除条目。 assert_eq!(hashcache.remove(&2).unwrap(), (2, 0));

TreeIndex

TreeIndex是一个针对读取操作优化的B+树变体。sdd保护单个条目使用的内存,从而实现对它们的无锁读取访问。

锁定行为

读取访问始终是无锁且非阻塞的。只要不需要结构变化,对条目的写入访问也是无锁且非阻塞的。然而,当写入操作正在分裂或合并节点时,受影响范围内的键的其他写入操作会被阻塞。

条目生命周期

TreeIndex不会立即删除被移除的条目,而是在叶节点被清空或分裂时才删除,这使得TreeIndex在写入密集的工作负载下可能不是最佳选择。

示例

如果键是唯一的,可以插入条目,然后可以读取和移除它。只有在内部节点分裂或合并时才会获取或等待锁。

use scc::TreeIndex; let treeindex: TreeIndex<u64, u32> = TreeIndex::new(); assert!(treeindex.insert(1, 2).is_ok()); // `peek`和`peek_with`是无锁操作。 assert_eq!(treeindex.peek_with(&1, |_, v| *v).unwrap(), 2); assert!(treeindex.remove(&1)); let future_insert = treeindex.insert_async(2, 3); let future_remove = treeindex.remove_if_async(&1, |v| *v == 2);

可以在不获取任何锁的情况下扫描条目。

use scc::TreeIndex; use sdd::Guard; let treeindex: TreeIndex<u64, u32> = TreeIndex::new(); assert!(treeindex.insert(1, 10).is_ok()); assert!(treeindex.insert(2, 11).is_ok()); assert!(treeindex.insert(3, 13).is_ok()); let guard = Guard::new(); // `visitor`在不获取锁的情况下迭代条目。 let mut visitor = treeindex.iter(&guard); assert_eq!(visitor.next().unwrap(), (&1, &10)); assert_eq!(visitor.next().unwrap(), (&2, &11)); assert_eq!(visitor.next().unwrap(), (&3, &13)); assert!(visitor.next().is_none());

可以扫描特定范围的键。

use scc::ebr::Guard; use scc::TreeIndex; let treeindex: TreeIndex<u64, u32> = TreeIndex::new(); for i in 0..10 { assert!(treeindex.insert(i, 10).is_ok()); } let guard = Guard::new(); assert_eq!(treeindex.range(1..1, &guard).count(), 0); assert_eq!(treeindex.range(4..8, &guard).count(), 4); assert_eq!(treeindex.range(4..=8, &guard).count(), 5);

Bag

Bag是一个并发无锁的无序容器。Bag完全不透明,在弹出之前不允许访问其中包含的实例。如果所包含实例的数量可以保持在ARRAY_LEN(默认:usize::BITS / 2)以下,Bag特别高效。

示例

use scc::Bag; let bag: Bag<usize> = Bag::default(); bag.push(1); assert!(!bag.is_empty()); assert_eq!(bag.pop(), Some(1)); assert!(bag.is_empty());

Queue

Queue是一个由sdd支持的并发无锁先进先出容器。

示例

use scc::Queue; let queue: Queue<usize> = Queue::default(); queue.push(1); assert!(queue.push_if(2, |e| e.map_or(false, |x| **x == 1)).is_ok()); assert!(queue.push_if(3, |e| e.map_or(false, |x| **x == 1)).is_err()); assert_eq!(queue.pop().map(|e| **e), Some(1)); assert_eq!(queue.pop().map(|e| **e), Some(2)); assert!(queue.pop().is_none());

Stack

Stack是一个由sdd支持的并发无锁后进先出容器。

示例

use scc::Stack; let stack: Stack<usize> = Stack::default(); stack.push(1); stack.push(2); assert_eq!(stack.pop().map(|e| **e), Some(2)); assert_eq!(stack.pop().map(|e| **e), Some(1)); assert!(stack.pop().is_none());

LinkedList

LinkedList是一个实现无锁并发单向链表操作的类型特征,由sdd支持。它还提供了一种方法来标记链表的一个条目以表示用户定义的状态。

示例

use scc::ebr::{AtomicShared, Guard, Shared}; use scc::LinkedList; use std::sync::atomic::Ordering::Relaxed; #[derive(Default)] struct L(AtomicShared<L>, usize); impl LinkedList for L { fn link_ref(&self) -> &AtomicShared<L> { &self.0 } } let guard = Guard::new(); let head: L = L::default(); let tail: Shared<L> = Shared::new(L(AtomicShared::null(), 1)); // 推入一个新条目。 assert!(head.push_back(tail.clone(), false, Relaxed, &guard).is_ok()); assert!(!head.is_marked(Relaxed)); // 用户可以在条目上标记一个标志。 head.mark(Relaxed); assert!(head.is_marked(Relaxed)); // `next_ptr`遍历链表。 let next_ptr = head.next_ptr(Relaxed, &guard); assert_eq!(next_ptr.as_ref().unwrap().1, 1); // 一旦`tail`被删除,它就变得不可见。 tail.delete_self(Relaxed); assert!(head.next_ptr(Relaxed, &guard).is_null());

性能

HashMap 尾部延迟

在Apple M2 Max上,1048576次插入操作(K = u64, V = u64)的延迟分布的预期尾部延迟范围从180微秒到200微秒。

HashMapHashIndex 吞吐量

更新日志

Footnotes

  1. 只有在启用相应目标特性时才会使用高级SIMD指令,例如-C target_feature=+avx2

编辑推荐精选

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

Trae

Trae

字节跳动发布的AI编程神器IDE

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

热门AI辅助写作AI工具讯飞绘文内容运营AI创作个性化文章多平台分发AI助手
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

Hunyuan3D-2

Hunyuan3D-2

高分辨率纹理 3D 资产生成

Hunyuan3D-2 是腾讯开发的用于 3D 资产生成的强大工具,支持从文本描述、单张图片或多视角图片生成 3D 模型,具备快速形状生成能力,可生成带纹理的高质量 3D 模型,适用于多个领域,为 3D 创作提供了高效解决方案。

3FS

3FS

一个具备存储、管理和客户端操作等多种功能的分布式文件系统相关项目。

3FS 是一个功能强大的分布式文件系统项目,涵盖了存储引擎、元数据管理、客户端工具等多个模块。它支持多种文件操作,如创建文件和目录、设置布局等,同时具备高效的事件循环、节点选择和协程池管理等特性。适用于需要大规模数据存储和管理的场景,能够提高系统的性能和可靠性,是分布式存储领域的优质解决方案。

下拉加载更多