Use of `for await` with `AsyncStream`, and yielding async closures to its continuation

Hello, I was hoping to clarify my understanding of using for await with an AsyncStream. My use case is that I'd like to yield async closures to the stream's continuation, with the idea that, when I use for await on the stream to process and execute the closures, the loop would only move on to the next closure once the current closure has run to completion.

At a high level, I am trying to implement in-order execution of async closures in the context of re-entrancy. An example of asynchronous work I want to execute is a network call that should write to a database:

func syncWithRemote() async -> Void {
    let data = await fetchDataFromNetwork()
    await writeToLocalDatabase(data)
}

For the sake of example, I'll call the intended manager of closure submission SingleOperationRunner. At a use site such as this, my desired outcome is that call 1 of syncWithRemote() always completes before call 2 starts:

let singleOperationRunner = SingleOperationRunner(priority: nil)

singleOperationRunner.run {
    await syncWithRemote()
}

singleOperationRunner.run {
    await syncWithRemote()
}

My sketch implementation looks like this:

public final class SingleOperationRunner {
    private let continuation: AsyncStream<() async -> Void>.Continuation
    
    public init(priority: TaskPriority?) {
        let (stream, continuation) = AsyncStream.makeStream(of: (() async -> Void).self)
        
        self.continuation = continuation
        
        Task.detached(priority: priority) {
            // Will this loop only continue when the `await operation()` completes?
            for await operation in stream {
                await operation()
            }
        }
    }
    
    public func run(operation: @escaping () async -> Void) {
        continuation.yield(operation)
    }
    
    deinit {
        continuation.finish()
    }
}

The resources I've found are https://developer.apple.com/videos/play/wwdc2022-110351/?time=1445 and https://forums.swift.org/t/swift-async-func-to-run-sequentially/60939/2 but I do not think I have fully put the pieces together, so I would appreciate any help!

Answered by DTS Engineer in 828369022

I’m not sure I fully understand where you’re going with this, but I can answer your direct question:

Task.detached(priority: priority) {
    // Will this loop only continue when the `await operation()` completes?
    for await operation in stream {
        await operation()
    }
}

Yes. The outer Task means that this code is asynchronous. There are two suspension points in the code:

  • One inside the for await, waiting for the next operation to be yielded to the stream.

  • One associated with the call of operation.

In both cases the calling async function doesn’t continue until the callee completes.
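To see this ordering in practice, here is a minimal sketch rather than code from the thread. The `Recorder` actor and the `runInOrderDemo()` function are hypothetical names introduced only for illustration, and the sketch assumes a toolchain where `AsyncStream.makeStream(of:)` is available, as in the question. Earlier operations deliberately sleep longer, so if the loop ever started one operation before the previous one had finished, the recorded events would interleave.

```swift
// Hypothetical helper: an actor that records events in the order they happen.
actor Recorder {
    private(set) var events: [String] = []
    func add(_ event: String) { events.append(event) }
}

// Hypothetical demo function: yields three async closures and runs them
// through a single `for await` loop, returning the recorded event log.
func runInOrderDemo() async -> [String] {
    let recorder = Recorder()
    let (stream, continuation) = AsyncStream.makeStream(
        of: (@Sendable () async -> Void).self
    )

    // Single consumer: the next element is only requested after the
    // loop body, including `await operation()`, has returned.
    let consumer = Task {
        for await operation in stream {
            await operation()
        }
    }

    for id in 1...3 {
        let operation: @Sendable () async -> Void = {
            await recorder.add("start \(id)")
            // Operation 1 sleeps longest, so any overlap would reorder the log.
            try? await Task.sleep(nanoseconds: UInt64(4 - id) * 50_000_000)
            await recorder.add("end \(id)")
        }
        continuation.yield(operation)
    }
    // Finishing the stream lets the `for await` loop exit once it is drained.
    continuation.finish()

    await consumer.value
    return await recorder.events
}
```

Awaiting `runInOrderDemo()` from an async context should return the events strictly paired: start 1, end 1, start 2, end 2, start 3, end 3. Note that this relies on there being exactly one task iterating the stream, and on the default unbounded buffering policy, which queues closures yielded while an earlier one is still running.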

Share and Enjoy

Quinn “The Eskimo!” @ Developer Technical Support @ Apple
let myEmail = "eskimo" + "1" + "@" + "apple.com"
