Looping over AsyncSequence gives a different result than subscribing with a 'sink'.

Hi, if values are PUBLISHED rapidly, then ALL are present in the Combine sink, but SOME of them are absent from the async loop. Why the difference?

For example, in the code below, tapping repeatedly 4 times gives the output: INPUT 24, INPUT 9, INPUT 31, INPUT 45, SINK 24, SINK 9, LOOP 24, SINK 31, SINK 45, LOOP 31.

import SwiftUI
import Combine
import PlaygroundSupport

var subject = PassthroughSubject<Int, Never>()

struct ContentView: View {
    @State var bag = [AnyCancellable]()
    @State var a = [String]()

    var body: some View {

        Text("TAP A FEW TIMES RAPIDLY")
        .frame(width: 160, height: 160)
        .onTapGesture {
            Task {
                let anyInt = Int.random(in: 1..<100)
                print("INPUT \(anyInt)")
                try await Task.sleep(nanoseconds: 3_000_000_000)
                subject.send(anyInt)
            }
        }
        .task {
            for await anyInt in subject.values {
                print("    LOOP \(anyInt)")
            }
        }
        .onAppear{
            subject.sink{ anyInt in
                print("  SINK \(anyInt)")
            }.store(in: &bag)
        }
    }
}

PlaygroundPage.current.setLiveView(ContentView())

Thank you.

Answered by fallthrough in 737298022

Answering my own question with help from Nate M from Apple:

The values sequence just requests one value at a time, so if another value arrives before it has been requested, it gets dropped. This can be fixed by adding a call to buffer - for example:

for await anyInt in subject.buffer(size: 1000, prefetch: .keepFull, whenFull: .dropOldest).values {
    print("    LOOP \(anyInt)")
}

Use a sensible value for the size parameter; .max is usable as long as you're aware that this can lead to high memory usage.

The sink sequence, on the other hand, requests unlimited elements, so it calls the closure with every value sent.

Accepted Answer

Answering my own question with help from Nate M from Apple:

The values sequence just requests one value at a time, so if another value arrives before it has been requested, it gets dropped. This can be fixed by adding a call to buffer - for example:

for await anyInt in subject.buffer(size: 1000, prefetch: .keepFull, whenFull: .dropOldest).values {
    print("    LOOP \(anyInt)")
}

Use a sensible value for the size parameter; .max is usable as long as you're aware that this can lead to high memory usage.

The sink sequence, on the other hand, requests unlimited elements, so it calls the closure with every value sent.

Looping over AsyncSequence gives a different result than subscribing with a 'sink'.
 
 
Q