gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r21384 - gnunet-java/src/org/gnunet/dht


From: gnunet
Subject: [GNUnet-SVN] r21384 - gnunet-java/src/org/gnunet/dht
Date: Wed, 9 May 2012 16:13:11 +0200

Author: dold
Date: 2012-05-09 16:13:11 +0200 (Wed, 09 May 2012)
New Revision: 21384

Added:
   gnunet-java/src/org/gnunet/dht/MonitorStartStop.java
Removed:
   gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
Modified:
   gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
Log:
dht monitoring implemented

Modified: gnunet-java/src/org/gnunet/dht/DistributedHashTable.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-09 
13:32:11 UTC (rev 21383)
+++ gnunet-java/src/org/gnunet/dht/DistributedHashTable.java    2012-05-09 
14:13:11 UTC (rev 21384)
@@ -37,7 +37,6 @@
      */
     private PutRequest currentPutRequest;
 
-
     /**
      * Create a connection with the DHT service.
      *
@@ -116,6 +115,56 @@
         }
     }
 
+    public interface MonitorGetHandler {
+        void onGet(int options, int type, int hop_count, int 
desired_replication_level, PeerIdentity[] getPath,
+                   HashCode key);
+    }
+
+    public interface MonitorGetResponseHandler {}
+
+    public interface MonitorPutHandler {
+        void onPut(int options, int type, int hop_count, AbsoluteTimeMessage 
expirationTime, PeerIdentity[] putPath,
+                   HashCode key, byte[] data);
+    }
+
+    private class MonitorRequest extends RequestQueue.Request {
+
+        public int blockType;
+        public HashCode key;
+        public MonitorGetHandler getHandler;
+        public MonitorGetResponseHandler getResponseHandler;
+        public MonitorPutHandler putHandler;
+
+        @Override
+        public AbsoluteTime getDeadline() {
+            return AbsoluteTime.FOREVER;
+        }
+
+        @Override
+        public void transmit(Connection.MessageSink sink) {
+            MonitorStartStop mss = new MonitorStartStop();
+            if (key != null) {
+                mss.filter_key = 1;
+                mss.key = key;
+            } else {
+                // todo: right "empty" hash code value
+                mss.key = new HashCode("");
+            }
+            if (getHandler != null) {
+                mss.get = 1;
+            }
+            if (getResponseHandler != null) {
+                mss.getResp = 1;
+            }
+            if (putHandler != null) {
+                mss.put = 1;
+            }
+            mss.type = blockType;
+
+            sink.send(mss);
+        }
+    }
+
     public class DHTMessageReceiver extends RunaboutMessageReceiver {
         public void visit(ClientPutConfirmationMessage pcm) {
             if (currentPutRequest == null || pcm.uid != currentPutRequest.uid) 
{
@@ -129,14 +178,43 @@
             if (currentGetRequest == null || currentGetRequest.uid != rm.uid) {
                 logger.warn("received response on invalid UID");
             } else {
-                
currentGetRequest.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration), 
rm.key, null, null, BlockType.TEST,
+                
currentGetRequest.cb.handleResult(AbsoluteTime.fromNetwork(rm.expiration), 
rm.key, null, null,
+                        BlockType.TEST,
                         rm.data);
             }
         }
 
+        public void visit(MonitorGetMessage monitorGetMessage) {
+            if (currentMonitorRequest == null || 
currentMonitorRequest.getHandler == null) {
+                logger.warn("monitor service confused in monitoring");
+                return;
+            }
+            currentMonitorRequest.getHandler.onGet(monitorGetMessage.options, 
monitorGetMessage.type,
+                    monitorGetMessage.hop_count, 
monitorGetMessage.desired_replication_level, monitorGetMessage.getPath,
+                    monitorGetMessage.key);
+        }
+
+        public void visit(MonitorGetRespMessage monitorGetRespMessage) {
+            if (currentMonitorRequest == null || 
currentMonitorRequest.getResponseHandler == null) {
+                logger.warn("monitor service confused in monitoring");
+                return;
+            }
+        }
+
+        public void visit(MonitorPutMessage monitorPutMessage) {
+            if (currentMonitorRequest == null || 
currentMonitorRequest.putHandler == null) {
+                logger.warn("monitor service confused in monitoring");
+                return;
+            }
+            currentMonitorRequest.putHandler.onPut(monitorPutMessage.options, 
monitorPutMessage.type,
+                    monitorPutMessage.hop_count, 
monitorPutMessage.expirationTime,
+                    monitorPutMessage.putPath, monitorPutMessage.key, 
monitorPutMessage.data);
+
+        }
+
         @Override
         public void handleError() {
-            throw new AssertionError("not handled");
+            throw new AssertionError("unexpected");
         }
     }
 
@@ -208,19 +286,20 @@
         return requestQueue.add(getRequest);
     }
 
-    public Cancelable monitorStart(int blockType, HashCode key, 
MonitorGetHandler getHandler,
+    public Cancelable startMonitor(int blockType, HashCode key, 
MonitorGetHandler getHandler,
                                    MonitorGetResponseHandler 
getResponseHandler,
                                    MonitorPutHandler putHandler) {
-        /*
-        MonitorHandle monitorHandle = new MonitorHandle();
-        monitorHandle.blockType = blockType;
-        monitorHandle.key = key;
-        monitorHandle.getHandler = getHandler;
-        monitorHandle.getResponseHandler = getResponseHandler;
-        monitorHandle.putHandler = putHandler;
-        */
-        return null;
+        MonitorRequest monitorRequest = new MonitorRequest();
+        monitorRequest.blockType = blockType;
+        monitorRequest.key = key;
+        monitorRequest.getHandler = getHandler;
+        monitorRequest.getResponseHandler = getResponseHandler;
+        monitorRequest.putHandler = putHandler;
 
