[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r31191 - gnunet-mqtt/src/mqtt
From: gnunet
Subject: [GNUnet-SVN] r31191 - gnunet-mqtt/src/mqtt
Date: Sun, 8 Dec 2013 23:51:19 +0100
Author: grothoff
Date: 2013-12-08 23:51:19 +0100 (Sun, 08 Dec 2013)
New Revision: 31191
Modified:
gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c
Log:
-fix build with current mesh API
Modified: gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c
===================================================================
--- gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c 2013-12-08 22:45:15 UTC (rev 31190)
+++ gnunet-mqtt/src/mqtt/gnunet-service-mqtt.c 2013-12-08 22:51:19 UTC (rev 31191)
@@ -46,7 +46,7 @@
* Pointer to next item in the list
*/
struct RegexSearchContext *next;
-
+
/**
* Pointer to previous item in the list
*/
@@ -73,13 +73,13 @@
* deletion
*/
int message_delivered;
-
+
/**
* Task responsible for freeing the context once the message
* associated with it has been delivered
*/
GNUNET_SCHEDULER_TaskIdentifier free_task;
-
+
/**
* Pointer to the regex search handle
*/
@@ -106,7 +106,7 @@
* Pointer to the actual message to be sent
*/
struct GNUNET_MessageHeader *msg;
-
+
/**
* Pointer to the filepath where the topic and the content of the
* message will be stored
@@ -168,12 +168,12 @@
struct GNUNET_PeerIdentity id;
/**
- * Tunnel connecting us to the subscriber.
+ * Channel connecting us to the subscriber.
*/
- struct GNUNET_MESH_Tunnel *tunnel;
+ struct GNUNET_MESH_Channel *channel;
/**
- * Has the subscriber been added to the tunnel yet?
+ * Has the subscriber been added to the channel yet?
*/
int peer_added;
@@ -221,17 +221,17 @@
struct GNUNET_REGEX_announce_handle *regex_announce_handle;
/**
- * The subscribed client
+ * The subscribed client
*/
struct ClientInfo *client;
/**
- * Unique ID for this subscription
+ * Unique ID for this subscription
*/
uint64_t request_id;
/**
- * The automaton built using the subcription provided by the user
+ * The automaton built using the subcription provided by the user
*/
regex_t automaton;
};
@@ -314,8 +314,8 @@
/**
* String constant for prefixing the topic
- */
-static const char *prefix = "GNUNET-MQTT 0001 00000";
+ */
+static const char *prefix = "GNUNET-MQTT 0001 00000";
/**
* String constant for replacing '+' wildcard in the subscribed topics.
@@ -335,28 +335,28 @@
* @param regex_topic client identification of the client
*/
static void
-add_prefix (const char *topic,
+add_prefix (const char *topic,
char **prefixed_topic)
{
int n;
int i;
*prefixed_topic = GNUNET_malloc(strlen(prefix) + strlen(topic)+1);
- n = 0;
- for (i = 0; prefix[i] != '\0'; i++)
+ n = 0;
+ for (i = 0; prefix[i] != '\0'; i++)
(*prefixed_topic)[i] = prefix[i];
n = i;
-
- for (i = 0; topic[i] != '\0'; i++)
+
+ for (i = 0; topic[i] != '\0'; i++)
{
(*prefixed_topic)[n] = topic[i];
n++;
}
-
+
(*prefixed_topic)[n] = '\0';
}
-
+
/**
* Transform topics to regex expression.
*
@@ -364,7 +364,7 @@
* @param regex_topic client identification of the client
*/
static void
-get_regex (char *topic,
+get_regex (char *topic,
char **regex_topic)
{
char *plus;
@@ -372,29 +372,29 @@
char *prefixed_topic;
int i;
int j;
- int k;
+ int k;
int plus_counter = 0;
int hash_exists = 0;
-
+
plus = strchr(topic,'+');
while (plus != NULL)
{
plus_counter +=1;
plus=strchr(plus+1,'+');
- }
+ }
hash = strchr(topic,'#');
if (hash != NULL)
{
hash_exists = 1;
}
-
- add_prefix(topic, &prefixed_topic);
-
+
+ add_prefix(topic, &prefixed_topic);
+
*regex_topic = GNUNET_malloc (strlen(prefixed_topic) - plus_counter -
hash_exists + plus_counter*strlen(plus_regex) +
hash_exists*strlen(hash_regex)+1);
j = 0;
- for (i = 0; prefixed_topic[i] != '\0'; i++)
+ for (i = 0; prefixed_topic[i] != '\0'; i++)
{
- if (prefixed_topic[i] == '+')
+ if (prefixed_topic[i] == '+')
{
for (k = 0; k<strlen(plus_regex); k++)
{
@@ -402,7 +402,7 @@
j++;
}
}
- else if (prefixed_topic[i] == '#')
+ else if (prefixed_topic[i] == '#')
{
j--;
for (k = 0; k<strlen(hash_regex); k++)
@@ -435,7 +435,7 @@
{
struct PendingMessage *pm;
- if (NULL != client_info->transmit_handle)
+ if (NULL != client_info->transmit_handle)
{
GNUNET_SERVER_notify_transmit_ready_cancel (client_info->transmit_handle);
client_info->transmit_handle = NULL;
@@ -505,7 +505,7 @@
*
* This function takes care to cancel any pending transmission requests
* and discard all outstanding messages not delivered to the subscriber
- * yet. Moreover, it destroys the tunnel connecting us to the subscriber
+ * yet. Moreover, it destroys the channel connecting us to the subscriber
* before finally freeing the given RemoteSubscriberInfo struct.
*
* @param subscriber pointer to a RemoteSubscriberInfo struct
@@ -527,7 +527,7 @@
GNUNET_free (pm->msg);
GNUNET_free (pm);
}
- GNUNET_MESH_tunnel_destroy (subscriber->tunnel);
+ GNUNET_MESH_channel_destroy (subscriber->channel);
GNUNET_free (subscriber);
}
@@ -554,7 +554,7 @@
if (0 != UNLINK (filepath))
GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_WARNING,
- "unlink",
+ "unlink",
filepath);
}
GNUNET_CONTAINER_multipeermap_destroy (context->subscribers);
@@ -590,7 +590,7 @@
*
* @param pm pointer to the pending message
*/
-static void
+static void
set_timer_for_deleting_message (struct PendingMessage *pm)
{
if (GNUNET_NO == pm->context->message_delivered)
@@ -622,7 +622,7 @@
* @return the number of bytes actually copied, 0 indicates failure
*/
static size_t
-send_msg_to_subscriber (void *cls,
+send_msg_to_subscriber (void *cls,
size_t size,
void *buf)
{
@@ -631,7 +631,7 @@
struct PendingMessage *pm;
size_t off;
size_t msize;
-
+
subscriber->transmit_handle = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Send message to subscriber.\n");
@@ -641,7 +641,7 @@
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Subscriber %s disconnected, pending messages will be discarded\n",
GNUNET_i2s (&subscriber->id));
-
+
return 0;
}
@@ -657,7 +657,7 @@
"Transmitting %u bytes to subscriber %s\n", msize,
GNUNET_i2s (&subscriber->id));
off += msize;
- set_timer_for_deleting_message(pm);
+ set_timer_for_deleting_message(pm);
GNUNET_free (pm->msg);
GNUNET_free (pm);
}
@@ -666,7 +666,7 @@
"Transmitted %zu/%zu bytes to subscriber %s\n",
off, size, GNUNET_i2s (&subscriber->id));
process_pending_subscriber_messages (subscriber);
-
+
return off;
}
@@ -674,7 +674,7 @@
/**
* Task run to check for messages that need to be sent to a subscriber.
*
- * @param client a RemoteSubscriberInfo struct, containing the tunnel
+ * @param client a RemoteSubscriberInfo struct, containing the channel
* handle and any messages to be sent to it
*/
static void
@@ -699,7 +699,7 @@
ntohs (msg->size), GNUNET_i2s (&subscriber->id));
subscriber->transmit_handle =
- GNUNET_MESH_notify_transmit_ready (subscriber->tunnel,
+ GNUNET_MESH_notify_transmit_ready (subscriber->channel,
GNUNET_NO,
GNUNET_TIME_UNIT_FOREVER_REL,
ntohs (msg->size),
@@ -723,7 +723,7 @@
static void
-deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
+deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
struct RegexSearchContext *context);
@@ -787,15 +787,15 @@
if (NULL == subscriber)
{
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "creating a new tunnel to %s\n", GNUNET_i2s(id));
+ "creating a new channel to %s\n", GNUNET_i2s(id));
subscriber = GNUNET_new (struct RemoteSubscriberInfo);
- subscriber->tunnel = GNUNET_MESH_tunnel_create (mesh_handle,
- NULL,
- id,
- GNUNET_APPLICATION_TYPE_MQTT,
- GNUNET_NO, GNUNET_YES);
+ subscriber->channel = GNUNET_MESH_channel_create (mesh_handle,
+ NULL,
+ id,
+ GNUNET_APPLICATION_TYPE_MQTT,
+ GNUNET_MESH_OPTION_RELIABLE);
subscriber->peer_added = GNUNET_NO;
subscriber->peer_connecting = GNUNET_NO;
@@ -857,10 +857,10 @@
const char *file_name;
char *file_path;
size_t message_len;
-
+
size_t msg_len = ntohs (msg->size);
struct GNUNET_MQTT_ClientPublishMessage *publish_msg;
-
+
if (NULL == folder_name)
{
GNUNET_break (0);
@@ -872,7 +872,7 @@
topic = GNUNET_malloc (publish_msg->topic_len);
strncpy(topic, (char *) (publish_msg + 1), publish_msg->topic_len);
topic[publish_msg->topic_len - 1] = '\0';
-
+
/* Extract message */
message_len = ntohs (publish_msg->header.size) -
sizeof (struct GNUNET_MQTT_ClientPublishMessage) - publish_msg->topic_len;
@@ -884,28 +884,28 @@
GNUNET_CRYPTO_hash_create_random (GNUNET_CRYPTO_QUALITY_WEAK,
&file_name_hash);
file_name = GNUNET_h2s_full(&file_name_hash);
- GNUNET_asprintf (&file_path,
- "%s%s%s",
+ GNUNET_asprintf (&file_path,
+ "%s%s%s",
folder_name,
DIR_SEPARATOR_STR,
file_name);
-
+
if (NULL != (persistence_file = fopen(file_path, "w+")))
{
fwrite(topic, 1, strlen(topic)+1, persistence_file);
fwrite(message, 1, strlen(message), persistence_file);
- fclose(persistence_file);
+ fclose(persistence_file);
search_for_subscribers (prefixed_topic, publish_msg, file_path);
}
else
{
- GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
+ GNUNET_log_strerror_file (GNUNET_ERROR_TYPE_ERROR,
"open",
file_path);
- }
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"outgoing PUBLISH message received: %s [%d bytes] (%d overall)\n",
- topic,
+ topic,
publish_msg->topic_len,
ntohs (publish_msg->header.size));
GNUNET_SERVER_receive_done (client, GNUNET_OK);
@@ -925,7 +925,7 @@
{
struct Subscription *subscription;
char *topic, *regex_topic;
-
+
const struct GNUNET_MQTT_ClientSubscribeMessage *subscribe_msg;
/* Extract topic */
@@ -937,7 +937,7 @@
subscription = GNUNET_new (struct Subscription);
get_regex (topic, &regex_topic);
if (0 != regcomp (&subscription->automaton,
- regex_topic,
+ regex_topic,
REG_NOSUB))
{
GNUNET_break (0);
@@ -955,8 +955,8 @@
regex_topic,1 , NULL);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "MQTT SUBSCRIBE message received: %s->%s\n",
- topic,
+ "MQTT SUBSCRIBE message received: %s->%s\n",
+ topic,
regex_topic);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
@@ -1010,15 +1010,15 @@
GNUNET_free (reply->msg);
GNUNET_free (reply);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Transmitting %u bytes to client %p\n",
+ "Transmitting %u bytes to client %p\n",
msize,
client->client_handle);
off += msize;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Transmitted %u/%u bytes to client %p\n",
- (unsigned int) off,
- (unsigned int) size,
+ (unsigned int) off,
+ (unsigned int) size,
client->client_handle);
process_pending_client_messages (client);
return off;
@@ -1045,7 +1045,7 @@
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking for transmission of %u bytes to client %p\n",
- ntohs (client->pending_head->msg->size),
+ ntohs (client->pending_head->msg->size),
client->client_handle);
client->transmit_handle =
GNUNET_SERVER_notify_transmit_ready (client->client_handle,
@@ -1108,7 +1108,7 @@
subscription->request_id);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Cancelling subscription with ID %lu\n",
subscription->request_id);
@@ -1137,7 +1137,7 @@
* appropriate subscribing application.
*/
static void
-deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
+deliver_incoming_publish (const struct GNUNET_MQTT_ClientPublishMessage *msg,
struct RegexSearchContext *context)
{
char *topic;
@@ -1151,11 +1151,11 @@
publish_msg = GNUNET_malloc (msg_len);
memcpy (publish_msg, msg, msg_len);
topic = GNUNET_malloc (publish_msg->topic_len);
- strncpy (topic,
- (const char *) &publish_msg[1],
+ strncpy (topic,
+ (const char *) &publish_msg[1],
publish_msg->topic_len);
topic[publish_msg->topic_len - 1] = '\0';
-
+
add_prefix(topic, &prefixed_topic);
for (subscription = subscription_head; NULL != subscription; subscription = subscription->next)
@@ -1166,7 +1166,7 @@
struct GNUNET_MQTT_ClientPublishMessage *return_msg;
struct ClientInfo *client_info = subscription->client;
- if (GNUNET_YES == free_publish_msg)
+ if (GNUNET_YES == free_publish_msg)
{
return_msg = publish_msg;
free_publish_msg = GNUNET_NO;
@@ -1185,7 +1185,7 @@
}
GNUNET_free (topic);
GNUNET_free (prefixed_topic);
- if (GNUNET_YES == free_publish_msg)
+ if (GNUNET_YES == free_publish_msg)
GNUNET_free (publish_msg);
}
@@ -1197,8 +1197,8 @@
* node is subscribed to.
*/
static int
-handle_incoming_publish (void *cls, struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
+handle_incoming_publish (void *cls, struct GNUNET_MESH_Channel *channel,
+ void **channel_ctx,
const struct GNUNET_MessageHeader *msg)
{
deliver_incoming_publish ((const struct GNUNET_MQTT_ClientPublishMessage*) msg,
@@ -1209,8 +1209,8 @@
static int
-handle_tunnel_message (void *cls, struct GNUNET_MESH_Tunnel *tunnel,
- void **tunnel_ctx,
+handle_channel_message (void *cls, struct GNUNET_MESH_Channel *channel,
+ void **channel_ctx,
const struct GNUNET_MessageHeader *msg)
{
return GNUNET_OK;
@@ -1218,7 +1218,7 @@
static int
-free_remote_subscriber_iterator (void *cls,
+free_remote_subscriber_iterator (void *cls,
const struct GNUNET_PeerIdentity *key,
void *value)
{
@@ -1256,7 +1256,7 @@
}
}
if (NULL == client_info)
- for (client_info = client_head; NULL != client_info; client_info = client_info->next)
+ for (client_info = client_head; NULL != client_info; client_info = client_info->next)
if (client_info->client_handle == client)
break;
if (NULL != client_info)
@@ -1271,7 +1271,7 @@
* @param tc unused
*/
static void
-shutdown_task (void *cls,
+shutdown_task (void *cls,
const struct GNUNET_SCHEDULER_TaskContext *tc)
{
struct Subscription *subscription;
@@ -1301,18 +1301,19 @@
static void *
-new_incoming_tunnel_callback (void *cls, struct GNUNET_MESH_Tunnel *tunnel,
- const struct GNUNET_PeerIdentity *initiator,
- uint32_t port)
+new_incoming_channel_callback (void *cls, struct GNUNET_MESH_Channel *channel,
+ const struct GNUNET_PeerIdentity *initiator,
+ uint32_t port,
+ enum GNUNET_MESH_ChannelOption options)
{
return NULL;
}
static void
-incoming_tunnel_destroyed_callback (void *cls,
- const struct GNUNET_MESH_Tunnel *tunnel,
- void *tunnel_ctx)
+incoming_channel_destroyed_callback (void *cls,
+ const struct GNUNET_MESH_Channel *channel,
+ void *channel_ctx)
{
}
@@ -1336,11 +1337,11 @@
int ch;
long long length;
struct GNUNET_MQTT_ClientPublishMessage *old_publish_msg;
-
+
uid_gen = GNUNET_CRYPTO_random_u64 (GNUNET_CRYPTO_QUALITY_WEAK,
UINT64_MAX);
folder_name = NULL;
- if (GNUNET_OK !=
+ if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg,
"MQTT",
"MESSAGE_FOLDER",
@@ -1350,7 +1351,7 @@
"MQTT", "MESSAGE_FOLDER");
return;
}
- if (NULL == (dir = opendir (folder_name)))
+ if (NULL == (dir = opendir (folder_name)))
{
GNUNET_DISK_directory_create (folder_name);
return;
@@ -1360,8 +1361,8 @@
{
if (!strcmp(ent->d_name, ".") || !strcmp(ent->d_name, ".."))
continue;
-
- GNUNET_asprintf (&file_path,
+
+ GNUNET_asprintf (&file_path,
"%s%s%s",
folder_name,
DIR_SEPARATOR_STR,
@@ -1375,46 +1376,46 @@
GNUNET_free (file_path);
continue;
}
- n = 0;
+ n = 0;
fseeko (file, 0, SEEK_END); // seek to end
length = ftello (file); // determine offset of end
rewind (file); // restore position
-
+
struct_size = sizeof(struct GNUNET_MQTT_ClientPublishMessage) + length + 1;
-
+
old_publish_msg = GNUNET_malloc(struct_size);
old_publish_msg->header.size = htons (struct_size);
old_publish_msg->header.type = htons (GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH);
old_publish_msg->request_id = ++uid_gen;
-
+
aux = (char*)&old_publish_msg[1];
while ((ch = fgetc(file)) != EOF && (ch != '\0') )
{
- aux[n] = (char) ch;
+ aux[n] = (char) ch;
n++;
}
-
+
old_publish_msg->topic_len = n + 1;
aux[n] = '\0';
n++;
while ((ch = fgetc(file)) != EOF )
{
- aux[n] = (char) ch;
+ aux[n] = (char) ch;
n++;
}
-
+
aux[n] = '\0';
-
+
topic = GNUNET_malloc (old_publish_msg->topic_len);
strncpy(topic, (char *) (old_publish_msg + 1), old_publish_msg->topic_len);
topic[old_publish_msg->topic_len - 1] = '\0';
-
+
add_prefix (topic, &prefixed_topic);
-
+
search_for_subscribers(prefixed_topic, old_publish_msg, file_path);
GNUNET_free (file_path);
GNUNET_free (topic);
- }
+ }
closedir (dir);
}
@@ -1440,7 +1441,7 @@
};
static const struct GNUNET_MESH_MessageHandler mesh_handlers[] = {
{&handle_incoming_publish, GNUNET_MESSAGE_TYPE_MQTT_CLIENT_PUBLISH, 0},
- {&handle_tunnel_message, GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE, 0},
+ {&handle_channel_message, GNUNET_MESSAGE_TYPE_MQTT_CLIENT_SUBSCRIBE, 0},
{NULL, 0, 0}
};
static const uint32_t ports[] = {
@@ -1448,7 +1449,7 @@
GNUNET_APPLICATION_TYPE_END
};
-
+
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (c, "mqtt",
"MESSAGE_DELETE_TIME",
@@ -1467,19 +1468,19 @@
GNUNET_CRYPTO_get_peer_identity (c,
&my_id));
dht_handle = GNUNET_DHT_connect (c, 32);
- mesh_handle = GNUNET_MESH_connect (c,
- NULL,
- &new_incoming_tunnel_callback,
- &incoming_tunnel_destroyed_callback,
+ mesh_handle = GNUNET_MESH_connect (c,
+ NULL,
+ &new_incoming_channel_callback,
+ &incoming_channel_destroyed_callback,
mesh_handlers, ports);
cfg = c;
GNUNET_SERVER_add_handlers (server, handlers);
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL);
- GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
+ GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_FOREVER_REL,
&shutdown_task,
NULL);
- look_for_old_messages ();
+ look_for_old_messages ();
}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r31191 - gnunet-mqtt/src/mqtt,
gnunet <=