[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r29287 - in gnunet/src: include psycstore
From: |
gnunet |
Subject: |
[GNUnet-SVN] r29287 - in gnunet/src: include psycstore |
Date: |
Mon, 16 Sep 2013 06:59:05 +0200 |
Author: tg
Date: 2013-09-16 06:59:05 +0200 (Mon, 16 Sep 2013)
New Revision: 29287
Modified:
gnunet/src/include/gnunet_env_lib.h
gnunet/src/include/gnunet_protocols.h
gnunet/src/include/gnunet_psyc_service.h
gnunet/src/include/gnunet_psycstore_plugin.h
gnunet/src/include/gnunet_psycstore_service.h
gnunet/src/psycstore/Makefile.am
gnunet/src/psycstore/gnunet-service-psycstore.c
gnunet/src/psycstore/plugin_psycstore_sqlite.c
gnunet/src/psycstore/psycstore.h
gnunet/src/psycstore/psycstore_api.c
gnunet/src/psycstore/test_plugin_psycstore.c
gnunet/src/psycstore/test_psycstore.c
gnunet/src/psycstore/test_psycstore.conf
Log:
PSYCstore service and API implementation
Modified: gnunet/src/include/gnunet_env_lib.h
===================================================================
--- gnunet/src/include/gnunet_env_lib.h 2013-09-15 22:48:52 UTC (rev 29286)
+++ gnunet/src/include/gnunet_env_lib.h 2013-09-16 04:59:05 UTC (rev 29287)
@@ -203,6 +203,26 @@
/**
+ * Perform an operation on a variable.
+ *
+ * @param name Name of variable.
+ * @param current_value Current value of variable.
+ * @param current_value_size Size of @a current_value.
+ * @param oper Operator.
+ * @param args Arguments for the operation.
+ * @param args_size Size of @a args.
+ * @param return_value Return value.
+ * @param return_value_size Size of @a return_value.
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+int
+GNUNET_ENV_operation (char *name, void *current_value, size_t
current_value_size,
+ enum GNUNET_ENV_Operator oper, void *args, size_t
args_size,
+ void **return_value, size_t *return_value_size);
+
+
+/**
* Get the variable's value as an integer.
*
* @param size Size of value.
Modified: gnunet/src/include/gnunet_protocols.h
===================================================================
--- gnunet/src/include/gnunet_protocols.h 2013-09-15 22:48:52 UTC (rev
29286)
+++ gnunet/src/include/gnunet_protocols.h 2013-09-16 04:59:05 UTC (rev
29287)
@@ -1967,23 +1967,54 @@
******************************************************************************/
/**
- * Generic response from PSYCstore service with success and/or error message.
+ * Store a membership event.
*/
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE 650
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE 650
/**
- * Store a membership event.
+ * Test for membership of a member at a particular point in time.
*/
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE 651
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST 651
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE 652
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET 653
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET 654
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT 655
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER 656
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE 657
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY 658
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC 659
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET 660
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE 661
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET 662
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX 663
+
/**
- * Test for membership of a member at a particular point in time.
+ * Generic response from PSYCstore service with success and/or error message.
*/
-#define GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST 652
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE 664
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT 665
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER 666
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE 667
+
+#define GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE 668
+
/**
- * Next available: 670
+ * Next available: 680
*/
Modified: gnunet/src/include/gnunet_psyc_service.h
===================================================================
--- gnunet/src/include/gnunet_psyc_service.h 2013-09-15 22:48:52 UTC (rev
29286)
+++ gnunet/src/include/gnunet_psyc_service.h 2013-09-16 04:59:05 UTC (rev
29287)
@@ -701,29 +701,6 @@
/**
- * Return all channel state variables whose name matches a given prefix.
- *
- * A name matches if it starts with the given @a name_prefix, thus requesting
the
- * empty prefix ("") will match all values; requesting "_a_b" will also return
- * values stored under "_a_b_c".
- *
- * The @a state_cb is invoked on all matching state variables asynchronously,
as
- * the state is stored in and retrieved from the PSYCstore,
- *
- * @param channel Channel handle.
- * @param name_prefix Prefix of the state variable name to match.
- * @param cb Function to call with the matching state variables.
- * @param cb_cls Closure for the callbacks.
- * @return Handle that can be used to cancel the query operation.
- */
-struct GNUNET_PSYC_StateQuery *
-GNUNET_PSYC_channel_state_get_all (struct GNUNET_PSYC_Channel *channel,
- const char *name_prefix,
- GNUNET_PSYC_StateCallback cb,
- void *cb_cls);
-
-
-/**
* Retrieve the best matching channel state variable.
*
* If the requested variable name is not present in the state, the nearest
@@ -746,6 +723,29 @@
/**
+ * Return all channel state variables whose name matches a given prefix.
+ *
+ * A name matches if it starts with the given @a name_prefix, thus requesting
the
+ * empty prefix ("") will match all values; requesting "_a_b" will also return
+ * values stored under "_a_b_c".
+ *
+ * The @a state_cb is invoked on all matching state variables asynchronously,
as
+ * the state is stored in and retrieved from the PSYCstore,
+ *
+ * @param channel Channel handle.
+ * @param name_prefix Prefix of the state variable name to match.
+ * @param cb Function to call with the matching state variables.
+ * @param cb_cls Closure for the callbacks.
+ * @return Handle that can be used to cancel the query operation.
+ */
+struct GNUNET_PSYC_StateQuery *
+GNUNET_PSYC_channel_state_get_prefix (struct GNUNET_PSYC_Channel *channel,
+ const char *name_prefix,
+ GNUNET_PSYC_StateCallback cb,
+ void *cb_cls);
+
+
+/**
* Cancel a state query operation.
*
* @param query Handle for the operation to cancel.
Modified: gnunet/src/include/gnunet_psycstore_plugin.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_plugin.h 2013-09-15 22:48:52 UTC
(rev 29286)
+++ gnunet/src/include/gnunet_psycstore_plugin.h 2013-09-16 04:59:05 UTC
(rev 29287)
@@ -1,6 +1,6 @@
/*
This file is part of GNUnet
- (C) 2012, 2013 Christian Grothoff (and other contributing authors)
+ (C) 2013 Christian Grothoff (and other contributing authors)
GNUnet is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published
@@ -137,6 +137,7 @@
(*message_get) (void *cls,
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
uint64_t message_id,
+ uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls);
@@ -182,22 +183,91 @@
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
uint64_t *max_state_msg_id);
+
/**
- * Set a state variable to the given value.
+ * Begin modifying current state.
*
* @see GNUNET_PSYCSTORE_state_modify()
*
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
int
- (*state_set) (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
- const char *name,
- const void *value,
- size_t value_size);
+ (*state_modify_begin) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id, uint64_t state_delta);
+ /**
+ * Set the current value of a state variable.
+ *
+ * The state modification process is started with state_modify_begin(),
+ * which is followed by one or more calls to this function,
+ * and finished with state_modify_end().
+ *
+ * @see GNUNET_PSYCSTORE_state_modify()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*state_modify_set) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name, const void *value, size_t value_size);
+
/**
+ * End modifying current state.
+ *
+ * @see GNUNET_PSYCSTORE_state_modify()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*state_modify_end) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id);
+
+
+ /**
+ * Begin synchronizing state.
+ *
+ * @see GNUNET_PSYCSTORE_state_sync()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*state_sync_begin) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key);
+
+ /**
+ * Set the value of a state variable while synchronizing state.
+ *
+ * The state synchronization process is started with state_sync_begin(),
+ * which is followed by one or more calls to this function,
+ * and finished with state_sync_end().
+ *
+ * @see GNUNET_PSYCSTORE_state_sync()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*state_sync_set) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ const char *name, const void *value, size_t value_size);
+
+
+ /**
+ * End synchronizing state.
+ *
+ * @see GNUNET_PSYCSTORE_state_sync()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+ int
+ (*state_sync_end) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id);
+
+
+ /**
* Reset the state of a channel.
*
* Delete all state variables stored for the given channel.
@@ -219,25 +289,6 @@
(*state_update_signed) (void *cls,
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key);
- /**
- * Update signed values of state variables in the state store.
- *
- * @param h Handle for the PSYCstore.
- * @param channel_key The channel we are interested in.
- * @param message_id Message ID that contained the state @a hash.
- * @param hash Hash of the serialized full state.
- * @param rcb Callback to call with the result of the operation.
- * @param rcb_cls Closure for the callback.
- *
- * @return #GNUNET_OK on success, else #GNUNET_SYSERR
- */
- int
- (*state_hash_update) (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
- uint64_t message_id,
- const struct GNUNET_HashCode *hash,
- GNUNET_PSYCSTORE_ResultCallback rcb,
- void *rcb_cls);
/**
* Retrieve a state variable by name (exact match).
@@ -254,16 +305,16 @@
/**
* Retrieve all state variables for a channel with the given prefix.
*
- * @see GNUNET_PSYCSTORE_state_get_all()
+ * @see GNUNET_PSYCSTORE_state_get_prefix()
*
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
int
- (*state_get_all) (void *cls,
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
- const char *name,
- GNUNET_PSYCSTORE_StateCallback cb,
- void *cb_cls);
+ (*state_get_prefix) (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name,
+ GNUNET_PSYCSTORE_StateCallback cb,
+ void *cb_cls);
/**
Modified: gnunet/src/include/gnunet_psycstore_service.h
===================================================================
--- gnunet/src/include/gnunet_psycstore_service.h 2013-09-15 22:48:52 UTC
(rev 29286)
+++ gnunet/src/include/gnunet_psycstore_service.h 2013-09-16 04:59:05 UTC
(rev 29287)
@@ -108,7 +108,7 @@
*/
typedef void
(*GNUNET_PSYCSTORE_ResultCallback) (void *cls,
- int result,
+ int64_t result,
const char *err_msg);
@@ -202,7 +202,7 @@
* @param cls Closure.
* @param message The retrieved message fragment. A NULL value indicates that
* there are no more results to be returned.
- * @param flags Flags stored with the message.
+ * @param psycstore_flags Flags stored with the message.
*
* @return #GNUNET_NO to stop calling this callback with further fragments,
* #GNUNET_YES to continue.
@@ -210,7 +210,7 @@
typedef int
(*GNUNET_PSYCSTORE_FragmentCallback) (void *cls,
struct GNUNET_MULTICAST_MessageHeader
*message,
- enum GNUNET_PSYCSTORE_MessageFlags
flags);
+ enum GNUNET_PSYCSTORE_MessageFlags
psycstore_flags);
/**
@@ -219,8 +219,9 @@
* @param h Handle for the PSYCstore.
* @param channel_key The channel we are interested in.
* @param fragment_id Fragment ID to check. Use 0 to get the latest message
fragment.
- * @param cb Callback to call with the retrieved fragment.
- * @param cb_cls Closure for the callback.
+ * @param fcb Callback to call with the retrieved fragment.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
@@ -228,8 +229,9 @@
GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
uint64_t fragment_id,
- GNUNET_PSYCSTORE_FragmentCallback cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
/**
@@ -238,8 +240,9 @@
* @param h Handle for the PSYCstore.
* @param channel_key The channel we are interested in.
* @param message_id Message ID to check. Use 0 to get the latest message.
- * @param cb Callback to call with the retrieved fragments.
- * @param cb_cls Closure for the callback.
+ * @param fcb Callback to call with the retrieved fragments.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
@@ -247,8 +250,9 @@
GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
uint64_t message_id,
- GNUNET_PSYCSTORE_FragmentCallback cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
/**
@@ -259,8 +263,9 @@
* @param channel_key The channel we are interested in.
* @param message_id Message ID to check. Use 0 to get the latest message.
* @param fragment_offset Offset of the fragment to retrieve.
- * @param cb Callback to call with the retrieved fragments.
- * @param cb_cls Closure for the callback.
+ * @param fcb Callback to call with the retrieved fragments.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
@@ -269,8 +274,9 @@
const struct
GNUNET_CRYPTO_EccPublicSignKey *channel_key,
uint64_t message_id,
uint64_t fragment_offset,
- GNUNET_PSYCSTORE_FragmentCallback cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
/**
@@ -312,16 +318,16 @@
*
* @param h Handle for the PSYCstore.
* @param channel_key Public key that identifies the channel.
- * @param cb Callback to call with the result.
- * @param cb_cls Closure for the callback.
+ * @param mccb Callback to call with the result.
+ * @param mccb_cls Closure for the callback.
*
- * @return
+ * @return Handle that can be used to cancel the operation.
*/
struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h,
struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
- GNUNET_PSYCSTORE_MasterCountersCallback
*cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_MasterCountersCallback
mccb,
+ void *mccb_cls);
/**
@@ -332,16 +338,16 @@
*
* @param h Handle for the PSYCstore.
* @param channel_key Public key that identifies the channel.
- * @param cb Callback to call with the result.
- * @param cb_cls Closure for the callback.
+ * @param sccb Callback to call with the result.
+ * @param sccb_cls Closure for the callback.
*
- * @return
+ * @return Handle that can be used to cancel the operation.
*/
struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h,
struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
- GNUNET_PSYCSTORE_SlaveCountersCallback
*cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_SlaveCountersCallback
sccb,
+ void *sccb_cls);
/**
@@ -353,7 +359,7 @@
* @param h Handle for the PSYCstore.
* @param channel_key The channel we are interested in.
* @param message_id ID of the message that contains the @a modifiers.
- * @param state_delta Value of the _state_delta PSYC header variable of the
message.
+ * @param state_delta Value of the @e state_delta PSYC header variable of the
message.
* @param modifier_count Number of elements in the @a modifiers array.
* @param modifiers List of modifiers to apply.
* @param rcb Callback to call with the result of the operation.
@@ -373,6 +379,30 @@
/**
+ * Store synchronized state.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id ID of the message that contains the state_hash PSYC
header variable.
+ * @param modifier_count Number of elements in the @a modifiers array.
+ * @param modifiers Full state to store.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id,
+ size_t modifier_count,
+ const struct GNUNET_ENV_Modifier *modifiers,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls);
+
+
+
+/**
* Reset the state of a channel.
*
* Delete all state variables stored for the given channel.
@@ -384,7 +414,7 @@
*
* @return Handle that can be used to cancel the operation.
*/
-struct GNUNET_PSYCSTORE_Handle *
+struct GNUNET_PSYCSTORE_OperationHandle *
GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
@@ -423,12 +453,10 @@
*
* @return #GNUNET_NO to stop calling this callback with further variables,
* #GNUNET_YES to continue.
- */
+ */;
typedef int
-(*GNUNET_PSYCSTORE_StateCallback) (void *cls,
- const char *name,
- const void *value,
- size_t value_size);
+(*GNUNET_PSYCSTORE_StateCallback) (void *cls, const char *name,
+ const void *value, size_t value_size);
/**
@@ -437,8 +465,9 @@
* @param h Handle for the PSYCstore.
* @param channel_key The channel we are interested in.
* @param name Name of variable to match, the returned variable might be less
specific.
- * @param cb Callback to return matching state variables.
- * @param cb_cls Closure for the callback.
+ * @param scb Callback to return the matching state variable.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
@@ -446,8 +475,9 @@
GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
const char *name,
- GNUNET_PSYCSTORE_StateCallback cb,
- void *cb_cls);
+ GNUNET_PSYCSTORE_StateCallback scb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
/**
@@ -456,17 +486,19 @@
* @param h Handle for the PSYCstore.
* @param channel_key The channel we are interested in.
* @param name_prefix Prefix of state variable names to match.
- * @param cb Callback to return matching state variables.
- * @param cb_cls Closure for the callback.
+ * @param scb Callback to return matching state variables.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
*
* @return Handle that can be used to cancel the operation.
*/
struct GNUNET_PSYCSTORE_OperationHandle *
-GNUNET_PSYCSTORE_state_get_all (struct GNUNET_PSYCSTORE_Handle *h,
- const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
- const char *name_prefix,
- GNUNET_PSYCSTORE_StateCallback cb,
- void *cb_cls);
+GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name_prefix,
+ GNUNET_PSYCSTORE_StateCallback scb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls);
/**
Modified: gnunet/src/psycstore/Makefile.am
===================================================================
--- gnunet/src/psycstore/Makefile.am 2013-09-15 22:48:52 UTC (rev 29286)
+++ gnunet/src/psycstore/Makefile.am 2013-09-16 04:59:05 UTC (rev 29287)
@@ -104,9 +104,3 @@
test_plugin_psycstore_sqlite_LDADD = \
$(top_builddir)/src/testing/libgnunettesting.la \
$(top_builddir)/src/util/libgnunetutil.la
-
-test_plugin_psycstore_postgres_SOURCES = \
- test_plugin_psycstore.c
-test_plugin_psycstore_postgres_LDADD = \
- $(top_builddir)/src/testing/libgnunettesting.la \
- $(top_builddir)/src/util/libgnunetutil.la
Modified: gnunet/src/psycstore/gnunet-service-psycstore.c
===================================================================
--- gnunet/src/psycstore/gnunet-service-psycstore.c 2013-09-15 22:48:52 UTC
(rev 29286)
+++ gnunet/src/psycstore/gnunet-service-psycstore.c 2013-09-16 04:59:05 UTC
(rev 29287)
@@ -1,21 +1,21 @@
/*
- This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
-
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
*/
/**
@@ -89,71 +89,134 @@
}
-/**
+/**
* Send a result code back to the client.
*
- * @param client client that should receive the result code
- * @param result_code code to transmit
- * @param emsg error message to include (or NULL for none)
+ * @param client Client that should receive the result code.
+ * @param result_code Code to transmit.
+ * @param op_id Operation ID.
+ * @param err_msg Error message to include (or NULL for none).
*/
static void
-send_result_code (struct GNUNET_SERVER_Client *client,
- uint32_t result_code,
- const char *emsg)
+send_result_code (struct GNUNET_SERVER_Client *client, uint32_t result_code,
+ uint32_t op_id, const char *err_msg)
{
- struct ResultCodeMessage *rcm;
- size_t elen;
+ struct OperationResult *res;
+ size_t err_len;
- if (NULL == emsg)
- elen = 0;
+ if (NULL == err_msg)
+ err_len = 0;
else
- elen = strlen (emsg) + 1;
- rcm = GNUNET_malloc (sizeof (struct ResultCodeMessage) + elen);
- rcm->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
- rcm->header.size = htons (sizeof (struct ResultCodeMessage) + elen);
- rcm->result_code = htonl (result_code);
- memcpy (&rcm[1], emsg, elen);
+ err_len = strlen (err_msg) + 1;
+ res = GNUNET_malloc (sizeof (struct OperationResult) + err_len);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE);
+ res->header.size = htons (sizeof (struct OperationResult) + err_len);
+ res->result_code = htonl (result_code);
+ res->op_id = op_id;
+ memcpy (&res[1], err_msg, err_len);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Sending result %d (%s) to client\n",
(int) result_code,
- emsg);
- GNUNET_SERVER_notification_context_unicast (nc, client, &rcm->header,
+ err_msg);
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res->header,
GNUNET_NO);
- GNUNET_free (rcm);
+ GNUNET_free (res);
}
+struct SendClosure
+{
+ struct GNUNET_SERVER_Client *client;
+ uint64_t op_id;
+};
+
+
+static int
+send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct SendClosure *sc = cls;
+ struct FragmentResult *res;
+ size_t msg_size = ntohs (msg->header.size);
+
+ res = GNUNET_malloc (sizeof (struct FragmentResult) + msg_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT);
+ res->header.size = htons (sizeof (struct FragmentResult) + msg_size);
+ res->op_id = sc->op_id;
+ res->psycstore_flags = htonl (flags);
+ memcpy (&res[1], msg, msg_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending fragment %ld to client\n",
+ GNUNET_ntohll (msg->fragment_id));
+ GNUNET_free (msg);
+ GNUNET_SERVER_notification_context_add (nc, sc->client);
+ GNUNET_SERVER_notification_context_unicast (nc, sc->client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+ return GNUNET_OK;
+}
+
+
+static int
+send_state_var (void *cls, const char *name,
+ const void *value, size_t value_size)
+{
+ struct SendClosure *sc = cls;
+ struct StateResult *res;
+ size_t name_size = strlen (name) + 1;
+
+ res = GNUNET_malloc (sizeof (struct StateResult) + name_size + value_size);
+ res->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE);
+ res->header.size = htons (sizeof (struct StateResult) + name_size +
value_size);
+ res->op_id = sc->op_id;
+ res->name_size = htons (name_size);
+ memcpy (&res[1], name, name_size);
+ memcpy ((void *) &res[1] + name_size, value, value_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Sending state variable %s to client\n", name);
+ GNUNET_SERVER_notification_context_add (nc, sc->client);
+ GNUNET_SERVER_notification_context_unicast (nc, sc->client, &res->header,
+ GNUNET_NO);
+ GNUNET_free (res);
+ return GNUNET_OK;
+}
+
+
static void
handle_membership_store (void *cls,
- struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
{
- const struct MembershipStoreMessage *msg =
- (const struct MembershipStoreMessage *) message;
+ const struct MembershipStoreRequest *req =
+ (const struct MembershipStoreRequest *) msg;
- int res = db->membership_store (db->cls, msg->channel_key, msg->slave_key,
- msg->did_join, msg->announced_at,
- msg->effective_since, msg->group_generation);
+ int ret = db->membership_store (db->cls, &req->channel_key, &req->slave_key,
+ ntohl (req->did_join),
+ GNUNET_ntohll (req->announced_at),
+ GNUNET_ntohll (req->effective_since),
+ GNUNET_ntohll (req->group_generation));
- if (res != GNUNET_OK)
+ if (ret != GNUNET_OK)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
_("Failed to store membership information!\n"));
- send_result_code (client, res, NULL);
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
static void
handle_membership_test (void *cls,
struct GNUNET_SERVER_Client *client,
- const struct GNUNET_MessageHeader *message)
+ const struct GNUNET_MessageHeader *msg)
{
- const struct MembershipTestMessage *msg =
- (const struct MembershipTestMessage *) message;
+ const struct MembershipTestRequest *req =
+ (const struct MembershipTestRequest *) msg;
- int res = db->membership_test (db->cls, msg->channel_key, msg->slave_key,
- msg->message_id);
- switch (res)
+ int ret = db->membership_test (db->cls, &req->channel_key, &req->slave_key,
+ GNUNET_ntohll (req->message_id));
+ switch (ret)
{
case GNUNET_YES:
case GNUNET_NO:
@@ -163,10 +226,443 @@
_("Failed to test membership!\n"));
}
- send_result_code (client, res, NULL);
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
+static void
+handle_fragment_store (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct FragmentStoreRequest *req =
+ (const struct FragmentStoreRequest *) msg;
+
+ int ret = db->fragment_store (db->cls, &req->channel_key,
+ (const struct GNUNET_MULTICAST_MessageHeader *)
+ &req[1], ntohl (req->psycstore_flags));
+
+ if (ret != GNUNET_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to store fragment!\n"));
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_fragment_get (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct FragmentGetRequest *req
+ = (const struct FragmentGetRequest *) msg;
+ struct SendClosure sc = { .op_id = req->op_id, .client = client };
+
+ int ret = db->fragment_get (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->fragment_id),
+ &send_fragment, &sc);
+ switch (ret)
+ {
+ case GNUNET_YES:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get fragment!\n"));
+ }
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_message_get (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct MessageGetRequest *req = (const struct MessageGetRequest *) msg;
+ struct SendClosure sc = { .op_id = req->op_id, .client = client };
+ uint64_t ret_frags = 0;
+ int64_t ret = db->message_get (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->message_id),
+ &ret_frags, &send_fragment, &sc);
+ switch (ret)
+ {
+ case GNUNET_YES:
+ case GNUNET_NO:
+ break;
+ default:
+ ret_frags = ret;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get message!\n"));
+ }
+
+ send_result_code (client, ret_frags, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_message_get_fragment (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct MessageGetFragmentRequest *req =
+ (const struct MessageGetFragmentRequest *) msg;
+
+ struct SendClosure sc = { .op_id = req->op_id, .client = client };
+
+ int ret = db->message_get_fragment (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->message_id),
+ GNUNET_ntohll (req->fragment_offset),
+ &send_fragment, &sc);
+ switch (ret)
+ {
+ case GNUNET_YES:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get message fragment!\n"));
+ }
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_counters_get_master (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req = (const struct OperationRequest *) msg;
+ struct MasterCountersResult res = { {0} };
+
+ int ret = db->counters_get_master (db->cls, &req->channel_key,
+ &res.fragment_id, &res.message_id,
+ &res.group_generation);
+ switch (ret)
+ {
+ case GNUNET_YES:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get master counters!\n"));
+ }
+
+ res.header.type
+ = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (ret);
+ res.op_id = req->op_id;
+ res.fragment_id = GNUNET_htonll (res.fragment_id);
+ res.message_id = GNUNET_htonll (res.message_id);
+ res.group_generation = GNUNET_htonll (res.group_generation);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+ GNUNET_NO);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_counters_get_slave (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req = (const struct OperationRequest *) msg;
+ struct SlaveCountersResult res = { {0} };
+
+ int ret = db->counters_get_slave (db->cls, &req->channel_key,
+ &res.max_known_msg_id);
+
+ switch (ret)
+ {
+ case GNUNET_YES:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get slave counters!\n"));
+ }
+
+ res.header.type
+ = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE);
+ res.header.size = htons (sizeof (res));
+ res.result_code = htonl (ret);
+ res.op_id = req->op_id;
+ res.max_known_msg_id = GNUNET_htonll (res.max_known_msg_id);
+
+ GNUNET_SERVER_notification_context_add (nc, client);
+ GNUNET_SERVER_notification_context_unicast (nc, client, &res.header,
+ GNUNET_NO);
+
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/* FIXME: stop processing further state modify messages after an error */
+static void
+handle_state_modify (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct StateModifyRequest *req
+ = (const struct StateModifyRequest *) msg;
+
+ int ret = GNUNET_SYSERR;
+ const char *name = (const char *) &req[1];
+ uint16_t name_size = ntohs (req->name_size);
+
+ if (name_size <= 2 || '\0' != name[name_size - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Tried to set invalid state variable name!\n"));
+ GNUNET_break_op (0);
+ }
+ else
+ {
+ ret = GNUNET_OK;
+
+ if (req->flags & STATE_OP_FIRST)
+ {
+ ret = db->state_modify_begin (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->message_id),
+ GNUNET_ntohll (req->state_delta));
+ }
+ if (ret != GNUNET_OK)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to begin modifying state!\n"));
+ }
+ else
+ {
+ switch (req->oper)
+ {
+ case GNUNET_ENV_OP_ASSIGN:
+ ret = db->state_modify_set (db->cls, &req->channel_key,
+ (const char *) &req[1],
+ name + ntohs (req->name_size),
+ ntohs (req->header.size) - sizeof (*req)
+ - ntohs (req->name_size));
+ break;
+ default:
+#if TODO
+ ret = GNUNET_ENV_operation ((const char *) &req[1],
+ current_value, current_value_size,
+ req->oper, name + ntohs (req->name_size),
+ ntohs (req->header.size) - sizeof (*req)
+ - ntohs (req->name_size), &value,
&value_size);
+#endif
+ ret = GNUNET_SYSERR;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Unknown operator: %c\n"), req->oper);
+ }
+ }
+
+ if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
+ {
+ ret = db->state_modify_end (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->message_id));
+ if (ret != GNUNET_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to end modifying state!\n"));
+ }
+ }
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+/* FIXME: stop processing further state sync messages after an error */
+static void
+handle_state_sync (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct StateSyncRequest *req
+ = (const struct StateSyncRequest *) msg;
+
+ int ret = GNUNET_SYSERR;
+ const char *name = (const char *) &req[1];
+ uint16_t name_size = ntohs (req->name_size);
+
+ if (name_size <= 2 || '\0' != name[name_size - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Tried to set invalid state variable name!\n"));
+ GNUNET_break_op (0);
+ }
+ else
+ {
+ ret = GNUNET_OK;
+
+ if (req->flags & STATE_OP_FIRST)
+ {
+ ret = db->state_sync_begin (db->cls, &req->channel_key);
+ }
+ if (ret != GNUNET_OK)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to begin synchronizing state!\n"));
+ }
+ else
+ {
+ ret = db->state_sync_set (db->cls, &req->channel_key, name,
+ name + ntohs (req->name_size),
+ ntohs (req->header.size) - sizeof (*req)
+ - ntohs (req->name_size));
+ }
+
+ if (GNUNET_OK == ret && req->flags & STATE_OP_LAST)
+ {
+ ret = db->state_sync_end (db->cls, &req->channel_key,
+ GNUNET_ntohll (req->message_id));
+ if (ret != GNUNET_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to end synchronizing state!\n"));
+ }
+ }
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_state_reset (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req =
+ (const struct OperationRequest *) msg;
+
+ int ret = db->state_reset (db->cls, &req->channel_key);
+
+ if (ret != GNUNET_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to reset state!\n"));
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_state_hash_update (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req =
+ (const struct OperationRequest *) msg;
+
+ int ret = db->state_reset (db->cls, &req->channel_key);
+
+ if (ret != GNUNET_OK)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to reset state!\n"));
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_state_get (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req =
+ (const struct OperationRequest *) msg;
+
+ struct SendClosure sc = { .op_id = req->op_id, .client = client };
+ int64_t ret = GNUNET_SYSERR;
+ const char *name = (const char *) &req[1];
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+
+ if (name_size <= 2 || '\0' != name[name_size - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Tried to get invalid state variable name!\n"));
+ GNUNET_break (0);
+ }
+ else
+ {
+ ret = db->state_get (db->cls, &req->channel_key, name,
+ &send_state_var, &sc);
+ if (GNUNET_NO == ret && name_size >= 5) /* min: _a_b\0 */
+ {
+ char *p, *n = GNUNET_malloc (name_size);
+ memcpy (n, name, name_size);
+ while (&n[1] < (p = strrchr (n, '_')) && GNUNET_NO == ret)
+ {
+ *p = '\0';
+ ret = db->state_get (db->cls, &req->channel_key, n,
+ &send_state_var, &sc);
+ }
+ }
+ }
+ switch (ret)
+ {
+ case GNUNET_OK:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get state variable!\n"));
+ }
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
+static void
+handle_state_get_prefix (void *cls,
+ struct GNUNET_SERVER_Client *client,
+ const struct GNUNET_MessageHeader *msg)
+{
+ const struct OperationRequest *req =
+ (const struct OperationRequest *) msg;
+
+ struct SendClosure sc = { .op_id = req->op_id, .client = client };
+ int64_t ret = GNUNET_SYSERR;
+ const char *name = (const char *) &req[1];
+ uint16_t name_size = ntohs (req->header.size) - sizeof (*req);
+
+ if (name_size <= 1 || '\0' != name[name_size - 1])
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Tried to get invalid state variable name!\n"));
+ GNUNET_break (0);
+ }
+ else
+ {
+ ret = db->state_get_prefix (db->cls, &req->channel_key, name,
+ &send_state_var, &sc);
+ }
+ switch (ret)
+ {
+ case GNUNET_OK:
+ case GNUNET_NO:
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ _("Failed to get state variable!\n"));
+ }
+
+ send_result_code (client, ret, req->op_id, NULL);
+ GNUNET_SERVER_receive_done (client, GNUNET_OK);
+}
+
+
/**
* Handle PSYCstore clients.
*
@@ -180,13 +676,58 @@
const struct GNUNET_CONFIGURATION_Handle *c)
{
static const struct GNUNET_SERVER_MessageHandler handlers[] = {
- {&handle_membership_store, NULL,
- GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE,
- sizeof (struct MembershipStoreMessage)},
- {&handle_membership_test, NULL,
- GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST,
- sizeof (struct MembershipTestMessage)},
- {NULL, NULL, 0, 0}
+ { &handle_membership_store, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE,
+ sizeof (struct MembershipStoreRequest) },
+
+ { &handle_membership_test, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST,
+ sizeof (struct MembershipTestRequest) },
+
+ { &handle_fragment_store, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE, 0, },
+
+ { &handle_fragment_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET,
+ sizeof (struct FragmentGetRequest) },
+
+ { &handle_message_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET,
+ sizeof (struct MessageGetRequest) },
+
+ { &handle_message_get_fragment, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT,
+ sizeof (struct MessageGetFragmentRequest) },
+
+ { &handle_counters_get_master, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER,
+ sizeof (struct OperationRequest) },
+
+ { &handle_counters_get_slave, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE,
+ sizeof (struct OperationRequest) },
+
+ { &handle_state_modify, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY, 0 },
+
+ { &handle_state_sync, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC, 0 },
+
+ { &handle_state_reset, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET,
+ sizeof (struct OperationRequest) },
+
+ { &handle_state_hash_update, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE,
+ sizeof (struct StateHashUpdateRequest) },
+
+ { &handle_state_get, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET, 0 },
+
+ { &handle_state_get_prefix, NULL,
+ GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX, 0 },
+
+ { NULL, NULL, 0, 0 }
};
cfg = c;
Modified: gnunet/src/psycstore/plugin_psycstore_sqlite.c
===================================================================
--- gnunet/src/psycstore/plugin_psycstore_sqlite.c 2013-09-15 22:48:52 UTC
(rev 29286)
+++ gnunet/src/psycstore/plugin_psycstore_sqlite.c 2013-09-16 04:59:05 UTC
(rev 29287)
@@ -1,22 +1,22 @@
- /*
- * This file is part of GNUnet
- * (C) 2009-2013 Christian Grothoff (and other contributing authors)
- *
- * GNUnet is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published
- * by the Free Software Foundation; either version 3, or (at your
- * option) any later version.
- *
- * GNUnet is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of
-t * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNUnet; see the file COPYING. If not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
- */
+/*
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
/**
* @file psycstore/plugin_psycstore_sqlite.c
@@ -61,6 +61,10 @@
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__)
+enum Transactions {
+ TRANSACTION_NONE = 0,
+ TRANSACTION_STATE_MODIFY
+};
/**
* Context for all functions in this plugin.
@@ -81,6 +85,17 @@
sqlite3 *dbh;
/**
+ * Current transaction.
+ */
+ enum Transactions transaction;
+
+ sqlite3_stmt *transaction_begin;
+
+ sqlite3_stmt *transaction_commit;
+
+ sqlite3_stmt *transaction_rollback;
+
+ /**
* Precompiled SQL for channel_key_store()
*/
sqlite3_stmt *insert_channel_key;
@@ -135,15 +150,35 @@
/**
* Precompiled SQL for counters_get_slave()
*/
- sqlite3_stmt *select_counters_slave;
+ sqlite3_stmt *select_max_state_message_id;
+ /**
+ * Precompiled SQL for state_modify_end()
+ */
+ sqlite3_stmt *update_state_hash_message_id;
/**
- * Precompiled SQL for state_set()
+ * Precompiled SQL for state_sync_end()
*/
+ sqlite3_stmt *update_max_state_message_id;
+
+
+ /**
+ * Precompiled SQL for message_modify_begin()
+ */
+ sqlite3_stmt *select_message_state_delta;
+
+ /**
+ * Precompiled SQL for state_modify_set()
+ */
sqlite3_stmt *insert_state_current;
/**
+ * Precompiled SQL for state_modify_end()
+ */
+ sqlite3_stmt *delete_state_empty;
+
+ /**
* Precompiled SQL for state_set_signed()
*/
sqlite3_stmt *update_state_signed;
@@ -179,7 +214,7 @@
sqlite3_stmt *select_state_one;
/**
- * Precompiled SQL for state_get_all()
+ * Precompiled SQL for state_get_prefix()
*/
sqlite3_stmt *select_state_prefix;
@@ -211,7 +246,7 @@
result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt,
(const char **) &tail);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Prepared `%s' / %p: %d\n", sql, *stmt, result);
if (result != SQLITE_OK)
LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -234,7 +269,7 @@
int result;
result = sqlite3_exec (dbh, sql, NULL, NULL, NULL);
- LOG (GNUNET_ERROR_TYPE_DEBUG,
+ LOG (GNUNET_ERROR_TYPE_DEBUG,
"Executed `%s' / %d\n", sql, result);
if (result != SQLITE_OK)
LOG (GNUNET_ERROR_TYPE_ERROR,
@@ -295,7 +330,9 @@
sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF");
sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL");
sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\"");
+#if ! DEBUG_PSYCSTORE
sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE");
+#endif
sql_exec (plugin->dbh, "PRAGMA count_changes=OFF");
sql_exec (plugin->dbh, "PRAGMA page_size=4096");
@@ -306,7 +343,9 @@
sql_exec (plugin->dbh,
"CREATE TABLE IF NOT EXISTS channels (\n"
" id INTEGER PRIMARY KEY,\n"
- " pub_key BLOB UNIQUE\n"
+ " pub_key BLOB UNIQUE,\n"
+ " max_state_message_id INTEGER,\n"
+ " state_hash_message_id INTEGER\n"
");");
sql_exec (plugin->dbh,
@@ -364,6 +403,12 @@
/* Prepare statements */
+ sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin);
+
+ sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit);
+
+ sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback);
+
sql_prepare (plugin->dbh,
"INSERT OR IGNORE INTO channels (pub_key) VALUES (?);",
&plugin->insert_channel_key);
@@ -420,8 +465,8 @@
" multicast_flags, psycstore_flags, data\n"
"FROM messages\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?)\n"
- " AND message_id = ?;",
- &plugin->select_message);
+ " AND message_id = ? AND fragment_offset = ?;",
+ &plugin->select_message_fragment);
sql_prepare (plugin->dbh,
"SELECT hop_counter, signature, purpose, fragment_id,\n"
@@ -429,8 +474,8 @@
" multicast_flags, psycstore_flags, data\n"
"FROM messages\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?)\n"
- " AND message_id = ? AND fragment_offset = ?;",
- &plugin->select_message_fragment);
+ " AND message_id = ?;",
+ &plugin->select_message);
sql_prepare (plugin->dbh,
"SELECT fragment_id, message_id, group_generation\n"
@@ -440,14 +485,35 @@
&plugin->select_counters_master);
sql_prepare (plugin->dbh,
- "SELECT message_id\n"
- "FROM messages\n"
- "WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?)\n"
- " AND psycstore_flags & ?\n"
- "ORDER BY message_id DESC LIMIT 1",
- &plugin->select_counters_slave);
+ "SELECT max_state_message_id\n"
+ "FROM channels\n"
+ "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;",
+ &plugin->select_max_state_message_id);
sql_prepare (plugin->dbh,
+ "UPDATE channels\n"
+ "SET max_state_message_id = ?\n"
+ "WHERE pub_key = ?;",
+ &plugin->update_max_state_message_id);
+
+ sql_prepare (plugin->dbh,
+ "UPDATE channels\n"
+ "SET state_hash_message_id = ?\n"
+ "WHERE pub_key = ?;",
+ &plugin->update_state_hash_message_id);
+
+ sql_prepare (plugin->dbh,
+ "SELECT 1\n"
+ "FROM channels AS c\n"
+ "LEFT JOIN messages AS m\n"
+ "ON c.id = m.channel_id\n"
+ "WHERE c.pub_key = ?\n"
+ " AND ((? < c.state_hash_message_id AND
c.state_hash_message_id < ?)\n"
+ " OR (m.message_id = ? AND m.psycstore_flags & ?))\n"
+ "LIMIT 1;",
+ &plugin->select_message_state_delta);
+
+ sql_prepare (plugin->dbh,
"INSERT OR REPLACE INTO state\n"
" (channel_id, name, value_current, value_signed)\n"
"SELECT new.channel_id, new.name,\n"
@@ -461,22 +527,29 @@
&plugin->insert_state_current);
sql_prepare (plugin->dbh,
+ "DELETE FROM state\n"
+ "WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?)\n"
+ " AND (value_current IS NULL OR length(value_current) =
0)\n"
+ " AND (value_signed IS NULL OR length(value_signed) = 0);",
+ &plugin->delete_state_empty);
+
+ sql_prepare (plugin->dbh,
"UPDATE state\n"
"SET value_signed = value_current\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?);",
&plugin->update_state_signed);
sql_prepare (plugin->dbh,
+ "DELETE FROM state\n"
+ "WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?);",
+ &plugin->delete_state);
+
+ sql_prepare (plugin->dbh,
"INSERT INTO state_sync (channel_id, name, value)\n"
"VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);",
&plugin->insert_state_sync);
sql_prepare (plugin->dbh,
- "DELETE FROM state\n"
- "WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?);",
- &plugin->delete_state);
-
- sql_prepare (plugin->dbh,
"INSERT INTO state\n"
" (channel_id, name, value_current, value_signed)\n"
"SELECT channel_id, name, value, value\n"
@@ -500,7 +573,7 @@
"SELECT name, value_current\n"
"FROM state\n"
"WHERE channel_id = (SELECT id FROM channels WHERE pub_key =
?)\n"
- " AND name = ? OR name LIKE ?;",
+ " AND (name = ? OR name LIKE ?);",
&plugin->select_state_prefix);
sql_prepare (plugin->dbh,
@@ -524,86 +597,125 @@
{
int result;
sqlite3_stmt *stmt;
+ while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL)))
+ {
+ result = sqlite3_finalize (stmt);
+ if (SQLITE_OK != result)
+ LOG (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to close statement %p: %d\n", stmt, result);
+ }
+ if (SQLITE_OK != sqlite3_close (plugin->dbh))
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
- if (NULL != plugin->insert_channel_key)
- sqlite3_finalize (plugin->insert_channel_key);
+ GNUNET_free_non_null (plugin->fn);
+}
- if (NULL != plugin->insert_slave_key)
- sqlite3_finalize (plugin->insert_slave_key);
+/**
+ * Execute a prepared statement with a @a channel_key argument.
+ *
+ * @param plugin Plugin handle.
+ * @param stmt Statement to execute.
+ * @param channel_key Public key of the channel.
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
+{
+ if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
+ sizeof (*channel_key), SQLITE_STATIC))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ }
+ else if (SQLITE_DONE != sqlite3_step (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ }
- if (NULL != plugin->insert_membership)
- sqlite3_finalize (plugin->insert_membership);
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ return GNUNET_SYSERR;
+ }
- if (NULL != plugin->select_membership)
- sqlite3_finalize (plugin->select_membership);
+ return GNUNET_OK;
+}
- if (NULL != plugin->insert_fragment)
- sqlite3_finalize (plugin->insert_fragment);
+/**
+ * Begin a transaction.
+ */
+static int
+transaction_begin (struct Plugin *plugin, enum Transactions transaction)
+{
+ sqlite3_stmt *stmt = plugin->transaction_begin;
- if (NULL != plugin->update_message_flags)
- sqlite3_finalize (plugin->update_message_flags);
+ if (SQLITE_DONE != sqlite3_step (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ }
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ return GNUNET_SYSERR;
+ }
- if (NULL != plugin->select_fragment)
- sqlite3_finalize (plugin->select_fragment);
+ plugin->transaction = transaction;
+ return GNUNET_OK;
+}
- if (NULL != plugin->select_message)
- sqlite3_finalize (plugin->select_message);
- if (NULL != plugin->select_message_fragment)
- sqlite3_finalize (plugin->select_message_fragment);
+/**
+ * Commit current transaction.
+ */
+static int
+transaction_commit (struct Plugin *plugin)
+{
+ sqlite3_stmt *stmt = plugin->transaction_commit;
- if (NULL != plugin->select_counters_master)
- sqlite3_finalize (plugin->select_counters_master);
+ if (SQLITE_DONE != sqlite3_step (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ }
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ return GNUNET_SYSERR;
+ }
- if (NULL != plugin->select_counters_slave)
- sqlite3_finalize (plugin->select_counters_slave);
+ plugin->transaction = TRANSACTION_NONE;
+ return GNUNET_OK;
+}
- if (NULL != plugin->insert_state_current)
- sqlite3_finalize (plugin->insert_state_current);
- if (NULL != plugin->update_state_signed)
- sqlite3_finalize (plugin->update_state_signed);
+/**
+ * Roll back current transaction.
+ */
+static int
+transaction_rollback (struct Plugin *plugin)
+{
+ sqlite3_stmt *stmt = plugin->transaction_rollback;
- if (NULL != plugin->insert_state_sync)
- sqlite3_finalize (plugin->insert_state_sync);
-
- if (NULL != plugin->delete_state)
- sqlite3_finalize (plugin->delete_state);
-
- if (NULL != plugin->insert_state_from_sync)
- sqlite3_finalize (plugin->insert_state_from_sync);
-
- if (NULL != plugin->delete_state_sync)
- sqlite3_finalize (plugin->delete_state_sync);
-
- if (NULL != plugin->select_state_one)
- sqlite3_finalize (plugin->select_state_one);
-
- if (NULL != plugin->select_state_prefix)
- sqlite3_finalize (plugin->select_state_prefix);
-
- result = sqlite3_close (plugin->dbh);
- if (result == SQLITE_BUSY)
+ if (SQLITE_DONE != sqlite3_step (stmt))
{
- LOG (GNUNET_ERROR_TYPE_WARNING,
- _("Tried to close sqlite without finalizing all prepared
statements.\n"));
- stmt = sqlite3_next_stmt (plugin->dbh, NULL);
- while (stmt != NULL)
- {
- GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG, "sqlite",
- "Closing statement %p\n", stmt);
- result = sqlite3_finalize (stmt);
- if (result != SQLITE_OK)
- GNUNET_log_from (GNUNET_ERROR_TYPE_WARNING, "sqlite",
- "Failed to close statement %p: %d\n", stmt, result);
- stmt = sqlite3_next_stmt (plugin->dbh, NULL);
- }
- result = sqlite3_close (plugin->dbh);
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
}
- if (SQLITE_OK != result)
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close");
-
- GNUNET_free_non_null (plugin->fn);
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ return GNUNET_SYSERR;
+ }
+ plugin->transaction = TRANSACTION_NONE;
+ return GNUNET_OK;
}
@@ -617,18 +729,18 @@
sizeof (*channel_key), SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_channel_key (bind)");
+ "sqlite3_bind");
}
else if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_channel_key (step)");
+ "sqlite3_step");
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_channel_key (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -646,19 +758,19 @@
sizeof (*slave_key), SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_slave_key (bind)");
+ "sqlite3_bind");
}
else if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_slave_key (step)");
+ "sqlite3_step");
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_slave_key (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -683,6 +795,11 @@
uint64_t effective_since,
uint64_t group_generation)
{
+ struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->insert_membership;
+
+ GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
+
if (announced_at > INT64_MAX ||
effective_since > INT64_MAX ||
group_generation > INT64_MAX)
@@ -691,35 +808,32 @@
return GNUNET_SYSERR;
}
- struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->insert_membership;
-
- if (GNUNET_OK != channel_key_store (plugin, channel_key) ||
- GNUNET_OK != slave_key_store (plugin, slave_key))
+ if (GNUNET_OK != channel_key_store (plugin, channel_key)
+ || GNUNET_OK != slave_key_store (plugin, slave_key))
return GNUNET_SYSERR;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
- sizeof (*slave_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
+ sizeof (*slave_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_membership (bind)");
+ "sqlite3_bind");
}
else if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_membership (step)");
+ "sqlite3_step");
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_membership (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -745,13 +859,13 @@
int ret = GNUNET_SYSERR;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
- sizeof (*slave_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key,
+ sizeof (*slave_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_membership (bind)");
+ "sqlite3_bind");
}
else
{
@@ -768,7 +882,7 @@
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_membership (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -787,51 +901,60 @@
const struct GNUNET_MULTICAST_MessageHeader *msg,
uint32_t psycstore_flags)
{
- if (msg->fragment_id > INT64_MAX ||
- msg->fragment_offset > INT64_MAX ||
- msg->message_id > INT64_MAX ||
- msg->group_generation > INT64_MAX)
+ struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->insert_fragment;
+
+ GNUNET_assert (TRANSACTION_NONE == plugin->transaction);
+
+ uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id);
+ uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset);
+ uint64_t message_id = GNUNET_ntohll (msg->message_id);
+ uint64_t group_generation = GNUNET_ntohll (msg->group_generation);
+
+ if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX ||
+ message_id > INT64_MAX || group_generation > INT64_MAX)
{
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Tried to store fragment with a field > INT64_MAX: "
+ "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset,
+ message_id, group_generation);
GNUNET_break (0);
return GNUNET_SYSERR;
}
- struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->insert_fragment;
-
if (GNUNET_OK != channel_key_store (plugin, channel_key))
return GNUNET_SYSERR;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 2, msg->hop_counter ) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature,
- sizeof (msg->signature), SQLITE_STATIC)
||
- SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
- sizeof (msg->purpose), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 5, msg->fragment_id) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 6, msg->fragment_offset) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 7, msg->message_id) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 8, msg->group_generation) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 9, msg->flags) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
- ntohs (msg->header.size) - sizeof (*msg),
- SQLITE_STATIC))
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) )
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *)
&msg->signature,
+ sizeof (msg->signature),
SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose,
+ sizeof (msg->purpose), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags))
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1],
+ ntohs (msg->header.size)
+ - sizeof (*msg), SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_fragment (bind)");
+ "sqlite3_bind");
}
else if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_fragment (step)");
+ "sqlite3_step");
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_fragment (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -855,13 +978,13 @@
sqlite3_stmt *stmt = plugin->update_message_flags;
int ret = GNUNET_SYSERR;
- if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
+ if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_message_flags (bind)");
+ "sqlite3_bind");
}
else
{
@@ -872,14 +995,14 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_message_flags (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_message_flags (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -896,18 +1019,18 @@
msg->header.size = htons (sizeof (*msg) + data_size);
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
- msg->hop_counter = (uint32_t) sqlite3_column_int64 (stmt, 0);
+ msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0));
memcpy (&msg->signature,
sqlite3_column_blob (stmt, 1),
sqlite3_column_bytes (stmt, 1));
memcpy (&msg->purpose,
sqlite3_column_blob (stmt, 2),
sqlite3_column_bytes (stmt, 2));
- msg->fragment_id = sqlite3_column_int64 (stmt, 3);
- msg->fragment_offset = sqlite3_column_int64 (stmt, 4);
- msg->message_id = sqlite3_column_int64 (stmt, 5);
- msg->group_generation = sqlite3_column_int64 (stmt, 6);
- msg->flags = sqlite3_column_int64 (stmt, 7);
+ msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3));
+ msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4));
+ msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5));
+ msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6));
+ msg->flags = htonl (sqlite3_column_int64 (stmt, 7));
memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size);
return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8));
@@ -922,8 +1045,7 @@
*/
static int
fragment_get (void *cls,
- const struct
- GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
uint64_t fragment_id,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls)
@@ -934,11 +1056,11 @@
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
- SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_fragment (bind)");
+ "sqlite3_bind");
}
else
{
@@ -952,14 +1074,14 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_fragment (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_fragment (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -976,20 +1098,22 @@
message_get (void *cls,
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
uint64_t message_id,
+ uint64_t *returned_fragments,
GNUNET_PSYCSTORE_FragmentCallback cb,
void *cb_cls)
{
struct Plugin *plugin = cls;
sqlite3_stmt *stmt = plugin->select_message;
int ret = GNUNET_SYSERR;
+ *returned_fragments = 0;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
- SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1005,12 +1129,13 @@
break;
case SQLITE_ROW:
ret = fragment_row (stmt, cb, cb_cls);
+ (*returned_fragments)++;
if (ret != GNUNET_YES)
sql_ret = SQLITE_DONE;
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message (step)");
+ "sqlite3_step");
}
}
while (sql_ret == SQLITE_ROW);
@@ -1019,7 +1144,7 @@
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -1042,18 +1167,17 @@
void *cb_cls)
{
struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->select_message_fragment;
int ret = GNUNET_SYSERR;
- sqlite3_stmt *stmt = plugin->select_message_fragment;
-
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
- SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message_fragment (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1067,14 +1191,14 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message_fragment (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_message_fragment (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -1103,7 +1227,7 @@
SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_master (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1120,21 +1244,21 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_master (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_master (reset)");
+ "sqlite3_reset");
}
return ret;
}
/**
- * Retrieve latest values of counters for a channel slave.
+ * Retrieve latest values of counters for a channel slave.
*
* @see GNUNET_PSYCSTORE_counters_get_slave()
*
@@ -1146,17 +1270,15 @@
uint64_t *max_state_msg_id)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->select_counters_slave;
+ sqlite3_stmt *stmt = plugin->select_max_state_message_id;
int ret = GNUNET_SYSERR;
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
- SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
- GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
+ SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_slave (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1171,62 +1293,58 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_slave (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_counters_slave (reset)");
+ "sqlite3_reset");
}
return ret;
}
+
/**
* Set a state variable to the given value.
- *
- * @see GNUNET_PSYCSTORE_state_modify()
*
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
static int
-state_set (void *cls,
+state_set (struct Plugin *plugin, sqlite3_stmt *stmt,
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
const char *name, const void *value, size_t value_size)
{
- struct Plugin *plugin = cls;
int ret = GNUNET_SYSERR;
- sqlite3_stmt *stmt = plugin->insert_state_current;
-
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
- SQLITE_STATIC))
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size,
+ SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_state_current (bind)");
+ "sqlite3_bind");
}
else
{
switch (sqlite3_step (stmt))
{
case SQLITE_DONE:
- ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO;
+ ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO;
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_state_current (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "insert_state_current (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
@@ -1234,43 +1352,209 @@
}
-/**
- * Reset the state of a channel.
- *
- * @see GNUNET_PSYCSTORE_state_reset()
- *
- * @return #GNUNET_OK on success, else #GNUNET_SYSERR
- */
static int
-state_reset (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key)
+update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id)
{
- struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->delete_state;
-
- if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC))
+ if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id)
+ || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key,
+ sizeof (*channel_key), SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "delete_state (bind)");
+ "sqlite3_bind");
}
else if (SQLITE_DONE != sqlite3_step (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "delete_state (step)");
+ "sqlite3_step");
}
-
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "delete_state (reset)");
+ "sqlite3_reset");
return GNUNET_SYSERR;
}
-
return GNUNET_OK;
}
+/**
+ * Begin modifying current state.
+ */
+static int
+state_modify_begin (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id, uint64_t state_delta)
+{
+ struct Plugin *plugin = cls;
+ sqlite3_stmt *stmt = plugin->select_message_state_delta;
+
+ if (state_delta > 0)
+ {
+ int ret = GNUNET_SYSERR;
+ if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 2,
+ message_id - state_delta)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 3,
+ message_id)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 4,
+ message_id - state_delta)
+ || SQLITE_OK != sqlite3_bind_int64 (stmt, 5,
+
GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED))
+ {
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_bind");
+ }
+ else
+ {
+ switch (sqlite3_step (stmt))
+ {
+ case SQLITE_DONE:
+ ret = GNUNET_NO;
+ break;
+ case SQLITE_ROW:
+ ret = GNUNET_OK;
+ break;
+ default:
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_step");
+ }
+ }
+ if (SQLITE_OK != sqlite3_reset (stmt))
+ {
+ ret = GNUNET_SYSERR;
+ LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
+ "sqlite3_reset");
+ }
+ if (GNUNET_OK != ret)
+ return ret;
+ }
+
+ if (TRANSACTION_NONE != plugin->transaction)
+ if (GNUNET_OK != transaction_rollback (plugin))
+ return GNUNET_SYSERR;
+
+ return transaction_begin (plugin, TRANSACTION_STATE_MODIFY);
+}
+
+
/**
+ * Set the current value of state variable.
+ *
+ * @see GNUNET_PSYCSTORE_state_modify()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+state_modify_set (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ const char *name, const void *value, size_t value_size)
+{
+ struct Plugin *plugin = cls;
+ GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
+
+ return state_set (plugin, plugin->insert_state_current, channel_key,
+ name, value, value_size);
+
+}
+
+
+/**
+ * End modifying current state.
+ */
+static int
+state_modify_end (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id)
+{
+ struct Plugin *plugin = cls;
+ GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction);
+
+ return
+ GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key)
+ && GNUNET_OK == update_message_id (plugin,
+ plugin->update_max_state_message_id,
+ channel_key, message_id)
+ && GNUNET_OK == transaction_commit (plugin)
+ ? GNUNET_OK : GNUNET_SYSERR;
+}
+
+
+/**
+ * Begin state synchronization.
+ */
+static int
+state_sync_begin (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
+{
+ struct Plugin *plugin = cls;
+ return exec_channel (plugin, plugin->delete_state_sync, channel_key);
+}
+
+
+/**
+ * Set the current value of state variable.
+ *
+ * @see GNUNET_PSYCSTORE_state_modify()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+state_sync_set (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ const char *name, const void *value, size_t value_size)
+{
+ struct Plugin *plugin = cls;
+ return state_set (cls, plugin->insert_state_sync, channel_key,
+ name, value, value_size);
+}
+
+
+/**
+ * End modifying current state.
+ */
+static int
+state_sync_end (void *cls,
+ const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id)
+{
+ struct Plugin *plugin = cls;
+ int ret = GNUNET_SYSERR;
+
+ GNUNET_OK == transaction_begin (plugin, TRANSACTION_NONE)
+ && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key)
+ && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync,
+ channel_key)
+ && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync,
+ channel_key)
+ && GNUNET_OK == update_message_id (plugin,
+ plugin->update_state_hash_message_id,
+ channel_key, message_id)
+ && GNUNET_OK == transaction_commit (plugin)
+ ? ret = GNUNET_OK
+ : transaction_rollback (plugin);
+ return ret;
+}
+
+
+/**
+ * Reset the state of a channel.
+ *
+ * @see GNUNET_PSYCSTORE_state_reset()
+ *
+ * @return #GNUNET_OK on success, else #GNUNET_SYSERR
+ */
+static int
+state_reset (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key)
+{
+ struct Plugin *plugin = cls;
+ return exec_channel (plugin, plugin->delete_state, channel_key);
+}
+
+
+/**
* Update signed values of state variables in the state store.
*
* @see GNUNET_PSYCSTORE_state_hash_update()
@@ -1282,28 +1566,7 @@
const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key)
{
struct Plugin *plugin = cls;
- sqlite3_stmt *stmt = plugin->update_state_signed;
-
- if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC))
- {
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_state_signed (bind)");
- }
- else if (SQLITE_DONE != sqlite3_step (stmt))
- {
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_state_signed (step)");
- }
-
- if (SQLITE_OK != sqlite3_reset (stmt))
- {
- LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "update_state_signed (reset)");
- return GNUNET_SYSERR;
- }
-
- return GNUNET_OK;
+ return exec_channel (plugin, plugin->update_state_signed, channel_key);
}
@@ -1325,11 +1588,11 @@
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
sizeof (*channel_key),
- SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
+ SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_one (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1344,16 +1607,15 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_one (step)");
+ "sqlite3_step");
}
}
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_one (reset)");
+ "sqlite3_reset");
}
-
return ret;
}
@@ -1362,14 +1624,14 @@
/**
* Retrieve all state variables for a channel with the given prefix.
*
- * @see GNUNET_PSYCSTORE_state_get_all()
+ * @see GNUNET_PSYCSTORE_state_get_prefix()
*
* @return #GNUNET_OK on success, else #GNUNET_SYSERR
*/
static int
-state_get_all (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
- const char *name, GNUNET_PSYCSTORE_StateCallback cb,
- void *cb_cls)
+state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name, GNUNET_PSYCSTORE_StateCallback cb,
+ void *cb_cls)
{
struct Plugin *plugin = cls;
int ret = GNUNET_SYSERR;
@@ -1381,13 +1643,13 @@
memcpy (name_prefix + name_len, "_%", 2);
if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key,
- sizeof (*channel_key), SQLITE_STATIC) ||
- SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC)
||
- SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
- SQLITE_STATIC))
+ sizeof (*channel_key), SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len,
SQLITE_STATIC)
+ || SQLITE_OK != sqlite3_bind_text (stmt, 3, name_prefix, name_len + 2,
+ SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_prefix (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1410,7 +1672,7 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_prefix (step)");
+ "sqlite3_step");
}
}
while (sql_ret == SQLITE_ROW);
@@ -1419,7 +1681,7 @@
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_prefix (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -1447,7 +1709,7 @@
sizeof (*channel_key), SQLITE_STATIC))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_signed (bind)");
+ "sqlite3_bind");
}
else
{
@@ -1470,7 +1732,7 @@
break;
default:
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_signed (step)");
+ "sqlite3_step");
}
}
while (sql_ret == SQLITE_ROW);
@@ -1479,7 +1741,7 @@
if (SQLITE_OK != sqlite3_reset (stmt))
{
LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK,
- "select_state_signed (reset)");
+ "sqlite3_reset");
}
return ret;
@@ -1502,7 +1764,7 @@
if (NULL != plugin.cfg)
return NULL; /* can only initialize once! */
memset (&plugin, 0, sizeof (struct Plugin));
- plugin.cfg = cfg;
+ plugin.cfg = cfg;
if (GNUNET_OK != database_setup (&plugin))
{
database_shutdown (&plugin);
@@ -1519,11 +1781,16 @@
api->message_get_fragment = &message_get_fragment;
api->counters_get_master = &counters_get_master;
api->counters_get_slave = &counters_get_slave;
- api->state_set = &state_set;
+ api->state_modify_begin = &state_modify_begin;
+ api->state_modify_set = &state_modify_set;
+ api->state_modify_end = &state_modify_end;
+ api->state_sync_begin = &state_sync_begin;
+ api->state_sync_set = &state_sync_set;
+ api->state_sync_end = &state_sync_end;
api->state_reset = &state_reset;
api->state_update_signed = &state_update_signed;
api->state_get = &state_get;
- api->state_get_all = &state_get_all;
+ api->state_get_prefix = &state_get_prefix;
api->state_get_signed = &state_get_signed;
LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n"));
Modified: gnunet/src/psycstore/psycstore.h
===================================================================
--- gnunet/src/psycstore/psycstore.h 2013-09-15 22:48:52 UTC (rev 29286)
+++ gnunet/src/psycstore/psycstore.h 2013-09-16 04:59:05 UTC (rev 29287)
@@ -1,23 +1,23 @@
/*
- This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public Liceidentity as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public Liceidentity for more details.
-
- You should have received a copy of the GNU General Public Liceidentity
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
/**
* @file psycstore/psycstore.h
* @brief Common type definitions for the PSYCstore service and API.
@@ -35,7 +35,7 @@
/**
* Answer from service to client about last operation.
*/
-struct ResultCodeMessage
+struct OperationResult
{
/**
* Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
@@ -43,41 +43,361 @@
struct GNUNET_MessageHeader header;
/**
- * Status code for the last operation, in NBO.
- * (currently not used).
+ * Status code for the operation.
*/
- uint32_t result_code GNUNET_PACKED;
+ int64_t result_code GNUNET_PACKED;
+ uint32_t op_id GNUNET_PACKED;
+
/* followed by 0-terminated error message (on error) */
};
/**
+ * Answer from service to client about master counters.
+ *
+ * @see GNUNET_PSYCSTORE_counters_get_master()
+ */
+struct MasterCountersResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint64_t fragment_id GNUNET_PACKED;
+
+ uint64_t message_id GNUNET_PACKED;
+
+ uint64_t group_generation GNUNET_PACKED;
+
+ /**
+ * Status code for the operation.
+ */
+ int64_t result_code GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+
+};
+
+
+/**
+ * Answer from service to client about slave counters.
+ *
+ * @see GNUNET_PSYCSTORE_counters_get_slave()
+ */
+struct SlaveCountersResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint64_t max_known_msg_id GNUNET_PACKED;
+
+ /**
+ * Status code for the operation.
+ */
+ int64_t result_code GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+
+};
+
+
+/**
+ * Answer from service to client containing a message fragment.
+ */
+struct FragmentResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t op_id GNUNET_PACKED;
+
+ uint32_t psycstore_flags GNUNET_PACKED;
+
+ /* followed by GNUNET_MULTICAST_MessageHeader */
+
+};
+
+
+/**
+ * Answer from service to client containing a state variable.
+ */
+struct StateResult
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE
+ */
+ struct GNUNET_MessageHeader header;
+
+ uint32_t op_id GNUNET_PACKED;
+
+ uint16_t name_size GNUNET_PACKED;
+
+ /* followed by name and value */
+};
+
+
+/**
+ * Generic operation request.
+ */
+struct OperationRequest
+{
+ struct GNUNET_MessageHeader header;
+
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+
+/**
* @see GNUNET_PSYCSTORE_membership_store()
*/
-struct MembershipStoreMessage
+struct MembershipStoreRequest
{
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key;
- const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ /**
+ * Slave's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey slave_key;
+
int did_join;
uint64_t announced_at;
uint64_t effective_since;
uint64_t group_generation;
+
+ uint32_t op_id GNUNET_PACKED;
};
/**
* @see GNUNET_PSYCSTORE_membership_test()
*/
-struct MembershipTestMessage
+struct MembershipTestRequest
{
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key;
- const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key;
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ /**
+ * Slave's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey slave_key;
+
+ uint64_t message_id GNUNET_PACKED;
+
+ uint64_t group_generation GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+
+/**
+ * @see GNUNET_PSYCSTORE_fragment_store()
+ */
+struct FragmentStoreRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint32_t psycstore_flags GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+
+/**
+ * @see GNUNET_PSYCSTORE_fragment_get()
+ */
+struct FragmentGetRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint64_t fragment_id;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+
+/**
+ * @see GNUNET_PSYCSTORE_message_get()
+ */
+struct MessageGetRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
uint64_t message_id;
- uint64_t group_generation;
+
+ uint32_t op_id GNUNET_PACKED;
};
+
+/**
+ * @see GNUNET_PSYCSTORE_message_get_fragment()
+ */
+struct MessageGetFragmentRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_FRAGMENT_GET
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint64_t message_id;
+
+ uint64_t fragment_offset;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+
+/**
+ * @see GNUNET_PSYCSTORE_state_hash_update()
+ */
+struct StateHashUpdateRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ struct GNUNET_HashCode hash;
+
+ uint32_t op_id GNUNET_PACKED;
+};
+
+enum StateOpFlags
+{
+ STATE_OP_FIRST = 1 << 0,
+ STATE_OP_LAST = 1 << 1
+};
+
+/**
+ * @see GNUNET_PSYCSTORE_state_modify()
+ */
+struct StateModifyRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint64_t message_id GNUNET_PACKED;
+
+ uint64_t state_delta GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+
+ /**
+ * Size of name, including NUL terminator.
+ */
+ uint16_t name_size GNUNET_PACKED;
+
+ /**
+ * OR'd StateOpFlags
+ */
+ uint8_t flags;
+
+ /**
+ * enum GNUNET_ENV_Operator
+ */
+ uint8_t oper;
+
+ /* Followed by NUL-terminated name, then the value. */
+};
+
+
+/**
+ * @see GNUNET_PSYCSTORE_state_sync()
+ */
+struct StateSyncRequest
+{
+ /**
+ * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC
+ */
+ struct GNUNET_MessageHeader header;
+
+ /**
+ * Channel's public key.
+ */
+ struct GNUNET_CRYPTO_EccPublicSignKey channel_key;
+
+ uint64_t message_id GNUNET_PACKED;
+
+ uint32_t op_id GNUNET_PACKED;
+
+ /**
+ * Size of name, including NUL terminator.
+ */
+ uint16_t name_size GNUNET_PACKED;
+
+ /**
+ * OR'd StateOpFlags
+ */
+ uint8_t flags;
+
+ /* Followed by NUL-terminated name, then the value. */
+};
+
+
GNUNET_NETWORK_STRUCT_END
#endif
Modified: gnunet/src/psycstore/psycstore_api.c
===================================================================
--- gnunet/src/psycstore/psycstore_api.c 2013-09-15 22:48:52 UTC (rev
29286)
+++ gnunet/src/psycstore/psycstore_api.c 2013-09-16 04:59:05 UTC (rev
29287)
@@ -1,23 +1,23 @@
/*
- This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public Liceidentity as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public Liceidentity for more details.
-
- You should have received a copy of the GNU General Public Liceidentity
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
/**
* @file psycstore/psycstore_api.c
* @brief API to interact with the PSYCstore service
@@ -30,10 +30,12 @@
#include "gnunet_constants.h"
#include "gnunet_protocols.h"
#include "gnunet_psycstore_service.h"
+#include "gnunet_multicast_service.h"
#include "psycstore.h"
#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__)
+typedef void (*DataCallback) ();
/**
* Handle for an operation with the PSYCstore service.
@@ -45,7 +47,7 @@
* Main PSYCstore handle.
*/
struct GNUNET_PSYCSTORE_Handle *h;
-
+
/**
* We keep operations in a DLL.
*/
@@ -57,31 +59,30 @@
struct GNUNET_PSYCSTORE_OperationHandle *prev;
/**
- * Message to send to the PSYCstore service.
- * Allocated at the end of this struct.
+ * Continuation to invoke with the result of an operation.
*/
- const struct GNUNET_MessageHeader *msg;
+ GNUNET_PSYCSTORE_ResultCallback res_cb;
/**
- * Continuation to invoke with the result of an operation.
+ * Continuation to invoke with the result of an operation returning data.
*/
- GNUNET_PSYCSTORE_ResultCallback res_cb;
+ DataCallback data_cb;
/**
- * Continuation to invoke with the result of an operation returning a
fragment.
+ * Closure for the callbacks.
*/
- GNUNET_PSYCSTORE_FragmentCallback frag_cb;
+ void *cls;
/**
- * Continuation to invoke with the result of an operation returning a state
variable.
+ * Operation ID.
*/
- GNUNET_PSYCSTORE_StateCallback state_cb;
+ uint32_t op_id;
/**
- * Closure for the callbacks.
+ * Message to send to the PSYCstore service.
+ * Allocated at the end of this struct.
*/
- void *cls;
-
+ const struct GNUNET_MessageHeader *msg;
};
@@ -101,13 +102,23 @@
struct GNUNET_CLIENT_Connection *client;
/**
- * Head of active operations.
- */
+ * Head of operations to transmit.
+ */
+ struct GNUNET_PSYCSTORE_OperationHandle *transmit_head;
+
+ /**
+ * Tail of operations to transmit.
+ */
+ struct GNUNET_PSYCSTORE_OperationHandle *transmit_tail;
+
+ /**
+ * Head of active operations waiting for response.
+ */
struct GNUNET_PSYCSTORE_OperationHandle *op_head;
/**
- * Tail of active operations.
- */
+ * Tail of active operations waiting for response.
+ */
struct GNUNET_PSYCSTORE_OperationHandle *op_tail;
/**
@@ -130,10 +141,47 @@
*/
int in_receive;
+ /**
+ * The last operation id used for a PSYCstore operation.
+ */
+ uint32_t last_op_id_used;
+
};
/**
+ * Get a fresh operation ID to distinguish between PSYCstore requests.
+ *
+ * @param h Handle to the PSYCstore service.
+ * @return next operation id to use
+ */
+static uint32_t
+get_next_op_id (struct GNUNET_PSYCSTORE_Handle *h)
+{
+ return h->last_op_id_used++;
+}
+
+
+/**
+ * Find operation by ID.
+ *
+ * @return OperationHandle if found, or NULL otherwise.
+ */
+static struct GNUNET_PSYCSTORE_OperationHandle *
+find_op_by_id (struct GNUNET_PSYCSTORE_Handle *h, uint32_t op_id)
+{
+ struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
+ while (NULL != op)
+ {
+ if (op->op_id == op_id)
+ return op;
+ op = op->next;
+ }
+ return NULL;
+}
+
+
+/**
* Try again to connect to the PSYCstore service.
*
* @param cls handle to the PSYCstore service.
@@ -175,6 +223,15 @@
/**
+ * Schedule transmission of the next message from our queue.
+ *
+ * @param h PSYCstore handle
+ */
+static void
+transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
+
+
+/**
* Type of a function to call when we receive a message
* from the service.
*
@@ -182,12 +239,16 @@
* @param msg message received, NULL on timeout or fatal error
*/
static void
-message_handler (void *cls,
+message_handler (void *cls,
const struct GNUNET_MessageHeader *msg)
{
struct GNUNET_PSYCSTORE_Handle *h = cls;
struct GNUNET_PSYCSTORE_OperationHandle *op;
- const struct ResultCodeMessage *rcm;
+ const struct OperationResult *opres;
+ const struct MasterCountersResult *mcres;
+ const struct SlaveCountersResult *scres;
+ const struct FragmentResult *fres;
+ const struct StateResult *sres;
const char *str;
uint16_t size;
@@ -203,68 +264,240 @@
switch (ntohs (msg->type))
{
case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE:
- if (size < sizeof (struct ResultCodeMessage))
+ if (size < sizeof (struct OperationResult))
{
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected >= %lu\n",
+ ntohs (msg->type), size, sizeof (struct OperationResult));
GNUNET_break (0);
reschedule_connect (h);
return;
}
- rcm = (const struct ResultCodeMessage *) msg;
- str = (const char *) &rcm[1];
- if ( (size > sizeof (struct ResultCodeMessage)) &&
- ('\0' != str[size - sizeof (struct ResultCodeMessage) - 1]) )
+
+ opres = (const struct OperationResult *) msg;
+ str = (const char *) &opres[1];
+ if ( (size > sizeof (struct OperationResult)) &&
+ ('\0' != str[size - sizeof (struct OperationResult) - 1]) )
{
GNUNET_break (0);
reschedule_connect (h);
return;
}
- if (size == sizeof (struct ResultCodeMessage))
+ if (size == sizeof (struct OperationResult))
str = NULL;
- op = h->op_head;
- GNUNET_CONTAINER_DLL_remove (h->op_head,
- h->op_tail,
- op);
- GNUNET_CLIENT_receive (h->client, &message_handler, h,
- GNUNET_TIME_UNIT_FOREVER_REL);
- if (NULL != op->res_cb)
- op->res_cb (op->cls, rcm->result_code , str);
- GNUNET_free (op);
+ op = find_op_by_id (h, ntohl (opres->op_id));
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received result of an unkown operation ID: %ld\n",
+ ntohl (opres->op_id));
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
+ if (NULL != op->res_cb)
+ {
+ const struct StateModifyRequest *smreq;
+ const struct StateSyncRequest *ssreq;
+ switch (ntohs (op->msg->type))
+ {
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY:
+ smreq = (const struct StateModifyRequest *) op->msg;
+ if (!(smreq->flags & STATE_OP_LAST
+ || GNUNET_OK != ntohl (opres->result_code)))
+ op->res_cb = NULL;
+ break;
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC:
+ ssreq = (const struct StateSyncRequest *) op->msg;
+ if (!(ssreq->flags & STATE_OP_LAST
+ || GNUNET_OK != ntohl (opres->result_code)))
+ op->res_cb = NULL;
+ break;
+ }
+ }
+ if (NULL != op->res_cb)
+ op->res_cb (op->cls, ntohl (opres->result_code), str);
+ GNUNET_free (op);
+ }
break;
+
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_MASTER:
+ if (size != sizeof (struct MasterCountersResult))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected %lu\n",
+ ntohs (msg->type), size, sizeof (struct MasterCountersResult));
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ mcres = (const struct MasterCountersResult *) msg;
+
+ op = find_op_by_id (h, ntohl (mcres->op_id));
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received result of an unkown operation ID: %ld\n",
+ ntohl (mcres->op_id));
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
+ if (NULL != op->data_cb)
+ ((GNUNET_PSYCSTORE_MasterCountersCallback)
+ op->data_cb) (op->cls,
+ GNUNET_ntohll (mcres->fragment_id),
+ GNUNET_ntohll (mcres->message_id),
+ GNUNET_ntohll (mcres->group_generation));
+ GNUNET_free (op);
+ }
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS_SLAVE:
+ if (size != sizeof (struct SlaveCountersResult))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected %lu\n",
+ ntohs (msg->type), size, sizeof (struct SlaveCountersResult));
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ scres = (const struct SlaveCountersResult *) msg;
+
+ op = find_op_by_id (h, ntohl (scres->op_id));
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received result of an unkown operation ID: %ld\n",
+ ntohl (scres->op_id));
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_remove (h->op_head, h->op_tail, op);
+ if (NULL != op->data_cb)
+ ((GNUNET_PSYCSTORE_SlaveCountersCallback)
+ op->data_cb) (op->cls, GNUNET_ntohll (scres->max_known_msg_id));
+ GNUNET_free (op);
+ }
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT:
+ if (size < sizeof (struct FragmentResult))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected >= %lu\n",
+ ntohs (msg->type), size, sizeof (struct FragmentResult));
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ fres = (const struct FragmentResult *) msg;
+ struct GNUNET_MULTICAST_MessageHeader *mmsg =
+ (struct GNUNET_MULTICAST_MessageHeader *) &fres[1];
+ if (size != sizeof (struct FragmentResult) + ntohs (mmsg->header.size))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected = %lu\n",
+ ntohs (msg->type), size,
+ sizeof (struct FragmentResult) + ntohs (mmsg->header.size));
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ op = find_op_by_id (h, ntohl (fres->op_id));
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received result of an unkown operation ID: %ld\n",
+ ntohl (fres->op_id));
+ }
+ else
+ {
+ if (NULL != op->data_cb)
+ ((GNUNET_PSYCSTORE_FragmentCallback)
+ op->data_cb) (op->cls, mmsg, ntohl (fres->psycstore_flags));
+ }
+ break;
+
+ case GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE:
+ if (size < sizeof (struct StateResult))
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received message of type %d with length %lu bytes. "
+ "Expected >= %lu\n",
+ ntohs (msg->type), size, sizeof (struct StateResult));
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ sres = (const struct StateResult *) msg;
+ const char *name = (const char *) &sres[1];
+ uint16_t name_size = ntohs (sres->name_size);
+
+ if (name_size <= 2 || '\0' != name[name_size - 1])
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received state result message (type %d) with invalid name.\n",
+ ntohs (msg->type), name_size, name);
+ GNUNET_break (0);
+ reschedule_connect (h);
+ return;
+ }
+
+ op = find_op_by_id (h, ntohl (sres->op_id));
+ if (NULL == op)
+ {
+ LOG (GNUNET_ERROR_TYPE_ERROR,
+ "Received result of an unkown operation ID: %ld\n",
+ ntohl (sres->op_id));
+ }
+ else
+ {
+ if (NULL != op->data_cb)
+ ((GNUNET_PSYCSTORE_StateCallback)
+ op->data_cb) (op->cls, name, (void *) &sres[1] + name_size,
+ ntohs (sres->header.size) - sizeof (*sres) - name_size);
+ }
+ break;
+
default:
GNUNET_break (0);
reschedule_connect (h);
return;
}
+
+ GNUNET_CLIENT_receive (h->client, &message_handler, h,
+ GNUNET_TIME_UNIT_FOREVER_REL);
}
/**
- * Schedule transmission of the next message from our queue.
- *
- * @param h PSYCstore handle
- */
-static void
-transmit_next (struct GNUNET_PSYCSTORE_Handle *h);
-
-
-/**
* Transmit next message to service.
*
- * @param cls the 'struct GNUNET_PSYCSTORE_Handle'.
- * @param size number of bytes available in buf
- * @param buf where to copy the message
- * @return number of bytes copied to buf
+ * @param cls The 'struct GNUNET_PSYCSTORE_Handle'.
+ * @param size Number of bytes available in buf.
+ * @param buf Where to copy the message.
+ * @return Number of bytes copied to buf.
*/
static size_t
-send_next_message (void *cls,
- size_t size,
- void *buf)
+send_next_message (void *cls, size_t size, void *buf)
{
struct GNUNET_PSYCSTORE_Handle *h = cls;
- struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
+ struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
size_t ret;
-
+
h->th = NULL;
if (NULL == op)
return 0;
@@ -273,26 +506,30 @@
{
reschedule_connect (h);
return 0;
- }
+ }
LOG (GNUNET_ERROR_TYPE_DEBUG,
"Sending message of type %d to PSYCstore service\n",
ntohs (op->msg->type));
memcpy (buf, op->msg, ret);
- if ( (NULL == op->res_cb) &&
- (NULL == op->frag_cb) &&
- (NULL == op->state_cb))
+
+ GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
+
+ if (NULL == op->res_cb && NULL == op->data_cb)
{
- GNUNET_CONTAINER_DLL_remove (h->op_head,
- h->op_tail,
- op);
GNUNET_free (op);
+ }
+ else
+ {
+ GNUNET_CONTAINER_DLL_insert_tail (h->op_head, h->op_tail, op);
+ }
+
+ if (NULL != h->transmit_head)
transmit_next (h);
- }
+
if (GNUNET_NO == h->in_receive)
{
h->in_receive = GNUNET_YES;
- GNUNET_CLIENT_receive (h->client,
- &message_handler, h,
+ GNUNET_CLIENT_receive (h->client, &message_handler, h,
GNUNET_TIME_UNIT_FOREVER_REL);
}
return ret;
@@ -302,18 +539,18 @@
/**
* Schedule transmission of the next message from our queue.
*
- * @param h PSYCstore handle
+ * @param h PSYCstore handle.
*/
static void
transmit_next (struct GNUNET_PSYCSTORE_Handle *h)
{
- struct GNUNET_PSYCSTORE_OperationHandle *op = h->op_head;
+ if (NULL != h->th || NULL == h->client)
+ return;
- GNUNET_assert (NULL == h->th);
+ struct GNUNET_PSYCSTORE_OperationHandle *op = h->transmit_head;
if (NULL == op)
return;
- if (NULL == h->client)
- return;
+
h->th = GNUNET_CLIENT_notify_transmit_ready (h->client,
ntohs (op->msg->size),
GNUNET_TIME_UNIT_FOREVER_REL,
@@ -326,8 +563,8 @@
/**
* Try again to connect to the PSYCstore service.
*
- * @param cls the handle to the PSYCstore service
- * @param tc scheduler context
+ * @param cls Handle to the PSYCstore service.
+ * @param tc Scheduler context.
*/
static void
reconnect (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
@@ -347,8 +584,8 @@
/**
* Connect to the PSYCstore service.
*
- * @param cfg the configuration to use
- * @return handle to use
+ * @param cfg The configuration to use
+ * @return Handle to use
*/
struct GNUNET_PSYCSTORE_Handle *
GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg)
@@ -366,7 +603,7 @@
/**
* Disconnect from PSYCstore service
*
- * @param h handle to destroy
+ * @param h Handle to destroy
*/
void
GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h)
@@ -405,13 +642,10 @@
{
struct GNUNET_PSYCSTORE_Handle *h = op->h;
- if ( (h->op_head != op) ||
- (NULL == h->client) )
+ if (h->transmit_head != NULL && (h->transmit_head != op || NULL ==
h->client))
{
/* request not active, can simply remove */
- GNUNET_CONTAINER_DLL_remove (h->op_head,
- h->op_tail,
- op);
+ GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
GNUNET_free (op);
return;
}
@@ -420,47 +654,720 @@
/* request active but not yet with service, can still abort */
GNUNET_CLIENT_notify_transmit_ready_cancel (h->th);
h->th = NULL;
- GNUNET_CONTAINER_DLL_remove (h->op_head,
- h->op_tail,
- op);
+ GNUNET_CONTAINER_DLL_remove (h->transmit_head, h->transmit_tail, op);
GNUNET_free (op);
transmit_next (h);
return;
}
/* request active with service, simply ensure continuations are not called */
op->res_cb = NULL;
- op->frag_cb = NULL;
- op->state_cb = NULL;
+ op->data_cb = NULL;
}
+/**
+ * Store join/leave events for a PSYC channel in order to be able to answer
+ * membership test queries later.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel where the event happened.
+ * @param slave_key Public key of joining/leaving slave.
+ * @param did_join #GNUNET_YES on join, #GNUNET_NO on part.
+ * @param announced_at ID of the message that announced the membership change.
+ * @param effective_since Message ID this membership change is in effect since.
+ * For joins it is <= announced_at, for parts it is always 0.
+ * @param group_generation In case of a part, the last group generation the
+ * slave has access to. It has relevance when a larger message have
+ * fragments with different group generations.
+ * @param rcb Callback to call with the result of the storage operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Operation handle that can be used to cancel the operation.
+ */
struct GNUNET_PSYCSTORE_OperationHandle *
-GNUNET_PSYCSTORE_membership_store (
- struct GNUNET_PSYCSTORE_Handle *h,
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
- const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
- int did_join,
- uint64_t announced_at,
- uint64_t effective_since,
- uint64_t group_generation,
- GNUNET_PSYCSTORE_ResultCallback rcb,
- void *rcb_cls)
+GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+ int did_join,
+ uint64_t announced_at,
+ uint64_t effective_since,
+ uint64_t group_generation,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
{
-
+ GNUNET_assert (NULL != h);
+ GNUNET_assert (NULL != channel_key);
+ GNUNET_assert (NULL != slave_key);
+ GNUNET_assert (did_join
+ ? effective_since <= announced_at
+ : effective_since == 0);
+
+ struct MembershipStoreRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct MembershipStoreRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->slave_key = *slave_key;
+ req->did_join = htonl (did_join);
+ req->announced_at = GNUNET_htonll (announced_at);
+ req->effective_since = GNUNET_htonll (effective_since);
+ req->group_generation = GNUNET_htonll (group_generation);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
}
+/**
+ * Test if a member was admitted to the channel at the given message ID.
+ *
+ * This is useful when relaying and replaying messages to check if a particular
+ * slave has access to the message fragment with a given group generation. It
+ * is also used when handling join requests to determine whether the slave is
+ * currently admitted to the channel.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param slave_key Public key of slave whose membership to check.
+ * @param message_id Message ID for which to do the membership test.
+ * @param group_generation Group generation of the fragment of the message to
+ * test. It has relevance if the message consists of multiple fragments
+ * with different group generations.
+ * @param rcb Callback to call with the test result.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Operation handle that can be used to cancel the operation.
+ */
struct GNUNET_PSYCSTORE_OperationHandle *
-GNUNET_PSYCSTORE_membership_test (
- struct GNUNET_PSYCSTORE_Handle *h,
- const struct GNUNET_CRYPTO_EccPublicSignKey *channel_key,
- const struct GNUNET_CRYPTO_EccPublicSignKey *slave_key,
- uint64_t message_id,
- uint64_t group_generation,
- GNUNET_PSYCSTORE_ResultCallback rcb,
- void *rcb_cls)
+GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*slave_key,
+ uint64_t message_id,
+ uint64_t group_generation,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
{
+ struct MembershipTestRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+ req = (struct MembershipTestRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->slave_key = *slave_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->group_generation = GNUNET_htonll (group_generation);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
}
+
+/**
+ * Store a message fragment sent to a channel.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel the message belongs to.
+ * @param message Message to store.
+ * @param psycstore_flags Flags indicating whether the PSYC message contains
+ * state modifiers.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const struct GNUNET_MULTICAST_MessageHeader
*message,
+ uint32_t psycstore_flags,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
+{
+ uint16_t size = ntohs (message->header.size);
+ struct FragmentStoreRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req) + size);
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct FragmentStoreRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE);
+ req->header.size = htons (sizeof (*req) + size);
+ req->channel_key = *channel_key;
+ req->psycstore_flags = htonl (psycstore_flags);
+ memcpy (&req[1], message, size);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve a message fragment by fragment ID.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param fragment_id Fragment ID to check. Use 0 to get the latest message
fragment.
+ * @param fcb Callback to call with the retrieved fragments.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t fragment_id,
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ struct FragmentGetRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = (DataCallback) fcb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct FragmentGetRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->fragment_id = GNUNET_htonll (fragment_id);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve all fragments of a message.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id Message ID to check. Use 0 to get the latest message.
+ * @param fcb Callback to call with the retrieved fragments.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id,
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ struct MessageGetRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = (DataCallback) fcb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct MessageGetRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve a fragment of message specified by its message ID and fragment
+ * offset.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id Message ID to check. Use 0 to get the latest message.
+ * @param fragment_offset Offset of the fragment to retrieve.
+ * @param fcb Callback to call with the retrieved fragments.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct
GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id,
+ uint64_t fragment_offset,
+ GNUNET_PSYCSTORE_FragmentCallback fcb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ struct MessageGetFragmentRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = (DataCallback) fcb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct MessageGetFragmentRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons
(GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->fragment_offset = GNUNET_htonll (fragment_offset);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve latest values of counters for a channel master.
+ *
+ * The current value of counters are needed when a channel master is restarted,
+ * so that it can continue incrementing the counters from their last value.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key Public key that identifies the channel.
+ * @param mccb Callback to call with the result.
+ * @param mccb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_counters_get_master (struct GNUNET_PSYCSTORE_Handle *h,
+ struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ GNUNET_PSYCSTORE_MasterCountersCallback
mccb,
+ void *mccb_cls)
+{
+ struct OperationRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = mccb;
+ op->cls = mccb_cls;
+
+ req = (struct OperationRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_MASTER);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+
+/**
+ * Retrieve latest values of counters for a channel slave.
+ *
+ * The current value of counters are needed when a channel slave rejoins
+ * and starts the state synchronization process.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key Public key that identifies the channel.
+ * @param sccb Callback to call with the result.
+ * @param sccb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_counters_get_slave (struct GNUNET_PSYCSTORE_Handle *h,
+ struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ GNUNET_PSYCSTORE_SlaveCountersCallback
sccb,
+ void *sccb_cls)
+{
+ struct OperationRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->data_cb = sccb;
+ op->cls = sccb_cls;
+
+ req = (struct OperationRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET_SLAVE);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Apply modifiers of a message to the current channel state.
+ *
+ * An error is returned if there are missing messages containing state
+ * operations before the current one.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id ID of the message that contains the @a modifiers.
+ * @param state_delta Value of the _state_delta PSYC header variable of the
message.
+ * @param modifier_count Number of elements in the @a modifiers array.
+ * @param modifiers List of modifiers to apply.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id,
+ uint64_t state_delta,
+ size_t modifier_count,
+ const struct GNUNET_ENV_Modifier *modifiers,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
+{
+ struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
+ size_t i;
+
+ for (i = 0; i < modifier_count; i++) {
+ struct StateModifyRequest *req;
+ uint16_t name_size = strlen (modifiers[i].name) + 1;
+
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
+ modifiers[i].value_size);
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct StateModifyRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY);
+ req->header.size = htons (sizeof (*req) + name_size
+ + modifiers[i].value_size);
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->state_delta = GNUNET_htonll (state_delta);
+ req->oper = modifiers[i].oper;
+ req->name_size = htons (name_size);
+ req->flags
+ = 0 == i
+ ? STATE_OP_FIRST
+ : modifier_count - 1 == i
+ ? STATE_OP_LAST
+ : 0;
+
+ memcpy (&req[1], modifiers[i].name, name_size);
+ memcpy ((void *) &req[1] + name_size, modifiers[i].value,
modifiers[i].value_size);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+ }
+ return op;
+ /* FIXME: only the last operation is returned,
+ * operation_cancel() should be able to cancel all of them.
+ */
+}
+
+
+/**
+ * Store synchronized state.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id ID of the message that contains the state_hash PSYC
header variable.
+ * @param modifier_count Number of elements in the @a modifiers array.
+ * @param modifiers Full state to store.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ uint64_t message_id,
+ size_t modifier_count,
+ const struct GNUNET_ENV_Modifier *modifiers,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
+{
+ struct GNUNET_PSYCSTORE_OperationHandle *op = NULL;
+ size_t i;
+
+ for (i = 0; i < modifier_count; i++) {
+ struct StateSyncRequest *req;
+ uint16_t name_size = strlen (modifiers[i].name) + 1;
+
+ op = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size +
+ modifiers[i].value_size);
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct StateSyncRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC);
+ req->header.size = htons (sizeof (*req) + name_size
+ + modifiers[i].value_size);
+ req->channel_key = *channel_key;
+ req->message_id = GNUNET_htonll (message_id);
+ req->name_size = htons (name_size);
+ req->flags
+ = 0 == i
+ ? STATE_OP_FIRST
+ : modifier_count - 1 == i
+ ? STATE_OP_LAST
+ : 0;
+
+ memcpy (&req[1], modifiers[i].name, name_size);
+ memcpy ((void *) &req[1] + name_size, modifiers[i].value,
modifiers[i].value_size);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+ }
+ return op;
+}
+
+
+/**
+ * Reset the state of a channel.
+ *
+ * Delete all state variables stored for the given channel.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
+ *channel_key,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
+{
+ struct OperationRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct OperationRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+
+/**
+ * Update signed values of state variables in the state store.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param message_id Message ID that contained the state @a hash.
+ * @param hash Hash of the serialized full state.
+ * @param rcb Callback to call with the result of the operation.
+ * @param rcb_cls Closure for the callback.
+ *
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct
GNUNET_CRYPTO_EccPublicSignKey *channel_key,
+ uint64_t message_id,
+ const struct GNUNET_HashCode *hash,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *rcb_cls)
+{
+ struct StateHashUpdateRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req));
+ op->h = h;
+ op->res_cb = rcb;
+ op->cls = rcb_cls;
+
+ req = (struct StateHashUpdateRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET);
+ req->header.size = htons (sizeof (*req));
+ req->channel_key = *channel_key;
+ req->hash = *hash;
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+/**
+ * Retrieve the best matching state variable.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param name Name of variable to match, the returned variable might be less
specific.
+ * @param scb Callback to return the matching state variable.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name,
+ GNUNET_PSYCSTORE_StateCallback scb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ size_t name_size = strlen (name) + 1;
+ struct OperationRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
+ op->h = h;
+ op->data_cb = (DataCallback) scb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct OperationRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->channel_key = *channel_key;
+ memcpy (&req[1], name, name_size);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
+
+
+/**
+ * Retrieve all state variables for a channel with the given prefix.
+ *
+ * @param h Handle for the PSYCstore.
+ * @param channel_key The channel we are interested in.
+ * @param name_prefix Prefix of state variable names to match.
+ * @param scb Callback to return matching state variables.
+ * @param rcb Callback to call with the result of the operation.
+ * @param cls Closure for the callbacks.
+ *
+ * @return Handle that can be used to cancel the operation.
+ */
+struct GNUNET_PSYCSTORE_OperationHandle *
+GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h,
+ const struct GNUNET_CRYPTO_EccPublicSignKey
*channel_key,
+ const char *name_prefix,
+ GNUNET_PSYCSTORE_StateCallback scb,
+ GNUNET_PSYCSTORE_ResultCallback rcb,
+ void *cls)
+{
+ size_t name_size = strlen (name_prefix) + 1;
+ struct OperationRequest *req;
+ struct GNUNET_PSYCSTORE_OperationHandle *op
+ = GNUNET_malloc (sizeof (*op) + sizeof (*req) + name_size);
+ op->h = h;
+ op->data_cb = (DataCallback) scb;
+ op->res_cb = rcb;
+ op->cls = cls;
+
+ req = (struct OperationRequest *) &op[1];
+ op->msg = (struct GNUNET_MessageHeader *) req;
+ req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX);
+ req->header.size = htons (sizeof (*req) + name_size);
+ req->channel_key = *channel_key;
+ memcpy (&req[1], name_prefix, name_size);
+
+ op->op_id = get_next_op_id (h);
+ req->op_id = htonl (op->op_id);
+
+ GNUNET_CONTAINER_DLL_insert_tail (h->transmit_head, h->transmit_tail, op);
+ transmit_next (h);
+
+ return op;
+}
+
/* end of psycstore_api.c */
Modified: gnunet/src/psycstore/test_plugin_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_plugin_psycstore.c 2013-09-15 22:48:52 UTC
(rev 29286)
+++ gnunet/src/psycstore/test_plugin_psycstore.c 2013-09-16 04:59:05 UTC
(rev 29287)
@@ -1,25 +1,26 @@
/*
- This file is part of GNUnet.
- (C) 2012 Christian Grothoff (and other contributing authors)
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
/*
* @file psycstore/test_plugin_psycstore.c
- * @brief Test for the psycstore plugins
+ * @brief Test for the PSYCstore plugins.
* @author Gabor X Toth
* @author Christian Grothoff
*/
@@ -39,9 +40,17 @@
#define C2ARG(str) str, (sizeof (str) - 1)
-#define LOG(kind,...) GNUNET_log_from (kind, "test-plugin-psycstore",
__VA_ARGS__)
+#define LOG(kind,...)
\
+ GNUNET_log_from (kind, "test-plugin-psycstore", __VA_ARGS__)
-#define ASSERT(x) do { if (! (x)) { printf("Error at %s:%d\n", __FILE__,
__LINE__); goto FAILURE;} } while (0)
+#define ASSERT(x)
\
+ do {
\
+ if (! (x))
\
+ {
\
+ LOG (GNUNET_ERROR_TYPE_ERROR, "Error at %s:%d\n", __FILE__, __LINE__);
\
+ goto FAILURE;
\
+ }
\
+ } while (0)
static int ok;
@@ -118,13 +127,13 @@
&& 0 == memcmp (msg1, msg2, ntohs (msg1->header.size)))
{
LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment %llu matches\n",
- msg1->fragment_id);
+ GNUNET_ntohll (msg1->fragment_id));
ret = GNUNET_YES;
}
else
{
LOG (GNUNET_ERROR_TYPE_ERROR, "Fragment %llu differs\n",
- msg1->fragment_id);
+ GNUNET_ntohll (msg1->fragment_id));
ret = GNUNET_SYSERR;
}
@@ -135,6 +144,7 @@
struct StateClosure {
size_t n;
+ char *name[16];
void *value[16];
size_t value_size[16];
};
@@ -146,6 +156,8 @@
const void *val = scls->value[scls->n];
size_t val_size = scls->value_size[scls->n++];
+ /* FIXME: check name */
+
return value_size == val_size && 0 == memcmp (value, val, val_size)
? GNUNET_YES
: GNUNET_SYSERR;
@@ -157,19 +169,19 @@
const struct GNUNET_CONFIGURATION_Handle *cfg)
{
struct GNUNET_PSYCSTORE_PluginFunctions *db;
-
+
ok = 1;
db = load_plugin (cfg);
if (NULL == db)
{
FPRINTF (stderr,
- "%s",
+ "%s",
"Failed to initialize PSYCstore. "
"Database likely not setup, skipping test.\n");
return;
}
- /* Membership */
+ /* Store & test membership */
channel_key = GNUNET_CRYPTO_ecc_key_create ();
slave_key = GNUNET_CRYPTO_ecc_key_create ();
@@ -177,21 +189,21 @@
GNUNET_CRYPTO_ecc_key_get_public_for_signature (channel_key,
&channel_pub_key);
GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
- ASSERT (GNUNET_OK == db->membership_store(db->cls, &channel_pub_key,
- &slave_pub_key, GNUNET_YES,
- 4, 2, 1));
+ ASSERT (GNUNET_OK == db->membership_store (db->cls, &channel_pub_key,
+ &slave_pub_key, GNUNET_YES,
+ 4, 2, 1));
- ASSERT (GNUNET_YES == db->membership_test(db->cls, &channel_pub_key,
- &slave_pub_key, 4));
+ ASSERT (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key,
+ &slave_pub_key, 4));
- ASSERT (GNUNET_YES == db->membership_test(db->cls, &channel_pub_key,
- &slave_pub_key, 2));
+ ASSERT (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key,
+ &slave_pub_key, 2));
- ASSERT (GNUNET_NO == db->membership_test(db->cls, &channel_pub_key,
- &slave_pub_key, 1));
+ ASSERT (GNUNET_NO == db->membership_test (db->cls, &channel_pub_key,
+ &slave_pub_key, 1));
- /* Messages */
+ /* Store & get messages */
struct GNUNET_MULTICAST_MessageHeader *msg
= GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
@@ -200,12 +212,12 @@
msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
- msg->hop_counter = 9;
- msg->fragment_id = INT64_MAX - 1;
- msg->fragment_offset = 0;
- msg->message_id = INT64_MAX - 2;
- msg->group_generation = INT64_MAX - 3;
- msg->flags = GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT;
+ msg->hop_counter = htonl (9);
+ msg->fragment_id = GNUNET_htonll (INT64_MAX - 1);
+ msg->fragment_offset = GNUNET_htonll (0);
+ msg->message_id = GNUNET_htonll (INT64_MAX - 10);
+ msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
+ msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
@@ -225,27 +237,27 @@
fcls.flags[0]));
ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
- msg->fragment_id,
+ GNUNET_ntohll (msg->fragment_id),
fragment_cb, &fcls));
ASSERT (fcls.n == 1);
fcls.n = 0;
ASSERT (GNUNET_OK == db->message_get_fragment (db->cls, &channel_pub_key,
- msg->message_id,
- msg->fragment_offset,
+ GNUNET_ntohll
(msg->message_id),
+ GNUNET_ntohll
(msg->fragment_offset),
fragment_cb, &fcls));
ASSERT (fcls.n == 1);
ASSERT (GNUNET_OK == db->message_add_flags (
- db->cls, &channel_pub_key, msg->message_id,
+ db->cls, &channel_pub_key, GNUNET_ntohll (msg->message_id),
GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED));
fcls.n = 0;
fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
ASSERT (GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key,
- msg->fragment_id,
+ GNUNET_ntohll (msg->fragment_id),
fragment_cb, &fcls));
ASSERT (fcls.n == 1);
@@ -254,8 +266,8 @@
memcpy (msg1, msg, sizeof (*msg1) + sizeof (channel_pub_key));
- msg1->fragment_id++;
- msg1->fragment_offset += 32768;
+ msg1->fragment_id = GNUNET_htonll (INT64_MAX);
+ msg1->fragment_offset = GNUNET_htonll (32768);
fcls.n = 0;
fcls.msg[1] = msg1;
@@ -264,34 +276,39 @@
ASSERT (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1,
fcls.flags[1]));
+ uint64_t retfrags = 0;
ASSERT (GNUNET_OK == db->message_get (db->cls, &channel_pub_key,
- msg->message_id,
- fragment_cb, &fcls));
- ASSERT (fcls.n == 2);
+ GNUNET_ntohll (msg->message_id),
+ &retfrags, fragment_cb, &fcls));
+ ASSERT (fcls.n == 2 && retfrags == 2);
- uint64_t max_state_msg_id = 0;
- ASSERT (GNUNET_OK == db->counters_get_slave (db->cls, &channel_pub_key,
- &max_state_msg_id)
- && max_state_msg_id == msg->message_id);
+ /* Master counters */
uint64_t fragment_id = 0, message_id = 0, group_generation = 0;
ASSERT (GNUNET_OK == db->counters_get_master (db->cls, &channel_pub_key,
&fragment_id, &message_id,
&group_generation)
- && fragment_id == msg1->fragment_id
- && message_id == msg1->message_id
- && group_generation == msg1->group_generation);
+ && fragment_id == GNUNET_ntohll (msg1->fragment_id)
+ && message_id == GNUNET_ntohll (msg1->message_id)
+ && group_generation == GNUNET_ntohll (msg1->group_generation));
- /* State */
+ /* Modify state */
- ASSERT (GNUNET_OK == db->state_set (db->cls, &channel_pub_key, "_foo",
- C2ARG("one two three")));
+ message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1;
+ ASSERT (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
+ message_id, 1));
- ASSERT (GNUNET_OK == db->state_set (db->cls, &channel_pub_key, "_foo_bar",
- slave_key,
- sizeof (*slave_key)));
+ ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key, "_foo",
+ C2ARG("one two three")));
+ ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
+ "_foo_bar", slave_key,
+ sizeof (*slave_key)));
+
+ ASSERT (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
+ message_id));
+
struct StateClosure scls = { 0 };
scls.n = 0;
scls.value[0] = "one two three";
@@ -305,8 +322,8 @@
scls.value[1] = slave_key;
scls.value_size[1] = sizeof (*slave_key);
- ASSERT (GNUNET_OK == db->state_get_all (db->cls, &channel_pub_key, "_foo",
- state_cb, &scls));
+ ASSERT (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key, "_foo",
+ state_cb, &scls));
ASSERT (scls.n == 2);
scls.n = 0;
@@ -321,8 +338,66 @@
state_cb, &scls));
ASSERT (scls.n == 2);
+ /* Slave counters */
+
+ uint64_t max_state_msg_id = 0;
+ ASSERT (GNUNET_OK == db->counters_get_slave (db->cls, &channel_pub_key,
+ &max_state_msg_id)
+ && max_state_msg_id == message_id);
+
+ /* State sync */
+
+ scls.n = 0;
+ scls.value[0] = channel_key;
+ scls.value_size[0] = sizeof (*channel_key);
+ scls.value[1] = "three two one";
+ scls.value_size[1] = strlen ("three two one");
+
+ ASSERT (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key));
+
+ ASSERT (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
+ "_sync_bar",
+ scls.value[0], scls.value_size[0]));
+
+ ASSERT (GNUNET_OK == db->state_sync_set (db->cls, &channel_pub_key,
+ "_sync_foo",
+ scls.value[1], scls.value_size[1]));
+
+ ASSERT (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key,
INT64_MAX - 5));
+
+ ASSERT (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key, "_foo",
+ state_cb, &scls));
+ ASSERT (scls.n == 0);
+
+ ASSERT (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key,
"_sync",
+ state_cb, &scls));
+ ASSERT (scls.n == 2);
+
+ scls.n = 0;
+ ASSERT (GNUNET_OK == db->state_get_signed (db->cls, &channel_pub_key,
+ state_cb, &scls));
+ ASSERT (scls.n == 2);
+
+ /* Modify state after sync */
+
+ message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6;
+ ASSERT (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key,
+ message_id, 3));
+
+ ASSERT (GNUNET_OK == db->state_modify_set (db->cls, &channel_pub_key,
"_sync_foo",
+ C2ARG("five six seven")));
+
+ ASSERT (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key,
+ message_id));
+
+ /* Reset state */
+
+ scls.n = 0;
+ ASSERT (GNUNET_OK == db->state_reset (db->cls, &channel_pub_key));
+ ASSERT (scls.n == 0);
+
ok = 0;
-
+
FAILURE:
if (NULL != channel_key)
@@ -353,7 +428,6 @@
struct GNUNET_GETOPT_CommandLineOption options[] = {
GNUNET_GETOPT_OPTION_END
};
-
GNUNET_DISK_directory_remove ("/tmp/gnunet-test-plugin-psycstore-sqlite");
GNUNET_log_setup ("test-plugin-psycstore", LOG_LEVEL, NULL);
plugin_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]);
Modified: gnunet/src/psycstore/test_psycstore.c
===================================================================
--- gnunet/src/psycstore/test_psycstore.c 2013-09-15 22:48:52 UTC (rev
29286)
+++ gnunet/src/psycstore/test_psycstore.c 2013-09-16 04:59:05 UTC (rev
29287)
@@ -1,26 +1,26 @@
/*
- This file is part of GNUnet.
- (C) 2013 Christian Grothoff (and other contributing authors)
+ * This file is part of GNUnet
+ * (C) 2013 Christian Grothoff (and other contributing authors)
+ *
+ * GNUnet is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published
+ * by the Free Software Foundation; either version 3, or (at your
+ * option) any later version.
+ *
+ * GNUnet is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNUnet; see the file COPYING. If not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
- GNUnet is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published
- by the Free Software Foundation; either version 3, or (at your
- option) any later version.
-
- GNUnet is distributed in the hope that it will be useful, but
- WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with GNUnet; see the file COPYING. If not, write to the
- Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- Boston, MA 02111-1307, USA.
-*/
-
/**
* @file psycstore/test_psycstore.c
- * @brief Testcase for the PSYCstore service
+ * @brief Test for the PSYCstore service.
* @author Gabor X Toth
* @author Christian Grothoff
*/
@@ -31,10 +31,14 @@
#include "gnunet_psycstore_service.h"
#include "gnunet_testing_lib.h"
+#define ASSERT(x) do { if (! (x)) { printf ("Error at %s:%d\n", __FILE__,
__LINE__); cleanup (); return; } } while (0)
+#define ASSERRT(x) do { if (! (x)) { printf ("Error at %s:%d\n", __FILE__,
__LINE__); cleanup (); return GNUNET_SYSERR; } } while (0)
#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 10)
+#define DEBUG_SERVICE 1
+
/**
* Return value from 'main'.
*/
@@ -52,8 +56,8 @@
/**
* Handle for task for timeout termination.
- */
-static GNUNET_SCHEDULER_TaskIdentifier endbadly_task;
+ */
+static GNUNET_SCHEDULER_TaskIdentifier end_badly_task;
static struct GNUNET_CRYPTO_EccPrivateKey *channel_key;
static struct GNUNET_CRYPTO_EccPrivateKey *slave_key;
@@ -61,6 +65,23 @@
static struct GNUNET_CRYPTO_EccPublicSignKey channel_pub_key;
static struct GNUNET_CRYPTO_EccPublicSignKey slave_pub_key;
+static struct FragmentClosure
+{
+ uint8_t n;
+ uint8_t n_expected;
+ uint64_t flags[16];
+ struct GNUNET_MULTICAST_MessageHeader *msg[16];
+} fcls;
+
+struct StateClosure {
+ size_t n;
+ char *name[16];
+ void *value[16];
+ size_t value_size[16];
+} scls;
+
+static struct GNUNET_ENV_Modifier modifiers[16];
+
/**
* Clean up all resources used.
*/
@@ -92,21 +113,21 @@
/**
- * Termiante the testcase (failure).
+ * Terminate the testcase (failure).
*
* @param cls NULL
* @param tc scheduler context
*/
static void
-endbadly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
+end_badly (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ res = 1;
cleanup ();
- res = 1;
}
/**
- * Termiante the testcase (success).
+ * Terminate the testcase (success).
*
* @param cls NULL
* @param tc scheduler context
@@ -114,32 +135,361 @@
static void
end_normally (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc)
{
+ res = 0;
cleanup ();
- res = 0;
}
/**
* Finish the testcase (successfully).
*/
-static void
+static void
end ()
{
- if (endbadly_task != GNUNET_SCHEDULER_NO_TASK)
+ if (end_badly_task != GNUNET_SCHEDULER_NO_TASK)
{
- GNUNET_SCHEDULER_cancel (endbadly_task);
- endbadly_task = GNUNET_SCHEDULER_NO_TASK;
+ GNUNET_SCHEDULER_cancel (end_badly_task);
+ end_badly_task = GNUNET_SCHEDULER_NO_TASK;
}
GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS,
&end_normally, NULL);
}
+
void
-membership_store_result (void *cls, int result, const char *err_msg)
+state_reset_result (void *cls, int64_t result, const char *err_msg)
{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_reset_result:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+ op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key,
+ &state_reset_result, cls);
+ GNUNET_PSYCSTORE_operation_cancel (op);
+ op = NULL;
+ end ();
}
+
+static int
+state_result (void *cls, const char *name, const void *value, size_t
value_size)
+{
+ struct StateClosure *scls = cls;
+ const char *nam = scls->name[scls->n];
+ const void *val = scls->value[scls->n];
+ size_t val_size = scls->value_size[scls->n++];
+
+ if (value_size == val_size
+ && 0 == memcmp (value, val, val_size)
+ && 0 == strcmp (name, nam))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " variable %s matches\n", name);
+ return GNUNET_YES;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ " variable %s differs\nReceived: %.*s\nExpected: %.*s\n",
+ name, value_size, value, val_size, val);
+ ASSERRT (0);
+ return GNUNET_SYSERR;
+ }
+}
+
+
+void
+state_get_prefix_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct StateClosure *scls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_prefix_result:\t%d\n",
result);
+ ASSERT (GNUNET_OK == result && 2 == scls->n);
+
+ op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key,
+ &state_reset_result, cls);
+}
+
+
+void
+state_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ scls.n = 0;
+
+ scls.name[0] = "_sync_bar";
+ scls.value[0] = "ten eleven twelve";
+ scls.value_size[0] = sizeof ("ten eleven twelve") - 1;
+
+ scls.name[1] = "_sync_foo";
+ scls.value[1] = "one two three";
+ scls.value_size[1] = sizeof ("one two three") - 1;
+
+ op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync",
+ &state_result,
+ &state_get_prefix_result, &scls);
+}
+
+
+void
+counters_slave_result (void *cls, uint64_t max_state_msg_id)
+{
+ struct FragmentClosure *fcls = cls;
+ int result = 0;
+ op = NULL;
+
+ if (max_state_msg_id == GNUNET_ntohll (fcls->msg[0]->message_id))
+ result = 1;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "counters_get_slave:\t%d\n", result);
+ ASSERT (result == 1);
+
+ scls.n = 0;
+ scls.name[0] = "_bar";
+ scls.value[0] = "four five six";
+ scls.value_size[0] = sizeof ("four five six") - 1;
+
+ op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_bar_x_yy_zzz",
+ &state_result, &state_get_result, &scls);
+}
+
+
+void
+state_modify_result (void *cls, int64_t result, const char *err_msg)
+{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ op = GNUNET_PSYCSTORE_counters_get_slave (h, &channel_pub_key,
+ &counters_slave_result, cls);
+}
+
+
+void
+state_sync_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ modifiers[0] = (struct GNUNET_ENV_Modifier) {
+ .oper = '=',
+ .name = "_sync_foo",
+ .value = "one two three",
+ .value_size = sizeof ("one two three") - 1
+ };
+ modifiers[1] = (struct GNUNET_ENV_Modifier) {
+ .oper = '=',
+ .name = "_bar",
+ .value = "four five six",
+ .value_size = sizeof ("four five six") - 1
+ };
+
+ op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key,
+ GNUNET_ntohll
(fcls->msg[0]->message_id), 0,
+ 2, modifiers, state_modify_result, fcls);
+}
+
+
+void
+counters_master_result (void *cls, uint64_t fragment_id, uint64_t message_id,
+ uint64_t group_generation)
+{
+ struct FragmentClosure *fcls = cls;
+ int result = 0;
+ op = NULL;
+
+ if (fragment_id == GNUNET_ntohll (fcls->msg[2]->fragment_id) &&
+ message_id == GNUNET_ntohll (fcls->msg[2]->message_id) &&
+ group_generation == GNUNET_ntohll (fcls->msg[2]->group_generation))
+ result = 1;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "counters_get_master:\t%d\n", result);
+ ASSERT (result == 1);
+
+ modifiers[0] = (struct GNUNET_ENV_Modifier) {
+ .oper = '=',
+ .name = "_sync_foo",
+ .value = "three two one",
+ .value_size = sizeof ("three two one") - 1
+ };
+ modifiers[1] = (struct GNUNET_ENV_Modifier) {
+ .oper = '=',
+ .name = "_sync_bar",
+ .value = "ten eleven twelve",
+ .value_size = sizeof ("ten eleven twelve") - 1
+ };
+
+ op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key,
+ GNUNET_ntohll (fcls->msg[0]->message_id) +
1,
+ 2, modifiers, state_sync_result, fcls);
+}
+
+
+int
+fragment_result (void *cls,
+ struct GNUNET_MULTICAST_MessageHeader *msg,
+ enum GNUNET_PSYCSTORE_MessageFlags flags)
+{
+ struct FragmentClosure *fcls = cls;
+ struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n];
+ uint64_t flags0 = fcls->flags[fcls->n++];
+
+ if (flags == flags0 && msg->header.size == msg0->header.size
+ && 0 == memcmp (msg, msg0, ntohs (msg->header.size)))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %llu matches\n",
+ GNUNET_ntohll (msg->fragment_id));
+ return GNUNET_YES;
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, " fragment %llu differs\n",
+ GNUNET_ntohll (msg->fragment_id));
+ ASSERRT (0);
+ return GNUNET_SYSERR;
+ }
+}
+
+
+void
+message_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%d\n", result);
+ ASSERT (result > 0 && fcls->n && fcls->n_expected);
+
+ op = GNUNET_PSYCSTORE_counters_get_master (h, &channel_pub_key,
+ &counters_master_result, fcls);
+}
+
+
+void
+message_get_fragment_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%d\n",
result);
+ ASSERT (result > 0 && fcls->n && fcls->n_expected);
+
+ fcls->n = 0;
+ fcls->n_expected = 3;
+ op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key,
+ GNUNET_ntohll (fcls->msg[0]->message_id),
+ &fragment_result,
+ &message_get_result, fcls);
+}
+
+
+void
+fragment_get_result (void *cls, int64_t result, const char *err_msg)
+{
+ struct FragmentClosure *fcls = cls;
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get:\t%d\n", result);
+ ASSERT (result > 0 && fcls->n && fcls->n_expected);
+
+ fcls->n = 1;
+ fcls->n_expected = 2;
+ op = GNUNET_PSYCSTORE_message_get_fragment (h, &channel_pub_key,
+ GNUNET_ntohll
(fcls->msg[1]->message_id),
+ GNUNET_ntohll
(fcls->msg[1]->fragment_offset),
+ &fragment_result,
+ &message_get_fragment_result,
+ fcls);
+
+}
+
+
+void
+fragment_store_result (void *cls, int64_t result, const char *err_msg)
+{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ if ((intptr_t) cls == GNUNET_YES)
+ {
+ fcls.n = 0;
+ fcls.n_expected = 1;
+ op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key,
+ GNUNET_ntohll
(fcls.msg[0]->fragment_id),
+ &fragment_result,
+ &fragment_get_result, &fcls);
+ }
+}
+
+
+void
+membership_test_result (void *cls, int64_t result, const char *err_msg)
+{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ struct GNUNET_MULTICAST_MessageHeader *msg;
+ fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE;
+ fcls.msg[0] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
+ ASSERT (msg != NULL);
+
+ msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE);
+ msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key));
+
+ msg->hop_counter = htonl (9);
+ msg->fragment_id = GNUNET_htonll (INT64_MAX - 8);
+ msg->fragment_offset = GNUNET_htonll (0);
+ msg->message_id = GNUNET_htonll (INT64_MAX - 10);
+ msg->group_generation = GNUNET_htonll (INT64_MAX - 3);
+ msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT);
+
+ memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key));
+
+ msg->purpose.size = htonl (ntohs (msg->header.size)
+ - sizeof (msg->header)
+ - sizeof (msg->hop_counter)
+ - sizeof (msg->signature));
+ msg->purpose.purpose = htonl (234);
+ GNUNET_CRYPTO_ecc_sign (slave_key, &msg->purpose, &msg->signature);
+
+ op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg,
fcls.flags[0],
+ &fragment_store_result, GNUNET_NO);
+
+ fcls.flags[1] = GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED;
+ fcls.msg[1] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
+ memcpy (msg, fcls.msg[0], sizeof (*msg) + sizeof (channel_pub_key));
+ msg->fragment_id = GNUNET_htonll (INT64_MAX - 4);
+ msg->fragment_offset = GNUNET_htonll (1024);
+
+ op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg,
fcls.flags[1],
+ &fragment_store_result, GNUNET_NO);
+
+ fcls.flags[2] = GNUNET_PSYCSTORE_MESSAGE_STATE_HASH;
+ fcls.msg[2] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key));
+ memcpy (msg, fcls.msg[1], sizeof (*msg) + sizeof (channel_pub_key));
+ msg->fragment_id = GNUNET_htonll (INT64_MAX);
+ msg->fragment_offset = GNUNET_htonll (16384);
+
+ op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg,
fcls.flags[2],
+ &fragment_store_result, (void *)
GNUNET_YES);
+}
+
+void
+membership_store_result (void *cls, int64_t result, const char *err_msg)
+{
+ op = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%d\n", result);
+ ASSERT (GNUNET_OK == result);
+
+ op = GNUNET_PSYCSTORE_membership_test (h, &channel_pub_key, &slave_pub_key,
+ 4, 1,
+ &membership_test_result, NULL);
+}
+
/**
* Main function of the test, run from scheduler.
*
@@ -148,14 +498,19 @@
* @param peer handle to access more of the peer (not used)
*/
static void
-run (void *cls,
- const struct GNUNET_CONFIGURATION_Handle *cfg,
- struct GNUNET_TESTING_Peer *peer)
+#if DEBUG_SERVICE
+run (void *cls, char *const *args, const char *cfgfile,
+ const struct GNUNET_CONFIGURATION_Handle *cfg)
+#else
+ run (void *cls,
+ const struct GNUNET_CONFIGURATION_Handle *cfg,
+ struct GNUNET_TESTING_Peer *peer)
+#endif
{
- endbadly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
- &endbadly, NULL);
+ end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT,
+ &end_badly, NULL);
h = GNUNET_PSYCSTORE_connect (cfg);
- GNUNET_assert (NULL != h);
+ ASSERT (NULL != h);
channel_key = GNUNET_CRYPTO_ecc_key_create ();
slave_key = GNUNET_CRYPTO_ecc_key_create ();
@@ -164,10 +519,8 @@
GNUNET_CRYPTO_ecc_key_get_public_for_signature (slave_key, &slave_pub_key);
op = GNUNET_PSYCSTORE_membership_store (h, &channel_pub_key, &slave_pub_key,
- GNUNET_YES, 2, 2, 1,
+ GNUNET_YES, 4, 2, 1,
&membership_store_result, NULL);
-
- end ();
}
@@ -175,15 +528,20 @@
main (int argc, char *argv[])
{
res = 1;
- if (0 !=
- GNUNET_TESTING_service_run ("test-psycstore",
- "psycstore",
- "test_psycstore.conf",
- &run,
- NULL))
+#if DEBUG_SERVICE
+ const struct GNUNET_GETOPT_CommandLineOption opts[] = {
+ GNUNET_GETOPT_OPTION_END
+ };
+ if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "test-psycstore",
+ "test-psycstore [options]",
+ opts, &run, NULL))
return 1;
+#else
+ if (0 != GNUNET_TESTING_service_run ("test-psycstore", "psycstore",
+ "test_psycstore.conf", &run, NULL))
+ return 1;
+#endif
return res;
}
-
/* end of test_psycstore.c */
Modified: gnunet/src/psycstore/test_psycstore.conf
===================================================================
--- gnunet/src/psycstore/test_psycstore.conf 2013-09-15 22:48:52 UTC (rev
29286)
+++ gnunet/src/psycstore/test_psycstore.conf 2013-09-16 04:59:05 UTC (rev
29287)
@@ -1,6 +1,7 @@
[arm]
+UNIXPATH = /tmp/test-gnunet-service-arm.sock
DEFAULTSERVICES = psycstore
-UNIXPATH = /tmp/test-psycstore-service-arm.sock
+GLOBAL_POSTFIX = -L DEBUG
[psycstore]
AUTOSTART = YES
@@ -10,6 +11,8 @@
UNIX_MATCH_UID = NO
UNIX_MATCH_GID = YES
DATABASE = sqlite
+OPTIONS = -L DEBUG
+DEBUG = YES
[psycstore-sqlite]
FILENAME = $SERVICEHOME/psycstore/sqlite_test.db
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r29287 - in gnunet/src: include psycstore,
gnunet <=