combine-schedulers

combine-schedulers

Combine框架的多功能调度器扩展库

combine-schedulers为Combine框架提供了多种实用的调度器实现,包括AnyScheduler、TestScheduler和ImmediateScheduler等。这些工具可简化异步代码的测试和调试过程,提升开发效率。该库还支持自定义动画调度、并发API和定时器发布者,为iOS和macOS应用开发提供灵活的调度解决方案。

CombineSchedulerSwift测试异步编程Github开源项目

⏰ Combine调度器

CI

一些调度器,使Combine更易于测试和更加versatile。

动机

Combine框架提供了Scheduler协议,这是一个强大的抽象,用于描述如何以及何时执行工作单元。它统一了许多不同的执行工作的方式,如DispatchQueueRunLoopOperationQueue

但是,一旦在你的响应式代码中使用这些调度器,你就会立即使发布者变为异步,因此更难测试,迫使你使用expectations并等待时间流逝,以便你的发布者执行。

这个库提供了新的调度器,允许你将任何异步发布者转换为同步发布者,以便于测试和调试。

了解更多

这个库是在Point-Free的多个剧集中设计的,这是一个由Brandon WilliamsStephen Celis主持的探索函数式编程和Swift的视频系列。

你可以在这里观看所有剧集。

<a href="https://www.pointfree.co/collections/combine/schedulers"> <img alt="视频海报图片" src="https://yellow-cdn.veclightyear.com/2b54e442/4cf41ac7-03c9-4c48-bd19-94c31d89c7da.jpeg" width="480"> </a>

AnyScheduler

AnySchedulerScheduler协议提供了一个类型擦除的包装器,这在对多种类型的调度器进行泛型操作时非常有用,而无需在代码中引入泛型。Combine框架提供了许多类型擦除的包装器,如AnySubscriberAnyPublisherAnyCancellable,但出于某种原因没有提供AnyScheduler

当你想从外部自定义某些代码中使用的调度器,但又不想引入泛型来使其可定制时,这种类型非常有用。例如,假设你有一个ObservableObject视图模型,当调用某个方法时执行API请求:

class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient init(apiClient: ApiClient) { self.apiClient = apiClient } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: DispatchQueue.main) .assign(to: &self.$episode) } }

注意,我们在reloadButtonTapped方法中使用DispatchQueue.main,因为fetchEpisode端点很可能在后台线程上传递其输出(就像URLSession的情况一样)。

这段代码看起来很简单,但.receive(on: DispatchQueue.main)的存在使得这段代码更难测试,因为你必须使用XCTestexpectations来显式地等待一小段时间让队列执行。这可能导致测试不稳定,并使测试套件的执行时间比必要的更长。

解决这个测试问题的一种方法是使用"immediate"调度器而不是DispatchQueue.main,这将导致fetchEpisode尽快传递其输出,而不会发生线程跳转。为了实现这一点,我们需要将调度器注入到我们的视图模型中,以便从外部控制它:

class EpisodeViewModel<S: Scheduler>: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let scheduler: S init(apiClient: ApiClient, scheduler: S) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }

现在我们可以在生产环境中使用DispatchQueue.main初始化这个视图模型,在测试中使用DispatchQueue.immediate初始化它。听起来是个胜利!

然而,在我们的视图模型中引入这个泛型是相当重量级的,因为它向外界大声宣布这个类型使用了调度器,更糟糕的是,它将影响任何接触这个视图模型并且也想要可测试的代码。例如,任何使用这个视图模型的视图如果也想控制调度器,就需要引入泛型,这在我们想要编写快照测试时会很有用。

我们可以使用AnyScheduler来允许替换不同的调度器,而不是引入泛型。它允许我们在调度器方面有一定的泛型性,但实际上并不引入泛型。

我们可以说我们只想要一个其关联类型与DispatchQueue匹配的调度器,而不是在视图模型中保持一个泛型调度器:

class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let scheduler: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }

