qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[RFC PATCH 2/3] migration/multifd: Decouple control flow from the SYNC packet


From: Fabiano Rosas
Subject: [RFC PATCH 2/3] migration/multifd: Decouple control flow from the SYNC packet
Date: Fri, 22 Sep 2023 11:53:18 -0300

We currently have the sem_sync semaphore that is used:

1) on the sending side, to know when the multifd_send_thread has
   finished sending the MULTIFD_FLAG_SYNC packet;

  This is unnecessary. Multifd sends packets (not pages) one by one
  and completion is already bound by both the channels_ready and sem
  semaphores. The SYNC packet has nothing special that would require
  it to have a separate semaphore on the sending side.

2) on the receiving side, to know when the multifd_recv_thread has
   finished receiving the MULTIFD_FLAG_SYNC packet;

  This is unnecessary because the multifd_recv_state->sem_sync
  semaphore already does the same thing. We care that the SYNC arrived
  from the source; knowing that the SYNC has been received by the recv
  thread doesn't add anything.

3) on both sending and receiving sides, to wait for the multifd threads
   to finish before cleaning up;

   This happens because multifd_send_sync_main() blocks
   ram_save_complete() from finishing until the semaphore is
   posted. This is surprising and not documented.

Clarify the above situation by renaming 'sem_sync' to 'sem_done' and
making the #3 usage the main one. Stop tracking the SYNC packet on
source (#1) and leave multifd_recv_state->sem_sync untouched on the
destination (#2).

Due to the 'channels_ready' and 'sem' semaphores, we always send
packets in lockstep with switching MultiFDSendParams, so
p->pending_job is always either 1 or 0. The thread has no knowledge of
whether it will have more to send once it posts to
channels_ready. Send it on an extra loop so it sees no pending_job and
releases the semaphore.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
 migration/multifd.c    | 89 ++++++++++++++++++++++++++++++++----------
 migration/multifd.h    |  8 ++--
 migration/trace-events |  2 +-
 3 files changed, 73 insertions(+), 26 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index d626740f2f..3d4a631915 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -541,7 +541,7 @@ void multifd_save_cleanup(void)
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
         qemu_sem_destroy(&p->sem);
-        qemu_sem_destroy(&p->sem_sync);
+        qemu_sem_destroy(&p->sem_done);
         g_free(p->name);
         p->name = NULL;
         multifd_pages_clear(p->pages);
@@ -592,7 +592,7 @@ int multifd_send_sync_main(QEMUFile *f)
 
     if (!migrate_multifd()) {
         return 0;
-    }
+
     if (multifd_send_state->pages->num) {
         if (multifd_send_pages(f) < 0) {
             error_report("%s: multifd_send_pages fail", __func__);
@@ -600,6 +600,12 @@ int multifd_send_sync_main(QEMUFile *f)
         }
     }
 
+    /* wait for all channels to be idle */
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        trace_multifd_send_sync_main_wait(p->id);
+        qemu_sem_wait(&multifd_send_state->channels_ready);
+    }
+
     /*
      * When using zero-copy, it's necessary to flush the pages before any of
      * the pages can be sent again, so we'll make sure the new version of the
@@ -610,9 +616,46 @@ int multifd_send_sync_main(QEMUFile *f)
      * to be less frequent, e.g. only after we finished one whole scanning of
      * all the dirty bitmaps.
      */
-
     flush_zero_copy = migrate_zero_copy_send();
 
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        MultiFDSendParams *p = &multifd_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        assert(!p->pending_job);
+        qemu_mutex_unlock(&p->mutex);
+
+        qemu_sem_post(&p->sem);
+        qemu_sem_wait(&p->sem_done);
+
+        if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
+            return -1;
+        }
+    }
+
+    /*
+     * All channels went idle and have no more jobs. Unless we send
+     * them more work, we're good to allow any cleanup code to run at
+     * this point.
+     */
+
+    return 0;
+}
+
+int multifd_send_sync_main(QEMUFile *f)
+{
+    int i, ret;
+
+    if (!migrate_multifd()) {
+        return 0;
+    }
+    if (multifd_send_state->pages->num) {
+        if (multifd_send_pages(f) < 0) {
+            error_report("%s: multifd_send_pages fail", __func__);
+            return -1;
+        }
+    }
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
@@ -633,11 +676,21 @@ int multifd_send_sync_main(QEMUFile *f)
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
+
+    for (i = 0; i < migrate_multifd_channels(); i++) {
+        trace_multifd_send_wait(migrate_multifd_channels() - i);
+        qemu_sem_wait(&multifd_send_state->channels_ready);
+    }
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
-        trace_multifd_send_sync_main_wait(p->id);
-        qemu_sem_wait(&p->sem_sync);
+        qemu_mutex_lock(&p->mutex);
+        assert(!p->pending_job);
+        qemu_mutex_unlock(&p->mutex);
+
+        qemu_sem_post(&p->sem);
+        qemu_sem_wait(&p->sem_done);
 
         if (flush_zero_copy && p->c && (multifd_zero_copy_flush(p->c) < 0)) {
             return -1;
@@ -739,15 +792,9 @@ static void *multifd_send_thread(void *opaque)
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
-                qemu_sem_post(&p->sem_sync);
-            }
-        } else if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
         } else {
+            qemu_sem_post(&p->sem_done);
             qemu_mutex_unlock(&p->mutex);
-            /* sometimes there are spurious wakeups */
         }
     }
 
