qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

Re: [External] Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchr


From: Hao Xiang
Subject: Re: [External] Re: [PATCH v2 09/20] util/dsa: Implement DSA task asynchronous completion thread model.
Date: Mon, 18 Dec 2023 10:57:25 -0800

On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei <lei4.wang@intel.com> wrote:
>
> On 11/14/2023 13:40, Hao Xiang wrote:> * Create a dedicated thread for DSA 
> task
> completion.
> > * DSA completion thread runs a loop and poll for completed tasks.
> > * Start and stop DSA completion thread during DSA device start stop.
> >
> > User space application can directly submit task to Intel DSA
> > accelerator by writing to DSA's device memory (mapped in user space).
>
> > +            }
> > +            return;
> > +        }
> > +    } else {
> > +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> > +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
>
> Nit: indentation is broken here.
>
> > +    }
> > +
> > +    for (int i = 0; i < count; i++) {
> > +
> > +        completion = &batch_task->completions[i];
> > +        status = completion->status;
> > +
> > +        if (status == DSA_COMP_SUCCESS) {
> > +            results[i] = (completion->result == 0);
> > +            continue;
> > +        }
> > +
> > +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> > +            fprintf(stderr,
> > +                    "Unexpected completion status = %u.\n", status);
> > +            assert(false);
> > +        }
> > +    }
> > +}
> > +
> > +/**
> > + * @brief Handles an asynchronous DSA batch task completion.
> > + *
> > + * @param task A pointer to the batch buffer zero task structure.
> > + */
> > +static void
> > +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> > +{
> > +    batch_task->status = DSA_TASK_COMPLETION;
> > +    batch_task->completion_callback(batch_task);
> > +}
> > +
> > +/**
> > + * @brief The function entry point called by a dedicated DSA
> > + *        work item completion thread.
> > + *
> > + * @param opaque A pointer to the thread context.
> > + *
> > + * @return void* Not used.
> > + */
> > +static void *
> > +dsa_completion_loop(void *opaque)
>
> Per my understanding, if a multifd sending thread corresponds to a DSA device,
> then the batch tasks are executed in parallel which means a task may be
> completed slower than another even if this task is enqueued earlier than it. 
> If
> we poll on the slower task first it will block the handling of the faster one,
> even if the zero checking task for that thread is finished and it can go ahead
> and send the data to the wire, this may lower the network resource 
> utilization.
>

Hi Lei, thanks for reviewing. You are correct that we can keep pulling
a task enqueued first while others in the queue have already been
completed. In fact, only one DSA completion thread (pulling thread) is
used here even when multiple DSA devices are used. The pulling loop is
the most CPU intensive activity in the DSA workflow and that acts
directly against the goal of saving CPU usage. The trade-off I want to
take here is a slightly higher latency on DSA task completion but more
CPU savings. A single DSA engine can reach 30 GB/s throughput on
memory comparison operation. We use kernel tcp stack for network
transfer. The best I see is around 10GB/s throughput.  RDMA can
potentially go higher but I am not sure if it can go higher than 30
GB/s throughput anytime soon.

> > +{
> > +    struct dsa_completion_thread *thread_context =
> > +        (struct dsa_completion_thread *)opaque;
> > +    struct buffer_zero_batch_task *batch_task;
> > +    struct dsa_device_group *group = thread_context->group;
> > +
> > +    rcu_register_thread();
> > +
> > +    thread_context->thread_id = qemu_get_thread_id();
> > +    qemu_sem_post(&thread_context->sem_init_done);
> > +
> > +    while (thread_context->running) {
> > +        batch_task = dsa_task_dequeue(group);
> > +        assert(batch_task != NULL || !group->running);
> > +        if (!group->running) {
> > +            assert(!thread_context->running);
> > +            break;
> > +        }
> > +        if (batch_task->task_type == DSA_TASK) {
> > +            poll_task_completion(batch_task);
> > +        } else {
> > +            assert(batch_task->task_type == DSA_BATCH_TASK);
> > +            poll_batch_task_completion(batch_task);
> > +        }
> > +
> > +        dsa_batch_task_complete(batch_task);
> > +    }
> > +
> > +    rcu_unregister_thread();
> > +    return NULL;
> > +}
> > +
> > +/**
> > + * @brief Initializes a DSA completion thread.
> > + *
> > + * @param completion_thread A pointer to the completion thread context.
> > + * @param group A pointer to the DSA device group.
> > + */
> > +static void
> > +dsa_completion_thread_init(
> > +    struct dsa_completion_thread *completion_thread,
> > +    struct dsa_device_group *group)
> > +{
> > +    completion_thread->stopping = false;
> > +    completion_thread->running = true;
> > +    completion_thread->thread_id = -1;
> > +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> > +    completion_thread->group = group;
> > +
> > +    qemu_thread_create(&completion_thread->thread,
> > +                       DSA_COMPLETION_THREAD,
> > +                       dsa_completion_loop,
> > +                       completion_thread,
> > +                       QEMU_THREAD_JOINABLE);
> > +
> > +    /* Wait for initialization to complete */
> > +    while (completion_thread->thread_id == -1) {
> > +        qemu_sem_wait(&completion_thread->sem_init_done);
> > +    }
> > +}
> > +
> > +/**
> > + * @brief Stops the completion thread (and implicitly, the device group).
> > + *
> > + * @param opaque A pointer to the completion thread.
> > + */
> > +static void dsa_completion_thread_stop(void *opaque)
> > +{
> > +    struct dsa_completion_thread *thread_context =
> > +        (struct dsa_completion_thread *)opaque;
> > +
> > +    struct dsa_device_group *group = thread_context->group;
> > +
> > +    qemu_mutex_lock(&group->task_queue_lock);
> > +
> > +    thread_context->stopping = true;
> > +    thread_context->running = false;
> > +
> > +    dsa_device_group_stop(group);
> > +
> > +    qemu_cond_signal(&group->task_queue_cond);
> > +    qemu_mutex_unlock(&group->task_queue_lock);
> > +
> > +    qemu_thread_join(&thread_context->thread);
> > +
> > +    qemu_sem_destroy(&thread_context->sem_init_done);
> > +}
> > +
> >  /**
> >   * @brief Check if DSA is running.
> >   *
> > @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task 
> > *batch_task)
> >   */
> >  bool dsa_is_running(void)
> >  {
> > -    return false;
> > +    return completion_thread.running;
> >  }
> >
> >  static void
> > @@ -481,6 +720,7 @@ void dsa_start(void)
> >          return;
> >      }
> >      dsa_device_group_start(&dsa_group);
> > +    dsa_completion_thread_init(&completion_thread, &dsa_group);
> >  }
> >
> >  /**
> > @@ -496,6 +736,7 @@ void dsa_stop(void)
> >          return;
> >      }
> >
> > +    dsa_completion_thread_stop(&completion_thread);
> >      dsa_empty_task_queue(group);
> >  }
> >



reply via email to

[Prev in Thread] Current Thread [Next in Thread]