然后,在生产环境中,我们可以创建一个使用实时DispatchQueue的视图模型,但我们只需要先擦除其类型:

let viewModel = EpisodeViewModel( apiClient: ..., scheduler: DispatchQueue.main.eraseToAnyScheduler() )

对于常见的调度器,如DispatchQueueOperationQueueRunLoopAnyScheduler上甚至有一个静态辅助方法,进一步简化了这一点:

let viewModel = EpisodeViewModel( apiClient: ..., scheduler: .main )

然后在测试中我们可以使用immediate调度器:

let viewModel = EpisodeViewModel( apiClient: ..., scheduler: .immediate )

因此,总的来说,AnyScheduler非常适合允许控制在类、函数等中使用的调度器,而无需引入泛型,这可以帮助简化代码并减少实现细节的泄露。

TestScheduler

一个可以以确定性方式控制其当前时间和执行的调度器。这个调度器对于测试时间流如何影响使用异步操作符的发布者非常有用,例如debouncethrottledelaytimeoutreceive(on:)subscribe(on:)等。

例如,考虑以下race操作符,它并行运行两个futures,但只发出第一个完成的:

func race<Output, Failure: Error>( _ first: Future<Output, Failure>, _ second: Future<Output, Failure> ) -> AnyPublisher<Output, Failure> { first .merge(with: second) .prefix(1) .eraseToAnyPublisher() }

尽管这个发布者相当简单,我们可能仍然想为它编写一些测试。

为此,我们可以创建一个测试调度器,并创建两个futures,一个在一秒后发出,一个在两秒后发出:

let scheduler = DispatchQueue.test let first = Future<Int, Never> { callback in scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) } } let second = Future<Int, Never> { callback in scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) } }

然后我们可以race这些futures并将它们的发出值收集到一个数组中:

var output: [Int] = [] let cancellable = race(first, second).sink { output.append($0) }

然后我们可以确定性地在调度器中向前移动时间,看看发布者如何发出。我们可以从向前移动一秒开始:

scheduler.advance(by: 1) XCTAssertEqual(output, [1])

这证明我们从发布者那里得到了第一次发出,因为已经过了一秒钟。如果我们再向前推进一秒,我们可以证明我们不会得到更多的发出:

scheduler.advance(by: 1) XCTAssertEqual(output, [1])

这是一个非常简单的例子,展示了如何使用测试调度器控制时间流,但这种技术可以用于测试任何涉及Combine异步操作的发布者。

ImmediateScheduler

Combine框架自带一个ImmediateScheduler类型,但它为SchedulerTimeTypeSchedulerOptions的关联类型定义了所有新类型。这意味着你不能轻易地在实时DispatchQueue和同步执行工作的"immediate"DispatchQueue之间切换。唯一的方法是为使用该调度器的任何代码引入泛型,这可能会变得笨拙。

因此,这个库的ImmediateScheduler使用与现有调度器相同的关联类型,这意味着你可以使用DispatchQueue.immediate来获得一个看起来像调度队列但立即执行其工作的调度器。同样,你可以构造RunLoop.immediateOperationQueue.immediate

这个调度器对于编写针对使用异步操作符的发布者的测试很有用,比如receive(on:)subscribe(on:)等,因为它强制发布者立即发出,而不需要使用XCTestExpectation等待线程跳转或延迟。

这个调度器与TestScheduler的不同之处在于你不能显式控制时间如何流过你的发布者,而是立即将时间折叠成一个点。

作为一个基本例子,假设你有一个视图模型,在按钮被点击后等待10秒钟才加载一些数据:

class HomeViewModel: ObservableObject { @Published var episodes: [Episode]? let apiClient: ApiClient init(apiClient: ApiClient) { self.apiClient = apiClient } func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: DispatchQueue.main) .flatMap { apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }

为了测试这段代码,你真的需要等待10秒钟才能让发布者发出:

