gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14834 - gnunet/src/datastore


From: gnunet
Subject: [GNUnet-SVN] r14834 - gnunet/src/datastore
Date: Sun, 3 Apr 2011 18:30:06 +0200

Author: grothoff
Date: 2011-04-03 18:30:06 +0200 (Sun, 03 Apr 2011)
New Revision: 14834

Modified:
   gnunet/src/datastore/plugin_datastore_sqlite.c
Log:
first hack at implementing new replication select code

Modified: gnunet/src/datastore/plugin_datastore_sqlite.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_sqlite.c      2011-04-03 15:10:41 UTC 
(rev 14833)
+++ gnunet/src/datastore/plugin_datastore_sqlite.c      2011-04-03 16:30:06 UTC 
(rev 14834)
@@ -1,6 +1,6 @@
  /*
      This file is part of GNUnet
-     (C) 2009 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -39,37 +39,43 @@
 #define LOG_SQLITE(db, msg, level, cmd) do { GNUNET_log_from (level, "sqlite", 
_("`%s' failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__, 
sqlite3_errmsg(db->dbh)); if (msg != NULL) GNUNET_asprintf(msg, _("`%s' failed 
at %s:%u with error: %s"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh)); } 
while(0)
 
 #define SELECT_IT_LOW_PRIORITY_1 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(prio = ? AND hash > ?) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio 
= ? AND hash > ?) "\
   "ORDER BY hash ASC LIMIT 1"
 
 #define SELECT_IT_LOW_PRIORITY_2 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(prio > ?) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio 
> ?) "\
   "ORDER BY prio ASC, hash ASC LIMIT 1"
 
 #define SELECT_IT_NON_ANONYMOUS_1 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(prio = ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio 
= ? AND hash < ? AND anonLevel = 0 AND expire > %llu) "\
   " ORDER BY hash DESC LIMIT 1"
 
 #define SELECT_IT_NON_ANONYMOUS_2 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(prio < ? AND anonLevel = 0 AND expire > %llu)"\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE (prio 
< ? AND anonLevel = 0 AND expire > %llu)"\
   " ORDER BY prio DESC, hash DESC LIMIT 1"
 
 #define SELECT_IT_EXPIRATION_TIME_1 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(expire = ? AND hash > ?) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE 
(expire = ? AND hash > ?) "\
   " ORDER BY hash ASC LIMIT 1"
 
 #define SELECT_IT_EXPIRATION_TIME_2 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(expire > ?) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE 
(expire > ?) "\
   " ORDER BY expire ASC, hash ASC LIMIT 1"
 
 #define SELECT_IT_MIGRATION_ORDER_1 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(expire = ? AND hash < ?) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE 
(expire = ? AND hash < ?) "\
   " ORDER BY hash DESC LIMIT 1"
 
 #define SELECT_IT_MIGRATION_ORDER_2 \
-  "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE 
(expire < ? AND expire > %llu) "\
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE 
(expire < ? AND expire > %llu) "\
   " ORDER BY expire DESC, hash DESC LIMIT 1"
 
+
+#define SELECT_IT_REPLICATION_ORDER \
+  "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn090 WHERE 
(expire > ?) "\
+  " ORDER BY repl DESC, Random() LIMIT 1"
+
+
 /**
  * After how many ms "busy" should a DB operation fail for good?
  * A low value makes sure that we are more responsive to requests
@@ -115,6 +121,16 @@
   sqlite3_stmt *updPrio;
 
   /**
+   * Precompiled SQL for replication decrement.
+   */
+  sqlite3_stmt *updRepl;
+
+  /**
+   * Precompiled SQL for replication selection.
+   */
+  sqlite3_stmt *selRepl;
+
+  /**
    * Precompiled SQL for insertion.
    */
   sqlite3_stmt *insertContent;
@@ -173,20 +189,22 @@
 {
   /* create indices */
   sqlite3_exec (dbh,
-                "CREATE INDEX idx_hash ON gn080 (hash)", NULL, NULL, NULL);
+                "CREATE INDEX idx_hash ON gn090 (hash)", NULL, NULL, NULL);
   sqlite3_exec (dbh,
-                "CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)", NULL,
+                "CREATE INDEX idx_hash_vhash ON gn090 (hash,vhash)", NULL,
                 NULL, NULL);
