Hello everyone. I encountered a problem while trying to reproduce a real scenario in a simulated project using Combine. The idea is the following: I have a first request to a repository which performs some async task and returns the result via a Future publisher. Based on the results of this request I want to:
- open a specific number of streams(publishers),
- merge all the streams into one
- monitor the stream and transform the values before the final publisher is returned
- return the merged stream.
Here is the code:
First the View Model which will make a request to the interactor. The request will return a publisher which will publish a list of results, which will be reflected in the view using the published property(the view will call openCombinedStream via a button press).
class FakeViewModel: ObservableObject {
let interactor = FakeInteractor()
@Published var values: [SearchResult] = []
private var subscriber: AnyCancellable?
func openCombinedStream() {
self.subscriber = self.interactor.provideResults(for: "")
.sink(receiveCompletion: { completion in
}, receiveValue: { newValues in
debugPrint("-----------------------------")
self.values = newValues
debugPrint("-----------------------------")
})
}
}
The view is something like this:
import SwiftUI
struct ContentView: View {
@StateObject private var viewModel = FakeViewModel()
var body: some View {
VStack {
Button(action: {
viewModel.openCombinedStream()
}, label: {
Text("Search")
.padding()
})
HStack {
VStack {
Text("Item id")
ForEach(vm.values, id: \.id) { item in
Text("\(item.id)")
}
}
VStack {
Text("Item value")
ForEach(vm.values, id: \.id) { item in
Text("\(item.value)")
}
}
}
}
.padding()
}
}
The interactor has the following implementation:
typealias SearchResult = (value: String, id: Int)
typealias ResultsPublisher = AnyPublisher<[SearchResult], DomainError>
typealias SearchRepoResult = Result<[SearchResult], DomainError>
class FakeInteractor {
let fakeStreamingRepo = FakeStreamingRepo()
let fakeSearchRepo = FakeSearchRepo()
var subscribers: [AnyCancellable] = []
func provideResults(for query: String) -> ResultsPublisher {
let subject = PassthroughSubject<[SearchResult], DomainError>()
queue.async {
let future = self.fakeSearchRepo.provideResultsPublisher(for: "")
let futureSubscriber = future
.sink(receiveCompletion: { _ in }, receiveValue: { newList in
var results = newList
let mergedSubscriber = Publishers.MergeMany(results.map{ item -> AnyPublisher<SearchResult, DomainError> in
return self.fakeStreamingRepo.subscribe(to: item.id)
})
.sink(receiveCompletion: { _ in }, receiveValue: { newValue in
if let index = results.firstIndex(where: { $0.id == newValue.id }) {
results[index] = newValue
subject.send(results)
}
})
self.subscribers.append(mergedSubscriber)
})
self.subscribers.append(futureSubscriber)
}
return subject
.eraseToAnyPublisher()
}
}
enum DomainError: Error { }
let queue = DispatchQueue(label: "")
The Search Repo:
typealias SearchRequest = String
class FakeSearchRepo {
func provideSearchResult(for request: SearchRequest) -> SearchRepoResult {
return .success([(value: "10", id: 1),
(value: "20", id: 2),
(value: "30", id: 3),
(value: "40", id: 4),
(value: "50", id: 5)])
}
func provideResultsPublisher(for request: SearchRequest) -> AnyPublisher<[SearchResult], DomainError> {
Future<[SearchResult], DomainError>({ promise in
let result: SearchRepoResult = self.provideSearchResult(for: request)
switch result {
case .success(let response):
sleep(3)
promise(.success(response))
case .failure(let error):
promise(.failure(error))
}
})
.eraseToAnyPublisher()
}
}
And the streaming repo:
typealias TopicRequest = Int
class FakeStreamingRepo {
func subscribe(to topic: Int) -> AnyPublisher<SearchResult, DomainError> {
let subject = PassthroughSubject<SearchResult, DomainError>()
_ = Timer.scheduledTimer(withTimeInterval: Double.random(in: 0...5), repeats: true) { _ in
switch topic {
case _ where topic == 1:
let value = String(format: "%.2f", Double.random(in: 0...10))
debugPrint("Sending: value for category 1: \((value))")
subject.send((value: value, id: topic))
case _ where topic == 2:
let value = String(format: "%.2f", Double.random(in: 10...20))
debugPrint("Sending: value for category 2: \(value)")
subject.send((value: value, id: topic))
case _ where topic == 3:
let value = String(format: "%.2f", Double.random(in: 20...30))
debugPrint("Sending: value for category 3: \(value)")
subject.send((value: value, id: topic))
case _ where topic == 4:
let value = String(format: "%.2f", Double.random(in: 30...40))
debugPrint("Sending: value for category 4: \(value)")
subject.send((value: value, id: topic))
default:
let value = String(format: "%.2f", Double.random(in: 40...50))
debugPrint("Sending: value for category 5: \(value)")
subject.send((value: value, id: topic))
}
}
return subject.eraseToAnyPublisher()
}
}
The problem is that the publisher returned from the method openCombinedStream does not work unless I specify the runloop to be the main one with .receive(on: RunLoop.main)
inside the method before the flatMap. What would be the best approach to this scenario?