gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r33891 - gnunet/src/sensor


From: gnunet
Subject: [GNUnet-SVN] r33891 - gnunet/src/sensor
Date: Wed, 2 Jul 2014 01:12:34 +0200

Author: otarabai
Date: 2014-07-02 01:12:33 +0200 (Wed, 02 Jul 2014)
New Revision: 33891

Modified:
   gnunet/src/sensor/gnunet-service-sensor-reporting.c
Log:
sensor: towards reporting to collection point


Modified: gnunet/src/sensor/gnunet-service-sensor-reporting.c
===================================================================
--- gnunet/src/sensor/gnunet-service-sensor-reporting.c 2014-07-01 17:42:04 UTC 
(rev 33890)
+++ gnunet/src/sensor/gnunet-service-sensor-reporting.c 2014-07-01 23:12:33 UTC 
(rev 33891)
@@ -28,29 +28,96 @@
 #include "sensor.h"
 #include "gnunet_peerstore_service.h"
 #include "gnunet_cadet_service.h"
+#include "gnunet_applications.h"
 
 #define LOG(kind,...) GNUNET_log_from (kind, "sensor-reporting",__VA_ARGS__)
 
 /**
- * Context of reporting to collection
- * point
+ * Context of reporting operations
  */
-struct CollectionReportingContext
+struct ReportingContext
 {
 
   /**
+   * DLL
+   */
+  struct ReportingContext *prev;
+
+  /**
+   * DLL
+   */
+  struct ReportingContext *next;
+
+  /**
    * Sensor information
    */
   struct SensorInfo *sensor;
 
   /**
-   * Reporting task (OR GNUNET_SCHEDULER_NO_TASK)
+   * Collection point reporting task
+   * (OR GNUNET_SCHEDULER_NO_TASK)
    */
-  GNUNET_SCHEDULER_TaskIdentifier task;
+  GNUNET_SCHEDULER_TaskIdentifier cp_task;
 
+  /**
+   * Watcher of sensor values
+   */
+  struct GNUNET_PEERSTORE_WatchContext *wc;
+
+  /**
+   * Last value read from sensor
+   */
+  void *last_value;
+
+  /**
+   * Size of @last_value
+   */
+  size_t last_value_size;
+
+  /**
+   * Incremented with every lock request
+   * (e.g. to send last value).
+   * Change @last_value only when @value_lock = 0
+   */
+  int value_lock;
+
 };
 
 /**
+ * Context of a created CADET channel
+ */
+struct CadetChannelContext
+{
+
+  /**
+   * DLL
+   */
+  struct CadetChannelContext *prev;
+
+  /**
+   * DLL
+   */
+  struct CadetChannelContext *next;
+
+  /**
+   * Peer Id of
+   */
+  struct GNUNET_PeerIdentity pid;
+
+  /**
+   * CADET channel handle
+   */
+  struct GNUNET_CADET_Channel *c;
+
+  /**
+   * Are we sending data on this channel?
+   * #GNUNET_YES / #GNUNET_NO
+   */
+  int sending;
+
+};
+
+/**
  * Our configuration.
  */
 static const struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -63,28 +130,136 @@
 /**
  * My peer id
  */
-static struct GNUNET_PeerIdentity peerid;
+static struct GNUNET_PeerIdentity mypeerid;
 
 /**
  * Handle to CADET service
  */
 static struct GNUNET_CADET_Handle *cadet;
 
