gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r15078 - in gnunet: . src/core src/datastore src/fs src/include


From: gnunet
Subject: [GNUnet-SVN] r15078 - in gnunet: . src/core src/datastore src/fs src/include
Date: Tue, 26 Apr 2011 20:19:15 +0200

Author: grothoff
Date: 2011-04-26 20:19:15 +0200 (Tue, 26 Apr 2011)
New Revision: 15078

Modified:
   gnunet/TODO
   gnunet/src/core/core_api.c
   gnunet/src/datastore/datastore.h
   gnunet/src/datastore/datastore_api.c
   gnunet/src/datastore/gnunet-service-datastore.c
   gnunet/src/datastore/perf_datastore_api.c
   gnunet/src/datastore/perf_plugin_datastore.c
   gnunet/src/datastore/perf_plugin_datastore_data_postgres.conf
   gnunet/src/datastore/plugin_datastore_mysql.c
   gnunet/src/datastore/plugin_datastore_postgres.c
   gnunet/src/datastore/plugin_datastore_sqlite.c
   gnunet/src/datastore/plugin_datastore_template.c
   gnunet/src/datastore/test_datastore_api.c
   gnunet/src/datastore/test_datastore_api_data_sqlite.conf
   gnunet/src/datastore/test_datastore_api_management.c
   gnunet/src/datastore/test_plugin_datastore.c
   gnunet/src/fs/Makefile.am
   gnunet/src/fs/fs_download.c
   gnunet/src/fs/fs_test_lib_data.conf
   gnunet/src/fs/gnunet-pseudonym.c
   gnunet/src/fs/gnunet-service-fs_cp.c
   gnunet/src/fs/gnunet-service-fs_indexing.c
   gnunet/src/fs/gnunet-service-fs_indexing.h
   gnunet/src/fs/gnunet-service-fs_pe.c
   gnunet/src/fs/gnunet-service-fs_pr.c
   gnunet/src/fs/gnunet-service-fs_put.c
   gnunet/src/fs/test_fs_download_data.conf
   gnunet/src/fs/test_gnunet_fs_idx.py.in
   gnunet/src/fs/test_gnunet_fs_ns_data.conf
   gnunet/src/fs/test_gnunet_service_fs_migration_data.conf
   gnunet/src/include/gnunet_datastore_plugin.h
   gnunet/src/include/gnunet_datastore_service.h
Log:
datastore and fs fixes from Easter

Modified: gnunet/TODO
===================================================================
--- gnunet/TODO 2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/TODO 2011-04-26 18:19:15 UTC (rev 15078)
@@ -1,23 +1,31 @@
 0.9.0pre3: [2'11]
-* DATASTORE:
-  - postgres support currently not implemented
-* NAT/UPNP: [Milan / Ayush / MW]
+* NAT/UPNP: [Milan / MW]
   - [#1609] code clean up
   - testing
   - integration with transport service:
     + test TCP
     + implement UDP, HTTP/HTTPS 
 * Transport:
-  - UDP fragmentation
-* FS/CORE [CG]
-  - adjust service to deal with new datastore API (also crashes all over the place still,
-    likely related).
+  - ATS crashes [MW]
+  - UDP fragmentation [MW]
+* CORE:
+  - Core API's peer_change_preference leaks 'irc' and
+    Core API's notify_transmit_ready leaks 'th'!
+* FS [CG]
+  - test*.py fail
   - download of 100 MB file from 'leach' peer hung due to 
     failure of core-api to call back after a change preference request
     (structs indicate request was transmitted but reply never received?)
+    => try again!
+  - test_gnunet_service_fs_p2p:
+    => sometimes DATASTORE get operation fails to queue on target (why?)
+    => do we need to just make the queue larger?
+  - with core queue size of 1, we get notify_transmit_ready
+    from core API returning NULL (why? ok? just have larger queue?)
   - other runs (-L DEBUG) with downloads using the new 'trust' test show
     non-deterministic results (for any set of peers)
-* FS: [CG]
+  - implement 'SUPPORT_DELAYS'
+  - consider re-issue GSF_dht_lookup_ after non-DHT reply received 
   - implement multi-peer FS performance tests + gauger them!
     + insert
     + download
@@ -59,6 +67,8 @@
       => If MiM attacker uses vetoed address, blacklist the specific IP for
          the presumed neighbour!
   - need to periodically probe latency/transport cost changes & possibly switch transport
+* DATASTORE: [CG]
+  - check indexes / SQL for performance
 * DV: [Nate?]
   - proper bandwidth allocation
   - performance tests

Modified: gnunet/src/core/core_api.c
===================================================================
--- gnunet/src/core/core_api.c  2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/core/core_api.c  2011-04-26 18:19:15 UTC (rev 15078)
@@ -1842,6 +1842,7 @@
   struct GNUNET_CORE_InformationRequestContext *irc = cls;
 
   irc->cm = NULL;
+  // FIXME: who frees 'irc'?
 }
 
 
@@ -1901,6 +1902,7 @@
   irc = GNUNET_malloc (sizeof (struct GNUNET_CORE_InformationRequestContext));
   irc->h = h;
   irc->pr = pr;
+  // FIXME: who frees 'irc'? (if not cancelled?)
   cm = GNUNET_malloc (sizeof (struct ControlMessage) +
                      sizeof (struct RequestInfoMessage));
   cm->cont = &change_preference_send_continuation;

Modified: gnunet/src/datastore/datastore.h
===================================================================
--- gnunet/src/datastore/datastore.h    2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/datastore.h    2011-04-26 18:19:15 UTC (rev 15078)
@@ -114,6 +114,11 @@
   uint32_t type GNUNET_PACKED;
 
   /**
+   * Offset of the result.
+   */
+  uint64_t offset GNUNET_PACKED;
+
+  /**
    * Desired key (optional).  Check the "size" of the
    * header to see if the key is actually present.
    */
@@ -138,6 +143,11 @@
    */
   uint32_t type GNUNET_PACKED;
 
+  /**
+   * Offset of the result.
+   */
+  uint64_t offset GNUNET_PACKED;
+
 };
 
 

Modified: gnunet/src/datastore/datastore_api.c
===================================================================
--- gnunet/src/datastore/datastore_api.c        2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/datastore_api.c        2011-04-26 18:19:15 UTC (rev 15078)
@@ -63,14 +63,14 @@
 struct ResultContext
 {
   /**
-   * Iterator to call with the result.
+   * Function to call with the result.
    */
-  GNUNET_DATASTORE_Iterator iter;
+  GNUNET_DATASTORE_DatumProcessor proc;
 
   /**
-   * Closure for iter.
+   * Closure for proc.
    */
-  void *iter_cls;
+  void *proc_cls;
 
 };
 
@@ -168,12 +168,6 @@
    */
   int was_transmitted;
   
-  /**
-   * Are we expecting a single message in response to this
-   * request (and, if it is data, no 'END' message)?
-   */
-  int one_shot; 
-  
 };
 
 /**
@@ -241,10 +235,9 @@
   int in_receive;
 
   /**
-   * We should either receive (and ignore) an 'END' message or force a
-   * disconnect for the next message from the service.
+   * We should ignore the next message(s) from the service.
    */
-  unsigned int expect_end_or_disconnect;
+  unsigned int skip_next_messages;
 
 };
 
@@ -335,7 +328,7 @@
   while (NULL != (qe = h->queue_head))
     {
       GNUNET_assert (NULL != qe->response_proc);
-      qe->response_proc (qe, NULL);
+      qe->response_proc (h, NULL);
     }
   if (GNUNET_YES == drop) 
     {
@@ -378,7 +371,7 @@
                            GNUNET_NO);
   qe->task = GNUNET_SCHEDULER_NO_TASK;
   GNUNET_assert (qe->was_transmitted == GNUNET_NO);
-  qe->response_proc (qe, NULL);
+  qe->response_proc (qe->h, NULL);
 }
 
 
@@ -394,7 +387,7 @@
  * @param timeout timeout for the operation
  * @param response_proc function to call with replies (can be NULL)
  * @param qc client context (NOT a closure for response_proc)
- * @return NULL if the queue is full (and this entry was dropped)
+ * @return NULL if the queue is full 
  */
 static struct GNUNET_DATASTORE_QueueEntry *
 make_queue_entry (struct GNUNET_DATASTORE_Handle *h,
@@ -418,6 +411,14 @@
       c++;
       pos = pos->next;
     }
+  if (c >= max_queue_size)
+    {
+      GNUNET_STATISTICS_update (h->stats,
+                               gettext_noop ("# queue overflows"),
+                               1,
+                               GNUNET_NO);
+      return NULL;
+    }
   ret = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_QueueEntry) + msize);
   ret->h = h;
   ret->response_proc = response_proc;
@@ -451,15 +452,6 @@
                                     pos,
                                     ret);
   h->queue_size++;
-  if (c > max_queue_size)
-    {
-      GNUNET_STATISTICS_update (h->stats,
-                               gettext_noop ("# queue overflows"),
-                               1,
-                               GNUNET_NO);
-      response_proc (ret, NULL);
-      return NULL;
-    }
   ret->task = GNUNET_SCHEDULER_add_delayed (timeout,
                                            &timeout_queue_entry,
                                            ret);
@@ -469,7 +461,15 @@
       if (pos->max_queue < h->queue_size)
        {
          GNUNET_assert (pos->response_proc != NULL);
-         pos->response_proc (pos, NULL);
+         /* move 'pos' element to head so that it will be 
+            killed on 'NULL' call below */
+         GNUNET_CONTAINER_DLL_remove (h->queue_head,
+                                      h->queue_tail,
+                                      pos);
+         GNUNET_CONTAINER_DLL_insert (h->queue_head,
+                                      h->queue_tail,
+                                      pos);
+         pos->response_proc (h, NULL);
          break;
        }
       pos = pos->next;
@@ -550,6 +550,7 @@
                            GNUNET_NO);
 #endif
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
+  h->skip_next_messages = 0;
   h->client = NULL;
   h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->retry_time,
                                                    &try_reconnect,
@@ -700,6 +701,7 @@
       qe->task = GNUNET_SCHEDULER_NO_TASK;
     }
   h->queue_size--;
+  qe->was_transmitted = GNUNET_SYSERR; /* use-after-free warning */
   GNUNET_free (qe);
 }
 
@@ -724,16 +726,22 @@
   int was_transmitted;
 
   h->in_receive = GNUNET_NO;
+  if (h->skip_next_messages > 0)
+    {
+      h->skip_next_messages--;
+      process_queue (h);
+      return;
+   } 
   if (NULL == (qe = h->queue_head))
     {
       GNUNET_break (0);
       do_disconnect (h);
       return;
     }
-  was_transmitted = qe->was_transmitted;
   rc = qe->qc.sc;
   if (msg == NULL)
     {      
+      was_transmitted = qe->was_transmitted;
       free_queue_entry (qe);
       if (NULL == h->client)
        return; /* forced disconnect */
@@ -1114,7 +1122,7 @@
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_remove (struct GNUNET_DATASTORE_Handle *h,
                          const GNUNET_HashCode *key,
-                         size_t size, 
+                        size_t size,
                         const void *data,
                         unsigned int queue_priority,
                         unsigned int max_queue_size,
@@ -1186,45 +1194,35 @@
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct ResultContext rc;
   const struct DataMessage *dm;
-  int was_transmitted;
 
   h->in_receive = GNUNET_NO;
+  if (h->skip_next_messages > 0)
+    {
+      h->skip_next_messages--;
+      process_queue (h);
+      return;
+    }
   if (msg == NULL)
     {
-      if (NULL != (qe = h->queue_head))
+      qe = h->queue_head;
+      GNUNET_assert (NULL != qe);
+      if (qe->was_transmitted == GNUNET_YES)
        {
-         was_transmitted = qe->was_transmitted;
-         free_queue_entry (qe);
          rc = qe->qc.rc;
-         if (was_transmitted == GNUNET_YES)
-           {
-             GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                         _("Failed to receive response from database.\n"));
-             do_disconnect (h);
-           }   
-         else
-           {
-#if DEBUG_DATASTORE
-             GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                         "Request dropped due to finite datastore queue length.\n");
-#endif
-           }   
-         if (rc.iter != NULL)
-           rc.iter (rc.iter_cls,
-                    NULL, 0, NULL, 0, 0, 0, 
-                    GNUNET_TIME_UNIT_ZERO_ABS, 0);
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     _("Failed to receive response from database.\n"));
+         do_disconnect (h);
        }
+      free_queue_entry (qe);
+      if (rc.proc != NULL)
+       rc.proc (rc.proc_cls,
+                NULL, 0, NULL, 0, 0, 0, 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0);    
       return;
     }
   if (ntohs(msg->type) == GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END) 
     {
       GNUNET_break (ntohs(msg->size) == sizeof(struct GNUNET_MessageHeader));
-      if (h->expect_end_or_disconnect > 0)
-       {
-         h->expect_end_or_disconnect--;
-         process_queue (h);
-         return;
-       }
       qe = h->queue_head;
       rc = qe->qc.rc;
       GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1234,8 +1232,8 @@
                  "Received end of result set, new queue size is %u\n",
                  h->queue_size);
 #endif
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
+      if (rc.proc != NULL)
+       rc.proc (rc.proc_cls,
                 NULL, 0, NULL, 0, 0, 0, 
                 GNUNET_TIME_UNIT_ZERO_ABS, 0); 
       h->retry_time.rel_value = 0;
@@ -1243,13 +1241,6 @@
       process_queue (h);
       return;
     }
-  if (h->expect_end_or_disconnect > 0)
-    {
-      /* only 'END' allowed, must reconnect */
-      h->retry_time = GNUNET_TIME_UNIT_ZERO;
-      do_disconnect (h);
-      return;
-    }
   qe = h->queue_head;
   rc = qe->qc.rc;
   GNUNET_assert (GNUNET_YES == qe->was_transmitted);
@@ -1261,40 +1252,16 @@
       free_queue_entry (qe);
       h->retry_time = GNUNET_TIME_UNIT_ZERO;
       do_disconnect (h);
-      if (rc.iter != NULL)
-       rc.iter (rc.iter_cls,
+      if (rc.proc != NULL)
+       rc.proc (rc.proc_cls,
                 NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0); 
+                GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
   GNUNET_STATISTICS_update (h->stats,
                            gettext_noop ("# Results received"),
                            1,
                            GNUNET_NO);
-  if (rc.iter == NULL)
-    {
-      h->result_count++;
-      GNUNET_STATISTICS_update (h->stats,
-                               gettext_noop ("# Excess results received"),
-                               1,
-                               GNUNET_NO);
-      if (h->result_count > MAX_EXCESS_RESULTS)
-       {
-         free_queue_entry (qe);
-         GNUNET_STATISTICS_update (h->stats,
-                                   gettext_noop ("# Forced database connection resets"),
-                                   1,
-                                   GNUNET_NO);
-         h->retry_time = GNUNET_TIME_UNIT_ZERO;
-         do_disconnect (h);      
-         return;
-       }
-      if (GNUNET_YES == qe->one_shot)
-       free_queue_entry (qe);
-      else
-       GNUNET_DATASTORE_iterate_get_next (h);
-      return;
-    }
   dm = (const struct DataMessage*) msg;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1304,10 +1271,9 @@
              ntohl(dm->size),
              GNUNET_h2s(&dm->key));
 #endif
-  if (GNUNET_YES == qe->one_shot)
-    free_queue_entry (qe);
+  free_queue_entry (qe);
   h->retry_time.rel_value = 0;
-  rc.iter (rc.iter_cls,
+  rc.proc (rc.proc_cls,
           &dm->key,
           ntohl(dm->size),
           &dm[1],
@@ -1331,33 +1297,33 @@
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
                                      unsigned int queue_priority,
                                      unsigned int max_queue_size,
                                      struct GNUNET_TIME_Relative timeout,
-                                     GNUNET_DATASTORE_Iterator iter, 
-                                     void *iter_cls)
+                                     GNUNET_DATASTORE_DatumProcessor proc, 
+                                     void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GNUNET_MessageHeader *m;
   union QueueContext qc;
 
+  GNUNET_assert (NULL != proc);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to get replication entry in %llu ms\n",
              (unsigned long long) timeout.rel_value);
 #endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h, sizeof(struct GNUNET_MessageHeader),
                         queue_priority, max_queue_size, timeout,
                         &process_result_message, &qc);
@@ -1369,7 +1335,6 @@
 #endif
       return NULL;    
     }
-  qe->one_shot = GNUNET_YES;
   GNUNET_STATISTICS_update (h->stats,
                            gettext_noop ("# GET REPLICATION requests executed"),
                            1,
@@ -1383,43 +1348,50 @@
 
 
 /**
- * Get a zero-anonymity value from the datastore.
+ * Get a single zero-anonymity value from the datastore.
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param type allowed type for the operation
- * @param iter function to call on a random value; it
+ * @param type allowed type for the operation (never zero)
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
- *        and always once with a value of NULL.
- * @param iter_cls closure for iter
+ *        or with NULL if none value exists.
+ * @param proc_cls closure for proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
-                                        unsigned int queue_priority,
-                                        unsigned int max_queue_size,
-                                        struct GNUNET_TIME_Relative timeout,
-                                        enum GNUNET_BLOCK_Type type,
-                                        GNUNET_DATASTORE_Iterator iter, 
-                                        void *iter_cls)
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                    uint64_t offset,
+                                    unsigned int queue_priority,
+                                    unsigned int max_queue_size,
+                                    struct GNUNET_TIME_Relative timeout,
+                                    enum GNUNET_BLOCK_Type type,
+                                    GNUNET_DATASTORE_DatumProcessor proc, 
+                                    void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetZeroAnonymityMessage *m;
   union QueueContext qc;
 
+  GNUNET_assert (NULL != proc);
   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Asked to get zero-anonymity entry in %llu ms\n",
+             "Asked to get %llu-th zero-anonymity entry of type %d in %llu ms\n",
+             (unsigned long long) offset,
+             type,
              (unsigned long long) timeout.rel_value);
 #endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h, sizeof(struct GetZeroAnonymityMessage),
                         queue_priority, max_queue_size, timeout,
                         &process_result_message, &qc);
@@ -1427,7 +1399,7 @@
     {
 #if DEBUG_DATASTORE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Could not create queue entry for zero-anonymity iteration\n");
+                 "Could not create queue entry for zero-anonymity procation\n");
 #endif
       return NULL;    
     }
@@ -1439,55 +1411,57 @@
   m->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET_ZERO_ANONYMITY);
   m->header.size = htons(sizeof (struct GetZeroAnonymityMessage));
   m->type = htonl ((uint32_t) type);
+  m->offset = GNUNET_htonll (offset);
   process_queue (h);
   return qe;
 }
 
 
-
 /**
- * Iterate over the results for a particular key
- * in the datastore.  The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
+ * Get a result for a particular key from the datastore.  The processor
+ * will only be called once.
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
  * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
-                             const GNUNET_HashCode * key,
-                             enum GNUNET_BLOCK_Type type,
-                             unsigned int queue_priority,
-                             unsigned int max_queue_size,
-                             struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                             void *iter_cls)
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
+                         uint64_t offset,
+                         const GNUNET_HashCode * key,
+                         enum GNUNET_BLOCK_Type type,
+                         unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         struct GNUNET_TIME_Relative timeout,
+                         GNUNET_DATASTORE_DatumProcessor proc, 
+                         void *proc_cls)
 {
   struct GNUNET_DATASTORE_QueueEntry *qe;
   struct GetMessage *gm;
   union QueueContext qc;
 
+  GNUNET_assert (NULL != proc);
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Asked to look for data of type %u under key `%s'\n",
              (unsigned int) type,
              GNUNET_h2s (key));
 #endif
-  qc.rc.iter = iter;
-  qc.rc.iter_cls = iter_cls;
+  qc.rc.proc = proc;
+  qc.rc.proc_cls = proc_cls;
   qe = make_queue_entry (h, sizeof(struct GetMessage),
                         queue_priority, max_queue_size, timeout,
                         &process_result_message, &qc);
@@ -1507,6 +1481,7 @@
   gm = (struct GetMessage*) &qe[1];
   gm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_GET);
   gm->type = htonl(type);
+  gm->offset = GNUNET_htonll (offset);
   if (key != NULL)
     {
       gm->header.size = htons(sizeof (struct GetMessage));
@@ -1522,25 +1497,6 @@
 
 
 /**
- * Function called to trigger obtaining the next result
- * from the datastore.
- * 
- * @param h handle to the datastore
- */
-void 
-GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h)
-{
-  struct GNUNET_DATASTORE_QueueEntry *qe = h->queue_head;
-
-  h->in_receive = GNUNET_YES;
-  GNUNET_CLIENT_receive (h->client,
-                        &process_result_message,
-                        h,
-                        GNUNET_TIME_absolute_get_remaining (qe->timeout));
-}
-
-
-/**
  * Cancel a datastore operation.  The final callback from the
  * operation must not have been done yet.
  * 
@@ -1551,6 +1507,7 @@
 {
   struct GNUNET_DATASTORE_Handle *h;
 
+  GNUNET_assert (GNUNET_SYSERR != qe->was_transmitted);
   h = qe->h;
 #if DEBUG_DATASTORE
   GNUNET_log  (GNUNET_ERROR_TYPE_DEBUG,
@@ -1562,7 +1519,7 @@
   if (GNUNET_YES == qe->was_transmitted) 
     {
       free_queue_entry (qe);
-      h->expect_end_or_disconnect++;
+      h->skip_next_messages++;
       return;
     }
   free_queue_entry (qe);

Modified: gnunet/src/datastore/gnunet-service-datastore.c
===================================================================
--- gnunet/src/datastore/gnunet-service-datastore.c     2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/gnunet-service-datastore.c     2011-04-26 18:19:15 UTC (rev 15078)
@@ -209,19 +209,7 @@
 
 
 
-
 /**
- * Function called once the transmit operation has
- * either failed or succeeded.
- *
- * @param cls closure
- * @param status GNUNET_OK on success, GNUNET_SYSERR on error
- */
-typedef void (*TransmitContinuation)(void *cls,
-                                    int status);
-
-
-/**
  * Context for transmitting replies to clients.
  */
 struct TransmitCallbackContext 
@@ -252,22 +240,6 @@
    */
   struct GNUNET_SERVER_Client *client;
 
-  /**
-   * Function to call once msg has been transmitted
-   * (or at least added to the buffer).
-   */
-  TransmitContinuation tc;
-
-  /**
-   * Closure for tc.
-   */
-  void *tc_cls;
-
-  /**
-   * GNUNET_YES if we are supposed to signal the server
-   * completion of the client's request.
-   */
-  int end;
 };
 
   
@@ -330,7 +302,6 @@
  */
 static int 
 expired_processor (void *cls,
-                  void *next_cls,
                   const GNUNET_HashCode * key,
                   uint32_t size,
                   const void *data,
@@ -396,7 +367,7 @@
                const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
   expired_kill_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->api->expiration_get (plugin->api->cls, 
+  plugin->api->get_expiration (plugin->api->cls, 
                               &expired_processor,
                               NULL);
 }
@@ -424,7 +395,6 @@
  */
 static int 
 quota_processor (void *cls,
-                void *next_cls,
                 const GNUNET_HashCode * key,
                 uint32_t size,
                 const void *data,
@@ -487,7 +457,7 @@
          (last != need) )
     {
       last = need;
-      plugin->api->expiration_get (plugin->api->cls,
+      plugin->api->get_expiration (plugin->api->cls,
                                   &quota_processor,
                                   &need);    
     }
@@ -521,14 +491,7 @@
     {
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  _("Transmission to client failed!\n"));
-      if (tcc->tc != NULL)
-       tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
-      if (GNUNET_YES == tcc->end)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Disconnecting client due to transmission failure!\n"));
-         GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
-       }
+      GNUNET_SERVER_receive_done (tcc->client, GNUNET_SYSERR);       
       GNUNET_SERVER_client_drop (tcc->client);
       GNUNET_free (tcc->msg);
       GNUNET_free (tcc);
@@ -536,23 +499,7 @@
     }
   GNUNET_assert (size >= msize);
   memcpy (buf, tcc->msg, msize);
-  if (tcc->tc != NULL)
-    tcc->tc (tcc->tc_cls, GNUNET_OK);
-  if (GNUNET_YES == tcc->end)
-    {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Done processing client request\n");
-#endif
-      GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
-    }
-  else
-    {
-#if DEBUG_DATASTORE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Response transmitted, more pending!\n");
-#endif
-    }
+  GNUNET_SERVER_receive_done (tcc->client, GNUNET_OK);
   GNUNET_SERVER_client_drop (tcc->client);
   GNUNET_free (tcc->msg);
   GNUNET_free (tcc);
@@ -567,16 +514,10 @@
  * @param msg message to transmit, will be freed!
  * @param tc function to call afterwards
  * @param tc_cls closure for tc
- * @param end is this the last response (and we should
- *        signal the server completion accodingly after
- *        transmitting this message)?
  */
 static void
 transmit (struct GNUNET_SERVER_Client *client,
-         struct GNUNET_MessageHeader *msg,
-         TransmitContinuation tc,
-         void *tc_cls,
-         int end)
+         struct GNUNET_MessageHeader *msg)
 {
   struct TransmitCallbackContext *tcc;
 
@@ -586,17 +527,13 @@
       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
                  "Shutdown in progress, aborting transmission.\n");
 #endif
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       GNUNET_free (msg);
-      if (NULL != tc)
-       tc (tc_cls, GNUNET_SYSERR);
       return;
     }
   tcc = GNUNET_malloc (sizeof(struct TransmitCallbackContext));
   tcc->msg = msg;
   tcc->client = client;
