gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r9981 - in gnunet/src: include peerinfo


From: gnunet
Subject: [GNUnet-SVN] r9981 - in gnunet/src: include peerinfo
Date: Mon, 11 Jan 2010 23:13:37 +0100

Author: grothoff
Date: 2010-01-11 23:13:37 +0100 (Mon, 11 Jan 2010)
New Revision: 9981

Modified:
   gnunet/src/include/gnunet_peerinfo_service.h
   gnunet/src/include/gnunet_protocols.h
   gnunet/src/peerinfo/gnunet-service-peerinfo.c
   gnunet/src/peerinfo/peerinfo_api.c
Log:
adding notification API to peerinfo

Modified: gnunet/src/include/gnunet_peerinfo_service.h
===================================================================
--- gnunet/src/include/gnunet_peerinfo_service.h        2010-01-11 20:37:10 UTC (rev 9980)
+++ gnunet/src/include/gnunet_peerinfo_service.h        2010-01-11 22:13:37 UTC (rev 9981)
@@ -99,6 +99,41 @@
                          void *callback_cls);
 
 
+/**
+ * Handle for notifications about changes to the set of known peers.
+ */
+struct GNUNET_PEERINFO_NotifyContext;
+
+
+/**
+ * Call a method whenever our known information about peers
+ * changes.  Initially calls the given function for all known
+ * peers and then only signals changes.  Note that it is
+ * possible (e.g. on disconnects) that the callback is called
+ * twice with the same peer information.
+ *
+ * @param cfg configuration to use
+ * @param sched scheduler to use
+ * @param callback the method to call for each peer
+ * @param callback_cls closure for callback
+ * @return handle to pass to GNUNET_PEERINFO_notify_cancel; NULL on error
+ */
+struct GNUNET_PEERINFO_NotifyContext *
+GNUNET_PEERINFO_notify (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                       struct GNUNET_SCHEDULER_Handle *sched,
+                       GNUNET_PEERINFO_Processor callback,
+                       void *callback_cls);
+
+
+/**
+ * Stop notifying about changes.
+ *
+ * @param nc context to stop notifying
+ */
+void
+GNUNET_PEERINFO_notify_cancel (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
 #if 0                           /* keep Emacsens' auto-indent happy */
 {
 #endif

Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h       2010-01-11 20:37:10 UTC (rev 9980)
+++ gnunet/src/include/gnunet_protocols.h       2010-01-11 22:13:37 UTC (rev 9981)
@@ -211,7 +211,13 @@
  */
 #define GNUNET_MESSAGE_TYPE_PEERINFO_INFO_END 36
 
+/**
+ * Start notifying this client about all changes to
+ * the known peers until it disconnects.
+ */
+#define GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY 37
 
+
 /**
  * Message by which a TCP transport notifies
  * the other that it wants to check an address

Modified: gnunet/src/peerinfo/gnunet-service-peerinfo.c
===================================================================
--- gnunet/src/peerinfo/gnunet-service-peerinfo.c       2010-01-11 20:37:10 UTC (rev 9980)
+++ gnunet/src/peerinfo/gnunet-service-peerinfo.c       2010-01-11 22:13:37 UTC (rev 9981)
@@ -26,6 +26,9 @@
  * structure of data/hosts/ and data/credit/).
  *
  * @author Christian Grothoff
+ *
+ * TODO:
+ * - HostEntries are never 'free'd (add expiration, upper bound?)
  */
 
 #include "platform.h"
@@ -84,12 +87,64 @@
 
 };
 
+
 /**
+ * Entries that we still need to tell the client about.
+ */
+struct PendingEntry
+{
+
+  /**
+   * This is a linked list.
+   */
+  struct PendingEntry *next;
+
+  /**
+   * Entry to tell the client about.
+   */
+  struct HostEntry *he;
+};
+
+
+/**
+ * Clients to notify of changes to the peer information.
+ */
+struct NotifyList
+{
+
+  /**
+   * This is a linked list.
+   */
+  struct NotifyList *next;
+
+  /**
+   * Client to notify.
+   */ 
+  struct GNUNET_SERVER_Client *client;
+
+  /**
+   * Notifications still pending for this client.
+   */
+  struct PendingEntry *pending;
+
+  /**
+   * Handle for a transmit ready request.
+   */
+  struct GNUNET_CONNECTION_TransmitHandle *transmit_ctx;
+};
+
+
+/**
  * The in-memory list of known hosts.
  */
 static struct HostEntry *hosts;
 
 /**
+ * Clients to immediately notify about all changes.
+ */
+static struct NotifyList *notify_list;
+
+/**
  * Directory where the hellos are stored in (data/hosts)
  */
 static char *networkIdDirectory;
@@ -101,6 +156,116 @@
 
 
 /**
+ * Transmit peer information messages from the pending queue
+ * to the client.
+ *
+ * @param cls the 'struct NotifyList' that we are processing
+ * @param size number of bytes we can transmit
+ * @param vbuf where to write the messages
+ * @return number of bytes written to vbuf
+ */
+static size_t
+transmit_pending_notification (void *cls,
+                              size_t size,
+                              void *vbuf)
+{
+  struct NotifyList *nl = cls;
+  char *buf = vbuf;
+  struct PendingEntry *pos;
+  struct PendingEntry *next;
+  struct InfoMessage im;
+  uint16_t hs;
+  size_t left;
+
+  nl->transmit_ctx = NULL;
+  next = nl->pending;
+  pos = nl->pending;
+  left = size;
+  while ( (pos != NULL) &&
+         (left >= sizeof (struct InfoMessage) + (hs = GNUNET_HELLO_size (pos->he->hello))) )
+    {
+      next = pos->next;
+      im.header.size = htons (hs + sizeof (struct InfoMessage));
+      im.header.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_INFO);
+      im.trust = htonl (pos->he->trust);
+      im.peer = pos->he->identity;
+      memcpy (&buf[size - left], &im, sizeof (struct InfoMessage));      
+      memcpy (&buf[size - left + sizeof (struct InfoMessage)], pos->he->hello, hs);
+      left -= hs + sizeof (struct InfoMessage);
+      GNUNET_free (pos);
+      pos = next;      
+    }
+  nl->pending = next;
+  if (nl->pending != NULL)
+    {
+      nl->transmit_ctx 
+       = GNUNET_SERVER_notify_transmit_ready (nl->client,
+                                              sizeof (struct InfoMessage) + hs,
+                                              GNUNET_TIME_UNIT_FOREVER_REL,
+                                              &transmit_pending_notification,
+                                              nl);
+    }
+  return size - left;
+}
+
+
+
+/**
+ * Notify client about host change.  Checks if the
+ * respective host entry is already in the list of things
+ * to send to the client, and if not, adds it.  Also
+ * triggers a new request for transmission if the pending
+ * list was previously empty.
+ *
+ * @param nl client to notify
+ * @param he entry to notify about
+ */
+static void
+do_notify (struct NotifyList *nl,
+          struct HostEntry *he)
+{
+  struct PendingEntry *pe;
+
+  pe = nl->pending;
+  while (NULL != pe)
+    {
+      if (pe->he == he)
+       return; /* already in list */
+      pe = pe->next;
+    }
+  pe = GNUNET_malloc (sizeof (struct PendingEntry));
+  pe->next = nl->pending;
+  pe->he = he;
+  nl->pending = pe;
+  if (nl->transmit_ctx != NULL)
+    return; /* already trying to transmit */
+  nl->transmit_ctx = GNUNET_SERVER_notify_transmit_ready (nl->client,
+                                                         sizeof (struct InfoMessage) + GNUNET_HELLO_size (he->hello),
+                                                         GNUNET_TIME_UNIT_FOREVER_REL,
+                                                         &transmit_pending_notification,
+                                                         nl);
+}
+
+
+/**
+ * Notify all clients in the notify list about the
+ * given host entry changing.
+ */
+static void
+notify_all (struct HostEntry *he)
+{
+  struct NotifyList *nl;
+
+  nl = notify_list;
+  while (NULL != nl)
+    {
+      do_notify (nl, he);
+      nl = nl->next;
+    }
+}
+
+
+/**
  * Address iterator that causes expired entries to be discarded.
  *
  * @param cls pointer to the current time
@@ -231,6 +396,7 @@
   GNUNET_free (fn);
   entry->next = hosts;
   hosts = entry;
+  notify_all (entry);
 }
 
 
@@ -246,6 +412,7 @@
 change_host_trust (const struct GNUNET_PeerIdentity *hostId, int value)
 {
   struct HostEntry *host;
+  unsigned int old_trust;
 
   if (value == 0)
     return 0;
@@ -256,6 +423,7 @@
       host = lookup_host_entry (hostId);
     }
   GNUNET_assert (host != NULL);
+  old_trust = host->trust;
   if (value > 0)
     {
       if (host->trust + value < host->trust)
@@ -276,6 +444,8 @@
       else
         host->trust += value;
     }
+  if (host->trust != old_trust)
+    notify_all (host);
   return value;
 }
 
@@ -383,6 +553,8 @@
   else
     {
       mrg = GNUNET_HELLO_merge (host->hello, hello);
+      /* FIXME: check if old and merged hello are equal,
+        and if so, bail out early... */
       GNUNET_free (host->hello);
       host->hello = mrg;
     }
