gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[gnurl] 116/264: sockfilt: fix race-condition of waiting threads and eve


From: gnunet
Subject: [gnurl] 116/264: sockfilt: fix race-condition of waiting threads and event handling
Date: Thu, 30 Apr 2020 16:06:59 +0200

This is an automated email from the git hooks/post-receive script.

nikita pushed a commit to branch master
in repository gnurl.

commit 9657ecb15b387ce86421af738045ede8a72aab13
Author: Marc Hoersken <address@hidden>
AuthorDate: Thu Apr 2 18:41:11 2020 +0200

    sockfilt: fix race-condition of waiting threads and event handling
    
    Fix race-condition of waiting threads finishing while events are
    already being processed which lead to invalid or skipped events.
    
    Use mutex to check for one event at a time or do post-processing.
    In addition to mutex-based locking use specific event as signal.
    
    Closes #5156
---
 tests/server/sockfilt.c | 246 ++++++++++++++++++++++++++++++++----------------
 1 file changed, 163 insertions(+), 83 deletions(-)

diff --git a/tests/server/sockfilt.c b/tests/server/sockfilt.c
index 5403d7c42..9c43f90e4 100644
--- a/tests/server/sockfilt.c
+++ b/tests/server/sockfilt.c
@@ -532,12 +532,14 @@ static void lograw(unsigned char *buffer, ssize_t len)
  */
 struct select_ws_wait_data {
   HANDLE handle; /* actual handle to wait for during select */
-  HANDLE event;  /* internal event to abort waiting thread */
+  HANDLE signal; /* internal event to signal handle trigger */
+  HANDLE abort;  /* internal event to abort waiting thread */
+  HANDLE mutex;  /* mutex to prevent event race-condition */
 };
 static DWORD WINAPI select_ws_wait_thread(LPVOID lpParameter)
 {
   struct select_ws_wait_data *data;
-  HANDLE handle, handles[2];
+  HANDLE mutex, signal, handle, handles[2];
   INPUT_RECORD inputrecord;
   LARGE_INTEGER size, pos;
   DWORD type, length;
@@ -546,8 +548,10 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID 
lpParameter)
   data = (struct select_ws_wait_data *) lpParameter;
   if(data) {
     handle = data->handle;
-    handles[0] = data->event;
+    handles[0] = data->abort;
     handles[1] = handle;
+    signal = data->signal;
+    mutex = data->mutex;
     free(data);
   }
   else
