Combine repositories sink with Future

Hello everyone. I encountered a problem while trying to reproduce a real scenario in a simulated project using Combine. The idea is the following: I have a first request to a repository which performs some async task and returns the result via a Future publisher. Based on the results of this request I want to:

  1. open a specific number of streams(publishers),
  2. merge all the streams into one
  3. monitor the stream and transform the values before the final publisher is returned
  4. return the merged stream.

Here is the code:

First the View Model which will make a request to the interactor. The request will return a publisher which will publish a list of results, which will be reflected in the view using the published property(the view will call openCombinedStream via a button press).

class FakeViewModel: ObservableObject {

    let interactor = FakeInteractor()

    @Published var values: [SearchResult] = []

    private var subscriber: AnyCancellable?

    func openCombinedStream() {

            self.subscriber = self.interactor.provideResults(for: "")

                .sink(receiveCompletion: { completion in

            }, receiveValue: { newValues in

                debugPrint("-----------------------------")
                self.values = newValues
                debugPrint("-----------------------------")
            })
    }
}

The view is something like this:

import SwiftUI

struct ContentView: View {

    @StateObject private var viewModel = FakeViewModel()

    var body: some View {
        VStack {
            Button(action: {
                viewModel.openCombinedStream()
            }, label: {
                Text("Search")
                    .padding()
            })
            HStack  {
                VStack {
                    Text("Item id")
                    ForEach(vm.values, id: \.id) { item in
                        Text("\(item.id)")
                    }
                }
                VStack {
                    Text("Item value")
                    ForEach(vm.values, id: \.id) { item in
                        Text("\(item.value)")
                    }
                }
            }
        }
        .padding()
    }
}

The interactor has the following implementation:

typealias SearchResult = (value: String, id: Int)
typealias ResultsPublisher = AnyPublisher<[SearchResult], DomainError>
typealias SearchRepoResult = Result<[SearchResult], DomainError>

class FakeInteractor {

    let fakeStreamingRepo = FakeStreamingRepo()
    let fakeSearchRepo = FakeSearchRepo()

    var subscribers: [AnyCancellable] = []

    func provideResults(for query: String) -> ResultsPublisher {

        let subject = PassthroughSubject<[SearchResult], DomainError>()

        queue.async {
            let future = self.fakeSearchRepo.provideResultsPublisher(for: "")
            let futureSubscriber = future
                .sink(receiveCompletion: { _ in }, receiveValue: { newList in
                    var results = newList
                    let mergedSubscriber = Publishers.MergeMany(results.map{ item -> AnyPublisher<SearchResult, DomainError> in
                        return self.fakeStreamingRepo.subscribe(to: item.id)
                    })
                        .sink(receiveCompletion: { _ in }, receiveValue: { newValue in
                            if let index = results.firstIndex(where: { $0.id == newValue.id }) {
                                results[index] = newValue
                                subject.send(results)
                            }
                        })
                    self.subscribers.append(mergedSubscriber)
                })
            self.subscribers.append(futureSubscriber)
        }
        return subject
            .eraseToAnyPublisher()
    }
}

enum DomainError: Error { }

let queue = DispatchQueue(label: "")

The Search Repo:

typealias SearchRequest = String

class FakeSearchRepo {

    func provideSearchResult(for request: SearchRequest) -> SearchRepoResult {
        return .success([(value: "10", id: 1),
                         (value: "20", id: 2),
                         (value: "30", id: 3),
                         (value: "40", id: 4),
                         (value: "50", id: 5)])
    }

    func provideResultsPublisher(for request: SearchRequest) -> AnyPublisher<[SearchResult], DomainError> {
        Future<[SearchResult], DomainError>({ promise in
                let result: SearchRepoResult = self.provideSearchResult(for: request)
                switch result {
                    case .success(let response):
                        sleep(3)
                        promise(.success(response))
                    case .failure(let error):
                        promise(.failure(error))
            }
        })
            .eraseToAnyPublisher()
    }
}

And the streaming repo:

typealias TopicRequest = Int

class FakeStreamingRepo {

    func subscribe(to topic: Int) -> AnyPublisher<SearchResult, DomainError> {

        let subject = PassthroughSubject<SearchResult, DomainError>()
        _ = Timer.scheduledTimer(withTimeInterval: Double.random(in: 0...5), repeats: true) { _ in
            switch topic {
                case _ where topic == 1:
                    let value = String(format: "%.2f", Double.random(in: 0...10))
                    debugPrint("Sending: value for category 1: \((value))")
                    subject.send((value: value, id: topic))
                case _ where topic == 2:
                    let value = String(format: "%.2f", Double.random(in: 10...20))
                    debugPrint("Sending: value for category 2: \(value)")
                    subject.send((value: value, id: topic))
                case _ where topic == 3:
                    let value = String(format: "%.2f", Double.random(in: 20...30))
                    debugPrint("Sending: value for category 3: \(value)")
                    subject.send((value: value, id: topic))
                case _ where topic == 4:
                    let value = String(format: "%.2f", Double.random(in: 30...40))
                    debugPrint("Sending: value for category 4: \(value)")
                    subject.send((value: value, id: topic))
                default:
                    let value = String(format: "%.2f", Double.random(in: 40...50))
                    debugPrint("Sending: value for category 5: \(value)")
                    subject.send((value: value, id: topic))
            }
        }
        return subject.eraseToAnyPublisher()
    }
}

The problem is that the publisher returned from the method openCombinedStream does not work unless I specify the runloop to be the main one with .receive(on: RunLoop.main) inside the method before the flatMap. What would be the best approach to this scenario?

Combine repositories sink with Future
 
 
Q