[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r8035 - in GNUnet: . src/applications src/applications/sqst
From: |
gnunet |
Subject: |
[GNUnet-SVN] r8035 - in GNUnet: . src/applications src/applications/sqstore_postgres |
Date: |
Sat, 27 Dec 2008 02:40:20 -0700 (MST) |
Author: grothoff
Date: 2008-12-27 02:40:20 -0700 (Sat, 27 Dec 2008)
New Revision: 8035
Added:
GNUnet/src/applications/sqstore_postgres/
GNUnet/src/applications/sqstore_postgres/Makefile.am
GNUnet/src/applications/sqstore_postgres/check.conf
GNUnet/src/applications/sqstore_postgres/postgres.c
GNUnet/src/applications/sqstore_postgres/postgres_test.c
GNUnet/src/applications/sqstore_postgres/postgres_test2.c
GNUnet/src/applications/sqstore_postgres/postgres_test3.c
Modified:
GNUnet/configure.ac
Log:
draft for postgres support
Modified: GNUnet/configure.ac
===================================================================
--- GNUnet/configure.ac 2008-12-27 08:16:08 UTC (rev 8034)
+++ GNUnet/configure.ac 2008-12-27 09:40:20 UTC (rev 8035)
@@ -359,7 +359,39 @@
AC_SUBST(SQLITE_CPPFLAGS)
AC_SUBST(SQLITE_LDFLAGS)
+# test for postgres
+postgres=false
+AC_MSG_CHECKING(for postgres)
+AC_ARG_WITH(postgres,
+ [ --with-postgres=PFX base of postgres installation],
+ [AC_MSG_RESULT("$with_postgres")
+ case $with_postgres in
+ no)
+ ;;
+ yes)
+ AC_CHECK_HEADERS(postgresql/libpq-fe.h,
+ postgres=true)
+ ;;
+ *)
+ LDFLAGS="-L$with_postgres/lib $LDFLAGS"
+ CPPFLAGS="-I$with_postgres/include $CPPFLAGS"
+ AC_CHECK_HEADERS(postgresql/libpq-fe.h,
+ EXT_LIB_PATH="-L$with_postgres/lib $EXT_LIB_PATH"
+ SQLITE_LDFLAGS="-L$with_postgres/lib"
+ SQLITE_CPPFLAGS="-I$with_postgres/include"
+ postgres=true)
+ LDFLAGS=$SAVE_LDFLAGS
+ CPPFLAGS=$SAVE_CPPFLAGS
+ ;;
+ esac
+ ],
+ [AC_MSG_RESULT([--with-postgres not specified])
+ AC_CHECK_HEADERS(postgresql/libpq-fe.h, postgres=true)])
+AM_CONDITIONAL(HAVE_POSTGRES, test x$postgres = xtrue)
+AC_SUBST(POSTGRES_CPPFLAGS)
+AC_SUBST(POSTGRES_LDFLAGS)
+
# test for libz (maybe required for linking mysql)
zlib=1
AC_CHECK_LIB(z, compress,,zlib=0)
@@ -981,6 +1013,7 @@
src/applications/rpc/Makefile
src/applications/session/Makefile
src/applications/sqstore_mysql/Makefile
+src/applications/sqstore_postgres/Makefile
src/applications/sqstore_sqlite/Makefile
src/applications/state/Makefile
src/applications/stats/Makefile
Added: GNUnet/src/applications/sqstore_postgres/Makefile.am
===================================================================
--- GNUnet/src/applications/sqstore_postgres/Makefile.am
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/Makefile.am 2008-12-27
09:40:20 UTC (rev 8035)
@@ -0,0 +1,54 @@
+INCLUDES = -I$(top_srcdir)/src/include
+
+if USE_COVERAGE
+ AM_CFLAGS = -fprofile-arcs -ftest-coverage
+endif
+plugindir = $(libdir)/GNUnet
+
+LDADD = \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+plugin_LTLIBRARIES = \
+ libgnunetmodule_sqstore_postgres.la
+
+check_PROGRAMS = \
+ postgres_test \
+ postgres_test2 \
+ postgres_test3
+
+TESTS = $(check_PROGRAMS)
+
+AM_CPPFLAGS = $(CPPFLAGS) $(POSTGRES_CPPFLAGS)
+
+libgnunetmodule_sqstore_postgres_la_SOURCES = \
+ postgres.c
+libgnunetmodule_sqstore_postgres_la_LDFLAGS = \
+ $(GN_PLUGIN_LDFLAGS) \
+ $(POSTGRES_LDFLAGS)
+libgnunetmodule_sqstore_postgres_la_LIBADD = \
+ $(top_builddir)/src/util/libgnunetutil.la \
+ -lpg \
+ $(LTLIBINTL)
+
+EXTRA_DIST = check.conf
+
+postgres_test_SOURCES = \
+ postgres_test.c
+postgres_test_LDADD = \
+ $(top_builddir)/src/server/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+postgres_test2_SOURCES = \
+ postgres_test2.c
+postgres_test2_LDADD = \
+ $(top_builddir)/src/server/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
+
+
+
+postgres_test3_SOURCES = \
+ postgres_test3.c
+postgres_test3_LDADD = \
+ $(top_builddir)/src/server/libgnunetcore.la \
+ $(top_builddir)/src/util/libgnunetutil.la
Added: GNUnet/src/applications/sqstore_postgres/check.conf
===================================================================
--- GNUnet/src/applications/sqstore_postgres/check.conf
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/check.conf 2008-12-27 09:40:20 UTC
(rev 8035)
@@ -0,0 +1,46 @@
+[PATHS]
+GNUNETD_HOME = "/tmp/gnunet-postgres-sqstore-test"
+
+[GNUNETD]
+GNUNETD_HOME = "/tmp/gnunet-postgres-sqstore-test"
+HELLOEXPIRES = 1440
+LOGLEVEL = "NOTHING"
+LOGFILE = "$GNUNETD_HOME/logs"
+KEEPLOG = "0"
+PIDFILE = "$GNUNETD_HOME/gnunet.pid"
+HOSTS = "$GNUNETD_HOME/data/hosts/"
+HTTP-PROXY = ""
+HTTP-PROXY-PORT = 1080
+APPLICATIONS = "fs getoption stats traffic"
+PROCESS-PRIORITY = "NORMAL"
+
+[MODULES]
+sqstore = "sqstore_postgres"
+topology = "topology_default"
+dstore = "dstore_postgres"
+
+[NETWORK]
+PORT = 12087
+IP = ""
+HELLOEXCHANGE = YES
+TRUSTED = "127.0.0.0/8;"
+
+[LOAD]
+BASICLIMITING = YES
+MAXNETDOWNBPSTOTAL = 50000
+MAXNETUPBPSTOTAL = 50000
+MAXCPULOAD = 50
+
+[FS]
+QUOTA = 1024
+ACTIVEMIGRATION = YES
+DIR = "$GNUNETD_HOME/data/fs/"
+INDEX-DIRECTORY = "$GNUNETD_HOME/data/shared/"
+INDEX-QUOTA = 8192
+POOL = 32
+
+
+[TESTING]
+WEAKRANDOM = YES
+
+
Added: GNUnet/src/applications/sqstore_postgres/postgres.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres.c
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/postgres.c 2008-12-27 09:40:20 UTC
(rev 8035)
@@ -0,0 +1,1382 @@
+/*
+ This file is part of GNUnet.
+ (C) 2008 Christian Grothoff (and other contributing authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+
+/**
+ * @file applications/sqstore_postgres/postgres.c
+ * @brief Postgres based implementation of the sqstore service
+ * @author Christian Grothoff
+ *
+ * Database: Postgres
+ */
+
+#include "platform.h"
+#include "gnunet_directories.h"
+#include "gnunet_util.h"
+#include "gnunet_sqstore_service.h"
+#include "gnunet_protocols.h"
+#include "gnunet_stats_service.h"
+#include <postgresql/libpq-fe.h>
+
+#define DEBUG_POSTGRES GNUNET_NO
+
+/**
+ * Die with an error message that indicates
+ * a failure of the command 'cmd' with the message given
+ * by strerror(errno).
+ */
+#define DIE_POSTGRES(cmd) do { GNUNET_GE_LOG(ectx, GNUNET_GE_FATAL |
GNUNET_GE_IMMEDIATE | GNUNET_GE_ADMIN, _("`%s' failed at %s:%d with error:
%s\n"), cmd, __FILE__, __LINE__, PQerrorMessage(dbh)); abort(); } while(0)
+
+/**
+ * Log an error message at log-level 'level' that indicates
+ * a failure of the command 'cmd' on file 'filename'
+ * with the message given by strerror(errno).
+ */
+#define LOG_POSTGRES(level, cmd) do { GNUNET_GE_LOG(ectx, level, _("`%s'
failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__,
PQerrorMessage(dbh)); } while(0)
+
+#define SELECT_IT_LOW_PRIORITY_1 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio = $1 AND hash > $2) "\
+ "ORDER BY hash ASC LIMIT 1"
+
+#define SELECT_IT_LOW_PRIORITY_2 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio > $1) "\
+ "ORDER BY prio ASC, hash ASC LIMIT 1"
+
+#define SELECT_IT_NON_ANONYMOUS_1 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio = $1 AND hash < $2 AND anonLevel = 0) "\
+ " ORDER BY hash DESC LIMIT 1"
+
+#define SELECT_IT_NON_ANONYMOUS_2 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio < $1 AND anonLevel = 0)"\
+ " ORDER BY prio DESC, hash DESC LIMIT 1"
+
+#define SELECT_IT_EXPIRATION_TIME_1 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire = $1 AND hash > $2) "\
+ " ORDER BY hash ASC LIMIT 1"
+
+#define SELECT_IT_EXPIRATION_TIME_2 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire > $1) "\
+ " ORDER BY expire ASC, hash ASC LIMIT 1"
+
+#define SELECT_IT_MIGRATION_ORDER_1 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire = $1 AND hash < $2) "\
+ " ORDER BY hash DESC LIMIT 1"
+
+#define SELECT_IT_MIGRATION_ORDER_2 \
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire < $1) "\
+ " ORDER BY expire DESC, hash DESC LIMIT 1"
+
+/**
+ * After how many ms "busy" should a DB operation fail for good?
+ * A low value makes sure that we are more responsive to requests
+ * (especially PUTs). A high value guarantees a higher success
+ * rate (SELECTs in iterate can take several seconds despite LIMIT=1).
+ *
+ * The default value of 250ms should ensure that users do not experience
+ * huge latencies while at the same time allowing operations to succeed
+ * with reasonable probability.
+ */
+#define BUSY_TIMEOUT_MS 250
+
+/**
+ * Native Postgres database handle.
+ */
+static PGconn *dbh;
+
+static GNUNET_Stats_ServiceAPI *stats;
+
+static GNUNET_CoreAPIForPlugins *coreAPI;
+
+static unsigned int stat_size;
+
+#if DEBUG_POSTGRES
+static unsigned int stat_mem;
+#endif
+
+static struct GNUNET_GE_Context *ectx;
+
+static struct GNUNET_Mutex *lock;
+
+static char *fn;
+
+static unsigned long long payload;
+
+static unsigned int lastSync;
+
+/**
+ * @brief Prepare a SQL statement
+ */
+static int
+pq_prepare (const char *zSql, int nParams,
+ const Oid * paramTypes)
+{
+ PGresult * ret;
+ ret = PQprepare (dbh,
+ zSql,
+ zSql,
+ nParams,
+ paramTypes);
+ if (ret == NULL)
+ {
+ GNUNET_GE_LOG (ectx, GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
+ _("PQprepare failed (returning NULL)\n"));
+ return GNUNET_SYSERR;
+ }
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ LOG_POSTGRES (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
+ "PQprepare");
+ PQclear (ret);
+ return GNUNET_SYSERR;
+ }
+ PQclear (ret);
+ return GNUNET_OK;
+}
+
+/**
+ * Run simple SQL statement (without results).
+ */
+static int
+pq_exec (const char * sql)
+{
+ PGresult * ret;
+ ret = PQexec (dbh, sql);
+ if (ret == NULL)
+ {
+ /* FIXME: report error! */
+ return GNUNET_SYSERR;
+ }
+ if (PQresultStatus (res) != PGRES_COMMAND_OK)
+ {
+ LOG_POSTGRES (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
+ "PQexec");
+ PQclear(ret);
+ return GNUNET_SYSERR;
+ }
+ PQclear(ret);
+ return GNUNET_OK;
+}
+
+/**
+ * Create indices
+ */
+static int
+create_indices ()
+{
+ if ( (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_hash ON gn080 (hash)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_prio ON gn080 (prio)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_expire ON gn080 (expire)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)")) ||
+ (GNUNET_OK !=
+ pq_exec ("CREATE INDEX idx_comb7 ON gn080 (expire,hash)")) )
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+/**
+ * @brief Get a database handle
+ * @return the native Postgres database handle, NULL on error
+ */
+static PGconn *
+init_connection ()
+{
+ char * conninfo;
+
+ /* Open database and precompile statements */
+ conninfo = NULL;
+ GNUNET_GC_get_configuration_value_string (coreAPI->cfg,
+ "POSTGRES", "CONFIG",
"connect_timeout=10",
+ &conninfo);
+ dbh = PQconnectdb(conninfo);
+ GNUNET_free (conninfo);
+ if (dbh == NULL)
+ {
+ /* FIXME: warn about out-of-memory? */
+ return NULL;
+ }
+ if (PQstatus(dbh) != CONNECTION_OK)
+ {
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_ERROR | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _("Unable to initialize Postgres: %s.\n"),
+ PQerrorMessage (dbh));
+ PQfinish (dbh);
+ dbh = NULL;
+ return NULL;
+ }
+
+ if ( (GNUNET_OK !=
+ pq_exec ("CREATE TABLE gn080 ("
+ " size INTEGER NOT NULL DEFAULT 0,"
+ " type INTEGER NOT NULL DEFAULT 0,"
+ " prio INTEGER NOT NULL DEFAULT 0,"
+ " anonLevel INTEGER NOT NULL DEFAULT 0,"
+ " expire BIGINT NOT NULL DEFAULT 0,"
+ " hash BYTEA NOT NULL DEFAULT '',"
+ " vhash BYTEA NOT NULL DEFAULT '',"
+ " value BYTEA NOT NULL DEFAULT '')")) ||
+ (GNUNET_OK !=
+ create_indices () ) )
+ {
+ PQfinish (dbh);
+ dbh = NULL;
+ return NULL;
+ }
+ /* FIXME: prepare statements! */
+
+ return dbh;
+}
+
+
+/**
+ * Get an estimate of the size of the given
+ * value (and its key) in the datastore.<p>
+ */
+static unsigned int
+getContentDatastoreSize (const GNUNET_DatastoreValue * value)
+{
+ return sizeof (GNUNET_HashCode) * 2 + ntohl (value->size) -
+ sizeof (GNUNET_DatastoreValue) + 24;
+}
+
+/**
+ * Get the current on-disk size of the SQ store. Estimates are fine,
+ * if that's the only thing available.
+ *
+ * @return number of bytes used on disk
+ */
+static unsigned long long
+getSize ()
+{
+ double ret;
+
+ GNUNET_mutex_lock (lock);
+ ret = payload;
+ if (stats)
+ {
+ stats->set (stat_size, ret);
+#if DEBUG_POSTGRES
+ stats->set (stat_mem, postgres3_memory_used ());
+#endif
+ }
+ GNUNET_mutex_unlock (lock);
+ return (unsigned long long) (ret * 1.00);
+ /* benchmarking shows XX% overhead */
+}
+
+///////////////////////////////////////////////
+
+static int
+delete_by_rowid (postgresHandle * handle, unsigned long long rid)
+{
+ postgres3_stmt *stmt;
+
+ if (sq_prepare (handle->dbh,
+ "DELETE FROM gn080 WHERE _ROWID_ = ?", &stmt) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "sq_prepare");
+ return GNUNET_SYSERR;
+ }
+ postgres3_bind_int64 (stmt, 1, rid);
+ if (POSTGRES_DONE != postgres3_step (stmt))
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ postgres3_finalize (stmt);
+ return GNUNET_SYSERR;
+ }
+ postgres3_finalize (stmt);
+ return GNUNET_OK;
+}
+
+/**
+ * Given a full row from gn080 table
(size,type,priority,anonLevel,expire,GNUNET_hash,value),
+ * assemble it into a GNUNET_DatastoreValue representation.
+ */
+static GNUNET_DatastoreValue *
+assembleDatum (postgresHandle * handle, postgres3_stmt * stmt,
+ GNUNET_HashCode * key, unsigned long long *rowid)
+{
+ GNUNET_DatastoreValue *value;
+ int contentSize;
+ postgres3 *dbh;
+ unsigned int type;
+ postgres3_stmt *stmtd;
+
+ *rowid = postgres3_column_int64 (stmt, 7);
+ type = postgres3_column_int (stmt, 1);
+ contentSize = postgres3_column_int (stmt, 0) - sizeof
(GNUNET_DatastoreValue);
+ dbh = handle->dbh;
+ if (contentSize < 0)
+ {
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _
+ ("Invalid data in %s. Trying to fix (by deletion).\n"),
+ _("postgres datastore"));
+ if (POSTGRES_OK != postgres3_reset (stmt))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_reset");
+ if (sq_prepare (dbh, "DELETE FROM gn080 WHERE size < ?", &stmtd) !=
+ POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "sq_prepare");
+ return NULL;
+ }
+ if (POSTGRES_OK !=
+ postgres3_bind_int (stmtd, 1, sizeof (GNUNET_DatastoreValue)))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_bind_int");
+ if (POSTGRES_DONE != postgres3_step (stmtd))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ if (POSTGRES_OK != postgres3_finalize (stmtd))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_finalize");
+ return NULL; /* error */
+ }
+
+ if (postgres3_column_bytes (stmt, 5) != sizeof (GNUNET_HashCode) ||
+ postgres3_column_bytes (stmt, 6) != contentSize)
+ {
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_WARNING | GNUNET_GE_BULK | GNUNET_GE_USER,
+ _("Invalid data in %s. Trying to fix (by deletion).\n"),
+ _("postgres datastore"));
+ if (POSTGRES_OK != postgres3_reset (stmt))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_reset");
+ if (sq_prepare
+ (dbh,
+ "DELETE FROM gn080 WHERE NOT ((LENGTH(hash) = ?) AND (size =
LENGTH(value) + ?))",
+ &stmtd) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "sq_prepare");
+ return NULL;
+ }
+
+ if (POSTGRES_OK != postgres3_bind_int (stmtd, 1, sizeof
(GNUNET_HashCode)))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_bind_int");
+ if (POSTGRES_OK !=
+ postgres3_bind_int (stmtd, 2, sizeof (GNUNET_DatastoreValue)))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_bind_int");
+ if (POSTGRES_DONE != postgres3_step (stmtd))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ if (POSTGRES_OK != postgres3_finalize (stmtd))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_finalize");
+ return NULL;
+ }
+
+ value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + contentSize);
+ value->size = htonl (contentSize + sizeof (GNUNET_DatastoreValue));
+ value->type = htonl (type);
+ value->priority = htonl (postgres3_column_int (stmt, 2));
+ value->anonymity_level = htonl (postgres3_column_int (stmt, 3));
+ value->expiration_time = GNUNET_htonll (postgres3_column_int64 (stmt, 4));
+ memcpy (key, postgres3_column_blob (stmt, 5), sizeof (GNUNET_HashCode));
+ memcpy (&value[1], postgres3_column_blob (stmt, 6), contentSize);
+ return value;
+}
+
+
+/**
+ * @brief Get database statistics
+ * @param key kind of stat to retrieve
+ * @return GNUNET_SYSERR on error, the value otherwise
+ */
+static unsigned long long
+getStat (const char *key)
+{
+ int i;
+ postgres3_stmt *stmt;
+ unsigned long long ret = GNUNET_SYSERR;
+
+ i = sq_prepare (handle->dbh,
+ "SELECT value FROM gn071 WHERE key = ?", &stmt);
+ if (i == POSTGRES_OK)
+ {
+ postgres3_bind_text (stmt, 1, key, strlen (key), POSTGRES_STATIC);
+ i = postgres3_step (stmt);
+
+ if (i == POSTGRES_DONE)
+ {
+ ret = 0;
+ i = POSTGRES_OK;
+ }
+ else if (i == POSTGRES_ROW)
+ {
+ ret = postgres3_column_int64 (stmt, 0);
+ i = POSTGRES_OK;
+ }
+ postgres3_finalize (stmt);
+ }
+ if (i == POSTGRES_BUSY)
+ return GNUNET_SYSERR;
+ if (i != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_getStat");
+ return GNUNET_SYSERR;
+ }
+ return ret;
+}
+
+/**
+ * @brief set database statistics
+ * @param key statistic to set
+ * @param val value to set
+ * @return GNUNET_SYSERR on error, GNUNET_OK otherwise
+ */
+static int
+setStat (postgresHandle * handle, const char *key, unsigned long long val)
+{
+ postgres3_stmt *stmt;
+ postgres3 *dbh;
+
+ dbh = handle->dbh;
+ if (sq_prepare (dbh, "DELETE FROM gn071 where key = ?", &stmt) ==
POSTGRES_OK)
+ {
+ postgres3_bind_text (stmt, 1, key, strlen (key), POSTGRES_STATIC);
+ if (POSTGRES_DONE != postgres3_step (stmt))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+
+ postgres3_finalize (stmt);
+ }
+
+ if (sq_prepare (dbh,
+ "INSERT INTO gn071(key, value) VALUES (?, ?)",
+ &stmt) != POSTGRES_OK)
+ return GNUNET_SYSERR;
+ if ((POSTGRES_OK !=
+ postgres3_bind_text (stmt, 1, key, strlen (key), POSTGRES_STATIC))
+ || (POSTGRES_OK != postgres3_bind_int64 (stmt, 2, val)))
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_bind_xxx");
+ postgres3_finalize (stmt);
+ return GNUNET_SYSERR;
+ }
+ if (postgres3_step (stmt) != POSTGRES_DONE)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ postgres3_finalize (stmt);
+ return GNUNET_SYSERR;
+ }
+ postgres3_finalize (stmt);
+
+ return GNUNET_OK;
+}
+
+/**
+ * @brief write all statistics to the db
+ */
+static void
+syncStats (postgresHandle * handle)
+{
+ setStat (handle, "PAYLOAD", payload);
+ lastSync = 0;
+}
+
+/**
+ * Call a method for each key in the database and
+ * call the callback method on it.
+ *
+ * @param type entries of which type should be considered?
+ * @param iter maybe NULL (to just count); iter
+ * should return GNUNET_SYSERR to abort the
+ * iteration, GNUNET_NO to delete the entry and
+ * continue and GNUNET_OK to continue iterating
+ * @return the number of results processed,
+ * GNUNET_SYSERR on error
+ */
+static int
+postgres_iterate (unsigned int type,
+ int is_asc,
+ int is_prio,
+ int is_migr,
+ int limit_nonanonymous,
+ const char *stmt_str_1,
+ const char *stmt_str_2,
+ GNUNET_DatastoreValueIterator iter, void *closure)
+{
+ postgres3_stmt *stmt_1;
+ postgres3_stmt *stmt_2;
+ int count;
+ GNUNET_DatastoreValue *datum_1;
+ GNUNET_DatastoreValue *datum_2;
+ GNUNET_DatastoreValue *last_datum_2;
+ GNUNET_DatastoreValue *datum;
+ unsigned int lastPrio;
+ unsigned long long lastExp;
+ GNUNET_HashCode key_1;
+ GNUNET_HashCode key_2;
+ GNUNET_HashCode key;
+ int ret;
+ GNUNET_CronTime now;
+ unsigned long long rowid;
+ unsigned long long rowid_1;
+ unsigned long long rowid_2;
+
+ GNUNET_mutex_lock (lock);
+ if (sq_prepare (dbh, stmt_str_1, &stmt_1) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_prepare");
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ if (sq_prepare (dbh, stmt_str_2, &stmt_2) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_prepare");
+ postgres3_finalize (stmt_1);
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ count = 0;
+ if (is_asc)
+ {
+ lastPrio = 0;
+ lastExp = 0;
+ memset (&key, 0, sizeof (GNUNET_HashCode));
+ }
+ else
+ {
+ lastPrio = 0x7FFFFFFF;
+ lastExp = 0x7FFFFFFFFFFFFFFFLL;
+ memset (&key, 255, sizeof (GNUNET_HashCode));
+ }
+ last_datum_2 = NULL;
+ while (1)
+ {
+ if (is_prio)
+ {
+ postgres3_bind_int (stmt_1, 1, lastPrio);
+ postgres3_bind_int (stmt_2, 1, lastPrio);
+ }
+ else
+ {
+ postgres3_bind_int64 (stmt_1, 1, lastExp);
+ postgres3_bind_int64 (stmt_2, 1, lastExp);
+ }
+ postgres3_bind_blob (stmt_1, 2, &key, sizeof (GNUNET_HashCode),
+ POSTGRES_TRANSIENT);
+ now = GNUNET_get_time ();
+ datum_1 = NULL;
+ datum_2 = last_datum_2;
+ last_datum_2 = NULL;
+ if ((ret = postgres3_step (stmt_1)) == POSTGRES_ROW)
+ {
+ if (is_migr && postgres3_column_int64 (stmt_1, 4) < now)
+ datum_1 = NULL;
+ else
+ datum_1 = assembleDatum (handle, stmt_1, &key_1, &rowid_1);
+ if (POSTGRES_OK != postgres3_reset (stmt_1))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_reset");
+ }
+ else
+ {
+ if (ret != POSTGRES_DONE)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ postgres3_finalize (stmt_1);
+ postgres3_finalize (stmt_2);
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ postgres3_reset (stmt_1);
+ }
+
+ if (datum_2 == NULL)
+ {
+ if ((ret = postgres3_step (stmt_2)) == POSTGRES_ROW)
+ {
+ if (is_migr && postgres3_column_int64 (stmt_2, 4) < now)
+ datum_2 = NULL;
+ else
+ datum_2 = assembleDatum (handle, stmt_2, &key_2, &rowid_2);
+ if (POSTGRES_OK != postgres3_reset (stmt_2))
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER
+ | GNUNET_GE_BULK, "postgres3_reset");
+ }
+ else
+ {
+ if (ret != POSTGRES_DONE)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN |
+ GNUNET_GE_USER | GNUNET_GE_BULK,
+ "postgres3_step");
+ postgres3_finalize (stmt_1);
+ postgres3_finalize (stmt_2);
+ GNUNET_mutex_unlock (lock);
+ GNUNET_free_non_null (datum_1);
+ return GNUNET_SYSERR;
+ }
+ postgres3_reset (stmt_2);
+ }
+ }
+ datum = NULL;
+ if (datum_1 == NULL)
+ {
+ datum = datum_2;
+ rowid = rowid_2;
+ key = key_2;
+ }
+ else if (datum_2 == NULL)
+ {
+ datum = datum_1;
+ rowid = rowid_1;
+ key = key_1;
+ }
+ else
+ {
+ /* have to pick between 1 and 2 */
+ if (is_prio)
+ {
+ if ((ntohl (datum_1->priority) < ntohl (datum_2->priority)) ==
+ is_asc)
+ {
+ datum = datum_1;
+ rowid = rowid_1;
+ key = key_1;
+ last_datum_2 = datum_2;
+ }
+ else
+ {
+ datum = datum_2;
+ rowid = rowid_2;
+ key = key_2;
+ GNUNET_free (datum_1);
+ }
+ }
+ else
+ {
+ if ((GNUNET_ntohll (datum_1->expiration_time) <
+ GNUNET_ntohll (datum_2->expiration_time)) == is_asc)
+ {
+ datum = datum_1;
+ rowid = rowid_1;
+ key = key_1;
+ last_datum_2 = datum_2;
+ }
+ else
+ {
+ datum = datum_2;
+ rowid = rowid_2;
+ key = key_2;
+ GNUNET_free (datum_1);
+ }
+ }
+ }
+ if (datum == NULL)
+ break;
+#if 0
+ printf ("FOUND %4u prio %4u exp %20llu old: %4u, %20llu\n",
+ (ntohl (datum->size) - sizeof (GNUNET_DatastoreValue)),
+ ntohl (datum->priority),
+ GNUNET_ntohll (datum->expiration_time), lastPrio, lastExp);
+#endif
+ if (((GNUNET_NO == limit_nonanonymous) ||
+ (ntohl (datum->anonymity_level) == 0)) &&
+ ((type == GNUNET_ECRS_BLOCKTYPE_ANY) ||
+ (type == ntohl (datum->type))))
+ {
+ count++;
+ if (iter != NULL)
+ {
+ GNUNET_mutex_unlock (lock);
+ ret = iter (&key, datum, closure, rowid);
+ GNUNET_mutex_lock (lock);
+ if (ret == GNUNET_SYSERR)
+ {
+ GNUNET_free (datum);
+ break;
+ }
+ if (ret == GNUNET_NO)
+ {
+ payload -= getContentDatastoreSize (datum);
+ delete_by_rowid (handle, rowid);
+ }
+ }
+ }
+ lastPrio = ntohl (datum->priority);
+ lastExp = GNUNET_ntohll (datum->expiration_time);
+ GNUNET_free (datum);
+ }
+ postgres3_finalize (stmt_1);
+ postgres3_finalize (stmt_2);
+ GNUNET_free_non_null (last_datum_2);
+ GNUNET_mutex_unlock (lock);
+ return count;
+}
+
+/**
+ * Call a method for each key in the database and
+ * call the callback method on it.
+ *
+ * @param type limit the iteration to entries of this
+ * type. 0 for all entries.
+ * @param iter the callback method
+ * @param closure argument to all callback calls
+ * @return the number of results, GNUNET_SYSERR on error
+ */
+static int
+iterateLowPriority (unsigned int type, GNUNET_DatastoreValueIterator iter,
+ void *closure)
+{
+ return postgres_iterate (type, GNUNET_YES, GNUNET_YES, GNUNET_NO, GNUNET_NO,
+ SELECT_IT_LOW_PRIORITY_1,
+ SELECT_IT_LOW_PRIORITY_2, iter, closure);
+}
+
+/**
+ * Call a method on content with zero anonymity.
+ *
+ * @param type limit the iteration to entries of this
+ * type. 0 for all entries.
+ * @param iter the callback method
+ * @param closure argument to all callback calls
+ * @return the number of results, GNUNET_SYSERR on error
+ */
+static int
+iterateNonAnonymous (unsigned int type, GNUNET_DatastoreValueIterator iter,
+ void *closure)
+{
+ return postgres_iterate (type, GNUNET_NO, GNUNET_YES, GNUNET_NO, GNUNET_YES,
+ SELECT_IT_NON_ANONYMOUS_1,
+ SELECT_IT_NON_ANONYMOUS_2, iter, closure);
+}
+
+/**
+ * Call a method for each key in the database and
+ * call the callback method on it.
+ *
+ * @param handle the database
+ * @param callback the callback method
+ * @param data second argument to all callback calls
+ * @return the number of items stored in the content database
+ */
+static int
+iterateExpirationTime (unsigned int type, GNUNET_DatastoreValueIterator iter,
+ void *closure)
+{
+ return postgres_iterate (type, GNUNET_YES, GNUNET_NO, GNUNET_NO, GNUNET_NO,
+ SELECT_IT_EXPIRATION_TIME_1,
+ SELECT_IT_EXPIRATION_TIME_2, iter, closure);
+}
+
+/**
+ * Iterate over the items in the datastore in migration
+ * order.
+ *
+ * @param iter never NULL
+ * @return the number of results, GNUNET_SYSERR on error
+ */
+static int
+iterateMigrationOrder (GNUNET_DatastoreValueIterator iter, void *closure)
+{
+ return postgres_iterate (0, GNUNET_NO, GNUNET_NO, GNUNET_YES, GNUNET_NO,
+ SELECT_IT_MIGRATION_ORDER_1,
+ SELECT_IT_MIGRATION_ORDER_2, iter, closure);
+}
+
+/**
+ * Call a method for each key in the database and
+ * do so quickly in any order (can lock the
+ * database until iteration is complete).
+ *
+ * @param callback the callback method
+ * @param data second argument to all callback calls
+ * @return the number of items stored in the content database
+ */
+static int
+iterateAllNow (GNUNET_DatastoreValueIterator iter, void *closure)
+{
+ postgres3_stmt *stmt;
+ int count;
+ GNUNET_DatastoreValue *datum;
+ int ret;
+ unsigned long long newpayload;
+ unsigned long long rowid;
+ unsigned long long last_rowid;
+ GNUNET_HashCode key;
+
+ newpayload = 0;
+ GNUNET_mutex_lock (lock);
+ /* For the rowid trick see
+ http://permalink.gmane.org/gmane.network.gnunet.devel/1363 */
+ if (sq_prepare (dbh,
+ "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_"
+ " FROM gn080 WHERE _ROWID_ > :1 ORDER BY _ROWID_ ASC LIMIT
1",
+ &stmt) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_prepare");
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ count = 0;
+ last_rowid = 0;
+ while (1)
+ {
+ ret = postgres3_bind_int64 (stmt, 1, last_rowid);
+ if (ret != POSTGRES_OK)
+ break;
+ if (postgres3_step (stmt) != POSTGRES_ROW)
+ break;
+ datum = assembleDatum (handle, stmt, &key, &rowid);
+#if 0
+ printf ("IA-FOUND %4u prio %4u exp %20llu RID %llu old-RID: %llu\n",
+ (ntohl (datum->size) - sizeof (GNUNET_DatastoreValue)),
+ ntohl (datum->priority),
+ GNUNET_ntohll (datum->expiration_time), rowid, last_rowid);
+#endif
+ last_rowid = rowid;
+ postgres3_reset (stmt);
+ if (datum == NULL)
+ continue;
+ newpayload += getContentDatastoreSize (datum);
+ count++;
+ if (iter != NULL)
+ {
+ GNUNET_mutex_unlock (lock);
+ ret = iter (&key, datum, closure, rowid);
+ GNUNET_mutex_lock (lock);
+ }
+ else
+ ret = GNUNET_OK;
+ if (ret == GNUNET_SYSERR)
+ {
+ GNUNET_free (datum);
+ break;
+ }
+ if (ret == GNUNET_NO)
+ {
+ newpayload -= getContentDatastoreSize (datum);
+ delete_by_rowid (handle, rowid);
+ }
+ GNUNET_free (datum);
+ }
+ postgres3_reset (stmt);
+ postgres3_finalize (stmt);
+ if (count != GNUNET_SYSERR)
+ {
+ /* re-computed payload! */
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_INFO | GNUNET_GE_IMMEDIATE | GNUNET_GE_USER |
+ GNUNET_GE_ADMIN,
+ "Postgres database size recomputed. New estimate is
%llu, old estimate was %llu\n",
+ newpayload, payload);
+ payload = newpayload;
+ syncStats (handle);
+ }
+ GNUNET_mutex_unlock (lock);
+ return count;
+}
+
+
+
+
+
+/**
+ * Iterate over all entries matching a particular key and
+ * type.
+ *
+ * @param key maybe NULL (to match all entries)
+ * @param vhash hash of the value; maybe NULL (to match all entries)
+ * @param type entries of which type are relevant?
+ * Use 0 for any type.
+ * @param iter maybe NULL (to just count); iter
+ * should return GNUNET_SYSERR to abort the
+ * iteration, GNUNET_NO to delete the entry and
+ * continue and GNUNET_OK to continue iterating
+ * @return the number of results processed,
+ * GNUNET_SYSERR on error
+ */
+static int
+get (const GNUNET_HashCode * key,
+ const GNUNET_HashCode * vhash,
+ unsigned int type, GNUNET_DatastoreValueIterator iter, void *closure)
+{
+ int ret;
+ int count;
+ int total;
+ int off;
+ int limit_off;
+ postgres3_stmt *stmt;
+ char scratch[256];
+ GNUNET_DatastoreValue *datum;
+ postgres3 *dbh;
+ GNUNET_HashCode rkey;
+ unsigned long long last_rowid;
+ unsigned long long rowid;
+ int sqoff;
+
+ if (key == NULL)
+ return iterateLowPriority (type, iter, closure);
+ GNUNET_mutex_lock (lock);
+
+ GNUNET_snprintf (scratch, 256,
+ "SELECT count(*) FROM gn080 WHERE hash=:1%s%s",
+ vhash == NULL ? "" : " AND vhash=:2",
+ type == 0 ? "" : (vhash ==
+ NULL) ? " AND type=:2" : " AND type=:3");
+ if (sq_prepare (dbh, scratch, &stmt) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_prepare");
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ sqoff = 1;
+ ret = postgres3_bind_blob (stmt,
+ sqoff++,
+ key, sizeof (GNUNET_HashCode), POSTGRES_TRANSIENT);
+ if ((vhash != NULL) && (ret == POSTGRES_OK))
+ ret = postgres3_bind_blob (stmt,
+ sqoff++,
+ vhash,
+ sizeof (GNUNET_HashCode), POSTGRES_TRANSIENT);
+ if ((type != 0) && (ret == POSTGRES_OK))
+ ret = postgres3_bind_int (stmt, sqoff++, type);
+ if (ret != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_bind");
+ postgres3_reset (stmt);
+ postgres3_finalize (stmt);
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+
+ }
+ ret = postgres3_step (stmt);
+ if (ret != POSTGRES_ROW)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_step");
+ postgres3_reset (stmt);
+ postgres3_finalize (stmt);
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+
+ }
+ total = postgres3_column_int (stmt, 0);
+ postgres3_reset (stmt);
+ postgres3_finalize (stmt);
+ if ((iter == NULL) || (total == 0))
+ {
+ GNUNET_mutex_unlock (lock);
+ return total;
+ }
+
+ GNUNET_snprintf (scratch, 256,
+ "SELECT size, type, prio, anonLevel, expire, hash, value,
_ROWID_ "
+ "FROM gn080 WHERE hash=:1%s%s AND _ROWID_ >= :%d "
+ "ORDER BY _ROWID_ ASC LIMIT 1 OFFSET :d",
+ vhash == NULL ? "" : " AND vhash=:2",
+ type == 0 ? "" : (vhash ==
+ NULL) ? " AND type=:2" : " AND type=:3",
+ sqoff, sqoff + 1);
+ if (sq_prepare (dbh, scratch, &stmt) != POSTGRES_OK)
+ {
+ LOG_POSTGRES (handle,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_prepare");
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ count = 0;
+ last_rowid = 0;
+ off = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, total);
+ while (1)
+ {
+ if (count == 0)
+ limit_off = off;
+ else
+ limit_off = 0;
+ sqoff = 1;
+ ret = postgres3_bind_blob (stmt,
+ sqoff++,
+ key, sizeof (GNUNET_HashCode),
+ POSTGRES_TRANSIENT);
+ if ((vhash != NULL) && (ret == POSTGRES_OK))
+ ret = postgres3_bind_blob (stmt,
+ sqoff++,
+ vhash,
+ sizeof (GNUNET_HashCode), POSTGRES_TRANSIENT);
+ if ((type != 0) && (ret == POSTGRES_OK))
+ ret = postgres3_bind_int (stmt, sqoff++, type);
+ if (ret == POSTGRES_OK)
+ ret = postgres3_bind_int64 (stmt, sqoff++, last_rowid);
+ if (ret == POSTGRES_OK)
+ ret = postgres3_bind_int (stmt, sqoff++, limit_off);
+ if (ret == POSTGRES_OK)
+ {
+ ret = postgres3_step (stmt);
+ if (ret != POSTGRES_ROW)
+ break;
+ datum = assembleDatum (handle, stmt, &rkey, &rowid);
+ last_rowid = rowid + 1;
+ postgres3_reset (stmt);
+ if (datum == NULL)
+ continue;
+ if ((key != NULL) &&
+ (0 != memcmp (&rkey, key, sizeof (GNUNET_HashCode))))
+ {
+ GNUNET_GE_BREAK (NULL, 0);
+ GNUNET_free (datum);
+ continue;
+ }
+ GNUNET_mutex_unlock (lock);
+ count++;
+ ret = iter (&rkey, datum, closure, rowid);
+ GNUNET_mutex_lock (lock);
+ if (ret == GNUNET_SYSERR)
+ {
+ GNUNET_free (datum);
+ ret = POSTGRES_DONE;
+ break;
+ }
+ if (ret == GNUNET_NO)
+ {
+ payload -= getContentDatastoreSize (datum);
+ delete_by_rowid (handle, rowid);
+ }
+ GNUNET_free (datum);
+ }
+ if (count + off == total)
+ last_rowid = 0; /* back to start */
+ if (count == total)
+ break;
+ }
+ postgres3_reset (stmt);
+ postgres3_finalize (stmt);
+ GNUNET_mutex_unlock (lock);
+ return count;
+}
+
+/**
+ * Write content to the db. Always adds a new record
+ * (does NOT overwrite existing data).
+ *
+ * @return GNUNET_SYSERR on error, GNUNET_NO on temporary error, GNUNET_OK if
ok.
+ */
+static int
+put (const GNUNET_HashCode * key, const GNUNET_DatastoreValue * value)
+{
+ int n;
+ postgres3_stmt *stmt;
+ unsigned int contentSize;
+ unsigned int size, type, prio, anon;
+ unsigned long long expir;
+ GNUNET_HashCode vhash;
+ postgresHandle *dbh;
+#if DEBUG_POSTGRES
+ GNUNET_EncName enc;
+
+ IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_BULK | GNUNET_GE_USER,
+ GNUNET_hash_to_enc (key, &enc));
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_BULK | GNUNET_GE_USER,
+ "Storing in database block with type %u/key `%s'/priority
%u/expiration %llu.\n",
+ ntohl (*(int *) &value[1]), &enc, ntohl (value->priority),
+ GNUNET_ntohll (value->expiration_time));
+#endif
+
+ if ((ntohl (value->size) < sizeof (GNUNET_DatastoreValue)))
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return GNUNET_SYSERR;
+ }
+ size = ntohl (value->size);
+ type = ntohl (value->type);
+ prio = ntohl (value->priority);
+ anon = ntohl (value->anonymity_level);
+ expir = GNUNET_ntohll (value->expiration_time);
+ contentSize = size - sizeof (GNUNET_DatastoreValue);
+ GNUNET_hash (&value[1], contentSize, &vhash);
+ GNUNET_mutex_lock (lock);
+ if (lastSync > 1000)
+ syncStats (dbh);
+ stmt = dbh->insertContent;
+ if ((POSTGRES_OK != postgres3_bind_int (stmt, 1, size)) ||
+ (POSTGRES_OK != postgres3_bind_int (stmt, 2, type)) ||
+ (POSTGRES_OK != postgres3_bind_int (stmt, 3, prio)) ||
+ (POSTGRES_OK != postgres3_bind_int (stmt, 4, anon)) ||
+ (POSTGRES_OK != postgres3_bind_int64 (stmt, 5, expir)) ||
+ (POSTGRES_OK !=
+ postgres3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
+ POSTGRES_TRANSIENT)) ||
+ (POSTGRES_OK !=
+ postgres3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
+ POSTGRES_TRANSIENT))
+ || (POSTGRES_OK !=
+ postgres3_bind_blob (stmt, 8, &value[1], contentSize,
+ POSTGRES_TRANSIENT)))
+ {
+ LOG_POSTGRES (dbh,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_bind_XXXX");
+ if (POSTGRES_OK != postgres3_reset (stmt))
+ LOG_POSTGRES (dbh,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_reset");
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+
+ n = postgres3_step (stmt);
+ if (n != POSTGRES_DONE)
+ {
+ if (n == POSTGRES_BUSY)
+ {
+ postgres3_reset (stmt);
+ GNUNET_mutex_unlock (lock);
+ GNUNET_GE_BREAK (NULL, 0);
+ return GNUNET_NO;
+ }
+ LOG_POSTGRES (dbh,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+ postgres3_reset (stmt);
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ if (POSTGRES_OK != postgres3_reset (stmt))
+ LOG_POSTGRES (dbh,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_reset");
+ lastSync++;
+ payload += getContentDatastoreSize (value);
+#if DEBUG_POSTGRES
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Postgres: done writing content\n");
+#endif
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_OK;
+}
+
+/**
+ * Update the priority for a particular key
+ * in the datastore.
+ */
+static int
+update (unsigned long long uid, int delta, GNUNET_CronTime expire)
+{
+ int n;
+
+ GNUNET_mutex_lock (lock);
+ postgres3_bind_int (dbh->updPrio, 1, delta);
+ postgres3_bind_int64 (dbh->updPrio, 2, expire);
+ postgres3_bind_int64 (dbh->updPrio, 3, uid);
+ n = postgres3_step (dbh->updPrio);
+ if (n != POSTGRES_DONE)
+ LOG_POSTGRES (dbh,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres3_step");
+
+ postgres3_reset (dbh->updPrio);
+
+#if DEBUG_POSTGRES
+ GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Postgres: block updated\n");
+#endif
+ GNUNET_mutex_unlock (lock);
+ if (n == POSTGRES_BUSY)
+ return GNUNET_NO;
+ return n == POSTGRES_OK ? GNUNET_OK : GNUNET_SYSERR;
+}
+
+///////////////////////////////////////
+
+
+static void
+postgres_shutdown ()
+{
+ unsigned int idx;
+
+ if (dbh == NULL)
+ return; /* already down */
+#if DEBUG_POSTGRES
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Postgres: closing database\n");
+#endif
+ syncStats ();
+ PQfinish (dbh);
+ dbh = NULL;
+}
+
+/**
+ * Delete the database. The next operation is
+ * guaranteed to be unloading of the module.
+ */
+static void
+drop ()
+{
+ pq_exec ("DROP TABLE gn080");
+ postgres_shutdown ();
+}
+
+
+GNUNET_SQstore_ServiceAPI *
+provide_module_sqstore_postgres (GNUNET_CoreAPIForPlugins * capi)
+{
+ static GNUNET_SQstore_ServiceAPI api;
+
+ ectx = capi->ectx;
+#if DEBUG_POSTGRES
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Postgres: initializing database\n");
+#endif
+
+ payload = 0;
+ lastSync = 0;
+ dbh = init_connection ();
+ if (dbh == NULL)
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ return NULL;
+ }
+ payload = getStat ("PAYLOAD");
+ if (payload == GNUNET_SYSERR)
+ {
+ GNUNET_GE_BREAK (ectx, 0);
+ LOG_POSTGRES (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
+ GNUNET_GE_BULK, "postgres_payload");
+ GNUNET_mutex_destroy (lock);
+ return NULL;
+ }
+ lock = GNUNET_mutex_create (GNUNET_NO);
+ coreAPI = capi;
+ stats = coreAPI->service_request ("stats");
+ if (stats)
+ {
+ stat_size = stats->create (gettext_noop ("# bytes in datastore"));
+#if DEBUG_POSTGRES
+ stat_mem = stats->create (gettext_noop ("# bytes allocated by
Postgres"));
+#endif
+ }
+
+ api.getSize = &getSize;
+ api.put = &put;
+ api.get = &get;
+ api.iterateLowPriority = &iterateLowPriority;
+ api.iterateNonAnonymous = &iterateNonAnonymous;
+ api.iterateExpirationTime = &iterateExpirationTime;
+ api.iterateMigrationOrder = &iterateMigrationOrder;
+ api.iterateAllNow = &iterateAllNow;
+ api.drop = &drop;
+ api.update = &update;
+ return &api;
+}
+
+/**
+ * Shutdown the module.
+ */
+void
+release_module_sqstore_postgres ()
+{
+ if (stats != NULL)
+ coreAPI->service_release (stats);
+ postgres_shutdown ();
+#if DEBUG_POSTGRES
+ GNUNET_GE_LOG (ectx,
+ GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
+ "Postgres: database shutdown\n");
+#endif
+ GNUNET_mutex_destroy (lock);
+ lock = NULL;
+ coreAPI = NULL;
+}
+
+
+
+/**
+ * Update postgres database module.
+ *
+ * Currently only makes sure that the postgres indices are created.
+ */
+void
+update_module_sqstore_postgres (GNUNET_UpdateAPI * uapi)
+{
+ payload = 0;
+ lastSync = 0;
+ lock = GNUNET_mutex_create (GNUNET_NO);
+ dbh = init_connection ();
+ if (dbh == NULL)
+ {
+ GNUNET_mutex_destroy (lock);
+ GNUNET_free (fn);
+ fn = NULL;
+ return;
+ }
+ create_indices ();
+ postgres_shutdown ();
+ GNUNET_mutex_destroy (lock);
+}
+
+/* end of postgres.c */
Added: GNUnet/src/applications/sqstore_postgres/postgres_test.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres_test.c
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/postgres_test.c 2008-12-27
09:40:20 UTC (rev 8035)
@@ -0,0 +1,293 @@
+/*
+ This file is part of GNUnet.
+ (C) 2004, 2005, 2006, 2007 Christian Grothoff (and other contributing
authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/*
+ * @file applications/sqstore_postgres/postgrestest.c
+ * @brief Test for the sqstore implementations.
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_protocols.h"
+#include "gnunet_sqstore_service.h"
+#include "core.h"
+
+#define ASSERT(x) do { if (! (x)) { printf("Error at %s:%d\n", __FILE__,
__LINE__); goto FAILURE;} } while (0)
+
+static GNUNET_CronTime now;
+
+static GNUNET_DatastoreValue *
+initValue (int i)
+{
+ GNUNET_DatastoreValue *value;
+
+ value = GNUNET_malloc (sizeof (GNUNET_DatastoreValue) + 8 * i);
+ value->size = htonl (sizeof (GNUNET_DatastoreValue) + 8 * i);
+ value->type = htonl (i);
+ value->priority = htonl (i + 1);
+ value->anonymity_level = htonl (i);
+ value->expiration_time = GNUNET_htonll (now - i * GNUNET_CRON_SECONDS);
+ memset (&value[1], i, 8 * i);
+ return value;
+}
+
+static int
+checkValue (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val, void *closure,
+ unsigned long long uid)
+{
+ int i;
+ int ret;
+ GNUNET_DatastoreValue *value;
+
+ i = *(int *) closure;
+ value = initValue (i);
+ if ((value->size == val->size) &&
+ (0 == memcmp (val, value, ntohl (val->size))))
+ ret = GNUNET_OK;
+ else
+ {
+ /*
+ printf("Wanted: %u, %llu; got %u, %llu - %d\n",
+ ntohl(value->size), GNUNET_ntohll(value->expiration_time),
+ ntohl(val->size), GNUNET_ntohll(val->expiration_time),
+ memcmp(val, value, ntohl(val->size))); */
+ ret = GNUNET_SYSERR;
+ }
+ GNUNET_free (value);
+ return ret;
+}
+
+static int
+iterateUp (const GNUNET_HashCode * key, const GNUNET_DatastoreValue * val,
+ int *closure, unsigned long long uid)
+{
+ int ret;
+
+ ret = checkValue (key, val, closure, uid);
+ (*closure) += 2;
+ return ret;
+}
+
+static int
+iterateDown (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val, int *closure,
+ unsigned long long uid)
+{
+ int ret;
+
+ (*closure) -= 2;
+ ret = checkValue (key, val, closure, uid);
+ return ret;
+}
+
+static int
+iterateDelete (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val, void *closure,
+ unsigned long long uid)
+{
+ return GNUNET_NO;
+}
+
+static int
+iteratePriority (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val,
+ GNUNET_SQstore_ServiceAPI * api, unsigned long long uid)
+{
+ api->update (uid, 4, 0);
+ return GNUNET_OK;
+}
+
+static int
+priorityCheck (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val, int *closure,
+ unsigned long long uid)
+{
+ int id;
+
+ id = (*closure);
+ if (id + 1 == ntohl (val->priority))
+ return GNUNET_OK;
+ fprintf (stderr,
+ "Wrong priority, wanted %u got %u\n", id + 1,
+ ntohl (val->priority));
+ return GNUNET_SYSERR;
+}
+
+static int
+multipleCheck (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val,
+ GNUNET_DatastoreValue ** last, unsigned long long uid)
+{
+ if (*last != NULL)
+ {
+ if (((*last)->size == val->size) &&
+ (0 == memcmp (*last, val, ntohl (val->size))))
+ return GNUNET_SYSERR; /* duplicate! */
+ GNUNET_free (*last);
+ }
+ *last = GNUNET_malloc (ntohl (val->size));
+ memcpy (*last, val, ntohl (val->size));
+ return GNUNET_OK;
+}
+
+
+/**
+ * Add testcode here!
+ */
+static int
+test (GNUNET_SQstore_ServiceAPI * api)
+{
+ GNUNET_DatastoreValue *value;
+ GNUNET_HashCode key;
+ unsigned long long oldSize;
+ int i;
+
+ now = 1000000;
+ oldSize = api->getSize ();
+ for (i = 0; i < 256; i++)
+ {
+ value = initValue (i);
+ memset (&key, 256 - i, sizeof (GNUNET_HashCode));
+ ASSERT (GNUNET_OK == api->put (&key, value));
+ GNUNET_free (value);
+ }
+ ASSERT (oldSize < api->getSize ());
+ for (i = 255; i >= 0; i--)
+ {
+ memset (&key, 256 - i, sizeof (GNUNET_HashCode));
+ ASSERT (1 == api->get (&key, NULL, i, &checkValue, (void *) &i));
+ }
+ ASSERT (256 ==
+ api->iterateLowPriority (GNUNET_ECRS_BLOCKTYPE_ANY, NULL, NULL));
+ ASSERT (256 ==
+ api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY, NULL, NULL));
+ for (i = 255; i >= 0; i--)
+ {
+ memset (&key, 256 - i, sizeof (GNUNET_HashCode));
+ ASSERT (1 == api->get (&key, NULL, i, &checkValue, (void *) &i));
+ }
+
+ oldSize = api->getSize ();
+ for (i = 255; i >= 0; i -= 2)
+ {
+ memset (&key, 256 - i, sizeof (GNUNET_HashCode));
+ value = initValue (i);
+ if (1 != api->get (&key, NULL, 0, &iterateDelete, NULL))
+ {
+ GNUNET_free (value);
+ ASSERT (0);
+ }
+ GNUNET_free (value);
+ }
+ ASSERT (oldSize > api->getSize ());
+ i = 0;
+ ASSERT (128 == api->iterateLowPriority (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ iterateUp, &i));
+ ASSERT (256 == i);
+ ASSERT (128 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ iterateDown, &i));
+ ASSERT (0 == i);
+ ASSERT (128 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ iterateDelete, api));
+ i = 0;
+ ASSERT (0 ==
+ api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ iterateDown, &i));
+ i = 42;
+ value = initValue (i);
+ memset (&key, 256 - i, sizeof (GNUNET_HashCode));
+ api->put (&key, value);
+ ASSERT (1 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ priorityCheck, &i));
+ ASSERT (1 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ priorityCheck, &i));
+ ASSERT (1 ==
+ api->iterateAllNow ((GNUNET_DatastoreValueIterator) &
+ iteratePriority, api));
+ i += 4;
+ ASSERT (1 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ priorityCheck, &i));
+ GNUNET_free (value);
+
+ /* test multiple results */
+ value = initValue (i + 1);
+ api->put (&key, value);
+ GNUNET_free (value);
+
+ value = NULL;
+ ASSERT (2 == api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY,
+ (GNUNET_DatastoreValueIterator) &
+ multipleCheck, &value));
+ GNUNET_free (value);
+ ASSERT (2 ==
+ api->iterateAllNow ((GNUNET_DatastoreValueIterator) & iterateDelete,
+ api));
+ ASSERT (0 ==
+ api->iterateExpirationTime (GNUNET_ECRS_BLOCKTYPE_ANY, NULL, NULL));
+ api->drop ();
+
+ return GNUNET_OK;
+
+FAILURE:
+ api->drop ();
+ return GNUNET_SYSERR;
+}
+
+int
+main (int argc, char *argv[])
+{
+ GNUNET_SQstore_ServiceAPI *api;
+ int ok;
+ struct GNUNET_GC_Configuration *cfg;
+ struct GNUNET_CronManager *cron;
+
+ cfg = GNUNET_GC_create ();
+ if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+ {
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+ cron = GNUNET_cron_create (NULL);
+ GNUNET_CORE_init (NULL, cfg, cron, NULL);
+ api = GNUNET_CORE_request_service ("sqstore");
+ if (api != NULL)
+ {
+ ok = test (api);
+ GNUNET_CORE_release_service (api);
+ }
+ else
+ ok = GNUNET_SYSERR;
+ GNUNET_CORE_done ();
+ GNUNET_cron_destroy (cron);
+ GNUNET_GC_free (cfg);
+ if (ok == GNUNET_SYSERR)
+ return 1;
+ return 0;
+}
+
+/* end of postgrestest.c */
Added: GNUnet/src/applications/sqstore_postgres/postgres_test2.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres_test2.c
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/postgres_test2.c 2008-12-27
09:40:20 UTC (rev 8035)
@@ -0,0 +1,284 @@
+/*
+ This file is part of GNUnet.
+ (C) 2004, 2005, 2006, 2007 Christian Grothoff (and other contributing
authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/*
+ * @file applications/sqstore_postgres/postgrestest2.c
+ * @brief Test for the sqstore implementations.
+ * @author Christian Grothoff
+ *
+ * This testcase inserts a bunch of (variable size) data and then deletes
+ * data until the (reported) database size drops below a given threshold.
+ * This is iterated 10 times, with the actual size of the content stored,
+ * the database size reported and the file size on disk being printed for
+ * each iteration. The code also prints a "I" for every 40 blocks
+ * inserted and a "D" for every 40 blocks deleted. The deletion
+ * strategy alternates between "lowest priority" and "earliest expiration".
+ * Priorities and expiration dates are set using a pseudo-random value
+ * within a realistic range.
+ * <p>
+ *
+ * Note that the disk overhead calculations are not very sane for
+ * MySQL: we take the entire /var/lib/mysql directory (best we can
+ * do for ISAM), which may contain other data and which never
+ * shrinks. The scanning of the entire mysql directory during
+ * each report is also likely to be the cause of a minor
+ * slowdown compared to postgres.<p>
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_protocols.h"
+#include "gnunet_sqstore_service.h"
+#include "core.h"
+
+#define ASSERT(x) do { if (! (x)) { printf("Error at %s:%d\n", __FILE__,
__LINE__); goto FAILURE;} } while (0)
+
+/**
+ * Target datastore size (in bytes).
+ * <p>
+ * Example impact of total size on the reported number
+ * of operations (insert and delete) per second (once
+ * roughly stabilized -- this is not "sound" experimental
+ * data but just a rough idea) for a particular machine:
+ * <pre>
+ * 4: 60 at 7k ops total
+ * 8: 50 at 3k ops total
+ * 16: 48 at 8k ops total
+ * 32: 46 at 8k ops total
+ * 64: 61 at 9k ops total
+ * 128: 89 at 9k ops total
+ * 4092: 11 at 383k ops total (12 GB stored, 14.8 GB DB size on disk, 2.5 GB
reported)
+ * </pre>
+ * Pure insertion performance into an empty DB initially peaks
+ * at about 400 ops. The performance seems to drop especially
+ * once the existing (fragmented) ISAM space is filled up and
+ * the DB needs to grow on disk. This could be explained with
+ * ISAM looking more carefully for defragmentation opportunities.
+ * <p>
+ * MySQL disk space overheads (for otherwise unused database when
+ * run with 128 MB target data size; actual size 651 MB, useful
+ * data stored 520 MB) are quite large in the range of 25-30%.
+ * <p>
+ * This kind of processing seems to be IO bound (system is roughly
+ * at 90% wait, 10% CPU). This is with MySQL 5.0.
+ *
+ */
+#define MAX_SIZE 1024LL * 1024 * 16
+
+/**
+ * Report progress outside of major reports? Should probably be GNUNET_YES if
+ * size is > 16 MB.
+ */
+#define REPORT_ID GNUNET_NO
+
+/**
+ * Number of put operations equivalent to 1/10th of MAX_SIZE
+ */
+#define PUT_10 MAX_SIZE / 32 / 1024 / 10
+
+/**
+ * Progress report frequency. 1/10th of a put operation block.
+ */
+#define REP_FREQ PUT_10 / 10
+
+/**
+ * Total number of iterations (each iteration doing
+ * PUT_10 put operations); we report full status every
+ * 10 iterations. Abort with CTRL-C.
+ */
+#define ITERATIONS 100
+
+/**
+ * Name of the database on disk.
+ * You may have to adjust this path and the access
+ * permission to the respective directory in order
+ * to obtain all of the performance information.
+ */
+#define DB_NAME "/tmp/gnunet-postgres-sqstore-test/data/fs/"
+
+static unsigned long long stored_bytes;
+
+static unsigned long long stored_entries;
+
+static unsigned long long stored_ops;
+
+static GNUNET_CronTime start_time;
+
+static int
+putValue (GNUNET_SQstore_ServiceAPI * api, int i, int k)
+{
+ GNUNET_DatastoreValue *value;
+ size_t size;
+ static GNUNET_HashCode key;
+ static int ic;
+
+ /* most content is 32k */
+ size = sizeof (GNUNET_DatastoreValue) + 32 * 1024;
+ if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 16) == 0) /* but some of
it is less! */
+ size =
+ sizeof (GNUNET_DatastoreValue) +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 32 * 1024);
+ size = size - (size & 7); /* always multiple of 8 */
+
+ /* generate random key */
+ GNUNET_hash (&key, sizeof (GNUNET_HashCode), &key);
+ value = GNUNET_malloc (size);
+ value->size = htonl (size);
+ value->type = htonl (i);
+ value->priority =
+ htonl (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 100));
+ value->anonymity_level = htonl (i);
+ value->expiration_time =
+ GNUNET_htonll (GNUNET_get_time () +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 1000));
+ memset (&value[1], i, size - sizeof (GNUNET_DatastoreValue));
+ if (i > 255)
+ memset (&value[1], i - 255, (size - sizeof (GNUNET_DatastoreValue)) / 2);
+ ((char *) &value[1])[0] = k;
+ if (GNUNET_OK != api->put (&key, value))
+ {
+ GNUNET_free (value);
+ fprintf (stderr, "E");
+ return GNUNET_SYSERR;
+ }
+ ic++;
+#if REPORT_ID
+ if (ic % REP_FREQ == 0)
+ fprintf (stderr, "I");
+#endif
+ stored_bytes += ntohl (value->size);
+ stored_ops++;
+ stored_entries++;
+ GNUNET_free (value);
+ return GNUNET_OK;
+}
+
+static int
+iterateDelete (const GNUNET_HashCode * key,
+ const GNUNET_DatastoreValue * val, void *cls,
+ unsigned long long uid)
+{
+ GNUNET_SQstore_ServiceAPI *api = cls;
+ static int dc;
+
+ if (api->getSize () < MAX_SIZE)
+ return GNUNET_SYSERR;
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ return GNUNET_SYSERR;
+ dc++;
+#if REPORT_ID
+ if (dc % REP_FREQ == 0)
+ fprintf (stderr, "D");
+#endif
+ stored_bytes -= ntohl (val->size);
+ stored_entries--;
+ return GNUNET_NO;
+}
+
+/**
+ * Add testcode here!
+ */
+static int
+test (GNUNET_SQstore_ServiceAPI * api)
+{
+ int i;
+ int j;
+ unsigned long long size;
+ int have_file;
+ struct stat sbuf;
+
+ have_file = 0 == stat (DB_NAME, &sbuf);
+
+ for (i = 0; i < ITERATIONS; i++)
+ {
+#if REPORT_ID
+ fprintf (stderr, ".");
+#endif
+ /* insert data equivalent to 1/10th of MAX_SIZE */
+ for (j = 0; j < PUT_10; j++)
+ {
+ ASSERT (GNUNET_OK == putValue (api, j, i));
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ }
+
+ /* trim down below MAX_SIZE again */
+ if ((i % 2) == 0)
+ api->iterateLowPriority (0, &iterateDelete, api);
+ else
+ api->iterateExpirationTime (0, &iterateDelete, api);
+
+ size = 0;
+ if (have_file)
+ GNUNET_disk_file_size (NULL, DB_NAME, &size, GNUNET_NO);
+ printf (
+#if REPORT_ID
+ "\n"
+#endif
+ "Useful %llu, API %llu, disk %llu (%.2f%%) / %lluk ops / %llu
ops/s\n", stored_bytes / 1024, /* used size in k */
+ api->getSize () / 1024, /* API-reported size in k */
+ size / 1024, /* disk size in kb */
+ (100.0 * size / stored_bytes) - 100, /* overhead */
+ (stored_ops * 2 - stored_entries) / 1024, /* total
operations (in k) */
+ 1000 * (stored_ops * 2 - stored_entries) / (1 + GNUNET_get_time
() - start_time)); /* operations per second */
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ }
+ api->drop ();
+ return GNUNET_OK;
+
+FAILURE:
+ api->drop ();
+ return GNUNET_SYSERR;
+}
+
+int
+main (int argc, char *argv[])
+{
+ GNUNET_SQstore_ServiceAPI *api;
+ int ok;
+ struct GNUNET_GC_Configuration *cfg;
+ struct GNUNET_CronManager *cron;
+
+ cfg = GNUNET_GC_create ();
+ if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+ {
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+ cron = GNUNET_cron_create (NULL);
+ GNUNET_CORE_init (NULL, cfg, cron, NULL);
+ api = GNUNET_CORE_request_service ("sqstore");
+ if (api != NULL)
+ {
+ start_time = GNUNET_get_time ();
+ ok = test (api);
+ GNUNET_CORE_release_service (api);
+ }
+ else
+ ok = GNUNET_SYSERR;
+ GNUNET_CORE_done ();
+ GNUNET_cron_destroy (cron);
+ GNUNET_GC_free (cfg);
+ if (ok == GNUNET_SYSERR)
+ return 1;
+ return 0;
+}
+
+/* end of mysqltest2.c */
Added: GNUnet/src/applications/sqstore_postgres/postgres_test3.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres_test3.c
(rev 0)
+++ GNUnet/src/applications/sqstore_postgres/postgres_test3.c 2008-12-27
09:40:20 UTC (rev 8035)
@@ -0,0 +1,208 @@
+/*
+ This file is part of GNUnet.
+ (C) 2004, 2005, 2006, 2007 Christian Grothoff (and other contributing
authors)
+
+ GNUnet is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published
+ by the Free Software Foundation; either version 2, or (at your
+ option) any later version.
+
+ GNUnet is distributed in the hope that it will be useful, but
+ WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with GNUnet; see the file COPYING. If not, write to the
+ Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ Boston, MA 02111-1307, USA.
+*/
+/*
+ * @file applications/sqstore_postgres/postgrestest3.c
+ * @brief Profile sqstore iterators.
+ * @author Christian Grothoff
+ */
+
+#include "platform.h"
+#include "gnunet_util.h"
+#include "gnunet_protocols.h"
+#include "gnunet_sqstore_service.h"
+#include "core.h"
+
+/**
+ * Target datastore size (in bytes). Realistic sizes are
+ * more like 16 GB (not the default of 16 MB); however,
+ * those take too long to run them in the usual "make check"
+ * sequence. Hence the value used for shipping is tiny.
+ */
+#define MAX_SIZE 1024LL * 1024 * 128
+
+#define ITERATIONS 10
+
+/**
+ * Number of put operations equivalent to 1/10th of MAX_SIZE
+ */
+#define PUT_10 (MAX_SIZE / 32 / 1024 / ITERATIONS)
+
+static unsigned long long stored_bytes;
+
+static unsigned long long stored_entries;
+
+static unsigned long long stored_ops;
+
+static GNUNET_CronTime start_time;
+
+static int
+putValue (GNUNET_SQstore_ServiceAPI * api, int i, int k)
+{
+ GNUNET_DatastoreValue *value;
+ size_t size;
+ static GNUNET_HashCode key;
+ static int ic;
+
+ /* most content is 32k */
+ size = sizeof (GNUNET_DatastoreValue) + 32 * 1024;
+
+ if (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 16) == 0) /* but some of
it is less! */
+ size =
+ sizeof (GNUNET_DatastoreValue) +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 32 * 1024);
+ size = size - (size & 7); /* always multiple of 8 */
+
+ /* generate random key */
+ key.bits[0] = (unsigned int) GNUNET_get_time ();
+ GNUNET_hash (&key, sizeof (GNUNET_HashCode), &key);
+ value = GNUNET_malloc (size);
+ value->size = htonl (size);
+ value->type = htonl (i);
+ value->priority =
+ htonl (GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 100));
+ value->anonymity_level = htonl (i);
+ value->expiration_time =
+ GNUNET_htonll (GNUNET_get_time () + 60 * GNUNET_CRON_HOURS +
+ GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, 1000));
+ memset (&value[1], i, size - sizeof (GNUNET_DatastoreValue));
+ if (i > 255)
+ memset (&value[1], i - 255, (size - sizeof (GNUNET_DatastoreValue)) / 2);
+ ((char *) &value[1])[0] = k;
+ if (GNUNET_OK != api->put (&key, value))
+ {
+ GNUNET_free (value);
+ fprintf (stderr, "E");
+ return GNUNET_SYSERR;
+ }
+ ic++;
+ stored_bytes += ntohl (value->size);
+ stored_ops++;
+ stored_entries++;
+ GNUNET_free (value);
+ return GNUNET_OK;
+}
+
+static int
+iterateDummy (const GNUNET_HashCode * key, const GNUNET_DatastoreValue * val,
+ void *cls, unsigned long long uid)
+{
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ return GNUNET_SYSERR;
+ return GNUNET_OK;
+}
+
+static int
+test (GNUNET_SQstore_ServiceAPI * api)
+{
+ int i;
+ int j;
+ int ret;
+ GNUNET_CronTime start;
+ GNUNET_CronTime end;
+
+ for (i = 0; i < ITERATIONS; i++)
+ {
+ /* insert data equivalent to 1/10th of MAX_SIZE */
+ start = GNUNET_get_time ();
+ for (j = 0; j < PUT_10; j++)
+ {
+ if (GNUNET_OK != putValue (api, j, i))
+ break;
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ }
+ end = GNUNET_get_time ();
+ printf ("%3u insertion took %20llums\n", i, end - start);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ start = GNUNET_get_time ();
+ ret = api->iterateLowPriority (0, &iterateDummy, api);
+ end = GNUNET_get_time ();
+ printf ("%3u low priority iteration took %20llums (%d)\n", i,
+ end - start, ret);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ start = GNUNET_get_time ();
+ ret = api->iterateExpirationTime (0, &iterateDummy, api);
+ end = GNUNET_get_time ();
+ printf ("%3u expiration t iteration took %20llums (%d)\n", i,
+ end - start, ret);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ start = GNUNET_get_time ();
+ ret = api->iterateNonAnonymous (0, &iterateDummy, api);
+ end = GNUNET_get_time ();
+ printf ("%3u non anonymou iteration took %20llums (%d)\n", i,
+ end - start, ret);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ start = GNUNET_get_time ();
+ ret = api->iterateMigrationOrder (&iterateDummy, api);
+ end = GNUNET_get_time ();
+ printf ("%3u migration or iteration took %20llums (%d)\n", i,
+ end - start, ret);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ start = GNUNET_get_time ();
+ ret = api->iterateAllNow (&iterateDummy, api);
+ end = GNUNET_get_time ();
+ printf ("%3u all now iteration took %20llums (%d)\n", i,
+ end - start, ret);
+ if (GNUNET_shutdown_test () == GNUNET_YES)
+ break;
+ }
+ api->drop ();
+ return GNUNET_OK;
+}
+
+int
+main (int argc, char *argv[])
+{
+ GNUNET_SQstore_ServiceAPI *api;
+ int ok;
+ struct GNUNET_GC_Configuration *cfg;
+ struct GNUNET_CronManager *cron;
+
+ cfg = GNUNET_GC_create ();
+ if (-1 == GNUNET_GC_parse_configuration (cfg, "check.conf"))
+ {
+ GNUNET_GC_free (cfg);
+ return -1;
+ }
+ cron = GNUNET_cron_create (NULL);
+ GNUNET_CORE_init (NULL, cfg, cron, NULL);
+ api = GNUNET_CORE_request_service ("sqstore");
+ if (api != NULL)
+ {
+ start_time = GNUNET_get_time ();
+ ok = test (api);
+ GNUNET_CORE_release_service (api);
+ }
+ else
+ ok = GNUNET_SYSERR;
+ GNUNET_CORE_done ();
+ GNUNET_cron_destroy (cron);
+ GNUNET_GC_free (cfg);
+ if (ok == GNUNET_SYSERR)
+ return 1;
+ return 0;
+}
+
+/* end of postgrestest3.c */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r8035 - in GNUnet: . src/applications src/applications/sqstore_postgres,
gnunet <=