@@ -764,7 +811,7 @@ out:
      */
     if (ret != 0) {
         qemu_sem_post(&multifd_send_state->channels_ready);
-        qemu_sem_post(&p->sem_sync);
+        qemu_sem_post(&p->sem_done);
     }
 
     qemu_mutex_lock(&p->mutex);
@@ -802,7 +849,7 @@ static void multifd_tls_outgoing_handshake(QIOTask *task,
          */
         p->quit = true;
         qemu_sem_post(&multifd_send_state->channels_ready);
-        qemu_sem_post(&p->sem_sync);
+        qemu_sem_post(&p->sem_done);
     }
 }
 
@@ -880,7 +927,7 @@ static void multifd_new_send_channel_cleanup(MultiFDSendParams *p,
      migrate_set_error(migrate_get_current(), err);
      /* Error happen, we need to tell who pay attention to me */
      qemu_sem_post(&multifd_send_state->channels_ready);
-     qemu_sem_post(&p->sem_sync);
+     qemu_sem_post(&p->sem_done);
      /*
       * Although multifd_send_thread is not created, but main migration
       * thread need to judge whether it is running, so we need to mark
@@ -938,7 +985,7 @@ int multifd_save_setup(Error **errp)
 
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
-        qemu_sem_init(&p->sem_sync, 0);
+        qemu_sem_init(&p->sem_done, 0);
         p->quit = false;
         p->pending_job = 0;
         p->id = i;
@@ -1047,7 +1094,7 @@ void multifd_load_cleanup(void)
              * multifd_recv_thread may hung at MULTIFD_FLAG_SYNC handle code,
              * however try to wakeup it without harm in cleanup phase.
              */
-            qemu_sem_post(&p->sem_sync);
+            qemu_sem_post(&p->sem_done);
         }
 
         qemu_thread_join(&p->thread);
@@ -1059,7 +1106,7 @@ void multifd_load_cleanup(void)
         object_unref(OBJECT(p->c));
         p->c = NULL;
         qemu_mutex_destroy(&p->mutex);
-        qemu_sem_destroy(&p->sem_sync);
+        qemu_sem_destroy(&p->sem_done);
         g_free(p->name);
         p->name = NULL;
         p->packet_len = 0;
@@ -1100,7 +1147,7 @@ void multifd_recv_sync_main(void)
             }
         }
         trace_multifd_recv_sync_main_signal(p->id);
-        qemu_sem_post(&p->sem_sync);
+        qemu_sem_post(&p->sem_done);
     }
     trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
 }
@@ -1152,7 +1199,7 @@ static void *multifd_recv_thread(void *opaque)
 
         if (flags & MULTIFD_FLAG_SYNC) {
             qemu_sem_post(&multifd_recv_state->sem_sync);
-            qemu_sem_wait(&p->sem_sync);
+            qemu_sem_wait(&p->sem_done);
         }
     }
 
@@ -1195,7 +1242,7 @@ int multifd_load_setup(Error **errp)
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_init(&p->mutex);
-        qemu_sem_init(&p->sem_sync, 0);
+        qemu_sem_init(&p->sem_done, 0);
         p->quit = false;
         p->id = i;
         p->packet_len = sizeof(MultiFDPacket_t)
diff --git a/migration/multifd.h b/migration/multifd.h
index a835643b48..2d53f91da3 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -90,8 +90,8 @@ typedef struct {
 
     /* sem where to wait for more work */
     QemuSemaphore sem;
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
+    /* channel is done transmitting until more pages are queued */
+    QemuSemaphore sem_done;
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
@@ -153,8 +153,8 @@ typedef struct {
     /* number of pages in a full packet */
     uint32_t page_count;
 
-    /* syncs main thread and channels */
-    QemuSemaphore sem_sync;
+    /* channel is done transmitting until more pages are queued */
+    QemuSemaphore sem_done;
 
     /* this mutex protects the following parameters */
     QemuMutex mutex;
diff --git a/migration/trace-events b/migration/trace-events
index 4666f19325..4367a1a22b 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -136,7 +136,7 @@ multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, u
 multifd_send_error(uint8_t id) "channel %u"
 multifd_send_sync_main(long packet_num) "packet num %ld"
 multifd_send_sync_main_signal(uint8_t id) "channel %u"
-multifd_send_sync_main_wait(uint8_t id) "channel %u"
+multifd_send_wait(uint8_t n) "waiting for %u channels to finish sending"
 multifd_send_terminate_threads(bool error) "error %d"
multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %"  PRIu64
 multifd_send_thread_start(uint8_t id) "%u"
-- 
2.35.3




reply via email to

[Prev in Thread] Current Thread [Next in Thread]