Skip to content

Commit

Permalink
qio: non-default context for threaded qtask
Browse files Browse the repository at this point in the history
qio_task_run_in_thread() allows main thread to run blocking operations
in the background. However it has an assumption on that it's always
working with the default context. This patch tries to allow the threaded
QIO task framework to run with non-default gcontext.

Currently no functional change so far, so the QIOTasks are still always
running on main context.

Reviewed-by: Daniel P. Berrange <[email protected]>
Signed-off-by: Peter Xu <[email protected]>
Signed-off-by: Daniel P. Berrangé <[email protected]>
  • Loading branch information
xzpeter authored and berrange committed Mar 6, 2018
1 parent 938c8b7 commit a17536c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 8 deletions.
7 changes: 5 additions & 2 deletions include/io/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,15 +227,18 @@ QIOTask *qio_task_new(Object *source,
* @worker: the function to invoke in a thread
* @opaque: opaque data to pass to @worker
* @destroy: function to free @opaque
* @context: the context to run the complete hook. If %NULL, the
* default context will be used.
*
* Run a task in a background thread. When @worker
* returns it will call qio_task_complete() in
* the main event thread context.
* the event thread context that provided.
*/
void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker,
gpointer opaque,
GDestroyNotify destroy);
GDestroyNotify destroy,
GMainContext *context);

/**
* qio_task_complete:
Expand Down
9 changes: 6 additions & 3 deletions io/channel-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task,
qio_channel_socket_connect_worker,
addrCopy,
(GDestroyNotify)qapi_free_SocketAddress);
(GDestroyNotify)qapi_free_SocketAddress,
NULL);
}


Expand Down Expand Up @@ -246,7 +247,8 @@ void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task,
qio_channel_socket_listen_worker,
addrCopy,
(GDestroyNotify)qapi_free_SocketAddress);
(GDestroyNotify)qapi_free_SocketAddress,
NULL);
}


Expand Down Expand Up @@ -322,7 +324,8 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
qio_task_run_in_thread(task,
qio_channel_socket_dgram_worker,
data,
qio_channel_socket_dgram_worker_free);
qio_channel_socket_dgram_worker_free,
NULL);
}


Expand Down
3 changes: 2 additions & 1 deletion io/dns-resolver.c
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ void qio_dns_resolver_lookup_async(QIODNSResolver *resolver,
qio_task_run_in_thread(task,
qio_dns_resolver_lookup_worker,
data,
qio_dns_resolver_lookup_data_free);
qio_dns_resolver_lookup_data_free,
NULL);
}


Expand Down
20 changes: 18 additions & 2 deletions io/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ struct QIOTaskThreadData {
QIOTaskWorker worker;
gpointer opaque;
GDestroyNotify destroy;
GMainContext *context;
};


Expand All @@ -91,6 +92,10 @@ static gboolean qio_task_thread_result(gpointer opaque)
data->destroy(data->opaque);
}

if (data->context) {
g_main_context_unref(data->context);
}

g_free(data);

return FALSE;
Expand All @@ -100,6 +105,7 @@ static gboolean qio_task_thread_result(gpointer opaque)
static gpointer qio_task_thread_worker(gpointer opaque)
{
struct QIOTaskThreadData *data = opaque;
GSource *idle;

trace_qio_task_thread_run(data->task);
data->worker(data->task, data->opaque);
Expand All @@ -110,23 +116,33 @@ static gpointer qio_task_thread_worker(gpointer opaque)
* the worker results
*/
trace_qio_task_thread_exit(data->task);
g_idle_add(qio_task_thread_result, data);

idle = g_idle_source_new();
g_source_set_callback(idle, qio_task_thread_result, data, NULL);
g_source_attach(idle, data->context);

return NULL;
}


void qio_task_run_in_thread(QIOTask *task,
QIOTaskWorker worker,
gpointer opaque,
GDestroyNotify destroy)
GDestroyNotify destroy,
GMainContext *context)
{
struct QIOTaskThreadData *data = g_new0(struct QIOTaskThreadData, 1);
QemuThread thread;

if (context) {
g_main_context_ref(context);
}

data->task = task;
data->worker = worker;
data->opaque = opaque;
data->destroy = destroy;
data->context = context;

trace_qio_task_thread_start(task, worker, opaque);
qemu_thread_create(&thread,
Expand Down
2 changes: 2 additions & 0 deletions tests/test-io-task.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ static void test_task_thread_complete(void)
qio_task_run_in_thread(task,
test_task_thread_worker,
&data,
NULL,
NULL);

g_main_loop_run(data.loop);
Expand Down Expand Up @@ -228,6 +229,7 @@ static void test_task_thread_failure(void)
qio_task_run_in_thread(task,
test_task_thread_worker,
&data,
NULL,
NULL);

g_main_loop_run(data.loop);
Expand Down

0 comments on commit a17536c

Please sign in to comment.