@@ -393,6 +565,7 @@
                        GNUNET_DISK_PERM_USER_READ | GNUNET_DISK_PERM_USER_WRITE
                        | GNUNET_DISK_PERM_GROUP_READ | GNUNET_DISK_PERM_OTHER_READ);
   GNUNET_free (fn);
+  notify_all (host);
 }
 
 
@@ -643,6 +816,35 @@
 
 
 /**
+ * Handle NOTIFY-message.
+ *
+ * @param cls closure
+ * @param client identification of the client
+ * @param message the actual message
+ */
+static void
+handle_notify (void *cls,
+            struct GNUNET_SERVER_Client *client,
+            const struct GNUNET_MessageHeader *message)
+{
+  struct NotifyList *nl;
+  struct HostEntry *pos;
+
+  nl = GNUNET_malloc (sizeof (struct NotifyList));
+  nl->next = notify_list;
+  nl->client = client;
+  GNUNET_SERVER_client_keep (client);  
+  notify_list = nl;
+  pos = hosts;
+  while (NULL != pos)
+    {
+      do_notify (nl, pos);
+      pos = pos->next;
+    }
+}
+
+
+/**
  * List of handlers for the messages understood by this
  * service.
  */
@@ -652,11 +854,58 @@
    sizeof (struct ListPeerMessage)},
   {&handle_get_all, NULL, GNUNET_MESSAGE_TYPE_PEERINFO_GET_ALL,
    sizeof (struct ListAllPeersMessage)},
