[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r8052 - GNUnet/src/applications/sqstore_postgres
From: |
gnunet |
Subject: |
[GNUnet-SVN] r8052 - GNUnet/src/applications/sqstore_postgres |
Date: |
Tue, 30 Dec 2008 01:44:07 -0700 (MST) |
Author: grothoff
Date: 2008-12-30 01:44:06 -0700 (Tue, 30 Dec 2008)
New Revision: 8052
Modified:
GNUnet/src/applications/sqstore_postgres/postgres.c
Log:
more pg hacking
Modified: GNUnet/src/applications/sqstore_postgres/postgres.c
===================================================================
--- GNUnet/src/applications/sqstore_postgres/postgres.c 2008-12-30 08:12:10 UTC
(rev 8051)
+++ GNUnet/src/applications/sqstore_postgres/postgres.c 2008-12-30 08:44:06 UTC
(rev 8052)
@@ -50,38 +50,46 @@
*/
#define LOG_POSTGRES(level, cmd) do { GNUNET_GE_LOG(ectx, level, _("`%s'
failed at %s:%d with error: %s\n"), cmd, __FILE__, __LINE__,
PQerrorMessage(dbh)); } while(0)
-#define SELECT_IT_LOW_PRIORITY_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio = $1 AND hash > $2) "\
- "ORDER BY hash ASC LIMIT 1"
-#define SELECT_IT_LOW_PRIORITY_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio > $1) "\
- "ORDER BY prio ASC, hash ASC LIMIT 1"
+#define SELECT_IT_LOW_PRIORITY "(SELECT size, type, prio, anonLevel, expire,
hash, value, oid FROM gn080 "\
+ "FORCE INDEX(prio) WHERE (prio = $1 AND oid >
$2) " \
+ "ORDER BY prio ASC,oid ASC LIMIT 1) "\
+ "UNION "\
+ "(SELECT size, type, prio, anonLevel, expire,
hash, value, oid FROM gn080 "\
+ "FORCE INDEX(prio) WHERE (prio > $1 AND oid !=
$2)"\
+ "ORDER BY prio ASC,oid ASC LIMIT 1)"\
+ "ORDER BY prio ASC,oid ASC LIMIT 1"
-#define SELECT_IT_NON_ANONYMOUS_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio = $1 AND hash < $2 AND anonLevel = 0) "\
- " ORDER BY hash DESC LIMIT 1"
+#define SELECT_IT_NON_ANONYMOUS "(SELECT size, type, prio, anonLevel, expire,
hash, value, oid FROM gn080 "\
+ "FORCE INDEX(prio) WHERE (prio = $1 AND oid <
$2)"\
+ " AND anonLevel=0 AND type != 0xFFFFFFFF ORDER
BY prio DESC,oid DESC LIMIT 1) "\
+ "UNION "\
+ "(SELECT size, type, prio, anonLevel, expire,
hash, value, oid FROM gn080 "\
+ "FORCE INDEX(prio) WHERE (prio < $1 AND oid !=
$2)"\
+ " AND anonLevel=0 AND type != 0xFFFFFFFF ORDER
BY prio DESC,oid DESC LIMIT 1) "\
+ "ORDER BY prio DESC,oid DESC LIMIT 1"
-#define SELECT_IT_NON_ANONYMOUS_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(prio < $1 AND anonLevel = 0)"\
- " ORDER BY prio DESC, hash DESC LIMIT 1"
+#define SELECT_IT_EXPIRATION_TIME "(SELECT size, type, prio, anonLevel,
expire, hash, value, oid FROM gn080 "\
+ "FORCE INDEX(expire) WHERE (expire = $1 AND
oid > $2) "\
+ "ORDER BY expire ASC,oid ASC LIMIT 1) "\
+ "UNION "\
+ "(SELECT size, type, prio, anonLevel,
expire, hash, value, oid FROM gn080 "\
+ "FORCE INDEX(expire) WHERE (expire > $1 AND
oid != $2) " \
+ "ORDER BY expire ASC,oid ASC LIMIT 1)"\
+ "ORDER BY expire ASC,oid ASC LIMIT 1"
-#define SELECT_IT_EXPIRATION_TIME_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire = $1 AND hash > $2) "\
- " ORDER BY hash ASC LIMIT 1"
-#define SELECT_IT_EXPIRATION_TIME_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire > $1) "\
- " ORDER BY expire ASC, hash ASC LIMIT 1"
+#define SELECT_IT_MIGRATION_ORDER "(SELECT size, type, prio, anonLevel,
expire, hash, value, oid FROM gn080 "\
+ "FORCE INDEX(expire) WHERE (expire = $1 AND
oid < $2)"\
+ " AND expire > $3 AND type!=3"\
+ " ORDER BY expire DESC,oid DESC LIMIT 1) "\
+ "UNION "\
+ "(SELECT size, type, prio, anonLevel,
expire, hash, value, oid FROM gn080 "\
+ "FORCE INDEX(expire) WHERE (expire < $1 AND
oid != $2)" \
+ " AND expire > $3 AND type!=3"\
+ " ORDER BY expire DESC,oid DESC LIMIT 1)"\
+ "ORDER BY expire DESC,oid DESC LIMIT 1"
-#define SELECT_IT_MIGRATION_ORDER_1 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire = $1 AND hash < $2) "\
- " ORDER BY hash DESC LIMIT 1"
-
-#define SELECT_IT_MIGRATION_ORDER_2 \
- "SELECT size,type,prio,anonLevel,expire,hash,value,_ROWID_ FROM gn080 WHERE
(expire < $1) "\
- " ORDER BY expire DESC, hash DESC LIMIT 1"
-
/**
* After how many ms "busy" should a DB operation fail for good?
* A low value makes sure that we are more responsive to requests
@@ -265,6 +273,32 @@
"AND oid >= $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
3)) ||
(GNUNET_OK !=
+ pq_prepare("put",
+ "INSERT INTO gn080 (size, type, prio, anonLevel, expire,
hash, vhash, value) "
+ "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
+ 8)) ||
+ (GNUNET_OK !=
+ pq_prepare("update",
+ "UPDATE gn080 SET prio = prio + $1, expire = MAX(expire,
$2) "
+ "WHERE oid = $3",
+ 3)) ||
+ (GNUNET_OK !=
+ pq_prepare("select_low_priority",
+ SELECT_IT_LOW_PRIORITY,
+ 2)) ||
+ (GNUNET_OK !=
+ pq_prepare("select_non_anonymous",
+ SELECT_IT_NON_ANONYMOUS,
+ 2)) ||
+ (GNUNET_OK !=
+ pq_prepare("select_expiration_time",
+ SELECT_IT_EXPIRATION_TIME,
+ 2)) ||
+ (GNUNET_OK !=
+ pq_prepare("select_migration_order",
+ SELECT_IT_MIGRATION_ORDER,
+ 3)) ||
+ (GNUNET_OK !=
pq_prepare("delrow",
"DELETE FROM gn080 "
"WHERE oid=$1",
@@ -425,166 +459,113 @@
GNUNET_DatastoreValueIterator dviter,
void *closure)
{
-#if 0
GNUNET_DatastoreValue *datum;
int count;
- int ret;
+ const char * pname;
+ int pcount;
+ int iret;
+ PGresult * ret;
unsigned int last_prio;
unsigned long long last_expire;
- unsigned long long last_vkey;
- unsigned int size;
- unsigned int rtype;
- unsigned int prio;
- unsigned int level;
- unsigned long long expiration;
- unsigned long long vkey;
- unsigned long hashSize;
- GNUNET_HashCode key;
+ unsigned int last_oid;
GNUNET_CronTime now;
- MYSQL_BIND rbind[7];
+ GNUNET_HashCode key;
+ const char * paramValues[3];
+ int paramLengths[3];
+ const int paramFormats[] = {1,1,1};
if (is_asc)
{
last_prio = 0;
- last_vkey = 0;
+ last_oid = 0;
last_expire = 0;
}
else
{
- last_prio = 0x7FFFFFFFL;
- last_vkey = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits */
- last_expire = 0x7FFFFFFFFFFFFFFFLL; /* MySQL only supports 63 bits
*/
+ last_prio = 0x7FFFFFFFL;
+ last_oid = 0xFFFFFFFF;
+ last_expire = 0x7FFFFFFFFFFFFFFFLL;
}
- hashSize = sizeof (GNUNET_HashCode);
- memset (rbind, 0, sizeof (rbind));
- rbind[0].buffer_type = MYSQL_TYPE_LONG;
- rbind[0].buffer = &size;
- rbind[0].is_unsigned = 1;
- rbind[1].buffer_type = MYSQL_TYPE_LONG;
- rbind[1].buffer = &rtype;
- rbind[1].is_unsigned = 1;
- rbind[2].buffer_type = MYSQL_TYPE_LONG;
- rbind[2].buffer = &prio;
- rbind[2].is_unsigned = 1;
- rbind[3].buffer_type = MYSQL_TYPE_LONG;
- rbind[3].buffer = &level;
- rbind[3].is_unsigned = 1;
- rbind[4].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[4].buffer = &expiration;
- rbind[4].is_unsigned = 1;
- rbind[5].buffer_type = MYSQL_TYPE_BLOB;
- rbind[5].buffer = &key;
- rbind[5].buffer_length = hashSize;
- rbind[5].length = &hashSize;
- rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
- rbind[6].buffer = &vkey;
- rbind[6].is_unsigned = GNUNET_YES;
-
+ switch (iter_select)
+ {
+ case 0:
+ pname = "select_low_priority";
+ pcount = 2;
+ paramValues[0] = (const char*) &last_prio;
+ paramValues[1] = (const char*) &last_oid;
+ paramLengths[0] = sizeof(last_prio);
+ paramLengths[1] = sizeof(last_oid);
+ break;
+ case 1:
+ pname = "select_non_anonymous";
+ pcount = 2;
+ paramValues[0] = (const char*) &last_prio;
+ paramValues[1] = (const char*) &last_oid;
+ paramLengths[0] = sizeof(last_prio);
+ paramLengths[1] = sizeof(last_oid);
+ break;
+ case 2:
+ pname = "select_expiration_time";
+ pcount = 2;
+ paramValues[0] = (const char*) &last_expire;
+ paramValues[1] = (const char*) &last_oid;
+ paramLengths[0] = sizeof(last_expire);
+ paramLengths[1] = sizeof(last_oid);
+ break;
+ case 3:
+ pname = "select_migration_order";
+ pcount = 3;
+ paramValues[0] = (const char*) &last_expire;
+ paramValues[1] = (const char*) &last_oid;
+ paramValues[2] = (const char*) &now;
+ paramLengths[0] = sizeof(last_expire);
+ paramLengths[1] = sizeof(last_oid);
+ paramLengths[2] = sizeof(now);
+ break;
+ default:
+ GNUNET_GE_BREAK (NULL, 0);
+ return GNUNET_SYSERR;
+ }
now = GNUNET_get_time ();
count = 0;
+ GNUNET_mutex_lock (lock);
while (1)
{
- switch (iter_select)
- {
- case 0:
- case 1:
- ret = GNUNET_MYSQL_prepared_statement_run_select (iter[iter_select],
- 7,
- rbind,
- &return_ok,
- NULL,
- MYSQL_TYPE_LONG,
- &last_prio,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES,
- MYSQL_TYPE_LONG,
- &last_prio,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES, -1);
- break;
- case 2:
- ret = GNUNET_MYSQL_prepared_statement_run_select (iter[iter_select],
- 7,
- rbind,
- &return_ok,
- NULL,
-
MYSQL_TYPE_LONGLONG,
- &last_expire,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_expire,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES, -1);
- break;
- case 3:
- ret = GNUNET_MYSQL_prepared_statement_run_select (iter[iter_select],
- 7,
- rbind,
- &return_ok,
- NULL,
-
MYSQL_TYPE_LONGLONG,
- &last_expire,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &now,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_expire,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &last_vkey,
- GNUNET_YES,
-
MYSQL_TYPE_LONGLONG,
- &now,
- GNUNET_YES, -1);
- break;
- default:
- GNUNET_GE_BREAK (NULL, 0);
- return GNUNET_SYSERR;
- }
- if (ret != GNUNET_OK)
- break;
- last_vkey = vkey;
- last_prio = prio;
- last_expire = expiration;
+ ret = PQexecPrepared(dbh,
+ pname,
+ pcount,
+ paramValues,
+ paramLengths,
+ paramFormats,
+ 1);
+ if (GNUNET_OK != check_result (ret, PGRES_TUPLES_OK, "PQexecPrepared"))
+ {
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ datum = assembleDatum (ret, &key, &last_oid);
+ if (datum == NULL)
+ continue;
+ last_prio = ntohl(datum->priority);
+ last_expire = GNUNET_ntohll(datum->expiration_time);
count++;
if (dviter != NULL)
{
- datum = assembleDatum (rbind);
- if (datum == NULL)
- continue;
- ret = dviter (&key, datum, closure, vkey);
- if (ret == GNUNET_SYSERR)
+ GNUNET_mutex_unlock (lock);
+ iret = dviter (&key, datum, closure, last_oid);
+ GNUNET_mutex_lock (lock);
+ if (iret == GNUNET_SYSERR)
{
GNUNET_free (datum);
break;
}
- if (ret == GNUNET_NO)
- {
- do_delete_value (vkey);
- do_delete_entry_by_vkey (vkey);
- GNUNET_mutex_lock (lock);
- content_size -= ntohl (datum->size);
- GNUNET_mutex_unlock (lock);
- }
- GNUNET_free (datum);
- }
+ if (iret == GNUNET_NO)
+ delete_by_rowid(last_oid);
+ }
+ GNUNET_free (datum);
}
+ GNUNET_mutex_unlock (lock);
return count;
-#endif
- return 0;
}
/**
@@ -877,99 +858,62 @@
static int
put (const GNUNET_HashCode * key, const GNUNET_DatastoreValue * value)
{
-#if 0
- int n;
- postgres3_stmt *stmt;
- unsigned int contentSize;
- unsigned int size, type, prio, anon;
- unsigned long long expir;
+ unsigned int size = ntohl(value->size);
+ unsigned int type = ntohl(value->type);
+ unsigned int prio = ntohl(value->priority);
+ unsigned int anon = ntohl(value->anonymity_level);
+ unsigned long long expir = GNUNET_ntohll(value->expiration_time);
GNUNET_HashCode vhash;
- postgresHandle *dbh;
-#if DEBUG_POSTGRES
- GNUNET_EncName enc;
+ PGresult * ret;
+ const char * paramValues[] = {
+ (const char*) &size,
+ (const char*) &type,
+ (const char*) &prio,
+ (const char*) &anon,
+ (const char*) &expir,
+ (const char*) key,
+ (const char*) &vhash,
+ (const char*) &value[1]
+ };
+ int paramLengths[] =
+ {
+ sizeof(size),
+ sizeof(type),
+ sizeof(prio),
+ sizeof(anon),
+ sizeof(expir),
+ sizeof(GNUNET_HashCode),
+ sizeof(GNUNET_HashCode),
+ size - sizeof (GNUNET_DatastoreValue)
+ };
+ const int paramFormats[] = {1,1,1,1,1,1,1,1};
- IF_GELOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_BULK | GNUNET_GE_USER,
- GNUNET_hash_to_enc (key, &enc));
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_BULK | GNUNET_GE_USER,
- "Storing in database block with type %u/key `%s'/priority
%u/expiration %llu.\n",
- ntohl (*(int *) &value[1]), &enc, ntohl (value->priority),
- GNUNET_ntohll (value->expiration_time));
-#endif
-
if ((ntohl (value->size) < sizeof (GNUNET_DatastoreValue)))
{
GNUNET_GE_BREAK (ectx, 0);
return GNUNET_SYSERR;
}
- size = ntohl (value->size);
- type = ntohl (value->type);
- prio = ntohl (value->priority);
- anon = ntohl (value->anonymity_level);
- expir = GNUNET_ntohll (value->expiration_time);
- contentSize = size - sizeof (GNUNET_DatastoreValue);
- GNUNET_hash (&value[1], contentSize, &vhash);
+ GNUNET_hash (&value[1], size - sizeof (GNUNET_DatastoreValue), &vhash);
GNUNET_mutex_lock (lock);
if (lastSync > 1000)
- syncStats (dbh);
- stmt = dbh->insertContent;
- if ((POSTGRES_OK != postgres3_bind_int (stmt, 1, size)) ||
- (POSTGRES_OK != postgres3_bind_int (stmt, 2, type)) ||
- (POSTGRES_OK != postgres3_bind_int (stmt, 3, prio)) ||
- (POSTGRES_OK != postgres3_bind_int (stmt, 4, anon)) ||
- (POSTGRES_OK != postgres3_bind_int64 (stmt, 5, expir)) ||
- (POSTGRES_OK !=
- postgres3_bind_blob (stmt, 6, key, sizeof (GNUNET_HashCode),
- POSTGRES_TRANSIENT)) ||
- (POSTGRES_OK !=
- postgres3_bind_blob (stmt, 7, &vhash, sizeof (GNUNET_HashCode),
- POSTGRES_TRANSIENT))
- || (POSTGRES_OK !=
- postgres3_bind_blob (stmt, 8, &value[1], contentSize,
- POSTGRES_TRANSIENT)))
- {
- LOG_POSTGRES (dbh,
- GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres3_bind_XXXX");
- if (POSTGRES_OK != postgres3_reset (stmt))
- LOG_POSTGRES (dbh,
- GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres3_reset");
+ syncStats ();
+ ret = PQexecPrepared (dbh,
+ "put",
+ 8,
+ paramValues,
+ paramLengths,
+ paramFormats,
+ 1);
+ if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared"))
+ {
GNUNET_mutex_unlock (lock);
return GNUNET_SYSERR;
}
-
- n = postgres3_step (stmt);
- if (n != POSTGRES_DONE)
- {
- if (n == POSTGRES_BUSY)
- {
- postgres3_reset (stmt);
- GNUNET_mutex_unlock (lock);
- GNUNET_GE_BREAK (NULL, 0);
- return GNUNET_NO;
- }
- LOG_POSTGRES (dbh,
- GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres3_step");
- postgres3_reset (stmt);
- GNUNET_mutex_unlock (lock);
- return GNUNET_SYSERR;
- }
- if (POSTGRES_OK != postgres3_reset (stmt))
- LOG_POSTGRES (dbh,
- GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres3_reset");
+ PQclear (ret);
lastSync++;
payload += getContentDatastoreSize (value);
-#if DEBUG_POSTGRES
- GNUNET_GE_LOG (ectx,
- GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Postgres: done writing content\n");
-#endif
GNUNET_mutex_unlock (lock);
return GNUNET_OK;
-#endif
- return GNUNET_SYSERR;
}
/**
@@ -979,31 +923,36 @@
static int
update (unsigned long long uid, int delta, GNUNET_CronTime expire)
{
-#if 0
- int n;
+ PGresult * ret;
+ const char * paramValues[] = {
+ (const char*) &uid,
+ (const char*) &delta,
+ (const char*) &expire,
+ };
+ int paramLengths[] =
+ {
+ sizeof(uid),
+ sizeof(delta),
+ sizeof(expire),
+ };
+ const int paramFormats[] = {1,1,1};
GNUNET_mutex_lock (lock);
- postgres3_bind_int (dbh->updPrio, 1, delta);
- postgres3_bind_int64 (dbh->updPrio, 2, expire);
- postgres3_bind_int64 (dbh->updPrio, 3, uid);
- n = postgres3_step (dbh->updPrio);
- if (n != POSTGRES_DONE)
- LOG_POSTGRES (dbh,
- GNUNET_GE_ERROR | GNUNET_GE_ADMIN | GNUNET_GE_USER |
- GNUNET_GE_BULK, "postgres3_step");
-
- postgres3_reset (dbh->updPrio);
-
-#if DEBUG_POSTGRES
- GNUNET_GE_LOG (ectx, GNUNET_GE_DEBUG | GNUNET_GE_REQUEST | GNUNET_GE_USER,
- "Postgres: block updated\n");
-#endif
+ ret = PQexecPrepared (dbh,
+ "update",
+ 3,
+ paramValues,
+ paramLengths,
+ paramFormats,
+ 1);
+ if (GNUNET_OK != check_result (ret, PGRES_COMMAND_OK, "PQexecPrepared"))
+ {
+ GNUNET_mutex_unlock (lock);
+ return GNUNET_SYSERR;
+ }
+ PQclear (ret);
GNUNET_mutex_unlock (lock);
- if (n == POSTGRES_BUSY)
- return GNUNET_NO;
- return n == POSTGRES_OK ? GNUNET_OK : GNUNET_SYSERR;
-#endif
- return GNUNET_SYSERR;
+ return GNUNET_OK;
}
@@ -1104,6 +1053,8 @@
GNUNET_mutex_destroy (lock);
lock = NULL;
coreAPI = NULL;
+ payload = 0;
+ lastSync = 0;
}
/**
@@ -1112,8 +1063,6 @@
void
update_module_sqstore_postgres (GNUNET_UpdateAPI * uapi)
{
- payload = 0;
- lastSync = 0;
lock = GNUNET_mutex_create (GNUNET_NO);
if (GNUNET_OK != init_connection ())
{
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r8052 - GNUnet/src/applications/sqstore_postgres,
gnunet <=