RubyThread.java

  1. /***** BEGIN LICENSE BLOCK *****
  2.  * Version: EPL 1.0/GPL 2.0/LGPL 2.1
  3.  *
  4.  * The contents of this file are subject to the Eclipse Public
  5.  * License Version 1.0 (the "License"); you may not use this file
  6.  * except in compliance with the License. You may obtain a copy of
  7.  * the License at http://www.eclipse.org/legal/epl-v10.html
  8.  *
  9.  * Software distributed under the License is distributed on an "AS
  10.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  11.  * implied. See the License for the specific language governing
  12.  * rights and limitations under the License.
  13.  *
  14.  * Copyright (C) 2002 Jason Voegele <jason@jvoegele.com>
  15.  * Copyright (C) 2002-2004 Anders Bengtsson <ndrsbngtssn@yahoo.se>
  16.  * Copyright (C) 2002-2004 Jan Arne Petersen <jpetersen@uni-bonn.de>
  17.  * Copyright (C) 2004 Thomas E Enebo <enebo@acm.org>
  18.  * Copyright (C) 2004-2005 Charles O Nutter <headius@headius.com>
  19.  * Copyright (C) 2004 Stefan Matthias Aust <sma@3plus4.de>
  20.  *
  21.  * Alternatively, the contents of this file may be used under the terms of
  22.  * either of the GNU General Public License Version 2 or later (the "GPL"),
  23.  * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
  24.  * in which case the provisions of the GPL or the LGPL are applicable instead
  25.  * of those above. If you wish to allow use of your version of this file only
  26.  * under the terms of either the GPL or the LGPL, and not to allow others to
  27.  * use your version of this file under the terms of the EPL, indicate your
  28.  * decision by deleting the provisions above and replace them with the notice
  29.  * and other provisions required by the GPL or the LGPL. If you do not delete
  30.  * the provisions above, a recipient may use your version of this file under
  31.  * the terms of any one of the EPL, the GPL or the LGPL.
  32.  ***** END LICENSE BLOCK *****/
  33. package org.jruby;

  34. import java.io.IOException;
  35. import java.lang.ref.WeakReference;
  36. import java.nio.channels.Channel;
  37. import java.nio.channels.SelectableChannel;
  38. import java.nio.channels.SelectionKey;
  39. import java.nio.channels.Selector;
  40. import java.util.Iterator;
  41. import java.util.Queue;
  42. import java.util.Vector;
  43. import java.util.WeakHashMap;
  44. import java.util.HashMap;
  45. import java.util.List;
  46. import java.util.ArrayList;
  47. import java.util.Map;

  48. import java.util.Set;
  49. import org.jruby.common.IRubyWarnings.ID;
  50. import org.jruby.exceptions.RaiseException;
  51. import org.jruby.exceptions.ThreadKill;
  52. import org.jruby.internal.runtime.NativeThread;
  53. import org.jruby.internal.runtime.RubyRunnable;
  54. import org.jruby.internal.runtime.ThreadLike;
  55. import org.jruby.internal.runtime.ThreadService;
  56. import org.jruby.runtime.Block;
  57. import org.jruby.runtime.ObjectAllocator;
  58. import org.jruby.runtime.ThreadContext;
  59. import org.jruby.runtime.ExecutionContext;
  60. import org.jruby.runtime.builtin.IRubyObject;

  61. import java.util.concurrent.ConcurrentLinkedQueue;
  62. import java.util.concurrent.ExecutionException;
  63. import java.util.concurrent.Semaphore;
  64. import java.util.concurrent.TimeUnit;
  65. import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  66. import java.util.concurrent.atomic.AtomicReference;
  67. import java.util.concurrent.locks.Lock;

  68. import org.jruby.anno.JRubyMethod;
  69. import org.jruby.anno.JRubyClass;
  70. import org.jruby.runtime.ClassIndex;
  71. import org.jruby.runtime.ObjectMarshal;
  72. import static org.jruby.runtime.Visibility.*;

  73. import org.jruby.util.TypeConverter;
  74. import org.jruby.util.io.BlockingIO;
  75. import org.jruby.util.io.OpenFile;
  76. import org.jruby.util.io.SelectorFactory;
  77. import org.jruby.util.log.Logger;
  78. import org.jruby.util.log.LoggerFactory;

  79. import org.jruby.java.proxies.ConcreteJavaProxy;
  80. import org.jruby.runtime.Helpers;
  81. import org.jruby.runtime.backtrace.RubyStackTraceElement;
  82. import org.jruby.util.ByteList;

  83. /**
  84.  * Implementation of Ruby's <code>Thread</code> class.  Each Ruby thread is
  85.  * mapped to an underlying Java Virtual Machine thread.
  86.  * <p>
  87.  * Thread encapsulates the behavior of a thread of execution, including the main
  88.  * thread of the Ruby script.  In the descriptions that follow, the parameter
  89.  * <code>aSymbol</code> refers to a symbol, which is either a quoted string or a
  90.  * <code>Symbol</code> (such as <code>:name</code>).
  91.  *
  92.  * Note: For CVS history, see ThreadClass.java.
  93.  */
  94. @JRubyClass(name="Thread")
  95. public class RubyThread extends RubyObject implements ExecutionContext {

  96.     private static final Logger LOG = LoggerFactory.getLogger("RubyThread");

  97.     /** The thread-like think that is actually executing */
  98.     private ThreadLike threadImpl;

  99.     /** Normal thread-local variables */
  100.     private transient Map<IRubyObject, IRubyObject> threadLocalVariables;

  101.     /** Context-local variables, internal-ish thread locals */
  102.     private final Map<Object, IRubyObject> contextVariables = new WeakHashMap<Object, IRubyObject>();

  103.     /** Whether this thread should try to abort the program on exception */
  104.     private boolean abortOnException;

  105.     /** The final value resulting from the thread's execution */
  106.     private IRubyObject finalResult;

  107.     /**
  108.      * The exception currently being raised out of the thread. We reference
  109.      * it here to continue propagating it while handling thread shutdown
  110.      * logic and abort_on_exception.
  111.      */
  112.     private RaiseException exitingException;

  113.     /** The ThreadGroup to which this thread belongs */
  114.     private RubyThreadGroup threadGroup;

  115.     /** Per-thread "current exception" */
  116.     private IRubyObject errorInfo;

  117.     /** Weak reference to the ThreadContext for this thread. */
  118.     private volatile WeakReference<ThreadContext> contextRef;

  119.     /** Whether to scan for cross-thread events */
  120.     private volatile boolean handleInterrupt = true;

  121.     /** Stack of interrupt masks active for this thread */
  122.     private final List<RubyHash> interruptMaskStack = new ArrayList<RubyHash>();

  123.     private static final boolean DEBUG = false;
  124.     private int RUBY_MIN_THREAD_PRIORITY = -3;
  125.     private int RUBY_MAX_THREAD_PRIORITY = 3;

  126.     /** Thread statuses */
  127.     public static enum Status {
  128.         RUN, SLEEP, ABORTING, DEAD;
  129.        
  130.         public final ByteList bytes;
  131.        
  132.         Status() {
  133.             bytes = new ByteList(toString().toLowerCase().getBytes(RubyEncoding.UTF8));
  134.         }
  135.     }

  136.     /** Current status in an atomic reference */
  137.     private final AtomicReference<Status> status = new AtomicReference<Status>(Status.RUN);

  138.     /** Mail slot for cross-thread events */
  139.     private final Queue<IRubyObject> pendingInterruptQueue = new ConcurrentLinkedQueue();

  140.     /** A function to use to unblock this thread, if possible */
  141.     private Unblocker unblockFunc;

  142.     /** Argument to pass to the unblocker */
  143.     private Object unblockArg;

  144.     /** The list of locks this thread currently holds, so they can be released on exit */
  145.     private final List<Lock> heldLocks = new Vector<Lock>();

  146.     /** Whether or not this thread has been disposed of */
  147.     private volatile boolean disposed = false;

  148.     /** Interrupt flags */
  149.     private volatile int interruptFlag = 0;

  150.     /** Interrupt mask to use for disabling certain types */
  151.     private volatile int interruptMask;

  152.     /** Short circuit to avoid-re-scanning for interrupts */
  153.     private volatile boolean pendingInterruptQueueChecked = false;

  154.     private volatile Selector currentSelector;

  155.     private static final AtomicIntegerFieldUpdater INTERRUPT_FLAG_UPDATER =
  156.             AtomicIntegerFieldUpdater.newUpdater(RubyThread.class, "interruptFlag");

  157.     private static final int TIMER_INTERRUPT_MASK         = 0x01;
  158.     private static final int PENDING_INTERRUPT_MASK       = 0x02;
  159.     private static final int POSTPONED_JOB_INTERRUPT_MASK = 0x04;
  160.     private static final int TRAP_INTERRUPT_MASK          = 0x08;

  161.     private static final int INTERRUPT_NONE = 0;
  162.     private static final int INTERRUPT_IMMEDIATE = 1;
  163.     private static final int INTERRUPT_ON_BLOCKING = 2;
  164.     private static final int INTERRUPT_NEVER = 3;

  165.     protected RubyThread(Ruby runtime, RubyClass type) {
  166.         super(runtime, type);

  167.         finalResult = runtime.getNil();
  168.         errorInfo = runtime.getNil();
  169.     }
  170.    
  171.     public RubyThread(Ruby runtime, RubyClass klass, Runnable runnable) {
  172.         this(runtime, klass);
  173.        
  174.         startWith(runnable);
  175.     }

  176.     private void executeInterrupts(ThreadContext context, boolean blockingTiming) {
  177.         Ruby runtime = context.runtime;
  178.         int interrupt;

  179.         boolean postponedJobInterrupt = false;

  180.         while ((interrupt = getInterrupts()) != 0) {
  181.             boolean timerInterrupt = (interrupt & TIMER_INTERRUPT_MASK) == TIMER_INTERRUPT_MASK;
  182.             boolean pendingInterrupt = (interrupt & PENDING_INTERRUPT_MASK) == PENDING_INTERRUPT_MASK;

  183. //            if (postponedJobInterrupt) {
  184. //                postponedJobFlush(context);
  185. //            }
  186.             // Missing: signal handling...but perhaps we don't need it on JVM

  187.             if (pendingInterrupt && pendingInterruptActive()) {
  188.                 IRubyObject err = pendingInterruptDeque(context, blockingTiming ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);

  189.                 if (err == UNDEF) {
  190.                     // no error
  191.                 } else if (err == RubyFixnum.zero(runtime) ||
  192.                         err == RubyFixnum.one(runtime) ||
  193.                         err == RubyFixnum.two(runtime)) {
  194.                     toKill();
  195.                 } else {
  196.                     afterBlockingCall();
  197.                     if (status.get() == Status.SLEEP) {
  198.                         exitSleep();
  199.                     }
  200.                     RubyKernel.raise(context, runtime.getKernel(), new IRubyObject[]{err}, Block.NULL_BLOCK);
  201.                 }
  202.             }

  203.             // Missing: timer interrupt...needed?
  204.         }
  205.     }

  206.     private void postponedJobFlush(ThreadContext context) {
  207.         // unsure if this function has any relevance in JRuby

  208. //        int savedPostponedJobInterruptMask = interruptMask & POSTPONED_JOB_INTERRUPT_MASK;
  209. //
  210. //        errorInfo = context.nil;
  211. //        interruptMask |= POSTPONED_JOB_INTERRUPT_MASK;
  212.     }

  213.     private boolean pendingInterruptActive() {
  214.         if (pendingInterruptQueueChecked) return false;
  215.         if (pendingInterruptQueue.isEmpty()) return false;
  216.         return true;
  217.     }

  218.     private void toKill() {
  219.         pendingInterruptClear();
  220.         throwThreadKill();
  221.     }

  222.     private void pendingInterruptClear() {
  223.         pendingInterruptQueue.clear();
  224.     }

  225.     private int getInterrupts() {
  226.         int interrupt;
  227.         while (true) {
  228.             interrupt = interruptFlag;
  229.             if (INTERRUPT_FLAG_UPDATER.compareAndSet(this, interrupt, interrupt & interruptMask)) {
  230.                 break;
  231.             }
  232.         }
  233.         return interrupt & ~interruptMask;
  234.     }

  235.     private IRubyObject pendingInterruptDeque(ThreadContext context, int timing) {
  236.         for (Iterator<IRubyObject> iterator = pendingInterruptQueue.iterator(); iterator.hasNext();) {
  237.             IRubyObject err = iterator.next();
  238.             int maskTiming = pendingInterruptCheckMask(context, err);

  239.             switch (maskTiming) {
  240.                 case INTERRUPT_ON_BLOCKING:
  241.                     if (timing != INTERRUPT_ON_BLOCKING) break;
  242.                 case INTERRUPT_NONE:
  243.                 case INTERRUPT_IMMEDIATE:
  244.                     iterator.remove();
  245.                     return err;
  246.                 case INTERRUPT_NEVER:
  247.                     break;
  248.             }
  249.         }

  250.         pendingInterruptQueueChecked = true;

  251.         return UNDEF;
  252.     }

  253.     private int pendingInterruptCheckMask(ThreadContext context, IRubyObject err) {
  254.         List<IRubyObject> ancestors = err.getMetaClass().getAncestorList();
  255.         int ancestorsLen = ancestors.size();

  256.         List<RubyHash> maskStack = interruptMaskStack;
  257.         int maskStackLen = maskStack.size();

  258.         for (int i = 0; i < maskStackLen; i++) {
  259.             RubyHash mask = maskStack.get(maskStackLen - (i + 1));

  260.             for (int j = 0; j < ancestorsLen; j++) {
  261.                 IRubyObject klass = ancestors.get(j);
  262.                 IRubyObject sym;

  263.                 if (!(sym = mask.op_aref(context, klass)).isNil()) {
  264.                     String symStr = sym.toString();
  265.                     switch (symStr) {
  266.                         case "immediate": return INTERRUPT_IMMEDIATE;
  267.                         case "on_blocking": return INTERRUPT_ON_BLOCKING;
  268.                         case "never": return INTERRUPT_NEVER;
  269.                         default:
  270.                             throw context.runtime.newThreadError("unknown mask signature");
  271.                     }
  272.                 }
  273.             }
  274.         }
  275.         return INTERRUPT_NONE;
  276.     }

  277.     public IRubyObject getErrorInfo() {
  278.         return errorInfo;
  279.     }

  280.     public IRubyObject setErrorInfo(IRubyObject errorInfo) {
  281.         this.errorInfo = errorInfo;
  282.         return errorInfo;
  283.     }

  284.     public void setContext(ThreadContext context) {
  285.         this.contextRef = new WeakReference<ThreadContext>(context);
  286.     }

  287.     public ThreadContext getContext() {
  288.         return contextRef == null ? null : contextRef.get();
  289.     }


  290.     public Thread getNativeThread() {
  291.         return threadImpl.nativeThread();
  292.     }

  293.     /**
  294.      * Perform pre-execution tasks once the native thread is running, but we
  295.      * have not yet called the Ruby code for the thread.
  296.      */
  297.     public void beforeStart() {
  298.     }

  299.     /**
  300.      * Dispose of the current thread by tidying up connections to other stuff
  301.      */
  302.     public void dispose() {
  303.         if (disposed) return;
  304.        
  305.         synchronized (this) {
  306.             if (disposed) return;
  307.            
  308.             disposed = true;

  309.             // remove from parent thread group
  310.             threadGroup.remove(this);

  311.             // unlock all locked locks
  312.             unlockAll();

  313.             // mark thread as DEAD
  314.             beDead();
  315.         }

  316.         // unregister from runtime's ThreadService
  317.         getRuntime().getThreadService().unregisterThread(this);
  318.     }
  319.    
  320.     public static RubyClass createThreadClass(Ruby runtime) {
  321.         // FIXME: In order for Thread to play well with the standard 'new' behavior,
  322.         // it must provide an allocator that can create empty object instances which
  323.         // initialize then fills with appropriate data.
  324.         RubyClass threadClass = runtime.defineClass("Thread", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
  325.         runtime.setThread(threadClass);

  326.         threadClass.setClassIndex(ClassIndex.THREAD);
  327.         threadClass.setReifiedClass(RubyThread.class);

  328.         threadClass.defineAnnotatedMethods(RubyThread.class);

  329.         RubyThread rubyThread = new RubyThread(runtime, threadClass);
  330.         // TODO: need to isolate the "current" thread from class creation
  331.         rubyThread.threadImpl = new NativeThread(rubyThread, Thread.currentThread());
  332.         runtime.getThreadService().setMainThread(Thread.currentThread(), rubyThread);
  333.        
  334.         // set to default thread group
  335.         runtime.getDefaultThreadGroup().addDirectly(rubyThread);
  336.        
  337.         threadClass.setMarshal(ObjectMarshal.NOT_MARSHALABLE_MARSHAL);
  338.        
  339.         // set up Thread::Backtrace::Location class
  340.         RubyClass backtrace = threadClass.defineClassUnder("Backtrace", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
  341.         RubyClass location = backtrace.defineClassUnder("Location", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);

  342.         location.defineAnnotatedMethods(Location.class);

  343.         runtime.setLocation(location);
  344.        
  345.         return threadClass;
  346.     }
  347.    
  348.     public static class Location extends RubyObject {
  349.         public Location(Ruby runtime, RubyClass klass, RubyStackTraceElement element) {
  350.             super(runtime, klass);
  351.             this.element = element;
  352.         }
  353.        
  354.         @JRubyMethod
  355.         public IRubyObject absolute_path(ThreadContext context) {
  356.             return context.runtime.newString(element.getFileName());
  357.         }
  358.        
  359.         @JRubyMethod
  360.         public IRubyObject base_label(ThreadContext context) {
  361.             return context.runtime.newString(element.getMethodName());
  362.         }
  363.        
  364.         @JRubyMethod
  365.         public IRubyObject inspect(ThreadContext context) {
  366.             return to_s(context).inspect();
  367.         }
  368.        
  369.         @JRubyMethod
  370.         public IRubyObject label(ThreadContext context) {
  371.             return context.runtime.newString(element.getMethodName());
  372.         }
  373.        
  374.         @JRubyMethod
  375.         public IRubyObject lineno(ThreadContext context) {
  376.             return context.runtime.newFixnum(element.getLineNumber());
  377.         }
  378.        
  379.         @JRubyMethod
  380.         public IRubyObject path(ThreadContext context) {
  381.             return context.runtime.newString(element.getFileName());
  382.         }
  383.        
  384.         @JRubyMethod
  385.         public IRubyObject to_s(ThreadContext context) {
  386.             return context.runtime.newString(element.mriStyleString());
  387.         }

  388.         public static IRubyObject newLocationArray(Ruby runtime, RubyStackTraceElement[] elements) {
  389.             RubyArray ary = runtime.newArray(elements.length);

  390.             for (RubyStackTraceElement element : elements) {
  391.                 ary.append(new RubyThread.Location(runtime, runtime.getLocation(), element));
  392.             }

  393.             return ary;
  394.         }
  395.        
  396.         private final RubyStackTraceElement element;
  397.     }

  398.     /**
  399.      * <code>Thread.new</code>
  400.      * <p>
  401.      * Thread.new( <i>[ arg ]*</i> ) {| args | block } -> aThread
  402.      * <p>
  403.      * Creates a new thread to execute the instructions given in block, and
  404.      * begins running it. Any arguments passed to Thread.new are passed into the
  405.      * block.
  406.      * <pre>
  407.      * x = Thread.new { sleep .1; print "x"; print "y"; print "z" }
  408.      * a = Thread.new { print "a"; print "b"; sleep .2; print "c" }
  409.      * x.join # Let the threads finish before
  410.      * a.join # main thread exits...
  411.      * </pre>
  412.      * <i>produces:</i> abxyzc
  413.      */
  414.     @JRubyMethod(name = {"new", "fork"}, rest = true, meta = true)
  415.     public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) {
  416.         return startThread(recv, args, true, block);
  417.     }

  418.     /**
  419.      * Basically the same as Thread.new . However, if class Thread is
  420.      * subclassed, then calling start in that subclass will not invoke the
  421.      * subclass's initialize method.
  422.      */
  423.     public static RubyThread start(IRubyObject recv, IRubyObject[] args, Block block) {
  424.         return start19(recv, args, block);
  425.     }
  426.    
  427.     @JRubyMethod(rest = true, name = "start", meta = true)
  428.     public static RubyThread start19(IRubyObject recv, IRubyObject[] args, Block block) {
  429.         Ruby runtime = recv.getRuntime();
  430.         // The error message may appear incongruous here, due to the difference
  431.         // between JRuby's Thread model and MRI's.
  432.         // We mimic MRI's message in the name of compatibility.
  433.         if (! block.isGiven()) throw runtime.newArgumentError("tried to create Proc object without a block");
  434.         return startThread(recv, args, false, block);
  435.     }
  436.    
  437.     public static RubyThread adopt(IRubyObject recv, Thread t) {
  438.         return adoptThread(recv, t, Block.NULL_BLOCK);
  439.     }

  440.     private static RubyThread adoptThread(final IRubyObject recv, Thread t, Block block) {
  441.         final Ruby runtime = recv.getRuntime();
  442.         final RubyThread rubyThread = new RubyThread(runtime, (RubyClass) recv);
  443.        
  444.         rubyThread.threadImpl = new NativeThread(rubyThread, t);
  445.         ThreadContext context = runtime.getThreadService().registerNewThread(rubyThread);
  446.         runtime.getThreadService().associateThread(t, rubyThread);
  447.        
  448.         context.preAdoptThread();
  449.        
  450.         // set to default thread group
  451.         runtime.getDefaultThreadGroup().addDirectly(rubyThread);
  452.        
  453.         return rubyThread;
  454.     }
  455.    
  456.     @JRubyMethod(rest = true, visibility = PRIVATE)
  457.     public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) {
  458.         Ruby runtime = getRuntime();
  459.         if (!block.isGiven()) throw runtime.newThreadError("must be called with a block");
  460.         if (threadImpl != null) throw runtime.newThreadError("already initialized thread");

  461.         RubyRunnable runnable = new RubyRunnable(this, args, block);
  462.        
  463.         return startWith(runnable);
  464.     }

  465.     private IRubyObject startWith(Runnable runnable) throws RaiseException, OutOfMemoryError {
  466.         Ruby runtime = getRuntime();
  467.         ThreadContext context = runtime.getCurrentContext();
  468.        
  469.         try {
  470.             Thread thread = new Thread(runnable);
  471.             thread.setDaemon(true);
  472.             thread.setName("Ruby-" + runtime.getRuntimeNumber() + "-" + thread.getName() + ": " + context.getFile() + ":" + (context.getLine() + 1));
  473.             threadImpl = new NativeThread(this, thread);

  474.             addToCorrectThreadGroup(context);

  475.             // JRUBY-2380, associate thread early so it shows up in Thread.list right away, in case it doesn't run immediately
  476.             runtime.getThreadService().associateThread(thread, this);

  477.             threadImpl.start();

  478.             // We yield here to hopefully permit the target thread to schedule
  479.             // MRI immediately schedules it, so this is close but not exact
  480.             Thread.yield();
  481.        
  482.             return this;
  483.         } catch (OutOfMemoryError oome) {
  484.             if (oome.getMessage().equals("unable to create new native thread")) {
  485.                 throw runtime.newThreadError(oome.getMessage());
  486.             }
  487.             throw oome;
  488.         } catch (SecurityException ex) {
  489.           throw runtime.newThreadError(ex.getMessage());
  490.         }
  491.     }
  492.    
  493.     private static RubyThread startThread(final IRubyObject recv, final IRubyObject[] args, boolean callInit, Block block) {
  494.         RubyThread rubyThread = new RubyThread(recv.getRuntime(), (RubyClass) recv);
  495.        
  496.         if (callInit) {
  497.             rubyThread.callInit(args, block);
  498.         } else {
  499.             // for Thread::start, which does not call the subclass's initialize
  500.             rubyThread.initialize(recv.getRuntime().getCurrentContext(), args, block);
  501.         }
  502.        
  503.         return rubyThread;
  504.     }
  505.    
  506.     public synchronized void cleanTerminate(IRubyObject result) {
  507.         finalResult = result;
  508.     }

  509.     public synchronized void beDead() {
  510.         status.set(Status.DEAD);
  511.     }

  512.     public void pollThreadEvents() {
  513.         pollThreadEvents(getRuntime().getCurrentContext());
  514.     }

  515.     // CHECK_INTS
  516.     public void pollThreadEvents(ThreadContext context) {
  517.         if (anyInterrupted()) {
  518.             executeInterrupts(context, true);
  519.         }
  520.     }

  521.     // RUBY_VM_INTERRUPTED_ANY
  522.     private boolean anyInterrupted() {
  523.         return Thread.interrupted() || (interruptFlag & ~interruptMask) != 0;
  524.     }
  525.    
  526.     private static void throwThreadKill() {
  527.         throw new ThreadKill();
  528.     }

  529.     @JRubyMethod(meta = true)
  530.     public static IRubyObject handle_interrupt(ThreadContext context, IRubyObject self, IRubyObject _mask, Block block) {
  531.         Ruby runtime = context.runtime;

  532.         if (!block.isGiven()) {
  533.             throw runtime.newArgumentError("block is needed");
  534.         }

  535.         RubyHash mask = (RubyHash)TypeConverter.convertToTypeWithCheck(_mask, runtime.getHash(), "to_hash");

  536.         mask.visitAll(new RubyHash.Visitor() {
  537.             @Override
  538.             public void visit(IRubyObject key, IRubyObject value) {
  539.                 if (value instanceof RubySymbol) {
  540.                     RubySymbol sym = (RubySymbol)value;
  541.                     String symString = sym.toString();
  542.                     if (!symString.equals("immediate") && !symString.equals("on_blocking") && !symString.equals("never")) {
  543.                         throw key.getRuntime().newArgumentError("unknown mask signature");
  544.                     }
  545.                 }
  546.             }
  547.         });

  548.         RubyThread th = context.getThread();
  549.         th.interruptMaskStack.add(mask);
  550.         if (th.pendingInterruptQueue.isEmpty()) {
  551.             th.pendingInterruptQueueChecked = false;
  552.             th.setInterrupt();
  553.         }

  554.         try {
  555.             return block.call(context);
  556.         } finally {
  557.             th.interruptMaskStack.remove(th.interruptMaskStack.size() - 1);
  558.             th.setInterrupt();

  559.             th.pollThreadEvents(context);
  560.         }
  561.     }

  562.     @JRubyMethod(name = "pending_interrupt?", meta = true, optional = 1)
  563.     public static IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject self, IRubyObject[] args) {
  564.         return context.getThread().pending_interrupt_p(context, args);
  565.     }

  566.     @JRubyMethod(name = "pending_interrupt?", optional = 1)
  567.     public IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject[] args) {
  568.         if (pendingInterruptQueue.isEmpty()) {
  569.             return context.runtime.getFalse();
  570.         } else {
  571.             if (args.length == 1) {
  572.                 IRubyObject err = args[0];
  573.                 if (!(err instanceof RubyModule)) {
  574.                     throw context.runtime.newTypeError("class or module required for rescue clause");
  575.                 }
  576.                 if (pendingInterruptInclude(err)) {
  577.                     return context.runtime.getTrue();
  578.                 } else {
  579.                     return context.runtime.getFalse();
  580.                 }
  581.             } else {
  582.                 return context.runtime.getTrue();
  583.             }
  584.         }
  585.     }

  586.     private boolean pendingInterruptInclude(IRubyObject err) {
  587.         Iterator<IRubyObject> iterator = pendingInterruptQueue.iterator();
  588.         while (iterator.hasNext()) {
  589.             IRubyObject e = iterator.next();
  590.             if (((RubyModule)e).op_le(err).isTrue()) {
  591.                 return true;
  592.             }
  593.         }
  594.         return false;
  595.     }

  596.     /**
  597.      * Returns the status of the global ``abort on exception'' condition. The
  598.      * default is false. When set to true, will cause all threads to abort (the
  599.      * process will exit(0)) if an exception is raised in any thread. See also
  600.      * Thread.abort_on_exception= .
  601.      */
  602.     @JRubyMethod(name = "abort_on_exception", meta = true)
  603.     public static RubyBoolean abort_on_exception_x(IRubyObject recv) {
  604.         Ruby runtime = recv.getRuntime();
  605.         return runtime.isGlobalAbortOnExceptionEnabled() ? runtime.getTrue() : runtime.getFalse();
  606.     }

  607.     @JRubyMethod(name = "abort_on_exception=", required = 1, meta = true)
  608.     public static IRubyObject abort_on_exception_set_x(IRubyObject recv, IRubyObject value) {
  609.         recv.getRuntime().setGlobalAbortOnExceptionEnabled(value.isTrue());
  610.         return value;
  611.     }

  612.     @JRubyMethod(meta = true)
  613.     public static RubyThread current(IRubyObject recv) {
  614.         return recv.getRuntime().getCurrentContext().getThread();
  615.     }

  616.     @JRubyMethod(meta = true)
  617.     public static RubyThread main(IRubyObject recv) {
  618.         return recv.getRuntime().getThreadService().getMainThread();
  619.     }

  620.     @JRubyMethod(meta = true)
  621.     public static IRubyObject pass(IRubyObject recv) {
  622.         Ruby runtime = recv.getRuntime();
  623.         ThreadService ts = runtime.getThreadService();
  624.         boolean critical = ts.getCritical();
  625.        
  626.         ts.setCritical(false);

  627.         try {
  628.             Thread.yield();
  629.         } finally {
  630.             ts.setCritical(critical);
  631.         }
  632.        
  633.         return recv.getRuntime().getNil();
  634.     }

  635.     @JRubyMethod(meta = true)
  636.     public static IRubyObject exclusive(ThreadContext context, IRubyObject recv, Block block) {
  637.         Ruby runtime = context.runtime;
  638.         ThreadService ts = runtime.getThreadService();
  639.         boolean critical = ts.getCritical();

  640.         ts.setCritical(true);

  641.         try {
  642.             return block.yieldSpecific(context);
  643.         } finally {
  644.             ts.setCritical(critical);
  645.         }
  646.     }

  647.     @JRubyMethod(meta = true)
  648.     public static RubyArray list(IRubyObject recv) {
  649.         RubyThread[] activeThreads = recv.getRuntime().getThreadService().getActiveRubyThreads();
  650.        
  651.         return recv.getRuntime().newArrayNoCopy(activeThreads);
  652.     }

  653.     private void addToCorrectThreadGroup(ThreadContext context) {
  654.         // JRUBY-3568, inherit threadgroup or use default
  655.         IRubyObject group = context.getThread().group();
  656.         if (!group.isNil()) {
  657.             ((RubyThreadGroup) group).addDirectly(this);
  658.         } else {
  659.             context.runtime.getDefaultThreadGroup().addDirectly(this);
  660.         }
  661.     }
  662.    
  663.     private IRubyObject getSymbolKey(IRubyObject originalKey) {
  664.         if (originalKey instanceof RubySymbol) {
  665.             return originalKey;
  666.         } else if (originalKey instanceof RubyString) {
  667.             return getRuntime().newSymbol(originalKey.asJavaString());
  668.         } else if (originalKey instanceof RubyFixnum) {
  669.             getRuntime().getWarnings().warn(ID.FIXNUMS_NOT_SYMBOLS, "Do not use Fixnums as Symbols");
  670.             throw getRuntime().newArgumentError(originalKey + " is not a symbol");
  671.         } else {
  672.             throw getRuntime().newTypeError(originalKey + " is not a symbol");
  673.         }
  674.     }
  675.    
  676.     private synchronized Map<IRubyObject, IRubyObject> getThreadLocals() {
  677.         if (threadLocalVariables == null) {
  678.             threadLocalVariables = new HashMap<IRubyObject, IRubyObject>();
  679.         }
  680.         return threadLocalVariables;
  681.     }

  682.     private void clearThreadLocals() {
  683.         threadLocalVariables = null;
  684.     }

  685.     @Override
  686.     public final Map<Object, IRubyObject> getContextVariables() {
  687.         return contextVariables;
  688.     }

  689.     public boolean isAlive(){
  690.         return threadImpl.isAlive() && status.get() != Status.ABORTING;
  691.     }

  692.     @JRubyMethod(name = "[]", required = 1)
  693.     public IRubyObject op_aref(IRubyObject key) {
  694.         IRubyObject value;
  695.         if ((value = getThreadLocals().get(getSymbolKey(key))) != null) {
  696.             return value;
  697.         }
  698.         return getRuntime().getNil();
  699.     }

  700.     @JRubyMethod(name = "[]=", required = 2)
  701.     public IRubyObject op_aset(IRubyObject key, IRubyObject value) {
  702.         key = getSymbolKey(key);
  703.        
  704.         getThreadLocals().put(key, value);
  705.         return value;
  706.     }

  707.     @JRubyMethod
  708.     public RubyBoolean abort_on_exception() {
  709.         return abortOnException ? getRuntime().getTrue() : getRuntime().getFalse();
  710.     }

  711.     @JRubyMethod(name = "abort_on_exception=", required = 1)
  712.     public IRubyObject abort_on_exception_set(IRubyObject val) {
  713.         abortOnException = val.isTrue();
  714.         return val;
  715.     }

  716.     @JRubyMethod(name = "alive?")
  717.     public RubyBoolean alive_p() {
  718.         return isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
  719.     }

  720.     @Deprecated
  721.     public IRubyObject join(IRubyObject[] args) {
  722.         return join(getRuntime().getCurrentContext(), args);
  723.     }

  724.     @JRubyMethod(optional = 1)
  725.     public IRubyObject join(ThreadContext context, IRubyObject[] args) {
  726.         Ruby runtime = context.runtime;
  727.         long timeoutMillis = Long.MAX_VALUE;

  728.         if (args.length > 0 && !args[0].isNil()) {
  729.             if (args.length > 1) {
  730.                 throw runtime.newArgumentError(args.length, 1);
  731.             }
  732.             // MRI behavior: value given in seconds; converted to Float; less
  733.             // than or equal to zero returns immediately; returns nil
  734.             timeoutMillis = (long)(1000.0D * args[0].convertToFloat().getValue());
  735.             if (timeoutMillis <= 0) {
  736.             // TODO: not sure that we should skip calling join() altogether.
  737.             // Thread.join() has some implications for Java Memory Model, etc.
  738.                 if (threadImpl.isAlive()) {
  739.                     return context.nil;
  740.                 } else {  
  741.                    return this;
  742.                 }
  743.             }
  744.         }

  745.         if (isCurrent()) {
  746.             throw runtime.newThreadError("thread " + identityString() + " tried to join itself");
  747.         }

  748.         RubyThread currentThread = context.getThread();

  749.         try {
  750.             if (runtime.getThreadService().getCritical()) {
  751.                 // If the target thread is sleeping or stopped, wake it
  752.                 synchronized (this) {
  753.                     notify();
  754.                 }
  755.                
  756.                 // interrupt the target thread in case it's blocking or waiting
  757.                 // WARNING: We no longer interrupt the target thread, since this usually means
  758.                 // interrupting IO and with NIO that means the channel is no longer usable.
  759.                 // We either need a new way to handle waking a target thread that's waiting
  760.                 // on IO, or we need to accept that we can't wake such threads and must wait
  761.                 // for them to complete their operation.
  762.                 //threadImpl.interrupt();
  763.             }

  764.             final long timeToWait = Math.min(timeoutMillis, 200);

  765.             // We need this loop in order to be able to "unblock" the
  766.             // join call without actually calling interrupt.
  767.             long start = System.currentTimeMillis();
  768.             while(true) {
  769.                 currentThread.pollThreadEvents();
  770.                 threadImpl.join(timeToWait);
  771.                 if (!threadImpl.isAlive()) {
  772.                     break;
  773.                 }
  774.                 if (System.currentTimeMillis() - start > timeoutMillis) {
  775.                     break;
  776.                 }
  777.             }
  778.         } catch (InterruptedException ie) {
  779.             ie.printStackTrace();
  780.             assert false : ie;
  781.         } catch (ExecutionException ie) {
  782.             ie.printStackTrace();
  783.             assert false : ie;
  784.         }

  785.         if (exitingException != null) {
  786.             // Set $! in the current thread before exiting
  787.             runtime.getGlobalVariables().set("$!", (IRubyObject)exitingException.getException());
  788.             throw exitingException;

  789.         }

  790.         // check events before leaving
  791.         currentThread.pollThreadEvents(context);

  792.         if (threadImpl.isAlive()) {
  793.             return context.nil;
  794.         } else {
  795.             return this;
  796.         }
  797.     }

  798.     @JRubyMethod
  799.     public IRubyObject value() {
  800.         join(new IRubyObject[0]);
  801.         synchronized (this) {
  802.             return finalResult;
  803.         }
  804.     }

  805.     @JRubyMethod
  806.     public IRubyObject group() {
  807.         if (threadGroup == null) {
  808.             return getRuntime().getNil();
  809.         }
  810.        
  811.         return threadGroup;
  812.     }
  813.    
  814.     void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
  815.         threadGroup = rubyThreadGroup;
  816.     }
  817.    
  818.     @JRubyMethod
  819.     @Override
  820.     public synchronized IRubyObject inspect() {
  821.         // FIXME: There's some code duplication here with RubyObject#inspect
  822.         StringBuilder part = new StringBuilder();
  823.         String cname = getMetaClass().getRealClass().getName();
  824.         part.append("#<").append(cname).append(":");
  825.         part.append(identityString());
  826.         part.append(' ');
  827.         part.append(status.toString().toLowerCase());
  828.         part.append('>');
  829.         return getRuntime().newString(part.toString());
  830.     }

  831.     @JRubyMethod(name = "key?", required = 1)
  832.     public RubyBoolean key_p(IRubyObject key) {
  833.         key = getSymbolKey(key);
  834.        
  835.         return getRuntime().newBoolean(getThreadLocals().containsKey(key));
  836.     }

  837.     @JRubyMethod
  838.     public RubyArray keys() {
  839.         IRubyObject[] keys = new IRubyObject[getThreadLocals().size()];
  840.        
  841.         return RubyArray.newArrayNoCopy(getRuntime(), getThreadLocals().keySet().toArray(keys));
  842.     }
  843.    
  844.     @JRubyMethod(meta = true)
  845.     public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {
  846.         RubyThread rubyThread = context.getThread();
  847.        
  848.         synchronized (rubyThread) {
  849.             rubyThread.pollThreadEvents(context);
  850.             try {
  851.                 // attempt to decriticalize all if we're the critical thread
  852.                 receiver.getRuntime().getThreadService().setCritical(false);

  853.                 rubyThread.status.set(Status.SLEEP);
  854.                 rubyThread.wait();
  855.             } catch (InterruptedException ie) {
  856.                 rubyThread.pollThreadEvents(context);
  857.                 rubyThread.status.set(Status.RUN);
  858.             }
  859.         }
  860.        
  861.         return receiver.getRuntime().getNil();
  862.     }
  863.    
  864.     @JRubyMethod(required = 1, meta = true)
  865.     public static IRubyObject kill(IRubyObject receiver, IRubyObject rubyThread, Block block) {
  866.         if (!(rubyThread instanceof RubyThread)) throw receiver.getRuntime().newTypeError(rubyThread, receiver.getRuntime().getThread());
  867.         return ((RubyThread)rubyThread).kill();
  868.     }
  869.    
  870.     @JRubyMethod(meta = true)
  871.     public static IRubyObject exit(IRubyObject receiver, Block block) {
  872.         RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();

  873.         return rubyThread.kill();
  874.     }

  875.     @JRubyMethod(name = "stop?")
  876.     public RubyBoolean stop_p() {
  877.         // not valid for "dead" state
  878.         return getRuntime().newBoolean(status.get() == Status.SLEEP || status.get() == Status.DEAD);
  879.     }
  880.    
  881.     @JRubyMethod
  882.     public synchronized RubyThread wakeup() {
  883.         if(!threadImpl.isAlive() && status.get() == Status.DEAD) {
  884.             throw getRuntime().newThreadError("killed thread");
  885.         }

  886.         status.set(Status.RUN);
  887.         interrupt();

  888.         return this;
  889.     }
  890.    
  891.     @JRubyMethod
  892.     public RubyFixnum priority() {
  893.         return RubyFixnum.newFixnum(getRuntime(), javaPriorityToRubyPriority(threadImpl.getPriority()));
  894.     }

  895.     @JRubyMethod(name = "priority=", required = 1)
  896.     public IRubyObject priority_set(IRubyObject priority) {
  897.         int iPriority = RubyNumeric.fix2int(priority);
  898.        
  899.         if (iPriority < RUBY_MIN_THREAD_PRIORITY) {
  900.             iPriority = RUBY_MIN_THREAD_PRIORITY;
  901.         } else if (iPriority > RUBY_MAX_THREAD_PRIORITY) {
  902.             iPriority = RUBY_MAX_THREAD_PRIORITY;
  903.         }

  904.         if (threadImpl.isAlive()) {
  905.             int jPriority = rubyPriorityToJavaPriority(iPriority);
  906.             if (jPriority < Thread.MIN_PRIORITY) {
  907.                 jPriority = Thread.MIN_PRIORITY;
  908.             } else if (jPriority > Thread.MAX_PRIORITY) {
  909.                 jPriority = Thread.MAX_PRIORITY;
  910.             }
  911.             threadImpl.setPriority(jPriority);
  912.         }

  913.         return RubyFixnum.newFixnum(getRuntime(), iPriority);
  914.     }
  915.      
  916.     /* helper methods to translate java thread priority (1-10) to
  917.      * Ruby thread priority (-3 to 3) using a quadratic polynoimal ant its
  918.      * inverse
  919.      * i.e., j = r^2/18 + 3*r/2 + 5
  920.      *       r = 3/2*sqrt(8*j + 41) - 27/2
  921.      */
  922.     private int javaPriorityToRubyPriority(int javaPriority) {
  923.         double d; // intermediate value
  924.         d = 1.5 * Math.sqrt(8.0*javaPriority + 41) - 13.5;
  925.         return Math.round((float) d);
  926.     }
  927.    
  928.     private int rubyPriorityToJavaPriority(int rubyPriority) {
  929.         double d;
  930.         if (rubyPriority < RUBY_MIN_THREAD_PRIORITY) {
  931.             rubyPriority = RUBY_MIN_THREAD_PRIORITY;
  932.         } else if (rubyPriority > RUBY_MAX_THREAD_PRIORITY) {
  933.             rubyPriority = RUBY_MAX_THREAD_PRIORITY;
  934.         }
  935.         d = Math.pow(rubyPriority, 2.0)/18.0 + 1.5 * rubyPriority + 5;
  936.         return Math.round((float) d);
  937.     }
  938.    
  939.     /**
  940.      * Simplified utility method for just raising an existing exception in this
  941.      * thread.
  942.      *
  943.      * @param exception the exception to raise
  944.      * @return this thread
  945.      */
  946.     public IRubyObject raise(IRubyObject exception) {
  947.         return raise(new IRubyObject[]{exception}, Block.NULL_BLOCK);
  948.     }

  949.     @JRubyMethod(optional = 3)
  950.     public IRubyObject raise(IRubyObject[] args, Block block) {
  951.         Ruby runtime = getRuntime();

  952.         RubyThread currentThread = runtime.getCurrentContext().getThread();

  953.         return genericRaise(runtime, args, currentThread);
  954.     }

  955.     public IRubyObject genericRaise(Ruby runtime, IRubyObject[] args, RubyThread currentThread) {
  956.         if (!isAlive()) return runtime.getNil();

  957.         if (currentThread == this) {
  958.             RubyKernel.raise(runtime.getCurrentContext(), runtime.getKernel(), args, Block.NULL_BLOCK);
  959.             // should not reach here
  960.         }

  961.         IRubyObject exception = prepareRaiseException(runtime, args, Block.NULL_BLOCK);

  962.         pendingInterruptEnqueue(exception);
  963.         interrupt();

  964.         return runtime.getNil();
  965.     }

  966.     private IRubyObject prepareRaiseException(Ruby runtime, IRubyObject[] args, Block block) {
  967.         if(args.length == 0) {
  968.             IRubyObject lastException = errorInfo;
  969.             if(lastException.isNil()) {
  970.                 return new RaiseException(runtime, runtime.getRuntimeError(), "", false).getException();
  971.             }
  972.             return lastException;
  973.         }

  974.         IRubyObject exception;
  975.         ThreadContext context = getRuntime().getCurrentContext();
  976.        
  977.         if(args.length == 1) {
  978.             if(args[0] instanceof RubyString) {
  979.                 return runtime.getRuntimeError().newInstance(context, args, block);
  980.             } else if (args[0] instanceof ConcreteJavaProxy) {
  981.                 return args[0];
  982.             } else if(!args[0].respondsTo("exception")) {
  983.                 return runtime.newTypeError("exception class/object expected").getException();
  984.             }
  985.             exception = args[0].callMethod(context, "exception");
  986.         } else {
  987.             if (!args[0].respondsTo("exception")) {
  988.                 return runtime.newTypeError("exception class/object expected").getException();
  989.             }
  990.            
  991.             exception = args[0].callMethod(context, "exception", args[1]);
  992.         }
  993.        
  994.         if (!runtime.getException().isInstance(exception)) {
  995.             return runtime.newTypeError("exception object expected").getException();
  996.         }
  997.        
  998.         if (args.length == 3) {
  999.             ((RubyException) exception).set_backtrace(args[2]);
  1000.         }
  1001.        
  1002.         return exception;
  1003.     }
  1004.    
  1005.     @JRubyMethod
  1006.     public synchronized IRubyObject run() {
  1007.         return wakeup();
  1008.     }

  1009.     /**
  1010.      * Sleep the current thread for millis, waking up on any thread interrupts.
  1011.      *
  1012.      * We can never be sure if a wait will finish because of a Java "spurious wakeup".  So if we
  1013.      * explicitly wakeup and we wait less than requested amount we will return false.  We will
  1014.      * return true if we sleep right amount or less than right amount via spurious wakeup.
  1015.      *
  1016.      * @param millis Number of milliseconds to sleep. Zero sleeps forever.
  1017.      */
  1018.     public boolean sleep(long millis) throws InterruptedException {
  1019.         assert this == getRuntime().getCurrentContext().getThread();
  1020.         Semaphore sem = new Semaphore(1);
  1021.         sem.acquire();
  1022.         if (executeTask(getContext(), new Object[]{sem, millis, 0}, SLEEP_TASK2) >= millis) {
  1023.             return true;
  1024.         } else {
  1025.             return false;
  1026.         }
  1027.     }

  1028.     public IRubyObject status() {
  1029.         return status(getRuntime());
  1030.     }
  1031.     @JRubyMethod
  1032.     public IRubyObject status(ThreadContext context) {
  1033.         return status(context.runtime);
  1034.     }
  1035.    
  1036.     private synchronized IRubyObject status(Ruby runtime) {
  1037.         if (threadImpl.isAlive()) {
  1038.             return runtime.getThreadStatus(status.get());
  1039.         } else if (exitingException != null) {
  1040.             return runtime.getNil();
  1041.         } else {
  1042.             return runtime.getFalse();
  1043.         }
  1044.     }

  1045.     @Deprecated
  1046.     public static interface BlockingTask {
  1047.         public void run() throws InterruptedException;
  1048.         public void wakeup();
  1049.     }

  1050.     public interface Unblocker<Data> {
  1051.         public void wakeup(RubyThread thread, Data self);
  1052.     }

  1053.     public interface Task<Data, Return> extends Unblocker<Data> {
  1054.         public Return run(ThreadContext context, Data data) throws InterruptedException;
  1055.         public void wakeup(RubyThread thread, Data data);
  1056.     }

  1057.     public static final class SleepTask implements BlockingTask {
  1058.         private final Object object;
  1059.         private final long millis;
  1060.         private final int nanos;

  1061.         public SleepTask(Object object, long millis, int nanos) {
  1062.             this.object = object;
  1063.             this.millis = millis;
  1064.             this.nanos = nanos;
  1065.         }

  1066.         @Override
  1067.         public void run() throws InterruptedException {
  1068.             synchronized (object) {
  1069.                 object.wait(millis, nanos);
  1070.             }
  1071.         }

  1072.         @Override
  1073.         public void wakeup() {
  1074.             synchronized (object) {
  1075.                 object.notify();
  1076.             }
  1077.         }
  1078.     }

  1079.     private static final class SleepTask2 implements Task<Object[], Long> {
  1080.         @Override
  1081.         public Long run(ThreadContext context, Object[] data) throws InterruptedException {
  1082.             long millis = (Long)data[1];
  1083.             int nanos = (Integer)data[2];

  1084.             long start = System.currentTimeMillis();
  1085.             // TODO: nano handling?
  1086.             if (millis == 0) {
  1087.                 ((Semaphore) data[0]).acquire();
  1088.             } else {
  1089.                 ((Semaphore) data[0]).tryAcquire(millis, TimeUnit.MILLISECONDS);
  1090.             }
  1091.             return System.currentTimeMillis() - start;
  1092.         }

  1093.         @Override
  1094.         public void wakeup(RubyThread thread, Object[] data) {
  1095.             ((Semaphore)data[0]).release();
  1096.         }
  1097.     }

  1098.     private static final Task<Object[], Long> SLEEP_TASK2 = new SleepTask2();

  1099.     @Deprecated
  1100.     public void executeBlockingTask(BlockingTask task) throws InterruptedException {
  1101.         try {
  1102.             this.currentBlockingTask = task;
  1103.             enterSleep();
  1104.             pollThreadEvents();
  1105.             task.run();
  1106.         } finally {
  1107.             exitSleep();
  1108.             currentBlockingTask = null;
  1109.             pollThreadEvents();
  1110.         }
  1111.     }

  1112.     public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
  1113.         try {
  1114.             this.unblockFunc = task;
  1115.             this.unblockArg = data;

  1116.             // check for interrupt before going into blocking call
  1117.             pollThreadEvents(context);

  1118.             enterSleep();

  1119.             return task.run(context, data);
  1120.         } finally {
  1121.             exitSleep();
  1122.             this.unblockFunc = null;
  1123.             this.unblockArg = null;
  1124.             pollThreadEvents(context);
  1125.         }
  1126.     }

  1127.     public void enterSleep() {
  1128.         status.set(Status.SLEEP);
  1129.     }

  1130.     public void exitSleep() {
  1131.         status.set(Status.RUN);
  1132.     }

  1133.     @JRubyMethod(name = {"kill", "exit", "terminate"})
  1134.     public IRubyObject kill() {
  1135.         Ruby runtime = getRuntime();
  1136.         // need to reexamine this
  1137.         RubyThread currentThread = runtime.getCurrentContext().getThread();

  1138.         if (currentThread == runtime.getThreadService().getMainThread()) {
  1139.             // rb_exit to hard exit process...not quite right for us
  1140.         }
  1141.         return genericKill(runtime, currentThread);
  1142.     }

  1143.     private IRubyObject genericKill(Ruby runtime, RubyThread currentThread) {
  1144.         // If the killee thread is the same as the killer thread, just die
  1145.         if (currentThread == this) throwThreadKill();

  1146.         pendingInterruptEnqueue(RubyFixnum.zero(runtime));
  1147.         interrupt();

  1148.         return this;
  1149.     }

  1150.     private void pendingInterruptEnqueue(IRubyObject v) {
  1151.         pendingInterruptQueue.add(v);
  1152.         pendingInterruptQueueChecked = false;
  1153.     }
  1154.    
  1155.     /**
  1156.      * Used for finalizers that need to kill a Ruby thread. Finalizers run in
  1157.      * a VM thread to which we do not want to attach a ThreadContext and within
  1158.      * which we do not want to check for Ruby thread events. This mechanism goes
  1159.      * directly to mail delivery, bypassing all Ruby Thread-related steps.
  1160.      */
  1161.     public void dieFromFinalizer() {
  1162.         genericKill(getRuntime(), null);
  1163.     }

  1164.     private static void debug(RubyThread thread, String message) {
  1165.         if (DEBUG) LOG.debug(Thread.currentThread() + "(" + thread.status + "): " + message);
  1166.     }
  1167.    
  1168.     @JRubyMethod
  1169.     public IRubyObject safe_level() {
  1170.         throw getRuntime().newNotImplementedError("Thread-specific SAFE levels are not supported");
  1171.     }

  1172.     public IRubyObject backtrace(ThreadContext context) {
  1173.         return backtrace20(context, NULL_ARRAY);
  1174.     }

  1175.     @JRubyMethod(name = "backtrace", optional = 2)
  1176.     public IRubyObject backtrace20(ThreadContext context, IRubyObject[] args) {
  1177.         ThreadContext myContext = getContext();

  1178.         // context can be nil if we have not started or GC has claimed our context
  1179.         if (myContext == null) return context.nil;

  1180.         Thread nativeThread = getNativeThread();

  1181.         // nativeThread can be null if the thread has terminated and GC has claimed it
  1182.         if (nativeThread == null) return context.nil;
  1183.        
  1184.         Ruby runtime = context.runtime;
  1185.         Integer[] ll = RubyKernel.levelAndLengthFromArgs(runtime, args, 0);
  1186.         Integer level = ll[0], length = ll[1];
  1187.        
  1188.         return myContext.createCallerBacktrace(level, length, getNativeThread().getStackTrace());
  1189.     }
  1190.    
  1191.     @JRubyMethod(optional = 2)
  1192.     public IRubyObject backtrace_locations(ThreadContext context, IRubyObject[] args) {
  1193.         ThreadContext myContext = getContext();

  1194.         if (myContext == null) return context.nil;
  1195.        
  1196.         Ruby runtime = context.runtime;
  1197.         Integer[] ll = RubyKernel.levelAndLengthFromArgs(runtime, args, 0);
  1198.         Integer level = ll[0], length = ll[1];
  1199.        
  1200.         return myContext.createCallerLocations(level, length, getNativeThread().getStackTrace());
  1201.     }

  1202.     public StackTraceElement[] javaBacktrace() {
  1203.         if (threadImpl instanceof NativeThread) {
  1204.             return ((NativeThread)threadImpl).getThread().getStackTrace();
  1205.         }

  1206.         // Future-based threads can't get a Java trace
  1207.         return new StackTraceElement[0];
  1208.     }

  1209.     private boolean isCurrent() {
  1210.         return threadImpl.isCurrent();
  1211.     }

  1212.     public void exceptionRaised(RaiseException exception) {
  1213.         assert isCurrent();

  1214.         RubyException rubyException = exception.getException();
  1215.         Ruby runtime = rubyException.getRuntime();
  1216.         if (runtime.getSystemExit().isInstance(rubyException)) {
  1217.             runtime.getThreadService().getMainThread().raise(new IRubyObject[] {rubyException}, Block.NULL_BLOCK);
  1218.         } else if (abortOnException(runtime)) {
  1219.             runtime.getThreadService().getMainThread().raise(new IRubyObject[] {rubyException}, Block.NULL_BLOCK);
  1220.             return;
  1221.         } else if (runtime.getDebug().isTrue()) {
  1222.             runtime.printError(exception.getException());
  1223.         }
  1224.         exitingException = exception;
  1225.     }

  1226.     /**
  1227.      * For handling all non-Ruby exceptions bubbling out of threads
  1228.      * @param exception
  1229.      */
  1230.     @SuppressWarnings("deprecation")
  1231.     public void exceptionRaised(Throwable exception) {
  1232.         if (exception instanceof RaiseException) {
  1233.             exceptionRaised((RaiseException)exception);
  1234.             return;
  1235.         }

  1236.         assert isCurrent();

  1237.         Ruby runtime = getRuntime();
  1238.         if (abortOnException(runtime) && exception instanceof Error) {
  1239.             // re-propagate on main thread
  1240.             runtime.getThreadService().getMainThread().getNativeThread().stop(exception);
  1241.         } else {
  1242.             // just rethrow on this thread, let system handlers report it
  1243.             Helpers.throwException(exception);
  1244.         }
  1245.     }

  1246.     private boolean abortOnException(Ruby runtime) {
  1247.         return (runtime.isGlobalAbortOnExceptionEnabled() || abortOnException);
  1248.     }

  1249.     public static RubyThread mainThread(IRubyObject receiver) {
  1250.         return receiver.getRuntime().getThreadService().getMainThread();
  1251.     }

  1252.     /**
  1253.      * Perform an interruptible select operation on the given channel and fptr,
  1254.      * waiting for the requested operations or the given timeout.
  1255.      *
  1256.      * @param io the RubyIO that contains the channel, for managing blocked threads list.
  1257.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1258.      * @return true if the IO's channel became ready for the requested operations, false if
  1259.      *         it was not selectable.
  1260.      */
  1261.     public boolean select(RubyIO io, int ops) {
  1262.         return select(io.getChannel(), io.getOpenFile(), ops);
  1263.     }

  1264.     /**
  1265.      * Perform an interruptible select operation on the given channel and fptr,
  1266.      * waiting for the requested operations or the given timeout.
  1267.      *
  1268.      * @param io the RubyIO that contains the channel, for managing blocked threads list.
  1269.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1270.      * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
  1271.      *                zero selects and returns ready channels or nothing immediately, and
  1272.      *                greater than zero selects for at most that many ms.
  1273.      * @return true if the IO's channel became ready for the requested operations, false if
  1274.      *         it timed out or was not selectable.
  1275.      */
  1276.     public boolean select(RubyIO io, int ops, long timeout) {
  1277.         return select(io.getChannel(), io.getOpenFile(), ops, timeout);
  1278.     }

  1279.     /**
  1280.      * Perform an interruptible select operation on the given channel and fptr,
  1281.      * waiting for the requested operations.
  1282.      *
  1283.      * @param channel the channel to perform a select against. If this is not
  1284.      *                a selectable channel, then this method will just return true.
  1285.      * @param fptr the fptr that contains the channel, for managing blocked threads list.
  1286.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1287.      * @return true if the channel became ready for the requested operations, false if
  1288.      *         it was not selectable.
  1289.      */
  1290.     public boolean select(Channel channel, OpenFile fptr, int ops) {
  1291.         return select(channel, fptr, ops, -1);
  1292.     }

  1293.     /**
  1294.      * Perform an interruptible select operation on the given channel and fptr,
  1295.      * waiting for the requested operations.
  1296.      *
  1297.      * @param channel the channel to perform a select against. If this is not
  1298.      *                a selectable channel, then this method will just return true.
  1299.      * @param io the RubyIO that contains the channel, for managing blocked threads list.
  1300.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1301.      * @return true if the channel became ready for the requested operations, false if
  1302.      *         it was not selectable.
  1303.      */
  1304.     public boolean select(Channel channel, RubyIO io, int ops) {
  1305.         return select(channel, io == null ? null : io.getOpenFile(), ops, -1);
  1306.     }

  1307.     /**
  1308.      * Perform an interruptible select operation on the given channel and fptr,
  1309.      * waiting for the requested operations or the given timeout.
  1310.      *
  1311.      * @param channel the channel to perform a select against. If this is not
  1312.      *                a selectable channel, then this method will just return true.
  1313.      * @param io the RubyIO that contains the channel, for managing blocked threads list.
  1314.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1315.      * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
  1316.      *                zero selects and returns ready channels or nothing immediately, and
  1317.      *                greater than zero selects for at most that many ms.
  1318.      * @return true if the channel became ready for the requested operations, false if
  1319.      *         it timed out or was not selectable.
  1320.      */
  1321.     public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
  1322.         return select(channel, io == null ? null : io.getOpenFile(), ops, timeout);
  1323.     }

  1324.     /**
  1325.      * Perform an interruptible select operation on the given channel and fptr,
  1326.      * waiting for the requested operations or the given timeout.
  1327.      *
  1328.      * @param channel the channel to perform a select against. If this is not
  1329.      *                a selectable channel, then this method will just return true.
  1330.      * @param fptr the fptr that contains the channel, for managing blocked threads list.
  1331.      * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
  1332.      * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
  1333.      *                zero selects and returns ready channels or nothing immediately, and
  1334.      *                greater than zero selects for at most that many ms.
  1335.      * @return true if the channel became ready for the requested operations, false if
  1336.      *         it timed out or was not selectable.
  1337.      */
  1338.     public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
  1339.         // Use selectables but only if they're not associated with a file (which has odd select semantics)
  1340.         if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
  1341.             SelectableChannel selectable = (SelectableChannel)channel;
  1342.            
  1343.             synchronized (selectable.blockingLock()) {
  1344.                 boolean oldBlocking = selectable.isBlocking();

  1345.                 SelectionKey key = null;
  1346.                 try {
  1347.                     selectable.configureBlocking(false);

  1348.                     if (fptr != null) fptr.addBlockingThread(this);
  1349.                     currentSelector = getRuntime().getSelectorPool().get(selectable.provider());

  1350.                     key = selectable.register(currentSelector, ops);

  1351.                     beforeBlockingCall();
  1352.                     int result;
  1353.                     if (timeout < 0) {
  1354.                         result = currentSelector.select();
  1355.                     } else if (timeout == 0) {
  1356.                         result = currentSelector.selectNow();
  1357.                     } else {
  1358.                         result = currentSelector.select(timeout);
  1359.                     }

  1360.                     // check for thread events, in case we've been woken up to die
  1361.                     pollThreadEvents();

  1362.                     if (result == 1) {
  1363.                         Set<SelectionKey> keySet = currentSelector.selectedKeys();

  1364.                         if (keySet.iterator().next() == key) {
  1365.                             return true;
  1366.                         }
  1367.                     }

  1368.                     return false;
  1369.                 } catch (IOException ioe) {
  1370.                     throw getRuntime().newIOErrorFromException(ioe);
  1371.                 } finally {
  1372.                     // Note: I don't like ignoring these exceptions, but it's
  1373.                     // unclear how likely they are to happen or what damage we
  1374.                     // might do by ignoring them. Note that the pieces are separate
  1375.                     // so that we can ensure one failing does not affect the others
  1376.                     // running.

  1377.                     // clean up the key in the selector
  1378.                     try {
  1379.                         if (key != null) key.cancel();
  1380.                         if (currentSelector != null) currentSelector.selectNow();
  1381.                     } catch (Exception e) {
  1382.                         // ignore
  1383.                     }

  1384.                     // shut down and null out the selector
  1385.                     try {
  1386.                         if (currentSelector != null) {
  1387.                             getRuntime().getSelectorPool().put(currentSelector);
  1388.                         }
  1389.                     } catch (Exception e) {
  1390.                         // ignore
  1391.                     } finally {
  1392.                         currentSelector = null;
  1393.                     }

  1394.                     // remove this thread as a blocker against the given IO
  1395.                     if (fptr != null) fptr.removeBlockingThread(this);

  1396.                     // go back to previous blocking state on the selectable
  1397.                     try {
  1398.                         selectable.configureBlocking(oldBlocking);
  1399.                     } catch (Exception e) {
  1400.                         // ignore
  1401.                     }

  1402.                     // clear thread state from blocking call
  1403.                     afterBlockingCall();
  1404.                 }
  1405.             }
  1406.         } else {
  1407.             // can't select, just have to do a blocking call
  1408.             return true;
  1409.         }
  1410.     }

  1411.     @SuppressWarnings("deprecated")
  1412.     public synchronized void interrupt() {
  1413.         setInterrupt();

  1414.         Selector activeSelector = currentSelector;
  1415.         if (activeSelector != null) {
  1416.             activeSelector.wakeup();
  1417.         }
  1418.         BlockingIO.Condition iowait = blockingIO;
  1419.         if (iowait != null) {
  1420.             iowait.cancel();
  1421.         }

  1422.         Unblocker task = this.unblockFunc;
  1423.         if (task != null) {
  1424.             task.wakeup(this, unblockArg);
  1425.         }

  1426.         // deprecated
  1427.         {
  1428.             BlockingTask t = currentBlockingTask;
  1429.             if (t != null) {
  1430.                 t.wakeup();
  1431.             }
  1432.         }

  1433.         // If this thread is sleeping or stopped, wake it
  1434.         notify();
  1435.     }

  1436.     public void setInterrupt() {
  1437.         while (true) {
  1438.             int oldFlag = interruptFlag;
  1439.             if (INTERRUPT_FLAG_UPDATER.compareAndSet(this, oldFlag, oldFlag | PENDING_INTERRUPT_MASK)) {
  1440.                 return;
  1441.             }
  1442.         }
  1443.     }

  1444.     private volatile BlockingIO.Condition blockingIO = null;
  1445.     public boolean waitForIO(ThreadContext context, RubyIO io, int ops) {
  1446.         Channel channel = io.getChannel();

  1447.         if (!(channel instanceof SelectableChannel)) {
  1448.             return true;
  1449.         }
  1450.         try {
  1451.             io.addBlockingThread(this);
  1452.             blockingIO = BlockingIO.newCondition(channel, ops);
  1453.             boolean ready = blockingIO.await();
  1454.            
  1455.             // check for thread events, in case we've been woken up to die
  1456.             pollThreadEvents();
  1457.             return ready;
  1458.         } catch (IOException ioe) {
  1459.             throw context.runtime.newRuntimeError("Error with selector: " + ioe);
  1460.         } catch (InterruptedException ex) {
  1461.             // FIXME: not correct exception
  1462.             throw context.runtime.newRuntimeError("Interrupted");
  1463.         } finally {
  1464.             blockingIO = null;
  1465.             io.removeBlockingThread(this);
  1466.         }
  1467.     }
  1468.     public void beforeBlockingCall() {
  1469.         pollThreadEvents();
  1470.         enterSleep();
  1471.     }
  1472.    
  1473.     public void afterBlockingCall() {
  1474.         exitSleep();
  1475.         pollThreadEvents();
  1476.     }

  1477.     private void receivedAnException(ThreadContext context, IRubyObject exception) {
  1478.         RubyModule kernelModule = getRuntime().getKernel();
  1479.         debug(this, "before propagating exception");
  1480.         kernelModule.callMethod(context, "raise", exception);
  1481.     }

  1482.     public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedException {
  1483.         if ( timeout != null ) {
  1484.             long delay_ns = (long)(timeout.doubleValue() * 1000000000.0);
  1485.             long start_ns = System.nanoTime();
  1486.             if (delay_ns > 0) {
  1487.                 long delay_ms = delay_ns / 1000000;
  1488.                 int delay_ns_remainder = (int)( delay_ns % 1000000 );
  1489.                 executeBlockingTask(new SleepTask(o, delay_ms, delay_ns_remainder));
  1490.             }
  1491.             long end_ns = System.nanoTime();
  1492.             return ( end_ns - start_ns ) <= delay_ns;
  1493.         } else {
  1494.             executeBlockingTask(new SleepTask(o, 0, 0));
  1495.             return true;
  1496.         }
  1497.     }
  1498.    
  1499.     public RubyThreadGroup getThreadGroup() {
  1500.         return threadGroup;
  1501.     }

  1502.     @Override
  1503.     public boolean equals(Object obj) {
  1504.         if (obj == null) {
  1505.             return false;
  1506.         }
  1507.         if (getClass() != obj.getClass()) {
  1508.             return false;
  1509.         }
  1510.         final RubyThread other = (RubyThread)obj;
  1511.         if (this.threadImpl != other.threadImpl && (this.threadImpl == null || !this.threadImpl.equals(other.threadImpl))) {
  1512.             return false;
  1513.         }
  1514.         return true;
  1515.     }

  1516.     @Override
  1517.     public int hashCode() {
  1518.         int hash = 3;
  1519.         hash = 97 * hash + (this.threadImpl != null ? this.threadImpl.hashCode() : 0);
  1520.         return hash;
  1521.     }

  1522.     @Override
  1523.     public String toString() {
  1524.         return threadImpl.toString();
  1525.     }
  1526.    
  1527.     /**
  1528.      * Acquire the given lock, holding a reference to it for cleanup on thread
  1529.      * termination.
  1530.      *
  1531.      * @param lock the lock to acquire, released on thread termination
  1532.      */
  1533.     public void lock(Lock lock) {
  1534.         assert Thread.currentThread() == getNativeThread();
  1535.         lock.lock();
  1536.         heldLocks.add(lock);
  1537.     }
  1538.    
  1539.     /**
  1540.      * Acquire the given lock interruptibly, holding a reference to it for cleanup
  1541.      * on thread termination.
  1542.      *
  1543.      * @param lock the lock to acquire, released on thread termination
  1544.      * @throws InterruptedException if the lock acquisition is interrupted
  1545.      */
  1546.     public void lockInterruptibly(Lock lock) throws InterruptedException {
  1547.         assert Thread.currentThread() == getNativeThread();
  1548.         lock.lockInterruptibly();
  1549.         heldLocks.add(lock);
  1550.     }
  1551.    
  1552.     /**
  1553.      * Try to acquire the given lock, adding it to a list of held locks for cleanup
  1554.      * on thread termination if it is acquired. Return immediately if the lock
  1555.      * cannot be acquired.
  1556.      *
  1557.      * @param lock the lock to acquire, released on thread termination
  1558.      */
  1559.     public boolean tryLock(Lock lock) {
  1560.         assert Thread.currentThread() == getNativeThread();
  1561.         boolean locked = lock.tryLock();
  1562.         if (locked) {
  1563.             heldLocks.add(lock);
  1564.         }
  1565.         return locked;
  1566.     }
  1567.    
  1568.     /**
  1569.      * Release the given lock and remove it from the list of locks to be released
  1570.      * on thread termination.
  1571.      *
  1572.      * @param lock the lock to release and dereferences
  1573.      */
  1574.     public void unlock(Lock lock) {
  1575.         assert Thread.currentThread() == getNativeThread();
  1576.         lock.unlock();
  1577.         heldLocks.remove(lock);
  1578.     }
  1579.    
  1580.     /**
  1581.      * Release all locks held.
  1582.      */
  1583.     public void unlockAll() {
  1584.         assert Thread.currentThread() == getNativeThread();
  1585.         for (Lock lock : heldLocks) {
  1586.             lock.unlock();
  1587.         }
  1588.     }

  1589.     private String identityString() {
  1590.         return "0x" + Integer.toHexString(System.identityHashCode(this));
  1591.     }

  1592.     /**
  1593.      * This is intended to be used to raise exceptions in Ruby threads from non-
  1594.      * Ruby threads like Timeout's thread.
  1595.      *
  1596.      * @param args Same args as for Thread#raise
  1597.      */
  1598.     @Deprecated
  1599.     public void internalRaise(IRubyObject[] args) {
  1600.         Ruby runtime = getRuntime();

  1601.         genericRaise(runtime, args, runtime.getCurrentContext().getThread());
  1602.     }

  1603.     @Deprecated
  1604.     public void receiveMail(ThreadService.Event event) {
  1605.     }

  1606.     @Deprecated
  1607.     public void checkMail(ThreadContext context) {
  1608.     }

  1609.     @Deprecated
  1610.     private volatile BlockingTask currentBlockingTask;

  1611.     @Deprecated
  1612.     public boolean selectForAccept(RubyIO io) {
  1613.         return select(io, SelectionKey.OP_ACCEPT);
  1614.     }
  1615. }