@@ -567,30 +571,41 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID 
lpParameter)
         */
       while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE)
             == WAIT_TIMEOUT) {
-        /* get total size of file */
-        length = 0;
-        size.QuadPart = 0;
-        size.LowPart = GetFileSize(handle, &length);
-        if((size.LowPart != INVALID_FILE_SIZE) ||
-           (GetLastError() == NO_ERROR)) {
-          size.HighPart = length;
-          /* get the current position within the file */
-          pos.QuadPart = 0;
-          pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart,
-                                       FILE_CURRENT);
-          if((pos.LowPart != INVALID_SET_FILE_POINTER) ||
-             (GetLastError() == NO_ERROR)) {
-            /* compare position with size, abort if not equal */
-            if(size.QuadPart == pos.QuadPart) {
-              /* sleep and continue waiting */
-              SleepEx(0, FALSE);
-              continue;
+        length = WaitForSingleObjectEx(mutex, 0, FALSE);
+        if(length == WAIT_OBJECT_0) {
+          /* get total size of file */
+          length = 0;
+          size.QuadPart = 0;
+          size.LowPart = GetFileSize(handle, &length);
+          if((size.LowPart != INVALID_FILE_SIZE) ||
+            (GetLastError() == NO_ERROR)) {
+            size.HighPart = length;
+            /* get the current position within the file */
+            pos.QuadPart = 0;
+            pos.LowPart = SetFilePointer(handle, 0, &pos.HighPart,
+                                        FILE_CURRENT);
+            if((pos.LowPart != INVALID_SET_FILE_POINTER) ||
+              (GetLastError() == NO_ERROR)) {
+              /* compare position with size, abort if not equal */
+              if(size.QuadPart == pos.QuadPart) {
+                /* sleep and continue waiting */
+                SleepEx(0, FALSE);
+                ReleaseMutex(mutex);
+                continue;
+              }
             }
           }
+          /* there is some data available, stop waiting */
+          logmsg("[select_ws_wait_thread] data available, DISK: %p", handle);
+          SetEvent(signal);
+          ReleaseMutex(mutex);
+          break;
+        }
+        else if(length == WAIT_ABANDONED) {
+          /* we are not allowed to process this event, because select_ws
+             is post-processing the signalled events and we must exit. */
+          break;
         }
-        /* there is some data available, stop waiting */
-        logmsg("[select_ws_wait_thread] data available on DISK: %p", handle);
-        break;
       }
       break;
 
@@ -604,23 +619,34 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID 
lpParameter)
         */
       while(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE)
             == WAIT_OBJECT_0 + 1) {
-        /* check if this is an actual console handle */
-        length = 0;
-        if(GetConsoleMode(handle, &length)) {
-          /* retrieve an event from the console buffer */
+        length = WaitForSingleObjectEx(mutex, 0, FALSE);
+        if(length == WAIT_OBJECT_0) {
+          /* check if this is an actual console handle */
           length = 0;
-          if(PeekConsoleInput(handle, &inputrecord, 1, &length)) {
-            /* check if the event is not an actual key-event */
-            if(length == 1 && inputrecord.EventType != KEY_EVENT) {
-              /* purge the non-key-event and continue waiting */
-              ReadConsoleInput(handle, &inputrecord, 1, &length);
-              continue;
+          if(GetConsoleMode(handle, &length)) {
+            /* retrieve an event from the console buffer */
+            length = 0;
+            if(PeekConsoleInput(handle, &inputrecord, 1, &length)) {
+              /* check if the event is not an actual key-event */
+              if(length == 1 && inputrecord.EventType != KEY_EVENT) {
+                /* purge the non-key-event and continue waiting */
+                ReadConsoleInput(handle, &inputrecord, 1, &length);
+                ReleaseMutex(mutex);
+                continue;
+              }
             }
           }
+          /* there is some data available, stop waiting */
+          logmsg("[select_ws_wait_thread] data available, CHAR: %p", handle);
+          SetEvent(signal);
+          ReleaseMutex(mutex);
+          break;
+        }
+        else if(length == WAIT_ABANDONED) {
+          /* we are not allowed to process this event, because select_ws
+             is post-processing the signalled events and we must exit. */
+          break;
         }
-        /* there is some data available, stop waiting */
-        logmsg("[select_ws_wait_thread] data available on CHAR: %p", handle);
-        break;
       }
       break;
 
@@ -634,43 +660,62 @@ static DWORD WINAPI select_ws_wait_thread(LPVOID 
lpParameter)
         */
       while(WaitForMultipleObjectsEx(1, handles, FALSE, 0, FALSE)
             == WAIT_TIMEOUT) {
-        /* peek into the pipe and retrieve the amount of data available */
-        length = 0;
-        if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) {
-          /* if there is no data available, sleep and continue waiting */
-          if(length == 0) {
-            SleepEx(0, FALSE);
-            continue;
+        length = WaitForSingleObjectEx(mutex, 0, FALSE);
+        if(length == WAIT_OBJECT_0) {
+          /* peek into the pipe and retrieve the amount of data available */
+          length = 0;
+          if(PeekNamedPipe(handle, NULL, 0, NULL, &length, NULL)) {
+            /* if there is no data available, sleep and continue waiting */
+            if(length == 0) {
+              SleepEx(0, FALSE);
+              ReleaseMutex(mutex);
+              continue;
+            }
+            else {
+              logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length);
+            }
           }
           else {
-            logmsg("[select_ws_wait_thread] PeekNamedPipe len: %d", length);
+            /* if the pipe has been closed, sleep and continue waiting */
+            length = GetLastError();
+            logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", length);
+            if(length == ERROR_BROKEN_PIPE) {
+              SleepEx(0, FALSE);
+              ReleaseMutex(mutex);
+              continue;
+            }
           }
+          /* there is some data available, stop waiting */
+          logmsg("[select_ws_wait_thread] data available, PIPE: %p", handle);
+          SetEvent(signal);
+          ReleaseMutex(mutex);
+          break;
         }
-        else {
-          /* if the pipe has been closed, sleep and continue waiting */
-          length = GetLastError();
-          logmsg("[select_ws_wait_thread] PeekNamedPipe error: %d", length);
-          if(length == ERROR_BROKEN_PIPE) {
-            SleepEx(0, FALSE);
-            continue;
-          }
+        else if(length == WAIT_ABANDONED) {
+          /* we are not allowed to process this event, because select_ws
+             is post-processing the signalled events and we must exit. */
+          break;
         }
-        /* there is some data available, stop waiting */
-        logmsg("[select_ws_wait_thread] data available on PIPE: %p", handle);
-        break;
       }
       break;
 
     default:
       /* The handle has an unknown type, try to wait on it */
-      WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE);
-      logmsg("[select_ws_wait_thread] data available on HANDLE: %p", handle);
+      if(WaitForMultipleObjectsEx(2, handles, FALSE, INFINITE, FALSE)
+         == WAIT_OBJECT_0 + 1) {
+        if(WaitForSingleObjectEx(mutex, 0, FALSE) == WAIT_OBJECT_0) {
+          logmsg("[select_ws_wait_thread] data available, HANDLE: %p", handle);
+          SetEvent(signal);
+          ReleaseMutex(mutex);
+        }
+      }
       break;
   }
 
   return 0;
 }
-static HANDLE select_ws_wait(HANDLE handle, HANDLE event)
+static HANDLE select_ws_wait(HANDLE handle, HANDLE signal,
+                             HANDLE abort, HANDLE mutex)
 {
   struct select_ws_wait_data *data;
   HANDLE thread = NULL;
@@ -679,7 +724,9 @@ static HANDLE select_ws_wait(HANDLE handle, HANDLE event)
   data = malloc(sizeof(struct select_ws_wait_data));
   if(data) {
     data->handle = handle;
-    data->event = event;
+    data->signal = signal;
+    data->abort = abort;
+    data->mutex = mutex;
 
     /* launch waiting thread */
     thread = CreateThread(NULL, 0,
@@ -695,21 +742,21 @@ static HANDLE select_ws_wait(HANDLE handle, HANDLE event)
   return thread;
 }
 struct select_ws_data {
-  curl_socket_t fd;      /* the original input handle   (indexed by fds) */
-  curl_socket_t wsasock; /* the internal socket handle  (indexed by wsa) */
-  WSAEVENT wsaevent;     /* the internal WINSOCK2 event (indexed by wsa) */
-  HANDLE thread;         /* the internal threads handle (indexed by thd) */
+  curl_socket_t fd;      /* the original input handle  (indexed by fds/idx) */
+  curl_socket_t wsasock; /* the internal socket handle (indexed by wsa) */
+  WSAEVENT wsaevent;     /* the internal WINSOCK event (indexed by wsa) */
+  HANDLE signal;         /* the internal signal handle (indexed by thd) */
+  HANDLE thread;         /* the internal thread handle (indexed by thd) */
 };
 static int select_ws(int nfds, fd_set *readfds, fd_set *writefds,
                      fd_set *exceptfds, struct timeval *timeout)
 {
+  HANDLE abort, mutex, signal, handle, *handles;
   DWORD milliseconds, wait, idx;
   WSANETWORKEVENTS wsanetevents;
   struct select_ws_data *data;
-  HANDLE handle, *handles;
   WSAEVENT wsaevent;
   int error, fds;
-  HANDLE waitevent = NULL;
   DWORD nfd = 0, thd = 0, wsa = 0;
   int ret = 0;
 
@@ -725,9 +772,17 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
     return 0;
   }
 
-  /* create internal event to signal waiting threads */
-  waitevent = CreateEvent(NULL, TRUE, FALSE, NULL);
-  if(!waitevent) {
+  /* create internal event to abort waiting threads */
+  abort = CreateEvent(NULL, TRUE, FALSE, NULL);
+  if(!abort) {
+    errno = ENOMEM;
+    return -1;
+  }
+
+  /* create internal mutex to lock event handling in threads */
+  mutex = CreateMutex(NULL, FALSE, NULL);
+  if(!mutex) {
+    CloseHandle(abort);
     errno = ENOMEM;
     return -1;
   }
@@ -735,7 +790,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
   /* allocate internal array for the internal data */
   data = calloc(nfds, sizeof(struct select_ws_data));
   if(data == NULL) {
-    CloseHandle(waitevent);
+    CloseHandle(abort);
+    CloseHandle(mutex);
     errno = ENOMEM;
     return -1;
   }
@@ -743,7 +799,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
   /* allocate internal array for the internal event handles */
   handles = calloc(nfds, sizeof(HANDLE));
   if(handles == NULL) {
-    CloseHandle(waitevent);
+    CloseHandle(abort);
+    CloseHandle(mutex);
     free(data);
     errno = ENOMEM;
     return -1;
@@ -768,10 +825,19 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
       data[nfd].fd = curlx_sitosk(fds);
       if(fds == fileno(stdin)) {
         handle = GetStdHandle(STD_INPUT_HANDLE);
-        handle = select_ws_wait(handle, waitevent);
-        handles[nfd] = handle;
-        data[thd].thread = handle;
-        thd++;
+        signal = CreateEvent(NULL, TRUE, FALSE, NULL);
+        if(signal) {
+          handle = select_ws_wait(handle, signal, abort, mutex);
+          if(handle) {
+            handles[nfd] = signal;
+            data[thd].signal = signal;
+            data[thd].thread = handle;
+            thd++;
+          }
+          else {
+            CloseHandle(signal);
+          }
+        }
       }
       else if(fds == fileno(stdout)) {
         handles[nfd] = GetStdHandle(STD_OUTPUT_HANDLE);
@@ -794,10 +860,19 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
             curl_socket_t socket = curlx_sitosk(fds);
             WSACloseEvent(wsaevent);
             handle = (HANDLE) socket;
-            handle = select_ws_wait(handle, waitevent);
-            handles[nfd] = handle;
-            data[thd].thread = handle;
-            thd++;
+            signal = CreateEvent(NULL, TRUE, FALSE, NULL);
+            if(signal) {
+              handle = select_ws_wait(handle, signal, abort, mutex);
+              if(handle) {
+                handles[nfd] = signal;
+                data[thd].signal = signal;
+                data[thd].thread = handle;
+                thd++;
+              }
+              else {
+                CloseHandle(signal);
+              }
+            }
           }
         }
       }
@@ -816,8 +891,8 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
   /* wait for one of the internal handles to trigger */
   wait = WaitForMultipleObjectsEx(nfd, handles, FALSE, milliseconds, FALSE);
 
-  /* signal the event handle for the waiting threads */
-  SetEvent(waitevent);
+  /* wait for internal mutex to lock event handling in threads */
+  WaitForSingleObjectEx(mutex, INFINITE, FALSE);
 
   /* loop over the internal handles returned in the descriptors */
   for(idx = 0; idx < nfd; idx++) {
@@ -881,6 +956,9 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
     }
   }
 
+  /* signal the event handle for the other waiting threads */
+  SetEvent(abort);
+
   for(fds = 0; fds < nfds; fds++) {
     if(FD_ISSET(fds, readfds))
       logmsg("select_ws: %d is readable", fds);
@@ -898,11 +976,13 @@ static int select_ws(int nfds, fd_set *readfds, fd_set 
*writefds,
   }
 
   for(idx = 0; idx < thd; idx++) {
-    WaitForSingleObject(data[idx].thread, INFINITE);
+    WaitForSingleObjectEx(data[idx].thread, INFINITE, FALSE);
     CloseHandle(data[idx].thread);
+    CloseHandle(data[idx].signal);
   }
 
-  CloseHandle(waitevent);
+  CloseHandle(abort);
+  CloseHandle(mutex);
 
   free(handles);
   free(data);

-- 
To stop receiving notification emails like this one, please contact
address@hidden.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]