-  tcc->tc = tc;
-  tcc->tc_cls = tc_cls;
-  tcc->end = end;
   if (NULL ==
       (tcc->th = GNUNET_SERVER_notify_transmit_ready (client,
                                                      ntohs(msg->size),
@@ -605,14 +542,7 @@
                                                      tcc)))
     {
       GNUNET_break (0);
-      if (GNUNET_YES == end)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
-                     _("Forcefully disconnecting client.\n"));
-         GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
-       }
-      if (NULL != tc)
-       tc (tc_cls, GNUNET_SYSERR);
+      GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
       GNUNET_free (msg);
       GNUNET_free (tcc);
       return;
@@ -653,34 +583,11 @@
   sm->status = htonl(code);
   if (slen > 0)
     memcpy (&sm[1], msg, slen);  
-  transmit (client, &sm->header, NULL, NULL, GNUNET_YES);
+  transmit (client, &sm->header);
 }
 
 
-/**
- * Function called once the transmit operation has
- * either failed or succeeded.
- *
- * @param next_cls closure for calling "next_request" callback
- * @param status GNUNET_OK on success, GNUNET_SYSERR on error
- */
-static void 
-get_next(void *next_cls,
-        int status)
-{
-  if (status != GNUNET_OK)
-    {
-      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
-                 _("Failed to transmit an item to the client; aborting iteration.\n"));
-      if (plugin != NULL)
-       plugin->api->next_request (next_cls, GNUNET_YES);
-      return;
-    }
-  if (next_cls != NULL)
-   plugin->api->next_request (next_cls, GNUNET_NO);
-}
 
-
 /**
  * Function that will transmit the given datastore entry
  * to the client.
@@ -702,7 +609,6 @@
  */
 static int
 transmit_item (void *cls,
-              void *next_cls,
               const GNUNET_HashCode * key,
               uint32_t size,
               const void *data,
@@ -727,10 +633,11 @@
       end = GNUNET_malloc (sizeof(struct GNUNET_MessageHeader));
       end->size = htons(sizeof(struct GNUNET_MessageHeader));
       end->type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA_END);
-      transmit (client, end, NULL, NULL, GNUNET_YES);
+      transmit (client, end);
       GNUNET_SERVER_client_drop (client);
       return GNUNET_OK;
     }
+  GNUNET_assert (sizeof (struct DataMessage) + size < GNUNET_SERVER_MAX_MESSAGE_SIZE);
   dm = GNUNET_malloc (sizeof(struct DataMessage) + size);
   dm->header.size = htons(sizeof(struct DataMessage) + size);
   dm->header.type = htons(GNUNET_MESSAGE_TYPE_DATASTORE_DATA);
@@ -754,8 +661,7 @@
                            gettext_noop ("# results found"),
                            1,
                            GNUNET_NO);
-  transmit (client, &dm->header, &get_next, next_cls, 
-           (next_cls != NULL) ? GNUNET_NO : GNUNET_YES);
+  transmit (client, &dm->header);
   return GNUNET_OK;
 }
 
@@ -939,11 +845,6 @@
    * Client to notify on completion.
    */
   struct GNUNET_SERVER_Client *client;
-
-  /**
-   * Did we find the data already in the database?
-   */
-  int is_present;
   
   /* followed by the 'struct DataMessage' */
 };
@@ -1009,7 +910,6 @@
  * matches the put and if none match executes the put.
  *
  * @param cls closure, pointer to the client (of type 'struct PutContext').
- * @param next_cls closure to use to ask for the next item
  * @param key key for the content
  * @param size number of bytes in data
  * @param data content stored
@@ -1020,12 +920,11 @@
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
  *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue,
- *         GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK usually
+ *         GNUNET_NO to delete the item 
  */
 static int
 check_present (void *cls,
-              void *next_cls,
               const GNUNET_HashCode * key,
               uint32_t size,
               const void *data,
@@ -1041,13 +940,10 @@
   dm = (const struct DataMessage*) &pc[1];
   if (key == NULL)
     {
-      if (pc->is_present == GNUNET_YES)        
-       transmit_status (pc->client, GNUNET_NO, NULL);
-      else
-       execute_put (pc->client, dm);
+      execute_put (pc->client, dm);
       GNUNET_SERVER_client_drop (pc->client);
       GNUNET_free (pc);
-      return GNUNET_SYSERR;
+      return GNUNET_OK;
     }
   if ( (GNUNET_BLOCK_TYPE_FS_DBLOCK == type) ||
        (GNUNET_BLOCK_TYPE_FS_IBLOCK == type) ||
@@ -1056,12 +952,19 @@
                       data,
                       size)) ) )
     {
-      pc->is_present = GNUNET_YES;
-      plugin->api->next_request (next_cls, GNUNET_YES);
+#if DEBUG_MYSQL
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Result already present in datastore\n");
+#endif
+      transmit_status (pc->client, GNUNET_NO, NULL);
+      GNUNET_SERVER_client_drop (pc->client);
+      GNUNET_free (pc);
     }
   else
     {
-      plugin->api->next_request (next_cls, GNUNET_NO);
+      execute_put (pc->client, dm);
+      GNUNET_SERVER_client_drop (pc->client);
+      GNUNET_free (pc);
     }
   return GNUNET_OK;
 }
@@ -1083,6 +986,7 @@
   int rid;
   struct ReservationList *pos;
   struct PutContext *pc;
+  GNUNET_HashCode vhash;
   uint32_t size;
 
   if ( (dm == NULL) ||
@@ -1124,16 +1028,18 @@
   if (GNUNET_YES == GNUNET_CONTAINER_bloomfilter_test (filter,
                                                       &dm->key))
     {
+      GNUNET_CRYPTO_hash (&dm[1], size, &vhash);
       pc = GNUNET_malloc (sizeof (struct PutContext) + size + sizeof (struct DataMessage));
       pc->client = client;
       GNUNET_SERVER_client_keep (client);
       memcpy (&pc[1], dm, size + sizeof (struct DataMessage));
-      plugin->api->get (plugin->api->cls,
-                       &dm->key,
-                       NULL,
-                       ntohl (dm->type),
-                       &check_present,
-                       pc);      
+      plugin->api->get_key (plugin->api->cls,
+                           0,
+                           &dm->key,
+                           &vhash,
+                           ntohl (dm->type),
+                           &check_present,
+                           pc);      
       return;
     }
   execute_put (client, dm);
@@ -1192,16 +1098,17 @@
                                1,
                                GNUNET_NO);
       transmit_item (client,
-                    NULL, NULL, 0, NULL, 0, 0, 0, 
+                    NULL, 0, NULL, 0, 0, 0, 
                     GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
-  plugin->api->get (plugin->api->cls,
-                   ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
-                   NULL,
-                   ntohl(msg->type),
-                   &transmit_item,
-                   client);    
+  plugin->api->get_key (plugin->api->cls,
+                       GNUNET_ntohll (msg->offset),
+                       ((size == sizeof(struct GetMessage)) ? &msg->key : NULL),
+                       NULL,
+                       ntohl(msg->type),
+                       &transmit_item,
+                       client);    
 }
 
 
@@ -1265,7 +1172,7 @@
                            1,
                            GNUNET_NO);
   GNUNET_SERVER_client_keep (client);
-  plugin->api->replication_get (plugin->api->cls,
+  plugin->api->get_replication (plugin->api->cls,
                                &transmit_item,
                                client);  
 }
@@ -1303,37 +1210,20 @@
                            1,
                            GNUNET_NO);
   GNUNET_SERVER_client_keep (client);
-  plugin->api->iter_zero_anonymity (plugin->api->cls,
-                                   type,
-                                   &transmit_item,
-                                   client);  
+  plugin->api->get_zero_anonymity (plugin->api->cls,
+                                  GNUNET_ntohll (msg->offset),
+                                  type,
+                                  &transmit_item,
+                                  client);  
 }
 
 
 /**
- * Context for the 'remove_callback'.
- */
-struct RemoveContext 
-{
-  /**
-   * Client for whom we're doing the remvoing.
-   */
-  struct GNUNET_SERVER_Client *client;
-
-  /**
-   * GNUNET_YES if we managed to remove something.
-   */
-  int found;
-};
-
-
-/**
  * Callback function that will cause the item that is passed
  * in to be deleted (by returning GNUNET_NO).
  */
 static int
 remove_callback (void *cls,
-                void *next_cls,
                 const GNUNET_HashCode * key,
                 uint32_t size,
                 const void *data,
@@ -1343,7 +1233,7 @@
                 struct GNUNET_TIME_Absolute
                 expiration, uint64_t uid)
 {
-  struct RemoveContext *rc = cls;
+  struct GNUNET_SERVER_Client *client = cls;
 
   if (key == NULL)
     {
@@ -1352,15 +1242,10 @@
                  "No further matches for `%s' request.\n",
                  "REMOVE");
 #endif 
-      if (GNUNET_YES == rc->found)
-       transmit_status (rc->client, GNUNET_OK, NULL);       
-      else
-       transmit_status (rc->client, GNUNET_NO, _("Content not found"));        
-      GNUNET_SERVER_client_drop (rc->client);
-      GNUNET_free (rc);
+      transmit_status (client, GNUNET_NO, _("Content not found"));             
+      GNUNET_SERVER_client_drop (client);
       return GNUNET_OK; /* last item */
     }
-  rc->found = GNUNET_YES;
 #if DEBUG_DATASTORE
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Item %llu matches `%s' request for key `%s' and type %u.\n",
@@ -1375,7 +1260,8 @@
                            GNUNET_YES);
   GNUNET_CONTAINER_bloomfilter_remove (filter,
                                       key);
-  plugin->api->next_request (next_cls, GNUNET_YES);
+  transmit_status (client, GNUNET_OK, NULL);       
+  GNUNET_SERVER_client_drop (client);
   return GNUNET_NO;
 }
 
@@ -1394,7 +1280,6 @@
 {
   const struct DataMessage *dm = check_data (message);
   GNUNET_HashCode vhash;
-  struct RemoveContext *rc;
 
   if (dm == NULL)
     {
@@ -1413,18 +1298,17 @@
                            gettext_noop ("# REMOVE requests received"),
                            1,
                            GNUNET_NO);
-  rc = GNUNET_malloc (sizeof(struct RemoveContext));
   GNUNET_SERVER_client_keep (client);
-  rc->client = client;
   GNUNET_CRYPTO_hash (&dm[1],
                      ntohl(dm->size),
                      &vhash);
-  plugin->api->get (plugin->api->cls,
-                   &dm->key,
-                   &vhash,
-                   (enum GNUNET_BLOCK_Type) ntohl(dm->type),
-                   &remove_callback,
-                   rc);
+  plugin->api->get_key (plugin->api->cls,
+                       0,
+                       &dm->key,
+                       &vhash,
+                       (enum GNUNET_BLOCK_Type) ntohl(dm->type),
+                       &remove_callback,
+                       client);
 }
 
 
@@ -1469,7 +1353,7 @@
                  _("Datastore payload inaccurate (%lld < %lld).  Trying to fix.\n"),
                  (long long) payload,
                  (long long) -delta);
-      payload = plugin->api->get_size (plugin->api->cls);
+      payload = plugin->api->estimate_size (plugin->api->cls);
       sync_stats ();
       return;
     }
@@ -1518,7 +1402,7 @@
 
   stat_get = NULL;
   if (stats_worked == GNUNET_NO) 
-    payload = plugin->api->get_size (plugin->api->cls);
+    payload = plugin->api->estimate_size (plugin->api->cls);
 }
 
 
@@ -1636,8 +1520,6 @@
          GNUNET_CONNECTION_notify_transmit_ready_cancel (tcc->th);
          GNUNET_SERVER_client_drop (tcc->client);
        }
-      if (NULL != tcc->tc)
-       tcc->tc (tcc->tc_cls, GNUNET_SYSERR);
       GNUNET_free (tcc->msg);
       GNUNET_free (tcc);
     }

Modified: gnunet/src/datastore/perf_datastore_api.c
===================================================================
--- gnunet/src/datastore/perf_datastore_api.c   2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/perf_datastore_api.c   2011-04-26 18:19:15 UTC (rev 15078)
@@ -385,6 +385,7 @@
   GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
                       argv, "perf-datastore-api", "nohelp",
                       options, &run, NULL);
+  sleep (1); /* give datastore chance to process 'DROP' */
   if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
     {
       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");

Modified: gnunet/src/datastore/perf_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/perf_plugin_datastore.c        2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/perf_plugin_datastore.c        2011-04-26 18:19:15 UTC (rev 15078)
@@ -37,7 +37,7 @@
  * those take too long to run them in the usual "make check"
  * sequence.  Hence the value used for shipping is tiny.
  */
-#define MAX_SIZE 1024LL * 1024 * 128
+#define MAX_SIZE 1024LL * 1024 * 32
 
 #define ITERATIONS 2
 
@@ -81,6 +81,7 @@
   enum RunPhase phase;
   unsigned int cnt;
   unsigned int iter;
+  uint64_t offset;
 };
 
 
@@ -100,7 +101,8 @@
 
             
 static void
-putValue (struct GNUNET_DATASTORE_PluginFunctions * api, int i, int k)
+putValue (struct GNUNET_DATASTORE_PluginFunctions * api, 
+         int i, int k)
 {
   char value[65536];
   size_t size;
@@ -156,7 +158,6 @@
 
 static int
 iterate_zeros (void *cls,
-             void *next_cls,
              const GNUNET_HashCode * key,
              uint32_t size,
              const void *data,
@@ -171,7 +172,18 @@
   int i;
   const char *cdata = data;
 
-  if (key == NULL)
+  GNUNET_assert (key != NULL);
+  GNUNET_assert (size >= 8);
+  memcpy (&i, &cdata[4], sizeof (i));
+  hits[i/8] |= (1 << (i % 8)); 
+
+#if VERBOSE 
+  fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
+          type, priority, size,
+          (unsigned long long) expiration.abs_value);
+#endif
+  crc->cnt++;
+  if (crc->cnt == PUT_10 / 4 - 1)
     {
       char buf[256];
       unsigned int bc;
@@ -192,42 +204,17 @@
              crc->cnt);
       GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
       memset (hits, 0, sizeof (hits));
-      if ( (int) (PUT_10 / 4 - crc->cnt) > 2)
-       {
-         fprintf (stderr,
-                  "Got %d items, expected %d\n",
-                  (int) crc->cnt, (int) PUT_10 / 4);
-         GNUNET_break (0);
-         crc->phase = RP_ERROR;
-       }
-      else
-       {
-         crc->phase++;
-         crc->cnt = 0;
-         crc->start = GNUNET_TIME_absolute_get ();      
-       }
-      GNUNET_SCHEDULER_add_now (&test, crc);
-      return GNUNET_OK;
+      crc->phase++;
+      crc->cnt = 0;
+      crc->start = GNUNET_TIME_absolute_get ();      
     }
