gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r3072 - in freeway/src/org/gnu/freeway: . server test transport/tcp util/net


From: mdonoughe
Subject: [GNUnet-SVN] r3072 - in freeway/src/org/gnu/freeway: . server test transport/tcp util/net
Date: Tue, 27 Jun 2006 21:45:08 -0700 (PDT)

Author: mdonoughe
Date: 2006-06-27 21:45:03 -0700 (Tue, 27 Jun 2006)
New Revision: 3072

Removed:
   freeway/src/org/gnu/freeway/util/net/TCPServer.java
   freeway/src/org/gnu/freeway/util/net/TCPSession.java
Modified:
   freeway/src/org/gnu/freeway/AbstractClient.java
   freeway/src/org/gnu/freeway/server/ClientServer.java
   freeway/src/org/gnu/freeway/test/TCPTest.java
   freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java
   freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java
   freeway/src/org/gnu/freeway/util/net/CSServer.java
   freeway/src/org/gnu/freeway/util/net/CSSession.java
Log:
Move TCPServer and TCPSession into CSServer and CSSession because they 
are the only implementations


Modified: freeway/src/org/gnu/freeway/AbstractClient.java
===================================================================
--- freeway/src/org/gnu/freeway/AbstractClient.java     2006-06-28 04:24:55 UTC 
(rev 3071)
+++ freeway/src/org/gnu/freeway/AbstractClient.java     2006-06-28 04:45:03 UTC 
(rev 3072)
@@ -60,7 +60,7 @@
                        return null;
                        }
 
-               session=new TCPSession();
+               session=new CSSession();
                if (!session.connect(ip,port,true)) {
                        log(Level.SEVERE,"Could not connect to gnunetd !");
 

Modified: freeway/src/org/gnu/freeway/server/ClientServer.java
===================================================================
--- freeway/src/org/gnu/freeway/server/ClientServer.java        2006-06-28 
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/server/ClientServer.java        2006-06-28 
04:45:03 UTC (rev 3072)
@@ -34,7 +34,7 @@
 
        private Stat                            bytesOut;
 
-       private TCPServer               server;
+       private CSServer                server;
 
        private PersistentDecoder       decoder;
 
@@ -44,7 +44,7 @@
                super(true);
                handlers=new ArrayList();
                exitHandlers=new LinkedList();
-               server=new TCPServer("C/S",this);
+               server=new CSServer("C/S",this);
                decoder=null;
        }
 
@@ -150,7 +150,7 @@
 
 //             debug("Accepted connection from 
"+ip.getHostAddress()+":"+socket.socket().getPort()+".");
 
-               hd=new TCPSession(server);
+               hd=new CSSession(server);
                return (hd.connect(socket,true) ? hd : null);
        }
 