-  sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn080 (prio)", NULL, NULL,
+  sqlite3_exec (dbh, "CREATE INDEX idx_prio ON gn090 (prio)", NULL, NULL,
                 NULL);
-  sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn080 (expire)", NULL, NULL,
+  sqlite3_exec (dbh, "CREATE INDEX idx_expire ON gn090 (expire)", NULL, NULL,
                 NULL);
-  sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", NULL,
+  sqlite3_exec (dbh, "CREATE INDEX idx_comb3 ON gn090 (prio,anonLevel)", NULL,
                 NULL, NULL);
-  sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
+  sqlite3_exec (dbh, "CREATE INDEX idx_comb4 ON gn090 (prio,hash,anonLevel)",
                 NULL, NULL, NULL);
-  sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn080 (expire,hash)", NULL,
+  sqlite3_exec (dbh, "CREATE INDEX idx_comb7 ON gn090 (expire,hash)", NULL,
                 NULL, NULL);
+  sqlite3_exec (dbh, "CREATE INDEX idx_comb8 ON gn090 (expire)", NULL,
+                NULL, NULL);
 }
 
 
@@ -286,12 +304,12 @@
   /* We have to do it here, because otherwise precompiling SQL might fail */
   CHECK (SQLITE_OK ==
          sq_prepare (plugin->dbh,
-                     "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn080'",
+                     "SELECT 1 FROM sqlite_master WHERE tbl_name = 'gn090'",
                      &stmt));
   if ( (sqlite3_step (stmt) == SQLITE_DONE) &&
        (sqlite3_exec (plugin->dbh,
-                     "CREATE TABLE gn080 ("
-                     "  size INT4 NOT NULL DEFAULT 0,"
+                     "CREATE TABLE gn090 ("
+                     "  repl INT4 NOT NULL DEFAULT 0,"
                      "  type INT4 NOT NULL DEFAULT 0,"
                      "  prio INT4 NOT NULL DEFAULT 0,"
                      "  anonLevel INT4 NOT NULL DEFAULT 0,"
@@ -329,16 +347,23 @@
   sqlite3_finalize (stmt);
 
   if ((sq_prepare (plugin->dbh,
-                   "UPDATE gn080 SET prio = prio + ?, expire = MAX(expire,?) 
WHERE "
+                   "UPDATE gn090 SET prio = prio + ?, expire = MAX(expire,?) 
WHERE "
                    "_ROWID_ = ?",
                    &plugin->updPrio) != SQLITE_OK) ||
       (sq_prepare (plugin->dbh,
-                   "INSERT INTO gn080 (size, type, prio, "
+                   "UPDATE gn090 SET repl = MAX (0, repl - 1) WHERE "
+                   "_ROWID_ = ?",
+                   &plugin->updRepl) != SQLITE_OK) ||
+      (sq_prepare (plugin->dbh,
+                   SELECT_IT_REPLICATION_ORDER,
+                   &plugin->selRepl) != SQLITE_OK) ||
+      (sq_prepare (plugin->dbh,
+                   "INSERT INTO gn090 (repl, type, prio, "
                    "anonLevel, expire, hash, vhash, value) VALUES "
                    "(?, ?, ?, ?, ?, ?, ?, ?)",
                    &plugin->insertContent) != SQLITE_OK) ||
       (sq_prepare (plugin->dbh,
-                   "DELETE FROM gn080 WHERE _ROWID_ = ?",
+                   "DELETE FROM gn090 WHERE _ROWID_ = ?",
                    &plugin->delRow) != SQLITE_OK))
     {
       LOG_SQLITE (plugin, NULL,
@@ -367,6 +392,10 @@
     sqlite3_finalize (plugin->delRow);
   if (plugin->updPrio != NULL)
     sqlite3_finalize (plugin->updPrio);
+  if (plugin->updRepl != NULL)
+    sqlite3_finalize (plugin->updRepl);
+  if (plugin->selRepl != NULL)
+    sqlite3_finalize (plugin->selRepl);
   if (plugin->insertContent != NULL)
     sqlite3_finalize (plugin->insertContent);
   result = sqlite3_close(plugin->dbh);
@@ -415,7 +444,6 @@
 delete_by_rowid (struct Plugin* plugin, 
                 unsigned long long rid)
 {
-
   sqlite3_bind_int64 (plugin->delRow, 1, rid);
   if (SQLITE_DONE != sqlite3_step (plugin->delRow))
     {
@@ -564,11 +592,11 @@
       return;
     }
 
-  rowid = sqlite3_column_int64 (nc->stmt, 7);
+  rowid = sqlite3_column_int64 (nc->stmt, 6);
   nc->last_rowid = rowid;
-  type = sqlite3_column_int (nc->stmt, 1);
-  size = sqlite3_column_bytes (nc->stmt, 6);
-  if (sqlite3_column_bytes (nc->stmt, 5) != sizeof (GNUNET_HashCode))
+  type = sqlite3_column_int (nc->stmt, 0);
+  size = sqlite3_column_bytes (nc->stmt, 5);
+  if (sqlite3_column_bytes (nc->stmt, 4) != sizeof (GNUNET_HashCode))
     {
       GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
                       "sqlite",
@@ -579,7 +607,7 @@
                     GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
       if (sq_prepare
           (nc->plugin->dbh,
-           "DELETE FROM gn080 WHERE NOT LENGTH(hash) = ?",
+           "DELETE FROM gn090 WHERE NOT LENGTH(hash) = ?",
            &stmtd) != SQLITE_OK)
         {
           LOG_SQLITE (nc->plugin, NULL,
@@ -604,14 +632,14 @@
       goto END;
     }
 
-  priority = sqlite3_column_int (nc->stmt, 2);
-  anonymity = sqlite3_column_int (nc->stmt, 3);
-  expiration.abs_value = sqlite3_column_int64 (nc->stmt, 4);
-  key = sqlite3_column_blob (nc->stmt, 5);
+  priority = sqlite3_column_int (nc->stmt, 1);
+  anonymity = sqlite3_column_int (nc->stmt, 2);
+  expiration.abs_value = sqlite3_column_int64 (nc->stmt, 3);
+  key = sqlite3_column_blob (nc->stmt, 4);
   nc->lastPriority = priority;
   nc->lastExpiration = expiration;
   memcpy (&nc->lastKey, key, sizeof(GNUNET_HashCode));
-  data = sqlite3_column_blob (nc->stmt, 6);
+  data = sqlite3_column_blob (nc->stmt, 5);
   nc->count++;
   ret = nc->iter (nc->iter_cls,
                  nc,
@@ -678,7 +706,6 @@
 }
 
 
-
 /**
  * Store an item in the datastore.
  *
@@ -723,7 +750,7 @@
 #endif
   GNUNET_CRYPTO_hash (data, size, &vhash);
   stmt = plugin->insertContent;
-  if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, size)) ||
+  if ((SQLITE_OK != sqlite3_bind_int (stmt, 1, replication)) ||
       (SQLITE_OK != sqlite3_bind_int (stmt, 2, type)) ||
       (SQLITE_OK != sqlite3_bind_int (stmt, 3, priority)) ||
       (SQLITE_OK != sqlite3_bind_int (stmt, 4, anonymity)) ||
@@ -1282,7 +1309,7 @@
   sqlite3_stmt *stmt;
 
   if (sq_prepare (plugin->dbh, 
-                 "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ 
FROM gn080",
+                 "SELECT type,prio,anonLevel,expire,hash,value,_ROWID_ FROM 
gn090",
                  &stmt) != SQLITE_OK)
     {
       LOG_SQLITE (plugin, NULL,
@@ -1443,7 +1470,7 @@
       return;
     }
   GNUNET_snprintf (scratch, sizeof (scratch),
-                   "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
+                   "SELECT count(*) FROM gn090 WHERE hash=:1%s%s",
                    vhash == NULL ? "" : " AND vhash=:2",
                    type == 0 ? "" : (vhash ==
                                      NULL) ? " AND type=:2" : " AND type=:3");
@@ -1495,8 +1522,8 @@
     }
 
   GNUNET_snprintf (scratch, sizeof (scratch),
-                   "SELECT size, type, prio, anonLevel, expire, hash, value, 
_ROWID_ "
-                   "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
+                   "SELECT type, prio, anonLevel, expire, hash, value, _ROWID_ 
"
+                   "FROM gn090 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
                    "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
                    vhash == NULL ? "" : " AND vhash=:2",
                    type == 0 ? "" : (vhash ==
@@ -1547,8 +1574,84 @@
 sqlite_plugin_replication_get (void *cls,
                               PluginIterator iter, void *iter_cls)
 {
-  /* FIXME: not implemented! */
-  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
+  struct Plugin *plugin = cls;
+  int n;
+  sqlite3_stmt *stmt;
+  struct GNUNET_TIME_Absolute expiration;
+  unsigned long long rowid;
+
+#if DEBUG_SQLITE
+  GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
+                  "sqlite",
+                  "Getting random block based on replication order.\n");
+#endif
+  stmt = plugin->selRepl;
+  if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, expiration.abs_value))
+    {
+      LOG_SQLITE (plugin, NULL,                  
+                  GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
"sqlite3_bind_XXXX");
+      if (SQLITE_OK != sqlite3_reset (stmt))
+        LOG_SQLITE (plugin, NULL,
+                    GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
"sqlite3_reset");
+      iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
+           GNUNET_TIME_UNIT_ZERO_ABS, 0);
+      return;
+    }
+  n = sqlite3_step (stmt);
+  switch (n)
+    {
+    case SQLITE_ROW:
+      rowid = sqlite3_column_int64 (stmt, 6);
+      if (sqlite3_column_bytes (stmt, 4) != sizeof (GNUNET_HashCode))
+       {
+         GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, 
+                          "sqlite",
+                          _("Invalid data in database.  Trying to fix (by 
deletion).\n"));
+         if (SQLITE_OK != sqlite3_reset (stmt))
+           LOG_SQLITE (plugin, NULL,
+                       GNUNET_ERROR_TYPE_ERROR |
+                       GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+         delete_by_rowid (plugin, rowid);
+         break;
+       }
+      expiration.abs_value = sqlite3_column_int64 (stmt, 3);
+      (void) iter (iter_cls,
+                  NULL,
+                  sqlite3_column_blob (stmt, 4) /* key */,
+                  sqlite3_column_bytes (stmt, 5) /* size of data */,
+                  sqlite3_column_blob (stmt, 5) /* data */, 
+                  sqlite3_column_int (stmt, 0) /* type */,
+                  sqlite3_column_int (stmt, 1) /* priority */,
+                  sqlite3_column_int (stmt, 2) /* anonymity */,
+                  expiration,
+                  rowid);
+      if (SQLITE_OK != sqlite3_reset (stmt))
+       LOG_SQLITE (plugin, NULL,
+                   GNUNET_ERROR_TYPE_ERROR |
+                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      return;
+    case SQLITE_DONE:
+      /* database must be empty */
+      if (SQLITE_OK != sqlite3_reset (stmt))
+       LOG_SQLITE (plugin, NULL,
+                   GNUNET_ERROR_TYPE_ERROR |
+                   GNUNET_ERROR_TYPE_BULK, "sqlite3_reset");
+      break;
+    case SQLITE_BUSY:    
+    case SQLITE_ERROR:
+    case SQLITE_MISUSE:
+    default:
+      LOG_SQLITE (plugin, NULL,
+                 GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, 
+                 "sqlite3_step");
+      (void) sqlite3_reset (stmt);
+      GNUNET_break (0);
+      database_shutdown (plugin);
+      database_setup (plugin->env->cfg,
+                     plugin);
+      break;
+    }
+  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0,            
        GNUNET_TIME_UNIT_ZERO_ABS, 0);
 }
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]