[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r21384 - gnunet-java/src/org/gnunet/dht
From: |
gnunet |
Subject: |
[GNUnet-SVN] r21384 - gnunet-java/src/org/gnunet/dht |
Date: |
Wed, 9 May 2012 16:13:11 +0200 |
Author: dold
Date: 2012-05-09 16:13:11 +0200 (Wed, 09 May 2012)
New Revision: 21384
Added:
gnunet-java/src/org/gnunet/dht/MonitorStartStop.java
Removed:
gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
Modified:
gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
Log:
dht monitoring implemented
Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-09
13:32:11 UTC (rev 21383)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java 2012-05-09
14:13:11 UTC (rev 21384)
@@ -37,7 +37,6 @@
*/
private PutRequest currentPutRequest;
-
/**
* Create a connection with the DHT service.
*
@@ -116,6 +115,56 @@
}
}
+ public interface MonitorGetHandler {
+ void onGet(int options, int type, int hop_count, int
desired_replication_level, PeerIdentity[] getPath,
+ HashCode key);
+ }
+
+ public interface MonitorGetResponseHandler {}
+
+ public interface MonitorPutHandler {
+ void onPut(int options, int type, int hop_count, AbsoluteTimeMessage
expirationTime, PeerIdentity[] putPath,
+ HashCode key, byte[] data);
+ }
+
+ private class MonitorRequest extends RequestQueue.Request {
+
+ public int blockType;
+ public HashCode key;
+ public MonitorGetHandler getHandler;
+ public MonitorGetResponseHandler getResponseHandler;
+ public MonitorPutHandler putHandler;
+
+ @Override
+ public AbsoluteTime getDeadline() {
+ return AbsoluteTime.FOREVER;
+ }
+
+ @Override
+ public void transmit(Connection.MessageSink sink) {
+ MonitorStartStop mss = new MonitorStartStop();
+ if (key != null) {
+ mss.filter_key = 1;
+ mss.key = key;
+ } else {
+ // todo: right "empty" hash code value
+ mss.key = new HashCode("");
+ }
+ if (getHandler != null) {
+ mss.get = 1;
+ }
+ if (getResponseHandler != null) {
+ mss.getResp = 1;
+ }
+ if (putHandler != null) {
+ mss.put = 1;
+ }
+ mss.type = blockType;
+
+ sink.send(mss);
+ }
+ }
+
public class DHTMessageReceiver extends RunaboutMessageReceiver {
public void visit(ClientPutConfirmationMessage pcm) {
if (currentPutRequest == null || pcm.uid != currentPutRequest.uid)
{
@@ -129,14 +178,43 @@
if (currentGetRequest == null || currentGetRequest.uid != rm.uid) {
logger.warn("received response on invalid UID");
} else {
-
currentGetRequest.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration),
rm.key, null, null, BlockType.TEST,
+
currentGetRequest.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration),
rm.key, null, null,
+ BlockType.TEST,
rm.data);
}
}
+ public void visit(MonitorGetMessage monitorGetMessage) {
+ if (currentMonitorRequest == null ||
currentMonitorRequest.getHandler == null) {
+ logger.warn("monitor service confused in monitoring");
+ return;
+ }
+ currentMonitorRequest.getHandler.onGet(monitorGetMessage.options,
monitorGetMessage.type,
+ monitorGetMessage.hop_count,
monitorGetMessage.desired_replication_level, monitorGetMessage.getPath,
+ monitorGetMessage.key);
+ }
+
+ public void visit(MonitorGetRespMessage monitorGetRespMessage) {
+ if (currentMonitorRequest == null ||
currentMonitorRequest.getResponseHandler == null) {
+ logger.warn("monitor service confused in monitoring");
+ return;
+ }
+ }
+
+ public void visit(MonitorPutMessage monitorPutMessage) {
+ if (currentMonitorRequest == null ||
currentMonitorRequest.putHandler == null) {
+ logger.warn("monitor service confused in monitoring");
+ return;
+ }
+ currentMonitorRequest.putHandler.onPut(monitorPutMessage.options,
monitorPutMessage.type,
+ monitorPutMessage.hop_count,
monitorPutMessage.expirationTime,
+ monitorPutMessage.putPath, monitorPutMessage.key,
monitorPutMessage.data);
+
+ }
+
@Override
public void handleError() {
- throw new AssertionError("not handled");
+ throw new AssertionError("unexpected");
}
}
@@ -208,19 +286,20 @@
return requestQueue.add(getRequest);
}
- public Cancelable monitorStart(int blockType, HashCode key,
MonitorGetHandler getHandler,
+ public Cancelable startMonitor(int blockType, HashCode key,
MonitorGetHandler getHandler,
MonitorGetResponseHandler
getResponseHandler,
MonitorPutHandler putHandler) {
- /*
- MonitorHandle monitorHandle = new MonitorHandle();
- monitorHandle.blockType = blockType;
- monitorHandle.key = key;
- monitorHandle.getHandler = getHandler;
- monitorHandle.getResponseHandler = getResponseHandler;
- monitorHandle.putHandler = putHandler;
- */
- return null;
+ MonitorRequest monitorRequest = new MonitorRequest();
+ monitorRequest.blockType = blockType;
+ monitorRequest.key = key;
+ monitorRequest.getHandler = getHandler;
+ monitorRequest.getResponseHandler = getResponseHandler;
+ monitorRequest.putHandler = putHandler;
+ currentMonitorRequest = monitorRequest;
+
+ return requestQueue.add(monitorRequest);
+
}
@@ -242,6 +321,13 @@
description = "set a value in the DHT; default is get")
boolean modePut = false;
+ @Option(action = OptionAction.SET,
+ shortname = "m",
+ longname = "monitor",
+ description = "monitor requests going to the local DHT")
+ boolean monitor = false;
+
+
@Option(action = OptionAction.STORE_STRING,
shortname = "d",
longname = "data",
@@ -275,17 +361,17 @@
public void run() {
- if (key == null) {
- System.out.println("key required");
- return;
- }
+ if (modePut) {
- if (modePut) {
+ if (key == null) {
+ System.out.println("key required");
+ return;
+ }
+
if (data == null) {
System.out.println("data required on put");
return;
}
-
final DistributedHashTable dht = new
DistributedHashTable(cfg);
dht.put(new HashCode(key), data.getBytes(), replication,
EnumSet.of(RouteOption.NONE),
@@ -301,7 +387,29 @@
dht.destroy();
}
});
- } else {
+ } else if (monitor) {
+ final DistributedHashTable dht = new
DistributedHashTable(cfg);
+ dht.startMonitor(BlockType.TEST.val, null,
+ new MonitorGetHandler() {
+ @Override
+ public void onGet(int options, int type, int
hop_count,
+ int
desired_replication_level, PeerIdentity[] getPath, HashCode key) {
+ System.out.println("get monitored");
+ }
+ },
+ new MonitorGetResponseHandler() {},
+ new MonitorPutHandler() {
+ @Override
+ public void onPut(int options, int type, int
hop_count, AbsoluteTimeMessage
+ expirationTime, PeerIdentity[]
putPath, HashCode key, byte[] data) {
+ System.out.println("put monitored");
+ }
+ });
+ } else { // get
+ if (key == null) {
+ System.out.println("key required");
+ return;
+ }
if (data != null) {
System.out.println("get does not take data as an
option");
return;
@@ -322,15 +430,4 @@
}
}.start();
}
-
- private class MonitorHandle {}
-
- private class MonitorGetHandler {}
-
- private class MonitorGetResponseHandler {}
-
- private class MonitorPutHandler {}
-
-
- private class MonitorRequest {}
}
Deleted: gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java 2012-05-09
13:32:11 UTC (rev 21383)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java 2012-05-09
14:13:11 UTC (rev 21384)
@@ -1,50 +0,0 @@
-package org.gnunet.dht;
-
-import org.gnunet.construct.NestedMessage;
-import org.gnunet.construct.UInt16;
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UnionCase;
-import org.gnunet.util.GnunetMessage;
-import org.gnunet.util.HashCode;
-
-/**
- * Message to request monitoring messages, clients --> DHT service.
- */
address@hidden(153)
-public class MonitorStartMessage implements GnunetMessage.Body {
- /**
- * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
- */
- @UInt32
- public int type;
-
- /**
- * Flag whether to notify about GET messages.
- */
- @UInt16
- public int get;
-
- /**
- * Flag whether to notify about GET_REPONSE messages.
- */
- @UInt16
- public int getResp;
-
- /**
- * Flag whether to notify about PUT messages.
- */
- @UInt16
- public int put;
-
- /**
- * Flag whether to use the provided key to filter messages.
- */
- @UInt16
- public int filter_key;
-
- /*
- The key to filter messages by.
- */
- @NestedMessage
- public HashCode key;
-}
Copied: gnunet-java/src/org/gnunet/dht/MonitorStartStop.java (from rev 21245,
gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java)
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartStop.java
(rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartStop.java 2012-05-09
14:13:11 UTC (rev 21384)
@@ -0,0 +1,50 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt16;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+ * Message to request monitoring messages, clients --> DHT service.
+ */
address@hidden(153)
+public class MonitorStartStop implements GnunetMessage.Body {
+ /**
+ * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+ */
+ @UInt32
+ public int type;
+
+ /**
+ * Flag whether to notify about GET messages.
+ */
+ @UInt16
+ public int get;
+
+ /**
+ * Flag whether to notify about GET_REPONSE messages.
+ */
+ @UInt16
+ public int getResp;
+
+ /**
+ * Flag whether to notify about PUT messages.
+ */
+ @UInt16
+ public int put;
+
+ /**
+ * Flag whether to use the provided key to filter messages.
+ */
+ @UInt16
+ public int filter_key;
+
+ /*
+ The key to filter messages by.
+ */
+ @NestedMessage
+ public HashCode key;
+}
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r21384 - gnunet-java/src/org/gnunet/dht,
gnunet <=