-  GNUNET_assert (size >= 8);
-  memcpy (&i, &cdata[4], sizeof (i));
-  hits[i/8] |= (1 << (i % 8)); 
-
-#if VERBOSE 
-  fprintf (stderr, "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
-          type, priority, size,
-          (unsigned long long) expiration.abs_value);
-#endif
-  crc->cnt++;
-  crc->api->next_request (next_cls,
-                         GNUNET_NO);
+  GNUNET_SCHEDULER_add_now (&test, crc);
   return GNUNET_OK;
 }
 
 
 static int
 expiration_get (void *cls,
-               void *next_cls,
                const GNUNET_HashCode * key,
                uint32_t size,
                const void *data,
@@ -281,7 +268,6 @@
 
 static int
 replication_get (void *cls,
-                void *next_cls,
                 const GNUNET_HashCode * key,
                 uint32_t size,
                 const void *data,
@@ -323,6 +309,7 @@
       GAUGER (category, buf, crc->end.abs_value - crc->start.abs_value, "ms");
       memset (hits, 0, sizeof (hits));
       crc->phase++;
+      crc->offset = 0;
       crc->cnt = 0;      
       crc->start = GNUNET_TIME_absolute_get ();      
     }
@@ -386,7 +373,15 @@
   int j;
 
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
-    crc->phase = RP_ERROR;        
+    {
+      GNUNET_break (0);
+      crc->phase = RP_ERROR;        
+    }
+#if VERBOSE
+  fprintf (stderr, "In phase %d, iteration %u\n",
+          crc->phase,
+          crc->cnt);
+#endif
   switch (crc->phase)
     {
     case RP_ERROR:
@@ -419,17 +414,19 @@
       GNUNET_SCHEDULER_add_now (&test, crc);
       break;
     case RP_REP_GET:
-      crc->api->replication_get (crc->api->cls, 
+      crc->api->get_replication (crc->api->cls, 
                                 &replication_get,
                                 crc);
       break;
     case RP_ZA_GET:
-      crc->api->iter_zero_anonymity (crc->api->cls, 1, 
-                                    &iterate_zeros,
-                                    crc);
+      crc->api->get_zero_anonymity (crc->api->cls, 
+                                   crc->offset++,
+                                   1, 
+                                   &iterate_zeros,
+                                   crc);
       break;
     case RP_EXP_GET:
-      crc->api->expiration_get (crc->api->cls, 
+      crc->api->get_expiration (crc->api->cls, 
                                &expiration_get,
                                crc);
       break;
@@ -549,7 +546,6 @@
   char *pos;
   char dir_name[128];
 
-  if (1) return 0;
   /* determine name of plugin to use */
   plugin_name = argv[0];
   while (NULL != (pos = strstr(plugin_name, "_")))

Modified: gnunet/src/datastore/perf_plugin_datastore_data_postgres.conf
===================================================================
--- gnunet/src/datastore/perf_plugin_datastore_data_postgres.conf       2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/perf_plugin_datastore_data_postgres.conf       2011-04-26 18:19:15 UTC (rev 15078)
@@ -20,8 +20,8 @@
 # REJECT_FROM =
 # REJECT_FROM6 =
 # PREFIX =
+# DEBUG = YES
 
-
 [dht]
 AUTOSTART = NO
 

Modified: gnunet/src/datastore/plugin_datastore_mysql.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_mysql.c       2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/plugin_datastore_mysql.c       2011-04-26 18:19:15 UTC (rev 15078)
@@ -160,60 +160,8 @@
 
 };
 
-/**
- * Context for the universal iterator.
- */
-struct NextRequestClosure;
 
 /**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- *         call which gives the callback a chance to
- *         clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- *         no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
-                              struct NextRequestClosure *nc);
-
-
-struct NextRequestClosure
-{
-  struct Plugin *plugin;
-
-  struct GNUNET_TIME_Absolute now;
-
-  /**
-   * Function to call to prepare the next
-   * iteration.
-   */
-  PrepareFunction prep;
-
-  /**
-   * Closure for prep.
-   */
-  void *prep_cls;
-
-  MYSQL_BIND rbind[7];
-
-  enum GNUNET_BLOCK_Type type;
-
-  PluginIterator dviter;
-
-  void *dviter_cls;
-
-  unsigned int count;
-
-  int end_it;
-
-  int one_shot;
-};
-
-
-/**
  * Context for all functions in this plugin.
  */
 struct Plugin 
@@ -244,16 +192,6 @@
   char *cnffile;
 
   /**
-   * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
-   */
-  struct NextRequestClosure *next_task_nc;
-
-  /**
-   * Pending task with scheduler for running the next request.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier next_task;
-
-  /**
    * Prepared statements.
    */
 #define INSERT_ENTRY "INSERT INTO gn090 (repl,type,prio,anonLevel,expire,hash,vhash,value) VALUES (?,?,?,?,?,?,?,?)"
@@ -295,7 +233,7 @@
 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
   struct GNUNET_MysqlStatementHandle *get_size;
 
-#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 ORDER BY uid DESC LIMIT 1 OFFSET ?"
+#define SELECT_IT_NON_ANONYMOUS "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE anonLevel=0 AND type=? ORDER BY uid DESC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *zero_iter;
 
 #define SELECT_IT_EXPIRATION "(SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 WHERE expire < ? ORDER BY prio ASC LIMIT 1) "\
@@ -372,7 +310,6 @@
 }
 
 
-
 /**
  * Free a prepared statement.
  *
@@ -381,8 +318,7 @@
  */
 static void
 prepared_statement_destroy (struct Plugin *plugin, 
-                           struct GNUNET_MysqlStatementHandle
-                           *s)
+                           struct GNUNET_MysqlStatementHandle *s)
 {
   GNUNET_CONTAINER_DLL_remove (plugin->shead,
                               plugin->stail,
@@ -397,6 +333,8 @@
 /**
  * Close database connection and all prepared statements (we got a DB
  * disconnect error).
+ * 
+ * @param plugin plugin context
  */
 static int
 iclose (struct Plugin *plugin)
@@ -420,10 +358,11 @@
  * Open the connection with the database (and initialize
  * our default options).
  *
+ * @param plugin plugin context
  * @return GNUNET_OK on success
  */
 static int
-iopen (struct Plugin *ret)
+iopen (struct Plugin *plugin)
 {
   char *mysql_dbname;
   char *mysql_server;
@@ -433,67 +372,67 @@
   my_bool reconnect;
   unsigned int timeout;
 
-  ret->dbf = mysql_init (NULL);
-  if (ret->dbf == NULL)
+  plugin->dbf = mysql_init (NULL);
+  if (plugin->dbf == NULL)
     return GNUNET_SYSERR;
-  if (ret->cnffile != NULL)
-    mysql_options (ret->dbf, MYSQL_READ_DEFAULT_FILE, ret->cnffile);
-  mysql_options (ret->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
+  if (plugin->cnffile != NULL)
+    mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_FILE, plugin->cnffile);
+  mysql_options (plugin->dbf, MYSQL_READ_DEFAULT_GROUP, "client");
   reconnect = 0;
-  mysql_options (ret->dbf, MYSQL_OPT_RECONNECT, &reconnect);
-  mysql_options (ret->dbf,
+  mysql_options (plugin->dbf, MYSQL_OPT_RECONNECT, &reconnect);
+  mysql_options (plugin->dbf,
                  MYSQL_OPT_CONNECT_TIMEOUT, (const void *) &timeout);
-  mysql_options(ret->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
+  mysql_options(plugin->dbf, MYSQL_SET_CHARSET_NAME, "UTF8");
   timeout = 60; /* in seconds */
-  mysql_options (ret->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
-  mysql_options (ret->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
+  mysql_options (plugin->dbf, MYSQL_OPT_READ_TIMEOUT, (const void *) &timeout);
+  mysql_options (plugin->dbf, MYSQL_OPT_WRITE_TIMEOUT, (const void *) &timeout);
   mysql_dbname = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
                                                     "datastore-mysql", "DATABASE"))
     GNUNET_assert (GNUNET_OK == 
-                  GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                  GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
                                                          "datastore-mysql", "DATABASE", 
                                                          &mysql_dbname));
   else
     mysql_dbname = GNUNET_strdup ("gnunet");
   mysql_user = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
                                                     "datastore-mysql", "USER"))
     {
       GNUNET_assert (GNUNET_OK == 
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
                                                           "datastore-mysql", "USER", 
                                                           &mysql_user));
     }
   mysql_password = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
                                                     "datastore-mysql", "PASSWORD"))
     {
       GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
                                                           "datastore-mysql", "PASSWORD",
                                                           &mysql_password));
     }
   mysql_server = NULL;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
                                                     "datastore-mysql", "HOST"))
     {
       GNUNET_assert (GNUNET_OK == 
-                   GNUNET_CONFIGURATION_get_value_string (ret->env->cfg,
+                   GNUNET_CONFIGURATION_get_value_string (plugin->env->cfg,
                                                           "datastore-mysql", "HOST", 
                                                           &mysql_server));
     }
   mysql_port = 0;
-  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (ret->env->cfg,
+  if (GNUNET_YES == GNUNET_CONFIGURATION_have_value (plugin->env->cfg,
                                                     "datastore-mysql", "PORT"))
     {
       GNUNET_assert (GNUNET_OK ==
-                   GNUNET_CONFIGURATION_get_value_number (ret->env->cfg, "datastore-mysql",
+                   GNUNET_CONFIGURATION_get_value_number (plugin->env->cfg, "datastore-mysql",
                                                           "PORT", &mysql_port));
     }
 
   GNUNET_assert (mysql_dbname != NULL);
-  mysql_real_connect (ret->dbf, 
+  mysql_real_connect (plugin->dbf, 
                      mysql_server, 
                      mysql_user, mysql_password,
                       mysql_dbname, 
@@ -503,10 +442,10 @@
   GNUNET_free_non_null (mysql_user);
   GNUNET_free_non_null (mysql_password);
   GNUNET_free (mysql_dbname);
-  if (mysql_error (ret->dbf)[0])
+  if (mysql_error (plugin->dbf)[0])
     {
       LOG_MYSQL (GNUNET_ERROR_TYPE_ERROR,
-                 "mysql_real_connect", ret);
+                 "mysql_real_connect", plugin);
       return GNUNET_SYSERR;
     }
   return GNUNET_OK;
@@ -686,20 +625,7 @@
   return GNUNET_OK;
 }
 
-/**
- * Type of a callback that will be called for each
- * data set returned from MySQL.
- *
- * @param cls user-defined argument
- * @param num_values number of elements in values
- * @param values values returned by MySQL
- * @return GNUNET_OK to continue iterating, GNUNET_SYSERR to abort
- */
-typedef int (*GNUNET_MysqlDataProcessor) (void *cls,
-                                          unsigned int num_values,
-                                          MYSQL_BIND *values);
 
-
 /**
  * Run a prepared SELECT statement.
  *
@@ -708,40 +634,31 @@
  * @param result_size number of elements in results array
  * @param results pointer to already initialized MYSQL_BIND
  *        array (of sufficient size) for passing results
- * @param processor function to call on each result
- * @param processor_cls extra argument to processor
- * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
+ * @param ap pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
  *        values (size + buffer-reference for pointers); terminated
  *        with "-1"
- * @return GNUNET_SYSERR on error, otherwise
- *         the number of successfully affected (or queried) rows
+ * @return GNUNET_SYSERR on error, otherwise GNUNET_OK or GNUNET_NO (no result)
  */
 static int
-prepared_statement_run_select (struct Plugin *plugin,
-                              struct GNUNET_MysqlStatementHandle *s,
-                              unsigned int result_size,
-                              MYSQL_BIND *results,
-                              GNUNET_MysqlDataProcessor processor, void *processor_cls,
-                              ...)
+prepared_statement_run_select_va (struct Plugin *plugin,
+                                 struct GNUNET_MysqlStatementHandle *s,
+                                 unsigned int result_size,
+                                 MYSQL_BIND *results,
+                                 va_list ap)
 {
-  va_list ap;
   int ret;
   unsigned int rsize;
-  int total;
 
   if (GNUNET_OK != prepare_statement (plugin, s))
     {
       GNUNET_break (0);
       return GNUNET_SYSERR;
     }
-  va_start (ap, processor_cls);
   if (GNUNET_OK != init_params (plugin, s, ap))
     {
       GNUNET_break (0);
-      va_end (ap);
       return GNUNET_SYSERR;
     }
-  va_end (ap);
   rsize = mysql_stmt_field_count (s->statement);
   if (rsize > result_size)
     {
@@ -757,33 +674,57 @@
       iclose (plugin);
       return GNUNET_SYSERR;
     }
-
-  total = 0;
-  while (1)
+  ret = mysql_stmt_fetch (s->statement);
+  if (ret == MYSQL_NO_DATA)
+    return GNUNET_NO;
+  if (ret != 0)
     {
-      ret = mysql_stmt_fetch (s->statement);
-      if (ret == MYSQL_NO_DATA)
-        break;
-      if (ret != 0)
-        {
-          GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                     _("`%s' failed at %s:%d with error: %s\n"),
-                     "mysql_stmt_fetch",
-                     __FILE__, __LINE__, mysql_stmt_error (s->statement));
-          iclose (plugin);
-          return GNUNET_SYSERR;
-        }
-      if (processor != NULL)
-        if (GNUNET_OK != processor (processor_cls, rsize, results))
-          break;
-      total++;
+      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+                 _("`%s' failed at %s:%d with error: %s\n"),
+                 "mysql_stmt_fetch",
+                 __FILE__, __LINE__, mysql_stmt_error (s->statement));
+      iclose (plugin);
+      return GNUNET_SYSERR;
     }
   mysql_stmt_reset (s->statement);
-  return total;
+  return GNUNET_OK;
 }
 
 
 /**
+ * Run a prepared SELECT statement.
+ *
+ * @param plugin plugin context
+ * @param s statement to run
+ * @param result_size number of elements in results array
+ * @param results pointer to already initialized MYSQL_BIND
+ *        array (of sufficient size) for passing results
+ * @param ... pairs and triplets of "MYSQL_TYPE_XXX" keys and their respective
+ *        values (size + buffer-reference for pointers); terminated
+ *        with "-1"
+ * @return GNUNET_SYSERR on error, otherwise
+ *         the number of successfully affected (or queried) rows
+ */
+static int
+prepared_statement_run_select (struct Plugin *plugin,
+                              struct GNUNET_MysqlStatementHandle *s,
+                              unsigned int result_size,
+                              MYSQL_BIND *results,
+                              ...)
+{
+  va_list ap;
+  int ret;
+
+  va_start (ap, results);
+  ret = prepared_statement_run_select_va (plugin, s, 
+                                         result_size, results,
+                                         ap);
+  va_end (ap);
+  return ret;
+}
+
+
+/**
  * Run a prepared statement that does NOT produce results.
  *
  * @param plugin plugin context
@@ -854,23 +795,6 @@
 
 
 /**
- * Function that simply returns GNUNET_OK
- *
- * @param cls closure, not used
- * @param num_values not used
- * @param values not used
- * @return GNUNET_OK
- */
-static int
-return_ok (void *cls, 
-          unsigned int num_values, 
-          MYSQL_BIND *values)
-{
-  return GNUNET_OK;
-}
-
-
-/**
  * Get an estimate of how much space the database is
  * currently using.
  *
@@ -878,7 +802,7 @@
  * @return number of bytes used on disk
  */
 static unsigned long long
-mysql_plugin_get_size (void *cls)
+mysql_plugin_estimate_size (void *cls)
 {
   struct Plugin *plugin = cls;
   MYSQL_BIND cbind[1];
@@ -893,7 +817,6 @@
       prepared_statement_run_select (plugin,
                                     plugin->get_size,
                                     1, cbind, 
-                                    &return_ok, NULL,
                                     -1))
     return 0;
   return total;
@@ -929,7 +852,6 @@
 {
   struct Plugin *plugin = cls;
   unsigned int irepl = replication;
-  unsigned int itype = type;
   unsigned int ipriority = priority;
   unsigned int ianonymity = anonymity;
   unsigned long long lexpiration = expiration.abs_value;
@@ -952,7 +874,7 @@
                              plugin->insert_entry,
                              NULL,
                              MYSQL_TYPE_LONG, &irepl, GNUNET_YES,
-                             MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                             MYSQL_TYPE_LONG, &type, GNUNET_YES,
                              MYSQL_TYPE_LONG, &ipriority, GNUNET_YES,
                              MYSQL_TYPE_LONG, &ianonymity, GNUNET_YES,
                              MYSQL_TYPE_LONGLONG, &lexpiration, GNUNET_YES,
@@ -1034,20 +956,23 @@
 }
 
 
-
-
 /**
- * Continuation of "mysql_next_request".
+ * Run the given select statement and call 'proc' on the resulting
+ * values (which must be in particular positions).
  *
- * @param next_cls the next context
- * @param tc the task context (unused)
+ * @param plugin the plugin handle
+ * @param stmt select statement to run
+ * @param proc function to call on result
+ * @param proc_cls closure for proc
+ * @param ... arguments to initialize stmt
  */
 static void 
-mysql_next_request_cont (void *next_cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+execute_select (struct Plugin *plugin,
+               struct GNUNET_MysqlStatementHandle *stmt,
+               PluginDatumProcessor proc, void *proc_cls,
+               ...)
 {
-  struct NextRequestClosure *nrc = next_cls;
-  struct Plugin *plugin;
+  va_list ap;
   int ret;
   unsigned int type;
   unsigned int priority;
@@ -1059,19 +984,10 @@
   char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
   GNUNET_HashCode key;
   struct GNUNET_TIME_Absolute expiration;
-  MYSQL_BIND *rbind = nrc->rbind;
+  MYSQL_BIND rbind[7];
 
-  plugin = nrc->plugin;
-  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->next_task_nc = NULL;
-
-  if (GNUNET_YES == nrc->end_it) 
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->now = GNUNET_TIME_absolute_get ();
   hashSize = sizeof (GNUNET_HashCode);
-  memset (nrc->rbind, 0, sizeof (nrc->rbind));
-  rbind = nrc->rbind;
+  memset (rbind, 0, sizeof (rbind));
   rbind[0].buffer_type = MYSQL_TYPE_LONG;
   rbind[0].buffer = &type;
   rbind[0].is_unsigned = 1;
@@ -1096,16 +1012,28 @@
   rbind[6].buffer = &uid;
   rbind[6].is_unsigned = 1;
 
-  if (GNUNET_OK != nrc->prep (nrc->prep_cls,
-                             nrc))
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  va_start (ap, proc_cls);
+  ret = prepared_statement_run_select_va (plugin,
+                                         stmt,
+                                         7, rbind,
+                                         ap);
+  va_end (ap);
+  if (ret <= 0)
+    {
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
+    }
   GNUNET_assert (size <= sizeof(value));
   if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
        (hashSize != sizeof (GNUNET_HashCode)) )
     {
       GNUNET_break (0);
-      goto END_SET;
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
     }    
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -1116,18 +1044,13 @@
              anonymity,
              exp);
 #endif
+  GNUNET_assert (size < MAX_DATUM_SIZE);
   expiration.abs_value = exp;
-  ret = nrc->dviter (nrc->dviter_cls, 
-                    (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
-                    &key,
-                    size, value,
-                    type, priority, anonymity, expiration,
-                    uid);
-  if (ret == GNUNET_SYSERR)
-    {
-      nrc->end_it = GNUNET_YES;
-      return;
-    }
+  ret = proc (proc_cls, 
+             &key,
+             size, value,
+             type, priority, anonymity, expiration,
+             uid);
   if (ret == GNUNET_NO)
     {
       do_delete_entry (plugin, uid);
@@ -1135,189 +1058,50 @@
        plugin->env->duc (plugin->env->cls,
                          - size);
     }
-  if (nrc->one_shot == GNUNET_YES)
-    GNUNET_free (nrc);
-  return;
- END_SET:
-  /* call dviter with "end of set" */
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->dviter (nrc->dviter_cls, 
-              NULL, NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->prep (nrc->prep_cls, NULL);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_free (nrc);
 }
 
 
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-mysql_plugin_next_request (void *next_cls,
-                          int end_it)
-{
-  struct NextRequestClosure *nrc = next_cls;
 
-  if (GNUNET_YES == end_it)
-    nrc->end_it = GNUNET_YES;
-  nrc->plugin->next_task_nc = nrc;
-  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
-                                                    nrc);
-}  
-
-
 /**
- * Context for 'get_statement_prepare'.
- */
-struct GetContext
-{
-  GNUNET_HashCode key;
-  GNUNET_HashCode vhash;
-
-  unsigned int prio;
-  unsigned int anonymity;
-  unsigned long long expiration;
-  unsigned long long vkey;
-  unsigned long long total;
-  unsigned int off;
-  unsigned int count;
-  int have_vhash;
-};
-
-
-static int
-get_statement_prepare (void *cls,
-                      struct NextRequestClosure *nrc)
-{
-  struct GetContext *gc = cls;
-  struct Plugin *plugin;
-  int ret;
-  unsigned long hashSize;
-  
-  if (NULL == nrc)
-    {
-      GNUNET_free (gc);
-      return GNUNET_NO;
-    }
-  if (gc->count == gc->total)
-    return GNUNET_NO;
-  plugin = nrc->plugin;
-  hashSize = sizeof (GNUNET_HashCode);
-  if (++gc->off >= gc->total)
-    gc->off = 0;
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Obtaining result number %d/%lld at offset %u for GET `%s'\n",
-             gc->count+1,
-             gc->total,
-             gc->off,
-             GNUNET_h2s (&gc->key));  
-#endif
-  if (nrc->type != 0)
-    {
-      if (gc->have_vhash)
-       {
-         ret = prepared_statement_run_select (plugin,
-                                              plugin->select_entry_by_hash_vhash_and_type, 
-                                              7, nrc->rbind, 
-                                              &return_ok, NULL, 
-                                              MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                              MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize,
-                                              MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
-                                              MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
-                                              -1);
-       }
-      else
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash_and_type, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                          MYSQL_TYPE_LONG, &nrc->type, GNUNET_YES, 
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES,
-                                          -1);
-       }
-    }
-  else
-    {
-      if (gc->have_vhash)
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash_and_vhash, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_BLOB, &gc->vhash, hashSize, &hashSize, 
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
-                                          -1);
-       }
-      else
-       {
-         ret =
-           prepared_statement_run_select (plugin,
-                                          plugin->select_entry_by_hash, 
-                                          7, nrc->rbind, 
-                                          &return_ok, NULL,
-                                          MYSQL_TYPE_BLOB, &gc->key, hashSize, &hashSize,
-                                          MYSQL_TYPE_LONG, &gc->off, GNUNET_YES, 
-                                          -1);
-       }
-    }
-  gc->count++;
-  return ret;
-}
-
-
-/**
- * Iterate over the results for a particular key in the datastore.
+ * Get one of the results for a particular key in the datastore.
  *
  * @param cls closure
- * @param key maybe NULL (to match all entries)
+ * @param offset offset of the result (mod #num-results); 
+ *               specific ordering does not matter for the offset
+ * @param key key to match, never NULL
  * @param vhash hash of the value, maybe NULL (to
  *        match all values that have the right key).
  *        Note that for DBlocks there is no difference
  *        betwen key and vhash, but for other blocks
  *        there may be!
  * @param type entries of which type are relevant?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ *     Use 0 for any type.
+ * @param proc function to call on each matching value; however,
+ *        after the first call to "proc", the plugin must wait
+ *        until "NextRequest" was called before giving the processor
+ *        the next item; finally, the "proc" should be called once
+ *        with a NULL value at the end ("next_cls" should be NULL
+ *        for that last call)
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_get (void *cls,
-                 const GNUNET_HashCode *key,
-                 const GNUNET_HashCode *vhash,
-                 enum GNUNET_BLOCK_Type type,
-                 PluginIterator iter, void *iter_cls)
+mysql_plugin_get_key (void *cls,
+                     uint64_t offset,
+                     const GNUNET_HashCode *key,
+                     const GNUNET_HashCode *vhash,
+                     enum GNUNET_BLOCK_Type type,                    
+                     PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  unsigned int itype = type;
   int ret;
   MYSQL_BIND cbind[1];
-  struct GetContext *gc;
-  struct NextRequestClosure *nrc;
   long long total;
   unsigned long hashSize;
   unsigned long hashSize2;
+  unsigned long long off;
 
   GNUNET_assert (key != NULL);
-  if (iter == NULL) 
-    return;
+  GNUNET_assert (NULL != proc);
   hashSize = sizeof (GNUNET_HashCode);
   hashSize2 = sizeof (GNUNET_HashCode);
   memset (cbind, 0, sizeof (cbind));
@@ -1333,10 +1117,9 @@
             prepared_statement_run_select (plugin,
                                           plugin->count_entry_by_hash_vhash_and_type, 
                                           1, cbind, 
-                                          &return_ok, NULL,
                                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
                                           MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
-                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                                          MYSQL_TYPE_LONG, &type, GNUNET_YES,
                                           -1);
         }
       else
@@ -1345,9 +1128,8 @@
             prepared_statement_run_select (plugin,
                                           plugin->count_entry_by_hash_and_type, 
                                           1, cbind, 
-                                          &return_ok, NULL,
                                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
-                                          MYSQL_TYPE_LONG, &itype, GNUNET_YES,
+                                          MYSQL_TYPE_LONG, &type, GNUNET_YES,
                                           -1);
         }
     }
@@ -1359,7 +1141,6 @@
             prepared_statement_run_select (plugin,
                                           plugin->count_entry_by_hash_and_vhash, 
                                           1, cbind,
-                                          &return_ok, NULL,
                                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
                                           MYSQL_TYPE_BLOB, vhash, hashSize2, &hashSize2, 
                                           -1);
@@ -1371,79 +1152,81 @@
             prepared_statement_run_select (plugin,
                                           plugin->count_entry_by_hash,
                                           1, cbind, 
-                                          &return_ok, NULL, 
                                           MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
                                           -1);
         }
     }
   if ((ret != GNUNET_OK) || (0 >= total))
     {
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
            GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
+  offset = offset % total;
+  off = (unsigned long long) offset;
 #if DEBUG_MYSQL
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Iterating over %lld results for GET `%s'\n",
+             "Obtaining %llu/%lld result for GET `%s'\n",
+             off,
              total,
              GNUNET_h2s (key));
 #endif
-  gc = GNUNET_malloc (sizeof (struct GetContext));
-  gc->key = *key;
-  if (vhash != NULL)
+
+  if (type != GNUNET_BLOCK_TYPE_ANY)
     {
-      gc->have_vhash = GNUNET_YES;
-      gc->vhash = *vhash;
+      if (NULL != vhash)
+       {
+         execute_select (plugin,
+                         plugin->select_entry_by_hash_vhash_and_type, 
+                         proc, proc_cls,
+                         MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                         MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize,
+                         MYSQL_TYPE_LONG, &type, GNUNET_YES, 
+                         MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+                         -1);
+       }
+      else
+       {
+         execute_select (plugin,
+                         plugin->select_entry_by_hash_and_type, 
+                         proc, proc_cls,
+                         MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                         MYSQL_TYPE_LONG, &type, GNUNET_YES, 
+                         MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+                         -1);
+       }
     }
-  gc->total = total;
-  gc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-  
-
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->type = type;  
-  nrc->dviter = iter;
-  nrc->dviter_cls = iter_cls;
-  nrc->prep = &get_statement_prepare;
-  nrc->prep_cls = gc;
-  mysql_plugin_next_request (nrc, GNUNET_NO);
+  else
+    {
+      if (NULL != vhash)
+       {
+         execute_select (plugin,
+                         plugin->select_entry_by_hash_and_vhash, 
+                         proc, proc_cls,
+                         MYSQL_TYPE_BLOB, key, hashSize, &hashSize, 
+                         MYSQL_TYPE_BLOB, vhash, hashSize, &hashSize, 
+                         MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
+                         -1);
+       }
+      else
+       {
+         execute_select (plugin,
+                         plugin->select_entry_by_hash, 
+                         proc, proc_cls,
+                         MYSQL_TYPE_BLOB, key, hashSize, &hashSize,
+                         MYSQL_TYPE_LONGLONG, &off, GNUNET_YES, 
+                         -1);
+       }
+    }
 }
 
 
 /**
- * Run the prepared statement to get the next data item ready.
- * 
- * @param cls not used
- * @param nrc closure for the next request iterator
- * @return GNUNET_OK on success, GNUNET_NO if there is no additional item
- */
-static int
-iterator_zero_prepare (void *cls,
-                      struct NextRequestClosure *nrc)
-{
-  struct Plugin *plugin;
-  int ret;
-
-  if (nrc == NULL)
-    return GNUNET_NO;
-  plugin = nrc->plugin;
-  ret = prepared_statement_run_select (plugin,
-                                      plugin->zero_iter,
-                                      7, nrc->rbind,
-                                      &return_ok, NULL,
-                                      MYSQL_TYPE_LONG, &nrc->count, GNUNET_YES,
-                                      -1);
-  nrc->count++;
-  return ret;
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Get a zero-anonymity datum from the datastore.
  *
  * @param cls our "struct Plugin*"
+ * @param offset offset of the result
  * @param type entries of which type should be considered?
  *        Use 0 for any type.
  * @param iter function to call on each matching value;
@@ -1451,47 +1234,27 @@
  * @param iter_cls closure for iter
  */
 static void
-mysql_plugin_iter_zero_anonymity (void *cls,
-                                 enum GNUNET_BLOCK_Type type,
-                                 PluginIterator iter,
-                                 void *iter_cls)
+mysql_plugin_get_zero_anonymity (void *cls,
+                                uint64_t offset,
+                                enum GNUNET_BLOCK_Type type,
+                                PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
+  unsigned long long off;
 
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->type = type;  
-  nrc->dviter = iter;
-  nrc->dviter_cls = iter_cls;
-  nrc->prep = &iterator_zero_prepare;
-  mysql_plugin_next_request (nrc, GNUNET_NO);
-}
+  off = (unsigned long long) offset;
+  execute_select (plugin,
+                 plugin->zero_iter,
+                 proc, proc_cls,
+                 MYSQL_TYPE_LONG, &type, GNUNET_YES,
+                 MYSQL_TYPE_LONGLONG, &off, GNUNET_YES,
+                 -1);
 
-
-/**
- * Run the SELECT statement for the replication function.
- * 
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
- */
-static int
-replication_prepare (void *cls,
-                    struct NextRequestClosure *nrc)
-{
-  struct Plugin *plugin = cls;
-
-  return prepared_statement_run_select (plugin,
-                                       plugin->select_replication, 
-                                       7, nrc->rbind, 
-                                       &return_ok, NULL,
-                                       -1);
 }
 
 
-
 /**
- * Context for 'repl_iter' function.
+ * Context for 'repl_proc' function.
  */
 struct ReplCtx
 {
@@ -1504,22 +1267,21 @@
   /**
    * Function to call for the result (or the NULL).
    */
-  PluginIterator iter;
+  PluginDatumProcessor proc;
   
   /**
-   * Closure for iter.
+   * Closure for proc.
    */
-  void *iter_cls;
+  void *proc_cls;
 };
 
 
 /**
- * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Wrapper for the processor for 'mysql_plugin_get_replication'.
  * Decrements the replication counter and calls the original
  * iterator.
  *
  * @param cls closure
- * @param next_cls closure to pass to the "next" function.
  * @param key key for the content
  * @param size number of bytes in data
  * @param data content stored
@@ -1535,8 +1297,7 @@
  *         GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-repl_iter (void *cls,
-          void *next_cls,
+repl_proc (void *cls,
           const GNUNET_HashCode *key,
           uint32_t size,
           const void *data,
@@ -1552,8 +1313,8 @@
   int ret;
   int iret;
 
-  ret = rc->iter (rc->iter_cls,
-                 next_cls, key,
+  ret = rc->proc (rc->proc_cls,
+                 key,
                  size, data, 
                  type, priority, anonymity, expiration,
                  uid);
@@ -1561,10 +1322,10 @@
     {
       oid = (unsigned long long) uid;
       iret = prepared_statement_run (plugin,
-                                   plugin->dec_repl,
-                                   NULL,
-                                   MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
-                                   -1);
+                                    plugin->dec_repl,
+                                    NULL,
+                                    MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
+                                    -1);
       if (iret == GNUNET_SYSERR)
        {
          GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
@@ -1577,94 +1338,56 @@
 
 
 /**
- * Get a random item for replication.  Returns a single, not expired, random item
- * from those with the highest replication counters.  The item's 
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication.  Returns a single, not expired,
+ * random item from those with the highest replication counters.  The
+ * item's replication counter is decremented by one IF it was positive
+ * before.  Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_replication_get (void *cls,
-                             PluginIterator iter, void *iter_cls)
+mysql_plugin_get_replication (void *cls,
+                             PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
   struct ReplCtx rc;
   
   rc.plugin = plugin;
-  rc.iter = iter;
-  rc.iter_cls = iter_cls;
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->now = GNUNET_TIME_absolute_get ();
-  nrc->prep = &replication_prepare;
-  nrc->prep_cls = plugin;
-  nrc->type = 0;
-  nrc->dviter = &repl_iter;
-  nrc->dviter_cls = &rc;
-  nrc->end_it = GNUNET_NO;
-  nrc->one_shot = GNUNET_YES;
-  mysql_next_request_cont (nrc, NULL);
-}
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+  execute_select (plugin,
+                 plugin->select_replication, 
+                 &repl_proc, &rc,
+                 -1);
 
-
-/**
- * Run the SELECT statement for the expiration function.
- * 
- * @param cls the 'struct Plugin'
- * @param nrc the context (not used)
- * @return GNUNET_OK on success, GNUNET_NO if there are
- *         no more values, GNUNET_SYSERR on error
- */
-static int
-expiration_prepare (void *cls,
-                   struct NextRequestClosure *nrc)
-{
-  struct Plugin *plugin = cls;
-  long long nt;
-
-  if (NULL == nrc)
-    return GNUNET_NO;
-  nt = (long long) nrc->now.abs_value;
-  return prepared_statement_run_select
-    (plugin,
-     plugin->select_expiration, 
-     7, nrc->rbind, 
-     &return_ok, NULL,
-     MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
-     -1);
 }
 
 
 /**
  * Get a random item for expiration.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-mysql_plugin_expiration_get (void *cls,
-                            PluginIterator iter, void *iter_cls)
+mysql_plugin_get_expiration (void *cls,
+                            PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
+  long long nt;
 
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->now = GNUNET_TIME_absolute_get ();
-  nrc->prep = &expiration_prepare;
-  nrc->prep_cls = plugin;
-  nrc->type = 0;
-  nrc->dviter = iter;
-  nrc->dviter_cls = iter_cls;
-  nrc->end_it = GNUNET_NO;
-  nrc->one_shot = GNUNET_YES;
-  mysql_next_request_cont (nrc, NULL);
+  nt = (long long) GNUNET_TIME_absolute_get().abs_value;
+  execute_select (plugin,
+                 plugin->select_expiration, 
+                 proc, proc_cls,
+                 MYSQL_TYPE_LONGLONG, &nt, GNUNET_YES, 
+                 -1);
+
 }
 
 
@@ -1760,14 +1483,13 @@
 
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = plugin;
-  api->get_size = &mysql_plugin_get_size;
+  api->estimate_size = &mysql_plugin_estimate_size;
   api->put = &mysql_plugin_put;
-  api->next_request = &mysql_plugin_next_request;
-  api->get = &mysql_plugin_get;
-  api->replication_get = &mysql_plugin_replication_get;
-  api->expiration_get = &mysql_plugin_expiration_get;
   api->update = &mysql_plugin_update;
-  api->iter_zero_anonymity = &mysql_plugin_iter_zero_anonymity;
+  api->get_key = &mysql_plugin_get_key;
+  api->get_replication = &mysql_plugin_get_replication;
+  api->get_expiration = &mysql_plugin_get_expiration;
+  api->get_zero_anonymity = &mysql_plugin_get_zero_anonymity;
   api->drop = &mysql_plugin_drop;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                    "mysql", _("Mysql database running\n"));
@@ -1787,14 +1509,6 @@
   struct Plugin *plugin = api->cls;
 
   iclose (plugin);
-  if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (plugin->next_task);
-      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-      plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
-      GNUNET_free (plugin->next_task_nc);
-      plugin->next_task_nc = NULL;
-    }
   GNUNET_free_non_null (plugin->cnffile);
   GNUNET_free (plugin);
   GNUNET_free (api);

Modified: gnunet/src/datastore/plugin_datastore_postgres.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_postgres.c    2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/plugin_datastore_postgres.c    2011-04-26 18:19:15 UTC (rev 15078)
@@ -44,103 +44,6 @@
 
 
 /**
- * Closure for 'postgres_next_request_cont'.
- */
-struct NextRequestClosure
-{
-  /**
-   * Global plugin data.
-   */
-  struct Plugin *plugin;
-  
-  /**
-   * Function to call for each matching entry.
-   */
-  PluginIterator iter;
-  
-  /**
-   * Closure for 'iter'.
-   */
-  void *iter_cls;
-  
-  /**
-   * Parameters for the prepared statement.
-   */
-  const char *paramValues[5];
-  
-  /**
-   * Name of the prepared statement to run.
-   */
-  const char *pname;
-  
-  /**
-   * Size of values pointed to by paramValues.
-   */
-  int paramLengths[5];
-  
-  /**
-   * Number of paramters in paramValues/paramLengths.
-   */
-  int nparams; 
-  
-  /**
-   * Current time (possible parameter), big-endian.
-   */
-  uint64_t bnow;
-  
-  /**
-   * Key (possible parameter)
-   */
-  GNUNET_HashCode key;
-  
-  /**
-   * Hash of value (possible parameter)
-   */
-  GNUNET_HashCode vhash;
-  
-  /**
-   * Number of entries found so far
-   */
-  unsigned long long count;
-  
-  /**
-   * Offset this iteration starts at.
-   */
-  uint64_t off;
-  
-  /**
-   * Current offset to use in query, big-endian.
-   */
-  uint64_t blimit_off;
-  
-  /**
-   * Current total number of entries found so far, big-endian.
-   */
-  uint64_t bcount;
-  
-  /**
-   *  Overall number of matching entries.
-   */
-  unsigned long long total;
-  
-  /**
-   * Type of block (possible paramter), big-endian.
-   */
-  uint32_t btype;
-  
-  /**
-   * Flag set to GNUNET_YES to stop iteration.
-   */
-  int end_it;
-
-  /**
-   * Flag to indicate that there should only be one result.
-   */
-  int one_shot;
-};
-
-
-/**
  * Context for all functions in this plugin.
  */
 struct Plugin 
@@ -155,16 +58,6 @@
    */
   PGconn *dbh;
 
-  /**
-   * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
-   */
-  struct NextRequestClosure *next_task_nc;
-
-  /**
-   * Pending task with scheduler for running the next request.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier next_task;
-
 };
 
 
@@ -434,7 +327,7 @@
        pq_prepare (plugin,
                   "select_non_anonymous",
                   "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                  "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
+                  "WHERE anonLevel = 0 AND type = $1 ORDER BY oid DESC LIMIT 1 OFFSET $2",
                    1,
                    __LINE__)) ||
       (GNUNET_OK !=
@@ -482,11 +375,13 @@
 delete_by_rowid (struct Plugin *plugin,
                 unsigned int rowid)
 {
-  const char *paramValues[] = { (const char *) &rowid };
-  int paramLengths[] = { sizeof (rowid) };
+  uint32_t browid;
+  const char *paramValues[] = { (const char *) &browid };
+  int paramLengths[] = { sizeof (browid) };
   const int paramFormats[] = { 1 };
   PGresult *ret;
 
+  browid = htonl (rowid);
   ret = PQexecPrepared (plugin->dbh,
                         "delrow",
                         1, paramValues, paramLengths, paramFormats, 1);
@@ -510,7 +405,7 @@
  * @return number of bytes used on disk
  */
 static unsigned long long
-postgres_plugin_get_size (void *cls)
+postgres_plugin_estimate_size (void *cls)
 {
   struct Plugin *plugin = cls;
   unsigned long long total;
@@ -619,22 +514,20 @@
 
 
 /**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * Function invoked to process the result and call
+ * the processor.
  *
- * @param next_cls the 'struct NextRequestClosure'
- * @param tc scheduler context
+ * @param plugin global plugin data
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
+ * @param res result from exec
  */
 static void 
-postgres_next_request_cont (void *next_cls,
-                           const struct GNUNET_SCHEDULER_TaskContext *tc)
+process_result (struct Plugin *plugin,
+               PluginDatumProcessor proc, void *proc_cls,
+               PGresult *res)
 {
-  struct NextRequestClosure *nrc = next_cls;
-  struct Plugin *plugin = nrc->plugin;
-  const int paramFormats[] = { 1, 1, 1, 1, 1 };
   int iret;
-  PGresult *res;
   enum GNUNET_BLOCK_Type type;
   uint32_t anonymity;
   uint32_t priority;
@@ -643,38 +536,11 @@
   struct GNUNET_TIME_Absolute expiration_time;
   GNUNET_HashCode key;
 
-  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->next_task_nc = NULL;
-  if ( (GNUNET_YES == nrc->end_it) ||
-       (nrc->count == nrc->total) )
-    {
-#if DEBUG_POSTGRES
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "datastore-postgres",
-                      "Ending iteration (%s)\n",
-                      (GNUNET_YES == nrc->end_it) ? "client requested it" : "completed result set");
-#endif
-      nrc->iter (nrc->iter_cls, 
-                NULL, NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
-      return;
-    }  
-  if (nrc->off == nrc->total)
-    nrc->off = 0;
-  nrc->blimit_off = GNUNET_htonll (nrc->off);
-  nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
-  res = PQexecPrepared (plugin->dbh,
-                       nrc->pname,
-                       nrc->nparams,
-                       nrc->paramValues, 
-                       nrc->paramLengths,
-                       paramFormats, 1);
   if (GNUNET_OK != check_result (plugin,
                                 res,
                                 PGRES_TUPLES_OK,
                                 "PQexecPrepared",
-                                nrc->pname,
+                                "select",
                                 __LINE__))
     {
 #if DEBUG_POSTGRES
@@ -682,10 +548,9 @@
                       "datastore-postgres",
                       "Ending iteration (postgres error)\n");
 #endif
-      nrc->iter (nrc->iter_cls, 
-                NULL, NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);      
       return;
     }
 
@@ -697,11 +562,10 @@
                       "datastore-postgres",
                       "Ending iteration (no more results)\n");
 #endif
-      nrc->iter (nrc->iter_cls, 
-                NULL, NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
       PQclear (res);
-      GNUNET_free (nrc);
       return; 
     }
   if ((1 != PQntuples (res)) ||
@@ -710,11 +574,10 @@
       (sizeof (uint32_t) != PQfsize (res, 6)))
     {
       GNUNET_break (0);
-      nrc->iter (nrc->iter_cls, 
-                NULL, NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
       PQclear (res);
-      GNUNET_free (nrc);
       return;
     }
   rowid = ntohl (*(uint32_t *) PQgetvalue (res, 0, 6));
@@ -727,10 +590,9 @@
       GNUNET_break (0);
       PQclear (res);
       delete_by_rowid (plugin, rowid);
-      nrc->iter (nrc->iter_cls, 
-                NULL, NULL, 0, NULL, 0, 0, 0, 
-                GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
 
@@ -749,33 +611,23 @@
                   (unsigned int) size,
                   (unsigned int) type);
 #endif
-  iret = nrc->iter (nrc->iter_cls,
-                   (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
-                   &key,
-                   size,
-                   PQgetvalue (res, 0, 5),
-                   (enum GNUNET_BLOCK_Type) type,
-                   priority,
-                   anonymity,
-                   expiration_time,
-                   rowid);
+  iret = proc (proc_cls,
+              &key,
+              size,
+              PQgetvalue (res, 0, 5),
+              (enum GNUNET_BLOCK_Type) type,
+              priority,
+              anonymity,
+              expiration_time,
+              rowid);
   PQclear (res);
-  if (iret != GNUNET_NO)
+  if (iret == GNUNET_NO)
     {
-      nrc->count++;
-      nrc->off++;
-    }
-  if (iret == GNUNET_SYSERR)
-    {
 #if DEBUG_POSTGRES
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "datastore-postgres",
-                      "Ending iteration (client error)\n");
+      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+                 "Processor asked for item %u to be removed.\n",
+                 rowid);
 #endif
-      return;
-    }
-  if (iret == GNUNET_NO)
-    {
       if (GNUNET_OK == delete_by_rowid (plugin, rowid))
        {
 #if DEBUG_POSTGRES
@@ -794,38 +646,10 @@
 #endif
        }
     }
-  if (nrc->one_shot == GNUNET_YES) 
-    GNUNET_free (nrc);
 }
 
 
 /**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-postgres_plugin_next_request (void *next_cls,
-                             int end_it)
-{
-  struct NextRequestClosure *nrc = next_cls;
-
-  if (GNUNET_YES == end_it)
-    nrc->end_it = GNUNET_YES;
-  nrc->plugin->next_task_nc = nrc;
-  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&postgres_next_request_cont,
-                                                    nrc);
-}
-
-
-/**
  * Iterate over the results for a particular key
  * in the datastore.
  *
@@ -843,62 +667,62 @@
  * @param iter_cls closure for iter
  */
 static void
