一些调度器,使Combine更易于测试和更加versatile。
Combine框架提供了Scheduler
协议,这是一个强大的抽象,用于描述如何以及何时执行工作单元。它统一了许多不同的执行工作的方式,如DispatchQueue
、RunLoop
和OperationQueue
。
但是,一旦在你的响应式代码中使用这些调度器,你就会立即使发布者变为异步,因此更难测试,迫使你使用expectations并等待时间流逝,以便你的发布者执行。
这个库提供了新的调度器,允许你将任何异步发布者转换为同步发布者,以便于测试和调试。
这个库是在Point-Free的多个剧集中设计的,这是一个由Brandon Williams和Stephen Celis主持的探索函数式编程和Swift的视频系列。
你可以在这里观看所有剧集。
<a href="https://www.pointfree.co/collections/combine/schedulers"> <img alt="视频海报图片" src="https://yellow-cdn.veclightyear.com/2b54e442/4cf41ac7-03c9-4c48-bd19-94c31d89c7da.jpeg" width="480"> </a>AnyScheduler
AnyScheduler
为Scheduler
协议提供了一个类型擦除的包装器,这在对多种类型的调度器进行泛型操作时非常有用,而无需在代码中引入泛型。Combine框架提供了许多类型擦除的包装器,如AnySubscriber
、AnyPublisher
和AnyCancellable
,但出于某种原因没有提供AnyScheduler
。
当你想从外部自定义某些代码中使用的调度器,但又不想引入泛型来使其可定制时,这种类型非常有用。例如,假设你有一个ObservableObject
视图模型,当调用某个方法时执行API请求:
class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient init(apiClient: ApiClient) { self.apiClient = apiClient } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: DispatchQueue.main) .assign(to: &self.$episode) } }
注意,我们在reloadButtonTapped
方法中使用DispatchQueue.main
,因为fetchEpisode
端点很可能在后台线程上传递其输出(就像URLSession
的情况一样)。
这段代码看起来很简单,但.receive(on: DispatchQueue.main)
的存在使得这段代码更难测试,因为你必须使用XCTest
expectations来显式地等待一小段时间让队列执行。这可能导致测试不稳定,并使测试套件的执行时间比必要的更长。
解决这个测试问题的一种方法是使用"immediate"调度器而不是DispatchQueue.main
,这将导致fetchEpisode
尽快传递其输出,而不会发生线程跳转。为了实现这一点,我们需要将调度器注入到我们的视图模型中,以便从外部控制它:
class EpisodeViewModel<S: Scheduler>: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let scheduler: S init(apiClient: ApiClient, scheduler: S) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }
现在我们可以在生产环境中使用DispatchQueue.main
初始化这个视图模型,在测试中使用DispatchQueue.immediate
初始化它。听起来是个胜利!
然而,在我们的视图模型中引入这个泛型是相当重量级的,因为它向外界大声宣布这个类型使用了调度器,更糟糕的是,它将影响任何接触这个视图模型并且也想要可测试的代码。例如,任何使用这个视图模型的视图如果也想控制调度器,就需要引入泛型,这在我们想要编写快照测试时会很有用。
我们可以使用AnyScheduler
来允许替换不同的调度器,而不是引入泛型。它允许我们在调度器方面有一定的泛型性,但实际上并不引入泛型。
我们可以说我们只想要一个其关联类型与DispatchQueue
匹配的调度器,而不是在视图模型中保持一个泛型调度器:
class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let scheduler: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.scheduler) .assign(to: &self.$episode) } }
然后,在生产环境中,我们可以创建一个使用实时DispatchQueue
的视图模型,但我们只需要先擦除其类型:
let viewModel = EpisodeViewModel( apiClient: ..., scheduler: DispatchQueue.main.eraseToAnyScheduler() )
对于常见的调度器,如DispatchQueue
、OperationQueue
和RunLoop
,AnyScheduler
上甚至有一个静态辅助方法,进一步简化了这一点:
let viewModel = EpisodeViewModel( apiClient: ..., scheduler: .main )
然后在测试中我们可以使用immediate调度器:
let viewModel = EpisodeViewModel( apiClient: ..., scheduler: .immediate )
因此,总的来说,AnyScheduler
非常适合允许控制在类、函数等中使用的调度器,而无需引入泛型,这可以帮助简化代码并减少实现细节的泄露。
TestScheduler
一个可以以确定性方式控制其当前时间和执行的调度器。这个调度器对于测试时间流如何影响使用异步操作符的发布者非常有用,例如debounce
、throttle
、delay
、timeout
、receive(on:)
、subscribe(on:)
等。
例如,考虑以下race
操作符,它并行运行两个futures,但只发出第一个完成的:
func race<Output, Failure: Error>( _ first: Future<Output, Failure>, _ second: Future<Output, Failure> ) -> AnyPublisher<Output, Failure> { first .merge(with: second) .prefix(1) .eraseToAnyPublisher() }
尽管这个发布者相当简单,我们可能仍然想为它编写一些测试。
为此,我们可以创建一个测试调度器,并创建两个futures,一个在一秒后发出,一个在两秒后发出:
let scheduler = DispatchQueue.test let first = Future<Int, Never> { callback in scheduler.schedule(after: scheduler.now.advanced(by: 1)) { callback(.success(1)) } } let second = Future<Int, Never> { callback in scheduler.schedule(after: scheduler.now.advanced(by: 2)) { callback(.success(2)) } }
然后我们可以race这些futures并将它们的发出值收集到一个数组中:
var output: [Int] = [] let cancellable = race(first, second).sink { output.append($0) }
然后我们可以确定性地在调度器中向前移动时间,看看发布者如何发出。我们可以从向前移动一秒开始:
scheduler.advance(by: 1) XCTAssertEqual(output, [1])
这证 明我们从发布者那里得到了第一次发出,因为已经过了一秒钟。如果我们再向前推进一秒,我们可以证明我们不会得到更多的发出:
scheduler.advance(by: 1) XCTAssertEqual(output, [1])
这是一个非常简单的例子,展示了如何使用测试调度器控制时间流,但这种技术可以用于测试任何涉及Combine异步操作的发布者。
ImmediateScheduler
Combine框架自带一个ImmediateScheduler
类型,但它为SchedulerTimeType
和SchedulerOptions
的关联类型定义了所有新类型。这意味着你不能轻易地在实时DispatchQueue
和同步执行工作的"immediate"DispatchQueue
之间切换。唯一的方法是为使用该调度器的任何代码引入泛型,这可能会变得笨拙。
因此,这个库的ImmediateScheduler
使用与现有调度器相同的关联类型,这意味着你可以使用DispatchQueue.immediate
来获得一个看起来像调度队列但立即执行其工作的调度器。同样,你可以构造RunLoop.immediate
和OperationQueue.immediate
。
这个调度器对于编写针对使用异步操作符的发布者的测试很有用,比如receive(on:)
、subscribe(on:)
等,因为它强制发布者立即发出,而不需要使用XCTestExpectation
等待线程跳转或延迟。
这个调度器与TestScheduler
的不同之处在于你不能显式控制时间如何流过你的发布者,而是立即将时间折叠成一个点。
作为一个基本例子,假设你有一个视图模型,在按钮被点击后等待10秒钟才加载一些数据:
class HomeViewModel: ObservableObject { @Published var episodes: [Episode]? let apiClient: ApiClient init(apiClient: ApiClient) { self.apiClient = apiClient } func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: DispatchQueue.main) .flatMap { apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }
为了测试这段代码,你真的需要等待10秒钟才能让发布者发出:
func testViewModel() { let viewModel = HomeViewModel(apiClient: .mock) viewModel.reloadButtonTapped() _ = XCTWaiter.wait(for: [XCTestExpectation()], timeout: 10) XCTAssert(viewModel.episodes, [Episode(id: 42)]) }
另外,我们可以显式地将调度器传递到视图模型初始化器中,以便从外部控制:
class HomeViewModel: ObservableObject { @Published var episodes: [Episode]? let apiClient: ApiClient let scheduler: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, scheduler: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.scheduler = scheduler } func reloadButtonTapped() { Just(()) .delay(for: .seconds(10), scheduler: self.scheduler) .flatMap { self.apiClient.fetchEpisodes() } .assign(to: &self.$episodes) } }
然后在测试中使用immediate调度器:
func testViewModel() { let viewModel = HomeViewModel( apiClient: .mock, scheduler: .immediate ) viewModel.reloadButtonTapped() // 不再需要等待... XCTAssert(viewModel.episodes, [Episode(id: 42)]) }
CombineSchedulers提供了一些辅助工具,用于在SwiftUI和UIKit中进行异步动画。
如果SwiftUI状态 例如,要在视图模型中为 API 响应添加动画效果,你可以指定接收此状态的调度器应该使用动画:
self.apiClient.fetchEpisode() .receive(on: self.scheduler.animation()) .assign(to: &self.$episode)
如果你正在使用 Combine 为 UIKit 功能提供支持,你可以使用 .animate
方法,它与 UIView.animate
类似:
self.apiClient.fetchEpisode() .receive(on: self.scheduler.animate(withDuration: 0.3)) .assign(to: &self.$episode)
UnimplementedScheduler
一个在使用时会导致测试失败的调度器。
这个调度器可以提供额外的确定性,确保被测试的代码路径不需要使用调度器。
随着视图模型变得更加复杂,只有部分逻辑可能需要调度器。在为不需要调度器的逻辑编写单元测试时,应该提供一个未实现的调度器。这直接在测试中记录了该功能不使用调度器。如果它使用了,或将来使用了,测试将会失败。
例如,以下视图模型有几个职责:
class EpisodeViewModel: ObservableObject { @Published var episode: Episode? let apiClient: ApiClient let mainQueue: AnySchedulerOf<DispatchQueue> init(apiClient: ApiClient, mainQueue: AnySchedulerOf<DispatchQueue>) { self.apiClient = apiClient self.mainQueue = mainQueue } func reloadButtonTapped() { self.apiClient.fetchEpisode() .receive(on: self.mainQueue) .assign(to: &self.$episode) } func favoriteButtonTapped() { self.episode?.isFavorite.toggle() } }