+/**
+ * Head of DLL of all reporting contexts
+ */
+struct ReportingContext *rc_head;
 
 /**
+ * Tail of DLL of all reporting contexts
+ */
+struct ReportingContext *rc_tail;
+
+/**
+ * Head of DLL of all cadet channels
+ */
+struct CadetChannelContext *cc_head;
+
+/**
+ * Tail of DLL of all cadet channels
+ */
+struct CadetChannelContext *cc_tail;
+
+
+/**
+ * Destroy a reporting context structure
+ */
+static void
+destroy_reporting_context (struct ReportingContext *rc)
+{
+  if (NULL != rc->wc)
+  {
+    GNUNET_PEERSTORE_watch_cancel (rc->wc);
+    rc->wc = NULL;
+  }
+  if (GNUNET_SCHEDULER_NO_TASK != rc->cp_task)
+  {
+    GNUNET_SCHEDULER_cancel(rc->cp_task);
+    rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
+  }
+  if (NULL != rc->last_value)
+  {
+    GNUNET_free (rc->last_value);
+    rc->last_value_size = 0;
+  }
+  GNUNET_free(rc);
+}
+
+/**
  * Stop sensor reporting module
  */
 void SENSOR_reporting_stop ()
 {
+  struct ReportingContext *rc;
+
   LOG (GNUNET_ERROR_TYPE_DEBUG, "Stopping sensor reporting module.\n");
+  /* TODO: destroy cadet channels */
+  while (NULL != rc_head)
+  {
+    rc = rc_head;
+    GNUNET_CONTAINER_DLL_remove (rc_head, rc_tail, rc);
+    destroy_reporting_context (rc);
+  }
   if (NULL != peerstore)
   {
     GNUNET_PEERSTORE_disconnect (peerstore);
     peerstore = NULL;
   }
+  if (NULL != cadet)
+  {
+    GNUNET_CADET_disconnect (cadet);
+    cadet = NULL;
+  }
 }
 
 /**
+ * Returns CADET channel established to given peer
+ * or creates a new one
+ *
+ * @param pid Peer Identity
+ * @return Context of established cadet channel
+ */
+struct CadetChannelContext *
+get_cadet_channel (struct GNUNET_PeerIdentity pid)
+{
+  struct CadetChannelContext *cc;
+
+  cc = cc_head;
+  while (NULL != cc)
+  {
+    if (0 == GNUNET_CRYPTO_cmp_peer_identity (&pid, &cc->pid))
+      return cc;
+    cc = cc->next;
+  }
+  cc = GNUNET_new (struct CadetChannelContext);
+  cc->c = GNUNET_CADET_channel_create(cadet,
+      cc,
+      &pid,
+      GNUNET_APPLICATION_TYPE_SENSORDASHBOARD,
+      GNUNET_CADET_OPTION_DEFAULT);
+  cc->pid = pid;
+  cc->sending = GNUNET_NO;
+  GNUNET_CONTAINER_DLL_insert (cc_head, cc_tail, cc);
+  return cc;
+}
+
+/**
+ * Function called to notify a client about the connection begin ready
+ * to queue more data.  @a buf will be NULL and @a size zero if the
+ * connection was closed for writing in the meantime.
+ *
+ * @param cls closure
+ * @param size number of bytes available in @a buf
+ * @param buf where the callee should write the message
+ * @return number of bytes written to @a buf
+ */
+size_t
+do_report_collection_point (void *cls, size_t size, void *buf)
+{
+  /* TODO: check error from CADET */
+  /* TODO: do transfer */
+  /* TODO: cc->sending, rc->value_lock */
+  return 0;
+}
+
+/**
  * Task scheduled to send values to collection point
  *
  * @param cls closure, a 'struct CollectionReportingContext *'
@@ -93,11 +268,68 @@
 void report_collection_point
 (void *cls, const struct GNUNET_SCHEDULER_TaskContext* tc)
 {
-  struct CollectionReportingContext *crc = cls;
+  struct ReportingContext *rc = cls;
+  struct SensorInfo *sensor = rc->sensor;
+  struct CadetChannelContext *cc;
 
-  crc->task = GNUNET_SCHEDULER_NO_TASK;
+  rc->cp_task = GNUNET_SCHEDULER_NO_TASK;
+  GNUNET_assert (NULL != sensor->collection_point);
+  cc = get_cadet_channel (*sensor->collection_point);
+  if (GNUNET_YES == cc->sending)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Cadet channel to collection point busy, trying again on next 
interval.");
+    rc->cp_task =
+        GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
+            &report_collection_point,
+            rc);
+    return;
+  }
+  cc->sending = GNUNET_YES;
+  rc->value_lock ++;
+  /* TODO: construct message */
+  /* TODO: if constructed message is added to cc, no need for rc->value_lock */
+  GNUNET_CADET_notify_transmit_ready (cc->c,
+      GNUNET_YES,
+      sensor->collection_interval,
+      rc->last_value_size, /* FIXME: size of constructed message */
+      &do_report_collection_point,
+      rc);
+  /* TODO */
+  /* TODO: reschedule reporting */
 }
 