-postgres_plugin_get (void *cls,
-                    const GNUNET_HashCode * key,
-                    const GNUNET_HashCode * vhash,
-                    enum GNUNET_BLOCK_Type type,
-                    PluginIterator iter, void *iter_cls)
+postgres_plugin_get_key (void *cls,
+                        uint64_t offset,
+                        const GNUNET_HashCode *key,
+                        const GNUNET_HashCode *vhash,
+                        enum GNUNET_BLOCK_Type type,
+                        PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
   const int paramFormats[] = { 1, 1, 1, 1, 1 };
+  int paramLengths[4];
+  const char *paramValues[4];
+  int nparams;
+  const char *pname;
   PGresult *ret;
+  uint64_t total;
+  uint64_t blimit_off;
+  uint32_t btype;
 
   GNUNET_assert (key != NULL);
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->plugin = plugin;
-  nrc->iter = iter;
-  nrc->iter_cls = iter_cls;
-  nrc->key = *key;
-  if (vhash != NULL)
-    nrc->vhash = *vhash;
-  nrc->paramValues[0] = (const char*) &nrc->key;
-  nrc->paramLengths[0] = sizeof (GNUNET_HashCode);
-  nrc->btype = htonl (type);
+  paramValues[0] = (const char*) key;
+  paramLengths[0] = sizeof (GNUNET_HashCode);
+  btype = htonl (type);
   if (type != 0)
     {
       if (vhash != NULL)
         {
-          nrc->paramValues[1] = (const char *) &nrc->vhash;
-          nrc->paramLengths[1] = sizeof (nrc->vhash);
-          nrc->paramValues[2] = (const char *) &nrc->btype;
-          nrc->paramLengths[2] = sizeof (nrc->btype);
-          nrc->paramValues[3] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[3] = sizeof (nrc->blimit_off);
-          nrc->nparams = 4;
-          nrc->pname = "getvt";
+          paramValues[1] = (const char *) vhash;
+          paramLengths[1] = sizeof (GNUNET_HashCode);
+          paramValues[2] = (const char *) &btype;
+          paramLengths[2] = sizeof (btype);
+          paramValues[3] = (const char *) &blimit_off;
+          paramLengths[3] = sizeof (blimit_off);
+          nparams = 4;
+          pname = "getvt";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
                               3,
                               NULL,
-                              nrc->paramValues, 
-                             nrc->paramLengths,
+                              paramValues, 
+                             paramLengths,
                              paramFormats, 1);
         }
       else
         {
-          nrc->paramValues[1] = (const char *) &nrc->btype;
-          nrc->paramLengths[1] = sizeof (nrc->btype);
-          nrc->paramValues[2] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[2] = sizeof (nrc->blimit_off);
-          nrc->nparams = 3;
-          nrc->pname = "gett";
+          paramValues[1] = (const char *) &btype;
+          paramLengths[1] = sizeof (btype);
+          paramValues[2] = (const char *) &blimit_off;
+          paramLengths[2] = sizeof (blimit_off);
+          nparams = 3;
+          pname = "gett";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
                               2,
                               NULL,
-                              nrc->paramValues, 
-                             nrc->paramLengths, 
+                              paramValues, 
+                             paramLengths, 
                              paramFormats, 1);
         }
     }
@@ -906,32 +730,32 @@
     {
       if (vhash != NULL)
         {
-          nrc->paramValues[1] = (const char *) &nrc->vhash;
-          nrc->paramLengths[1] = sizeof (nrc->vhash);
-          nrc->paramValues[2] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[2] = sizeof (nrc->blimit_off);
-          nrc->nparams = 3;
-          nrc->pname = "getv";
+          paramValues[1] = (const char *) vhash;
+          paramLengths[1] = sizeof (GNUNET_HashCode);
+          paramValues[2] = (const char *) &blimit_off;
+          paramLengths[2] = sizeof (blimit_off);
+          nparams = 3;
+          pname = "getv";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
                               2,
                               NULL,
-                              nrc->paramValues, 
-                             nrc->paramLengths,
+                              paramValues, 
+                             paramLengths,
                              paramFormats, 1);
         }
       else
         {
-          nrc->paramValues[1] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[1] = sizeof (nrc->blimit_off);
-          nrc->nparams = 2;
-          nrc->pname = "get";
+          paramValues[1] = (const char *) &blimit_off;
+          paramLengths[1] = sizeof (blimit_off);
+          nparams = 2;
+          pname = "get";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1",
                               1,
                               NULL,
-                              nrc->paramValues, 
-                             nrc->paramLengths,
+                              paramValues, 
+                             paramLengths,
                              paramFormats, 1);
         }
     }
@@ -939,13 +763,12 @@
                                 ret,
                                  PGRES_TUPLES_OK,
                                  "PQexecParams",
-                                nrc->pname,
+                                pname,
                                 __LINE__))
     {
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
            GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
       return;
     }
   if ((PQntuples (ret) != 1) ||
@@ -954,26 +777,30 @@
     {
       GNUNET_break (0);
       PQclear (ret);
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
            GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
       return;
     }
-  nrc->total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
+  total = GNUNET_ntohll (*(const unsigned long long *) PQgetvalue (ret, 0, 0));
   PQclear (ret);
-  if (nrc->total == 0)
+  if (total == 0)
     {
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
+      proc (proc_cls, 
+           NULL, 0, NULL, 0, 0, 0, 
            GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
       return;
     }
-  nrc->off = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
-                                      nrc->total);
-  postgres_plugin_next_request (nrc,
-                               GNUNET_NO);
+  blimit_off = GNUNET_htonll (offset % total);
+  ret = PQexecPrepared (plugin->dbh,
+                       pname,
+                       nparams,
+                       paramValues, 
+                       paramLengths,
+                       paramFormats, 1);
+  process_result (plugin,
+                 proc, proc_cls,
+                 ret);
 }
 
 
@@ -989,28 +816,33 @@
  * @param iter_cls closure for iter
  */
 static void
-postgres_plugin_iter_zero_anonymity (void *cls,
-                                    enum GNUNET_BLOCK_Type type,
-                                    PluginIterator iter,
-                                    void *iter_cls)
+postgres_plugin_get_zero_anonymity (void *cls,
+                                   uint64_t offset,
+                                   enum GNUNET_BLOCK_Type type,
+                                   PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
+  uint32_t btype;
+  uint64_t boff;
+  const int paramFormats[] = { 1, 1 };
+  int paramLengths[] = { sizeof (btype), sizeof (boff) };
+  const char *paramValues[] = { (const char*) &btype, (const char*) &boff };
+  PGresult *ret;
 
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->total = UINT32_MAX;
-  nrc->btype = htonl ((uint32_t) type);
-  nrc->plugin = plugin;
-  nrc->iter = iter;
-  nrc->iter_cls = iter_cls;
-  nrc->pname = "select_non_anonymous";
-  nrc->nparams = 1;
-  nrc->paramLengths[0] = sizeof (nrc->bcount);
-  nrc->paramValues[0] = (const char*) &nrc->bcount;
-  postgres_plugin_next_request (nrc,
-                               GNUNET_NO);
+  btype = htonl ((uint32_t) type);
+  boff = GNUNET_htonll (offset);
+  ret = PQexecPrepared (plugin->dbh,
+                       "select_non_anonymous",
+                       2,
+                       paramValues, 
+                       paramLengths,
+                       paramFormats, 1);
+  process_result (plugin,
+                 proc, proc_cls,
+                 ret);
 }
 
+
 /**
  * Context for 'repl_iter' function.
  */
@@ -1025,12 +857,12 @@
   /**
    * Function to call for the result (or the NULL).
    */
-  PluginIterator iter;
+  PluginDatumProcessor proc;
   
   /**
-   * Closure for iter.
+   * Closure for proc.
    */
-  void *iter_cls;
+  void *proc_cls;
 };
 
 
@@ -1056,8 +888,7 @@
  *         GNUNET_NO to delete the item and continue (if supported)
  */
 static int
-repl_iter (void *cls,
-          void *next_cls,
+repl_proc (void *cls,
           const GNUNET_HashCode *key,
           uint32_t size,
           const void *data,
@@ -1073,8 +904,8 @@
   PGresult *qret;
   uint32_t boid;
 
-  ret = rc->iter (rc->iter_cls,
-                 next_cls, key,
+  ret = rc->proc (rc->proc_cls,
+                 key,
                  size, data, 
                  type, priority, anonymity, expiration,
                  uid);
@@ -1107,32 +938,30 @@
  * Get a random item for replication.  Returns a single, not expired, random item
  * from those with the highest replication counters.  The item's 
  * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-postgres_plugin_replication_get (void *cls,
-                                PluginIterator iter, void *iter_cls)
+postgres_plugin_get_replication (void *cls,
+                                PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
   struct ReplCtx rc;
+  PGresult *ret;
 
   rc.plugin = plugin;
-  rc.iter = iter;
-  rc.iter_cls = iter_cls;
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->one_shot = GNUNET_YES;
-  nrc->total = 1;
-  nrc->plugin = plugin;
-  nrc->iter = &repl_iter;
-  nrc->iter_cls = &rc;
-  nrc->pname = "select_replication_order";
-  nrc->nparams = 0;
-  postgres_next_request_cont (nrc, NULL);
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+  ret = PQexecPrepared (plugin->dbh,
+                       "select_replication_order",
+                       0,
+                       NULL, NULL, NULL, 1);
+  process_result (plugin,
+                 &repl_proc, &rc,
+                 ret);
 }
 
 
@@ -1141,29 +970,31 @@
  * Call 'iter' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-postgres_plugin_expiration_get (void *cls,
-                               PluginIterator iter, void *iter_cls)
+postgres_plugin_get_expiration (void *cls,
+                               PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure *nrc;
   uint64_t btime;
+  const int paramFormats[] = { 1 };
+  int paramLengths[] = { sizeof (btime) };
+  const char *paramValues[] = { (const char*) &btime };
+  PGresult *ret;
   
   btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->one_shot = GNUNET_YES;
-  nrc->total = 1;
-  nrc->plugin = plugin;
-  nrc->iter = iter;
-  nrc->iter_cls = iter_cls;
-  nrc->pname = "select_expiration_order";
-  nrc->nparams = 1;
-  nrc->paramValues[0] = (const char *) &btime;
-  nrc->paramLengths[0] = sizeof (btime);
-  postgres_next_request_cont (nrc, NULL);
+  ret = PQexecPrepared (plugin->dbh,
+                       "select_expiration_order",
+                       1,
+                       paramValues,
+                       paramLengths,
+                       paramFormats, 
+                       1);
+  process_result (plugin,
+                 proc, proc_cls,
+                 ret);
 }
 
 
@@ -1260,14 +1091,13 @@
     }
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = plugin;
-  api->get_size = &postgres_plugin_get_size;
+  api->estimate_size = &postgres_plugin_estimate_size;
   api->put = &postgres_plugin_put;
-  api->next_request = &postgres_plugin_next_request;
-  api->get = &postgres_plugin_get;
-  api->replication_get = &postgres_plugin_replication_get;
-  api->expiration_get = &postgres_plugin_expiration_get;
   api->update = &postgres_plugin_update;
-  api->iter_zero_anonymity = &postgres_plugin_iter_zero_anonymity;
+  api->get_key = &postgres_plugin_get_key;
+  api->get_replication = &postgres_plugin_get_replication;
+  api->get_expiration = &postgres_plugin_get_expiration;
+  api->get_zero_anonymity = &postgres_plugin_get_zero_anonymity;
   api->drop = &postgres_plugin_drop;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                    "datastore-postgres",
@@ -1287,13 +1117,6 @@
   struct GNUNET_DATASTORE_PluginFunctions *api = cls;
   struct Plugin *plugin = api->cls;
   
-  if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-      GNUNET_SCHEDULER_cancel (plugin->next_task);
-      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-      GNUNET_free (plugin->next_task_nc);
-      plugin->next_task_nc = NULL;
-    }
   PQfinish (plugin->dbh);
   GNUNET_free (plugin);
   GNUNET_free (api);

Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c      2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c      2011-04-26 18:19:15 UTC (rev 15078)
@@ -108,21 +108,16 @@
   sqlite3_stmt *selExpi;
 
   /**
-   * Precompiled SQL for insertion.
+   * Precompiled SQL for zero-anonymity selection.
    */
-  sqlite3_stmt *insertContent;
+  sqlite3_stmt *selZeroAnon;
 
   /**
-   * Closure of the 'next_task' (must be freed if 'next_task' is cancelled).
+   * Precompiled SQL for insertion.
    */
-  struct NextContext *next_task_nc;
+  sqlite3_stmt *insertContent;
 
   /**
-   * Pending task with scheduler for running the next request.
-   */
-  GNUNET_SCHEDULER_TaskIdentifier next_task;
-
-  /**
    * Should the database be dropped on shutdown?
    */
   int drop_on_shutdown;
@@ -326,6 +321,11 @@
                   " WHERE NOT EXISTS (SELECT 1 FROM gn090 WHERE expire < ?1 LIMIT 1) OR expire < ?1 "
                   " ORDER BY prio ASC LIMIT 1",
                    &plugin->selExpi) != SQLITE_OK) ||
