Dispatch


Execute code concurrently on multicore hardware by submitting work to dispatch queues managed by the system using Dispatch.

Posts under Dispatch tag

22 Posts

Post | Replies | Boosts | Views | Activity

XPC Error: Underlying connection interrupted
I'm using the low-level C XPC API (<xpc/xpc.h>) and I get this error when I run it: Underlying connection interrupted. I know the error stems from the call to xpc_session_send_message_with_reply_sync(session, message, &reply_err);. I have no previous experience with XPC or Dispatch, I find the XPC docs very limited, and I found next to no code examples online. Can somebody take a look at my code and tell me what I did wrong and how to fix it? Thank you in advance.

Main code:

    #include <stdio.h>
    #include <xpc/xpc.h>
    #include <dispatch/dispatch.h>

    // the context passed to mainf()
    struct context {
        char* text;
        xpc_session_t sess;
    };

    // This is for later implementation and the name is also rudimentary
    void mainf(void* c) {
        //char * text = ((struct context*)c)->text;
        xpc_session_t session = ((struct context*)c)->sess;

        dispatch_queue_t messageq = dispatch_queue_create("y.ddd.main", DISPATCH_QUEUE_SERIAL);

        xpc_object_t message = xpc_dictionary_create(NULL, NULL, 0);
        xpc_dictionary_set_string(message, "test", "eeeee");

        if (session == NULL) {
            printf("Session is NULL\n");
            exit(1);
        }

        __block xpc_rich_error_t reply_err = NULL;
        __block xpc_object_t reply;
        dispatch_sync(messageq, ^{
            reply = xpc_session_send_message_with_reply_sync(session, message, &reply_err);
            if (reply_err != NULL) printf("Reply Error: %s\n", xpc_rich_error_copy_description(reply_err));
        });

        if (reply != NULL)
            printf("Reply: %s\n", xpc_dictionary_get_string(reply, "test"));
        else
            printf("Reply is NULL\n");
    }

    int main(int argc, char* argv[]) {
        // Create seperate queue for mainf()
        dispatch_queue_t mainq = dispatch_queue_create("y.ddd.main", DISPATCH_QUEUE_CONCURRENT);
        dispatch_queue_t xpcq = dispatch_queue_create("y.ddd.xpc", NULL);

        // Create the context being sent to mainf
        struct context* c = malloc(sizeof(struct context));
        c->text = malloc(sizeof("Hello"));
        strcpy(c->text, "Hello");

        xpc_rich_error_t sess_err = NULL;
        xpc_session_t session = xpc_session_create_xpc_service("y.getFilec", xpcq, XPC_SESSION_CREATE_INACTIVE, &sess_err);
        if (sess_err != NULL) {
            printf("Session Create Error: %s\n", xpc_rich_error_copy_description(sess_err));
            xpc_release(sess_err);
            exit(1);
        }
        xpc_release(sess_err);

        xpc_session_set_incoming_message_handler(session, ^(xpc_object_t message) {
            printf("message recieved\n");
        });

        c->sess = session;

        xpc_rich_error_t sess_ac_err = NULL;
        xpc_session_activate(session, &sess_ac_err);
        if (sess_err != NULL) {
            printf("Session Activate Error: %s\n", xpc_rich_error_copy_description(sess_ac_err));
            xpc_release(sess_ac_err);
            exit(1);
        }
        xpc_release(sess_ac_err);

        xpc_retain(session);
        dispatch_async_f(mainq, (void*)c, mainf);
        xpc_release(session);

        dispatch_main();
    }

XPC Service code:

    #include <stdio.h>
    #include <xpc/xpc.h>
    #include <dispatch/dispatch.h>

    int main(void) {
        xpc_rich_error_t lis_err = NULL;
        xpc_listener_t listener = xpc_listener_create("y.getFilec", NULL, XPC_LISTENER_CREATE_INACTIVE, ^(xpc_session_t sess){
            printf("Incoming Session: %s\n", xpc_session_copy_description(sess));
            xpc_session_set_incoming_message_handler(sess, ^(xpc_object_t mess) {
                xpc_object_t repl = xpc_dictionary_create_empty();
                xpc_dictionary_set_string(repl, "test", "test");
                xpc_rich_error_t send_repl_err = xpc_session_send_message(sess, repl);
                if (send_repl_err != NULL) printf("Send Reply Error: %s\n", xpc_rich_error_copy_description(send_repl_err));
            });

            xpc_rich_error_t sess_ac_err = NULL;
            xpc_session_activate(sess, &sess_ac_err);
            if (sess_ac_err != NULL) printf("Session Activate: %s\n", xpc_rich_error_copy_description(sess_ac_err));
        }, &lis_err);

        if (lis_err != NULL) {
            printf("Listener Error: %s\n", xpc_rich_error_copy_description(lis_err));
            xpc_release(lis_err);
        }

        xpc_rich_error_t lis_ac_err = NULL;
        xpc_listener_activate(listener, &lis_ac_err);
        if (lis_ac_err != NULL) {
            printf("Listener Activate Error: %s\n", xpc_rich_error_copy_description(lis_ac_err));
            xpc_release(lis_ac_err);
        }

        dispatch_main();
    }