func testViewModel() { let viewModel = HomeViewModel(apiClient: .mock) viewModel.reloadButtonTapped() _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10) XCTAssert(viewModel.episodes, [Episode(id: 42)]) }

另外,我们可以显式地将调度器传递到视图模型初始化器中,以便从外部控制:

class HomeViewModel: ObservableObject { @Published var episodes: [Episode]? let apiClient: ApiClient let scheduler: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: self.scheduler) .flatMap { self.apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }

然后在测试中使用immediate调度器:

func testViewModel() { let viewModel = HomeViewModel( apiClient: .mock, scheduler: .immediate ) viewModel.reloadButtonTapped() // 不再需要等待... XCTAssert(viewModel.episodes, [Episode(id: 42)]) }

动画调度器

CombineSchedulers提供了一些辅助工具,用于在SwiftUI和UIKit中进行异步动画。

如果SwiftUI状态 例如,要在视图模型中为 API 响应添加动画效果,你可以指定接收此状态的调度器应该使用动画:

self.apiClient.fetchEpisode() .receive(on: self.scheduler.animation()) .assign(to: &self.$episode)

如果你正在使用 Combine 为 UIKit 功能提供支持,你可以使用 .animate 方法,它与 UIView.animate 类似:

self.apiClient.fetchEpisode() .receive(on: self.scheduler.animate(withDuration: 0.3)) .assign(to: &self.$episode)

UnimplementedScheduler

一个在使用时会导致测试失败的调度器。

这个调度器可以提供额外的确定性,确保被测试的代码路径不需要使用调度器。

随着视图模型变得更加复杂,只有部分逻辑可能需要调度器。在为不需要调度器的逻辑编写单元测试时,应该提供一个未实现的调度器。这直接在测试中记录了该功能不使用调度器。如果它使用了,或将来使用了,测试将会失败。

例如,以下视图模型有几个职责:

class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let mainQueue: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, mainQueue: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.mainQueue = mainQueue } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.mainQueue) .assign(to: &self.$episode) } func favoriteButtonTapped() { self.episode?.isFavorite.toggle() } }
  • 它允许用户点击按钮刷新一些剧集数据
  • 它允许用户切换剧集是否为他们的收藏

API 客户端在后台队列上传递剧集,所以视图模型必须在主队列上接收它,然后才能修改其状态。

然而,点击收藏按钮不涉及调度。这意味着可以使用未实现的调度器编写测试:

func testFavoriteButton() { let viewModel = EpisodeViewModel( apiClient: .mock, mainQueue: .unimplemented ) viewModel.episode = .mock viewModel.favoriteButtonTapped() XCTAssert(viewModel.episode?.isFavorite == true) viewModel.favoriteButtonTapped() XCTAssert(viewModel.episode?.isFavorite == false) }

使用 .unimplemented,这个测试强烈声明收藏一个剧集不需要调度器来完成任务,这意味着可以合理地假设该功能很简单,不涉及任何异步操作。

将来,如果收藏一个剧集需要发送涉及调度器的 API 请求,这个测试将开始失败,这是好事!这将迫使我们解决引入的复杂性。如果我们使用任何其他调度器,它会静默地接收这些额外的工作,测试将继续通过。

UIScheduler

一个尽快在主队列上执行其工作的调度器。这个调度器的灵感来自 ReactiveSwift 项目中的等效调度器。

如果从主线程调用 UIScheduler.shared.schedule,则工作单元将立即执行。这与 DispatchQueue.main.schedule 形成对比,后者在执行之前会产生线程跳转,因为它在底层使用 DispatchQueue.main.async

这个调度器对于需要在主线程上尽快执行工作,且线程跳转会产生问题的情况很有用,例如执行动画时。

并发 API

这个库为与 Combine 调度器交互提供了 async 友好的 API。

// 暂停当前任务 1 秒 try await scheduler.sleep(for: .seconds(1)) // 每 1 秒执行一次工作 for await instant in scheduler.timer(interval: .seconds(1)) { ... }