+      (sq_prepare (plugin->dbh, 
+                  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
+                  "WHERE (anonLevel = 0 AND type=?1) "
+                  "ORDER BY hash DESC LIMIT 1 OFFSET ?2",
+                  &plugin->selZeroAnon) != SQLITE_OK) ||
       (sq_prepare (plugin->dbh,
                    "INSERT INTO gn090 (repl, type, prio, "
                    "anonLevel, expire, hash, vhash, value) "
@@ -367,6 +367,8 @@
     sqlite3_finalize (plugin->selRepl);
   if (plugin->selExpi != NULL)
     sqlite3_finalize (plugin->selExpi);
+  if (plugin->selZeroAnon != NULL)
+    sqlite3_finalize (plugin->selZeroAnon);
   if (plugin->insertContent != NULL)
     sqlite3_finalize (plugin->insertContent);
   result = sqlite3_close(plugin->dbh);
@@ -436,247 +438,6 @@
 
 
 /**
- * Context for the universal iterator.
- */
-struct NextContext;
-
-/**
- * Type of a function that will prepare
- * the next iteration.
- *
- * @param cls closure
- * @param nc the next context; NULL for the last
- *         call which gives the callback a chance to
- *         clean up the closure
- * @return GNUNET_OK on success, GNUNET_NO if there are
- *         no more values, GNUNET_SYSERR on error
- */
-typedef int (*PrepareFunction)(void *cls,
-                              struct NextContext *nc);
-
-
-/**
- * Context we keep for the "next request" callback.
- */
-struct NextContext
-{
-  /**
-   * Internal state.
-   */ 
-  struct Plugin *plugin;
-
-  /**
-   * Function to call on the next value.
-   */
-  PluginIterator iter;
-
-  /**
-   * Closure for iter.
-   */
-  void *iter_cls;
-
-  /**
-   * Function to call to prepare the next
-   * iteration.
-   */
-  PrepareFunction prep;
-
-  /**
-   * Closure for prep.
-   */
-  void *prep_cls;
-
-  /**
-   * Statement that the iterator will get the data
-   * from (updated or set by prep).
-   */ 
-  sqlite3_stmt *stmt;
-
-  /**
-   * Row ID of the last result.
-   */
-  unsigned long long last_rowid;
-
-  /**
-   * Key of the last result.
-   */
-  GNUNET_HashCode lastKey;  
-
-  /**
-   * Priority of the last value visited.
-   */ 
-  unsigned int lastPriority; 
-
-  /**
-   * Number of results processed so far.
-   */
-  unsigned int count;
-
-  /**
-   * Set to GNUNET_YES if we must stop now.
-   */
-  int end_it;
-};
-
-
-/**
- * Continuation of "sqlite_next_request".
- *
- * @param cls the 'struct NextContext*'
- * @param tc the task context (unused)
- */
-static void 
-sqlite_next_request_cont (void *cls,
-                         const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct NextContext * nc = cls;
-  struct Plugin *plugin;
-  unsigned long long rowid;
-  int ret;
-  unsigned int size;
-  unsigned int hsize;
-  uint32_t anonymity;
-  uint32_t priority;
-  enum GNUNET_BLOCK_Type type;
-  const GNUNET_HashCode *key;
-  struct GNUNET_TIME_Absolute expiration;
-  
-  plugin = nc->plugin;
-  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->next_task_nc = NULL;
-  if ( (GNUNET_YES == nc->end_it) ||
-       (GNUNET_OK != (nc->prep(nc->prep_cls,
-                              nc))) )
-    {
-#if DEBUG_SQLITE
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "sqlite",
-                      "Iteration completes after %u results\n",
-                      nc->count);
-#endif
-    END:
-      nc->iter (nc->iter_cls, 
-               NULL, NULL, 0, NULL, 0, 0, 0, 
-               GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      nc->prep (nc->prep_cls, NULL);
-      GNUNET_free (nc);
-      return;
-    }
-
-  type = sqlite3_column_int (nc->stmt, 0);
-  priority = sqlite3_column_int (nc->stmt, 1);
-  anonymity = sqlite3_column_int (nc->stmt, 2);
-  expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
-  hsize = sqlite3_column_bytes (nc->stmt, 4);
-  key = sqlite3_column_blob (nc->stmt, 4);
-  size = sqlite3_column_bytes (nc->stmt, 5);
-  rowid = sqlite3_column_int64 (nc->stmt, 6);
-  if (hsize != sizeof (GNUNET_HashCode))
-    {
-      GNUNET_break (0);
-      if (SQLITE_OK != sqlite3_reset (nc->stmt))
-       LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR |
-                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-      if (GNUNET_OK == delete_by_rowid (plugin, rowid))
-       plugin->env->duc (plugin->env->cls,
-                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));      
-      goto END;
-    }
-#if DEBUG_SQLITE
-  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                  "sqlite",
-                  "Iterator returns value with type %u/key `%s'/priority %u/expiration %llu (%lld).\n",
-                  type, 
-                  GNUNET_h2s(key),
-                  priority,
-                  (unsigned long long) GNUNET_TIME_absolute_get_remaining (expiration).rel_value,
-                  (long long) expiration.abs_value);
-#endif
-  if (size > MAX_ITEM_SIZE)
-    {
-      GNUNET_break (0);
-      if (SQLITE_OK != sqlite3_reset (nc->stmt))
-       LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR |
-                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-      if (GNUNET_OK == delete_by_rowid (plugin, rowid))
-       plugin->env->duc (plugin->env->cls,
-                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD)); 
-      goto END;
-    }
-  {
-    char data[size];
-    
-    memcpy (data, sqlite3_column_blob (nc->stmt, 5), size);
-    nc->count++;
-    nc->last_rowid = rowid;
-    nc->lastPriority = priority;
-    nc->lastKey = *key;
-    if (SQLITE_OK != sqlite3_reset (nc->stmt))
-      LOG_SQLITE (plugin, NULL,
-                 GNUNET_ERROR_TYPE_ERROR |
-                 GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-    ret = nc->iter (nc->iter_cls, nc,
-                   &nc->lastKey,
-                   size, data,
-                   type, priority,
-                   anonymity, expiration,
-                   rowid);
-  }
-  switch (ret)
-    {
-    case GNUNET_SYSERR:
-      nc->end_it = GNUNET_YES;
-      break;
-    case GNUNET_NO:
-      if (GNUNET_OK == delete_by_rowid (plugin, rowid))
-       {
-#if DEBUG_SQLITE
-         GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                          "sqlite",
-                          "Removed entry %llu (%u bytes)\n",
-                          (unsigned long long) rowid,
-                          size + GNUNET_DATASTORE_ENTRY_OVERHEAD);
-#endif
-         plugin->env->duc (plugin->env->cls,
-                           - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));
-       }
-      break;
-    case GNUNET_YES:
-      break;
-    default:
-      GNUNET_break (0);
-    }
-}
-
-
-/**
- * Function invoked on behalf of a "PluginIterator" asking the
- * database plugin to call the iterator with the next item.
- *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-sqlite_next_request (void *next_cls,
-                    int end_it)
-{
-  struct NextContext * nc= next_cls;
-
-  if (GNUNET_YES == end_it)
-    nc->end_it = GNUNET_YES;
-  nc->plugin->next_task_nc = nc;
-  nc->plugin->next_task = GNUNET_SCHEDULER_add_now (&sqlite_next_request_cont,
-                                                   nc);
-}
-
-
-/**
  * Store an item in the datastore.
  *
  * @param cls closure
@@ -849,355 +610,147 @@
 
 
 /**
- * Internal context for an iteration.
- */
-struct ZeroIterContext
-{
-  /**
-   * First iterator statement for zero-anonymity iteration.
-   */
-  sqlite3_stmt *stmt_1;
-
-  /**
-   * Second iterator statement for zero-anonymity iteration.
-   */
-  sqlite3_stmt *stmt_2;
-
-  /**
-   * Desired type for blocks returned by this iterator.
-   */
-  enum GNUNET_BLOCK_Type type;
-};
-
-
-/**
- * Prepare our SQL query to obtain the next record from the database.
+ * Execute statement that gets a row and call the callback
+ * with the result.  Resets the statement afterwards.
  *
- * @param cls our "struct ZeroIterContext"
- * @param nc NULL to terminate the iteration, otherwise our context for
- *           getting the next result.
- * @return GNUNET_OK on success, GNUNET_NO if there are no more results,
- *         GNUNET_SYSERR on error (or end of iteration)
+ * @param plugin the plugin
+ * @param stmt the statement
+ * @param proc processor to call
+ * @param proc_cls closure for 'proc'
  */
