SelectBlob.java

  1. /*
  2.  ***** BEGIN LICENSE BLOCK *****
  3.  * Version: EPL 1.0/GPL 2.0/LGPL 2.1
  4.  *
  5.  * The contents of this file are subject to the Eclipse Public
  6.  * License Version 1.0 (the "License"); you may not use this file
  7.  * except in compliance with the License. You may obtain a copy of
  8.  * the License at http://www.eclipse.org/legal/epl-v10.html
  9.  *
  10.  * Software distributed under the License is distributed on an "AS
  11.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  12.  * implied. See the License for the specific language governing
  13.  * rights and limitations under the License.
  14.  *
  15.  * Alternatively, the contents of this file may be used under the terms of
  16.  * either of the GNU General Public License Version 2 or later (the "GPL"),
  17.  * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
  18.  * in which case the provisions of the GPL or the LGPL are applicable instead
  19.  * of those above. If you wish to allow use of your version of this file only
  20.  * under the terms of either the GPL or the LGPL, and not to allow others to
  21.  * use your version of this file under the terms of the EPL, indicate your
  22.  * decision by deleting the provisions above and replace them with the notice
  23.  * and other provisions required by the GPL or the LGPL. If you do not delete
  24.  * the provisions above, a recipient may use your version of this file under
  25.  * the terms of any one of the EPL, the GPL or the LGPL.
  26.  ***** END LICENSE BLOCK *****/
  27. package org.jruby.util.io;

  28. import com.oracle.truffle.api.dsl.TypeCheck;
  29. import org.jruby.Ruby;
  30. import org.jruby.RubyArray;
  31. import org.jruby.RubyFixnum;
  32. import org.jruby.RubyFloat;
  33. import org.jruby.RubyIO;
  34. import org.jruby.RubyThread;
  35. import org.jruby.runtime.ThreadContext;
  36. import org.jruby.runtime.builtin.IRubyObject;
  37. import org.jruby.util.TypeConverter;

  38. import java.io.IOException;
  39. import java.nio.ByteBuffer;
  40. import java.nio.channels.*;
  41. import java.nio.channels.spi.SelectorProvider;
  42. import java.util.*;
  43. import java.util.concurrent.Callable;
  44. import java.util.concurrent.ExecutionException;
  45. import java.util.concurrent.Future;