Publishers.Timer

一个在重复间隔发射调度器当前时间的发布者。

这个发布者是 Foundation 的 Timer.publisher 的替代品,其主要区别在于它允许你为计时器使用任何调度器,而不仅仅是 RunLoop。这很有用,因为 RunLoop 调度器在测试方面不可测试,如果你想针对使用 Timer.publisher 的发布者编写测试,你必须显式等待时间流逝才能得到发射。这可能导致脆弱的测试,并大大增加测试执行的时间。

它的使用方式与 Foundation 的计时器类似,只是你指定一个调度器而不是运行循环:

Publishers.Timer(every: .seconds(1), scheduler: DispatchQueue.main) .autoconnect() .sink { print("Timer", $0) }

或者,你可以在调度器上调用 timerPublisher 方法,以在该调度器上派生一个重复计时器:

DispatchQueue.main.timerPublisher(every: .seconds(1)) .autoconnect() .sink { print("Timer", $0) }

但这个计时器最好的部分是你可以将它与 TestScheduler 一起使用,这样你编写的任何涉及计时器的 Combine 代码都变得更可测试。这显示了我们如何轻松模拟在计时器中前进 1,000 秒的想法:

let scheduler = DispatchQueue.test var output: [Int] = [] Publishers.Timer(every: 1, scheduler: scheduler) .autoconnect() .sink { _ in output.append(output.count) } .store(in: &self.cancellables) XCTAssertEqual(output, []) scheduler.advance(by: 1) XCTAssertEqual(output, [0]) scheduler.advance(by: 1) XCTAssertEqual(output, [0, 1]) scheduler.advance(by: 1_000) XCTAssertEqual(output, Array(0...1_001))

兼容性

此库兼容 iOS 13.2 及更高版本。请注意,Combine 框架和 iOS 13.1 及更低版本中存在一些 bug,在尝试比较 DispatchQueue.SchedulerTimeType 值时会导致崩溃,而 TestScheduler 依赖于这个操作。

安装

你可以通过添加包依赖将 CombineSchedulers 添加到 Xcode 项目中。

  1. 文件菜单中,选择 Swift Packages › 添加包依赖...
  2. 在包仓库 URL 文本字段中输入 "https://github.com/pointfreeco/combine-schedulers"
  3. 根据你的项目结构:
    • 如果你有一个需要访问库的单一应用目标,那么直接将 CombineSchedulers 添加到你的应用程序中。
    • 如果你想从多个目标使用这个库,你必须创建一个依赖于 CombineSchedulers 的共享框架,然后从你的其他目标依赖于该框架。

文档

Combine Schedulers API 的最新文档可在这里获得。

其他库

许可证

该库在 MIT 许可下发布。详情请见 LICENSE

编辑推荐精选

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

Trae

Trae

字节跳动发布的AI编程神器IDE

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

热门AI辅助写作AI工具讯飞绘文内容运营AI创作个性化文章多平台分发AI助手
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

Hunyuan3D-2

Hunyuan3D-2

高分辨率纹理 3D 资产生成

Hunyuan3D-2 是腾讯开发的用于 3D 资产生成的强大工具,支持从文本描述、单张图片或多视角图片生成 3D 模型,具备快速形状生成能力,可生成带纹理的高质量 3D 模型,适用于多个领域,为 3D 创作提供了高效解决方案。

3FS

3FS

一个具备存储、管理和客户端操作等多种功能的分布式文件系统相关项目。

3FS 是一个功能强大的分布式文件系统项目,涵盖了存储引擎、元数据管理、客户端工具等多个模块。它支持多种文件操作,如创建文件和目录、设置布局等,同时具备高效的事件循环、节点选择和协程池管理等特性。适用于需要大规模数据存储和管理的场景,能够提高系统的性能和可靠性,是分布式存储领域的优质解决方案。

下拉加载更多