[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r8082 - GNUnet/src/applications/sqstore_postgres
From: |
gnunet |
Subject: |
[GNUnet-SVN] r8082 - GNUnet/src/applications/sqstore_postgres |
Date: |
Sat, 3 Jan 2009 01:16:29 -0700 (MST) |
Author: grothoff
Date: 2009-01-03 01:16:29 -0700 (Sat, 03 Jan 2009)
New Revision: 8082
Modified:
GNUnet/src/applications/sqstore_postgres/postgres.c
GNUnet/src/applications/sqstore_postgres/postgres_test2.c
Log:
fixes
Modified: GNUnet/src/applications/sqstore_postgres/postgres.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres.c 2009-01-03 05:15:50 UTC
(rev 8081)
+++ GNUnet/src/applications/sqstore_postgres/postgres.c 2009-01-03 08:16:29 UTC
(rev 8082)
@@ -36,21 +36,6 @@
#define DEBUG_POSTGRES GNUNET_NO
-/**
- * Die with an error message that indicates
- * a failure of the command 'cmd' with the message given
- * by strerror(errno).
- */
-#define DIE_POSTGRES(cmd) do { GNUNET_GE_LOG(coreAPI->ectx, GNUNET_GE_FATAL |
GNUNET_GE_IMMEDIATE | GNUNET_GE_ADMIN, _("`%s' failed at %s:%d with error:
%s"), cmd, __FILE__, __LINE__, PQerrorMessage(dbh)); abort(); } while(0)
-
-/**
- * Log an error message at log-level 'level' that indicates
- * a failure of the command 'cmd' on file 'filename'
- * with the message given by strerror(errno).
- */
-#define LOG_POSTGRES(level, cmd) do { GNUNET_GE_LOG(coreAPI->ectx, level,
_("`%s' failed at %s:%d with error: %s"), cmd, __FILE__, __LINE__,
PQerrorMessage(dbh)); } while(0)
-
-
#define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire,
hash, value, oid FROM gn080 "\
"WHERE (prio = $1 AND oid > $2) "
\
"ORDER BY prio ASC,oid ASC LIMIT 1) "\
@@ -129,17 +114,27 @@
static int
check_result (PGresult * ret,
int expected_status,
- const char * command)
+ const char * command,
+ const char * args,
+ int line)
{
if (ret == NULL)
{
+ GNUNET_GE_LOG(coreAPI->ectx,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
+ "Postgres failed to allocate result for `%s:%s' at %d\n",
+ command,
+ args,
+ line);
/* FIXME: report error! */
return GNUNET_SYSERR;
}
if (PQresultStatus (ret) != expected_status)
{
- LOG_POSTGRES (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
- command);
+ GNUNET_GE_LOG(coreAPI->ectx,
+ GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_BULK,
+ _("`%s:%s' failed at %s:%d with error: %s"),
+ command, args, __FILE__, line, PQerrorMessage(dbh));
PQclear(ret);
return GNUNET_SYSERR;
}
@@ -150,15 +145,12 @@
* Run simple SQL statement (without results).
*/
static int
-pq_exec (const char * sql)
+pq_exec (const char * sql, int line)
{
PGresult * ret;
ret = PQexec (dbh, sql);
- if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexec"))
- {
- GNUNET_mutex_unlock (lock);
- return GNUNET_SYSERR;
- }
+ if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexec", sql, line))
+ return GNUNET_SYSERR;
PQclear(ret);
return GNUNET_OK;
}
@@ -169,15 +161,13 @@
static int
pq_prepare (const char * name,
const char * sql,
- int nparms)
+ int nparms,
+ int line)
{
PGresult * ret;
ret = PQprepare (dbh, name, sql, nparms, NULL);
- if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQprepare"))
- {
- GNUNET_mutex_unlock (lock);
- return GNUNET_SYSERR;
- }
+ if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQprepare", sql,
line))
+ return GNUNET_SYSERR;
PQclear(ret);
return GNUNET_OK;
}
@@ -214,7 +204,7 @@
dbh = NULL;
return GNUNET_SYSERR;
}
- pq_exec ("DROP TABLE gn080");
+ pq_exec ("DROP TABLE gn080", __LINE__);
/* FIXME: this could fail if the table already
exists -- add check! */
if ( (GNUNET_OK !=
@@ -227,21 +217,21 @@
" hash BYTEA NOT NULL DEFAULT '',"
" vhash BYTEA NOT NULL DEFAULT '',"
" value BYTEA NOT NULL DEFAULT '')"
- "WITH OIDS")) ||
+ "WITH OIDS", __LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_hash ON gn080 (hash)")) ||
+ pq_exec ("CREATE INDEX idx_hash ON gn080 (hash)", __LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)")) ||
+ pq_exec ("CREATE INDEX idx_hash_vhash ON gn080 (hash,vhash)",
__LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_prio ON gn080 (prio)")) ||
+ pq_exec ("CREATE INDEX idx_prio ON gn080 (prio)", __LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_expire ON gn080 (expire)")) ||
+ pq_exec ("CREATE INDEX idx_expire ON gn080 (expire)", __LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)")) ||
+ pq_exec ("CREATE INDEX idx_comb3 ON gn080 (prio,anonLevel)", __LINE__))
||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)")) ||
+ pq_exec ("CREATE INDEX idx_comb4 ON gn080 (prio,hash,anonLevel)",
__LINE__)) ||
(GNUNET_OK !=
- pq_exec ("CREATE INDEX idx_comb7 ON gn080 (expire,hash)")) )
+ pq_exec ("CREATE INDEX idx_comb7 ON gn080 (expire,hash)", __LINE__)) )
{
PQfinish (dbh);
dbh = NULL;
@@ -253,56 +243,67 @@
"SELECT size, type, prio, anonLevel, expire, hash, value,
oid FROM gn080 "
"WHERE hash=$1 AND vhash=$2 AND type=$3 "
"AND oid >= $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
- 5)) ||
+ 5,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("gett",
"SELECT size, type, prio, anonLevel, expire, hash, value,
oid FROM gn080 "
"WHERE hash=$1 AND type=$2"
"AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
- 4)) ||
+ 4,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("getv",
"SELECT size, type, prio, anonLevel, expire, hash, value,
oid FROM gn080 "
"WHERE hash=$1 AND vhash=$2"
"AND oid >= $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
- 4)) ||
+ 4,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("get",
"SELECT size, type, prio, anonLevel, expire, hash, value,
oid FROM gn080 "
"WHERE hash=$1"
"AND oid >= $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
- 3)) ||
+ 3,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("put",
"INSERT INTO gn080 (size, type, prio, anonLevel, expire,
hash, vhash, value) "
"VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
- 8)) ||
+ 8,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("update",
"UPDATE gn080 SET prio = prio + $1, expire = CASE WHEN
expire < $2 THEN $2 ELSE expire END "
"WHERE oid = $3",
- 3)) ||
+ 3,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("select_low_priority",
SELECT_IT_LOW_PRIORITY,
- 2)) ||
+ 2,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("select_non_anonymous",
SELECT_IT_NON_ANONYMOUS,
- 2)) ||
+ 2,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("select_expiration_time",
SELECT_IT_EXPIRATION_TIME,
- 2)) ||
+ 2,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("select_migration_order",
SELECT_IT_MIGRATION_ORDER,
- 3)) ||
+ 3,
+ __LINE__)) ||
(GNUNET_OK !=
pq_prepare("delrow",
"DELETE FROM gn080 "
"WHERE oid=$1",
- 1)) )
+ 1,
+ __LINE__)) )
{
PQfinish (dbh);
dbh = NULL;
@@ -386,7 +387,7 @@
paramLengths,
paramFormats,
1);
- if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared"))
+ if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared",
"delrow", __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
@@ -406,7 +407,9 @@
{
GNUNET_DatastoreValue *value;
unsigned int size;
-
+
+ if (0 == PQntuples (res))
+ return NULL; /* no result */
if ( (1 != PQntuples (res)) ||
(8 != PQnfields (res)) ||
(sizeof(unsigned int) != PQfsize (res, 0)) ||
@@ -416,13 +419,13 @@
return NULL;
}
*rowid = * (unsigned int*) PQgetvalue (res, 0, 7);
- size = * (unsigned int*) PQgetvalue (res, 0, 0);
+ size = ntohl(* (unsigned int*) PQgetvalue (res, 0, 0));
if ( (size < sizeof (GNUNET_DatastoreValue)) ||
(sizeof(unsigned int) != PQfsize (res, 1)) ||
(sizeof(unsigned int) != PQfsize (res, 2)) ||
(sizeof(unsigned int) != PQfsize (res, 3)) ||
(sizeof(unsigned long long) != PQfsize (res, 4)) ||
- (sizeof(GNUNET_HashCode) != PQfsize (res, 5)) ||
+ (sizeof(GNUNET_HashCode) != PQgetlength (res, 0, 5)) ||
(size - sizeof (GNUNET_DatastoreValue) != PQgetlength (res, 0, 6) ) )
{
GNUNET_GE_BREAK (NULL, 0);
@@ -431,10 +434,10 @@
}
value = GNUNET_malloc (size);
value->size = htonl (size);
- value->type = htonl ( * (unsigned int*) PQgetvalue (res, 0, 1));
- value->priority = htonl ( * (unsigned int*) PQgetvalue (res, 0, 2));
- value->anonymity_level = htonl ( * (unsigned int*) PQgetvalue (res, 0, 3));
- value->expiration_time = GNUNET_htonll ( * (unsigned long long*) PQgetvalue
(res, 0, 4));
+ value->type = * (unsigned int*) PQgetvalue (res, 0, 1);
+ value->priority = * (unsigned int*) PQgetvalue (res, 0, 2);
+ value->anonymity_level = * (unsigned int*) PQgetvalue (res, 0, 3);
+ value->expiration_time = * (unsigned long long*) PQgetvalue (res, 0, 4);
memcpy (key, PQgetvalue (res, 0, 5), sizeof (GNUNET_HashCode));
memcpy (&value[1], PQgetvalue (res, 0, 6), size -
sizeof(GNUNET_DatastoreValue));
return value;
@@ -526,7 +529,7 @@
GNUNET_GE_BREAK (NULL, 0);
return GNUNET_SYSERR;
}
- now = GNUNET_get_time ();
+ now = GNUNET_htonll(GNUNET_get_time ());
count = 0;
GNUNET_mutex_lock (lock);
while (1)
@@ -538,16 +541,20 @@
paramLengths,
paramFormats,
1);
- if (GNUNET_OK != check_result (ret, PGRES_TUPLES_OK, "PQexecPrepared"))
+ if (GNUNET_OK != check_result (ret,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared",
+ pname,
+ __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
datum = assembleDatum (ret, &key, &last_oid);
if (datum == NULL)
- continue;
- last_prio = ntohl(datum->priority);
- last_expire = GNUNET_ntohll(datum->expiration_time);
+ break; /* iteration complete */
+ last_prio = datum->priority;
+ last_expire = datum->expiration_time;
count++;
if (dviter != NULL)
{
@@ -560,7 +567,11 @@
break;
}
if (iret == GNUNET_NO)
- delete_by_rowid(last_oid);
+ {
+ payload -= getContentDatastoreSize (datum);
+ lastSync++;
+ delete_by_rowid(last_oid);
+ }
}
GNUNET_free (datum);
}
@@ -666,7 +677,7 @@
const GNUNET_HashCode * vhash,
unsigned int type, GNUNET_DatastoreValueIterator iter, void *closure)
{
- unsigned int total;
+ unsigned long long total;
const char * paramValues[5];
int paramLengths[5];
const int paramFormats[] = {1,1,1,1,1};
@@ -674,10 +685,11 @@
unsigned int rowid;
int nparams;
int iret;
+ unsigned int n_type;
const char * pname;
- int count;
- int off;
- int limit_off;
+ long long count;
+ long long off;
+ long long limit_off;
PGresult * ret;
GNUNET_DatastoreValue *datum;
GNUNET_HashCode rkey;
@@ -689,11 +701,12 @@
paramLengths[0] = sizeof(GNUNET_HashCode);
if (type != 0)
{
+ n_type = htonl(type);
if (vhash != NULL)
{
paramValues[1] = (const char*) vhash;
paramLengths[1] = sizeof(GNUNET_HashCode);
- paramValues[2] = (const char*) &type;
+ paramValues[2] = (const char*) &n_type;
paramLengths[2] = sizeof(unsigned int);
paramValues[3] = (const char*) &last_rowid;
paramLengths[3] = sizeof(last_rowid);
@@ -712,7 +725,7 @@
}
else
{
- paramValues[1] = (const char*) &type;
+ paramValues[1] = (const char*) &n_type;
paramLengths[1] = sizeof(unsigned int);
paramValues[2] = (const char*) &last_rowid;
paramLengths[2] = sizeof(last_rowid);
@@ -769,31 +782,38 @@
1);
}
}
- if (GNUNET_OK != check_result (ret, PGRES_TUPLES_OK, "PQexecParams"))
+ if (GNUNET_OK != check_result (ret,
+ PGRES_TUPLES_OK,
+ "PQexecParams",
+ pname,
+ __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
- if ( (PQntuples (ret) != 1) ||
- (PQnfields (ret) != 1) ||
- (PQgetlength (ret, 0, 0) != sizeof(unsigned int) ) )
+ if ( (PQntuples (ret) != 1) ||
+ (PQnfields (ret) != 1) ||
+ (PQgetlength (ret, 0, 0) != sizeof(unsigned long long) ) )
{
GNUNET_GE_BREAK (NULL, 0);
PQclear(ret);
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
- total = *(const unsigned int*) PQgetvalue(ret, 0, 0);
+ total = GNUNET_ntohll(*(const unsigned long long*) PQgetvalue(ret, 0, 0));
PQclear(ret);
if ( (iter == NULL) || (total == 0) )
{
GNUNET_mutex_unlock (lock);
+ fprintf(stderr,
+ "Total is %llu\n",
+ total);
return total;
}
count = 0;
last_rowid = 0;
- off = GNUNET_random_u32 (GNUNET_RANDOM_QUALITY_WEAK, total);
+ off = GNUNET_random_u64 (GNUNET_RANDOM_QUALITY_WEAK, total);
while (1)
{
if (count == 0)
@@ -808,7 +828,11 @@
paramLengths,
paramFormats,
1);
- if (GNUNET_OK != check_result (ret, PGRES_TUPLES_OK, "PQexecPrepared"))
+ if (GNUNET_OK != check_result (ret,
+ PGRES_TUPLES_OK,
+ "PQexecPrepared",
+ pname,
+ __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
@@ -817,7 +841,12 @@
last_rowid = rowid + 1;
PQclear (ret);
if (datum == NULL)
- continue;
+ {
+ total--;
+ if (count == total)
+ break;
+ continue;
+ }
if ( (key != NULL) &&
(0 != memcmp (&rkey, key, sizeof (GNUNET_HashCode))))
{
@@ -837,6 +866,7 @@
if (iret == GNUNET_NO)
{
payload -= getContentDatastoreSize (datum);
+ lastSync++;
delete_by_rowid (rowid);
}
GNUNET_free (datum);
@@ -859,36 +889,32 @@
put (const GNUNET_HashCode * key, const GNUNET_DatastoreValue * value)
{
unsigned int size = ntohl(value->size);
- unsigned int type = ntohl(value->type);
- unsigned int prio = ntohl(value->priority);
- unsigned int anon = ntohl(value->anonymity_level);
- unsigned long long expir = GNUNET_ntohll(value->expiration_time);
GNUNET_HashCode vhash;
PGresult * ret;
const char * paramValues[] = {
- (const char*) &size,
- (const char*) &type,
- (const char*) &prio,
- (const char*) &anon,
- (const char*) &expir,
+ (const char*) &value->size,
+ (const char*) &value->type,
+ (const char*) &value->priority,
+ (const char*) &value->anonymity_level,
+ (const char*) &value->expiration_time,
(const char*) key,
(const char*) &vhash,
(const char*) &value[1]
};
int paramLengths[] =
{
- sizeof(size),
- sizeof(type),
- sizeof(prio),
- sizeof(anon),
- sizeof(expir),
+ sizeof(value->size),
+ sizeof(value->type),
+ sizeof(value->priority),
+ sizeof(value->anonymity_level),
+ sizeof(value->expiration_time),
sizeof(GNUNET_HashCode),
sizeof(GNUNET_HashCode),
size - sizeof (GNUNET_DatastoreValue)
};
const int paramFormats[] = {1,1,1,1,1,1,1,1};
- if ((ntohl (value->size) < sizeof (GNUNET_DatastoreValue)))
+ if (size < sizeof (GNUNET_DatastoreValue))
{
GNUNET_GE_BREAK (coreAPI->ectx, 0);
return GNUNET_SYSERR;
@@ -904,7 +930,11 @@
paramLengths,
paramFormats,
1);
- if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared"))
+ if (GNUNET_OK != check_result (ret,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared",
+ "put",
+ __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
@@ -923,17 +953,19 @@
static int
update (unsigned long long uid, int delta, GNUNET_CronTime expire)
{
+ int n_delta = htonl(delta);
+ GNUNET_CronTime n_expire = GNUNET_htonll(expire);
PGresult * ret;
const char * paramValues[] = {
+ (const char*) &n_delta,
+ (const char*) &n_expire,
(const char*) &uid,
- (const char*) &delta,
- (const char*) &expire,
};
int paramLengths[] =
{
+ sizeof(n_delta),
+ sizeof(n_expire),
sizeof(uid),
- sizeof(delta),
- sizeof(expire),
};
const int paramFormats[] = {1,1,1};
@@ -945,7 +977,11 @@
paramLengths,
paramFormats,
1);
- if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared"))
+ if (GNUNET_OK != check_result (ret,
+ PGRES_COMMAND_OK,
+ "PQexecPrepared",
+ "update",
+ __LINE__))
{
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
@@ -979,7 +1015,7 @@
static void
drop ()
{
- pq_exec ("DROP TABLE gn080");
+ pq_exec ("DROP TABLE gn080", __LINE__);
postgres_shutdown ();
}
@@ -998,21 +1034,20 @@
payload = 0;
lastSync = 0;
+ lock = GNUNET_mutex_create (GNUNET_NO);
if (GNUNET_OK != init_connection ())
{
GNUNET_GE_BREAK (coreAPI->ectx, 0);
+ GNUNET_mutex_destroy (lock);
return NULL;
}
payload = getStat ("PAYLOAD");
if (payload == GNUNET_SYSERR)
{
GNUNET_GE_BREAK (coreAPI->ectx, 0);
- LOG_POSTGRES (GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres_payload");
GNUNET_mutex_destroy (lock);
return NULL;
}
- lock = GNUNET_mutex_create (GNUNET_NO);
coreAPI = capi;
stats = coreAPI->service_request ("stats");
if (stats)
Modified: GNUnet/src/applications/sqstore_postgres/postgres_test2.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres_test2.c 2009-01-03
05:15:50 UTC (rev 8081)
+++ GNUnet/src/applications/sqstore_postgres/postgres_test2.c 2009-01-03
08:16:29 UTC (rev 8082)
@@ -231,7 +231,8 @@
#if REPORT_ID
"\n"
#endif
- "Useful %llu, API %llu, disk %llu (%.2f%%) / %lluk ops / %llu
ops/s\n", stored_bytes / 1024, /* used size in k */
+ "Useful %llu, API %llu, disk %llu (%.2f%%) / %lluk ops / %llu
ops/s\n",
+ stored_bytes / 1024, /* used size in k */
api->getSize () / 1024, /* API-reported size in k */
size / 1024, /* disk size in kb */
(100.0 * size / stored_bytes) - 100, /* overhead */
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r8082 - GNUnet/src/applications/sqstore_postgres,
gnunet <=