[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r4057 - in GNUnet: . src/applications/dht/module src/include
From: |
grothoff |
Subject: |
[GNUnet-SVN] r4057 - in GNUnet: . src/applications/dht/module src/include |
Date: |
Tue, 26 Dec 2006 21:21:39 -0800 (PST) |
Author: grothoff
Date: 2006-12-26 21:21:35 -0800 (Tue, 26 Dec 2006)
New Revision: 4057
Removed:
GNUnet/src/applications/dht/module/service.h
Modified:
GNUnet/src/applications/dht/module/Makefile.am
GNUnet/src/applications/dht/module/cs.c
GNUnet/src/applications/dht/module/service.c
GNUnet/src/include/gnunet_dht.h
GNUnet/src/include/gnunet_protocols.h
GNUnet/todo
Log:
implementing DHT client API
Modified: GNUnet/src/applications/dht/module/Makefile.am
===================================================================
--- GNUnet/src/applications/dht/module/Makefile.am 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/applications/dht/module/Makefile.am 2006-12-27 05:21:35 UTC (rev 4057)
@@ -6,10 +6,11 @@
libgnunetmodule_dht.la
libgnunetmodule_dht_la_SOURCES = \
- table.c table.h \
+ cs.c \
dstore.c dstore.h \
routing.c routing.h \
- service.c service.h
+ service.c \
+ table.c table.h
libgnunetmodule_dht_la_LIBADD = \
$(top_builddir)/src/util/crypto/libgnunetutil_crypto.la \
Modified: GNUnet/src/applications/dht/module/cs.c
===================================================================
--- GNUnet/src/applications/dht/module/cs.c 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/applications/dht/module/cs.c 2006-12-27 05:21:35 UTC (rev 4057)
@@ -30,6 +30,7 @@
#include "gnunet_core.h"
#include "gnunet_protocols.h"
#include "gnunet_rpc_service.h"
+#include "gnunet_dht.h"
#include "gnunet_dht_service.h"
/**
@@ -43,157 +44,79 @@
static DHT_ServiceAPI * dhtAPI;
typedef struct {
- struct ClientHandle * client;
- struct DHT_PUT_RECORD * put_record;
- DHT_TableId table;
- unsigned int replicas; /* confirmed puts? */
-} DHT_CLIENT_PUT_RECORD;
-typedef struct {
struct ClientHandle * client;
+
struct DHT_GET_RECORD * get_record;
- DHT_TableId table;
- unsigned int count;
+
} DHT_CLIENT_GET_RECORD;
static DHT_CLIENT_GET_RECORD ** getRecords;
static unsigned int getRecordsSize;
-static DHT_CLIENT_PUT_RECORD ** putRecords;
-
-static unsigned int putRecordsSize;
-
/**
* Lock.
*/
-static struct MUTEX * csLock;
+static struct MUTEX * lock;
-static struct GE_Context * ectx;
-
-
-static int sendAck(struct ClientHandle * client,
- DHT_TableId * table,
- int value) {
- CS_dht_reply_ack_MESSAGE msg;
-
- msg.header.size = htons(sizeof(CS_dht_reply_ack_MESSAGE));
- msg.header.type = htons(CS_PROTO_dht_REPLY_ACK);
- msg.status = htonl(value);
- msg.table = *table;
- return coreAPI->sendToClient(client,
- &msg.header);
-}
-
-static void cs_put_abort(void * cls) {
- DHT_CLIENT_PUT_RECORD * record = cls;
- int i;
-
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Signaling client put completion: %d\n",
- record->replicas);
- MUTEX_LOCK(csLock);
- dhtAPI->put_stop(record->put_record);
- if (OK != sendAck(record->client,
- &record->table,
- record->replicas)) {
- GE_LOG(ectx,
- GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
- coreAPI->terminateClientConnection(record->client);
- }
- for (i=putRecordsSize-1;i>=0;i--)
- if (putRecords[i] == record) {
- putRecords[i] = putRecords[putRecordsSize-1];
- GROW(putRecords,
- putRecordsSize,
- putRecordsSize-1);
- break;
- }
- MUTEX_UNLOCK(csLock);
- FREE(record);
-}
-
/**
* CS handler for inserting <key,value>-pair into DHT-table.
*/
static int csPut(struct ClientHandle * client,
const MESSAGE_HEADER * message) {
- CS_dht_request_put_MESSAGE * req;
- DataContainer * data;
- DHT_CLIENT_PUT_RECORD * ptr;
+ const CS_dht_request_put_MESSAGE * req;
unsigned int size;
if (ntohs(message->size) < sizeof(CS_dht_request_put_MESSAGE)) {
- GE_BREAK(ectx, 0);
+ GE_BREAK(NULL, 0);
return SYSERR;
}
- req = (CS_dht_request_put_MESSAGE*) message;
+ req = (const CS_dht_request_put_MESSAGE*) message;
size = ntohs(req->header.size)
- - sizeof(CS_dht_request_put_MESSAGE)
- + sizeof(DataContainer);
- GE_ASSERT(ectx, size < MAX_BUFFER_SIZE);
- if (size == 0) {
- data = NULL;
- } else {
- data = MALLOC(size);
- data->size = htonl(size);
- memcpy(&data[1],
- &req[1],
- size - sizeof(DataContainer));
- }
- ptr = MALLOC(sizeof(DHT_CLIENT_PUT_RECORD));
- ptr->client = client;
- ptr->replicas = 0;
- ptr->table = req->table;
- ptr->put_record = NULL;
-
- MUTEX_LOCK(csLock);
- GROW(putRecords,
- putRecordsSize,
- putRecordsSize+1);
- putRecords[putRecordsSize-1] = ptr;
- MUTEX_UNLOCK(csLock);
- GE_LOG(ectx,
- GE_DEBUG | GE_REQUEST | GE_USER,
- "Starting DHT put\n");
- ptr->put_record = dhtAPI->put_start(&req->table,
- &req->key,
- ntohll(req->timeout),
- data,
- &cs_put_abort,
- ptr);
- FREE(data);
+ - sizeof(CS_dht_request_put_MESSAGE);
+ GE_ASSERT(NULL,
+ size < MAX_BUFFER_SIZE);
+ dhtAPI->put(&req->key,
+ ntohl(req->type),
+ size,
+ ntohll(req->expire),
+ (const char*) &req[1]);
return OK;
}
-static int cs_get_result_callback(const HashCode512 * key,
- const DataContainer * value,
- void * cls) {
+static int get_result(const HashCode512 * key,
+ const DataContainer * value,
+ void * cls) {
DHT_CLIENT_GET_RECORD * record = cls;
- CS_dht_reply_results_MESSAGE * msg;
+ CS_dht_request_put_MESSAGE * msg;
size_t n;
- n = sizeof(CS_dht_reply_results_MESSAGE) + ntohl(value->size);
+ GE_ASSERT(NULL, ntohl(value->size) >= sizeof(DataContainer));
+ n = sizeof(CS_dht_request_put_MESSAGE) + ntohl(value->size) - sizeof(DataContainer);
+ if (n > MAX_BUFFER_SIZE) {
+ GE_BREAK(NULL, 0);
+ return SYSERR;
+ }
msg = MALLOC(n);
- msg->key = *key;
+ msg->header.size = htons(n);
+ msg->header.type = htons(CS_PROTO_dht_REQUEST_PUT);
+ msg->expire = 0; /* unknown */
+ msg->key = *key;
memcpy(&msg[1],
- value,
- ntohl(value->size));
- GE_LOG(ectx,
+ &value[1],
+ ntohl(value->size) - sizeof(DataContainer));
+ GE_LOG(coreAPI->ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
- "`%s' processes reply '%.*s'\n",
+ "`%s' at %s:%d processes reply '%.*s'\n",
__FUNCTION__,
+ __FILE__,
+ __LINE__,
ntohl(value->size) - sizeof(DataContainer),
&value[1]);
- msg->table = record->table;
- msg->header.size = htons(n);
- msg->header.type = htons(CS_PROTO_dht_REPLY_GET);
if (OK != coreAPI->sendToClient(record->client,
&msg->header)) {
- GE_LOG(ectx,
+ GE_LOG(coreAPI->ectx,
GE_ERROR | GE_IMMEDIATE | GE_USER,
_("`%s' failed. Terminating connection to client.\n"),
"sendToClient");
@@ -203,33 +126,12 @@
return OK;
}
-static void cs_get_abort(void * cls) {
+static void get_timeout(void * cls) {
DHT_CLIENT_GET_RECORD * record = cls;
int i;
dhtAPI->get_stop(record->get_record);
- if (record->count == 0) {
- if (OK != sendAck(record->client,
- &record->table,
- SYSERR)) {
- GE_LOG(ectx,
- GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
- coreAPI->terminateClientConnection(record->client);
- }
- } else {
- if (OK != sendAck(record->client,
- &record->table,
- record->count)) {
- GE_LOG(ectx,
- GE_ERROR | GE_IMMEDIATE | GE_USER,
- _("`%s' failed. Terminating connection to client.\n"),
- "sendAck");
- coreAPI->terminateClientConnection(record->client);
- }
- }
- MUTEX_LOCK(csLock);
+ MUTEX_LOCK(lock);
for (i=getRecordsSize-1;i>=0;i--)
if (getRecords[i] == record) {
getRecords[i] = getRecords[getRecordsSize-1];
@@ -238,99 +140,52 @@
getRecordsSize-1);
break;
}
- MUTEX_UNLOCK(csLock);
+ MUTEX_UNLOCK(lock);
FREE(record);
}
-struct CSGetClosure {
- struct ClientHandle * client;
- CS_dht_request_get_MESSAGE * message;
-};
-
/**
- * CS handler for fetching <key,value>-pairs from DHT-table.
- */
-static int csGetJob(struct CSGetClosure * cpc) {
- CS_dht_request_get_MESSAGE * req;
- DHT_CLIENT_GET_RECORD * ptr;
- struct ClientHandle * client;
- unsigned int keyCount;
-
- client = cpc->client;
- req = cpc->message;
- FREE(cpc);
-
- keyCount = 1 + ((ntohs(req->header.size) - sizeof(CS_dht_request_get_MESSAGE)) / sizeof(HashCode512));
- ptr = MALLOC(sizeof(DHT_CLIENT_GET_RECORD));
- ptr->client = client;
- ptr->count = 0;
- ptr->table = req->table;
- ptr->get_record = NULL;
-
- MUTEX_LOCK(csLock);
- GROW(getRecords,
- getRecordsSize,
- getRecordsSize+1);
- getRecords[getRecordsSize-1] = ptr;
- MUTEX_UNLOCK(csLock);
- ptr->get_record = dhtAPI->get_start(&req->table,
- ntohl(req->type),
- keyCount,
- &req->keys,
- ntohll(req->timeout),
- &cs_get_result_callback,
- ptr,
- &cs_get_abort,
- ptr);
- return OK;
-}
-
-/**
* CS handler for inserting <key,value>-pair into DHT-table.
*/
static int csGet(struct ClientHandle * client,
const MESSAGE_HEADER * message) {
- struct CSGetClosure * cpc;
+ const CS_dht_request_get_MESSAGE * get;
+ DHT_CLIENT_GET_RECORD * cpc;
if (ntohs(message->size) != sizeof(CS_dht_request_get_MESSAGE)) {
- GE_BREAK(ectx, 0);
+ GE_BREAK(NULL, 0);
return SYSERR;
}
-
- cpc = MALLOC(sizeof(struct CSGetClosure));
- cpc->message = MALLOC(ntohs(message->size));
- memcpy(cpc->message,
- message,
- ntohs(message->size));
+ get = (const CS_dht_request_get_MESSAGE *) message;
+ cpc = MALLOC(sizeof(DHT_CLIENT_GET_RECORD));
cpc->client = client;
- cron_add_job(coreAPI->cron,
- (CronJob)&csGetJob,
- 0,
- 0,
- cpc);
+ cpc->get_record = dhtAPI->get_start(ntohl(get->type),
+ &get->key,
+ ntohll(get->timeout),
+ &get_result,
+ cpc,
+ &get_timeout,
+ cpc);
+ MUTEX_LOCK(lock);
+ APPEND(getRecords,
+ getRecordsSize,
+ cpc);
+ MUTEX_UNLOCK(lock);
return OK;
}
/**
* CS handler for handling exiting client. Triggers
- * csLeave for all tables that rely on this client.
+ * get_stop for all operations that rely on this client.
*/
static void csClientExit(struct ClientHandle * client) {
- int i;
+ unsigned int i;
DHT_CLIENT_GET_RECORD * gr;
- DHT_CLIENT_PUT_RECORD * pr;
- cron_suspend(coreAPI->cron,
- YES);
- MUTEX_LOCK(csLock);
+ MUTEX_LOCK(lock);
for (i=0;i<getRecordsSize;i++) {
if (getRecords[i]->client == client) {
gr = getRecords[i];
-
- cron_del_job(coreAPI->cron,
- &cs_get_abort,
- 0,
- gr);
dhtAPI->get_stop(gr->get_record);
getRecords[i] = getRecords[getRecordsSize-1];
GROW(getRecords,
@@ -338,43 +193,24 @@
getRecordsSize-1);
}
}
- for (i=0;i<putRecordsSize;i++) {
- if (putRecords[i]->client == client) {
- pr = putRecords[i];
-
- cron_del_job(coreAPI->cron,
- &cs_put_abort,
- 0,
- pr);
- dhtAPI->put_stop(pr->put_record);
- putRecords[i] = putRecords[putRecordsSize-1];
- GROW(putRecords,
- putRecordsSize,
- putRecordsSize-1);
- }
- }
- MUTEX_UNLOCK(csLock);
- cron_resume_jobs(coreAPI->cron,
- YES);
+ MUTEX_UNLOCK(lock);
}
int initialize_module_dht(CoreAPIForApplication * capi) {
int status;
- ectx = capi->ectx;
dhtAPI = capi->requestService("dht");
if (dhtAPI == NULL)
return SYSERR;
coreAPI = capi;
- GE_LOG(ectx, GE_DEBUG | GE_REQUEST | GE_USER,
- "DHT registering client handlers: "
- "%d %d %d %d\n",
- CS_PROTO_dht_REQUEST_PUT,
- CS_PROTO_dht_REQUEST_GET,
- CS_PROTO_dht_REPLY_GET,
- CS_PROTO_dht_REPLY_ACK);
+ GE_LOG(coreAPI->ectx,
+ GE_DEBUG | GE_REQUEST | GE_USER,
+ "DHT registering client handlers: "
+ "%d %d %d %d\n",
+ CS_PROTO_dht_REQUEST_PUT,
+ CS_PROTO_dht_REQUEST_GET);
status = OK;
- csLock = MUTEX_CREATE(YES);
+ lock = MUTEX_CREATE(NO);
if (SYSERR == capi->registerClientHandler(CS_PROTO_dht_REQUEST_PUT,
&csPut))
status = SYSERR;
@@ -393,7 +229,7 @@
int status;
status = OK;
- GE_LOG(ectx,
+ GE_LOG(coreAPI->ectx,
GE_DEBUG | GE_REQUEST | GE_USER,
"DHT: shutdown\n");
if (OK != coreAPI->unregisterClientHandler(CS_PROTO_dht_REQUEST_PUT,
@@ -405,25 +241,12 @@
if (OK != coreAPI->unregisterClientExitHandler(&csClientExit))
status = SYSERR;
- while (putRecordsSize > 0) {
- cron_del_job(coreAPI->cron,
- &cs_put_abort,
- 0,
- putRecords[0]);
- cs_put_abort(putRecords[0]);
- }
-
- while (getRecordsSize > 0) {
- cron_del_job(coreAPI->cron,
- &cs_get_abort,
- 0,
- getRecords[0]);
- cs_get_abort(getRecords[0]);
- }
+ while (getRecordsSize > 0)
+ get_timeout(getRecords[0]);
coreAPI->releaseService(dhtAPI);
dhtAPI = NULL;
coreAPI = NULL;
- MUTEX_DESTROY(csLock);
+ MUTEX_DESTROY(lock);
return status;
}
Modified: GNUnet/src/applications/dht/module/service.c
===================================================================
--- GNUnet/src/applications/dht/module/service.c 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/applications/dht/module/service.c 2006-12-27 05:21:35 UTC (rev 4057)
@@ -28,7 +28,7 @@
#include "dstore.h"
#include "table.h"
#include "routing.h"
-#include "service.h"
+#include "gnunet_dht_service.h"
/**
* Global core API.
Deleted: GNUnet/src/applications/dht/module/service.h
===================================================================
--- GNUnet/src/applications/dht/module/service.h 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/applications/dht/module/service.h 2006-12-27 05:21:35 UTC (rev 4057)
@@ -1,38 +0,0 @@
-/*
- This file is part of GNUnet
- (C) 2006 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 2, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
- */
-
-/**
- * @file module/service.h
- * @brief internal GNUnet DHT service
- * @author Christian Grothoff
- */
-
-#ifndef DHT_SERVICE_H
-#define DHT_SERVICE_H
-
-#include "gnunet_util.h"
-#include "gnunet_core.h"
-#include "gnunet_dht_service.h"
-
-DHT_ServiceAPI * provide_module_dht(CoreAPIForApplication * capi);
-
-int release_module_dht(void);
-
-#endif
Modified: GNUnet/src/include/gnunet_dht.h
===================================================================
--- GNUnet/src/include/gnunet_dht.h 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/include/gnunet_dht.h 2006-12-27 05:21:35 UTC (rev 4057)
@@ -31,7 +31,6 @@
#define GNUNET_DHT_H
#include "gnunet_util.h"
-#include "gnunet_blockstore.h"
#ifdef __cplusplus
extern "C" {
@@ -40,48 +39,31 @@
#endif
#endif
-
-/* ************* API specific errorcodes *********** */
-
-#define DHT_ERRORCODES__TIMEOUT -2
-#define DHT_ERRORCODES__OUT_OF_SPACE -3
-#define DHT_ERRORCODES__TABLE_NOT_FOUND -4
-
-
-/* ************************* CS messages ***************************** */
-/* these messages are exchanged between gnunetd and the clients (APIs) */
-
/**
- * DHT table identifier. A special identifier (all zeros) is
- * used internally by the DHT. That table is used to lookup
- * tables. The GNUnet DHT infrastructure supports multiple
- * tables, the table to lookup peers is just one of these.
- */
-typedef HashCode512 DHT_TableId;
-
-#define equalsDHT_TableId(a,b) equalsHashCode512(a,b)
-
-/**
* TCP communication: put <key,value>-mapping to table.
- * Reply is an ACK.
+ * When sent by a client to gnunetd, this message is
+ * used to initiate a PUT on the DHT. gnunetd also
+ * uses this message to communicate results from a GET
+ * operation back to the client.<p>
+ *
+ * The given struct is followed by the value.
*/
typedef struct {
MESSAGE_HEADER header;
- DHT_TableId table;
+ unsigned int type; /* nbo */
- unsigned long long timeout; /* nbo */
+ unsigned long long expire; /* nbo */
HashCode512 key;
- unsigned int priority; /* nbo */
-
} CS_dht_request_put_MESSAGE;
/**
- * TCP communication: get <key,value>-mappings
- * for given key. Reply is a CS_dht_reply_results_MESSAGE message.
+ * TCP communication: get <key,value>-mappings for given key. Replies are
+ * CS_dht_request_put_MESSAGE messages. Clients can abort
+ * the GET operation early by closing the connection.
*/
typedef struct {
@@ -91,47 +73,10 @@
unsigned long long timeout; /* nbo */
- DHT_TableId table;
+ HashCode512 key;
- unsigned int priority; /* nbo */
-
- /* one or more keys */
- HashCode512 keys;
-
} CS_dht_request_get_MESSAGE;
-/**
- * TCP communication: Results for a request. Uses a separate message
- * for each result; CS_dht_reply_results_MESSAGE maybe repeated many
- * times (the total number is given in totalResults).
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
- unsigned int totalResults;
-
- DHT_TableId table;
-
- HashCode512 key;
-
- DataContainer data;
-
-} CS_dht_reply_results_MESSAGE;
-
-/**
- * TCP communication: status response for a request
- */
-typedef struct {
-
- MESSAGE_HEADER header;
-
- int status; /* NBO */
-
- DHT_TableId table;
-
-} CS_dht_reply_ack_MESSAGE;
-
#if 0 /* keep Emacsens' auto-indent happy */
{
#endif
Modified: GNUnet/src/include/gnunet_protocols.h
===================================================================
--- GNUnet/src/include/gnunet_protocols.h 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/src/include/gnunet_protocols.h 2006-12-27 05:21:35 UTC (rev 4057)
@@ -248,35 +248,27 @@
#define CS_PROTO_chat_MSG 44
-/* ********** CS TESTBED application messages ********** */
-
-#define CS_PROTO_testbed_REQUEST 50
-#define CS_PROTO_testbed_REPLY 51
-
-
/* ********** CS DHT application messages ********** */
/**
* Client to CS or CS to client: get from table
*/
-#define CS_PROTO_dht_REQUEST_GET 72
+#define CS_PROTO_dht_REQUEST_GET 48
/**
* Client to CS or CS to client: put into table
*/
-#define CS_PROTO_dht_REQUEST_PUT 73
+#define CS_PROTO_dht_REQUEST_PUT 49
-/**
- * Client to CS or CS to client: results from get
- */
-#define CS_PROTO_dht_REPLY_GET 74
-/**
- * Client to CS or CS to client: confirmed
- */
-#define CS_PROTO_dht_REPLY_ACK 75
+/* ********** CS TESTBED application messages ********** */
+#define CS_PROTO_testbed_REQUEST 50
+#define CS_PROTO_testbed_REPLY 51
+
+
+
/* ************* CS VPN messages ************* */
/**
Modified: GNUnet/todo
===================================================================
--- GNUnet/todo 2006-12-27 04:34:10 UTC (rev 4056)
+++ GNUnet/todo 2006-12-27 05:21:35 UTC (rev 4057)
@@ -31,10 +31,11 @@
0.7.2 [3'07]:
- new features:
* XFS / support for location URIs [CG]
- + dht/cs: not done yet [RC]
+ dht/tools: update clients [RC]
+ dht/gap integration [RC]
- + dstore bloomfilter
+ + ecrs/location URIs [RC]
+ + fsui/location URI support [RC]
+ + dstore bloomfilter (optimization)
* HTTP transport (libcurl, libmicrohttpd)
* SMTP transport (libesmtp)
* SMTP logger
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r4057 - in GNUnet: . src/applications/dht/module src/include,
grothoff <=