-static int
-zero_iter_next_prepare (void *cls,
-                       struct NextContext *nc)
+static void
+execute_get (struct Plugin *plugin,
+            sqlite3_stmt *stmt,
+            PluginDatumProcessor proc, void *proc_cls)
 {
-  struct ZeroIterContext *ic = cls;
-  struct Plugin *plugin;
+  int n;
+  struct GNUNET_TIME_Absolute expiration;
+  unsigned long long rowid;
+  unsigned int size;
   int ret;
 
-  if (nc == NULL)
+  n = sqlite3_step (stmt);
+  switch (n)
     {
-#if DEBUG_SQLITE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Asked to clean up iterator state.\n");
-#endif
-      sqlite3_finalize (ic->stmt_1);
-      sqlite3_finalize (ic->stmt_2);
-      return GNUNET_SYSERR;
-    }
-  plugin = nc->plugin;
-
-  /* first try iter 1 */
-#if DEBUG_SQLITE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Restricting to results larger than the last priority %u and key `%s'\n",
-             nc->lastPriority,
-             GNUNET_h2s (&nc->lastKey));
-#endif
-  if ( (SQLITE_OK != sqlite3_bind_int (ic->stmt_1, 1, nc->lastPriority)) ||
-       (SQLITE_OK != sqlite3_bind_blob (ic->stmt_1, 2, 
-                                       &nc->lastKey, 
-                                       sizeof (GNUNET_HashCode),
-                                       SQLITE_TRANSIENT)) )
-    {
-      LOG_SQLITE (plugin, NULL,
-                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
-      if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
+    case SQLITE_ROW:
+      size = sqlite3_column_bytes (stmt, 5);
+      rowid = sqlite3_column_int64 (stmt, 6);
+      if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
+       {
+         GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
+                          "sqlite",
+                          _("Invalid data in database.  Trying to fix (by deletion).\n"));
+         if (SQLITE_OK != sqlite3_reset (stmt))
+           LOG_SQLITE (plugin, NULL,
+                       GNUNET_ERROR_TYPE_ERROR |
+                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+         if (GNUNET_OK == delete_by_rowid (plugin, rowid))
+           plugin->env->duc (plugin->env->cls,
+                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
+         break;
+       }
+      expiration.abs_value = sqlite3_column_int64 (stmt, 3);
+      ret = proc (proc_cls,
+                 sqlite3_column_blob (stmt, 4) /* key */,
+                 size,
+                 sqlite3_column_blob (stmt, 5) /* data */, 
+                 sqlite3_column_int (stmt, 0) /* type */,
+                 sqlite3_column_int (stmt, 1) /* priority */,
+                 sqlite3_column_int (stmt, 2) /* anonymity */,
+                 expiration,
+                 rowid);
+      if (SQLITE_OK != sqlite3_reset (stmt))
        LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR | 
-                   GNUNET_ERROR_TYPE_BULK, 
-                   "sqlite3_reset");  
-      return GNUNET_SYSERR;
-    }
-  if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_1)))
-    {      
-#if DEBUG_SQLITE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result found using iterator 1\n");
-#endif
-      nc->stmt = ic->stmt_1;
-      return GNUNET_OK;
-    }
-  if (ret != SQLITE_DONE)
-    {
-      LOG_SQLITE (plugin, NULL,
-                 GNUNET_ERROR_TYPE_ERROR |
-                 GNUNET_ERROR_TYPE_BULK,
-                 "sqlite3_step");
-      if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
+                   GNUNET_ERROR_TYPE_ERROR |
+                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      if ( (GNUNET_NO == ret) &&
+          (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
+       plugin->env->duc (plugin->env->cls,
+                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
+      return;
+    case SQLITE_DONE:
+      /* database must be empty */
+      if (SQLITE_OK != sqlite3_reset (stmt))
        LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR | 
-                   GNUNET_ERROR_TYPE_BULK, 
-                   "sqlite3_reset");  
-      return GNUNET_SYSERR;
-    }
-  if (SQLITE_OK != sqlite3_reset (ic->stmt_1))
-    LOG_SQLITE (plugin, NULL,
-               GNUNET_ERROR_TYPE_ERROR | 
-               GNUNET_ERROR_TYPE_BULK, 
-               "sqlite3_reset");  
-
-  /* now try iter 2 */
-  if (SQLITE_OK != sqlite3_bind_int (ic->stmt_2, 1, nc->lastPriority))
-    {
-      LOG_SQLITE (plugin, NULL,                  
-                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_bind_XXXX");
-      return GNUNET_SYSERR;
-    }
-  if (SQLITE_ROW == (ret = sqlite3_step (ic->stmt_2))) 
-    {
-#if DEBUG_SQLITE
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Result found using iterator 2\n");
-#endif
-      nc->stmt = ic->stmt_2;
-      return GNUNET_OK;
-    }
-  if (ret != SQLITE_DONE)
-    {
+                   GNUNET_ERROR_TYPE_ERROR |
+                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      break;
+    case SQLITE_BUSY:    
+    case SQLITE_ERROR:
+    case SQLITE_MISUSE:
+    default:
       LOG_SQLITE (plugin, NULL,
-                 GNUNET_ERROR_TYPE_ERROR |
-                 GNUNET_ERROR_TYPE_BULK,
+                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
                  "sqlite3_step");
-      if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
+      if (SQLITE_OK != sqlite3_reset (stmt))
        LOG_SQLITE (plugin, NULL,
                    GNUNET_ERROR_TYPE_ERROR |
                    GNUNET_ERROR_TYPE_BULK,
                    "sqlite3_reset");
-      return GNUNET_SYSERR;
+      GNUNET_break (0);
+      database_shutdown (plugin);
+      database_setup (plugin->env->cfg,
+                     plugin);
+      break;
     }
-  if (SQLITE_OK != sqlite3_reset (ic->stmt_2))
+  if (SQLITE_OK != sqlite3_reset (stmt))
     LOG_SQLITE (plugin, NULL,
                GNUNET_ERROR_TYPE_ERROR |
-               GNUNET_ERROR_TYPE_BULK,
-               "sqlite3_reset");
-#if DEBUG_SQLITE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "No result found using either iterator\n");
-#endif
-  return GNUNET_NO;
+               GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+  proc (proc_cls, NULL, 0, NULL, 0, 0, 0,          
+       GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 
 
+
 /**
  * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * the given processor for the item.
  *
  * @param cls our plugin context
  * @param type entries of which type should be considered?
  *        Use 0 for any type.
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
  */
 static void
-sqlite_plugin_iter_zero_anonymity (void *cls,
-                                  enum GNUNET_BLOCK_Type type,
-                                  PluginIterator iter,
-                                  void *iter_cls)
+sqlite_plugin_get_zero_anonymity (void *cls,
+                                 uint64_t offset,
+                                 enum GNUNET_BLOCK_Type type,
+                                 PluginDatumProcessor proc,
+                                 void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct GNUNET_TIME_Absolute now;
-  struct NextContext *nc;
-  struct ZeroIterContext *ic;
-  sqlite3_stmt *stmt_1;
-  sqlite3_stmt *stmt_2;
-  char *q;
+  sqlite3_stmt *stmt;
 
   GNUNET_assert (type != GNUNET_BLOCK_TYPE_ANY);
-  now = GNUNET_TIME_absolute_get ();
-  GNUNET_asprintf (&q, 
-                  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
-                  "WHERE (anonLevel = 0 AND expire > %llu AND prio = ?1 AND type=%d AND hash < ?2) "
-                  "ORDER BY hash DESC LIMIT 1",
-                  (unsigned long long) now.abs_value,
-                  type);
-  if (sq_prepare (plugin->dbh, q, &stmt_1) != SQLITE_OK)
+  stmt = plugin->selZeroAnon;
+  if ( (SQLITE_OK != sqlite3_bind_int (stmt, 1, type)) ||
+       (SQLITE_OK != sqlite3_bind_int64 (stmt, 2, offset)) )
     {
       LOG_SQLITE (plugin, NULL,
-                  GNUNET_ERROR_TYPE_ERROR |
-                  GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (q);
+                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
+                 "sqlite3_bind_XXXX");
+      if (SQLITE_OK != sqlite3_reset (stmt))
+       LOG_SQLITE (plugin, NULL,
+                   GNUNET_ERROR_TYPE_ERROR | 
+                   GNUNET_ERROR_TYPE_BULK, 
+                   "sqlite3_reset");
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0,      
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
-  GNUNET_free (q);
-  GNUNET_asprintf (&q, 
-                  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 "
-                  "WHERE (anonLevel = 0 AND expire > %llu AND prio < ?1 AND type=%d) "
-                  "ORDER BY prio DESC, hash DESC LIMIT 1",
-                  (unsigned long long) now.abs_value,
-                  type);
-  if (sq_prepare (plugin->dbh, q, &stmt_2) != SQLITE_OK)
-    {
-      LOG_SQLITE (plugin, NULL,
-                  GNUNET_ERROR_TYPE_ERROR |
-                  GNUNET_ERROR_TYPE_BULK, "sqlite3_prepare_v2");
-      sqlite3_finalize (stmt_1);
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (q);
-      return;
-    }
-  GNUNET_free (q);
-  nc = GNUNET_malloc (sizeof(struct NextContext) + 
-                     sizeof(struct ZeroIterContext));
-  nc->plugin = plugin;
-  nc->iter = iter;
-  nc->iter_cls = iter_cls;
-  nc->stmt = NULL;
-  ic = (struct ZeroIterContext*) &nc[1];
-  ic->stmt_1 = stmt_1;
-  ic->stmt_2 = stmt_2;
-  ic->type = type;
-  nc->prep = &zero_iter_next_prepare;
-  nc->prep_cls = ic;
-  nc->lastPriority = INT32_MAX;
-  memset (&nc->lastKey, 255, sizeof (GNUNET_HashCode));
-  sqlite_next_request (nc, GNUNET_NO);
+  execute_get (plugin, stmt, proc, proc_cls);
 }
 
 
-/**
- * Context for get_next_prepare.
- */
-struct GetNextContext
-{
 
-  /**
-   * Our prepared statement.
-   */
-  sqlite3_stmt *stmt;
-
-  /**
-   * Plugin handle.
-   */
-  struct Plugin *plugin;
-
-  /**
-   * Key for the query.
-   */
-  GNUNET_HashCode key;
-
-  /**
-   * Vhash for the query.
-   */
-  GNUNET_HashCode vhash;
-
-  /**
-   * Expected total number of results.
-   */
-  unsigned int total;
-
-  /**
-   * Offset to add for the selected result.
-   */
-  unsigned int off;
-
-  /**
-   * Is vhash set?
-   */
-  int have_vhash;
-
-  /**
-   * Desired block type.
-   */
-  enum GNUNET_BLOCK_Type type;
-
-};
-
-
 /**
- * Prepare the stmt in 'nc' for the next round of execution, selecting the
- * next return value.
+ * Get results for a particular key in the datastore.
  *
- * @param cls our "struct GetNextContext*"
- * @param nc the general context
- * @return GNUNET_YES if there are more results, 
- *         GNUNET_NO if there are no more results,
- *         GNUNET_SYSERR on internal error
- */
-static int
-get_next_prepare (void *cls,
-                 struct NextContext *nc)
-{
-  struct GetNextContext *gnc = cls;
-  int ret;
-  int limit_off;
-  unsigned int sqoff;
-
-  if (nc == NULL)
-    {
-      sqlite3_finalize (gnc->stmt);
-      return GNUNET_SYSERR;
-    }
-  if (nc->count == gnc->total)
-    return GNUNET_NO;
-  if (nc->count + gnc->off == gnc->total)
-    nc->last_rowid = 0;
-  if (nc->count == 0)
-    limit_off = gnc->off;
-  else
-    limit_off = 0;
-  sqlite3_reset (nc->stmt);
-  sqoff = 1;
-  ret = sqlite3_bind_blob (nc->stmt,
-                          sqoff++,
-                          &gnc->key, 
-                          sizeof (GNUNET_HashCode),
-                          SQLITE_TRANSIENT);
-  if ((gnc->have_vhash) && (ret == SQLITE_OK))
-    ret = sqlite3_bind_blob (nc->stmt,
-                            sqoff++,
-                            &gnc->vhash,
-                            sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
-  if ((gnc->type != 0) && (ret == SQLITE_OK))
-    ret = sqlite3_bind_int (nc->stmt, sqoff++, gnc->type);
-  if (ret == SQLITE_OK)
-    ret = sqlite3_bind_int64 (nc->stmt, sqoff++, limit_off);
-  if (ret != SQLITE_OK)
-    return GNUNET_SYSERR;
-#if DEBUG_SQLITE 
-  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                  "sqlite",
-                   "Preparing to GET for key `%s' with type %d at offset %u\n",
-                  GNUNET_h2s (&gnc->key),
-                  gnc->type,
-                  limit_off);
-#endif
-  ret = sqlite3_step (nc->stmt);
-  switch (ret)
-    {
-    case SQLITE_ROW:
-      return GNUNET_OK;  
-    case SQLITE_DONE:
-      return GNUNET_NO;
-    default:
-      LOG_SQLITE (gnc->plugin, NULL,
-                 GNUNET_ERROR_TYPE_ERROR |
-                 GNUNET_ERROR_TYPE_BULK,
-                 "sqlite3_step");
-      return GNUNET_SYSERR;
-    }
-}
-
-
-/**
- * Iterate over the results for a particular key
- * in the datastore.
- *
  * @param cls closure
+ * @param offset offset (mod count).
  * @param key key to match, never NULL
  * @param vhash hash of the value, maybe NULL (to
  *        match all values that have the right key).
@@ -1206,27 +759,27 @@
  *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
  */
 static void
-sqlite_plugin_get (void *cls,
-                  const GNUNET_HashCode *key,
-                  const GNUNET_HashCode *vhash,
-                  enum GNUNET_BLOCK_Type type,
-                  PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_key (void *cls,
+                      uint64_t offset,
+                      const GNUNET_HashCode *key,
+                      const GNUNET_HashCode *vhash,
+                      enum GNUNET_BLOCK_Type type,
+                      PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
-  struct GetNextContext *gnc;
-  struct NextContext *nc;
   int ret;
   int total;
+  int limit_off;
+  unsigned int sqoff;
   sqlite3_stmt *stmt;
   char scratch[256];
-  unsigned int sqoff;
 
-  GNUNET_assert (iter != NULL);
+  GNUNET_assert (proc != NULL);
   GNUNET_assert (key != NULL);
   GNUNET_snprintf (scratch, sizeof (scratch),
                    "SELECT count(*) FROM gn090 WHERE hash=?%s%s",
@@ -1236,7 +789,7 @@
     {
       LOG_SQLITE (plugin, NULL,
                   GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
   sqoff = 1;
@@ -1253,7 +806,7 @@
       LOG_SQLITE (plugin, NULL,
                   GNUNET_ERROR_TYPE_ERROR, "sqlite_bind");
       sqlite3_finalize (stmt);
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
   ret = sqlite3_step (stmt);
@@ -1263,147 +816,64 @@
                   GNUNET_ERROR_TYPE_ERROR| GNUNET_ERROR_TYPE_BULK, 
                  "sqlite_step");
       sqlite3_finalize (stmt);
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
   total = sqlite3_column_int (stmt, 0);
   sqlite3_finalize (stmt);
   if (0 == total)
     {
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
+  limit_off = (int) (offset % total);
+  if (limit_off < 0)
+    limit_off += total;
   GNUNET_snprintf (scratch, sizeof (scratch),
                    "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ "
                    "FROM gn090 WHERE hash=?%s%s "
                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET ?",
                    vhash == NULL ? "" : " AND vhash=?",
                    type == 0 ? "" : " AND type=?");
-
   if (sq_prepare (plugin->dbh, scratch, &stmt) != SQLITE_OK)
     {
       LOG_SQLITE (plugin, NULL,
                   GNUNET_ERROR_TYPE_ERROR |
                   GNUNET_ERROR_TYPE_BULK, "sqlite_prepare");
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
-  nc = GNUNET_malloc (sizeof(struct NextContext) + 
-                     sizeof(struct GetNextContext));
-  nc->plugin = plugin;
-  nc->iter = iter;
-  nc->iter_cls = iter_cls;
-  nc->stmt = stmt;
-  gnc = (struct GetNextContext*) &nc[1];
-  gnc->total = total;
-  gnc->type = type;
-  gnc->key = *key;
-  gnc->plugin = plugin;
-  gnc->stmt = stmt; /* alias used for freeing at the end! */
-  if (NULL != vhash)
+  sqoff = 1;
+  ret = sqlite3_bind_blob (stmt,
+                          sqoff++,
+                          key, 
+                          sizeof (GNUNET_HashCode),
+                          SQLITE_TRANSIENT);
+  if ((vhash != NULL) && (ret == SQLITE_OK))
+    ret = sqlite3_bind_blob (stmt,
+                            sqoff++,
+                            vhash,
+                            sizeof (GNUNET_HashCode), SQLITE_TRANSIENT);
+  if ((type != 0) && (ret == SQLITE_OK))
+    ret = sqlite3_bind_int (stmt, sqoff++, type);
+  if (ret == SQLITE_OK)
+    ret = sqlite3_bind_int64 (stmt, sqoff++, limit_off);
+  if (ret != SQLITE_OK)
     {
-      gnc->have_vhash = GNUNET_YES;
-      gnc->vhash = *vhash;
-    }
-  gnc->off = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, total);
-  nc->prep = &get_next_prepare;
-  nc->prep_cls = gnc;
-  sqlite_next_request (nc, GNUNET_NO);
-}
-
-
-/**
- * Execute statement that gets a row and call the callback
- * with the result.  Resets the statement afterwards.
- *
- * @param plugin the plugin
- * @param stmt the statement
- * @param iter iterator to call
- * @param iter_cls closure for 'iter'
- */
-static void
-execute_get (struct Plugin *plugin,
-            sqlite3_stmt *stmt,
-            PluginIterator iter, void *iter_cls)
-{
-  int n;
-  struct GNUNET_TIME_Absolute expiration;
-  unsigned long long rowid;
-  unsigned int size;
-  int ret;
-
-  n = sqlite3_step (stmt);
-  switch (n)
-    {
-    case SQLITE_ROW:
-      size = sqlite3_column_bytes (stmt, 5);
-      rowid = sqlite3_column_int64 (stmt, 6);
-      if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
-       {
-         GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
-                          "sqlite",
-                          _("Invalid data in database.  Trying to fix (by deletion).\n"));
-         if (SQLITE_OK != sqlite3_reset (stmt))
-           LOG_SQLITE (plugin, NULL,
-                       GNUNET_ERROR_TYPE_ERROR |
-                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-         if (GNUNET_OK == delete_by_rowid (plugin, rowid))
-           plugin->env->duc (plugin->env->cls,
-                             - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));        
-         break;
-       }
-      expiration.abs_value = sqlite3_column_int64 (stmt, 3);
-      ret = iter (iter_cls,
-                 NULL,
-                 sqlite3_column_blob (stmt, 4) /* key */,
-                 size,
-                 sqlite3_column_blob (stmt, 5) /* data */, 
-                 sqlite3_column_int (stmt, 0) /* type */,
-                 sqlite3_column_int (stmt, 1) /* priority */,
-                 sqlite3_column_int (stmt, 2) /* anonymity */,
-                 expiration,
-                 rowid);
-      if (SQLITE_OK != sqlite3_reset (stmt))
-       LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR |
-                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-      if ( (GNUNET_NO == ret) &&
-          (GNUNET_OK == delete_by_rowid (plugin, rowid)) )
-       plugin->env->duc (plugin->env->cls,
-                         - (size + GNUNET_DATASTORE_ENTRY_OVERHEAD));  
-      return;
-    case SQLITE_DONE:
-      /* database must be empty */
-      if (SQLITE_OK != sqlite3_reset (stmt))
-       LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR |
-                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-      break;
-    case SQLITE_BUSY:    
-    case SQLITE_ERROR:
-    case SQLITE_MISUSE:
-    default:
       LOG_SQLITE (plugin, NULL,
-                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
-                 "sqlite3_step");
-      if (SQLITE_OK != sqlite3_reset (stmt))
-       LOG_SQLITE (plugin, NULL,
-                   GNUNET_ERROR_TYPE_ERROR |
-                   GNUNET_ERROR_TYPE_BULK,
-                   "sqlite3_reset");
-      GNUNET_break (0);
-      database_shutdown (plugin);
-      database_setup (plugin->env->cfg,
-                     plugin);
-      break;
+                  GNUNET_ERROR_TYPE_ERROR |
+                  GNUNET_ERROR_TYPE_BULK, "sqlite_bind");
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
     }
-  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,            
-       GNUNET_TIME_UNIT_ZERO_ABS, 0);
+  execute_get (plugin, stmt, proc, proc_cls);
+  sqlite3_finalize (stmt);
 }
 
 
+
 /**
- * Context for 'repl_iter' function.
+ * Context for 'repl_proc' function.
  */
 struct ReplCtx
 {
@@ -1416,22 +886,21 @@
   /**
    * Function to call for the result (or the NULL).
    */
-  PluginIterator iter;
+  PluginDatumProcessor proc;
   
   /**
-   * Closure for iter.
+   * Closure for proc.
    */
-  void *iter_cls;
+  void *proc_cls;
 };
 
 
 /**
- * Wrapper for the iterator for 'sqlite_plugin_replication_get'.
+ * Wrapper for the processor for 'sqlite_plugin_replication_get'.
  * Decrements the replication counter and calls the original
- * iterator.
+ * processor.
  *
  * @param cls closure
- * @param next_cls closure to pass to the "next" function.
  * @param key key for the content
  * @param size number of bytes in data
  * @param data content stored
@@ -1442,13 +911,11 @@
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
  *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
- *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK for normal return,
+ *         GNUNET_NO to delete the item
  */
 static int
-repl_iter (void *cls,
-          void *next_cls,
+repl_proc (void *cls,
           const GNUNET_HashCode *key,
           uint32_t size,
           const void *data,
@@ -1462,8 +929,8 @@
   struct Plugin *plugin = rc->plugin;
   int ret;
 
-  ret = rc->iter (rc->iter_cls,
-                 next_cls, key,
+  ret = rc->proc (rc->proc_cls,
+                 key,
                  size, data, 
                  type, priority, anonymity, expiration,
                  uid);
@@ -1494,15 +961,15 @@
  * Get a random item for replication.  Returns a single random item
  * from those with the highest replication counters.  The item's 
  * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-sqlite_plugin_replication_get (void *cls,
-                              PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_replication (void *cls,
+                              PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
   struct ReplCtx rc;
@@ -1513,24 +980,24 @@
                   "Getting random block based on replication order.\n");
 #endif
   rc.plugin = plugin;
-  rc.iter = iter;
-  rc.iter_cls = iter_cls;
-  execute_get (plugin, plugin->selRepl, &repl_iter, &rc);
+  rc.proc = proc;
+  rc.proc_cls = proc_cls;
+  execute_get (plugin, plugin->selRepl, &repl_proc, &rc);
 }
 
 
 
 /**
  * Get a random item that has expired or has low priority.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Call 'proc' with all values ZERO or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-sqlite_plugin_expiration_get (void *cls,
-                             PluginIterator iter, void *iter_cls)
+sqlite_plugin_get_expiration (void *cls,
+                             PluginDatumProcessor proc, void *proc_cls)
 {
   struct Plugin *plugin = cls;
   sqlite3_stmt *stmt;
@@ -1550,11 +1017,11 @@
       if (SQLITE_OK != sqlite3_reset (stmt))
         LOG_SQLITE (plugin, NULL,
                     GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
-      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
+      proc (proc_cls, NULL, 0, NULL, 0, 0, 0, 
            GNUNET_TIME_UNIT_ZERO_ABS, 0);
       return;
     }
-  execute_get (plugin, stmt, iter, iter_cls);
+  execute_get (plugin, stmt, proc, proc_cls);
 }
 
 
@@ -1579,7 +1046,7 @@
  * @return the size of the database on disk (estimate)
  */
 static unsigned long long
-sqlite_plugin_get_size (void *cls)
+sqlite_plugin_estimate_size (void *cls)
 {
   struct Plugin *plugin = cls;
   sqlite3_stmt *stmt;
@@ -1653,14 +1120,13 @@
     }
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = &plugin;
-  api->get_size = &sqlite_plugin_get_size;
+  api->estimate_size = &sqlite_plugin_estimate_size;
   api->put = &sqlite_plugin_put;
-  api->next_request = &sqlite_next_request;
-  api->get = &sqlite_plugin_get;
-  api->replication_get = &sqlite_plugin_replication_get;
-  api->expiration_get = &sqlite_plugin_expiration_get;
   api->update = &sqlite_plugin_update;
-  api->iter_zero_anonymity = &sqlite_plugin_iter_zero_anonymity;
+  api->get_key = &sqlite_plugin_get_key;
+  api->get_replication = &sqlite_plugin_get_replication;
+  api->get_expiration = &sqlite_plugin_get_expiration;
+  api->get_zero_anonymity = &sqlite_plugin_get_zero_anonymity;
   api->drop = &sqlite_plugin_drop;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                    "sqlite", _("Sqlite database running\n"));
@@ -1684,27 +1150,9 @@
 #if DEBUG_SQLITE
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                   "sqlite",
-                  "sqlite plugin is doneing\n");
+                  "sqlite plugin is done\n");
 #endif
 
-  if (plugin->next_task != GNUNET_SCHEDULER_NO_TASK)
-    {
-#if DEBUG_SQLITE
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "sqlite",
-                      "Canceling next task\n");
-#endif
-      GNUNET_SCHEDULER_cancel (plugin->next_task);
-      plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-#if DEBUG_SQLITE
-      GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
-                      "sqlite",
-                      "Prep'ing next task\n");
-#endif
-      plugin->next_task_nc->prep (plugin->next_task_nc->prep_cls, NULL);
-      GNUNET_free (plugin->next_task_nc);
-      plugin->next_task_nc = NULL;
-    }
   fn = NULL;
   if (plugin->drop_on_shutdown)
     fn = GNUNET_strdup (plugin->fn);

Modified: gnunet/src/datastore/plugin_datastore_template.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_template.c    2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/plugin_datastore_template.c    2011-04-26 18:19:15 UTC (rev 15078)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -47,7 +47,8 @@
  * @param cls our "struct Plugin*"
  * @return number of bytes used on disk
  */
-static unsigned long long template_plugin_get_size (void *cls)
+static unsigned long long 
+template_plugin_estimate_size (void *cls)
 {
   GNUNET_break (0);
   return 0;
@@ -88,30 +89,11 @@
 
 
 /**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * Get one of the results for a particular key in the datastore.
  *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-template_plugin_next_request (void *next_cls,
-                      int end_it)
-{
-  GNUNET_break (0);
-}
-
-
-/**
- * Iterate over the results for a particular key
- * in the datastore.
- *
  * @param cls closure
+ * @param offset offset of the result (mod #num-results); 
+ *               specific ordering does not matter for the offset
  * @param key maybe NULL (to match all entries)
  * @param vhash hash of the value, maybe NULL (to
  *        match all values that have the right key).
@@ -120,16 +102,17 @@
  *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc function to call on each matching value;
+ *        will be called with NULL if nothing matches
+ * @param proc_cls closure for proc
  */
 static void
-template_plugin_get (void *cls,
-                    const GNUNET_HashCode * key,
-                    const GNUNET_HashCode * vhash,
-                    enum GNUNET_BLOCK_Type type,
-                    PluginIterator iter, void *iter_cls)
+template_plugin_get_key (void *cls,
+                        uint64_t offset,
+                        const GNUNET_HashCode * key,
+                        const GNUNET_HashCode * vhash,
+                        enum GNUNET_BLOCK_Type type,
+                        PluginDatumProcessor proc, void *proc_cls)
 {
   GNUNET_break (0);
 }
@@ -137,34 +120,35 @@
 
 
 /**
- * Get a random item for replication.  Returns a single, not expired, random item
- * from those with the highest replication counters.  The item's 
- * replication counter is decremented by one IF it was positive before.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for replication.  Returns a single, not expired,
+ * random item from those with the highest replication counters.  The
+ * item's replication counter is decremented by one IF it was positive
+ * before.  Call 'proc' with all values ZERO or NULL if the datastore
+ * is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-template_plugin_replication_get (void *cls,
-                                PluginIterator iter, void *iter_cls)
+template_plugin_get_replication (void *cls,
+                                PluginDatumProcessor proc, void *proc_cls)
 {
   GNUNET_break (0);
 }
 
 
 /**
- * Get a random item for expiration.
- * Call 'iter' with all values ZERO or NULL if the datastore is empty.
+ * Get a random item for expiration.  Call 'proc' with all values ZERO
+ * or NULL if the datastore is empty.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
 static void
-template_plugin_expiration_get (void *cls,
-                               PluginIterator iter, void *iter_cls)
+template_plugin_get_expiration (void *cls,
+                               PluginDatumProcessor proc, void *proc_cls)
 {
   GNUNET_break (0);
 }
@@ -196,7 +180,8 @@
 static int
 template_plugin_update (void *cls,
                        uint64_t uid,
-                       int delta, struct GNUNET_TIME_Absolute expire,
+                       int delta, 
+                       struct GNUNET_TIME_Absolute expire,
                        char **msg)
 {
   GNUNET_break (0);
@@ -206,21 +191,23 @@
 
 
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Call the given processor on an item with zero anonymity.
  *
  * @param cls our "struct Plugin*"
+ * @param offset offset of the result (mod #num-results); 
+ *               specific ordering does not matter for the offset
  * @param type entries of which type should be considered?
  *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc function to call on each matching value;
+ *        will be called  with NULL if no value matches
+ * @param proc_cls closure for proc
  */
 static void
-template_plugin_iter_zero_anonymity (void *cls,
-                                    enum GNUNET_BLOCK_Type type,
-                                    PluginIterator iter,
-                                    void *iter_cls)
+template_plugin_get_zero_anonymity (void *cls,
+                                   uint64_t offset,
+                                   enum GNUNET_BLOCK_Type type,
+                                   PluginDatumProcessor proc,
+                                   void *proc_cls)
 {
   GNUNET_break (0);
 }
@@ -253,14 +240,13 @@
   plugin->env = env;
   api = GNUNET_malloc (sizeof (struct GNUNET_DATASTORE_PluginFunctions));
   api->cls = plugin;
-  api->get_size = &template_plugin_get_size;
+  api->estimate_size = &template_plugin_estimate_size;
   api->put = &template_plugin_put;
-  api->next_request = &template_plugin_next_request;
-  api->get = &template_plugin_get;
-  api->replication_get = &template_plugin_replication_get;
-  api->expiration_get = &template_plugin_expiration_get;
   api->update = &template_plugin_update;
-  api->iter_zero_anonymity = &template_plugin_iter_zero_anonymity;
+  api->get_key = &template_plugin_get_key;
+  api->get_replication = &template_plugin_get_replication;
+  api->get_expiration = &template_plugin_get_expiration;
+  api->get_zero_anonymity = &template_plugin_get_zero_anonymity;
   api->drop = &template_plugin_drop;
   GNUNET_log_from (GNUNET_ERROR_TYPE_INFO,
                    "template", _("Template database running\n"));

Modified: gnunet/src/datastore/test_datastore_api.c
===================================================================
--- gnunet/src/datastore/test_datastore_api.c   2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/test_datastore_api.c   2011-04-26 18:19:15 UTC (rev 15078)
@@ -102,20 +102,18 @@
 enum RunPhase
   {
     RP_DONE = 0,
-    RP_PUT,
-    RP_GET,
-    RP_DEL,
-    RP_DO_DEL,
-    RP_DELVALIDATE,
-    RP_RESERVE,
-    RP_PUT_MULTIPLE,
-    RP_PUT_MULTIPLE_NEXT,
-    RP_GET_MULTIPLE,
-    RP_GET_MULTIPLE_NEXT, /* 10 */
-    RP_GET_MULTIPLE_DONE,
-    RP_UPDATE,
-    RP_UPDATE_VALIDATE, /* 13 */
-    RP_UPDATE_DONE,
+    RP_PUT = 1,
+    RP_GET = 2,
+    RP_DEL = 3,
+    RP_DO_DEL = 4,
+    RP_DELVALIDATE = 5,
+    RP_RESERVE = 6,
+    RP_PUT_MULTIPLE = 7,
+    RP_PUT_MULTIPLE_NEXT = 8,
+    RP_GET_MULTIPLE = 9,
+    RP_GET_MULTIPLE_NEXT = 10,
+    RP_UPDATE = 11,
+    RP_UPDATE_VALIDATE = 12,
     RP_ERROR
   };
 
@@ -129,7 +127,9 @@
   void *data;
   size_t size;
   enum RunPhase phase;
-  unsigned long long uid;
+  uint64_t uid;
+  uint64_t offset;
+  uint64_t first_uid;
 };
 
 
@@ -144,16 +144,15 @@
               const char *msg)
 {
   struct CpsRunContext *crc = cls;
+
   if (GNUNET_OK != success)
     {
-      ok = 42;
       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                 "Operation not successfull: `%s'\n", msg);
+                 "Operation %d/%d not successfull: `%s'\n", 
+                 crc->phase,
+                 crc->i,
+                 msg);
       crc->phase = RP_ERROR;
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
     }
   GNUNET_free_non_null (crc->data);
   crc->data = NULL;
@@ -171,7 +170,8 @@
   struct CpsRunContext *crc = cls;
   if (0 >= success)
     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-               "%s\n", msg);
+               "Error obtaining reservation: `%s'\n", 
+               msg);
   GNUNET_assert (0 < success);
   crc->rid = success;
   GNUNET_SCHEDULER_add_continuation (&run_continuation,
@@ -188,42 +188,48 @@
             enum GNUNET_BLOCK_Type type,
             uint32_t priority,
             uint32_t anonymity,
-            struct GNUNET_TIME_Absolute
-            expiration, uint64_t uid)
+            struct GNUNET_TIME_Absolute expiration, 
+            uint64_t uid)
 {
-  static int matched;
   struct CpsRunContext *crc = cls;
   int i;
 
-  if (key == NULL)
-    {
-      if (crc->i == 0)
-       {
-         crc->phase = RP_DEL;
-         crc->i = ITERATIONS;
-       }
-      GNUNET_assert (matched == GNUNET_YES);
-      matched = GNUNET_NO;
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
-    }
   i = crc->i;
+#if 0
+  fprintf (stderr,
+          "Check value got `%s' of size %u, type %d, expire %llu\n",
+          GNUNET_h2s (key),
+          (unsigned int) size,
+          type,
+          (unsigned long long) expiration.abs_value);
+  fprintf (stderr,
+          "Check value iteration %d wants size %u, type %d, expire %llu\n",
+          i,
+          (unsigned int) get_size (i),
+          get_type (i),
+          (unsigned long long) get_expiration(i).abs_value);
+#endif
   GNUNET_assert (size == get_size (i));
   GNUNET_assert (0 == memcmp (data, get_data(i), size));
   GNUNET_assert (type == get_type (i));
   GNUNET_assert (priority == get_priority (i));
   GNUNET_assert (anonymity == get_anonymity(i));
   GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
-  matched = GNUNET_YES;
-  GNUNET_DATASTORE_iterate_get_next (datastore);
+  crc->offset++;
+  if (crc->i == 0)
+    {
+      crc->phase = RP_DEL;
+      crc->i = ITERATIONS;
+    } 
+  GNUNET_SCHEDULER_add_continuation (&run_continuation,
+                                    crc,
+                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
 
 static void 
 delete_value (void *cls,
-             const GNUNET_HashCode * key,
+             const GNUNET_HashCode *key,
              size_t size,
              const void *data,
              enum GNUNET_BLOCK_Type type,
@@ -233,36 +239,23 @@
              expiration, uint64_t uid)
 {
   struct CpsRunContext *crc = cls;
-  if (key == NULL)
-    {
-      if (crc->data == NULL)
-       {
-         GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
-                     "Content %u not found!\n",
-                     crc->i);
-         crc->phase = RP_ERROR;
-       }
-      else
-       {
-         crc->phase = RP_DO_DEL;
-       }
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
-    }
+
   GNUNET_assert (crc->data == NULL);
+  GNUNET_assert (NULL != key);
   crc->size = size;
   crc->key = *key;
   crc->data = GNUNET_malloc (size);
   memcpy (crc->data, data, size);
-  GNUNET_DATASTORE_iterate_get_next (datastore);
+  crc->phase = RP_DO_DEL;
+  GNUNET_SCHEDULER_add_continuation (&run_continuation,
+                                    crc,
+                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
 
 static void 
 check_nothing (void *cls,
-              const GNUNET_HashCode * key,
+              const GNUNET_HashCode *key,
               size_t size,
               const void *data,
               enum GNUNET_BLOCK_Type type,
@@ -272,11 +265,10 @@
               expiration, uint64_t uid)
 {
   struct CpsRunContext *crc = cls;
+
   GNUNET_assert (key == NULL);
   if (crc->i == 0)
-    {
-      crc->phase = RP_RESERVE;   
-    }
+    crc->phase = RP_RESERVE;
   GNUNET_SCHEDULER_add_continuation (&run_continuation,
                                     crc,
                                     GNUNET_SCHEDULER_REASON_PREREQ_DONE);
@@ -296,47 +288,28 @@
 {
   struct CpsRunContext *crc = cls;
 
-  if (key == NULL)
-    {
-      if (crc->phase != RP_GET_MULTIPLE_DONE)
-       {
-         fprintf (stderr, 
-                  "Wrong phase: %d\n",
-                  crc->phase);
-         GNUNET_break (0);
-         crc->phase = RP_ERROR;
-       }
-      else
-       {
-         crc->phase = RP_UPDATE;
-       }
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
-    }
+  GNUNET_assert (key != NULL);
   switch (crc->phase)
     {
     case RP_GET_MULTIPLE:
       crc->phase = RP_GET_MULTIPLE_NEXT;
+      crc->first_uid = uid;
+      crc->offset++;
       break;
     case RP_GET_MULTIPLE_NEXT:
-      crc->phase = RP_GET_MULTIPLE_DONE;
+      GNUNET_assert (uid != crc->first_uid);
+      crc->phase = RP_UPDATE;
       break;
-    case RP_GET_MULTIPLE_DONE:
-      /* do not advance further */
-      break;
     default:
       GNUNET_break (0);
+      crc->phase = RP_ERROR;
       break;
     }
-#if VERBOSE
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Test in phase %u\n", crc->phase);
-#endif
   if (priority == get_priority (42))
     crc->uid = uid;
-  GNUNET_DATASTORE_iterate_get_next (datastore);
+  GNUNET_SCHEDULER_add_continuation (&run_continuation,
+                                    crc,
+                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
 
@@ -353,31 +326,19 @@
 {
   struct CpsRunContext *crc = cls;
 
-  if (key == NULL)
-    {
-      if (crc->phase != RP_UPDATE_DONE)
-       {
-         GNUNET_break (0);
-         crc->phase = RP_ERROR;
-       }
-      else
-       {
-         crc->phase = RP_DONE;
-       }
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
-    }
+  GNUNET_assert (key != NULL);
   if ( (anonymity == get_anonymity (42)) &&
        (size == get_size (42)) &&
        (priority == get_priority (42) + 100) )
+    crc->phase = RP_DONE;    
+  else
     {
-      crc->phase = RP_UPDATE_DONE;
+      GNUNET_assert (size == get_size (43));
+      crc->offset++;
     }
-  else
-    GNUNET_assert (size == get_size (43));
-  GNUNET_DATASTORE_iterate_get_next (datastore);
+  GNUNET_SCHEDULER_add_continuation (&run_continuation,
+                                    crc,
+                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
 
@@ -427,12 +388,13 @@
                  crc->i);
 #endif
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-      GNUNET_DATASTORE_iterate_key (datastore, 
-                                   &crc->key,
-                                   get_type (crc->i),
-                                   1, 1, TIMEOUT,
-                                   &check_value,
-                                   crc);
+      GNUNET_DATASTORE_get_key (datastore, 
+                               crc->offset,
+                               &crc->key,
+                               get_type (crc->i),
+                               1, 1, TIMEOUT,
+                               &check_value,
+                               crc);
       break;
     case RP_DEL:
       crc->i--;
@@ -444,12 +406,14 @@
 #endif
       crc->data = NULL;
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-      GNUNET_DATASTORE_iterate_key (datastore, 
-                                   &crc->key,
-                                   get_type (crc->i),
-                                   1, 1, TIMEOUT,
-                                   &delete_value,
-                                   crc);
+      GNUNET_assert (NULL !=
+                    GNUNET_DATASTORE_get_key (datastore, 
+                                              crc->offset,
+                                              &crc->key,
+                                              get_type (crc->i),
+                                              1, 1, TIMEOUT,
+                                              &delete_value,
+                                              crc));
       break;
     case RP_DO_DEL:
 #if VERBOSE
@@ -467,13 +431,14 @@
        {
          crc->phase = RP_DEL;
        }
-      GNUNET_DATASTORE_remove (datastore,
-                              &crc->key,
-                              crc->size,
-                              crc->data,
-                              1, 1, TIMEOUT,
-                              &check_success,
-                              crc);
+      GNUNET_assert (NULL !=
+                    GNUNET_DATASTORE_remove (datastore,
+                                             &crc->key,
+                                             crc->size,
+                                             crc->data,
+                                             1, 1, TIMEOUT,
+                                             &check_success,
+                                             crc));
       break;   
     case RP_DELVALIDATE:
       crc->i--;
@@ -484,12 +449,14 @@
                  crc->i);
 #endif
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-      GNUNET_DATASTORE_iterate_key (datastore, 
-                                   &crc->key,
-                                   get_type (crc->i),
-                                   1, 1, TIMEOUT,
-                                   &check_nothing,
-                                   crc);
+      GNUNET_assert (NULL != 
+                    GNUNET_DATASTORE_get_key (datastore, 
+                                              crc->offset,
+                                              &crc->key,
+                                              get_type (crc->i),
+                                              1, 1, TIMEOUT,
+                                              &check_nothing,
+                                              crc));
       break;
     case RP_RESERVE:
       crc->phase = RP_PUT_MULTIPLE;
@@ -533,16 +500,24 @@
                            crc);
       break;
     case RP_GET_MULTIPLE:
-      GNUNET_DATASTORE_iterate_key (datastore,
-                                   &crc->key, 
-                                   get_type (42),
-                                   1, 1, TIMEOUT,
-                                   &check_multiple,
-                                   crc);
+      GNUNET_assert (NULL !=
+                    GNUNET_DATASTORE_get_key (datastore,
+                                              crc->offset,
+                                              &crc->key, 
+                                              get_type (42),
+                                              1, 1, TIMEOUT,
+                                              &check_multiple,
+                                              crc));
       break;
     case RP_GET_MULTIPLE_NEXT:
-    case RP_GET_MULTIPLE_DONE:
-      GNUNET_assert (0);
+      GNUNET_assert (NULL !=
+                    GNUNET_DATASTORE_get_key (datastore,
+                                              crc->offset,
+                                              &crc->key, 
+                                              get_type (42),
+                                              1, 1, TIMEOUT,
+                                              &check_multiple,
+                                              crc));
       break;
     case RP_UPDATE:
       GNUNET_assert (crc->uid > 0);
@@ -556,16 +531,15 @@
                               crc);
       break;
     case RP_UPDATE_VALIDATE:
-      GNUNET_DATASTORE_iterate_key (datastore,
-                                   &crc->key, 
-                                   get_type (42),
-                                   1, 1, TIMEOUT,
-                                   &check_update,
-                                   crc);   
+      GNUNET_assert (NULL !=
+                    GNUNET_DATASTORE_get_key (datastore,
+                                              crc->offset,
+                                              &crc->key, 
+                                              get_type (42),
+                                              1, 1, TIMEOUT,
+                                              &check_update,
+                                              crc));
       break;
-    case RP_UPDATE_DONE:
-      GNUNET_assert (0);
-      break;
     case RP_DONE:
 #if VERBOSE
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -681,6 +655,7 @@
                       argv, "test-datastore-api", "nohelp",
                       options, &run, NULL);
 #if START_DATASTORE
+  sleep (1); /* give datastore chance to receive 'DROP' request */
   if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
     {
       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");

Modified: gnunet/src/datastore/test_datastore_api_data_sqlite.conf
===================================================================
--- gnunet/src/datastore/test_datastore_api_data_sqlite.conf    2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/test_datastore_api_data_sqlite.conf    2011-04-26 18:19:15 UTC (rev 15078)
@@ -29,7 +29,7 @@
 # REJECT_FROM =
 # REJECT_FROM6 =
 # PREFIX =
-# DEBUG = YES
+#DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-datastore
 

Modified: gnunet/src/datastore/test_datastore_api_management.c
===================================================================
--- gnunet/src/datastore/test_datastore_api_management.c        2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/test_datastore_api_management.c        2011-04-26 18:19:15 UTC (rev 15078)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet.
-     (C) 2004, 2005, 2006, 2007, 2009 Christian Grothoff (and other contributing authors)
+     (C) 2004, 2005, 2006, 2007, 2009, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -97,9 +97,9 @@
 
 enum RunPhase
   {
-    RP_DONE = 0,
     RP_PUT,
     RP_GET,
+    RP_DONE,
     RP_GET_FAIL
   };
 
@@ -112,6 +112,7 @@
   const struct GNUNET_CONFIGURATION_Handle *cfg;
   void *data;
   enum RunPhase phase;
+  uint64_t offset;
 };
 
 
@@ -146,42 +147,26 @@
             enum GNUNET_BLOCK_Type type,
             uint32_t priority,
             uint32_t anonymity,
-            struct GNUNET_TIME_Absolute
-            expiration, uint64_t uid)
+            struct GNUNET_TIME_Absolute expiration, 
+            uint64_t uid)
 {
   struct CpsRunContext *crc = cls;
   int i;
 
-  if (key == NULL)
-    {
-      crc->i--;
-      if (crc->found == GNUNET_YES)
-       {
-         crc->phase = RP_GET;
-         crc->found = GNUNET_NO;
-       }
-      else
-       {
-         fprintf (stderr,
-                  "First not found was %u\n", crc->i);
-         crc->phase = RP_GET_FAIL;
-       }
-      if (0 == crc->i)
-       crc->phase = RP_DONE;
-      GNUNET_SCHEDULER_add_continuation (&run_continuation,
-                                        crc,
-                                        GNUNET_SCHEDULER_REASON_PREREQ_DONE);
-      return;
-    }
   i = crc->i;
-  crc->found = GNUNET_YES;
   GNUNET_assert (size == get_size (i));
   GNUNET_assert (0 == memcmp (data, get_data(i), size));
   GNUNET_assert (type == get_type (i));
   GNUNET_assert (priority == get_priority (i));
   GNUNET_assert (anonymity == get_anonymity(i));
   GNUNET_assert (expiration.abs_value == get_expiration(i).abs_value);
-  GNUNET_DATASTORE_iterate_get_next (datastore);
+  crc->offset++;
+  crc->i--;
+  if (crc->i == 0)
+    crc->phase = RP_DONE;
+  GNUNET_SCHEDULER_add_continuation (&run_continuation,
+                                    crc,
+                                    GNUNET_SCHEDULER_REASON_PREREQ_DONE);
 }
 
 
@@ -241,7 +226,7 @@
        {
          GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                      "Sleeping to give datastore time to clean up\n");
-         sleep (5);
+         sleep (1);
          crc->phase = RP_GET;
          crc->i--;
        }
