Im using the low-level C xpc api <xpc/xpc.h> and i get this error when I run it: Underlying connection interrupted. I know this error stems from the call to xpc_session_send_message_with_reply_sync(session, message, &reply_err);. I have no previous experience with xpc or dispatch and I find the xpc docs very limited and I also found next to no code examples online. Can somebody take a look at my code and tell me what I did wrong and how to fix it? Thank you in advance.
Main code:
#include <stdio.h>
#include <xpc/xpc.h>
#include <dispatch/dispatch.h>
// the context passed to mainf()
struct context {
char* text;
xpc_session_t sess;
};
// This is for later implementation and the name is also rudimentary
void mainf(void* c) {
//char * text = ((struct context*)c)->text;
xpc_session_t session = ((struct context*)c)->sess;
dispatch_queue_t messageq = dispatch_queue_create("y.ddd.main",
DISPATCH_QUEUE_SERIAL);
xpc_object_t message = xpc_dictionary_create(NULL, NULL, 0);
xpc_dictionary_set_string(message, "test", "eeeee");
if (session == NULL) {
printf("Session is NULL\n");
exit(1);
}
__block xpc_rich_error_t reply_err = NULL;
__block xpc_object_t reply;
dispatch_sync(messageq, ^{
reply = xpc_session_send_message_with_reply_sync(session,
message,
&reply_err);
if (reply_err != NULL) printf("Reply Error: %s\n",
xpc_rich_error_copy_description(reply_err));
});
if (reply != NULL)
printf("Reply: %s\n", xpc_dictionary_get_string(reply, "test"));
else printf("Reply is NULL\n");
}
int main(int argc, char* argv[]) {
// Create seperate queue for mainf()
dispatch_queue_t mainq = dispatch_queue_create("y.ddd.main",
DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t xpcq = dispatch_queue_create("y.ddd.xpc",
NULL);
// Create the context being sent to mainf
struct context* c = malloc(sizeof(struct context));
c->text = malloc(sizeof("Hello"));
strcpy(c->text, "Hello");
xpc_rich_error_t sess_err = NULL;
xpc_session_t session = xpc_session_create_xpc_service("y.getFilec",
xpcq,
XPC_SESSION_CREATE_INACTIVE,
&sess_err);
if (sess_err != NULL) {
printf("Session Create Error: %s\n",
xpc_rich_error_copy_description(sess_err));
xpc_release(sess_err);
exit(1);
}
xpc_release(sess_err);
xpc_session_set_incoming_message_handler(session, ^(xpc_object_t message) {
printf("message recieved\n");
});
c->sess = session;
xpc_rich_error_t sess_ac_err = NULL;
xpc_session_activate(session, &sess_ac_err);
if (sess_err != NULL) {
printf("Session Activate Error: %s\n",
xpc_rich_error_copy_description(sess_ac_err));
xpc_release(sess_ac_err);
exit(1);
}
xpc_release(sess_ac_err);
xpc_retain(session);
dispatch_async_f(mainq, (void*)c, mainf);
xpc_release(session);
dispatch_main();
}
XPC Service code:
#include <stdio.h>
#include <xpc/xpc.h>
#include <dispatch/dispatch.h>
int main(void) {
xpc_rich_error_t lis_err = NULL;
xpc_listener_t listener = xpc_listener_create("y.getFilec",
NULL,
XPC_LISTENER_CREATE_INACTIVE,
^(xpc_session_t sess){
printf("Incoming Session: %s\n", xpc_session_copy_description(sess));
xpc_session_set_incoming_message_handler(sess,
^(xpc_object_t mess) {
xpc_object_t repl = xpc_dictionary_create_empty();
xpc_dictionary_set_string(repl, "test", "test");
xpc_rich_error_t send_repl_err = xpc_session_send_message(sess, repl);
if (send_repl_err != NULL) printf("Send Reply Error: %s\n",
xpc_rich_error_copy_description(send_repl_err));
});
xpc_rich_error_t sess_ac_err = NULL;
xpc_session_activate(sess, &sess_ac_err);
if (sess_ac_err != NULL) printf("Session Activate: %s\n",
xpc_rich_error_copy_description(sess_ac_err));
},
&lis_err);
if (lis_err != NULL) {
printf("Listener Error: %s\n", xpc_rich_error_copy_description(lis_err));
xpc_release(lis_err);
}
xpc_rich_error_t lis_ac_err = NULL;
xpc_listener_activate(listener, &lis_ac_err);
if (lis_ac_err != NULL) {
printf("Listener Activate Error: %s\n", xpc_rich_error_copy_description(lis_ac_err));
xpc_release(lis_ac_err);
}
dispatch_main();
}
Dispatch
RSS for tagExecute code concurrently on multicore hardware by submitting work to dispatch queues managed by the system using Dispatch.
Posts under Dispatch tag
22 Posts
Selecting any option will automatically load the page
Post
Replies
Boosts
Views
Activity
I’m trying to create a property wrapper that that can manage shared state across any context, which can get notified if changes happen from somewhere else.
I'm using mutex, and getting and setting values works great. However, I can't find a way to create an observer pattern that the property wrappers can use.
The problem is that I can’t trigger a notification from a different thread/context, and have that notification get called on the correct thread of the parent object that the property wrapper is used within.
I would like the property wrapper to work from anywhere: a SwiftUI view, an actor, or from a class that is created in the background. The notification preferably would get called synchronously if triggered from the same thread or actor, or otherwise asynchronously. I don’t have to worry about race conditions from the notification because the state only needs to reach eventuall consistency.
Here's the simplified pseudo code of what I'm trying to accomplish:
// A single source of truth storage container.
final class MemoryShared<Value>: Sendable {
let state = Mutex<Value>(0)
func withLock(_ action: (inout Value) -> Void) {
state.withLock(action)
notifyObservers()
}
func get() -> Value
func notifyObservers()
func addObserver()
}
// Some shared state used across the app
static let globalCount = MemoryShared<Int>(0)
// A property wrapper to access the shared state and receive changes
@propertyWrapper
struct SharedState<Value> {
public var wrappedValue: T {
get { state.get() }
nonmutating set { // Can't set directly }
}
var publisher: Publisher {}
init(state: MemoryShared) {
// ...
}
}
// I'd like to use it in multiple places:
@Observable
class MyObservable {
@SharedState(globalCount)
var count: Int
}
actor MyBackgroundActor {
@SharedState(globalCount)
var count: Int
}
@MainActor
struct MyView: View {
@SharedState(globalCount)
var count: Int
}
What I’ve Tried
All of the examples below are using the property wrapper within a @MainActor class. However the same issue happens no matter what context I use the wrapper in: The notification callback is never called on the context the property wrapper was created with.
I’ve tried using @isolated(any) to capture the context of the wrapper and save it to be called within the state in with unchecked sendable, which doesn’t work:
final class MemoryShared<Value: Sendable>: Sendable {
// Stores the callback for later.
public func subscribe(callback: @escaping @isolated(any) (Value) -> Void) -> Subscription
}
@propertyWrapper
struct SharedState<Value> {
init(state: MemoryShared<Value>) {
MainActor.assertIsolated() // Works!
state.subscribe {
MainActor.assertIsolated() // Fails
self.publisher.send()
}
}
}
I’ve tried capturing the isolation within a task with AsyncStream. This actually compiles with no sendable issues, but still fails:
@propertyWrapper
struct SharedState<Value> {
init(isolation: isolated (any Actor)? = #isolation, state: MemoryShared<Value>) {
let (taskStream, continuation) = AsyncStream<Value>.makeStream()
// The shared state sends new values to the continuation.
subscription = state.subscribe(continuation: continuation)
MainActor.assertIsolated() // Works!
let task = Task {
_ = isolation
for await value in taskStream {
_ = isolation
MainActor.assertIsolated() // Fails
}
}
}
}
I’ve tried using multiple combine subjects and publishers:
final class MemoryShared<Value: Sendable>: Sendable {
let subject: PassthroughSubject<T, Never> // ...
var publisher: Publisher {} // ...
}
@propertyWrapper
final class SharedState<Value> {
var localSubject: Subject
init(state: MemoryShared<Value>) {
MainActor.assertIsolated() // Works!
handle = localSubject.sink {
MainActor.assertIsolated() // Fails
}
stateHandle = state.publisher.subscribe(localSubject)
}
}
I’ve also tried:
Using NotificationCenter
Making the property wrapper a class
Using NSKeyValueObserving
Using a box class that is stored within the wrapper.
Using @_inheritActorContext.
All of these don’t work, because the event is never called from the thread the property wrapper resides in.
Is it possible at all to create an observation system that notifies the observer from the same context as where the observer was created?
Any help would be greatly appreciated!