From: Fabiano Rosas
Subject: Re: [PATCH v3 04/24] thread-pool: Implement generic (non-AIO) pool support
Date: Mon, 25 Nov 2024 17:51:42 -0300

"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> On 25.11.2024 20:41, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation, however it is limited
>>> to queuing AIO operations only and essentially has a 1:1 mapping between
>>> the current AioContext and the AIO ThreadPool in use.
>>>
>>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>>> GThreadPool.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_wait() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>>> in the pool.
>>>
>>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>>> in the pool to equal the number of work items still waiting in the queue
>>> or unfinished.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>> include/block/thread-pool.h | 9 +++
>>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>>> 2 files changed, 118 insertions(+)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index 6f27eb085b45..3f9f66307b65 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>>  int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>>  void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>>
>>> +typedef struct ThreadPool ThreadPool;
>>> +
>>> +ThreadPool *thread_pool_new(void);
>>> +void thread_pool_free(ThreadPool *pool);
>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> + void *opaque, GDestroyNotify opaque_destroy);
>>> +void thread_pool_wait(ThreadPool *pool);
>>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>>
>>> #endif
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 908194dc070f..d80c4181c897 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>>> qemu_mutex_destroy(&pool->lock);
>>> g_free(pool);
>>> }
>>> +
>>> +struct ThreadPool { /* type safety */
>>> + GThreadPool *t;
>>> + size_t unfinished_el_ctr;
>>> + QemuMutex unfinished_el_ctr_mutex;
>>> + QemuCond unfinished_el_ctr_zero_cond;
>>> +};
>>> +
>>> +typedef struct {
>>> + ThreadPoolFunc *func;
>>> + void *opaque;
>>> + GDestroyNotify opaque_destroy;
>>> +} ThreadPoolElement;
>>> +
>>> +static void thread_pool_func(gpointer data, gpointer user_data)
>>> +{
>>> + ThreadPool *pool = user_data;
>>> + g_autofree ThreadPoolElement *el = data;
>>> +
>>> + el->func(el->opaque);
>>> +
>>> + if (el->opaque_destroy) {
>>> + el->opaque_destroy(el->opaque);
>>> + }
>>> +
>>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>>> +
>>> + assert(pool->unfinished_el_ctr > 0);
>>> + pool->unfinished_el_ctr--;
>>> +
>>> + if (pool->unfinished_el_ctr == 0) {
>>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>>> + }
>>> +}
>>> +
>>> +ThreadPool *thread_pool_new(void)
>>> +{
>>> + ThreadPool *pool = g_new(ThreadPool, 1);
>>> +
>>> + pool->unfinished_el_ctr = 0;
>>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>>> +
>>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>>> + /*
>>> + * g_thread_pool_new() can only return errors if initial thread(s)
>>> + * creation fails but we ask for 0 initial threads above.
>>> + */
>>> + assert(pool->t);
>>> +
>>> + return pool;
>>> +}
>>> +
>>> +void thread_pool_free(ThreadPool *pool)
>>> +{
>>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>>
>> Should we make it an error to call thread_pool_free() without first
>> calling thread_pool_wait()? I worry the current usage will lead to
>> having two different ways of waiting, with one of them (this one) being
>> quite implicit.
>>
>
> thread_pool_wait() can be used as a barrier between two sets of tasks
> executed on a thread pool without destroying it, or in a
> performance-sensitive path where we want to just wait for task
> completion while deferring the free operation to a later, less
> sensitive time.
>
> I don't think requiring an explicit thread_pool_wait() before
> thread_pool_free() gives any advantage, while at the same time it
> makes the API slightly more complex to use in cases where the
> consumer is fine with thread_pool_free()'s combined wait+free
> semantics.
Fair enough,
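For illustration, that barrier pattern would look roughly like this (a
minimal sketch; save_chunk, barrier_example and the chunk arrays are
hypothetical, and ThreadPoolFunc is assumed to keep its existing
int-returning signature):

#include "qemu/osdep.h"
#include "block/thread-pool.h"

/* Hypothetical task function, not part of the patch. */
static int save_chunk(void *opaque)
{
    /* ... send one chunk of device state ... */
    return 0;
}

static void barrier_example(int *chunks_a, int *chunks_b, int n)
{
    ThreadPool *pool = thread_pool_new();
    int i;

    for (i = 0; i < n; i++) {
        thread_pool_submit(pool, save_chunk, &chunks_a[i], NULL);
    }

    /* Barrier: every task in batch A finishes before batch B starts. */
    thread_pool_wait(pool);

    for (i = 0; i < n; i++) {
        thread_pool_submit(pool, save_chunk, &chunks_b[i], NULL);
    }

    /* Combined wait+free: waits for batch B, then destroys the pool. */
    thread_pool_free(pool);
}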
Reviewed-by: Fabiano Rosas <farosas@suse.de>
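For completeness, a minimal sketch of the two max-thread operations from
the commit message (process_one, sized_pool_example and the element
count are made up for illustration; the same int-returning
ThreadPoolFunc signature is assumed):

#include "qemu/osdep.h"
#include "block/thread-pool.h"

/* Hypothetical task function, not part of the patch. */
static int process_one(void *opaque)
{
    int *val = opaque;

    /* ... work on *val ... */
    return *val >= 0 ? 0 : -1;
}

static void sized_pool_example(int n)
{
    ThreadPool *pool = thread_pool_new();
    int i;

    /* Explicitly cap the worker thread count... */
    thread_pool_set_max_threads(pool, 2);

    for (i = 0; i < n; i++) {
        int *val = g_new(int, 1);
        *val = i;
        /* opaque_destroy (g_free here) runs once the task has finished. */
        thread_pool_submit(pool, process_one, val, g_free);
    }

    /*
     * ...or let the pool grow to one thread per piece of queued or
     * still-unfinished work.
     */
    thread_pool_adjust_max_threads_to_work(pool);

    thread_pool_free(pool);
}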