@@ -254,12 +239,13 @@
                  crc->i);
 #endif
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-      GNUNET_DATASTORE_iterate_key (datastore, 
-                                   &crc->key,
-                                   get_type (crc->i),
-                                   1, 1, TIMEOUT,
-                                   &check_value,
-                                   crc);
+      GNUNET_DATASTORE_get_key (datastore, 
+                               crc->offset++,
+                               &crc->key,
+                               get_type (crc->i),
+                               1, 1, TIMEOUT,
+                               &check_value,
+                               crc);
       break;
     case RP_GET_FAIL:
 #if VERBOSE
@@ -269,12 +255,13 @@
                  crc->i);
 #endif
       GNUNET_CRYPTO_hash (&crc->i, sizeof (int), &crc->key);
-      GNUNET_DATASTORE_iterate_key (datastore, 
-                                   &crc->key,
-                                   get_type (crc->i),
-                                   1, 1, TIMEOUT,
-                                   &check_nothing,
-                                   crc);
+      GNUNET_DATASTORE_get_key (datastore, 
+                               crc->offset++,
+                               &crc->key,
+                               get_type (crc->i),
+                               1, 1, TIMEOUT,
+                               &check_nothing,
+                               crc);
       break;
     case RP_DONE:
       GNUNET_assert (0 == crc->i);
@@ -372,6 +359,7 @@
   GNUNET_PROGRAM_run ((sizeof (argv) / sizeof (char *)) - 1,
                       argv, "test-datastore-api", "nohelp",
                       options, &run, NULL);
+  sleep (1); /* give datastore chance to process 'DROP' request */
   if (0 != GNUNET_OS_process_kill (proc, SIGTERM))
     {
       GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "kill");

Modified: gnunet/src/datastore/test_plugin_datastore.c
===================================================================
--- gnunet/src/datastore/test_plugin_datastore.c        2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/datastore/test_plugin_datastore.c        2011-04-26 18:19:15 UTC (rev 15078)
@@ -65,6 +65,7 @@
   enum RunPhase phase;
   unsigned int cnt;
   unsigned int i;
+  uint64_t offset;
 };
 
 
@@ -120,6 +121,11 @@
   value[0] = k;
   msg = NULL;
   prio = GNUNET_CRYPTO_random_u32 (GNUNET_CRYPTO_QUALITY_WEAK, 100);
+#if VERBOSE
+  fprintf (stderr, 
+          "putting type %u, anon %u under key %s\n",
+          i+1, i, GNUNET_h2s (&key));
+#endif
   if (GNUNET_OK != api->put (api->cls,
                             &key, 
                             size,
@@ -149,9 +155,11 @@
       const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
+static uint64_t guid;
+
+
 static int
 iterate_one_shot (void *cls,
-                 void *next_cls,
                  const GNUNET_HashCode * key,
                  uint32_t size,
                  const void *data,
@@ -164,57 +172,21 @@
 {
   struct CpsRunContext *crc = cls;
   
-  GNUNET_assert (NULL == next_cls);
   GNUNET_assert (key != NULL);
+  guid = uid;
   crc->phase++;
 #if VERBOSE
   fprintf (stderr,
-          "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
+          "Found result type=%u, priority=%u, size=%u, expire=%llu, key %s\n",
           type, priority, size,
-          (unsigned long long) expiration.abs_value);
+          (unsigned long long) expiration.abs_value,
+          GNUNET_h2s (key));
 #endif    
   GNUNET_SCHEDULER_add_now (&test, crc);
   return GNUNET_OK;
 }
 
 
-static uint64_t guid;
-
-static int
-iterate_with_next (void *cls,
-                  void *next_cls,
-                  const GNUNET_HashCode * key,
-                  uint32_t size,
-                  const void *data,
-                  enum GNUNET_BLOCK_Type type,
-                  uint32_t priority,
-                  uint32_t anonymity,
-                  struct GNUNET_TIME_Absolute
-                  expiration, 
-                  uint64_t uid)
-{
-  struct CpsRunContext *crc = cls;
-  
-  if (key == NULL)
-    {
-      crc->phase++;
-      GNUNET_SCHEDULER_add_now (&test, crc);
-      return GNUNET_OK;
-    }
-  guid = uid;
-#if VERBOSE
-  fprintf (stderr,
-          "Found result type=%u, priority=%u, size=%u, expire=%llu\n",
-          type, priority, size,
-          (unsigned long long) expiration.abs_value);
-#endif
-  crc->cnt++;
-  crc->api->next_request (next_cls,
-                         GNUNET_NO);
-  return GNUNET_OK;
-}
-
-
 /**
  * Function called when the service shuts
  * down.  Unloads our datastore plugin.
@@ -274,12 +246,19 @@
 
   if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     {
+      GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                 "Test aborted.\n");
       crc->phase = RP_ERROR;
-      ok = 1;
     }
+#if VERBOSE
+  fprintf (stderr, "In phase %d, iteration %u\n",
+          crc->phase,
+          crc->cnt);
+#endif
   switch (crc->phase)
     {
     case RP_ERROR:
+      ok = 1;
       GNUNET_break (0); 
       crc->api->drop (crc->api->cls);
       GNUNET_SCHEDULER_add_now (&cleaning_task, crc);
@@ -289,7 +268,7 @@
       for (j=0;j<PUT_10;j++)
        {
          put_value (crc->api, j, crc->i);
-         cs = crc->api->get_size (crc->api->cls);
+         cs = crc->api->estimate_size (crc->api->cls);
          GNUNET_assert (os < cs);
          os = cs;
        }
@@ -305,11 +284,12 @@
          break;
        }
       gen_key (5, &key);
-      crc->api->get (crc->api->cls,
-                    &key, NULL, 
-                    GNUNET_BLOCK_TYPE_ANY,
-                    &iterate_with_next,
-                    crc);
+      crc->api->get_key (crc->api->cls,
+                        crc->offset++,
+                        &key, NULL, 
+                        GNUNET_BLOCK_TYPE_ANY,
+                        &iterate_one_shot,
+                        crc);
       break;
     case RP_UPDATE:
       GNUNET_assert (GNUNET_OK ==
@@ -329,18 +309,19 @@
          GNUNET_SCHEDULER_add_now (&test, crc);
          break;
        }
-      crc->api->iter_zero_anonymity (crc->api->cls, 
-                                    1, 
-                                    &iterate_with_next,
-                                    crc);
+      crc->api->get_zero_anonymity (crc->api->cls, 
+                                   0,
+                                   1, 
+                                   &iterate_one_shot,
+                                   crc);
       break;
     case RP_REPL_GET:
-      crc->api->replication_get (crc->api->cls, 
+      crc->api->get_replication (crc->api->cls, 
                                 &iterate_one_shot,
                                 crc);
       break;
     case RP_EXPI_GET:
-      crc->api->expiration_get (crc->api->cls, 
+      crc->api->get_expiration (crc->api->cls, 
                                &iterate_one_shot,
                                crc);
       break;

Modified: gnunet/src/fs/Makefile.am
===================================================================
--- gnunet/src/fs/Makefile.am   2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/fs/Makefile.am   2011-04-26 18:19:15 UTC (rev 15078)
@@ -1,4 +1,3 @@
-
 INCLUDES = -I$(top_srcdir)/src/include
 
 if MINGW
@@ -173,8 +172,7 @@
  test_gnunet_fs_idx.py
 endif
 
-#if !DISABLE_TEST_RUN
-if 0 
+if !DISABLE_TEST_RUN
 TESTS = \
  test_fs_directory \
  test_fs_download \

Modified: gnunet/src/fs/fs_download.c
===================================================================
--- gnunet/src/fs/fs_download.c 2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/fs/fs_download.c 2011-04-26 18:19:15 UTC (rev 15078)
@@ -756,10 +756,12 @@
       child_block_size = GNUNET_FS_tree_compute_tree_size (drc->depth);
       GNUNET_assert (0 == (drc->offset - dr->offset) % child_block_size);
       chk_off = (drc->offset - dr->offset) / child_block_size;
-      GNUNET_assert (drc->state == BRS_INIT);
-      drc->state = BRS_CHK_SET;
-      drc->chk = chks[chk_off];
-      try_top_down_reconstruction (dc, drc);
+      if (drc->state == BRS_INIT)      
+       {
+         drc->state = BRS_CHK_SET;
+         drc->chk = chks[chk_off];
+         try_top_down_reconstruction (dc, drc);
+       }
       if (drc->state != BRS_DOWNLOAD_UP)
        up_done = GNUNET_NO; /* children not all done */
     } 
@@ -815,10 +817,11 @@
              dr->depth,
              GNUNET_h2s (&dr->chk.query));
 #endif
-  GNUNET_assert (GNUNET_NO ==
-                GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
-                                                              &dr->chk.query,
-                                                              dr));
+  if (GNUNET_NO !=
+      GNUNET_CONTAINER_multihashmap_contains_value (dc->active,
+                                                   &dr->chk.query,
+                                                   dr))
+    return; /* already active */
   GNUNET_CONTAINER_multihashmap_put (dc->active,
                                     &dr->chk.query,
                                     dr,

Modified: gnunet/src/fs/fs_test_lib_data.conf
===================================================================
--- gnunet/src/fs/fs_test_lib_data.conf 2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/fs/fs_test_lib_data.conf 2011-04-26 18:19:15 UTC (rev 15078)
@@ -43,7 +43,7 @@
 #TOTAL_QUOTA_OUT = 9321
 TOTAL_QUOTA_IN = 3932160
 TOTAL_QUOTA_OUT = 3932160
-DEBUG = YES
+#DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-core
 
@@ -53,8 +53,8 @@
 #OPTIONS = -L DEBUG
 CONTENT_CACHING = NO
 CONTENT_PUSHING = NO
-DEBUG = YES
-#PREFIX = valgrind --tool=memcheck --leak-check=yes 
+# DEBUG = YES
+# PREFIX = valgrind --tool=memcheck --leak-check=yes --trace-children=yes
 #BINARY = /home/grothoff/gn9/bin/gnunet-service-fs
 #PREFIX = xterm -e gdb -x cmd --args 
 

Modified: gnunet/src/fs/gnunet-pseudonym.c
===================================================================
--- gnunet/src/fs/gnunet-pseudonym.c    2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/fs/gnunet-pseudonym.c    2011-04-26 18:19:15 UTC (rev 15078)
@@ -341,7 +341,7 @@
      0, &GNUNET_GETOPT_set_one, &no_remote_printing},
     {'r', "replication", "LEVEL",
      gettext_noop ("set the desired replication LEVEL"),
-     0, &GNUNET_GETOPT_set_uint, &bo.replication_level},
+     1, &GNUNET_GETOPT_set_uint, &bo.replication_level},
     {'R', "root", "ID",
      gettext_noop
      ("specify ID of the root of the namespace"),

Modified: gnunet/src/fs/gnunet-service-fs_cp.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_cp.c        2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_cp.c        2011-04-26 18:19:15 UTC (rev 
15078)
@@ -704,9 +704,9 @@
 
 
 /**
- * Free the given client request.
+ * Free the given request.
  *
- * @param cls the client request to free
+ * @param cls the request to free
  * @param tc task context
  */ 
 static void
@@ -1182,6 +1182,7 @@
                                    NULL, 0, /* replies_seen */
                                    &handle_p2p_reply,
                                    peerreq);
+  GNUNET_assert (NULL != pr);
   peerreq->pr = pr;
   GNUNET_break (GNUNET_OK ==
                GNUNET_CONTAINER_multihashmap_put (cp->request_map,
@@ -1427,7 +1428,7 @@
                        const GNUNET_HashCode *query,
                        void *value)
 {
-  struct PeerRequest *peerreq = cls;
+  struct PeerRequest *peerreq = value;
   struct GSF_PendingRequest *pr = peerreq->pr;
 
   GSF_pending_request_cancel_ (pr);

Modified: gnunet/src/fs/gnunet-service-fs_indexing.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_indexing.c  2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_indexing.c  2011-04-26 18:19:15 UTC (rev 
15078)
@@ -566,7 +566,7 @@
                                  uint32_t anonymity,
                                  struct GNUNET_TIME_Absolute
                                  expiration, uint64_t uid,
-                                 GNUNET_DATASTORE_Iterator cont,
+                                 GNUNET_DATASTORE_DatumProcessor cont,
                                  void *cont_cls)
 {
   const struct OnDemandBlock *odb;

Modified: gnunet/src/fs/gnunet-service-fs_indexing.h
===================================================================
--- gnunet/src/fs/gnunet-service-fs_indexing.h  2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_indexing.h  2011-04-26 18:19:15 UTC (rev 
15078)
@@ -63,7 +63,7 @@
                                  uint32_t anonymity,
                                  struct GNUNET_TIME_Absolute
                                  expiration, uint64_t uid,
-                                 GNUNET_DATASTORE_Iterator cont,
+                                 GNUNET_DATASTORE_DatumProcessor cont,
                                  void *cont_cls);
 
 /**

Modified: gnunet/src/fs/gnunet-service-fs_pe.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pe.c        2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_pe.c        2011-04-26 18:19:15 UTC (rev 
15078)
@@ -158,7 +158,7 @@
              rp->transmission_counter);
 #endif 
 
-
+  GNUNET_assert (rp->hn == NULL);
   if (GNUNET_TIME_absolute_get_remaining (rp->earliest_transmission).rel_value 
== 0)
     rp->hn = GNUNET_CONTAINER_heap_insert (pp->priority_heap,
                                           rp,

Modified: gnunet/src/fs/gnunet-service-fs_pr.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_pr.c        2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_pr.c        2011-04-26 18:19:15 UTC (rev 
15078)
@@ -100,6 +100,20 @@
   GNUNET_PEER_Id sender_pid;
 
   /**
+   * Current offset for querying our local datastore for results.
+   * Starts at a random value, incremented until we get the same
+   * UID again (detected using 'first_uid'), which is then used
+   * to terminate the iteration.
+   */
+  uint64_t local_result_offset;
+
+  /**
+   * Unique ID of the first result from the local datastore;
+   * used to detect wrap-around of the offset.
+   */
+  uint64_t first_uid;
+
+  /**
    * Number of valid entries in the 'replies_seen' array.
    */
   unsigned int replies_seen_count;
@@ -113,7 +127,7 @@
    * Mingle value we currently use for the bf.
    */
   uint32_t mingle;
-                           
+
 };
 
 
@@ -273,6 +287,8 @@
              type);
 #endif 
   pr = GNUNET_malloc (sizeof (struct GSF_PendingRequest));
+  pr->local_result_offset = GNUNET_CRYPTO_random_u64 
(GNUNET_CRYPTO_QUALITY_WEAK,
+                                                     UINT64_MAX);              
                                         
   pr->public_data.query = *query;
   if (GNUNET_BLOCK_TYPE_FS_SBLOCK == type)
     {
@@ -535,7 +551,20 @@
               void *value)
 {
   struct GSF_PendingRequest *pr = value;
-  
+  GSF_LocalLookupContinuation cont;
+
+#if DEBUG_FS
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Cleaning up pending request for `%s'.\n",
+             GNUNET_h2s (key));
+#endif  
+  if (NULL != (cont = pr->llc_cont))
+    {
+      pr->llc_cont = NULL;
+      cont (pr->llc_cont_cls,
+           pr,
+           pr->local_result);
+    } 
   GSF_plan_notify_request_done_ (pr);
   GNUNET_free_non_null (pr->replies_seen);
   if (NULL != pr->bf)
@@ -560,6 +589,7 @@
 void
 GSF_pending_request_cancel_ (struct GSF_PendingRequest *pr)
 {
+  if (NULL == pr_map) return; /* already cleaned up! */
   GNUNET_assert (GNUNET_OK ==
                 GNUNET_CONTAINER_multihashmap_remove (pr_map,
                                                       &pr->public_data.query,
@@ -1023,13 +1053,22 @@
   GNUNET_HashCode query;
   unsigned int old_rf;
   
+  pr->qe = NULL;
+  if (0 == pr->replies_seen_count)
+    {
+      pr->first_uid = uid;
+    }
+  else
+    {
+      if (uid == pr->first_uid)
+       key = NULL; /* all replies seen! */
+    }
   if (NULL == key)
     {
 #if DEBUG_FS
       GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                  "No further local responses available.\n");
 #endif
-      pr->qe = NULL;
       if (NULL != (cont = pr->llc_cont))
        {
          pr->llc_cont = NULL;
@@ -1041,9 +1080,10 @@
     }
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "New local response to `%s' of type %u.\n",
+             "Received reply for `%s' of type %d with UID %llu from 
datastore.\n",
              GNUNET_h2s (key),
-             type);
+             type,
+             (unsigned long long) uid);
 #endif
   if (type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
     {
@@ -1061,8 +1101,22 @@
                                            &process_local_reply,
                                            pr))
        {
-         if (pr->qe != NULL)
-           GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+         pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                            pr->local_result_offset - 1,
+                                            &pr->public_data.query,
+                                            pr->public_data.type == 
GNUNET_BLOCK_TYPE_FS_DBLOCK 
+                                            ? GNUNET_BLOCK_TYPE_ANY 
+                                            : pr->public_data.type, 
+                                            (0 != (GSF_PRO_PRIORITY_UNLIMITED 
& pr->public_data.options))
+                                            ? UINT_MAX
+                                            : 1 /* queue priority */,
+                                            (0 != (GSF_PRO_PRIORITY_UNLIMITED 
& pr->public_data.options))
+                                            ? UINT_MAX
+                                            : 1 /* max queue size */,
+                                            GNUNET_TIME_UNIT_FOREVER_REL,
+                                            &process_local_reply,
+                                            pr);
+         GNUNET_assert (NULL != pr->qe);
        }
       return;
     }
@@ -1085,7 +1139,22 @@
                               -1, -1, 
                               GNUNET_TIME_UNIT_FOREVER_REL,
                               NULL, NULL);
-      GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+      pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                        pr->local_result_offset - 1,
+                                        &pr->public_data.query,
+                                        pr->public_data.type == 
GNUNET_BLOCK_TYPE_FS_DBLOCK 
+                                        ? GNUNET_BLOCK_TYPE_ANY 
+                                        : pr->public_data.type, 
+                                        (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                        ? UINT_MAX
+                                        : 1 /* queue priority */,
+                                        (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                        ? UINT_MAX
+                                        : 1 /* max queue size */,
+                                        GNUNET_TIME_UNIT_FOREVER_REL,
+                                        &process_local_reply,
+                                        pr);
+      GNUNET_assert (NULL != pr->qe);
       return;
     }
   prq.type = type;
@@ -1097,12 +1166,16 @@
     GSF_update_datastore_delay_ (pr->public_data.start_time);
   process_reply (&prq, key, pr);
   pr->local_result = prq.eval;
