BlockingIO.java

  1. /*
  2.  **** BEGIN LICENSE BLOCK *****
  3.  * Version: EPL 1.0/GPL 2.0/LGPL 2.1
  4.  *
  5.  * The contents of this file are subject to the Eclipse Public
  6.  * License Version 1.0 (the "License"); you may not use this file
  7.  * except in compliance with the License. You may obtain a copy of
  8.  * the License at http://www.eclipse.org/legal/epl-v10.html
  9.  *
  10.  * Software distributed under the License is distributed on an "AS
  11.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  12.  * implied. See the License for the specific language governing
  13.  * rights and limitations under the License.
  14.  *
  15.  * Copyright (C) 2008 The JRuby Community <www.jruby.org>
  16.  *
  17.  * Alternatively, the contents of this file may be used under the terms of
  18.  * either of the GNU General Public License Version 2 or later (the "GPL"),
  19.  * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
  20.  * in which case the provisions of the GPL or the LGPL are applicable instead
  21.  * of those above. If you wish to allow use of your version of this file only
  22.  * under the terms of either the GPL or the LGPL, and not to allow others to
  23.  * use your version of this file under the terms of the EPL, indicate your
  24.  * decision by deleting the provisions above and replace them with the notice
  25.  * and other provisions required by the GPL or the LGPL. If you do not delete
  26.  * the provisions above, a recipient may use your version of this file under
  27.  * the terms of any one of the EPL, the GPL or the LGPL.
  28.  ***** END LICENSE BLOCK *****/

  29. package org.jruby.util.io;

  30. import java.io.IOException;
  31. import java.io.InterruptedIOException;
  32. import java.nio.ByteBuffer;
  33. import java.nio.channels.Channel;
  34. import java.nio.channels.ReadableByteChannel;
  35. import java.nio.channels.SelectableChannel;
  36. import java.nio.channels.SelectionKey;
  37. import java.nio.channels.Selector;
  38. import java.nio.channels.WritableByteChannel;
  39. import java.nio.channels.spi.SelectorProvider;
  40. import java.util.HashSet;
  41. import java.util.LinkedList;
  42. import java.util.List;
  43. import java.util.Map;
  44. import java.util.Set;
  45. import java.util.concurrent.ConcurrentHashMap;
  46. import java.util.concurrent.ConcurrentLinkedQueue;
  47. import java.util.concurrent.TimeUnit;

  48. /**
  49.  * A Utility class to emulate blocking I/O operations on non-blocking channels.
  50.  */
  51. public class BlockingIO {
  52.     public static final class Condition {
  53.         private final IOChannel channel;
  54.         Condition(IOChannel channel) {
  55.             this.channel = channel;
  56.         }
  57.         public void cancel() {
  58.             channel.wakeup(false);
  59.         }
  60.         public void interrupt() {
  61.             channel.interrupt();
  62.         }
  63.         public boolean await() throws InterruptedException {
  64.             return channel.await();
  65.         }
  66.         public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
  67.             return channel.await(timeout, unit);
  68.         }
  69.     }
  70.     static final class IOChannel {
  71.         final SelectableChannel channel;
  72.         final int ops;        
  73.         private final Object monitor;
  74.         private boolean woken = false;
  75.         private boolean ready = false;
  76.         private boolean interrupted = false;
  77.        
  78.         IOChannel(SelectableChannel channel, int ops, Object monitor) {
  79.             this.channel = channel;
  80.             this.ops = ops;
  81.             this.monitor = monitor;
  82.         }
  83.         public final void wakeup(boolean ready) {
  84.             synchronized (monitor) {
  85.                 this.woken = true;
  86.                 this.ready = ready;
  87.                 monitor.notifyAll();
  88.             }
  89.         }
  90.         public final void interrupt() {
  91.             synchronized (monitor) {
  92.                 this.woken = true;
  93.                 this.interrupted = true;
  94.                 monitor.notifyAll();
  95.             }
  96.         }
  97.         public final boolean await() throws InterruptedException {
  98.             return await(0, TimeUnit.MILLISECONDS);
  99.         }
  100.         public final boolean await(final long timeout, TimeUnit unit) throws InterruptedException {
  101.             synchronized (monitor) {
  102.                 if (!woken) {
  103.                     monitor.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
  104.                 }
  105.                 if (interrupted) {
  106.                     throw new InterruptedException("Interrupted");
  107.                 }
  108.                 return ready;
  109.             }
  110.         }
  111.     }
  112.     static final class IOSelector implements Runnable {
  113.         private final Selector selector;
  114.         private final ConcurrentLinkedQueue<IOChannel> registrationQueue;

  115.         public IOSelector(SelectorProvider provider) throws IOException {
  116.             selector = SelectorFactory.openWithRetryFrom(null, provider);
  117.             registrationQueue = new ConcurrentLinkedQueue<IOChannel>();
  118.         }
  119.         public void run() {
  120.             for ( ; ; ) {
  121.                 try {
  122.                     //
  123.                     // Wake up any channels that became unblocked
  124.                     //
  125.                     Set<SelectionKey> selected = new HashSet<SelectionKey>(selector.selectedKeys());
  126.                     for (SelectionKey k : selected) {
  127.                         List<IOChannel> waitq = (List<IOChannel>) k.attachment();
  128.                         for (IOChannel ch : waitq) {
  129.                             ch.wakeup(true);
  130.                         }
  131.                         waitq.clear();
  132.                     }

  133.                     //
  134.                     // Register any new blocking I/O requests
  135.                     //
  136.                     IOChannel ch;
  137.                     Set<SelectableChannel> added = new HashSet<SelectableChannel>();
  138.                     while ((ch = registrationQueue.poll()) != null) {
  139.                         SelectionKey k = ch.channel.keyFor(selector);
  140.                         List<IOChannel> waitq = k == null
  141.                                 ? new LinkedList<IOChannel>()
  142.                                 : (List<IOChannel>) k.attachment();
  143.                         ch.channel.register(selector, ch.ops, waitq);
  144.                         waitq.add(ch);
  145.                         added.add(ch.channel);
  146.                     }

  147.                     // Now clear out any previously selected channels
  148.                     for (SelectionKey k : selected) {
  149.                         if (!added.contains(k.channel())) {
  150.                             k.cancel();
  151.                         }
  152.                     }

  153.                     //
  154.                     // Wait for I/O on any channel
  155.                     //
  156.                     selector.select();
  157.                 } catch (IOException ex) {

  158.                 }
  159.             }
  160.         }
  161.         Condition add(Channel channel, int ops, Object monitor) {
  162.             IOChannel io = new IOChannel((SelectableChannel) channel, ops, monitor);
  163.             registrationQueue.add(io);
  164.             selector.wakeup();
  165.             return new Condition(io);
  166.         }
  167.         public void await(Channel channel, int op) throws InterruptedException {
  168.             add(channel, op, new Object()).await();
  169.         }
  170.     }
  171.     static final private Map<SelectorProvider, IOSelector> selectors
  172.             = new ConcurrentHashMap<SelectorProvider, IOSelector>();

  173.     private static IOSelector getSelector(SelectorProvider provider) throws IOException {
  174.         IOSelector sel = selectors.get(provider);
  175.         if (sel != null) {
  176.             return sel;
  177.         }

  178.         //
  179.         // Synchronize and re-check to avoid creating more than one Selector per provider
  180.         //
  181.         synchronized (provider) {
  182.             sel = selectors.get(provider);
  183.             if (sel == null) {
  184.                 sel = new IOSelector(provider);
  185.                 selectors.put(provider, sel);
  186.                 Thread t = new Thread(sel);
  187.                 t.setDaemon(true);
  188.                 t.start();
  189.             }
  190.         }
  191.         return sel;
  192.     }
  193.     private static IOSelector getSelector(Channel channel) throws IOException {
  194.         if (!(channel instanceof SelectableChannel)) {
  195.             throw new IllegalArgumentException("channel must be a SelectableChannel");
  196.         }        
  197.         return getSelector(((SelectableChannel) channel).provider());
  198.     }
  199.     public static final Condition newCondition(Channel channel, int ops, Object monitor) throws IOException {
  200.         return getSelector(channel).add(channel, ops, monitor);
  201.     }
  202.     public static final Condition newCondition(Channel channel, int ops) throws IOException {
  203.         return newCondition(channel, ops, new Object());
  204.     }
  205.     public static void waitForIO(Channel channel, int op) throws InterruptedException, IOException {
  206.         getSelector(channel).await(channel, op);
  207.     }
  208.     public static void awaitReadable(ReadableByteChannel channel) throws InterruptedException, IOException {
  209.         waitForIO(channel, SelectionKey.OP_READ);
  210.     }
  211.     public static void awaitWritable(WritableByteChannel channel) throws InterruptedException, IOException {
  212.         waitForIO(channel, SelectionKey.OP_WRITE);
  213.     }
  214.     public static int read(ReadableByteChannel channel, ByteBuffer buf, boolean blocking) throws IOException {
  215.         do {
  216.             int n = channel.read(buf);
  217.             if (n != 0 || !blocking || !(channel instanceof SelectableChannel) || !buf.hasRemaining()) {
  218.                 return n;
  219.             }
  220.             try {
  221.                 awaitReadable(channel);
  222.             } catch (InterruptedException ex) {
  223.                 throw new InterruptedIOException(ex.getMessage());
  224.             }
  225.         } while (true);
  226.     }
  227.     public static int write(WritableByteChannel channel, ByteBuffer buf, boolean blocking) throws IOException {
  228.         do {
  229.             int n = channel.write(buf);
  230.             if (n != 0 || !blocking || !(channel instanceof SelectableChannel) || !buf.hasRemaining()) {
  231.                 return n;
  232.             }
  233.             try {
  234.                 awaitWritable(channel);
  235.             } catch (InterruptedException ex) {
  236.                 throw new InterruptedIOException(ex.getMessage());
  237.             }
  238.         } while (true);
  239.     }
  240.     public static int blockingRead(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
  241.         return read(channel, buf, true);
  242.     }
  243.     public static int blockingWrite(WritableByteChannel channel, ByteBuffer buf) throws IOException {
  244.         return write(channel, buf, true);
  245.     }
  246. }