/**
 * This is a reimplementation of MRI's IO#select logic. It has been rewritten
 * from an earlier version in JRuby to improve performance and readability.
 *
 * This version avoids allocating a selector or any data structures to hold
 * data about the channels/IOs being selected unless absolutely necessary. It
 * also uses simple boolean arrays to track characteristics like whether an IO
 * is pending or unselectable, rather than maintaining Set structures. It avoids
 * hitting Java Integration code to get IO objects out of the incoming Array.
 * Finally, it tries to build a minimal number of data structures and reuse them
 * as much as possible.
 */
  58. @Deprecated
  59. public class SelectBlob {
  60.     public SelectBlob() {}

  61.     public IRubyObject goForIt(ThreadContext context, Ruby runtime, IRubyObject[] args) {
  62.         this.runtime = runtime;
  63.         try {
  64.             processReads(runtime, args, context);
  65.             processWrites(runtime, args, context);
  66.             if (args.length > 2 && !args[2].isNil()) {
  67.                 checkArrayType(runtime, args[2]);
  68.                 // Java's select doesn't do anything about this, so we leave it be.
  69.             }
  70.             boolean has_timeout = args.length > 3 && !args[3].isNil();
  71.             long timeout = !has_timeout ? 0 : getTimeoutFromArg(args[3], runtime);
  72.            
  73.             if (timeout < 0) {
  74.                 throw runtime.newArgumentError("time interval must be positive");
  75.             }
  76.            
  77.             // If all streams are nil, just sleep the specified time (JRUBY-4699)
  78.             if (args[0].isNil() && args[1].isNil() && args[2].isNil()) {
  79.                 RubyThread thread = context.getThread();
  80.                 if (has_timeout) {
  81.                     if (timeout > 0) {
  82.                         long now = System.currentTimeMillis();
  83.                         thread.sleep(timeout);
  84.                         // Guard against spurious wakeup
  85.                         while (System.currentTimeMillis() < now + timeout) {
  86.                             thread.sleep(1);
  87.                         }
  88.                     }
  89.                 } else {
  90.                     thread.sleep(0);
  91.                 }
  92.             } else {
  93.                 doSelect(runtime, has_timeout, timeout);
  94.                 processSelectedKeys(runtime);
  95.                 processPendingAndUnselectable();
  96.                 tidyUp();
  97.             }
  98.            
  99.             if (readResults == null && writeResults == null && errorResults == null) {
  100.                 return runtime.getNil();
  101.             }
  102.             return constructResults(runtime);
  103.         } catch (BadDescriptorException e) {
  104.             throw runtime.newErrnoEBADFError();
  105.         } catch (CancelledKeyException e) {
  106.             throw runtime.newErrnoEBADFError();
  107.         } catch (IOException e) {
  108.             throw runtime.newIOErrorFromException(e);
  109.         } catch (InterruptedException ie) {
  110.             throw runtime.newThreadError("select interrupted");
  111.         } finally {
  112.             for (Selector selector : selectors.values()) {
  113.                 try {
  114.                     selector.close();
  115.                 } catch (Exception e) {
  116.                 }
  117.             }
  118.         }
  119.     }

  120.     private void processReads(Ruby runtime, IRubyObject[] args, ThreadContext context) throws BadDescriptorException, IOException {
  121.         if (!args[0].isNil()) {
  122.             // read
  123.             checkArrayType(runtime, args[0]);
  124.             readArray = (RubyArray) args[0];
  125.             readSize = readArray.size();
  126.             if (readSize == 0) {
  127.                 // clear reference; we aren't going to do anything
  128.                 readArray = null;
  129.             } else {
  130.                 readIOs = new RubyIO[readSize];
  131.                 Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
  132.                 for (int i = 0; i < readSize; i++) {
  133.                     RubyIO ioObj = saveReadIO(i, context);
  134.                     saveReadBlocking(ioObj, i);
  135.                     saveBufferedRead(ioObj, i);                    
  136.                     attachment.clear();
  137.                     attachment.put('r', i);
  138.                     trySelectRead(context, attachment, ioObj.getOpenFileChecked());
  139.                 }
  140.             }
  141.         }
  142.     }

  143.     private RubyIO saveReadIO(int i, ThreadContext context) {
  144.         IRubyObject obj = readArray.eltOk(i);
  145.         RubyIO ioObj = RubyIO.convertToIO(context, obj);
  146.         readIOs[i] = ioObj;
  147.         return ioObj;
  148.     }

  149.     private void saveReadBlocking(RubyIO ioObj, int i) {
  150.         // save blocking state
  151.         if (ioObj.getChannel() instanceof SelectableChannel) {
  152.             getReadBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
  153.         }
  154.     }

  155.     private void saveBufferedRead(RubyIO ioObj, int i) throws BadDescriptorException {
  156.         // already buffered data? don't bother selecting
  157.         if (ioObj.getOpenFile().READ_DATA_BUFFERED()) {
  158.             getUnselectableReads()[i] = true;
  159.         }
  160.     }

  161.     private void trySelectRead(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException {
  162.         if (fptr.selectChannel() != null && registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), READ_ACCEPT_OPS)) {
  163.             selectedReads++;
  164.             if (fptr.READ_CHAR_PENDING() || fptr.READ_DATA_PENDING()) {
  165.                 getPendingReads()[attachment.get('r')] = true;
  166.             }
  167.         } else {
  168.             if (fptr.isReadable()) {
  169.                 getUnselectableReads()[attachment.get('r')] = true;
  170.             }
  171.         }
  172.     }

  173.     private void processWrites(Ruby runtime, IRubyObject[] args, ThreadContext context) throws IOException {
  174.         if (args.length > 1 && !args[1].isNil()) {
  175.             // write
  176.             checkArrayType(runtime, args[1]);
  177.             writeArray = (RubyArray) args[1];
  178.             writeSize = writeArray.size();
  179.             if (writeArray.size() == 0) {
  180.                 // clear reference; we aren't going to do anything
  181.                 writeArray = null;
  182.             } else {
  183.                 writeIOs = new RubyIO[writeSize];
  184.                 Map<Character,Integer> attachment = new HashMap<Character,Integer>(1);
  185.                 for (int i = 0; i < writeSize; i++) {
  186.                     RubyIO ioObj = saveWriteIO(i, context);
  187.                     saveWriteBlocking(ioObj, i);
  188.                     attachment.clear();                    
  189.                     attachment.put('w', i);
  190.                     trySelectWrite(context, attachment, ioObj.getOpenFileChecked());
  191.                 }
  192.             }
  193.         }
  194.     }

  195.     private RubyIO saveWriteIO(int i, ThreadContext context) {
  196.         IRubyObject obj = writeArray.eltOk(i);
  197.         RubyIO ioObj = RubyIO.convertToIO(context, obj);
  198.         writeIOs[i] = ioObj.GetWriteIO();
  199.         return ioObj;
  200.     }

  201.     private void saveWriteBlocking(RubyIO ioObj, int i) {
  202.         if (ioObj.getChannel() instanceof SelectableChannel) {
  203.             // save blocking state
  204.             if (readBlocking != null) {
  205.                 // some read has saved blocking state
  206.                 // find obj
  207.                 int readIndex = fastSearch(readIOs, ioObj);
  208.                 if (readIndex == -1) {
  209.                     // save blocking only if not found
  210.                     getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
  211.                 }
  212.             } else {
  213.                 getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking();
  214.             }
  215.         }
  216.     }

  217.     private void trySelectWrite(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException {
  218.         if (fptr.selectChannel() == null
  219.                 || false == registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), WRITE_CONNECT_OPS)) {
  220.             selectedReads++;
  221.             if (fptr.isWritable()) {
  222.                 getUnselectableWrites()[attachment.get('w')] = true;
  223.             }
  224.         }
  225.     }

  226.     private static long getTimeoutFromArg(IRubyObject timeArg, Ruby runtime) {
  227.         long timeout = 0;
  228.         if (timeArg instanceof RubyFloat) {
  229.             timeout = Math.round(((RubyFloat) timeArg).getDoubleValue() * 1000);
  230.         } else if (timeArg instanceof RubyFixnum) {
  231.             timeout = Math.round(((RubyFixnum) timeArg).getDoubleValue() * 1000);
  232.         } else {
  233.             // TODO: MRI also can hadle Bignum here
  234.             throw runtime.newTypeError("can't convert " + timeArg.getMetaClass().getName() + " into time interval");
  235.         }
  236.         if (timeout < 0) {
  237.             throw runtime.newArgumentError("negative timeout given");
  238.         }
  239.         return timeout;
  240. }

  241. private void doSelect(Ruby runtime, final boolean has_timeout, long timeout) throws IOException {
  242.     if (mainSelector != null) {
  243.         if (pendingReads == null && unselectableReads == null && unselectableWrites == null) {
  244.             if (has_timeout && timeout == 0) {
  245.                 for (Selector selector : selectors.values()) selector.selectNow();
  246.             } else {
  247.                 List<Future> futures = new ArrayList<Future>(enxioSelectors.size());
  248.                 for (ENXIOSelector enxioSelector : enxioSelectors) {
  249.                     futures.add(runtime.getExecutor().submit(enxioSelector));
  250.                 }

  251.                 mainSelector.select(has_timeout ? timeout : 0);
  252.                 for (ENXIOSelector enxioSelector : enxioSelectors) enxioSelector.selector.wakeup();
  253.                 // ensure all the enxio threads have finished
  254.                 for (Future f : futures) try {
  255.                     f.get();
  256.                 } catch (InterruptedException iex) {
  257.                 } catch (ExecutionException eex) {
  258.                     if (eex.getCause() instanceof IOException) {
  259.                         throw (IOException) eex.getCause();
  260.                     }
  261.                 }
  262.             }
  263.         } else {
  264.             for (Selector selector : selectors.values()) selector.selectNow();
  265.         }
  266.     }

  267.     // If any enxio selectors woke up, remove them from the selected key set of the main selector
  268.     for (ENXIOSelector enxioSelector : enxioSelectors) {
  269.         Pipe.SourceChannel source = enxioSelector.pipe.source();
  270.         SelectionKey key = source.keyFor(mainSelector);
  271.         if (key != null && mainSelector.selectedKeys().contains(key)) {
  272.             mainSelector.selectedKeys().remove(key);
  273.             ByteBuffer buf = ByteBuffer.allocate(1);
  274.             source.read(buf);
  275.         }
  276.     }
  277. }
  278.    
  279.     public static final int READ_ACCEPT_OPS = SelectExecutor.READ_ACCEPT_OPS;
  280.     public static final int WRITE_CONNECT_OPS = SelectExecutor.WRITE_CONNECT_OPS;
  281.     private static final int CANCELLED_OPS = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT;
  282.    
  283.     private static boolean ready(int ops, int mask) {
  284.         return (ops & mask) != 0;
  285.     }
  286.    
  287.     private static boolean readAcceptReady(int ops) {
  288.         return ready(ops, READ_ACCEPT_OPS);
  289.     }
  290.    
  291.     private static boolean writeConnectReady(int ops) {
  292.         return ready(ops, WRITE_CONNECT_OPS);
  293.     }
  294.    
  295.     private static boolean cancelReady(int ops) {
  296.         return ready(ops, CANCELLED_OPS);
  297.     }
  298.    
  299.     private static boolean writeReady(int ops) {
  300.         return ready(ops, SelectionKey.OP_WRITE);
  301.     }

  302.     @SuppressWarnings("unchecked")
  303.     private void processSelectedKeys(Ruby runtime) throws IOException {
  304.         for (Selector selector : selectors.values()) {
  305.            
  306.             for (SelectionKey key : selector.selectedKeys()) {
  307.                 int readIoIndex = 0;
  308.                 int writeIoIndex = 0;

  309.                 int interestAndReady = key.interestOps() & key.readyOps();

  310.                 if (readArray != null && readAcceptReady(interestAndReady)) {
  311.                     readIoIndex = ((Map<Character,Integer>)key.attachment()).get('r');

  312.                     getReadResults().append(readArray.eltOk(readIoIndex));

  313.                     if (pendingReads != null) {
  314.                         pendingReads[readIoIndex] = false;
  315.                     }
  316.                 }

  317.                 if (writeArray != null && writeConnectReady(interestAndReady)) {
  318.                     writeIoIndex = ((Map<Character,Integer>)key.attachment()).get('w');

  319.                     getWriteResults().append(writeArray.eltOk(writeIoIndex));
  320.                 }
  321.             }
  322.         }
  323.     }

  324.     private void processPendingAndUnselectable() {
  325.         if (pendingReads != null) {
  326.             for (int i = 0; i < pendingReads.length; i++) {
  327.                 if (pendingReads[i]) {
  328.                     getReadResults().append(readArray.eltOk(i));
  329.                 }
  330.             }
  331.         }
  332.         if (unselectableReads != null) {
  333.             for (int i = 0; i < unselectableReads.length; i++) {
  334.                 if (unselectableReads[i]) {
  335.                     getReadResults().append(readArray.eltOk(i));
  336.                 }
  337.             }
  338.         }
  339.         if (unselectableWrites != null) {
  340.             for (int i = 0; i < unselectableWrites.length; i++) {
  341.                 if (unselectableWrites[i]) {
  342.                     getWriteResults().append(writeArray.eltOk(i));
  343.                 }
  344.             }
  345.         }
  346.     }

  347.     private void tidyUp() throws IOException {
  348.         // make all sockets blocking as configured again
  349.         for (Selector selector : selectors.values()) {
  350.             selector.close(); // close unregisters all channels, so we can safely reset blocking modes
  351.         }

  352.         for (ENXIOSelector enxioSelector : enxioSelectors) {
  353.             enxioSelector.pipe.sink().close();
  354.             enxioSelector.pipe.source().close();
  355.         }

  356.         if (readBlocking != null) {
  357.             for (int i = 0; i < readBlocking.length; i++) {
  358.                 if (readBlocking[i] != null) {
  359.                     try {
  360.                         ((SelectableChannel) readIOs[i].getChannel()).configureBlocking(readBlocking[i]);
  361.                     } catch (IllegalBlockingModeException ibme) {
  362.                         throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
  363.                     }
  364.                 }
  365.             }
  366.         }
  367.         if (writeBlocking != null) {
  368.             for (int i = 0; i < writeBlocking.length; i++) {
  369.                 if (writeBlocking[i] != null) {
  370.                     try {
  371.                         ((SelectableChannel) writeIOs[i].getChannel()).configureBlocking(writeBlocking[i]);
  372.                     } catch (IllegalBlockingModeException ibme) {
  373.                         throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?");
  374.                     }
  375.                 }
  376.             }
  377.         }
  378.     }

  379.     private RubyArray getReadResults() {
  380.         if (readResults == null) {
  381.             readResults = RubyArray.newArray(runtime, readArray.size());
  382.         }
  383.         return readResults;
  384.     }

  385.     private RubyArray getWriteResults() {
  386.         if (writeResults == null) {
  387.             writeResults = RubyArray.newArray(runtime, writeArray.size());
  388.         }
  389.         return writeResults;
  390.     }

  391.     private RubyArray getErrorResults() {
  392.         if (errorResults != null) {
  393.             errorResults = RubyArray.newArray(runtime, readArray.size() + writeArray.size());
  394.         }
  395.         return errorResults;
  396.     }

  397.     private Selector getSelector(ThreadContext context, SelectableChannel channel) throws IOException {
  398.         Selector selector = selectors.get(channel.provider());
  399.         if (selector == null) {
  400.             selector = SelectorFactory.openWithRetryFrom(context.runtime, channel.provider());
  401.             if (selectors.isEmpty()) {
  402.                 selectors = new HashMap<SelectorProvider, Selector>();
  403.             }
  404.             selectors.put(channel.provider(), selector);

  405.             if (!selector.provider().equals(SelectorProvider.provider())) {
  406.                 // need to create pipe between alt impl selector and native NIO selector
  407.                 Pipe pipe = Pipe.open();
  408.                 ENXIOSelector enxioSelector = new ENXIOSelector(selector, pipe);
  409.                 if (enxioSelectors.isEmpty()) enxioSelectors = new ArrayList<ENXIOSelector>();
  410.                 enxioSelectors.add(enxioSelector);
  411.                 pipe.source().configureBlocking(false);
  412.                 pipe.source().register(getSelector(context, pipe.source()), SelectionKey.OP_READ, enxioSelector);
  413.             } else if (mainSelector == null) {
  414.                 mainSelector = selector;
  415.             }
  416.         }

  417.         return selector;
  418.     }

  419.     private Boolean[] getReadBlocking() {
  420.         if (readBlocking == null) {
  421.             readBlocking = new Boolean[readSize];
  422.         }
  423.         return readBlocking;
  424.     }

  425.     private Boolean[] getWriteBlocking() {
  426.         if (writeBlocking == null) {
  427.             writeBlocking = new Boolean[writeSize];
  428.         }
  429.         return writeBlocking;
  430.     }

  431.     private boolean[] getUnselectableReads() {
  432.         if (unselectableReads == null) {
  433.             unselectableReads = new boolean[readSize];
  434.         }
  435.         return unselectableReads;
  436.     }

  437.     private boolean[] getUnselectableWrites() {
  438.         if (unselectableWrites == null) {
  439.             unselectableWrites = new boolean[writeSize];
  440.         }
  441.         return unselectableWrites;
  442.     }

  443.     private boolean[] getPendingReads() {
  444.         if (pendingReads == null) {
  445.             pendingReads = new boolean[readSize];
  446.         }
  447.         return pendingReads;
  448.     }

  449.     private IRubyObject constructResults(Ruby runtime) {
  450.         return RubyArray.newArrayLight(
  451.                 runtime,
  452.                 readResults == null ? RubyArray.newEmptyArray(runtime) : readResults,
  453.                 writeResults == null ? RubyArray.newEmptyArray(runtime) : writeResults,
  454.                 errorResults == null ? RubyArray.newEmptyArray(runtime) : errorResults);
  455.     }

  456.     private int fastSearch(Object[] ary, Object obj) {
  457.         for (int i = 0; i < ary.length; i++) {
  458.             if (ary[i] == obj) {
  459.                 return i;
  460.             }
  461.         }
  462.         return -1;
  463.     }

  464.     private static void checkArrayType(Ruby runtime, IRubyObject obj) {
  465.         if (!(obj instanceof RubyArray)) {
  466.             throw runtime.newTypeError("wrong argument type "
  467.                     + obj.getMetaClass().getName() + " (expected Array)");
  468.         }
  469.     }

  470.     @SuppressWarnings("unchecked")
  471.     private static boolean registerSelect(Selector selector, Map<Character,Integer> obj, SelectableChannel channel, int ops) throws IOException {
  472.         channel.configureBlocking(false);
  473.         int real_ops = channel.validOps() & ops;
  474.         SelectionKey key = channel.keyFor(selector);

  475.         if (key == null) {
  476.             Map<Character,Integer>  attachment = new HashMap<Character,Integer> (1);
  477.             attachment.putAll(obj);
  478.             channel.register(selector, real_ops, attachment );
  479.         } else {
  480.             key.interestOps(key.interestOps() | real_ops);
  481.             Map<Character,Integer> att = (Map<Character,Integer>)key.attachment();
  482.             att.putAll(obj);
  483.             key.attach(att);
  484.         }

  485.         return true;
  486.     }

  487.     private static final class ENXIOSelector implements Callable<Object> {
  488.         private final Selector selector;
  489.         private final Pipe pipe;

  490.         private ENXIOSelector(Selector selector, Pipe pipe) {
  491.             this.selector = selector;
  492.             this.pipe = pipe;
  493.         }

  494.         public Object call() throws Exception {
  495.             try {
  496.                 selector.select();
  497.             } finally {
  498.                 ByteBuffer buf = ByteBuffer.allocate(1);
  499.                 buf.put((byte) 0);
  500.                 buf.flip();
  501.                 pipe.sink().write(buf);
  502.             }

  503.             return null;
  504.         }
  505.     }
  506.    
  507.     Ruby runtime;
  508.     RubyArray readArray = null;
  509.     int readSize = 0;
  510.     RubyIO[] readIOs = null;
  511.     boolean[] unselectableReads = null;
  512.     boolean[] pendingReads = null;
  513.     Boolean[] readBlocking = null;
  514.     int selectedReads = 0;
  515.     RubyArray writeArray = null;
  516.     int writeSize = 0;
  517.     RubyIO[] writeIOs = null;
  518.     boolean[] unselectableWrites = null;
  519.     Boolean[] writeBlocking = null;
  520.     int selectedWrites = 0;
  521.     Selector mainSelector = null;
  522.     Map<SelectorProvider, Selector> selectors = Collections.emptyMap();
  523.     Collection<ENXIOSelector> enxioSelectors = Collections.emptyList();
  524.     RubyArray readResults = null;
  525.     RubyArray writeResults = null;
  526.     RubyArray errorResults = null;
  527. }