
Understanding Schedulers in Swift Combine Framework

Now that we’ve painted the big picture of the Combine framework, it’s time to learn what schedulers are. This article answers the following questions:

  1. Which schedulers are built into Combine?
  2. How to switch schedulers?
  3. How to perform asynchronous work with Combine?
  4. What’s the difference between receive(on:) and subscribe(on:)?

Defining a Scheduler

A scheduler is the synchronization mechanism of the Combine framework. It defines the context for where and when the work is performed.

Combine does not work directly with threads. Instead, it allows Publishers to operate on specific Schedulers.

The where is the current run loop, dispatch queue, or operation queue.

The when means virtual time, according to the scheduler’s clock. The work performed by a scheduler will adhere to the scheduler’s clock only, which might not correspond to the real time of the system.
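To make the where and the when concrete, here is a minimal sketch that uses a dispatch queue directly as a scheduler. The queue label and variable names are assumptions chosen for illustration. The first call schedules immediate work on the queue; the second schedules delayed work relative to the scheduler's own `now`.

```swift
import Combine
import Foundation

// Hypothetical queue label, chosen only for this sketch.
let queue = DispatchQueue(label: "com.example.scheduler-demo")
let finished = DispatchSemaphore(value: 0)
var ranOnMainThread = true

// "Where": the action runs on the queue, not on the calling thread.
queue.schedule {
    ranOnMainThread = Thread.isMainThread
}

// "When": the deadline is expressed in the scheduler's own time type,
// measured from the scheduler's own `now`.
let deadline = queue.now.advanced(by: .milliseconds(100))
queue.schedule(after: deadline) {
    print("Immediate work ran on the main thread:", ranOnMainThread)
    finished.signal()
}

// Only needed so that a command-line script does not exit early.
finished.wait()
```

The semaphore is there only to keep a script alive until the delayed action fires; in an app or a playground you would not normally block the calling thread like this.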

Combine Scheduler Types

The Combine framework provides different kinds of schedulers, all of which conform to the Scheduler protocol:

  • DispatchQueue. Performs the work on a specific dispatch queue: serial, concurrent, main and global. You’ll commonly use serial and global queues for the background work, and the main queue for the UI-related work. As of Xcode 11 GM Seed, it is not recommended to use concurrent queues.
  • OperationQueue. Performs the work on a specific operation queue. As with dispatch queues, use OperationQueue.main for UI work, and other queues for the background work. According to this conversation on the Swift forum, it’s not recommended to use operation queues with maxConcurrentOperationCount greater than 1.
  • RunLoop. Performs the work on the specific run loop.
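  • ImmediateScheduler. Performs synchronous actions immediately. It supports immediate work only: according to Apple's documentation, if you attempt to schedule delayed work, the scheduler ignores the date and performs the action right away. Earlier releases reportedly terminated the app with a fatal error in this case.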
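Because all of these types conform to Scheduler, the same generic code can drive any of them. The sketch below assumes a hypothetical helper named `run(on:_:)` and made-up queue labels; it is an illustration, not part of Combine.

```swift
import Combine
import Foundation

// Hypothetical helper: accepts any type that conforms to Scheduler.
func run<S: Scheduler>(on scheduler: S, _ work: @escaping () -> Void) {
    scheduler.schedule(work)
}

var events: [String] = []

// ImmediateScheduler runs the work synchronously, before `run` returns.
run(on: ImmediateScheduler.shared) { events.append("immediate") }
events.append("after immediate")

// A serial DispatchQueue runs the work asynchronously on that queue.
let serial = DispatchQueue(label: "com.example.serial") // hypothetical label
run(on: serial) { events.append("serial queue") }
serial.sync {} // wait until the queued work has finished

// An OperationQueue is accepted by the same generic function.
let operations = OperationQueue()
operations.maxConcurrentOperationCount = 1
run(on: operations) { events.append("operation queue") }
operations.waitUntilAllOperationsAreFinished()

print(events)
```

The explicit waits exist only to keep the order of the log deterministic in a script.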

Use RunLoop.main, DispatchQueue.main or OperationQueue.main to perform the UI-related work. As specified in this thread, they are largely interchangeable, although RunLoop.main is reported to postpone work while the user is interacting with the UI, for example during scrolling.

Aside from ImmediateScheduler, Combine does not introduce any new scheduler types. Instead, it extends the existing Foundation and Dispatch multithreading APIs to become schedulers. You can use these APIs the same way you did before Combine, which lowers the entry barrier.

Default Scheduler

Even if you don’t specify any scheduler, Combine provides you with a default one. It uses the same thread on which the element was generated. For example, if you send an element from a background thread, you receive it on that same background thread.

import Combine
import Foundation

let subject = PassthroughSubject<Int, Never>()
// 1
let token = subject.sink(receiveValue: { value in
    print(Thread.isMainThread)
})
// 2
subject.send(1)
// 3
DispatchQueue.global().async {
    subject.send(2)
}
  1. Print true if the value is received on the main thread, and false otherwise.
  2. Send 1 from the main thread.
  3. Send 2 from the background thread.

It will print:

true
false

As expected, the values are received on different threads.

Switching Schedulers

Resource-consuming operations are typically processed in the background, so that the user interface does not freeze. Their result is then handled on the main thread. Combine’s way of doing this is to switch schedulers, which is accomplished with two methods: subscribe(on:) and receive(on:).

receive(on:)

The receive(on:) method changes the scheduler for all publishers that come after it.

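// Keep the token alive; releasing it cancels the subscription before the value arrives.
let token = Just(1)
   .map { _ in print(Thread.isMainThread) }
   .receive(on: DispatchQueue.global())
   .map { print(Thread.isMainThread) }
   .sink { print(Thread.isMainThread) }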

It will print:

true
false
false

The process is visualized as follows:

[Figure: diagram of the receive(on:) pipeline]

All operators after receive(on:) (to its right in the diagram) deliver elements on the DispatchQueue.global() scheduler.
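Since receive(on:) only affects what comes after it, it can appear more than once in a chain. The following sketch records which queue each stage runs on. The queue names and the helpers `makeQueue` and `currentQueue` are assumptions introduced for this illustration.

```swift
import Combine
import Foundation

// Tag each queue so that a closure can tell where it is running.
let queueName = DispatchSpecificKey<String>()

func makeQueue(_ name: String) -> DispatchQueue {
    let queue = DispatchQueue(label: name)
    queue.setSpecific(key: queueName, value: name)
    return queue
}

func currentQueue() -> String {
    DispatchQueue.getSpecific(key: queueName) ?? "caller"
}

let first = makeQueue("first")   // hypothetical queue names
let second = makeQueue("second")

var hops: [String] = []
let finished = DispatchSemaphore(value: 0)

let cancellable = Just(1)
    .handleEvents(receiveOutput: { _ in hops.append(currentQueue()) }) // before any receive(on:)
    .receive(on: first)
    .handleEvents(receiveOutput: { _ in hops.append(currentQueue()) }) // after the first hop
    .receive(on: second)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { _ in hops.append(currentQueue()) })          // after the second hop

finished.wait() // only needed in a command-line script
print(hops)     // ["caller", "first", "second"]
cancellable.cancel()
```

Each stage runs on the scheduler set by the nearest receive(on:) above it, and the stage before the first receive(on:) stays on the thread that subscribed.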

subscribe(on:)

The subscribe(on:) method changes the scheduler that is used to perform subscribe, cancel, and request operations. The chain will stay on that scheduler all the way downstream, unless receive(on:) is specified somewhere.

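// Keep the token alive; releasing it cancels the subscription before the value arrives.
let token = Just(1)
   .subscribe(on: DispatchQueue.global())
   .map { _ in print(Thread.isMainThread) }
   .sink { print(Thread.isMainThread) }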

It will print:

false
false

Let’s visualize the process:

[Figure: diagram of the subscribe(on:) pipeline]

All the operations happen on the DispatchQueue.global() scheduler.

The position of subscribe(on:) does not matter, because it takes effect at subscription time rather than at a particular point in the chain. This snippet is equivalent to the previous one:

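// Keep the token alive; releasing it cancels the subscription before the value arrives.
let token = Just(1)
   .map { _ in print(Thread.isMainThread) }
   .subscribe(on: DispatchQueue.global()) // Position of subscribe(on:) has changed
   .sink { print(Thread.isMainThread) }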

Regardless of the position of subscribe(on:), all events and values are still received on the DispatchQueue.global() scheduler.

Eagle-eyed readers may have noticed that the definition of subscribe(on:) says nothing about the scheduler on which we receive values. If a publisher emits values on a different thread, they will be received on that thread. A typical example is a data task publisher:

// Keep the token alive until the response arrives.
let token = URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.vadimbulavin.com")!)
   .subscribe(on: DispatchQueue.main) // Subscribe on the main thread
   .sink(receiveCompletion: { _ in },
         receiveValue: { _ in
           print(Thread.isMainThread) // Are we on the main thread?
   })

The code will print false, indicating that the publisher emits values on a background thread. In such cases we must use receive(on:) to specify a scheduler.
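The fix can be sketched without a network call. Below, a PassthroughSubject that is sent to from its own queue stands in for a publisher that emits on a background thread, and a serial queue named `delivery` stands in for DispatchQueue.main so that the sketch also runs as a command-line script. Both names, and the `worker` queue, are assumptions made for this illustration.

```swift
import Combine
import Foundation

let worker = DispatchQueue(label: "worker")     // stands in for the publisher's own queue
let delivery = DispatchQueue(label: "delivery") // stands in for DispatchQueue.main

let subject = PassthroughSubject<Int, Never>()
var values: [Int] = []

// Without receive(on:), the value arrives on the queue that emitted it.
let direct = subject.sink { _ in
    dispatchPrecondition(condition: .onQueue(worker))
}

// With receive(on:), the value is handed over to the chosen scheduler.
let hopped = subject
    .receive(on: delivery)
    .sink { value in
        // Traps if the value is delivered anywhere other than `delivery`.
        dispatchPrecondition(condition: .onQueue(delivery))
        values.append(value)
    }

// The publisher emits on its own queue.
worker.async { subject.send(42) }

worker.sync {}   // wait until the value has been sent
delivery.sync {} // wait until the value has been delivered
print(values)    // [42]
direct.cancel()
hopped.cancel()
```

In an app you would pass DispatchQueue.main to receive(on:) and drop the two waits.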

I am thankful to Joseph Heck for explaining this to me. Make sure to read his book on Combine, which is by far the most comprehensive and reliable source on the subject.

Performing Asynchronous Work with Combine

Finally, let’s see how we can switch schedulers by combining subscribe(on:) and receive(on:).

Say, we have a publisher with a long-running task:

struct BusyPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never
    
    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        sleep(10)
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
    }
}

When called from the UI thread, it freezes the app for 10 seconds. Remember that, by default, Combine stays on the thread from which the element is sent:

BusyPublisher()
   .sink { _ in print("Received value") }

print("Hello")

As expected, Hello is printed after the value is received:

Received value
Hello

A common pattern for doing asynchronous work with Combine is to subscribe on a background scheduler and receive the events on the UI scheduler:

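// Keep the token alive; releasing it cancels the subscription before the value arrives.
let token = BusyPublisher()
   .subscribe(on: DispatchQueue.global())
   .receive(on: DispatchQueue.main)
   .sink { _ in print("Received value") }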

print("Hello")

It will print:

Hello
Received value

This time Hello is printed before the value is received. This means that the publisher no longer blocks the main thread, so the app does not freeze.
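The same pattern works for ordinary blocking functions, without writing a custom Publisher. One possible approach, sketched here under the assumption that the work is a plain synchronous function, is to wrap it in Deferred so that it runs at subscription time on the scheduler passed to subscribe(on:). The names `Report`, `slowReport` and `results` are hypothetical, and the `results` queue stands in for DispatchQueue.main so that the sketch also runs as a command-line script.

```swift
import Combine
import Foundation

// Hypothetical result type: the sum plus the thread it was computed on.
struct Report {
    let sum: Int
    let computedOnMainThread: Bool
}

// Hypothetical stand-in for a slow, blocking computation.
func slowReport(upTo n: Int) -> Report {
    Thread.sleep(forTimeInterval: 0.1)
    return Report(sum: (1...n).reduce(0, +),
                  computedOnMainThread: Thread.isMainThread)
}

let results = DispatchQueue(label: "results") // stands in for DispatchQueue.main
let finished = DispatchSemaphore(value: 0)
var report: Report?

let cancellable = Deferred {
        // Runs at subscription time, on the scheduler passed to subscribe(on:).
        Just(slowReport(upTo: 100))
    }
    .subscribe(on: DispatchQueue.global())
    .receive(on: results)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { report = $0 })

print("Hello")  // printed without waiting for the slow work
finished.wait() // only needed in a command-line script
print(report?.sum ?? 0)
cancellable.cancel()
```

Deferred matters here: without it, `slowReport` would be evaluated eagerly on the calling thread while the chain is being built, before subscribe(on:) has any effect.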

Summary

Let’s summarize the key points to remember:

  • subscribe(on:) and receive(on:) are the primary multithreading methods of the Combine framework.
  • The default scheduler uses the same thread on which the element was generated.
  • receive(on:) sets a scheduler for all operators coming afterwards.
  • subscribe(on:) sets a scheduler for the whole stream, starting at the time the Publisher is subscribed to. The stream stays on the same scheduler, until receive(on:) specifies another scheduler.
  • The position of subscribe(on:) does not matter.
  • Asynchronous work is typically performed by subscribing on the background scheduler and receiving values on the UI scheduler.
