fileprivate final class Subscription< Downstream: Subscriber >: Combine.Subscription, Subscriber { // Subscriber typealias requirements typealias Input = Upstream.Output typealias Failure = Upstream.Failure // State protected by Mutex private struct State { var upstream: Upstream? var downstream: Downstream? var upstreamSubscription: (any Combine.Subscription)? var buffer: Data = Data() var downstreamDemand: Subscribers.Demand = .none } private let state = Mutex(State()) init(upstream: Upstream, downstream: Downstream) { // Initialize state under lock, then subscribe after unlocking. state.withLock { (existing: inout State) in existing.upstream = upstream existing.downstream = downstream } upstream.receive(subscriber: self) } // MARK: - Subscription Methods (from downstream) func request(_ demand: Subscribers.Demand) { let shouldRequest: Bool = state.withLock { st in st.downstreamDemand += demand return st.downstreamDemand > .none && st.upstreamSubscription != nil } if shouldRequest { state.withLock { st in st.upstreamSubscription?.request(.unlimited) } } } func cancel() { let subscription = state.withLock { existing in let subscription = existing.upstreamSubscription existing.upstreamSubscription = nil existing.upstream = nil existing.downstream = nil existing.buffer.removeAll() //subscription?.cancel() // extra return subscription } subscription?.cancel() } // MARK: - Subscriber Methods (from upstream) func receive(subscription: Combine.Subscription) { let demand = state.withLock { (existing: inout State) in existing.upstreamSubscription = subscription return existing.downstreamDemand } if demand > .none { subscription.request(.unlimited) } } func receive(_ input: UInt8) -> Subscribers.Demand { if input == 0 { let emission: (Data, Downstream)? = state.withLock { st in guard let downstream = st.downstream else { return nil } guard !st.buffer.isEmpty else { return nil } guard st.downstreamDemand > .none else { return nil } let dataToEmit = st.buffer st.buffer = Data() st.downstreamDemand -= 1 return (dataToEmit, downstream) } if let (dataToEmit, downstream) = emission { let newDemand = downstream.receive(dataToEmit) state.withLock { st in st.downstreamDemand += newDemand } } } else { state.withLock { (existing: inout State) in existing.buffer.append(input) } } return .unlimited } func receive(completion: Subscribers.Completion) { // Snapshot what to emit and whether we still have a downstream let snapshot: (Data?, Downstream?) = state.withLock { st in let downstream = st.downstream if !st.buffer.isEmpty && st.downstreamDemand > .none { let dataToEmit = st.buffer st.buffer = Data() st.downstreamDemand -= 1 return (dataToEmit, downstream) } else { st.buffer.removeAll() st.downstream = nil st.upstream = nil return (nil, downstream) } } if let data = snapshot.0, let downstream = snapshot.1 { _ = downstream.receive(data) downstream.receive(completion: completion) } else if let downstream = snapshot.1 { downstream.receive(completion: completion) } } }