Modified: freeway/src/org/gnu/freeway/test/TCPTest.java
===================================================================
--- freeway/src/org/gnu/freeway/test/TCPTest.java       2006-06-28 04:24:55 UTC 
(rev 3071)
+++ freeway/src/org/gnu/freeway/test/TCPTest.java       2006-06-28 04:45:03 UTC 
(rev 3072)
@@ -46,7 +46,7 @@
 
                imax=4;
                for (i=0; i<imax; i++) {
-                       acceptSocket=new TCPSession();
+                       acceptSocket=new CSSession();
 
                        if (acceptSocket.connect(doAccept(serverSocket),true)) {
                                acceptSocket.setBlocking(true);

Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java
===================================================================
--- freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java   2006-06-28 
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/transport/tcp/TCPSession.java   2006-06-28 
04:45:03 UTC (rev 3072)
@@ -18,7 +18,7 @@
  * Transport Session handle.
  */
 
-public class TCPSession extends org.gnu.freeway.util.net.TCPSession implements 
Session
+public class TCPSession extends org.gnu.freeway.util.net.CSSession implements 
Session
 {
        /** after how much time of the core not being associated with a tcp 
connection anymore do we close it ? */
        public static final long        TIME_OUT        =       
Scheduler.SECS_30;
@@ -45,7 +45,7 @@
        private long                                    lastWrite;
 
 
-       public TCPSession( TCPServer s, TCPTransport t )
+       public TCPSession( CSServer s, TCPTransport t )
        {
                super(s);
                transport=t;

Modified: freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java
===================================================================
--- freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28 
04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/transport/tcp/TCPTransport.java 2006-06-28 
04:45:03 UTC (rev 3072)
@@ -26,7 +26,7 @@
        private StatusCallsService      status;
 
        /** */
-       private TCPServer                       server;
+       private CSServer                        server;
 
        /** */
        private int                                     mtu;
@@ -44,7 +44,7 @@
        public TCPTransport()
        {
                super(TCP_PROTOCOL_NUMBER,"TCP");
-               server=new TCPServer("TCP PEER SERVER",this);
+               server=new CSServer("TCP PEER SERVER",this);
        }
 
        public String toString()

Modified: freeway/src/org/gnu/freeway/util/net/CSServer.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/CSServer.java  2006-06-28 04:24:55 UTC 
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/CSServer.java  2006-06-28 04:45:03 UTC 
(rev 3072)
@@ -4,24 +4,645 @@
 
 package org.gnu.freeway.util.net;
 
+import org.gnu.freeway.util.*;
+
+import java.io.*;
+import java.net.*;
+import java.nio.channels.*;
+import java.util.*;
+import java.util.logging.*;
+
 /**
  *
  */
 
-public interface CSServer
+public class CSServer extends LoggedObject
 {
-       public String getLabel();
+       /** */
+       private static final long               SELECT_TIMEOUT          =       
Scheduler.SECS_3;
 
-       public boolean isLaunched();
-       public boolean launch( int port );
-       public boolean shutdown();
+       /** Maximum of pending unhandled connections. */
+       private static final int                MAX_QUEUED_REQUESTS     =       
5;
 
+       /** Maximum number of concurrent allowed sessions. */
+       private static final int                MAX_SESSIONS                    
=       64;
+
+       /** Name of this server (for debugging purpose only). */
+       private String                          label;
+
+       /** Selector of the server thread */
+       private Selector                                selector;
+
+       /** The TCP socket that we listen on for new inbound connections. */
+       private ServerSocketChannel     server;
+
+       /** Thread for listening for new connections. */
+       private MasterTask                      listenTask;
+
+       /** Thread for accepting new connections. */
+       private SlaveTask                       acceptTask;
+
+       /** Thread for reading on all open sockets. */
+       private SlaveTask                       readTask;
+
+       /** Thread for writing on all open sockets. */
+       private SlaveTask                       writeTask;
+
+       /** Should the listen thread exit ? */
+       private boolean                         running;
+
+       /** Array of currently active TCP sessions. */
+       private CSSession[]                     sessions;
+
+       /** */
+       private int                                     sessionCount;
+
+       /** Sessions' current operations. */
+       private int[]                           sessionsOps;
+
+       /** */
+       private boolean                         acceptingOp;
+
+       /** Sessions lock. */
+       private Object                          internal;
+
+       /** */
+       private CSSessionHandler                handler;
+
+
+       public CSServer( String str, CSSessionHandler h )
+       {
+               super(true);
+               label=str;
+               selector=null;
+               server=null;
+               listenTask=new MasterTask("TCP-LISTEN("+str+")",new 
EvalAction(this,"performListen"));
+               acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new 
EvalAction(this,"performAccept"));
+               readTask=listenTask.create("TCP-READ("+str+")",new 
EvalAction(this,"performRead"));
+               writeTask=listenTask.create("TCP-WRITE("+str+")",new 
EvalAction(this,"performWrite"));
+               running=false;
+               sessions=new CSSession[0];
+               sessionCount=0;
+               sessionsOps=new int[0];
+               acceptingOp=false;
+               internal=new Object();
+               handler=h;
+       }
+
+       public String toString()
+       {
+               return "Abstract TCP server";
+       }
+
+
+       
////////////////////////////////////////////////////////////////////////////////////////////////
+
+       public String getLabel()
+       {
+               return label;
+       }
+
+       public boolean isLaunched()
+       {
+               return running;
+       }
+
+       public boolean launch( int port )
+       {
+               int     secs;
+
+               log(Level.INFO,label+" Launching TCP server...");
+
+               // open selector
+               try {
+                       selector=Selector.open();
+                       }
+               catch( IOException x ) {
+                       err("Could not create selector !",x);
+                       return false;
+                       }
+
+               // create server socket
+               secs=5;
+               while (server==null && secs<60) {
+                       try {
+                               server=ServerSocketChannel.open();
+                               server.configureBlocking(false);
+                               server.socket().setReuseAddress(true);
+                               server.socket().bind(new 
InetSocketAddress(port),MAX_QUEUED_REQUESTS);
+                               log(Level.INFO,label+" TCP server bound to port 
"+port+".");
+                               }
+                       catch( IOException x ) {
+                               err("Failed to open socket at port "+port+". 
Trying again in "+secs+" seconds...",x);
+
+                               Scheduler.sleep(Scheduler.seconds(secs));
+                               secs+=5;        // slow progression...
+
+                               if (server!=null) {
+                                       try {
+                                               server.close();
+                                               }
+                                       catch( IOException xx ) {
+                                               }
+                                       server=null;
+                                       }
+                               }
+                       }
+
+               if (server==null) {
+                       log(Level.SEVERE,label+" Could not create socket, 
abort.");
+                       try {
+                               selector.close();
+                               }
+                       catch( IOException x ) {
+                               }
+                       selector=null;
+                       return false;
+                       }
+
+               // start listening thread
+               running=true;
+               listenTask.launch();
+               return true;
+       }
+
+       public boolean shutdown()
+       {
+               int     i;
+
+               // signal listening thread
+               running=false;
+               selector.wakeup();
+
+               // stop listening thread
+               listenTask.shutdown();
+
+               try {
+                       server.close();
+                       }
+               catch( IOException x ) {
+                       err("Failed to close socket !",x);
+                       return false;
+                       }
+               finally {
+                       server=null;
+                       try {
+                               selector.close();
+                               }
+                       catch( IOException x ) {
+                               err("Failed to close selector !",x);
+                               return false;
+                               }
+                       finally {
+                               selector=null;
+                               }
+                       }
+               log(Level.INFO,label+" TCP server stopped.");
+
+               synchronized(internal) {
+                       for (i=0; i<sessions.length; i++) {
+                               if (sessions[i]!=null) {
+                                       log(Level.WARNING,label+" Session still 
alive : "+sessions[i].getLabel());
+                                       destroySession(i);
+                                       }
+                               }
+                       }
+               return true;
+       }
+
+       public void wakeUp()
+       {
+               selector.wakeup();
+       }
+
        /**
-        * Add manually a session to the pool of listened sessions.
+        * Add session to the pool of listened sessions. If it can't be added, 
session will be disconnected and false returned.
         *
-        * @param s     The session to add.
-        * @return      True if okay (enough ressources), false otherwise.
+        * @param s
+        * @return
+        * @see CSSession#disconnect()
         */
 
-       public boolean register( CSSession s );
+       public boolean register( CSSession s )
+       {
+               if (addSession(s)>=0) {
+                       // signal the thread that is blocked in a select call 
that the set of sockets to listen to has changed
+                       selector.wakeup();
+                       return true;
+                       }
+               return false;
+       }
+
+       /**
+        * Listen for incoming connections.
+        * Main method for the thread listening on the tcp socket and all tcp 
connections.
+        * Whenever a message is received, it is processed by the handler.
+        * This thread waits for activity on any of the TCP connections and 
processes deferred (async) writes and buffers reads
+        * until an entire message has been received.
+        *
+        * @throws IOException
+        */
+
+       public void performListen() throws IOException
+       {
+               SelectionKey    key;
+               Iterator                iter;
+               int                     mergedOps,ops,ret,i;
+
+               acceptingOp=true;
+
+               while (running) {
+                       synchronized(internal) {
+                               server.register(selector,(acceptingOp ? 
SelectionKey.OP_ACCEPT : 0));
+
+                               for (i=0; i<sessions.length; i++) {
+                                       if (sessions[i]!=null) {
+                                               if (sessions[i].isConnected()) 
{        // always check because impl. may disconnect after timeout...
+                                                       
key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i]));
+                                                       if (key!=null) {
+                                                               key.attach(new 
Integer(i));
+                                                               }
+                                                       else {
+                                                               
destroySession(i);
+                                                               }
+                                                       }
+                                               else {
+                                                       // clean up (depends on 
session implementation : timeout detected, other side closed connection...)
+                                                       destroySession(i);
+                                                       }
+                                               }
+                                       }
+                               }
+
+                       // should wake up regularly (to clean up sessions...)
+                       ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT));
+                       if (ret==0) {
+                               continue;
+                               }
+
+                       synchronized(internal) {
+                               mergedOps=0;
+
+                               iter=selector.selectedKeys().iterator();
+                               while (iter.hasNext()) {
+                                       key=(SelectionKey) iter.next();
+                                       iter.remove();
+
+                                       if (key.isValid()) {
+                                               ops=key.readyOps();
+                                               mergedOps|=ops;
+                                               if ((ops & 
SelectionKey.OP_ACCEPT)==0) {        // read or write op
+                                                       i=((Number) 
key.attachment()).intValue();
+                                                       sessionsOps[i]|=ops;
+                                                       }
+                                               }
+                                       }
+
+                               debug(label+" Selected #"+ret+" sockets with 
merged ops { "+NetUtils.labelForOps(mergedOps)+" }.");
+
+                               // signal appropriate tasks
+                               if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) {
+                                       acceptingOp=false;
+                                       acceptTask.signal();
+                                       }
+                               if ((mergedOps & SelectionKey.OP_READ)!=0) {
+                                       readTask.signal();
+                                       }
+                               if ((mergedOps & SelectionKey.OP_WRITE)!=0) {
+                                       writeTask.signal();
+                                       }
+                               }
+                       }
+
+               // shutdown... close all sessions
+               synchronized(internal) {
+                       for (i=0; i<sessions.length; i++) {
+                               if (sessions[i]!=null) {
+                                       destroySession(i);
+                                       }
+                               }
+                       }
+       }
+
+       public void performAccept()
+       {
+               CSSession               s;
+               SocketChannel   c;
+
+               try {
+                       for (c=server.accept(); c!=null; c=server.accept()) {
+                               s=handler.handleAccept(c);
+                               if (s!=null) {
+                                       if (addSession(s)<0) {
+                                               s.disconnect();
+                                               }
+                                       }
+                               else {
+                                       try {
+                                               c.close();
+                                               }
+                                       catch( IOException xx ) {
+                                               err("Failed to close channel 
!",xx);
+                                               }
+                                       }
+                               }
+                       }
+               catch( IOException x ) {
+                       err("Failed to accept new connection !",x);
+                       }
+
+               synchronized(internal) {
+                       acceptingOp=true;
+                       }
+
+               selector.wakeup();
+       }
+
+       public void performRead()
+       {
+               CSSession       s;
+               int                     len;
+
+               do {
+                       s=firstSessionWithOp(SelectionKey.OP_READ);
+                       if (s!=null) {
+                               len=s.doReceive();
+                               if (len>0 && handler.handleRead(s,len)) {
+                                       clearSessionOp(s,SelectionKey.OP_READ);
+                                       }
+                               else {
+                                       debug(s.getLabel()+" End of stream.");
+                                       destroySession(s);
+                                       }
+                               }
+                       }
+               while (s!=null);
+
+               // signal the thread that is blocked in a select call that the 
set of sockets to listen to has changed
+               selector.wakeup();
+       }
+
+       public void performWrite()
+       {
+               CSSession       s;
+               int                     len;
+
+               do {
+                       s=firstSessionWithOp(SelectionKey.OP_WRITE);
+                       if (s!=null) {
+                               len=s.doSend();
+                               if (len>0 && handler.handleWrite(s,len)) {
+                                       clearSessionOp(s,SelectionKey.OP_WRITE);
+                                       }
+                               else {
+                                       debug(s.getLabel()+" End of stream.");
+                                       destroySession(s);
+                                       }
+                               }
+                       }
+               while (s!=null);
+
+               // signal the thread that is blocked in a select call that the 
set of sockets to listen to has changed
+               selector.wakeup();
+       }
+
+       /**
+        * Add a new session to the array watched by the select thread. Grows 
the array if needed.
+        *
+        * @param s     Session to add.
+        * @return      Index of added session, or -1 on error.
+        */
+
+       protected int addSession( CSSession s )
+       {
+               CSSession[]     tmp;
+               int[]           tmp2;
+               int                     i;
+
+               synchronized(internal) {
+                       if (sessionCount==MAX_SESSIONS) {
+                               log(Level.WARNING,"Too many sessions 
("+MAX_SESSIONS+"), ignore connection.");
+                               return -1;
+                               }
+
+                       for (i=0; i<sessions.length && sessions[i]!=null; i++) 
{}
+                       if (i==sessions.length) {
+                               tmp=new CSSession[sessions.length+16];
+                               Arrays.fill(tmp,null);
+                               
System.arraycopy(sessions,0,tmp,0,sessions.length);
+                               sessions=tmp;
+
+                               tmp2=new int[sessionsOps.length+16];
+                               Arrays.fill(tmp2,0);
+                               
System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length);
+                               sessionsOps=tmp2;
+                               }
+
+                       sessions[i]=s;
+                       sessionsOps[i]=0;
+                       sessionCount++;
+                       debug("Add session at slot #"+i+" 
"+Utils.gauge(sessionCount,sessions.length)+".");
+                       return i;
+                       }
+       }
+
+       protected CSSession firstSessionWithOp( int op )
+       {
+               int     i;
+
+               synchronized(internal) {
+                       for (i=0; i<sessionsOps.length && (sessionsOps[i] & 
op)==0; i++) {}
+                       return (i<sessionsOps.length ? sessions[i] : null);
+                       }
+       }
+
+       protected boolean clearSessionOp( CSSession s, int op )
+       {
+               int     i;
+
+               assert(s!=null);
+
+               synchronized(internal) {
+                       for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
+                       if (i==sessions.length) {
+                               log(Level.WARNING,label+" Session not found : 
"+s.getLabel()+".");
+                               return false;
+                               }
+
+                       sessionsOps[i]&=~op;
+                       return true;
+                       }
+       }
+
+       protected boolean destroySession( CSSession s )
+       {
+               int     i;
+
+               assert(s!=null);
+
+               synchronized(internal) {
+                       for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
+                       if (i==sessions.length) {
+                               log(Level.WARNING,label+" Session not found : 
"+s.getLabel()+".");
+                               return false;
+                               }
+
+                       return destroySession(i);
+                       }
+       }
+
+       /**
+        * The client has disconnected. Close the socket, free the buffers, 
unlink session from the linked list.
+        * Remove a session, either the other side closed the connection or we 
have otherwise reason to believe
+        * that it should better be killed.
+        *
+        * @param index index to the session handle
+        * @return
+        */
+
+       protected boolean destroySession( int index )
+       {
+               assert(index>=0);
+
+               synchronized(internal) {
+                       if (index>=sessions.length || sessions[index]==null) {
+                               log(Level.WARNING,label+" No session at slot 
"+index+".");
+                               return false;
+                               }
+
+                       if (sessions[index].isConnected()) {
+                               sessions[index].disconnect();
+                               }
+
+                       handler.handleDestroy(sessions[index]);
+
+                       sessions[index]=null;
+                       sessionsOps[index]=0;
+                       sessionCount--;
+                       debug("Destroyed session at slot #"+index+" 
"+Utils.gauge(sessionCount,sessions.length)+".");
+                       }
+               return true;
+       }
 }
+
+
+/*
+       public void add( int id, Class c, CommandHandler h )
+       {
+               decoder.add(id,c);
+               setHandler(c,h);
+       }
+
+       public void setDefault( int id, Class c, CommandHandler h )
+       {
+               decoder.add(id,c);
+               setHandler(c,h);
+               decoder.setDefault(id);
+       }
+
+       public void setCorrupted( int id, Class c, CommandHandler h )
+       {
+               decoder.add(id,c);
+               setHandler(c,h);
+               decoder.setCorrupted(id);
+       }
+
+       public boolean hasHandler( Class c )
+       {
+               synchronized(handlers) {
+                       return handlers.get(c)!=null;
+                       }
+       }
+
+       public CommandHandler getHandler( Class c )
+       {
+               synchronized(handlers) {
+                       return (CommandHandler) handlers.get(c);
+                       }
+       }
+
+       public boolean setHandler( Class c, CommandHandler h )
+       {
+               synchronized(handlers) {
+                       if (handlers.get(c)!=null) {
+                               log(Level.WARNING,"Could not assign handler, 
class "+c.getName()+" is already registered.");
+                               return false;
+                               }
+                       handlers.put(c,h);
+                       return true;
+                       }
+       }
+
+       public boolean removeHandler( Class c, CommandHandler h )
+       {
+               synchronized(handlers) {
+                       if (handlers.get(c)==null) {
+                               log(Level.WARNING,"Could not remove handler, 
class "+c.getName()+" is not registered.");
+                               return false;
+                               }
+                       handlers.remove(c);
+                       return true;
+                       }
+       }
+
+
+derive from TCP Server:
+       public void start()
+       {
+               add(ProxyCommand.HELLO_ID,ProxyHello.class,this);
+               add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this);
+               add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this);
+
+               setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this);
+               
setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this);
+
+               super.start();
+       }
+
+       public void stop()
+       {
+               super.stop();
+       }
+
+       public TCP Session createSession( PersistentDecoder decoder )
+       {
+               return new ProxySession(decoder);
+       }
+
+       public boolean handle( TCP Session session, Persistent p )
+       {
+               ProxySession    s;
+               ProxyToken      token;
+               int     id;
+
+               id=((ProxyCommand) p).getID();
+
+               s=(ProxySession) session;
+               if (!s.isWelcomed()) {
+                       if (id!=ProxyCommand.HELLO_ID) {
+                               log("No hello received, close session.");
+                               return false;
+                               }
+                       s.welcome();
+
+                       token=new ProxyToken(s.getToken());
+                       token.setAddress("",0);
+                       sendToClient(s,token);
+                       return true;
+                       }
+
+               switch (id) {
+                       case ProxyCommand.CONNECT_ID:
+                               break;
+
+                       case ProxyCommand.SETBLOCKING_ID:
+                               break;
+
+                       default:
+                               log("Unknown message : "+p);
+                               return false;
+                       }
+               return true;
+       }
+
+*/

Modified: freeway/src/org/gnu/freeway/util/net/CSSession.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:24:55 UTC 
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/CSSession.java 2006-06-28 04:45:03 UTC 
(rev 3072)
@@ -4,36 +4,407 @@
 
 package org.gnu.freeway.util.net;
 
+import org.gnu.freeway.util.*;
+
 import java.net.*;
 import java.nio.channels.*;
+import java.util.logging.*;
 
 /**
+ * Per-client data structure (kept in linked list).  Also: the opaque
+ * handle for client connections passed by the core to the CSHandlers.
+ * Opaque handle for client connections passed by
+ * the core to the CSHandlers.
  *
+ * A connection to a freeway client application. To be used in non-blocking 
mode.
+ *
+ * Struct to refer to a GNUnet TCP connection.
+ * This is more than just a socket because if the server
+ * drops the connection, the client automatically tries
+ * to reconnect (and for that needs connection information).
+ *
+ * Code for synchronized access to TCP streams
+ *
+ * Generic TCP code for reliable, mostly blocking, record-oriented TCP
+ * connections. GNUnet uses the "tcpio" code for trusted client-server
+ * (e.g. gnunet-gtk to gnunetd via loopback) communications.  Note
+ * that an unblocking write is also provided since if both client and
+ * server use blocking IO, both may block on a write and cause a
+ * mutual inter-process deadlock.
+ *
+ * Since we do not want other peers (!) to be able to block a peer by
+ * not reading from the TCP stream, the peer-to-peer TCP transport
+ * uses unreliable, buffered, non-blocking, record-oriented TCP code
+ * with a select call to reduce the number of threads which is
+ * provided in transports/tcp.c.
+ * Generic TCP code. This module is used to receive or send records
+ * (!) from a TCP stream. The code automatically attempts to
+ * re-connect if the other side closes the connection.<br>
+ *
+ * The code can be used on the server- or the client side, just in
+ * case of the server the reconnect can of course not be used. The TCP
+ * stream is broken into records of maximum length MAX_BUFFER_SIZE,
+ * each preceded by a 16-bit integer (not signed) giving the length of the
+ * following record.<p>
  */
 
-public interface CSSession
+public class CSSession extends LoggedObject
 {
-       public String getLabel();
+       /** */
+       private CSServer                        server;
 
-       public int getOps();
-       public SelectionKey registerOps( Selector sel, int ops );
+       /** Socket to communicate with the other side. */
+       private PersistentSocket                socket;
 
-       public boolean isConnected();
-       public boolean connect( InetAddress ip, int port, boolean careful );
-       public boolean connect( SocketChannel channel, boolean careful );
-       public boolean disconnect();
+       /** */
+       private String                          label;
 
-       public boolean isBlocking();
-       public void setBlocking( boolean flag );
+       /** Lock used to synchronize read operations. */
+       protected Object                                readLock;
 
-       public int doReceive();
-       public boolean hasReceived();
-       public Persistent receive( Class c );
-       public Persistent receive( PersistentDecoder decoder );
+       /** Lock used to synchronize write operations. */
+       protected Object                                writeLock;
 
-       public boolean send( Persistent p );
-       public boolean sendAndCheck( Persistent p );
-       public boolean flushAndSend( Persistent p );
-       public boolean hasToSend();
-       public int doSend();
+
+       public CSSession()
+       {
+               this(null);
+       }
+
+       public CSSession( CSServer s )
+       {
+               super(true);
+               server=s;
+               socket=new PersistentSocket();
+               socket.setDebug(false);
+               label=socket.getLabel();
+               readLock=new Object();
+               writeLock=new Object();
+       }
+
+       public String toString()
+       {
+               return "Client/server session";
+       }
+
+
+       
////////////////////////////////////////////////////////////////////////////////////////////////
+
+       public String getLabel()
+       {
+               return label;
+       }
+
+       public int getOps()
+       {
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               return (socket.shouldWrite() ? 
(SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ);
+                               }
+                       }
+       }
+
+       public SelectionKey registerOps( Selector sel, int ops )
+       {
+               SelectionKey            key;
+
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               key=socket.getChannel().keyFor(sel);
+                               if (key==null) {
+                                       try {
+                                               
key=socket.getChannel().register(sel,0);
+                                               }
+                                       catch( ClosedChannelException x ) {
+                                               err(label+" Failed to register 
on selector !",x);
+                                               return null;
+                                               }
+                                       }
+                               key.interestOps(ops);
+                               return key;
+                               }
+                       }
+       }
+
+       public boolean isConnected()
+       {
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               return !socket.isClosed();
+                               }
+                       }
+       }
+
+       /**
+        * Connect this session to the specified ip and port in *blocking mode*.
+        * Used when connecting to a server at {ip,port}.
+        *
+        * @param ip                    IP of the host to connect to.
+        * @param port          The port number.
+        * @param careful       Should we treat socket with respect (SO_LINGER 
not set) ?
+        * @return                      True if successful, false on failure.
+        */
+
+       public boolean connect( InetAddress ip, int port, boolean careful )
+       {
+               boolean res;
+
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               res=socket.open(ip,port,careful);
+                               if (res) {
+                                       label=socket.getLabel();
+                                       debug(label+" Connected.");
+                                       }
+                               return res;
+                               }
+                       }
+       }
+
+       /**
+        * Connect this session to the specified channel in *non blocking* mode.
+        * Used when connecting to a client from a server.
+        *
+        * @param channel       The open client socket.
+        * @param careful       Should we treat socket with respect (SO_LINGER 
not set) ?
+        * @return                      True if successful, false on failure.
+        */
+
+       public boolean connect( SocketChannel channel, boolean careful )
+       {
+               boolean res;
+
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               res=socket.open(channel,careful);
+                               if (res) {
+                                       label=socket.getLabel();
+                                       debug(label+" Connected.");
+                                       }
+                               return res;
+                               }
+                       }
+       }
+
+       /**
+        * Close the session.
+        *
+        * @return      True if succeeded, false otherwise.
+        */
+
+       public boolean disconnect()
+       {
+               boolean res;
+
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               if (socket.isClosed()) {
+                                       log(Level.WARNING,"Session is already 
closed.");
+                                       return false;
+                                       }
+
+                               res=socket.close();
+                               if (res) {
+                                       debug(label+" Disconnected.");
+                                       }
+                               return res;
+                               }
+                       }
+       }
+
+       public boolean isBlocking()
+       {
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               return socket.isBlocking();
+                               }
+                       }
+       }
+
+       public void setBlocking( boolean flag )
+       {
+               synchronized(readLock) {
+                       synchronized(writeLock) {
+                               socket.setBlocking(flag);
+                               }
+                       }
+       }
+
+       /**
+        * Buffer data received from the other side.
+        *
+        * @return      Number of bytes received (positive if at least one byte has been received), or a non-positive value if 
the socket was closed by the other side or if an error occurred.
+        */
+
+       public int doReceive()
+       {
+               int     len;
+
+               synchronized(readLock) {
+                       len=socket.doRead();
+                       if (len>0) {
+                               debug(label+" Have read "+len+" bytes.");
+                               }
+                       return len;
+                       }
+       }
+
+       public boolean hasReceived()
+       {
+               synchronized(readLock) {
+                       return socket.shouldDequeue();
+                       }
+       }
+
+       /**
+        * @param c
+        * @return
+        */
+
+       public Persistent receive( Class c )
+       {
+               Persistent      p;
+
+               synchronized(readLock) {
+                       if (!socket.shouldDequeue() && socket.isBlocking()) {
+                               doReceive();
+                               }
+
+                       p=socket.dequeue(c);
+                       if (p!=null) {
+                               debug(label+" Received message : "+p+".");
+                               }
+                       return p;
+                       }
+       }
+
+       /**
+        * Decode buffered data. If in blocking mode and no messages are 
buffered, an attempt is made to read fresh data.
+        *
+        * @param decoder       Decoder used to transform transmitted data into 
messages.
+        * @return                      Any decoded data if available, null 
otherwise.
+        */
+
+       public Persistent receive( PersistentDecoder decoder )
+       {
+               Persistent      p;
+
+               synchronized(readLock) {
+                       if (!socket.shouldDequeue() && socket.isBlocking()) {
+                               doReceive();
+                               }
+
+                       p=socket.dequeue(decoder);
+                       if (p!=null) {
+                               debug(label+" Received message : "+p+".");
+                               }
+                       return p;
+                       }
+       }
+
+       /**
+        * Add data to the buffer, and if blocking, start transferring buffered 
data.
+        *
+        * <div>When in blocking mode, try to also send buffered data to the 
other side. Returns true if, at least, one byte
+        * has been transmitted. Please note that it does *not* imply that any 
part of the data <code>p</code> has been transmitted,
+        * since other data may have been buffered previously (transfer is 
initiated but may be incomplete).</div>
+        *
+        * <div>In non-blocking mode, returns true. The actual transfer happens 
asynchronously.</div>
+        *
+        * @param p     The data to write (duplicated, because may be buffered 
and stored a certain amount of time...).
+        * @return      True if in non-blocking mode, or if at least one byte 
of buffered data has been transmitted, false otherwise.
+        */
+
+       public boolean send( Persistent p )
+       {
+               p=PersistentHelper.copy(p);
+
+               synchronized(writeLock) {
+                       socket.enqueue(p);
+                       debug(label+" Sent message : "+p+".");
+
+                       if (server!=null) {
+                               server.wakeUp();
+                               }
+
+                       return (socket.isBlocking() ? doSend()>0 : true);
+                       }
+       }
+
+       public boolean sendAndCheck( Persistent p )
+       {
+               CSResult                res;
+
+               synchronized(writeLock) {
+                       if (!send(p)) {
+                               return false;
+                               }
+
+                       res=(CSResult) receive(CSResult.class);
+                       return (res!=null && res.isOkay());
+                       }
+       }
+
+       /**
+        * Flush buffered data, buffer given data <code>p</code> and try to 
initiate transfer of this data.
+        * Note that it is possible that only a part of the message is sent.
+        *
+        * Returning true here means that at least a small part of the message 
has been transmitted,
+        * though it may be transmitted entirely a bit later.
+        *
+        * @param p     The data to write (duplicated, because may be buffered 
and stored a certain amount of time...).
+        * @return      False if an I/O error occurred, or if it did not 
transmit any byte of the message. Return true otherwise.
+        */
+
+       public boolean flushAndSend( Persistent p )
+       {
+               boolean empty;
+
+               p=PersistentHelper.copy(p);
+
+               synchronized(writeLock) {
+                       doSend();
+
+                       empty=!socket.shouldWrite();
+
+                       socket.enqueue(p);
+                       debug(label+" Sent message : "+p+".");
+
+                       if (server!=null) {
+                               server.wakeUp();
+                               }
+
+                       return (empty ? doSend()>0 : false);
+                       }
+       }
+
+       public boolean hasToSend()
+       {
+               synchronized(writeLock) {
+                       return socket.shouldWrite();
+                       }
+       }
+
+       /**
+        * Send buffered data, if any.
+        *
+        * @return      Number of bytes transmitted: positive if at least one byte has been transmitted, zero or less 
otherwise
+        *                      (an error occurred, the other side is not ready, 
or there is no data in buffer).
+        */
+
+       public int doSend()
+       {
+               int     len;
+
+               synchronized(writeLock) {
+                       len=0;
+                       if (socket.shouldWrite()) {
+                               len=socket.doWrite();
+                               if (len>0) {
+                                       debug(label+" Have written "+len+" 
bytes.");
+                                       }
+                               }
+                       return len;
+                       }
+       }
 }

