Index: gnu/java/nio/SelectorImpl.java =================================================================== RCS file: /cvsroot/classpath/classpath/gnu/java/nio/SelectorImpl.java,v retrieving revision 1.13 diff -u -b -B -r1.13 SelectorImpl.java --- gnu/java/nio/SelectorImpl.java 20 Oct 2003 15:32:56 -0000 1.13 +++ gnu/java/nio/SelectorImpl.java 8 Jan 2004 08:43:01 -0000 @@ -65,12 +65,34 @@ private Set keys; private Set selected; + /** + * A dummy object whose monitor regulates access to both our + * selectThread and unhandledWakeup fields. + */ + private Object selectThreadMutex = new Object (); + + /** + * Any thread that's currently blocked in a select operation. + */ + private Thread selectThread; + + /** + * Indicates whether we have an unhandled wakeup call. This can + * be due to either wakeup() triggering a thread interruption while + * a thread was blocked in a select operation (in which case we need + * to reset this thread's interrupt status after interrupting the + * select), or else that no thread was on a select operation at the + * time that wakeup() was called, in which case the following select() + * operation should return immediately with nothing selected. + */ + private boolean unhandledWakeup; + public SelectorImpl (SelectorProvider provider) { super (provider); - keys = new HashSet(); - selected = new HashSet(); + keys = new HashSet (); + selected = new HashSet (); } protected void finalize() throws Throwable @@ -81,61 +103,78 @@ protected final void implCloseSelector() throws IOException { - // FIXME: We surely need to do more here. + // Cancel any pending select operation. wakeup(); + + synchronized (keys) + { + synchronized (selected) + { + synchronized (cancelledKeys ()) + { + // FIXME: Release resources here. + } + } + } } public final Set keys() { + if (!isOpen()) + throw new ClosedSelectorException(); + return Collections.unmodifiableSet (keys); } public final int selectNow() throws IOException { + // FIXME: We're simulating an immediate select + // via a select with a timeout of one millisecond. return select (1); } public final int select() throws IOException { - return select (-1); + return select (0); } - // A timeout value of -1 means block forever. + // A timeout value of 0 means block forever. private static native int implSelect (int[] read, int[] write, - int[] except, long timeout); + int[] except, long timeout) + throws IOException; private final int[] getFDsAsArray (int ops) { int[] result; int counter = 0; - Iterator it = keys.iterator(); + Iterator it = keys.iterator (); // Count the number of file descriptors needed - while (it.hasNext()) + while (it.hasNext ()) { - SelectionKeyImpl key = (SelectionKeyImpl) it.next(); + SelectionKeyImpl key = (SelectionKeyImpl) it.next (); - if ((key.interestOps() & ops) != 0) + if ((key.interestOps () & ops) != 0) { counter++; } } - result = new int [counter]; + result = new int[counter]; counter = 0; - it = keys.iterator(); + it = keys.iterator (); // Fill the array with the file descriptors - while (it.hasNext()) + while (it.hasNext ()) { - SelectionKeyImpl key = (SelectionKeyImpl) it.next(); + SelectionKeyImpl key = (SelectionKeyImpl) it.next (); - if ((key.interestOps() & ops) != 0) + if ((key.interestOps () & ops) != 0) { - result [counter] = key.getNativeFD(); + result[counter] = key.getNativeFD(); counter++; } } @@ -143,48 +182,109 @@ return result; } - public int select (long timeout) + public synchronized int select (long timeout) + throws IOException { if (!isOpen()) throw new ClosedSelectorException(); - if (keys == null) + synchronized (keys) + { + synchronized (selected) { - return 0; - } - deregisterCancelledKeys(); // Set only keys with the needed interest ops into the arrays. - int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT); - int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); - int[] except = new int [0]; // FIXME: We dont need to check this yet - int anzahl = read.length + write.length + except.length; + int[] read = getFDsAsArray (SelectionKey.OP_READ + | SelectionKey.OP_ACCEPT); + int[] write = getFDsAsArray (SelectionKey.OP_WRITE + | SelectionKey.OP_CONNECT); + + // FIXME: We dont need to check this yet + int[] except = new int [0]; + + // Test to see if we've got an unhandled wakeup call, + // in which case we return immediately. Otherwise, + // remember our current thread and jump into the select. + // The monitor for dummy object selectThreadMutex regulates + // access to these fields. + + // FIXME: Not sure from the spec at what point we should + // return "immediately". Is it here or immediately upon + // entry to this function? + + // NOTE: There's a possibility of another thread calling + // wakeup() immediately after our thread releases + // selectThreadMutex's monitor here, in which case we'll + // do the select anyway. Since calls to wakeup() and select() + // among different threads happen in non-deterministic order, + // I don't think this is an issue. + synchronized (selectThreadMutex) + { + if (unhandledWakeup) + { + unhandledWakeup = false; + return 0; + } + else + { + selectThread = Thread.currentThread (); + } + } // Call the native select() on all file descriptors. + int result = 0; + try + { begin(); - int result = implSelect (read, write, except, timeout); + result = implSelect (read, write, except, timeout); + } + finally + { end(); + } + + // If our unhandled wakeup flag is set at this point, + // reset our thread's interrupt flag because we were + // awakened by wakeup() instead of an external thread + // interruption. + // + // NOTE: If we were blocked in a select() and one thread + // called Thread.interrupt() on the blocked thread followed + // by another thread calling Selector.wakeup(), then race + // conditions could make it so that the thread's interrupt + // flag is reset even though the Thread.interrupt() call + // "was there first". I don't think we need to care about + // this scenario. + synchronized (selectThreadMutex) + { + if (unhandledWakeup) + { + unhandledWakeup = false; + selectThread.interrupted (); + } + selectThread = null; + } - Iterator it = keys.iterator(); + Iterator it = keys.iterator (); - while (it.hasNext()) + while (it.hasNext ()) { int ops = 0; - SelectionKeyImpl key = (SelectionKeyImpl) it.next(); + SelectionKeyImpl key = (SelectionKeyImpl) it.next (); // If key is already selected retrieve old ready ops. if (selected.contains (key)) { - ops = key.readyOps(); + ops = key.readyOps (); } // Set new ready read/accept ops for (int i = 0; i < read.length; i++) { - if (key.getNativeFD() == read [i]) + if (key.getNativeFD() == read[i]) { - if (key.channel() instanceof ServerSocketChannelImpl) + if (key.channel () instanceof ServerSocketChannelImpl) { ops = ops | SelectionKey.OP_ACCEPT; } @@ -198,18 +298,18 @@ // Set new ready write ops for (int i = 0; i < write.length; i++) { - if (key.getNativeFD() == write [i]) + if (key.getNativeFD() == write[i]) { ops = ops | SelectionKey.OP_WRITE; -// if (key.channel().isConnected()) -// { -// ops = ops | SelectionKey.OP_WRITE; -// } -// else -// { -// ops = ops | SelectionKey.OP_CONNECT; -// } + // if (key.channel ().isConnected ()) + // { + // ops = ops | SelectionKey.OP_WRITE; + // } + // else + // { + // ops = ops | SelectionKey.OP_CONNECT; + // } } } @@ -222,31 +322,58 @@ } // Set new ready ops - key.readyOps (key.interestOps() & ops); + key.readyOps (key.interestOps () & ops); } - deregisterCancelledKeys(); + return result; } + } + } public final Set selectedKeys() { + if (!isOpen()) + throw new ClosedSelectorException(); + return selected; } public final Selector wakeup() { - return null; + // IMPLEMENTATION NOTE: Whereas the specification says that + // thread interruption should trigger a call to wakeup, we + // do the reverse under the covers: wakeup triggers a thread + // interrupt followed by a subsequent reset of the thread's + // interrupt status within select(). + + // First, acquire the monitor of the object regulating + // access to our selectThread and unhandledWakeup fields. + synchronized (selectThreadMutex) + { + unhandledWakeup = true; + + // Interrupt any thread which is currently blocked in + // a select operation. + if (selectThread != null) + selectThread.interrupt (); + } + + return this; } private final void deregisterCancelledKeys() { - Iterator it = cancelledKeys().iterator(); + Set ckeys = cancelledKeys (); + synchronized (ckeys) + { + Iterator it = ckeys.iterator(); - while (it.hasNext()) + while (it.hasNext ()) { - keys.remove ((SelectionKeyImpl) it.next()); - it.remove(); + keys.remove ((SelectionKeyImpl) it.next ()); + it.remove (); + } } } @@ -280,7 +407,11 @@ throw new InternalError ("No known channel type"); } + synchronized (keys) + { keys.add (result); + } + result.interestOps (ops); result.attach (att); return result;