+  {&handle_notify, NULL, GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY,
+   sizeof (struct GNUNET_MessageHeader)},
   {NULL, NULL, 0, 0}
 };
 
 
+/**
+ * Called when a client disconnects; removes its entries from the notify list.
+ */
+static void
+notify_disconnect (void *cls,
+                  struct GNUNET_SERVER_Client *client)
+{
+  struct NotifyList *pos;
+  struct NotifyList *prev;
+  struct NotifyList *next;
+  struct PendingEntry *p;
 
+  pos = notify_list;
+  prev = NULL;
+  while (pos != NULL)
+    {
+      next = pos->next;
+      if (pos->client == client)
+       {
+         while (NULL != (p = pos->pending))
+           {
+             pos->pending = p->next;
+             GNUNET_free (p);
+           }
+         if (pos->transmit_ctx != NULL)
+           {
+             GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_ctx);
+             pos->transmit_ctx = NULL;
+           }
+         if (prev == NULL)
+           notify_list = next;
+         else
+           prev->next = next;
+          GNUNET_SERVER_client_drop (client);
+         GNUNET_free (pos);
+       }
+      else
+       {
+         prev = pos;
+       }
+      pos = next;
+    }
+
+}
+
+
 /**
  * Process statistics requests.
  *
@@ -692,6 +941,7 @@
   GNUNET_SCHEDULER_add_with_priority (sched,
                                      GNUNET_SCHEDULER_PRIORITY_IDLE,
                                      &cron_clean_data_hosts, NULL);
+  GNUNET_SERVER_disconnect_notify (server, &notify_disconnect, NULL);
   GNUNET_SERVER_add_handlers (server, handlers);
 }
 

Modified: gnunet/src/peerinfo/peerinfo_api.c
===================================================================
--- gnunet/src/peerinfo/peerinfo_api.c  2010-01-11 20:37:10 UTC (rev 9980)
+++ gnunet/src/peerinfo/peerinfo_api.c  2010-01-11 22:13:37 UTC (rev 9981)
@@ -298,4 +298,247 @@
     }
 }
 
+
+
+/**
+ * Context for receiving notifications about changes to the set of known peers.
+ */
+struct GNUNET_PEERINFO_NotifyContext
+{
+
+  /**
+   * Our connection to the PEERINFO service.
+   */
+  struct GNUNET_CLIENT_Connection *client;
+
+  /**
+   * Function to call with information.
+   */
+  GNUNET_PEERINFO_Processor callback;
+
+  /**
+   * Closure for callback.
+   */
+  void *callback_cls;
+
+  /**
+   * Handle to our initial request for message transmission to
+   * the peerinfo service.
+   */
+  struct GNUNET_CLIENT_TransmitHandle *init;
+
+  /**
+   * Configuration.
+   */
+  const struct GNUNET_CONFIGURATION_Handle *cfg;
+
+  /**
+   * Scheduler.
+   */
+  struct GNUNET_SCHEDULER_Handle *sched;
+};
+
+
+/**
+ * Send a request to the peerinfo service to start being
+ * notified about all changes to peer information.
+ *
+ * @param nc our context
+ */
+static void
+request_notifications (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
+/**
+ * Read notifications from the client handle and pass them
+ * to the callback.
+ *
+ * @param nc our context
+ */
+static void
+receive_notifications (struct GNUNET_PEERINFO_NotifyContext *nc);
+
+
+/**
+ * Receive a peerinfo information message, process it and
+ * go for more.
+ *
+ * @param cls closure
+ * @param msg message received, NULL on timeout or fatal error
+ */
+static void
+process_notification (void *cls,
+                     const struct
+                     GNUNET_MessageHeader * msg)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc = cls;
+  const struct InfoMessage *im;
+  const struct GNUNET_HELLO_Message *hello;
+  uint16_t ms;
+
+  if (msg == NULL)
+    {
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return;
+    }
+  ms = ntohs (msg->size);
+  if ((ms < sizeof (struct InfoMessage)) ||
+      (ntohs (msg->type) != GNUNET_MESSAGE_TYPE_PEERINFO_INFO))
+    {
+      GNUNET_break (0);
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return;
+    }
+  im = (const struct InfoMessage *) msg;
+  hello = NULL;
+  if (ms > sizeof (struct InfoMessage) + sizeof (struct GNUNET_MessageHeader))
+    {
+      hello = (const struct GNUNET_HELLO_Message *) &im[1];
+      if (ms != sizeof (struct InfoMessage) + GNUNET_HELLO_size (hello))
+        {
+          GNUNET_break (0);
+         GNUNET_CLIENT_disconnect (nc->client);
+         nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+         request_notifications (nc);
+          return;
+        }
+    }
+#if DEBUG_PEERINFO
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Received information about peer `%s' from peerinfo database\n",
+             GNUNET_i2s (&im->peer));
+#endif
+  nc->callback (nc->callback_cls, &im->peer, hello, ntohl (im->trust));
+  receive_notifications (nc);
+}
+
+
+/**
+ * Read notifications from the client handle and pass them
+ * to the callback.
+ *
+ * @param nc our context
+ */
+static void
+receive_notifications (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  GNUNET_CLIENT_receive (nc->client,
+                        &process_notification,
+                        nc,
+                        GNUNET_TIME_UNIT_FOREVER_REL);
+}
+
+
+/**
+ * Transmit our init-notify request, start receiving.
+ *
+ * @param cls closure (our 'struct GNUNET_PEERINFO_NotifyContext')
+ * @param size number of bytes available in buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to buf
+ */
+static size_t 
+transmit_notify_request (void *cls,
+                        size_t size, 
+                        void *buf)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc = cls;
+  struct GNUNET_MessageHeader hdr;
+
+  nc->init = NULL;
+  if (buf == NULL)
+    {
+      GNUNET_CLIENT_disconnect (nc->client);
+      nc->client = GNUNET_CLIENT_connect (nc->sched, "peerinfo", nc->cfg);
+      request_notifications (nc);
+      return 0;
+    }
+  GNUNET_assert (size >= sizeof (struct GNUNET_MessageHeader));
+  hdr.size = htons (sizeof (struct GNUNET_MessageHeader));
+  hdr.type = htons (GNUNET_MESSAGE_TYPE_PEERINFO_NOTIFY);
+  memcpy (buf, &hdr, sizeof (struct GNUNET_MessageHeader));
+  receive_notifications (nc);
+  return sizeof (struct GNUNET_MessageHeader);
+}
+
+
+/**
+ * Send a request to the peerinfo service to start being
+ * notified about all changes to peer information.
+ *
+ * @param nc our context
+ */
+static void
+request_notifications (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  GNUNET_assert (NULL == nc->init);
+  nc->init =GNUNET_CLIENT_notify_transmit_ready (nc->client,
+                                                sizeof (struct GNUNET_MessageHeader),
+                                                GNUNET_TIME_UNIT_FOREVER_REL,
+                                                GNUNET_YES,
+                                                &transmit_notify_request,
+                                                nc);
+}
+
+
+/**
+ * Call a method whenever our known information about peers
+ * changes.  Initially calls the given function for all known
+ * peers and then only signals changes.
+ *
+ * @param cfg configuration to use
+ * @param sched scheduler to use
+ * @param callback the method to call for each peer
+ * @param callback_cls closure for callback
+ * @return handle to pass to GNUNET_PEERINFO_notify_cancel; NULL on error
+ */
+struct GNUNET_PEERINFO_NotifyContext *
+GNUNET_PEERINFO_notify (const struct GNUNET_CONFIGURATION_Handle *cfg,
+                       struct GNUNET_SCHEDULER_Handle *sched,
+                       GNUNET_PEERINFO_Processor callback,
+                       void *callback_cls)
+{
+  struct GNUNET_PEERINFO_NotifyContext *nc;
+  struct GNUNET_CLIENT_Connection *client;
+
+  client = GNUNET_CLIENT_connect (sched, "peerinfo", cfg);
+  if (client == NULL)
+    {      
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                  _("Could not connect to `%s' service.\n"), "peerinfo");
+      return NULL;
+    }
+  nc = GNUNET_malloc (sizeof (struct GNUNET_PEERINFO_NotifyContext));
+  nc->sched = sched;
+  nc->cfg = cfg;
+  nc->client = client;
+  nc->callback = callback;
+  nc->callback_cls = callback_cls; 
+  request_notifications (nc);
+  return nc;
+}
+
+
+/**
+ * Stop notifying about changes.
+ *
+ * @param nc context to stop notifying
+ */
+void
+GNUNET_PEERINFO_notify_cancel (struct GNUNET_PEERINFO_NotifyContext *nc)
+{
+  if (NULL != nc->init)
+    {
+      GNUNET_CLIENT_notify_transmit_ready_cancel (nc->init);
+      nc->init = NULL;
+    }
+  GNUNET_CLIENT_disconnect (nc->client);
+  GNUNET_free (nc);
+}
+
+
 /* end of peerinfo_api.c */





reply via email to

[Prev in Thread] Current Thread [Next in Thread]