+/*
+ * Sensor value watch callback
+ */
+static int
+sensor_watch_cb (void *cls,
+    struct GNUNET_PEERSTORE_Record *record,
+    char *emsg)
+{
+  struct ReportingContext *rc = cls;
+
+  if (NULL != emsg)
+    return GNUNET_YES;
+  if (rc->value_lock > 0)
+  {
+    LOG (GNUNET_ERROR_TYPE_DEBUG,
+        "Did not update reporting context of sensor `%s'"
+        " because value is locked for sending.",
+        rc->sensor->name);
+    return GNUNET_YES;
+  }
+  if (NULL != rc->last_value)
+  {
+    GNUNET_free (rc->last_value);
+    rc->last_value_size = 0;
+  }
+  rc->last_value = GNUNET_malloc(record->value_size);
+  memcpy (rc->last_value, record->value, record->value_size);
+  rc->last_value_size = record->value_size;
+  return GNUNET_YES;
+}
+
 /**
  * Iterator for defined sensors
  * Watches sensors for readings to report
@@ -113,20 +345,32 @@
     void *value)
 {
   struct SensorInfo *sensor = value;
-  struct CollectionReportingContext *crc;
+  struct ReportingContext *rc;
 
+  if (NULL == sensor->collection_point &&
+      GNUNET_NO == sensor->p2p_report)
+    return GNUNET_YES;
+  rc = GNUNET_new (struct ReportingContext);
+  rc->sensor = sensor;
+  rc->last_value = NULL;
+  rc->last_value_size = 0;
+  rc->value_lock = 0;
+  rc->wc = GNUNET_PEERSTORE_watch(peerstore,
+      "sensor",
+      &mypeerid,
+      sensor->name,
+      &sensor_watch_cb,
+      rc);
   if (NULL != sensor->collection_point)
   {
     LOG (GNUNET_ERROR_TYPE_INFO,
         "Will start reporting sensor `%s' values to collection point `%s' 
every %s.\n",
         sensor->name, GNUNET_i2s_full(sensor->collection_point),
         GNUNET_STRINGS_relative_time_to_string(sensor->collection_interval, 
GNUNET_YES));
-    crc = GNUNET_new (struct CollectionReportingContext);
-    crc->sensor = sensor;
-    crc->task =
+    rc->cp_task =
         GNUNET_SCHEDULER_add_delayed (sensor->collection_interval,
             &report_collection_point,
-            crc);
+            rc);
   }
   if (GNUNET_YES == sensor->p2p_report)
   {
@@ -135,6 +379,7 @@
         sensor->name,
         GNUNET_STRINGS_relative_time_to_string(sensor->p2p_interval, 
GNUNET_YES));
   }
+  GNUNET_CONTAINER_DLL_insert (rc_head, rc_tail, rc);
   return GNUNET_YES;
 }
 
@@ -153,7 +398,10 @@
     const struct GNUNET_CADET_Channel *channel,
     void *channel_ctx)
 {
+  struct CadetChannelContext *cc = channel_ctx;
 
+  GNUNET_CONTAINER_DLL_remove (cc_head, cc_tail, cc);
+  GNUNET_free (cc);
 }
 
 /**
@@ -173,7 +421,7 @@
 
   GNUNET_assert(NULL != sensors);
   cfg = c;
-  GNUNET_CRYPTO_get_peer_identity(cfg, &peerid);
+  GNUNET_CRYPTO_get_peer_identity(cfg, &mypeerid);
   GNUNET_CONTAINER_multihashmap_iterate(sensors, &init_sensor_reporting, NULL);
   peerstore = GNUNET_PEERSTORE_connect(cfg);
   if (NULL == peerstore)




reply via email to

[Prev in Thread] Current Thread [Next in Thread]