Article

Controlling Publishing with Connectable Publishers

Coordinate when publishers start sending elements to subscribers.

Overview

Sometimes, you want to configure a publisher before it starts producing elements, such as when a publisher has properties that affect its behavior. But commonly used subscribers like sink(receiveValue:) demand unlimited elements immediately, which might prevent you from setting up the publisher the way you like. A publisher that produces values before you’re ready for them can also be a problem when the publisher has two or more subscribers. This multi-subscriber scenario creates a race condition: the publisher can send elements to the first subscriber before the second even exists.

Consider the scenario in the following figure. You create a URLSession.DataTaskPublisher and attach a sink subscriber to it (Subscriber 1) which causes the data task to start fetching the URL’s data. At some later point, you attach a second subscriber (Subscriber 2). If the data task completes its download before the second subscriber attaches, the second subscriber misses the data and only sees the completion.

A chain of Combine publishers, consisting of DataTaskPublisher connected to Share. Two subscribers are attached to the Share publisher. Subscriber 1 indicates successful calls to both receiveValue and receiveCompletion. Subscriber 2 indicates no call to receiveValue, and instead only shows receiveCompletion.

Hold Publishing by Using a Connectable Publisher

To prevent a publisher from sending elements before you’re ready, Combine provides the ConnectablePublisher protocol. A connectable publisher produces no elements until you call its connect() method. Even if it’s ready to produce elements and has unsatisfied demand, a connectable publisher doesn’t deliver any elements to subscribers until you explicitly call connect().

The following figure shows the URLSession.DataTaskPublisher scenario from above, but with a ConnectablePublisher ahead of the subscribers. By waiting to call connect() until both subscribers attach, the data task doesn’t start downloading until then. This eliminates the race condition and guarantees both subscribers can receive the data.

A chain of Combine publishers, consisting of DataTaskPublisher connected to Share, which is then connected to ConnectablePublisher. Two subscribers are attached to the ConnectablePublisher. A label shows an explicit call to connect() on the ConnectablePublisher. Both subscribers indicate they get both the receiveValue and receiveCompletion calls.

To use a ConnectablePublisher in your own Combine code, use the makeConnectable() operator to wrap an existing publisher with a Publishers.MakeConnectable instance. The following code shows how makeConnectable() fixes the data task publisher race condition described above. Typically, attaching a sink — identified here by the AnyCancellable it returns, cancellable1 — would cause the data task to start immediately. In this scenario, the second sink, identified as cancellable2, doesn’t attach until one second later, and the data task publisher might complete before the second sink attaches. Instead, explicitly using a ConnectablePublisher causes the data task to start only after the app calls connect(), which it does after a two-second delay.

let url = URL(string: "https://example.com/")!
let connectable = URLSession.shared
    .dataTaskPublisher(for: url)
    .map() { $0.data }
    .catch() { _ in Just(Data() )}
    .share()
    .makeConnectable()

cancellable1 = connectable
    .sink(receiveCompletion: { print("Received completion 1: \($0).") },
          receiveValue: { print("Received data 1: \($0.count) bytes.") })

DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
    self.cancellable2 = connectable
        .sink(receiveCompletion: { print("Received completion 2: \($0).") },
              receiveValue: { print("Received data 2: \($0.count) bytes.") })
}

DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    self.connection = connectable.connect()
}

Use the Autoconnect Operator If You Don’t Need to Explicitly Connect

Some Combine publishers already implement ConnectablePublisher, such as Publishers.Multicast and Timer.TimerPublisher. Using these publishers can cause the opposite problem: having to explicitly connect() could be burdensome if you don’t need to configure the publisher or attach multiple subscribers.

For cases like these, ConnectablePublisher provides the autoconnect() operator. This operator immediately calls connect() when a Subscriber attaches to the publisher with the subscribe(_:) method.

The following example uses autoconnect(), so a subscriber immediately receives elements from a once-a-second Timer.TimerPublisher. Without autoconnect(), the example would need to explicitly start the timer publisher by calling connect() at some point.

let cancellable = Timer.publish(every: 1, on: .main, in: .default)
    .autoconnect()
    .sink() { date in
        print ("Date now: \(date)")
     }

See Also

Connectable Publishers

protocol ConnectablePublisher

A publisher that provides an explicit means of connecting and canceling publication.