+        currentMonitorRequest = monitorRequest;
+
+        return requestQueue.add(monitorRequest);
+
     }
 
 
@@ -242,6 +321,13 @@
                     description = "set a value in the DHT; default is get")
             boolean modePut = false;
 
+            @Option(action = OptionAction.SET,
+                    shortname = "m",
+                    longname = "monitor",
+                    description = "monitor requests going to the local DHT")
+            boolean monitor = false;
+
+
             @Option(action = OptionAction.STORE_STRING,
                     shortname = "d",
                     longname = "data",
@@ -275,17 +361,17 @@
 
 
             public void run() {
-                if (key == null) {
-                    System.out.println("key required");
-                    return;
-                }
+                if (modePut) {
 
-                if (modePut) {
+                    if (key == null) {
+                        System.out.println("key required");
+                        return;
+                    }
+
                     if (data == null) {
                         System.out.println("data required on put");
                         return;
                     }
-
                     final DistributedHashTable dht = new 
DistributedHashTable(cfg);
 
                     dht.put(new HashCode(key), data.getBytes(), replication, 
EnumSet.of(RouteOption.NONE),
@@ -301,7 +387,29 @@
                             dht.destroy();
                         }
                     });
-                } else {
+                } else if (monitor) {
+                    final DistributedHashTable dht = new 
DistributedHashTable(cfg);
+                    dht.startMonitor(BlockType.TEST.val, null,
+                            new MonitorGetHandler() {
+                                @Override
+                                public void onGet(int options, int type, int 
hop_count,
+                                                  int 
desired_replication_level, PeerIdentity[] getPath, HashCode key) {
+                                    System.out.println("get monitored");
+                                }
+                            },
+                            new MonitorGetResponseHandler() {},
+                            new MonitorPutHandler() {
+                                @Override
+                                public void onPut(int options, int type, int 
hop_count, AbsoluteTimeMessage
+                                        expirationTime, PeerIdentity[] 
putPath, HashCode key, byte[] data) {
+                                    System.out.println("put monitored");
+                                }
+                            });
+                } else { // get
+                    if (key == null) {
+                        System.out.println("key required");
+                        return;
+                    }
                     if (data != null) {
                         System.out.println("get does not take data as an 
option");
                         return;
@@ -322,15 +430,4 @@
             }
         }.start();
     }
-
-    private class MonitorHandle {}
-
-    private class MonitorGetHandler {}
-
-    private class MonitorGetResponseHandler {}
-
-    private class MonitorPutHandler {}
-
-
-    private class MonitorRequest {}
 }

Deleted: gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java     2012-05-09 
13:32:11 UTC (rev 21383)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java     2012-05-09 
14:13:11 UTC (rev 21384)
@@ -1,50 +0,0 @@
-package org.gnunet.dht;
-
-import org.gnunet.construct.NestedMessage;
-import org.gnunet.construct.UInt16;
-import org.gnunet.construct.UInt32;
-import org.gnunet.construct.UnionCase;
-import org.gnunet.util.GnunetMessage;
-import org.gnunet.util.HashCode;
-
-/**
- * Message to request monitoring messages, clients --> DHT service.
- */
address@hidden(153)
-public class MonitorStartMessage implements GnunetMessage.Body {
-    /**
-     * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
-     */
-    @UInt32
-    public int type;
-
-    /**
-     * Flag whether to notify about GET messages.
-     */
-    @UInt16
-    public int get;
-
-    /**
-     * Flag whether to notify about GET_REPONSE messages.
-     */
-    @UInt16
-    public int getResp;
-
-    /**
-     * Flag whether to notify about PUT messages.
-     */
-    @UInt16
-    public int put;
-
-    /**
-     * Flag whether to use the provided key to filter messages.
-     */
-    @UInt16
-    public int filter_key;
-
-    /*
-    The key to filter messages by.
-     */
-    @NestedMessage
-    public HashCode key;
-}

Copied: gnunet-java/src/org/gnunet/dht/MonitorStartStop.java (from rev 21245, 
gnunet-java/src/org/gnunet/dht/MonitorStartMessage.java)
===================================================================
--- gnunet-java/src/org/gnunet/dht/MonitorStartStop.java                        
        (rev 0)
+++ gnunet-java/src/org/gnunet/dht/MonitorStartStop.java        2012-05-09 
14:13:11 UTC (rev 21384)
@@ -0,0 +1,50 @@
+package org.gnunet.dht;
+
+import org.gnunet.construct.NestedMessage;
+import org.gnunet.construct.UInt16;
+import org.gnunet.construct.UInt32;
+import org.gnunet.construct.UnionCase;
+import org.gnunet.util.GnunetMessage;
+import org.gnunet.util.HashCode;
+
+/**
+ * Message to request monitoring messages, clients --> DHT service.
+ */
address@hidden(153)
+public class MonitorStartStop implements GnunetMessage.Body {
+    /**
+     * The type of data desired, GNUNET_BLOCK_TYPE_ANY for all.
+     */
+    @UInt32
+    public int type;
+
+    /**
+     * Flag whether to notify about GET messages.
+     */
+    @UInt16
+    public int get;
+
+    /**
+     * Flag whether to notify about GET_REPONSE messages.
+     */
+    @UInt16
+    public int getResp;
+
+    /**
+     * Flag whether to notify about PUT messages.
+     */
+    @UInt16
+    public int put;
+
+    /**
+     * Flag whether to use the provided key to filter messages.
+     */
+    @UInt16
+    public int filter_key;
+
+    /*
+    The key to filter messages by.
+     */
+    @NestedMessage
+    public HashCode key;
+}




reply via email to

[Prev in Thread] Current Thread [Next in Thread]