Replies: 1, Boosts: 0, Views: 450, Activity: Mar ’25

How to implement thread-safe property wrapper notifications across different contexts in Swift?
I’m trying to create a property wrapper that can manage shared state across any context and be notified when changes happen somewhere else. I'm using a mutex, and getting and setting values works great. However, I can't find a way to create an observer pattern that the property wrappers can use.

The problem is that I can’t trigger a notification from a different thread/context and have that notification called on the correct thread of the parent object that the property wrapper is used within.

I would like the property wrapper to work from anywhere: a SwiftUI view, an actor, or a class that is created in the background. The notification would preferably be called synchronously if triggered from the same thread or actor, and asynchronously otherwise.

I don’t have to worry about race conditions from the notification because the state only needs to reach eventual consistency.

Here's the simplified pseudocode of what I'm trying to accomplish:

    // A single source of truth storage container.
    final class MemoryShared<Value>: Sendable {
        let state = Mutex<Value>(0)

        func withLock(_ action: (inout Value) -> Void) {
            state.withLock(action)
            notifyObservers()
        }

        func get() -> Value
        func notifyObservers()
        func addObserver()
    }

    // Some shared state used across the app
    static let globalCount = MemoryShared<Int>(0)

    // A property wrapper to access the shared state and receive changes
    @propertyWrapper
    struct SharedState<Value> {
        public var wrappedValue: T {
            get { state.get() }
            nonmutating set {
                // Can't set directly
            }
        }

        var publisher: Publisher {}

        init(state: MemoryShared) {
            // ...
        }
    }

    // I'd like to use it in multiple places:

    @Observable
    class MyObservable {
        @SharedState(globalCount)
        var count: Int
    }

    actor MyBackgroundActor {
        @SharedState(globalCount)
        var count: Int
    }

    @MainActor
    struct MyView: View {
        @SharedState(globalCount)
        var count: Int
    }

What I’ve Tried

All of the examples below use the property wrapper within a @MainActor class. However, the same issue happens no matter what context I use the wrapper in: the notification callback is never called on the context the property wrapper was created with.

I’ve tried using @isolated(any) to capture the context of the wrapper and save it in the state, using unchecked Sendable, to be called later. This doesn’t work:

    final class MemoryShared<Value: Sendable>: Sendable {
        // Stores the callback for later.
        public func subscribe(callback: @escaping @isolated(any) (Value) -> Void) -> Subscription
    }

    @propertyWrapper
    struct SharedState<Value> {
        init(state: MemoryShared<Value>) {
            MainActor.assertIsolated() // Works!
            state.subscribe {
                MainActor.assertIsolated() // Fails
                self.publisher.send()
            }
        }
    }

I’ve tried capturing the isolation within a task with AsyncStream. This compiles with no Sendable issues, but still fails:

    @propertyWrapper
    struct SharedState<Value> {
        init(isolation: isolated (any Actor)? = #isolation, state: MemoryShared<Value>) {
            let (taskStream, continuation) = AsyncStream<Value>.makeStream()
            // The shared state sends new values to the continuation.
            subscription = state.subscribe(continuation: continuation)

            MainActor.assertIsolated() // Works!

            let task = Task {
                _ = isolation
                for await value in taskStream {
                    _ = isolation
                    MainActor.assertIsolated() // Fails
                }
            }
        }
    }

I’ve tried using multiple Combine subjects and publishers:

    final class MemoryShared<Value: Sendable>: Sendable {
        let subject: PassthroughSubject<T, Never> // ...
        var publisher: Publisher {} // ...
    }

    @propertyWrapper
    final class SharedState<Value> {
        var localSubject: Subject

        init(state: MemoryShared<Value>) {
            MainActor.assertIsolated() // Works!
            handle = localSubject.sink {
                MainActor.assertIsolated() // Fails
            }
            stateHandle = state.publisher.subscribe(localSubject)
        }
    }

I’ve also tried:

- Using NotificationCenter
- Making the property wrapper a class
- Using NSKeyValueObserving
- Using a box class that is stored within the wrapper
- Using @_inheritActorContext

None of these work, because the event is never called from the thread the property wrapper resides in.

Is it possible at all to create an observation system that notifies the observer from the same context as where the observer was created?

Any help would be greatly appreciated!

Replies: 2, Boosts: 0, Views: 596, Activity: Mar ’25