-  if (pr->qe == NULL)
+  if (prq.eval == GNUNET_BLOCK_EVALUATION_OK_LAST)
     {
-#if DEBUG_FS
-      GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-                 "Request cancelled, not asking datastore for more\n");
-#endif
+      if (NULL != (cont = pr->llc_cont))
+       {
+         pr->llc_cont = NULL;
+         cont (pr->llc_cont_cls,
+               pr,
+               pr->local_result);
+       }      
+      return;
     }
   if ( (0 == (GSF_PRO_PRIORITY_UNLIMITED & pr->public_data.options)) &&
        ( (GNUNET_YES == GSF_test_get_load_too_high_ (0)) ||
@@ -1116,8 +1189,6 @@
                                gettext_noop ("# processing result set cut 
short due to load"),
                                1,
                                GNUNET_NO);
-      GNUNET_DATASTORE_cancel (pr->qe);
-      pr->qe = NULL;
       if (NULL != (cont = pr->llc_cont))
        {
          pr->llc_cont = NULL;
@@ -1127,7 +1198,22 @@
        }
       return;
     }
-  GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
+  pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                    pr->local_result_offset++,
+                                    &pr->public_data.query,
+                                    pr->public_data.type == 
GNUNET_BLOCK_TYPE_FS_DBLOCK 
+                                    ? GNUNET_BLOCK_TYPE_ANY 
+                                    : pr->public_data.type, 
+                                    (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                    ? UINT_MAX
+                                    : 1 /* queue priority */,
+                                    (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                    ? UINT_MAX
+                                    : 1 /* max queue size */,
+                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &process_local_reply,
+                                    pr);
+  GNUNET_assert (NULL != pr->qe);
 }
 
 
@@ -1147,20 +1233,21 @@
   GNUNET_assert (NULL == pr->llc_cont);
   pr->llc_cont = cont;
   pr->llc_cont_cls = cont_cls;
-  pr->qe = GNUNET_DATASTORE_iterate_key (GSF_dsh,
-                                        &pr->public_data.query,
-                                        pr->public_data.type == 
GNUNET_BLOCK_TYPE_FS_DBLOCK 
-                                        ? GNUNET_BLOCK_TYPE_ANY 
-                                        : pr->public_data.type, 
-                                        (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
-                                        ? UINT_MAX
-                                        : 1 /* queue priority */,
-                                        (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
-                                        ? UINT_MAX
-                                        : 1 /* max queue size */,
-                                        GNUNET_TIME_UNIT_FOREVER_REL,
-                                        &process_local_reply,
-                                        pr);
+  pr->qe = GNUNET_DATASTORE_get_key (GSF_dsh,
+                                    pr->local_result_offset++,
+                                    &pr->public_data.query,
+                                    pr->public_data.type == 
GNUNET_BLOCK_TYPE_FS_DBLOCK 
+                                    ? GNUNET_BLOCK_TYPE_ANY 
+                                    : pr->public_data.type, 
+                                    (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                    ? UINT_MAX
+                                    : 1 /* queue priority */,
+                                    (0 != (GSF_PRO_PRIORITY_UNLIMITED & 
pr->public_data.options))
+                                    ? UINT_MAX
+                                    : 1 /* max queue size */,
+                                    GNUNET_TIME_UNIT_FOREVER_REL,
+                                    &process_local_reply,
+                                    pr);
 }
 
 

Modified: gnunet/src/fs/gnunet-service-fs_put.c
===================================================================
--- gnunet/src/fs/gnunet-service-fs_put.c       2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/gnunet-service-fs_put.c       2011-04-26 18:19:15 UTC (rev 
15078)
@@ -35,25 +35,50 @@
 
 
 /**
- * Request to datastore for DHT PUTs (or NULL).
+ * Context for each zero-anonymity iterator.
  */
-static struct GNUNET_DATASTORE_QueueEntry *dht_qe;
+struct PutOperator
+{
 
-/**
- * Type we will request for the next DHT PUT round from the datastore.
- */
-static enum GNUNET_BLOCK_Type dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
+  /**
+   * Request to datastore for DHT PUTs (or NULL).
+   */
+  struct GNUNET_DATASTORE_QueueEntry *dht_qe;
 
-/**
- * ID of task that collects blocks for DHT PUTs.
- */
-static GNUNET_SCHEDULER_TaskIdentifier dht_task;
+  /**
+   * Type we request from the datastore.
+   */
+  enum GNUNET_BLOCK_Type dht_put_type;
 
+  /**
+   * ID of task that collects blocks for DHT PUTs.
+   */
+  GNUNET_SCHEDULER_TaskIdentifier dht_task;
+  
+  /**
+   * How many entries with zero anonymity of our type do we currently
+   * estimate to have in the database?
+   */
+  uint64_t zero_anonymity_count_estimate;
+
+  /**
+   * Current offset when iterating the database.
+   */
+  uint64_t current_offset;
+};
+
+
 /**
- * How many entires with zero anonymity do we currently estimate
- * to have in the database?
+ * ANY-terminated list of our operators (one per type
+ * of block that we're putting into the DHT).
  */
-static unsigned int zero_anonymity_count_estimate;
+static struct PutOperator operators[] = 
+  {
+    { NULL, GNUNET_BLOCK_TYPE_FS_KBLOCK, 0, 0, 0 },
+    { NULL, GNUNET_BLOCK_TYPE_FS_SBLOCK, 0, 0, 0 },
+    { NULL, GNUNET_BLOCK_TYPE_FS_NBLOCK, 0, 0, 0 },
+    { NULL, GNUNET_BLOCK_TYPE_ANY, 0, 0, 0 }
+  };
 
 
 /**
@@ -67,26 +92,26 @@
                       const struct GNUNET_SCHEDULER_TaskContext *tc);
 
 
-
 /**
- * If the DHT PUT gathering task is not currently running, consider
- * (re)scheduling it with the appropriate delay.
+ * Task that schedules the next gathering of blocks for DHT PUTs after a delay.
+ * 
+ * @param cls the 'struct PutOperator' for the type of blocks to gather
+ * @param tc scheduler context (checked for shutdown)
  */
 static void
-consider_dht_put_gathering (void *cls)
+delay_dht_put_blocks (void *cls,
+                     const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
+  struct PutOperator *po = cls;
   struct GNUNET_TIME_Relative delay;
 
-  if (GSF_dsh == NULL)
+  po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if (dht_qe != NULL)
-    return;
-  if (dht_task != GNUNET_SCHEDULER_NO_TASK)
-    return;
-  if (zero_anonymity_count_estimate > 0)
+  if (po->zero_anonymity_count_estimate > 0)
     {
       delay = GNUNET_TIME_relative_divide 
(GNUNET_DHT_DEFAULT_REPUBLISH_FREQUENCY,
-                                          zero_anonymity_count_estimate);
+                                          po->zero_anonymity_count_estimate);
       delay = GNUNET_TIME_relative_min (delay,
                                        MAX_DHT_PUT_FREQ);
     }
@@ -96,24 +121,13 @@
         (hopefully) appear */
       delay = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, 5);
     }
-  dht_task = GNUNET_SCHEDULER_add_delayed (delay,
-                                          &gather_dht_put_blocks,
-                                          cls);
+  po->dht_task = GNUNET_SCHEDULER_add_delayed (delay,
+                                              &gather_dht_put_blocks,
+                                              po);
 }
 
 
 /**
- * Function called upon completion of the DHT PUT operation.
- */
-static void
-dht_put_continuation (void *cls,
-                     const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  GNUNET_DATASTORE_iterate_get_next (GSF_dsh);
-}
-
-
-/**
  * Store content in DHT.
  *
  * @param cls closure
@@ -138,31 +152,19 @@
                         struct GNUNET_TIME_Absolute
                         expiration, uint64_t uid)
 { 
-  static unsigned int counter;
-  static GNUNET_HashCode last_vhash;
-  static GNUNET_HashCode vhash;
+  struct PutOperator *po = cls;
 
+  po->dht_qe = NULL;
   if (key == NULL)
     {
-      dht_qe = NULL;
-      consider_dht_put_gathering (cls);
+      po->zero_anonymity_count_estimate = po->current_offset - 1;
+      po->current_offset = 0;
+      po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
+                                              po);
       return;
     }
-  /* slightly funky code to estimate the total number of values with zero
-     anonymity from the maximum observed length of a monotonically increasing 
-     sequence of hashes over the contents */
-  GNUNET_CRYPTO_hash (data, size, &vhash);
-  if (GNUNET_CRYPTO_hash_cmp (&vhash, &last_vhash) <= 0)
-    {
-      if (zero_anonymity_count_estimate > 0)
-       zero_anonymity_count_estimate /= 2;
-      counter = 0;
-    }
-  last_vhash = vhash;
-  if (counter < 31)
-    counter++;
-  if (zero_anonymity_count_estimate < (1 << counter))
-    zero_anonymity_count_estimate = (1 << counter);
+  po->zero_anonymity_count_estimate = GNUNET_MAX (po->current_offset,
+                                                 
po->zero_anonymity_count_estimate);
 #if DEBUG_FS
   GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Retrieved block `%s' of type %u for DHT PUT\n",
@@ -178,8 +180,8 @@
                  data,
                  expiration,
                  GNUNET_TIME_UNIT_FOREVER_REL,
-                 &dht_put_continuation,
-                 cls);
+                 &delay_dht_put_blocks,
+                 po);
 }
 
 
@@ -193,17 +195,20 @@
 gather_dht_put_blocks (void *cls,
                       const struct GNUNET_SCHEDULER_TaskContext *tc)
 {
-  dht_task = GNUNET_SCHEDULER_NO_TASK;
-  if (GSF_dsh == NULL)
+  struct PutOperator *po = cls;
+
+  po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+  if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN))
     return;
-  if (dht_put_type == GNUNET_BLOCK_TYPE_FS_ONDEMAND)
-    dht_put_type = GNUNET_BLOCK_TYPE_FS_KBLOCK;
-  dht_qe = GNUNET_DATASTORE_iterate_zero_anonymity (GSF_dsh, 
+  po->dht_qe = GNUNET_DATASTORE_get_zero_anonymity (GSF_dsh, 
+                                                   po->current_offset++,
                                                    0, UINT_MAX,
                                                    
GNUNET_TIME_UNIT_FOREVER_REL,
-                                                   dht_put_type++,
-                                                   &process_dht_put_content, 
NULL);
-  GNUNET_assert (dht_qe != NULL);
+                                                   po->dht_put_type,
+                                                   &process_dht_put_content, 
po);
+  if (NULL == po->dht_qe)
+    po->dht_task = GNUNET_SCHEDULER_add_now (&delay_dht_put_blocks,
+                                            po);
 }
 
 
@@ -213,7 +218,14 @@
 void
 GSF_put_init_ ()
 {
-  dht_task = GNUNET_SCHEDULER_add_now (&gather_dht_put_blocks, NULL);
+  unsigned int i;
+
+  i = 0;
+  while (operators[i].dht_put_type != GNUNET_BLOCK_TYPE_ANY)
+    {
+      operators[i].dht_task = GNUNET_SCHEDULER_add_now 
(&gather_dht_put_blocks, &operators[i]);
+      i++;
+    }
 }
 
 
@@ -223,16 +235,24 @@
 void
 GSF_put_done_ ()
 {
-  if (GNUNET_SCHEDULER_NO_TASK != dht_task)
+  struct PutOperator *po;
+  unsigned int i;
+
+  i = 0;
+  while ((po = &operators[i])->dht_put_type != GNUNET_BLOCK_TYPE_ANY)
     {
-      GNUNET_SCHEDULER_cancel (dht_task);
-      dht_task = GNUNET_SCHEDULER_NO_TASK;
+      if (GNUNET_SCHEDULER_NO_TASK != po->dht_task)
+       {
+         GNUNET_SCHEDULER_cancel (po->dht_task);
+         po->dht_task = GNUNET_SCHEDULER_NO_TASK;
+       }
+      if (NULL != po->dht_qe)
+       {
+         GNUNET_DATASTORE_cancel (po->dht_qe);
+         po->dht_qe = NULL;
+       }
+      i++;
     }
-  if (NULL != dht_qe)
-    {
-      GNUNET_DATASTORE_cancel (dht_qe);
-      dht_qe = NULL;
-    }
 }
 
 /* end of gnunet-service-fs_put.c */

Modified: gnunet/src/fs/test_fs_download_data.conf
===================================================================
--- gnunet/src/fs/test_fs_download_data.conf    2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/test_fs_download_data.conf    2011-04-26 18:19:15 UTC (rev 
15078)
@@ -36,7 +36,8 @@
 [fs]
 PORT = 42471
 HOSTNAME = localhost
-ACTIVEMIGRATION = NO
+CONTENT_CACHING = NO
+CONTENT_PUSHING = NO
 # DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-fs

Modified: gnunet/src/fs/test_gnunet_fs_idx.py.in
===================================================================
--- gnunet/src/fs/test_gnunet_fs_idx.py.in      2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/test_gnunet_fs_idx.py.in      2011-04-26 18:19:15 UTC (rev 
15078)
@@ -31,7 +31,7 @@
   pub.expect ("URI is 
`gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147'.\r")
   pub.expect (pexpect.EOF)
 
-  down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o 
\"COPYING\" 
gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147')
+  down = pexpect.spawn ('gnunet-download -c test_gnunet_fs_idx_data.conf -o 
COPYING 
gnunet://fs/chk/PC0M19QMQC0BPSHR6BGA228PP6INER1D610MGEMOMEM87222FN8HVUO7PQGO0O9HD2GVLHF2N5IDHEQUNK6LKE428FPO96SKQEA486O.PG7K85JGQ6N599MD5HEP3CHEVFPKQD9JB6NPSLVA3T1SKDS66CFI499VS6MGQ88B0QUAVT1282TCRD4GGFVUKDLGI8F0SPIANA3J2LG.35147')
   down.expect (re.compile ("Downloading `COPYING\' done \(.*\).\r"));
   down.expect (pexpect.EOF);
   os.system ('rm COPYING');

Modified: gnunet/src/fs/test_gnunet_fs_ns_data.conf
===================================================================
--- gnunet/src/fs/test_gnunet_fs_ns_data.conf   2011-04-26 14:30:13 UTC (rev 
15077)
+++ gnunet/src/fs/test_gnunet_fs_ns_data.conf   2011-04-26 18:19:15 UTC (rev 
15078)
@@ -36,7 +36,7 @@
 [fs]
 PORT = 47471
 HOSTNAME = localhost
-#DEBUG = YES
+DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes
 #BINARY = /home/grothoff/bin/gnunet-service-fs
 

Modified: gnunet/src/fs/test_gnunet_service_fs_migration_data.conf
===================================================================
--- gnunet/src/fs/test_gnunet_service_fs_migration_data.conf    2011-04-26 
14:30:13 UTC (rev 15077)
+++ gnunet/src/fs/test_gnunet_service_fs_migration_data.conf    2011-04-26 
18:19:15 UTC (rev 15078)
@@ -53,7 +53,7 @@
 ACTIVEMIGRATION = YES
 CONTENT_CACHING = YES
 CONTENT_PUSHING = YES
-DEBUG = YES
+#DEBUG = YES
 #PREFIX = valgrind --tool=memcheck --leak-check=yes 
 #PREFIX = xterm -e gdb -x cmd --args 
 

Modified: gnunet/src/include/gnunet_datastore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_datastore_plugin.h        2011-04-26 14:30:13 UTC 
(rev 15077)
+++ gnunet/src/include/gnunet_datastore_plugin.h        2011-04-26 18:19:15 UTC 
(rev 15078)
@@ -78,26 +78,9 @@
 
 
 /**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
+ * A processor over a set of items stored in the datastore.
  *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-typedef void (*PluginNextRequest)(void *next_cls,
-                                 int end_it);
-
-
-/**
- * An iterator over a set of items stored in the datastore.
- *
  * @param cls closure
- * @param next_cls closure to pass to the "next" function.
  * @param key key for the content
  * @param size number of bytes in data
  * @param data content stored
@@ -105,24 +88,21 @@
  * @param priority priority of the content
  * @param anonymity anonymity-level for the content
  * @param expiration expiration time for the content
- * @param uid unique identifier for the datum;
- *        maybe 0 if no unique identifier is available
+ * @param uid unique identifier for the datum
  *
- * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
- *         (continue on call to "next", of course),
- *         GNUNET_NO to delete the item and continue (if supported)
+ * @return GNUNET_OK to keep the item
+ *         GNUNET_NO to delete the item
  */
-typedef int (*PluginIterator) (void *cls,
-                              void *next_cls,
-                              const GNUNET_HashCode * key,
-                              uint32_t size,
-                              const void *data,
-                              enum GNUNET_BLOCK_Type type,
-                              uint32_t priority,
-                              uint32_t anonymity,
-                              struct GNUNET_TIME_Absolute
-                              expiration, 
-                              uint64_t uid);
+typedef int (*PluginDatumProcessor) (void *cls,
+                                    const GNUNET_HashCode * key,
+                                    uint32_t size,
+                                    const void *data,
+                                    enum GNUNET_BLOCK_Type type,
+                                    uint32_t priority,
+                                    uint32_t anonymity,
+                                    struct GNUNET_TIME_Absolute
+                                    expiration, 
+                                    uint64_t uid);
 
 /**
  * Get an estimate of how much space the database is
@@ -131,7 +111,7 @@
  * @param cls closure
  * @return number of bytes used on disk
  */
-typedef unsigned long long (*PluginGetSize) (void *cls);
+typedef unsigned long long (*PluginEstimateSize) (void *cls);
 
 
 /**
@@ -165,10 +145,11 @@
 
 
 /**
- * Iterate over the results for a particular key
- * in the datastore.
+ * Get one of the results for a particular key in the datastore.
  *
  * @param cls closure
+ * @param offset offset of the result (mod #num-results); 
+ *               specific ordering does not matter for the offset
  * @param key key to match, never NULL
  * @param vhash hash of the value, maybe NULL (to
  *        match all values that have the right key).
@@ -177,34 +158,31 @@
  *        there may be!
  * @param type entries of which type are relevant?
  *     Use 0 for any type.
- * @param iter function to call on each matching value; however,
- *        after the first call to "iter", the plugin must wait
- *        until "NextRequest" was called before giving the iterator
- *        the next item; finally, the "iter" should be called once
- *        once with a NULL value at the end ("next_cls" should be NULL
- *        for that last call)
- * @param iter_cls closure for iter
+ * @param proc function to call on the matching value; 
+ *        proc should be called with NULL if there is no result
+ * @param proc_cls closure for proc
  */
-typedef void (*PluginGet) (void *cls,
-                          const GNUNET_HashCode *key,
-                          const GNUNET_HashCode *vhash,
-                          enum GNUNET_BLOCK_Type type,
-                          PluginIterator iter, void *iter_cls);
+typedef void (*PluginGetKey) (void *cls,
+                             uint64_t offset,
+                             const GNUNET_HashCode *key,
+                             const GNUNET_HashCode *vhash,
+                             enum GNUNET_BLOCK_Type type,
+                             PluginDatumProcessor proc, void *proc_cls);
 
 
 
 /**
  * Get a random item (additional constraints may apply depending on
- * the specific implementation).  Calls 'iter' with all values ZERO or
- * NULL if no item applies, otherwise 'iter' is called once and only
+ * the specific implementation).  Calls 'proc' with all values ZERO or
+ * NULL if no item applies, otherwise 'proc' is called once and only
  * once with an item, with the 'next_cls' argument being NULL.
  *
  * @param cls closure
- * @param iter function to call the value (once only).
- * @param iter_cls closure for iter
+ * @param proc function to call the value (once only).
+ * @param proc_cls closure for proc
  */
-typedef void (*PluginRandomGet) (void *cls,
-                                PluginIterator iter, void *iter_cls);
+typedef void (*PluginGetRandom) (void *cls,
+                                PluginDatumProcessor proc, void *proc_cls);
 
 
 /**
@@ -238,26 +216,22 @@
 
 
 /**
- * Select a subset of the items in the datastore and call the given
- * iterator for the first item; then allow getting more items by
- * calling the 'next_request' callback with the given 'next_cls'
- * argument passed to 'iter'.
+ * Select a single item from the datastore at the specified offset
+ * (among those applicable). 
  *
  * @param cls closure
+ * @param offset offset of the result (mod #num-results); 
+ *               specific ordering does not matter for the offset
  * @param type entries of which type should be considered?
- *        Myst not be zero (ANY).
- * @param iter function to call on each matching value; however,
- *        after the first call to "iter", the plugin must wait
- *        until "NextRequest" was called before giving the iterator
- *        the next item; finally, the "iter" should be called once
- *        once with a NULL value at the end ("next_cls" should be NULL
- *        for that last call)
- * @param iter_cls closure for iter
+ *        Must not be zero (ANY).
+ * @param proc function to call on the matching value
+ * @param proc_cls closure for proc
  */
-typedef void (*PluginSelector) (void *cls,
-                                enum GNUNET_BLOCK_Type type,
-                                PluginIterator iter,
-                                void *iter_cls);
+typedef void (*PluginGetType) (void *cls,
+                              uint64_t offset,
+                              enum GNUNET_BLOCK_Type type,
+                              PluginDatumProcessor proc,
+                              void *proc_cls);
 
 
 /**
@@ -283,10 +257,10 @@
   void *cls;
 
   /**
-   * Get the current on-disk size of the SQ store.  Estimates are
-   * fine, if that's the only thing available.
+   * Calculate the current on-disk size of the SQ store.  Estimates
+   * are fine, if that's the only thing available.
    */
-  PluginGetSize get_size;
+  PluginEstimateSize estimate_size;
 
   /**
    * Function to store an item in the datastore.
@@ -304,38 +278,29 @@
   PluginUpdate update;
 
   /**
-   * Function called by iterators whenever they want the next value;
-   * note that unlike all of the other callbacks, this one does get a
-   * the "next_cls" closure which is usually different from the "cls"
-   * member of this struct!
+   * Get a particular datum matching a given hash from the datastore.
    */
-  PluginNextRequest next_request;
+  PluginGetKey get_key;
 
   /**
-   * Function to iterate over the results for a particular key
-   * in the datastore.
+   * Get datum (of the specified type) with anonymity level zero.
    */
-  PluginGet get;
+  PluginGetType get_zero_anonymity;
 
   /**
-   * Iterate over content with anonymity level zero.
-   */
-  PluginSelector iter_zero_anonymity;
-
-  /**
    * Function to get a random item with high replication score from
    * the database, lowering the item's replication score.  Returns a
    * single random item from those with the highest replication
    * counters.  The item's replication counter is decremented by one
    * IF it was positive before.
    */
-  PluginRandomGet replication_get;
+  PluginGetRandom get_replication;
 
   /**
    * Function to get a random expired item or, if none are expired, one
    * with a low priority.
    */
-  PluginRandomGet expiration_get;
+  PluginGetRandom get_expiration;
 
   /**
    * Delete the database.  The next operation is

Modified: gnunet/src/include/gnunet_datastore_service.h
===================================================================
--- gnunet/src/include/gnunet_datastore_service.h       2011-04-26 14:30:13 UTC (rev 15077)
+++ gnunet/src/include/gnunet_datastore_service.h       2011-04-26 18:19:15 UTC (rev 15078)
@@ -262,7 +262,7 @@
 
 
 /**
- * An iterator over a set of items stored in the datastore.
+ * Process a datum that was stored in the datastore.
  *
  * @param cls closure
  * @param key key for the content
@@ -275,90 +275,82 @@
  * @param uid unique identifier for the datum;
  *        maybe 0 if no unique identifier is available
  */
-typedef void (*GNUNET_DATASTORE_Iterator) (void *cls,
-                                          const GNUNET_HashCode * key,
-                                          size_t size,
-                                          const void *data,
-                                          enum GNUNET_BLOCK_Type type,
-                                          uint32_t priority,
-                                          uint32_t anonymity,
-                                          struct GNUNET_TIME_Absolute
-                                          expiration, uint64_t uid);
+typedef void (*GNUNET_DATASTORE_DatumProcessor) (void *cls,
+                                                const GNUNET_HashCode * key,
+                                                size_t size,
+                                                const void *data,
+                                                enum GNUNET_BLOCK_Type type,
+                                                uint32_t priority,
+                                                uint32_t anonymity,
+                                                struct GNUNET_TIME_Absolute
+                                                expiration, uint64_t uid);
 
 
 /**
- * Iterate over the results for a particular key
- * in the datastore.  The iterator will only be called
- * once initially; if the first call did contain a
- * result, further results can be obtained by calling
- * "GNUNET_DATASTORE_iterate_get_next" with the given argument.
+ * Get a result for a particular key from the datastore.  The processor
+ * will only be called once.
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param key maybe NULL (to match all entries)
  * @param type desired type, 0 for any
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on each matching value;
+ * @param proc function to call on each matching value;
  *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
 * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_key (struct GNUNET_DATASTORE_Handle *h,
-                             const GNUNET_HashCode * key,
-                             enum GNUNET_BLOCK_Type type,
-                             unsigned int queue_priority,
-                             unsigned int max_queue_size,
-                             struct GNUNET_TIME_Relative timeout,
-                             GNUNET_DATASTORE_Iterator iter, 
-                             void *iter_cls);
+GNUNET_DATASTORE_get_key (struct GNUNET_DATASTORE_Handle *h,
+                         uint64_t offset,
+                         const GNUNET_HashCode * key,
+                         enum GNUNET_BLOCK_Type type,
+                         unsigned int queue_priority,
+                         unsigned int max_queue_size,
+                         struct GNUNET_TIME_Relative timeout,
+                         GNUNET_DATASTORE_DatumProcessor proc, 
+                         void *proc_cls);
 
 
 /**
- * Get all zero-anonymity values from the datastore.
+ * Get a single zero-anonymity value from the datastore.
  *
  * @param h handle to the datastore
+ * @param offset offset of the result (mod #num-results); set to
+ *               a random 64-bit value initially; then increment by
+ *               one each time; detect that all results have been found by uid
+ *               being again the first uid ever returned.
  * @param queue_priority ranking of this request in the priority queue
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
  * @param type allowed type for the operation (never zero)
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
- *        and always once with a value of NULL at the end.
- * @param iter_cls closure for iter
+ *        or with NULL if no value exists.
+ * @param proc_cls closure for proc
 * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
-GNUNET_DATASTORE_iterate_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
-                                        unsigned int queue_priority,
-                                        unsigned int max_queue_size,
-                                        struct GNUNET_TIME_Relative timeout,
-                                        enum GNUNET_BLOCK_Type type,
-                                        GNUNET_DATASTORE_Iterator iter, 
-                                        void *iter_cls);
+GNUNET_DATASTORE_get_zero_anonymity (struct GNUNET_DATASTORE_Handle *h,
+                                    uint64_t offset,
+                                    unsigned int queue_priority,
+                                    unsigned int max_queue_size,
+                                    struct GNUNET_TIME_Relative timeout,
+                                    enum GNUNET_BLOCK_Type type,
+                                    GNUNET_DATASTORE_DatumProcessor proc, 
+                                    void *proc_cls);
 
 
 /**
- * Function called to trigger obtaining the next result
- * from the datastore.  ONLY applies for 'GNUNET_DATASTORE_iterate_*'
- * calls, not for 'get' calls.  FIXME: how much mixing of iterate
- * calls with other operations can we permit!?  Should we pass
- * the 'QueueEntry' instead of the datastore handle here instead?
- * 
- * @param h handle to the datastore
- */
-void
-GNUNET_DATASTORE_iterate_get_next (struct GNUNET_DATASTORE_Handle *h);
-
-
-/**
  * Get a random value from the datastore for content replication.
  * Returns a single, random value among those with the highest
  * replication score, lowering positive replication scores by one for
@@ -370,21 +362,20 @@
  * @param max_queue_size at what queue size should this request be dropped
  *        (if other requests of higher priority are in the queue)
  * @param timeout how long to wait at most for a response
- * @param iter function to call on a random value; it
+ * @param proc function to call on a random value; it
  *        will be called once with a value (if available)
  *        and always once with a value of NULL.
- * @param iter_cls closure for iter
+ * @param proc_cls closure for proc
 * @return NULL if the entry was not queued, otherwise a handle that can be used to
- *         cancel; note that even if NULL is returned, the callback will be invoked
- *         (or rather, will already have been invoked)
+ *         cancel
  */
 struct GNUNET_DATASTORE_QueueEntry *
 GNUNET_DATASTORE_get_for_replication (struct GNUNET_DATASTORE_Handle *h,
                                      unsigned int queue_priority,
                                      unsigned int max_queue_size,
                                      struct GNUNET_TIME_Relative timeout,
-                                     GNUNET_DATASTORE_Iterator iter, 
-                                     void *iter_cls);
+                                     GNUNET_DATASTORE_DatumProcessor proc, 
+                                     void *proc_cls);
 
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]