Deleted: freeway/src/org/gnu/freeway/util/net/TCPServer.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:24:55 UTC 
(rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/TCPServer.java 2006-06-28 04:45:03 UTC 
(rev 3072)
@@ -1,648 +0,0 @@
-/**
- * @PROJECT_INFO@
- */
-
-package org.gnu.freeway.util.net;
-
-import org.gnu.freeway.util.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.channels.*;
-import java.util.*;
-import java.util.logging.*;
-
-/**
- *
- */
-
-public class TCPServer extends LoggedObject implements CSServer
-{
-       /** */
-       private static final long               SELECT_TIMEOUT          =       
Scheduler.SECS_3;
-
-       /** Maximum of pending unhandled connections. */
-       private static final int                MAX_QUEUED_REQUESTS     =       
5;
-
-       /** Maximum number of concurrent allowed sessions. */
-       private static final int                MAX_SESSIONS                    
=       64;
-
-       /** Name of this server (for debugging purpose only). */
-       private String                          label;
-
-       /** Selector of the server thread */
-       private Selector                                selector;
-
-       /** The TCP socket that we listen on for new inbound connections. */
-       private ServerSocketChannel     server;
-
-       /** Thread for listening for new connections. */
-       private MasterTask                      listenTask;
-
-       /** Thread for accepting new connections. */
-       private SlaveTask                       acceptTask;
-
-       /** Thread for reading on all open sockets. */
-       private SlaveTask                       readTask;
-
-       /** Thread for writing on all open sockets. */
-       private SlaveTask                       writeTask;
-
-       /** Should the listen thread exit ? */
-       private boolean                         running;
-
-       /** Array of currently active TCP sessions. */
-       private CSSession[]                     sessions;
-
-       /** */
-       private int                                     sessionCount;
-
-       /** Sessions' current operations. */
-       private int[]                           sessionsOps;
-
-       /** */
-       private boolean                         acceptingOp;
-
-       /** Sessions lock. */
-       private Object                          internal;
-
-       /** */
-       private CSSessionHandler                handler;
-
-
-       public TCPServer( String str, CSSessionHandler h )
-       {
-               super(true);
-               label=str;
-               selector=null;
-               server=null;
-               listenTask=new MasterTask("TCP-LISTEN("+str+")",new 
EvalAction(this,"performListen"));
-               acceptTask=listenTask.create("TCP-ACCEPT("+str+")",new 
EvalAction(this,"performAccept"));
-               readTask=listenTask.create("TCP-READ("+str+")",new 
EvalAction(this,"performRead"));
-               writeTask=listenTask.create("TCP-WRITE("+str+")",new 
EvalAction(this,"performWrite"));
-               running=false;
-               sessions=new CSSession[0];
-               sessionCount=0;
-               sessionsOps=new int[0];
-               acceptingOp=false;
-               internal=new Object();
-               handler=h;
-       }
-
-       public String toString()
-       {
-               return "Abstract TCP server";
-       }
-
-
-       
////////////////////////////////////////////////////////////////////////////////////////////////
-
-       public String getLabel()
-       {
-               return label;
-       }
-
-       public boolean isLaunched()
-       {
-               return running;
-       }
-
-       public boolean launch( int port )
-       {
-               int     secs;
-
-               log(Level.INFO,label+" Launching TCP server...");
-
-               // open selector
-               try {
-                       selector=Selector.open();
-                       }
-               catch( IOException x ) {
-                       err("Could not create selector !",x);
-                       return false;
-                       }
-
-               // create server socket
-               secs=5;
-               while (server==null && secs<60) {
-                       try {
-                               server=ServerSocketChannel.open();
-                               server.configureBlocking(false);
-                               server.socket().setReuseAddress(true);
-                               server.socket().bind(new 
InetSocketAddress(port),MAX_QUEUED_REQUESTS);
-                               log(Level.INFO,label+" TCP server bound to port 
"+port+".");
-                               }
-                       catch( IOException x ) {
-                               err("Failed to open socket at port "+port+". 
Trying again in "+secs+" seconds...",x);
-
-                               Scheduler.sleep(Scheduler.seconds(secs));
-                               secs+=5;        // slow progression...
-
-                               if (server!=null) {
-                                       try {
-                                               server.close();
-                                               }
-                                       catch( IOException xx ) {
-                                               }
-                                       server=null;
-                                       }
-                               }
-                       }
-
-               if (server==null) {
-                       log(Level.SEVERE,label+" Could not create socket, 
abort.");
-                       try {
-                               selector.close();
-                               }
-                       catch( IOException x ) {
-                               }
-                       selector=null;
-                       return false;
-                       }
-
-               // start listening thread
-               running=true;
-               listenTask.launch();
-               return true;
-       }
-
-       public boolean shutdown()
-       {
-               int     i;
-
-               // signal listening thread
-               running=false;
-               selector.wakeup();
-
-               // stop listening thread
-               listenTask.shutdown();
-
-               try {
-                       server.close();
-                       }
-               catch( IOException x ) {
-                       err("Failed to close socket !",x);
-                       return false;
-                       }
-               finally {
-                       server=null;
-                       try {
-                               selector.close();
-                               }
-                       catch( IOException x ) {
-                               err("Failed to close selector !",x);
-                               return false;
-                               }
-                       finally {
-                               selector=null;
-                               }
-                       }
-               log(Level.INFO,label+" TCP server stopped.");
-
-               synchronized(internal) {
-                       for (i=0; i<sessions.length; i++) {
-                               if (sessions[i]!=null) {
-                                       log(Level.WARNING,label+" Session still 
alive : "+sessions[i].getLabel());
-                                       destroySession(i);
-                                       }
-                               }
-                       }
-               return true;
-       }
-
-       public void wakeUp()
-       {
-               selector.wakeup();
-       }
-
-       /**
-        * Add session to the pool of listened sessions. If it can't be added, 
session will be disconnected and false returned.
-        *
-        * @param s
-        * @return
-        * @see CSSession#disconnect()
-        */
-
-       public boolean register( CSSession s )
-       {
-               if (addSession(s)>=0) {
-                       // signal the thread that is blocked in a select call 
that the set of sockets to listen to has changed
-                       selector.wakeup();
-                       return true;
-                       }
-               return false;
-       }
-
-       /**
-        * Listen for incoming connections.
-        * Main method for the thread listening on the tcp socket and all tcp 
connections.
-        * Whenever a message is received, it is processed by the handler.
-        * This thread waits for activity on any of the TCP connections and 
processes deferred (async) writes and buffers reads
-        * until an entire message has been received.
-        *
-        * @throws IOException
-        */
-
-       public void performListen() throws IOException
-       {
-               SelectionKey    key;
-               Iterator                iter;
-               int                     mergedOps,ops,ret,i;
-
-               acceptingOp=true;
-
-               while (running) {
-                       synchronized(internal) {
-                               server.register(selector,(acceptingOp ? 
SelectionKey.OP_ACCEPT : 0));
-
-                               for (i=0; i<sessions.length; i++) {
-                                       if (sessions[i]!=null) {
-                                               if (sessions[i].isConnected()) 
{        // always check because impl. may disconnect after timeout...
-                                                       
key=sessions[i].registerOps(selector,(sessions[i].getOps() & ~sessionsOps[i]));
-                                                       if (key!=null) {
-                                                               key.attach(new 
Integer(i));
-                                                               }
-                                                       else {
-                                                               
destroySession(i);
-                                                               }
-                                                       }
-                                               else {
-                                                       // clean up (depends on 
session implementation : timeout detected, other side closed connection...)
-                                                       destroySession(i);
-                                                       }
-                                               }
-                                       }
-                               }
-
-                       // should wake up regularly (to clean up sessions...)
-                       ret=selector.select(Scheduler.toMillis(SELECT_TIMEOUT));
-                       if (ret==0) {
-                               continue;
-                               }
-
-                       synchronized(internal) {
-                               mergedOps=0;
-
-                               iter=selector.selectedKeys().iterator();
-                               while (iter.hasNext()) {
-                                       key=(SelectionKey) iter.next();
-                                       iter.remove();
-
-                                       if (key.isValid()) {
-                                               ops=key.readyOps();
-                                               mergedOps|=ops;
-                                               if ((ops & 
SelectionKey.OP_ACCEPT)==0) {        // read or write op
-                                                       i=((Number) 
key.attachment()).intValue();
-                                                       sessionsOps[i]|=ops;
-                                                       }
-                                               }
-                                       }
-
-                               debug(label+" Selected #"+ret+" sockets with 
merged ops { "+NetUtils.labelForOps(mergedOps)+" }.");
-
-                               // signal appropriate tasks
-                               if ((mergedOps & SelectionKey.OP_ACCEPT)!=0) {
-                                       acceptingOp=false;
-                                       acceptTask.signal();
-                                       }
-                               if ((mergedOps & SelectionKey.OP_READ)!=0) {
-                                       readTask.signal();
-                                       }
-                               if ((mergedOps & SelectionKey.OP_WRITE)!=0) {
-                                       writeTask.signal();
-                                       }
-                               }
-                       }
-
-               // shutdown... close all sessions
-               synchronized(internal) {
-                       for (i=0; i<sessions.length; i++) {
-                               if (sessions[i]!=null) {
-                                       destroySession(i);
-                                       }
-                               }
-                       }
-       }
-
-       public void performAccept()
-       {
-               CSSession               s;
-               SocketChannel   c;
-
-               try {
-                       for (c=server.accept(); c!=null; c=server.accept()) {
-                               s=handler.handleAccept(c);
-                               if (s!=null) {
-                                       if (addSession(s)<0) {
-                                               s.disconnect();
-                                               }
-                                       }
-                               else {
-                                       try {
-                                               c.close();
-                                               }
-                                       catch( IOException xx ) {
-                                               err("Failed to close channel 
!",xx);
-                                               }
-                                       }
-                               }
-                       }
-               catch( IOException x ) {
-                       err("Failed to accept new connection !",x);
-                       }
-
-               synchronized(internal) {
-                       acceptingOp=true;
-                       }
-
-               selector.wakeup();
-       }
-
-       public void performRead()
-       {
-               CSSession       s;
-               int                     len;
-
-               do {
-                       s=firstSessionWithOp(SelectionKey.OP_READ);
-                       if (s!=null) {
-                               len=s.doReceive();
-                               if (len>0 && handler.handleRead(s,len)) {
-                                       clearSessionOp(s,SelectionKey.OP_READ);
-                                       }
-                               else {
-                                       debug(s.getLabel()+" End of stream.");
-                                       destroySession(s);
-                                       }
-                               }
-                       }
-               while (s!=null);
-
-               // signal the thread that is blocked in a select call that the 
set of sockets to listen to has changed
-               selector.wakeup();
-       }
-
-       public void performWrite()
-       {
-               CSSession       s;
-               int                     len;
-
-               do {
-                       s=firstSessionWithOp(SelectionKey.OP_WRITE);
-                       if (s!=null) {
-                               len=s.doSend();
-                               if (len>0 && handler.handleWrite(s,len)) {
-                                       clearSessionOp(s,SelectionKey.OP_WRITE);
-                                       }
-                               else {
-                                       debug(s.getLabel()+" End of stream.");
-                                       destroySession(s);
-                                       }
-                               }
-                       }
-               while (s!=null);
-
-               // signal the thread that is blocked in a select call that the set of sockets to listen to has changed
-               selector.wakeup();
-       }
-
-       /**
-        * Add a new session to the array watched by the select thread. Grows the array if needed.
-        *
-        * @param s     Session to add.
-        * @return      Index of added session, or -1 on error.
-        */
-
-       protected int addSession( CSSession s )
-       {
-               CSSession[]     tmp;
-               int[]           tmp2;
-               int                     i;
-
-               synchronized(internal) {
-                       if (sessionCount==MAX_SESSIONS) {
-                               log(Level.WARNING,"Too many sessions ("+MAX_SESSIONS+"), ignore connection.");
-                               return -1;
-                               }
-
-                       for (i=0; i<sessions.length && sessions[i]!=null; i++) {}
-                       if (i==sessions.length) {
-                               tmp=new CSSession[sessions.length+16];
-                               Arrays.fill(tmp,null);
-                               System.arraycopy(sessions,0,tmp,0,sessions.length);
-                               sessions=tmp;
-
-                               tmp2=new int[sessionsOps.length+16];
-                               Arrays.fill(tmp2,0);
-                               System.arraycopy(sessionsOps,0,tmp2,0,sessionsOps.length);
-                               sessionsOps=tmp2;
-                               }
-
-                       sessions[i]=s;
-                       sessionsOps[i]=0;
-                       sessionCount++;
-                       debug("Add session at slot #"+i+" "+Utils.gauge(sessionCount,sessions.length)+".");
-                       return i;
-                       }
-       }
-
-       protected CSSession firstSessionWithOp( int op )
-       {
-               int     i;
-
-               synchronized(internal) {
-                       for (i=0; i<sessionsOps.length && (sessionsOps[i] & op)==0; i++) {}
-                       return (i<sessionsOps.length ? sessions[i] : null);
-                       }
-       }
-
-       protected boolean clearSessionOp( CSSession s, int op )
-       {
-               int     i;
-
-               assert(s!=null);
-
-               synchronized(internal) {
-                       for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
-                       if (i==sessions.length) {
-                               log(Level.WARNING,label+" Session not found : "+s.getLabel()+".");
-                               return false;
-                               }
-
-                       sessionsOps[i]&=~op;
-                       return true;
-                       }
-       }
-
-       protected boolean destroySession( CSSession s )
-       {
-               int     i;
-
-               assert(s!=null);
-
-               synchronized(internal) {
-                       for (i=0; i<sessions.length && sessions[i]!=s; i++) {}
-                       if (i==sessions.length) {
-                               log(Level.WARNING,label+" Session not found : "+s.getLabel()+".");
-                               return false;
-                               }
-
-                       return destroySession(i);
-                       }
-       }
-
-       /**
-        * The client has disconnected. Close the socket, free the buffers, unlink session from the linked list.
-        * Remove a session, either the other side closed the connection or we have otherwise reason to believe
-        * that it should better be killed.
-        *
-        * @param index index to the session handle
-        * @return
-        */
-
-       protected boolean destroySession( int index )
-       {
-               assert(index>=0);
-
-               synchronized(internal) {
-                       if (index>=sessions.length || sessions[index]==null) {
-                               log(Level.WARNING,label+" No session at slot "+index+".");
-                               return false;
-                               }
-
-                       if (sessions[index].isConnected()) {
-                               sessions[index].disconnect();
-                               }
-
-                       handler.handleDestroy(sessions[index]);
-
-                       sessions[index]=null;
-                       sessionsOps[index]=0;
-                       sessionCount--;
-                       debug("Destroyed session at slot #"+index+" "+Utils.gauge(sessionCount,sessions.length)+".");
-                       }
-               return true;
-       }
-}
-
-
-/*
-       public void add( int id, Class c, CommandHandler h )
-       {
-               decoder.add(id,c);
-               setHandler(c,h);
-       }
-
-       public void setDefault( int id, Class c, CommandHandler h )
-       {
-               decoder.add(id,c);
-               setHandler(c,h);
-               decoder.setDefault(id);
-       }
-
-       public void setCorrupted( int id, Class c, CommandHandler h )
-       {
-               decoder.add(id,c);
-               setHandler(c,h);
-               decoder.setCorrupted(id);
-       }
-
-       public boolean hasHandler( Class c )
-       {
-               synchronized(handlers) {
-                       return handlers.get(c)!=null;
-                       }
-       }
-
-       public CommandHandler getHandler( Class c )
-       {
-               synchronized(handlers) {
-                       return (CommandHandler) handlers.get(c);
-                       }
-       }
-
-       public boolean setHandler( Class c, CommandHandler h )
-       {
-               synchronized(handlers) {
-                       if (handlers.get(c)!=null) {
-                               log(Level.WARNING,"Could not assign handler, class "+c.getName()+" is already registered.");
-                               return false;
-                               }
-                       handlers.put(c,h);
-                       return true;
-                       }
-       }
-
-       public boolean removeHandler( Class c, CommandHandler h )
-       {
-               synchronized(handlers) {
-                       if (handlers.get(c)==null) {
-                               log(Level.WARNING,"Could not remove handler, class "+c.getName()+" is not registered.");
-                               return false;
-                               }
-                       handlers.remove(c);
-                       return true;
-                       }
-       }
-
-
-deriver de TCP Server:
-       public void start()
-       {
-               add(ProxyCommand.HELLO_ID,ProxyHello.class,this);
-               add(ProxyCommand.CONNECT_ID,ProxyConnect.class,this);
-               add(ProxyCommand.SETBLOCKING_ID,ProxySetBlocking.class,this);
-
-               setDefault(ProxyCommand.UNKNOWN_ID,ProxyUnknown.class,this);
-               setCorrupted(ProxyCommand.CORRUPTED_ID,ProxyCorrupted.class,this);
-
-               super.start();
-       }
-
-       public void stop()
-       {
-               super.stop();
-       }
-
-       public TCP Session createSession( PersistentDecoder decoder )
-       {
-               return new ProxySession(decoder);
-       }
-
-       public boolean handle( TCP Session session, Persistent p )
-       {
-               ProxySession    s;
-               ProxyToken      token;
-               int     id;
-
-               id=((ProxyCommand) p).getID();
-
-               s=(ProxySession) session;
-               if (!s.isWelcomed()) {
-                       if (id!=ProxyCommand.HELLO_ID) {
-                               log("No hello received, close session.");
-                               return false;
-                               }
-                       s.welcome();
-
-                       token=new ProxyToken(s.getToken());
-                       token.setAddress("",0);
-                       sendToClient(s,token);
-                       return true;
-                       }
-
-               switch (id) {
-                       case ProxyCommand.CONNECT_ID:
-                               break;
-
-                       case ProxyCommand.SETBLOCKING_ID:
-                               break;
-
-                       default:
-                               log("Unknown message : "+p);
-                               return false;
-                       }
-               return true;
-       }
-
-*/

Deleted: freeway/src/org/gnu/freeway/util/net/TCPSession.java
===================================================================
--- freeway/src/org/gnu/freeway/util/net/TCPSession.java        2006-06-28 04:24:55 UTC (rev 3071)
+++ freeway/src/org/gnu/freeway/util/net/TCPSession.java        2006-06-28 04:45:03 UTC (rev 3072)
@@ -1,410 +0,0 @@
-/**
- * @PROJECT_INFO@
- */
-
-package org.gnu.freeway.util.net;
-
-import org.gnu.freeway.util.*;
-
-import java.net.*;
-import java.nio.channels.*;
-import java.util.logging.*;
-
-/**
- * Per-client data structure (kept in linked list).  Also: the opaque
- * handle for client connections passed by the core to the CSHandlers.
- * Opaque handle for client connections passed by
- * the core to the CSHandlers.
- *
- * A connection to a freeway client application. To be used in non-blocking mode.
- *
- * Struct to refer to a GNUnet TCP connection.
- * This is more than just a socket because if the server
- * drops the connection, the client automatically tries
- * to reconnect (and for that needs connection information).
- *
- * Code for synchronized access to TCP streams
- *
- * Generic TCP code for reliable, mostly blocking, record-oriented TCP
- * connections. GNUnet uses the "tcpio" code for trusted client-server
- * (e.g. gnunet-gtk to gnunetd via loopback) communications.  Note
- * that an unblocking write is also provided since if both client and
- * server use blocking IO, both may block on a write and cause a
- * mutual inter-process deadlock.
- *
- * Since we do not want other peers (!) to be able to block a peer by
- * not reading from the TCP stream, the peer-to-peer TCP transport
- * uses unreliable, buffered, non-blocking, record-oriented TCP code
- * with a select call to reduce the number of threads which is
- * provided in transports/tcp.c.
- * Generic TCP code. This module is used to receive or send records
- * (!) from a TCP stream. The code automatically attempts to
- * re-connect if the other side closes the connection.<br>
- *
- * The code can be used on the server- or the client side, just in
- * case of the server the reconnect can of course not be used. The TCP
- * stream is broken into records of maximum length MAX_BUFFER_SIZE,
- * each preceeded by a 16 bits integer (not signed) giving the length of the
- * following record.<p>
- */
-
-public class TCPSession extends LoggedObject implements CSSession
-{
-       /** */
-       private TCPServer                       server;
-
-       /** Socket to communicate with the other side. */
-       private PersistentSocket                socket;
-
-       /** */
-       private String                          label;
-
-       /** Lock used to synchronized read operations. */
-       protected Object                                readLock;
-
-       /** Lock used to synchronized write operations. */
-       protected Object                                writeLock;
-
-
-       public TCPSession()
-       {
-               this(null);
-       }
-
-       public TCPSession( TCPServer s )
-       {
-               super(true);
-               server=s;
-               socket=new PersistentSocket();
-               socket.setDebug(false);
-               label=socket.getLabel();
-               readLock=new Object();
-               writeLock=new Object();
-       }
-
-       public String toString()
-       {
-               return "Client/server session";
-       }
-
-
-       ////////////////////////////////////////////////////////////////////////////////////////////////
-
-       public String getLabel()
-       {
-               return label;
-       }
-
-       public int getOps()
-       {
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               return (socket.shouldWrite() ? (SelectionKey.OP_READ | SelectionKey.OP_WRITE) : SelectionKey.OP_READ);
-                               }
-                       }
-       }
-
-       public SelectionKey registerOps( Selector sel, int ops )
-       {
-               SelectionKey            key;
-
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               key=socket.getChannel().keyFor(sel);
-                               if (key==null) {
-                                       try {
-                                               key=socket.getChannel().register(sel,0);
-                                               }
-                                       catch( ClosedChannelException x ) {
-                                               err(label+" Failed to register on selector !",x);
-                                               return null;
-                                               }
-                                       }
-                               key.interestOps(ops);
-                               return key;
-                               }
-                       }
-       }
-
-       public boolean isConnected()
-       {
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               return !socket.isClosed();
-                               }
-                       }
-       }
-
-       /**
-        * Connect this session to the specified ip and port in *blocking mode*.
-        * Used when connecting to a server at {ip,port}.
-        *
-        * @param ip                    IP of the host to connect to.
-        * @param port          The port number.
-        * @param careful       Should we treat socket with respect (SO_LINGER not set) ?
-        * @return                      True if successful, false on failure.
-        */
-
-       public boolean connect( InetAddress ip, int port, boolean careful )
-       {
-               boolean res;
-
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               res=socket.open(ip,port,careful);
-                               if (res) {
-                                       label=socket.getLabel();
-                                       debug(label+" Connected.");
-                                       }
-                               return res;
-                               }
-                       }
-       }
-
-       /**
-        * Connect this session to the specified channel in *non blocking* mode.
-        * Used when connecting to a client from a server.
-        *
-        * @param channel       The open client socket.
-        * @param careful       Should we treat socket with respect (SO_LINGER not set) ?
-        * @return                      True if successful, false on failure.
-        */
-
-       public boolean connect( SocketChannel channel, boolean careful )
-       {
-               boolean res;
-
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               res=socket.open(channel,careful);
-                               if (res) {
-                                       label=socket.getLabel();
-                                       debug(label+" Connected.");
-                                       }
-                               return res;
-                               }
-                       }
-       }
-
-       /**
-        * Close the session.
-        *
-        * @return      True if succeedeed, false otherwise.
-        */
-
-       public boolean disconnect()
-       {
-               boolean res;
-
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               if (socket.isClosed()) {
-                                       log(Level.WARNING,"Session is already closed.");
-                                       return false;
-                                       }
-
-                               res=socket.close();
-                               if (res) {
-                                       debug(label+" Disconnected.");
-                                       }
-                               return res;
-                               }
-                       }
-       }
-
-       public boolean isBlocking()
-       {
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               return socket.isBlocking();
-                               }
-                       }
-       }
-
-       public void setBlocking( boolean flag )
-       {
-               synchronized(readLock) {
-                       synchronized(writeLock) {
-                               socket.setBlocking(flag);
-                               }
-                       }
-       }
-
-       /**
-        * Buffer data received from the other side.
-        *
-        * @return      True if at least one byte has been received, false if the socket was closed by the other side or if an error occured.
-        */
-
-       public int doReceive()
-       {
-               int     len;
-
-               synchronized(readLock) {
-                       len=socket.doRead();
-                       if (len>0) {
-                               debug(label+" Have read "+len+" bytes.");
-                               }
-                       return len;
-                       }
-       }
-
-       public boolean hasReceived()
-       {
-               synchronized(readLock) {
-                       return socket.shouldDequeue();
-                       }
-       }
-
-       /**
-        * @param c
-        * @return
-        */
-
-       public Persistent receive( Class c )
-       {
-               Persistent      p;
-
-               synchronized(readLock) {
-                       if (!socket.shouldDequeue() && socket.isBlocking()) {
-                               doReceive();
-                               }
-
-                       p=socket.dequeue(c);
-                       if (p!=null) {
-                               debug(label+" Received message : "+p+".");
-                               }
-                       return p;
-                       }
-       }
-
-       /**
-        * Decode buffered data. If in blocking mode and no messages are buffered, an attempt is made to read fresh data.
-        *
-        * @param decoder       Decoder used to transform transmitted data into messages.
-        * @return                      Any decoded data if available, null otherwise.
-        */
-
-       public Persistent receive( PersistentDecoder decoder )
-       {
-               Persistent      p;
-
-               synchronized(readLock) {
-                       if (!socket.shouldDequeue() && socket.isBlocking()) {
-                               doReceive();
-                               }
-
-                       p=socket.dequeue(decoder);
-                       if (p!=null) {
-                               debug(label+" Received message : "+p+".");
-                               }
-                       return p;
-                       }
-       }
-
-       /**
-        * Add data to the buffer, and if blocking, start transferring buffered data.
-        *
-        * <div>When in blocking mode, try to also send buffered data to the other side. Returns true if, at least, one byte
-        * has been transmitted. Please note that it does *not* imply that any part of the data <code>p</code> has been transmitted,
-        * since other data may had been buffered previously (transfer is initiated but may be incomplete).</div>
-        *
-        * <div>In non-blocking mode, returns true. The actual transfer happens asynchronously.</div>
-        *
-        * @param p     The data to write (duplicated, because may be buffered and stored a certain amount of time...).
-        * @return      True if in non-blocking mode, or if at least one byte of buffered data has been transmitted, false otherwise.
-        */
-
-       public boolean send( Persistent p )
-       {
-               p=PersistentHelper.copy(p);
-
-               synchronized(writeLock) {
-                       socket.enqueue(p);
-                       debug(label+" Sent message : "+p+".");
-
-                       if (server!=null) {
-                               server.wakeUp();
-                               }
-
-                       return (socket.isBlocking() ? doSend()>0 : true);
-                       }
-       }
-
-       public boolean sendAndCheck( Persistent p )
-       {
-               CSResult                res;
-
-               synchronized(writeLock) {
-                       if (!send(p)) {
-                               return false;
-                               }
-
-                       res=(CSResult) receive(CSResult.class);
-                       return (res!=null && res.isOkay());
-                       }
-       }
-
-       /**
-        * Flush buffered data, buffer given data <code>p</code> and try to initiate transfer of this data.
-        * Note that it is possible that only a part of the message is sent.
-        *
-        * Returning true here means that at least a small part of the message has been transmitted,
-        * though it may be transmitted entirely a bit later.
-        *
-        * @param p     The data to write (duplicated, because may be buffered and stored a certain amount of time...).
-        * @return      False if an I/O error occurred, or if it did not transmit any byte of the message. Return true otherwise.
-        */
-
-       public boolean flushAndSend( Persistent p )
-       {
-               boolean empty;
-
-               p=PersistentHelper.copy(p);
-
-               synchronized(writeLock) {
-                       doSend();
-
-                       empty=!socket.shouldWrite();
-
-                       socket.enqueue(p);
-                       debug(label+" Sent message : "+p+".");
-
-                       if (server!=null) {
-                               server.wakeUp();
-                               }
-
-                       return (empty ? doSend()>0 : false);
-                       }
-       }
-
-       public boolean hasToSend()
-       {
-               synchronized(writeLock) {
-                       return socket.shouldWrite();
-                       }
-       }
-
-       /**
-        * Send buffered data, if any.
-        *
-        * @return      True if at least one byte has been transmitted, false otherwise
-        *                      (an error occured, the other side is not ready, or there is no data in buffer).
-        */
-
-       public int doSend()
-       {
-               int     len;
-
-               synchronized(writeLock) {
-                       len=0;
-                       if (socket.shouldWrite()) {
-                               len=socket.doWrite();
-                               if (len>0) {
-                                       debug(label+" Have written "+len+" bytes.");
-                                       }
-                               }
-                       return len;
-                       }
-       }
-}





reply via email to

[Prev in Thread] Current Thread [Next in Thread]