RubyThread.java
- /***** BEGIN LICENSE BLOCK *****
- * Version: EPL 1.0/GPL 2.0/LGPL 2.1
- *
- * The contents of this file are subject to the Eclipse Public
- * License Version 1.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of
- * the License at http://www.eclipse.org/legal/epl-v10.html
- *
- * Software distributed under the License is distributed on an "AS
- * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * rights and limitations under the License.
- *
- * Copyright (C) 2002 Jason Voegele <jason@jvoegele.com>
- * Copyright (C) 2002-2004 Anders Bengtsson <ndrsbngtssn@yahoo.se>
- * Copyright (C) 2002-2004 Jan Arne Petersen <jpetersen@uni-bonn.de>
- * Copyright (C) 2004 Thomas E Enebo <enebo@acm.org>
- * Copyright (C) 2004-2005 Charles O Nutter <headius@headius.com>
- * Copyright (C) 2004 Stefan Matthias Aust <sma@3plus4.de>
- *
- * Alternatively, the contents of this file may be used under the terms of
- * either of the GNU General Public License Version 2 or later (the "GPL"),
- * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
- * in which case the provisions of the GPL or the LGPL are applicable instead
- * of those above. If you wish to allow use of your version of this file only
- * under the terms of either the GPL or the LGPL, and not to allow others to
- * use your version of this file under the terms of the EPL, indicate your
- * decision by deleting the provisions above and replace them with the notice
- * and other provisions required by the GPL or the LGPL. If you do not delete
- * the provisions above, a recipient may use your version of this file under
- * the terms of any one of the EPL, the GPL or the LGPL.
- ***** END LICENSE BLOCK *****/
- package org.jruby;
- import java.io.IOException;
- import java.lang.ref.WeakReference;
- import java.nio.channels.Channel;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.util.Iterator;
- import java.util.Queue;
- import java.util.Vector;
- import java.util.WeakHashMap;
- import java.util.HashMap;
- import java.util.List;
- import java.util.ArrayList;
- import java.util.Map;
- import java.util.Set;
- import org.jruby.common.IRubyWarnings.ID;
- import org.jruby.exceptions.RaiseException;
- import org.jruby.exceptions.ThreadKill;
- import org.jruby.internal.runtime.NativeThread;
- import org.jruby.internal.runtime.RubyRunnable;
- import org.jruby.internal.runtime.ThreadLike;
- import org.jruby.internal.runtime.ThreadService;
- import org.jruby.runtime.Block;
- import org.jruby.runtime.ObjectAllocator;
- import org.jruby.runtime.ThreadContext;
- import org.jruby.runtime.ExecutionContext;
- import org.jruby.runtime.builtin.IRubyObject;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
- import java.util.concurrent.atomic.AtomicReference;
- import java.util.concurrent.locks.Lock;
- import org.jruby.anno.JRubyMethod;
- import org.jruby.anno.JRubyClass;
- import org.jruby.runtime.ClassIndex;
- import org.jruby.runtime.ObjectMarshal;
- import static org.jruby.runtime.Visibility.*;
- import org.jruby.util.TypeConverter;
- import org.jruby.util.io.BlockingIO;
- import org.jruby.util.io.OpenFile;
- import org.jruby.util.io.SelectorFactory;
- import org.jruby.util.log.Logger;
- import org.jruby.util.log.LoggerFactory;
- import org.jruby.java.proxies.ConcreteJavaProxy;
- import org.jruby.runtime.Helpers;
- import org.jruby.runtime.backtrace.RubyStackTraceElement;
- import org.jruby.util.ByteList;
- /**
- * Implementation of Ruby's <code>Thread</code> class. Each Ruby thread is
- * mapped to an underlying Java Virtual Machine thread.
- * <p>
- * Thread encapsulates the behavior of a thread of execution, including the main
- * thread of the Ruby script. In the descriptions that follow, the parameter
- * <code>aSymbol</code> refers to a symbol, which is either a quoted string or a
- * <code>Symbol</code> (such as <code>:name</code>).
- *
- * Note: For CVS history, see ThreadClass.java.
- */
- @JRubyClass(name="Thread")
- public class RubyThread extends RubyObject implements ExecutionContext {
- private static final Logger LOG = LoggerFactory.getLogger("RubyThread");
- /** The thread-like think that is actually executing */
- private ThreadLike threadImpl;
- /** Normal thread-local variables */
- private transient Map<IRubyObject, IRubyObject> threadLocalVariables;
- /** Context-local variables, internal-ish thread locals */
- private final Map<Object, IRubyObject> contextVariables = new WeakHashMap<Object, IRubyObject>();
- /** Whether this thread should try to abort the program on exception */
- private boolean abortOnException;
- /** The final value resulting from the thread's execution */
- private IRubyObject finalResult;
- /**
- * The exception currently being raised out of the thread. We reference
- * it here to continue propagating it while handling thread shutdown
- * logic and abort_on_exception.
- */
- private RaiseException exitingException;
- /** The ThreadGroup to which this thread belongs */
- private RubyThreadGroup threadGroup;
- /** Per-thread "current exception" */
- private IRubyObject errorInfo;
- /** Weak reference to the ThreadContext for this thread. */
- private volatile WeakReference<ThreadContext> contextRef;
- /** Whether to scan for cross-thread events */
- private volatile boolean handleInterrupt = true;
- /** Stack of interrupt masks active for this thread */
- private final List<RubyHash> interruptMaskStack = new ArrayList<RubyHash>();
- private static final boolean DEBUG = false;
- private int RUBY_MIN_THREAD_PRIORITY = -3;
- private int RUBY_MAX_THREAD_PRIORITY = 3;
- /** Thread statuses */
- public static enum Status {
- RUN, SLEEP, ABORTING, DEAD;
-
- public final ByteList bytes;
-
- Status() {
- bytes = new ByteList(toString().toLowerCase().getBytes(RubyEncoding.UTF8));
- }
- }
- /** Current status in an atomic reference */
- private final AtomicReference<Status> status = new AtomicReference<Status>(Status.RUN);
- /** Mail slot for cross-thread events */
- private final Queue<IRubyObject> pendingInterruptQueue = new ConcurrentLinkedQueue();
- /** A function to use to unblock this thread, if possible */
- private Unblocker unblockFunc;
- /** Argument to pass to the unblocker */
- private Object unblockArg;
- /** The list of locks this thread currently holds, so they can be released on exit */
- private final List<Lock> heldLocks = new Vector<Lock>();
- /** Whether or not this thread has been disposed of */
- private volatile boolean disposed = false;
- /** Interrupt flags */
- private volatile int interruptFlag = 0;
- /** Interrupt mask to use for disabling certain types */
- private volatile int interruptMask;
- /** Short circuit to avoid-re-scanning for interrupts */
- private volatile boolean pendingInterruptQueueChecked = false;
- private volatile Selector currentSelector;
- private static final AtomicIntegerFieldUpdater INTERRUPT_FLAG_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(RubyThread.class, "interruptFlag");
- private static final int TIMER_INTERRUPT_MASK = 0x01;
- private static final int PENDING_INTERRUPT_MASK = 0x02;
- private static final int POSTPONED_JOB_INTERRUPT_MASK = 0x04;
- private static final int TRAP_INTERRUPT_MASK = 0x08;
- private static final int INTERRUPT_NONE = 0;
- private static final int INTERRUPT_IMMEDIATE = 1;
- private static final int INTERRUPT_ON_BLOCKING = 2;
- private static final int INTERRUPT_NEVER = 3;
- protected RubyThread(Ruby runtime, RubyClass type) {
- super(runtime, type);
- finalResult = runtime.getNil();
- errorInfo = runtime.getNil();
- }
-
- public RubyThread(Ruby runtime, RubyClass klass, Runnable runnable) {
- this(runtime, klass);
-
- startWith(runnable);
- }
- private void executeInterrupts(ThreadContext context, boolean blockingTiming) {
- Ruby runtime = context.runtime;
- int interrupt;
- boolean postponedJobInterrupt = false;
- while ((interrupt = getInterrupts()) != 0) {
- boolean timerInterrupt = (interrupt & TIMER_INTERRUPT_MASK) == TIMER_INTERRUPT_MASK;
- boolean pendingInterrupt = (interrupt & PENDING_INTERRUPT_MASK) == PENDING_INTERRUPT_MASK;
- // if (postponedJobInterrupt) {
- // postponedJobFlush(context);
- // }
- // Missing: signal handling...but perhaps we don't need it on JVM
- if (pendingInterrupt && pendingInterruptActive()) {
- IRubyObject err = pendingInterruptDeque(context, blockingTiming ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
- if (err == UNDEF) {
- // no error
- } else if (err == RubyFixnum.zero(runtime) ||
- err == RubyFixnum.one(runtime) ||
- err == RubyFixnum.two(runtime)) {
- toKill();
- } else {
- afterBlockingCall();
- if (status.get() == Status.SLEEP) {
- exitSleep();
- }
- RubyKernel.raise(context, runtime.getKernel(), new IRubyObject[]{err}, Block.NULL_BLOCK);
- }
- }
- // Missing: timer interrupt...needed?
- }
- }
- private void postponedJobFlush(ThreadContext context) {
- // unsure if this function has any relevance in JRuby
- // int savedPostponedJobInterruptMask = interruptMask & POSTPONED_JOB_INTERRUPT_MASK;
- //
- // errorInfo = context.nil;
- // interruptMask |= POSTPONED_JOB_INTERRUPT_MASK;
- }
- private boolean pendingInterruptActive() {
- if (pendingInterruptQueueChecked) return false;
- if (pendingInterruptQueue.isEmpty()) return false;
- return true;
- }
- private void toKill() {
- pendingInterruptClear();
- throwThreadKill();
- }
- private void pendingInterruptClear() {
- pendingInterruptQueue.clear();
- }
- private int getInterrupts() {
- int interrupt;
- while (true) {
- interrupt = interruptFlag;
- if (INTERRUPT_FLAG_UPDATER.compareAndSet(this, interrupt, interrupt & interruptMask)) {
- break;
- }
- }
- return interrupt & ~interruptMask;
- }
- private IRubyObject pendingInterruptDeque(ThreadContext context, int timing) {
- for (Iterator<IRubyObject> iterator = pendingInterruptQueue.iterator(); iterator.hasNext();) {
- IRubyObject err = iterator.next();
- int maskTiming = pendingInterruptCheckMask(context, err);
- switch (maskTiming) {
- case INTERRUPT_ON_BLOCKING:
- if (timing != INTERRUPT_ON_BLOCKING) break;
- case INTERRUPT_NONE:
- case INTERRUPT_IMMEDIATE:
- iterator.remove();
- return err;
- case INTERRUPT_NEVER:
- break;
- }
- }
- pendingInterruptQueueChecked = true;
- return UNDEF;
- }
- private int pendingInterruptCheckMask(ThreadContext context, IRubyObject err) {
- List<IRubyObject> ancestors = err.getMetaClass().getAncestorList();
- int ancestorsLen = ancestors.size();
- List<RubyHash> maskStack = interruptMaskStack;
- int maskStackLen = maskStack.size();
- for (int i = 0; i < maskStackLen; i++) {
- RubyHash mask = maskStack.get(maskStackLen - (i + 1));
- for (int j = 0; j < ancestorsLen; j++) {
- IRubyObject klass = ancestors.get(j);
- IRubyObject sym;
- if (!(sym = mask.op_aref(context, klass)).isNil()) {
- String symStr = sym.toString();
- switch (symStr) {
- case "immediate": return INTERRUPT_IMMEDIATE;
- case "on_blocking": return INTERRUPT_ON_BLOCKING;
- case "never": return INTERRUPT_NEVER;
- default:
- throw context.runtime.newThreadError("unknown mask signature");
- }
- }
- }
- }
- return INTERRUPT_NONE;
- }
- public IRubyObject getErrorInfo() {
- return errorInfo;
- }
- public IRubyObject setErrorInfo(IRubyObject errorInfo) {
- this.errorInfo = errorInfo;
- return errorInfo;
- }
- public void setContext(ThreadContext context) {
- this.contextRef = new WeakReference<ThreadContext>(context);
- }
- public ThreadContext getContext() {
- return contextRef == null ? null : contextRef.get();
- }
- public Thread getNativeThread() {
- return threadImpl.nativeThread();
- }
- /**
- * Perform pre-execution tasks once the native thread is running, but we
- * have not yet called the Ruby code for the thread.
- */
- public void beforeStart() {
- }
- /**
- * Dispose of the current thread by tidying up connections to other stuff
- */
- public void dispose() {
- if (disposed) return;
-
- synchronized (this) {
- if (disposed) return;
-
- disposed = true;
- // remove from parent thread group
- threadGroup.remove(this);
- // unlock all locked locks
- unlockAll();
- // mark thread as DEAD
- beDead();
- }
- // unregister from runtime's ThreadService
- getRuntime().getThreadService().unregisterThread(this);
- }
-
- public static RubyClass createThreadClass(Ruby runtime) {
- // FIXME: In order for Thread to play well with the standard 'new' behavior,
- // it must provide an allocator that can create empty object instances which
- // initialize then fills with appropriate data.
- RubyClass threadClass = runtime.defineClass("Thread", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
- runtime.setThread(threadClass);
- threadClass.setClassIndex(ClassIndex.THREAD);
- threadClass.setReifiedClass(RubyThread.class);
- threadClass.defineAnnotatedMethods(RubyThread.class);
- RubyThread rubyThread = new RubyThread(runtime, threadClass);
- // TODO: need to isolate the "current" thread from class creation
- rubyThread.threadImpl = new NativeThread(rubyThread, Thread.currentThread());
- runtime.getThreadService().setMainThread(Thread.currentThread(), rubyThread);
-
- // set to default thread group
- runtime.getDefaultThreadGroup().addDirectly(rubyThread);
-
- threadClass.setMarshal(ObjectMarshal.NOT_MARSHALABLE_MARSHAL);
-
- // set up Thread::Backtrace::Location class
- RubyClass backtrace = threadClass.defineClassUnder("Backtrace", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
- RubyClass location = backtrace.defineClassUnder("Location", runtime.getObject(), ObjectAllocator.NOT_ALLOCATABLE_ALLOCATOR);
- location.defineAnnotatedMethods(Location.class);
- runtime.setLocation(location);
-
- return threadClass;
- }
-
- public static class Location extends RubyObject {
- public Location(Ruby runtime, RubyClass klass, RubyStackTraceElement element) {
- super(runtime, klass);
- this.element = element;
- }
-
- @JRubyMethod
- public IRubyObject absolute_path(ThreadContext context) {
- return context.runtime.newString(element.getFileName());
- }
-
- @JRubyMethod
- public IRubyObject base_label(ThreadContext context) {
- return context.runtime.newString(element.getMethodName());
- }
-
- @JRubyMethod
- public IRubyObject inspect(ThreadContext context) {
- return to_s(context).inspect();
- }
-
- @JRubyMethod
- public IRubyObject label(ThreadContext context) {
- return context.runtime.newString(element.getMethodName());
- }
-
- @JRubyMethod
- public IRubyObject lineno(ThreadContext context) {
- return context.runtime.newFixnum(element.getLineNumber());
- }
-
- @JRubyMethod
- public IRubyObject path(ThreadContext context) {
- return context.runtime.newString(element.getFileName());
- }
-
- @JRubyMethod
- public IRubyObject to_s(ThreadContext context) {
- return context.runtime.newString(element.mriStyleString());
- }
- public static IRubyObject newLocationArray(Ruby runtime, RubyStackTraceElement[] elements) {
- RubyArray ary = runtime.newArray(elements.length);
- for (RubyStackTraceElement element : elements) {
- ary.append(new RubyThread.Location(runtime, runtime.getLocation(), element));
- }
- return ary;
- }
-
- private final RubyStackTraceElement element;
- }
- /**
- * <code>Thread.new</code>
- * <p>
- * Thread.new( <i>[ arg ]*</i> ) {| args | block } -> aThread
- * <p>
- * Creates a new thread to execute the instructions given in block, and
- * begins running it. Any arguments passed to Thread.new are passed into the
- * block.
- * <pre>
- * x = Thread.new { sleep .1; print "x"; print "y"; print "z" }
- * a = Thread.new { print "a"; print "b"; sleep .2; print "c" }
- * x.join # Let the threads finish before
- * a.join # main thread exits...
- * </pre>
- * <i>produces:</i> abxyzc
- */
- @JRubyMethod(name = {"new", "fork"}, rest = true, meta = true)
- public static IRubyObject newInstance(IRubyObject recv, IRubyObject[] args, Block block) {
- return startThread(recv, args, true, block);
- }
- /**
- * Basically the same as Thread.new . However, if class Thread is
- * subclassed, then calling start in that subclass will not invoke the
- * subclass's initialize method.
- */
- public static RubyThread start(IRubyObject recv, IRubyObject[] args, Block block) {
- return start19(recv, args, block);
- }
-
- @JRubyMethod(rest = true, name = "start", meta = true)
- public static RubyThread start19(IRubyObject recv, IRubyObject[] args, Block block) {
- Ruby runtime = recv.getRuntime();
- // The error message may appear incongruous here, due to the difference
- // between JRuby's Thread model and MRI's.
- // We mimic MRI's message in the name of compatibility.
- if (! block.isGiven()) throw runtime.newArgumentError("tried to create Proc object without a block");
- return startThread(recv, args, false, block);
- }
-
- public static RubyThread adopt(IRubyObject recv, Thread t) {
- return adoptThread(recv, t, Block.NULL_BLOCK);
- }
- private static RubyThread adoptThread(final IRubyObject recv, Thread t, Block block) {
- final Ruby runtime = recv.getRuntime();
- final RubyThread rubyThread = new RubyThread(runtime, (RubyClass) recv);
-
- rubyThread.threadImpl = new NativeThread(rubyThread, t);
- ThreadContext context = runtime.getThreadService().registerNewThread(rubyThread);
- runtime.getThreadService().associateThread(t, rubyThread);
-
- context.preAdoptThread();
-
- // set to default thread group
- runtime.getDefaultThreadGroup().addDirectly(rubyThread);
-
- return rubyThread;
- }
-
- @JRubyMethod(rest = true, visibility = PRIVATE)
- public IRubyObject initialize(ThreadContext context, IRubyObject[] args, Block block) {
- Ruby runtime = getRuntime();
- if (!block.isGiven()) throw runtime.newThreadError("must be called with a block");
- if (threadImpl != null) throw runtime.newThreadError("already initialized thread");
- RubyRunnable runnable = new RubyRunnable(this, args, block);
-
- return startWith(runnable);
- }
- private IRubyObject startWith(Runnable runnable) throws RaiseException, OutOfMemoryError {
- Ruby runtime = getRuntime();
- ThreadContext context = runtime.getCurrentContext();
-
- try {
- Thread thread = new Thread(runnable);
- thread.setDaemon(true);
- thread.setName("Ruby-" + runtime.getRuntimeNumber() + "-" + thread.getName() + ": " + context.getFile() + ":" + (context.getLine() + 1));
- threadImpl = new NativeThread(this, thread);
- addToCorrectThreadGroup(context);
- // JRUBY-2380, associate thread early so it shows up in Thread.list right away, in case it doesn't run immediately
- runtime.getThreadService().associateThread(thread, this);
- threadImpl.start();
- // We yield here to hopefully permit the target thread to schedule
- // MRI immediately schedules it, so this is close but not exact
- Thread.yield();
-
- return this;
- } catch (OutOfMemoryError oome) {
- if (oome.getMessage().equals("unable to create new native thread")) {
- throw runtime.newThreadError(oome.getMessage());
- }
- throw oome;
- } catch (SecurityException ex) {
- throw runtime.newThreadError(ex.getMessage());
- }
- }
-
- private static RubyThread startThread(final IRubyObject recv, final IRubyObject[] args, boolean callInit, Block block) {
- RubyThread rubyThread = new RubyThread(recv.getRuntime(), (RubyClass) recv);
-
- if (callInit) {
- rubyThread.callInit(args, block);
- } else {
- // for Thread::start, which does not call the subclass's initialize
- rubyThread.initialize(recv.getRuntime().getCurrentContext(), args, block);
- }
-
- return rubyThread;
- }
-
- public synchronized void cleanTerminate(IRubyObject result) {
- finalResult = result;
- }
- public synchronized void beDead() {
- status.set(Status.DEAD);
- }
- public void pollThreadEvents() {
- pollThreadEvents(getRuntime().getCurrentContext());
- }
- // CHECK_INTS
- public void pollThreadEvents(ThreadContext context) {
- if (anyInterrupted()) {
- executeInterrupts(context, true);
- }
- }
- // RUBY_VM_INTERRUPTED_ANY
- private boolean anyInterrupted() {
- return Thread.interrupted() || (interruptFlag & ~interruptMask) != 0;
- }
-
- private static void throwThreadKill() {
- throw new ThreadKill();
- }
- @JRubyMethod(meta = true)
- public static IRubyObject handle_interrupt(ThreadContext context, IRubyObject self, IRubyObject _mask, Block block) {
- Ruby runtime = context.runtime;
- if (!block.isGiven()) {
- throw runtime.newArgumentError("block is needed");
- }
- RubyHash mask = (RubyHash)TypeConverter.convertToTypeWithCheck(_mask, runtime.getHash(), "to_hash");
- mask.visitAll(new RubyHash.Visitor() {
- @Override
- public void visit(IRubyObject key, IRubyObject value) {
- if (value instanceof RubySymbol) {
- RubySymbol sym = (RubySymbol)value;
- String symString = sym.toString();
- if (!symString.equals("immediate") && !symString.equals("on_blocking") && !symString.equals("never")) {
- throw key.getRuntime().newArgumentError("unknown mask signature");
- }
- }
- }
- });
- RubyThread th = context.getThread();
- th.interruptMaskStack.add(mask);
- if (th.pendingInterruptQueue.isEmpty()) {
- th.pendingInterruptQueueChecked = false;
- th.setInterrupt();
- }
- try {
- return block.call(context);
- } finally {
- th.interruptMaskStack.remove(th.interruptMaskStack.size() - 1);
- th.setInterrupt();
- th.pollThreadEvents(context);
- }
- }
- @JRubyMethod(name = "pending_interrupt?", meta = true, optional = 1)
- public static IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject self, IRubyObject[] args) {
- return context.getThread().pending_interrupt_p(context, args);
- }
- @JRubyMethod(name = "pending_interrupt?", optional = 1)
- public IRubyObject pending_interrupt_p(ThreadContext context, IRubyObject[] args) {
- if (pendingInterruptQueue.isEmpty()) {
- return context.runtime.getFalse();
- } else {
- if (args.length == 1) {
- IRubyObject err = args[0];
- if (!(err instanceof RubyModule)) {
- throw context.runtime.newTypeError("class or module required for rescue clause");
- }
- if (pendingInterruptInclude(err)) {
- return context.runtime.getTrue();
- } else {
- return context.runtime.getFalse();
- }
- } else {
- return context.runtime.getTrue();
- }
- }
- }
- private boolean pendingInterruptInclude(IRubyObject err) {
- Iterator<IRubyObject> iterator = pendingInterruptQueue.iterator();
- while (iterator.hasNext()) {
- IRubyObject e = iterator.next();
- if (((RubyModule)e).op_le(err).isTrue()) {
- return true;
- }
- }
- return false;
- }
- /**
- * Returns the status of the global ``abort on exception'' condition. The
- * default is false. When set to true, will cause all threads to abort (the
- * process will exit(0)) if an exception is raised in any thread. See also
- * Thread.abort_on_exception= .
- */
- @JRubyMethod(name = "abort_on_exception", meta = true)
- public static RubyBoolean abort_on_exception_x(IRubyObject recv) {
- Ruby runtime = recv.getRuntime();
- return runtime.isGlobalAbortOnExceptionEnabled() ? runtime.getTrue() : runtime.getFalse();
- }
- @JRubyMethod(name = "abort_on_exception=", required = 1, meta = true)
- public static IRubyObject abort_on_exception_set_x(IRubyObject recv, IRubyObject value) {
- recv.getRuntime().setGlobalAbortOnExceptionEnabled(value.isTrue());
- return value;
- }
- @JRubyMethod(meta = true)
- public static RubyThread current(IRubyObject recv) {
- return recv.getRuntime().getCurrentContext().getThread();
- }
- @JRubyMethod(meta = true)
- public static RubyThread main(IRubyObject recv) {
- return recv.getRuntime().getThreadService().getMainThread();
- }
- @JRubyMethod(meta = true)
- public static IRubyObject pass(IRubyObject recv) {
- Ruby runtime = recv.getRuntime();
- ThreadService ts = runtime.getThreadService();
- boolean critical = ts.getCritical();
-
- ts.setCritical(false);
- try {
- Thread.yield();
- } finally {
- ts.setCritical(critical);
- }
-
- return recv.getRuntime().getNil();
- }
- @JRubyMethod(meta = true)
- public static IRubyObject exclusive(ThreadContext context, IRubyObject recv, Block block) {
- Ruby runtime = context.runtime;
- ThreadService ts = runtime.getThreadService();
- boolean critical = ts.getCritical();
- ts.setCritical(true);
- try {
- return block.yieldSpecific(context);
- } finally {
- ts.setCritical(critical);
- }
- }
- @JRubyMethod(meta = true)
- public static RubyArray list(IRubyObject recv) {
- RubyThread[] activeThreads = recv.getRuntime().getThreadService().getActiveRubyThreads();
-
- return recv.getRuntime().newArrayNoCopy(activeThreads);
- }
- private void addToCorrectThreadGroup(ThreadContext context) {
- // JRUBY-3568, inherit threadgroup or use default
- IRubyObject group = context.getThread().group();
- if (!group.isNil()) {
- ((RubyThreadGroup) group).addDirectly(this);
- } else {
- context.runtime.getDefaultThreadGroup().addDirectly(this);
- }
- }
-
- private IRubyObject getSymbolKey(IRubyObject originalKey) {
- if (originalKey instanceof RubySymbol) {
- return originalKey;
- } else if (originalKey instanceof RubyString) {
- return getRuntime().newSymbol(originalKey.asJavaString());
- } else if (originalKey instanceof RubyFixnum) {
- getRuntime().getWarnings().warn(ID.FIXNUMS_NOT_SYMBOLS, "Do not use Fixnums as Symbols");
- throw getRuntime().newArgumentError(originalKey + " is not a symbol");
- } else {
- throw getRuntime().newTypeError(originalKey + " is not a symbol");
- }
- }
-
- private synchronized Map<IRubyObject, IRubyObject> getThreadLocals() {
- if (threadLocalVariables == null) {
- threadLocalVariables = new HashMap<IRubyObject, IRubyObject>();
- }
- return threadLocalVariables;
- }
- private void clearThreadLocals() {
- threadLocalVariables = null;
- }
- @Override
- public final Map<Object, IRubyObject> getContextVariables() {
- return contextVariables;
- }
- public boolean isAlive(){
- return threadImpl.isAlive() && status.get() != Status.ABORTING;
- }
- @JRubyMethod(name = "[]", required = 1)
- public IRubyObject op_aref(IRubyObject key) {
- IRubyObject value;
- if ((value = getThreadLocals().get(getSymbolKey(key))) != null) {
- return value;
- }
- return getRuntime().getNil();
- }
- @JRubyMethod(name = "[]=", required = 2)
- public IRubyObject op_aset(IRubyObject key, IRubyObject value) {
- key = getSymbolKey(key);
-
- getThreadLocals().put(key, value);
- return value;
- }
- @JRubyMethod
- public RubyBoolean abort_on_exception() {
- return abortOnException ? getRuntime().getTrue() : getRuntime().getFalse();
- }
- @JRubyMethod(name = "abort_on_exception=", required = 1)
- public IRubyObject abort_on_exception_set(IRubyObject val) {
- abortOnException = val.isTrue();
- return val;
- }
- @JRubyMethod(name = "alive?")
- public RubyBoolean alive_p() {
- return isAlive() ? getRuntime().getTrue() : getRuntime().getFalse();
- }
- @Deprecated
- public IRubyObject join(IRubyObject[] args) {
- return join(getRuntime().getCurrentContext(), args);
- }
- @JRubyMethod(optional = 1)
- public IRubyObject join(ThreadContext context, IRubyObject[] args) {
- Ruby runtime = context.runtime;
- long timeoutMillis = Long.MAX_VALUE;
- if (args.length > 0 && !args[0].isNil()) {
- if (args.length > 1) {
- throw runtime.newArgumentError(args.length, 1);
- }
- // MRI behavior: value given in seconds; converted to Float; less
- // than or equal to zero returns immediately; returns nil
- timeoutMillis = (long)(1000.0D * args[0].convertToFloat().getValue());
- if (timeoutMillis <= 0) {
- // TODO: not sure that we should skip calling join() altogether.
- // Thread.join() has some implications for Java Memory Model, etc.
- if (threadImpl.isAlive()) {
- return context.nil;
- } else {
- return this;
- }
- }
- }
- if (isCurrent()) {
- throw runtime.newThreadError("thread " + identityString() + " tried to join itself");
- }
- RubyThread currentThread = context.getThread();
- try {
- if (runtime.getThreadService().getCritical()) {
- // If the target thread is sleeping or stopped, wake it
- synchronized (this) {
- notify();
- }
-
- // interrupt the target thread in case it's blocking or waiting
- // WARNING: We no longer interrupt the target thread, since this usually means
- // interrupting IO and with NIO that means the channel is no longer usable.
- // We either need a new way to handle waking a target thread that's waiting
- // on IO, or we need to accept that we can't wake such threads and must wait
- // for them to complete their operation.
- //threadImpl.interrupt();
- }
- final long timeToWait = Math.min(timeoutMillis, 200);
- // We need this loop in order to be able to "unblock" the
- // join call without actually calling interrupt.
- long start = System.currentTimeMillis();
- while(true) {
- currentThread.pollThreadEvents();
- threadImpl.join(timeToWait);
- if (!threadImpl.isAlive()) {
- break;
- }
- if (System.currentTimeMillis() - start > timeoutMillis) {
- break;
- }
- }
- } catch (InterruptedException ie) {
- ie.printStackTrace();
- assert false : ie;
- } catch (ExecutionException ie) {
- ie.printStackTrace();
- assert false : ie;
- }
- if (exitingException != null) {
- // Set $! in the current thread before exiting
- runtime.getGlobalVariables().set("$!", (IRubyObject)exitingException.getException());
- throw exitingException;
- }
- // check events before leaving
- currentThread.pollThreadEvents(context);
- if (threadImpl.isAlive()) {
- return context.nil;
- } else {
- return this;
- }
- }
- @JRubyMethod
- public IRubyObject value() {
- join(new IRubyObject[0]);
- synchronized (this) {
- return finalResult;
- }
- }
- @JRubyMethod
- public IRubyObject group() {
- if (threadGroup == null) {
- return getRuntime().getNil();
- }
-
- return threadGroup;
- }
-
- void setThreadGroup(RubyThreadGroup rubyThreadGroup) {
- threadGroup = rubyThreadGroup;
- }
-
- @JRubyMethod
- @Override
- public synchronized IRubyObject inspect() {
- // FIXME: There's some code duplication here with RubyObject#inspect
- StringBuilder part = new StringBuilder();
- String cname = getMetaClass().getRealClass().getName();
- part.append("#<").append(cname).append(":");
- part.append(identityString());
- part.append(' ');
- part.append(status.toString().toLowerCase());
- part.append('>');
- return getRuntime().newString(part.toString());
- }
- @JRubyMethod(name = "key?", required = 1)
- public RubyBoolean key_p(IRubyObject key) {
- key = getSymbolKey(key);
-
- return getRuntime().newBoolean(getThreadLocals().containsKey(key));
- }
- @JRubyMethod
- public RubyArray keys() {
- IRubyObject[] keys = new IRubyObject[getThreadLocals().size()];
-
- return RubyArray.newArrayNoCopy(getRuntime(), getThreadLocals().keySet().toArray(keys));
- }
-
- @JRubyMethod(meta = true)
- public static IRubyObject stop(ThreadContext context, IRubyObject receiver) {
- RubyThread rubyThread = context.getThread();
-
- synchronized (rubyThread) {
- rubyThread.pollThreadEvents(context);
- try {
- // attempt to decriticalize all if we're the critical thread
- receiver.getRuntime().getThreadService().setCritical(false);
- rubyThread.status.set(Status.SLEEP);
- rubyThread.wait();
- } catch (InterruptedException ie) {
- rubyThread.pollThreadEvents(context);
- rubyThread.status.set(Status.RUN);
- }
- }
-
- return receiver.getRuntime().getNil();
- }
-
- @JRubyMethod(required = 1, meta = true)
- public static IRubyObject kill(IRubyObject receiver, IRubyObject rubyThread, Block block) {
- if (!(rubyThread instanceof RubyThread)) throw receiver.getRuntime().newTypeError(rubyThread, receiver.getRuntime().getThread());
- return ((RubyThread)rubyThread).kill();
- }
-
- @JRubyMethod(meta = true)
- public static IRubyObject exit(IRubyObject receiver, Block block) {
- RubyThread rubyThread = receiver.getRuntime().getThreadService().getCurrentContext().getThread();
- return rubyThread.kill();
- }
- @JRubyMethod(name = "stop?")
- public RubyBoolean stop_p() {
- // not valid for "dead" state
- return getRuntime().newBoolean(status.get() == Status.SLEEP || status.get() == Status.DEAD);
- }
-
- @JRubyMethod
- public synchronized RubyThread wakeup() {
- if(!threadImpl.isAlive() && status.get() == Status.DEAD) {
- throw getRuntime().newThreadError("killed thread");
- }
- status.set(Status.RUN);
- interrupt();
- return this;
- }
-
- @JRubyMethod
- public RubyFixnum priority() {
- return RubyFixnum.newFixnum(getRuntime(), javaPriorityToRubyPriority(threadImpl.getPriority()));
- }
- @JRubyMethod(name = "priority=", required = 1)
- public IRubyObject priority_set(IRubyObject priority) {
- int iPriority = RubyNumeric.fix2int(priority);
-
- if (iPriority < RUBY_MIN_THREAD_PRIORITY) {
- iPriority = RUBY_MIN_THREAD_PRIORITY;
- } else if (iPriority > RUBY_MAX_THREAD_PRIORITY) {
- iPriority = RUBY_MAX_THREAD_PRIORITY;
- }
- if (threadImpl.isAlive()) {
- int jPriority = rubyPriorityToJavaPriority(iPriority);
- if (jPriority < Thread.MIN_PRIORITY) {
- jPriority = Thread.MIN_PRIORITY;
- } else if (jPriority > Thread.MAX_PRIORITY) {
- jPriority = Thread.MAX_PRIORITY;
- }
- threadImpl.setPriority(jPriority);
- }
- return RubyFixnum.newFixnum(getRuntime(), iPriority);
- }
-
- /* helper methods to translate java thread priority (1-10) to
- * Ruby thread priority (-3 to 3) using a quadratic polynoimal ant its
- * inverse
- * i.e., j = r^2/18 + 3*r/2 + 5
- * r = 3/2*sqrt(8*j + 41) - 27/2
- */
- private int javaPriorityToRubyPriority(int javaPriority) {
- double d; // intermediate value
- d = 1.5 * Math.sqrt(8.0*javaPriority + 41) - 13.5;
- return Math.round((float) d);
- }
-
- private int rubyPriorityToJavaPriority(int rubyPriority) {
- double d;
- if (rubyPriority < RUBY_MIN_THREAD_PRIORITY) {
- rubyPriority = RUBY_MIN_THREAD_PRIORITY;
- } else if (rubyPriority > RUBY_MAX_THREAD_PRIORITY) {
- rubyPriority = RUBY_MAX_THREAD_PRIORITY;
- }
- d = Math.pow(rubyPriority, 2.0)/18.0 + 1.5 * rubyPriority + 5;
- return Math.round((float) d);
- }
-
- /**
- * Simplified utility method for just raising an existing exception in this
- * thread.
- *
- * @param exception the exception to raise
- * @return this thread
- */
- public IRubyObject raise(IRubyObject exception) {
- return raise(new IRubyObject[]{exception}, Block.NULL_BLOCK);
- }
- @JRubyMethod(optional = 3)
- public IRubyObject raise(IRubyObject[] args, Block block) {
- Ruby runtime = getRuntime();
- RubyThread currentThread = runtime.getCurrentContext().getThread();
- return genericRaise(runtime, args, currentThread);
- }
- public IRubyObject genericRaise(Ruby runtime, IRubyObject[] args, RubyThread currentThread) {
- if (!isAlive()) return runtime.getNil();
- if (currentThread == this) {
- RubyKernel.raise(runtime.getCurrentContext(), runtime.getKernel(), args, Block.NULL_BLOCK);
- // should not reach here
- }
- IRubyObject exception = prepareRaiseException(runtime, args, Block.NULL_BLOCK);
- pendingInterruptEnqueue(exception);
- interrupt();
- return runtime.getNil();
- }
- private IRubyObject prepareRaiseException(Ruby runtime, IRubyObject[] args, Block block) {
- if(args.length == 0) {
- IRubyObject lastException = errorInfo;
- if(lastException.isNil()) {
- return new RaiseException(runtime, runtime.getRuntimeError(), "", false).getException();
- }
- return lastException;
- }
- IRubyObject exception;
- ThreadContext context = getRuntime().getCurrentContext();
-
- if(args.length == 1) {
- if(args[0] instanceof RubyString) {
- return runtime.getRuntimeError().newInstance(context, args, block);
- } else if (args[0] instanceof ConcreteJavaProxy) {
- return args[0];
- } else if(!args[0].respondsTo("exception")) {
- return runtime.newTypeError("exception class/object expected").getException();
- }
- exception = args[0].callMethod(context, "exception");
- } else {
- if (!args[0].respondsTo("exception")) {
- return runtime.newTypeError("exception class/object expected").getException();
- }
-
- exception = args[0].callMethod(context, "exception", args[1]);
- }
-
- if (!runtime.getException().isInstance(exception)) {
- return runtime.newTypeError("exception object expected").getException();
- }
-
- if (args.length == 3) {
- ((RubyException) exception).set_backtrace(args[2]);
- }
-
- return exception;
- }
-
- @JRubyMethod
- public synchronized IRubyObject run() {
- return wakeup();
- }
- /**
- * Sleep the current thread for millis, waking up on any thread interrupts.
- *
- * We can never be sure if a wait will finish because of a Java "spurious wakeup". So if we
- * explicitly wakeup and we wait less than requested amount we will return false. We will
- * return true if we sleep right amount or less than right amount via spurious wakeup.
- *
- * @param millis Number of milliseconds to sleep. Zero sleeps forever.
- */
- public boolean sleep(long millis) throws InterruptedException {
- assert this == getRuntime().getCurrentContext().getThread();
- Semaphore sem = new Semaphore(1);
- sem.acquire();
- if (executeTask(getContext(), new Object[]{sem, millis, 0}, SLEEP_TASK2) >= millis) {
- return true;
- } else {
- return false;
- }
- }
- public IRubyObject status() {
- return status(getRuntime());
- }
- @JRubyMethod
- public IRubyObject status(ThreadContext context) {
- return status(context.runtime);
- }
-
- private synchronized IRubyObject status(Ruby runtime) {
- if (threadImpl.isAlive()) {
- return runtime.getThreadStatus(status.get());
- } else if (exitingException != null) {
- return runtime.getNil();
- } else {
- return runtime.getFalse();
- }
- }
- @Deprecated
- public static interface BlockingTask {
- public void run() throws InterruptedException;
- public void wakeup();
- }
- public interface Unblocker<Data> {
- public void wakeup(RubyThread thread, Data self);
- }
- public interface Task<Data, Return> extends Unblocker<Data> {
- public Return run(ThreadContext context, Data data) throws InterruptedException;
- public void wakeup(RubyThread thread, Data data);
- }
- public static final class SleepTask implements BlockingTask {
- private final Object object;
- private final long millis;
- private final int nanos;
- public SleepTask(Object object, long millis, int nanos) {
- this.object = object;
- this.millis = millis;
- this.nanos = nanos;
- }
- @Override
- public void run() throws InterruptedException {
- synchronized (object) {
- object.wait(millis, nanos);
- }
- }
- @Override
- public void wakeup() {
- synchronized (object) {
- object.notify();
- }
- }
- }
- private static final class SleepTask2 implements Task<Object[], Long> {
- @Override
- public Long run(ThreadContext context, Object[] data) throws InterruptedException {
- long millis = (Long)data[1];
- int nanos = (Integer)data[2];
- long start = System.currentTimeMillis();
- // TODO: nano handling?
- if (millis == 0) {
- ((Semaphore) data[0]).acquire();
- } else {
- ((Semaphore) data[0]).tryAcquire(millis, TimeUnit.MILLISECONDS);
- }
- return System.currentTimeMillis() - start;
- }
- @Override
- public void wakeup(RubyThread thread, Object[] data) {
- ((Semaphore)data[0]).release();
- }
- }
- private static final Task<Object[], Long> SLEEP_TASK2 = new SleepTask2();
- @Deprecated
- public void executeBlockingTask(BlockingTask task) throws InterruptedException {
- try {
- this.currentBlockingTask = task;
- enterSleep();
- pollThreadEvents();
- task.run();
- } finally {
- exitSleep();
- currentBlockingTask = null;
- pollThreadEvents();
- }
- }
- public <Data, Return> Return executeTask(ThreadContext context, Data data, Task<Data, Return> task) throws InterruptedException {
- try {
- this.unblockFunc = task;
- this.unblockArg = data;
- // check for interrupt before going into blocking call
- pollThreadEvents(context);
- enterSleep();
- return task.run(context, data);
- } finally {
- exitSleep();
- this.unblockFunc = null;
- this.unblockArg = null;
- pollThreadEvents(context);
- }
- }
- public void enterSleep() {
- status.set(Status.SLEEP);
- }
- public void exitSleep() {
- status.set(Status.RUN);
- }
- @JRubyMethod(name = {"kill", "exit", "terminate"})
- public IRubyObject kill() {
- Ruby runtime = getRuntime();
- // need to reexamine this
- RubyThread currentThread = runtime.getCurrentContext().getThread();
- if (currentThread == runtime.getThreadService().getMainThread()) {
- // rb_exit to hard exit process...not quite right for us
- }
- return genericKill(runtime, currentThread);
- }
- private IRubyObject genericKill(Ruby runtime, RubyThread currentThread) {
- // If the killee thread is the same as the killer thread, just die
- if (currentThread == this) throwThreadKill();
- pendingInterruptEnqueue(RubyFixnum.zero(runtime));
- interrupt();
- return this;
- }
- private void pendingInterruptEnqueue(IRubyObject v) {
- pendingInterruptQueue.add(v);
- pendingInterruptQueueChecked = false;
- }
-
- /**
- * Used for finalizers that need to kill a Ruby thread. Finalizers run in
- * a VM thread to which we do not want to attach a ThreadContext and within
- * which we do not want to check for Ruby thread events. This mechanism goes
- * directly to mail delivery, bypassing all Ruby Thread-related steps.
- */
- public void dieFromFinalizer() {
- genericKill(getRuntime(), null);
- }
- private static void debug(RubyThread thread, String message) {
- if (DEBUG) LOG.debug(Thread.currentThread() + "(" + thread.status + "): " + message);
- }
-
- @JRubyMethod
- public IRubyObject safe_level() {
- throw getRuntime().newNotImplementedError("Thread-specific SAFE levels are not supported");
- }
- public IRubyObject backtrace(ThreadContext context) {
- return backtrace20(context, NULL_ARRAY);
- }
- @JRubyMethod(name = "backtrace", optional = 2)
- public IRubyObject backtrace20(ThreadContext context, IRubyObject[] args) {
- ThreadContext myContext = getContext();
- // context can be nil if we have not started or GC has claimed our context
- if (myContext == null) return context.nil;
- Thread nativeThread = getNativeThread();
- // nativeThread can be null if the thread has terminated and GC has claimed it
- if (nativeThread == null) return context.nil;
-
- Ruby runtime = context.runtime;
- Integer[] ll = RubyKernel.levelAndLengthFromArgs(runtime, args, 0);
- Integer level = ll[0], length = ll[1];
-
- return myContext.createCallerBacktrace(level, length, getNativeThread().getStackTrace());
- }
-
- @JRubyMethod(optional = 2)
- public IRubyObject backtrace_locations(ThreadContext context, IRubyObject[] args) {
- ThreadContext myContext = getContext();
- if (myContext == null) return context.nil;
-
- Ruby runtime = context.runtime;
- Integer[] ll = RubyKernel.levelAndLengthFromArgs(runtime, args, 0);
- Integer level = ll[0], length = ll[1];
-
- return myContext.createCallerLocations(level, length, getNativeThread().getStackTrace());
- }
- public StackTraceElement[] javaBacktrace() {
- if (threadImpl instanceof NativeThread) {
- return ((NativeThread)threadImpl).getThread().getStackTrace();
- }
- // Future-based threads can't get a Java trace
- return new StackTraceElement[0];
- }
- private boolean isCurrent() {
- return threadImpl.isCurrent();
- }
- public void exceptionRaised(RaiseException exception) {
- assert isCurrent();
- RubyException rubyException = exception.getException();
- Ruby runtime = rubyException.getRuntime();
- if (runtime.getSystemExit().isInstance(rubyException)) {
- runtime.getThreadService().getMainThread().raise(new IRubyObject[] {rubyException}, Block.NULL_BLOCK);
- } else if (abortOnException(runtime)) {
- runtime.getThreadService().getMainThread().raise(new IRubyObject[] {rubyException}, Block.NULL_BLOCK);
- return;
- } else if (runtime.getDebug().isTrue()) {
- runtime.printError(exception.getException());
- }
- exitingException = exception;
- }
- /**
- * For handling all non-Ruby exceptions bubbling out of threads
- * @param exception
- */
- @SuppressWarnings("deprecation")
- public void exceptionRaised(Throwable exception) {
- if (exception instanceof RaiseException) {
- exceptionRaised((RaiseException)exception);
- return;
- }
- assert isCurrent();
- Ruby runtime = getRuntime();
- if (abortOnException(runtime) && exception instanceof Error) {
- // re-propagate on main thread
- runtime.getThreadService().getMainThread().getNativeThread().stop(exception);
- } else {
- // just rethrow on this thread, let system handlers report it
- Helpers.throwException(exception);
- }
- }
- private boolean abortOnException(Ruby runtime) {
- return (runtime.isGlobalAbortOnExceptionEnabled() || abortOnException);
- }
- public static RubyThread mainThread(IRubyObject receiver) {
- return receiver.getRuntime().getThreadService().getMainThread();
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations or the given timeout.
- *
- * @param io the RubyIO that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @return true if the IO's channel became ready for the requested operations, false if
- * it was not selectable.
- */
- public boolean select(RubyIO io, int ops) {
- return select(io.getChannel(), io.getOpenFile(), ops);
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations or the given timeout.
- *
- * @param io the RubyIO that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
- * zero selects and returns ready channels or nothing immediately, and
- * greater than zero selects for at most that many ms.
- * @return true if the IO's channel became ready for the requested operations, false if
- * it timed out or was not selectable.
- */
- public boolean select(RubyIO io, int ops, long timeout) {
- return select(io.getChannel(), io.getOpenFile(), ops, timeout);
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations.
- *
- * @param channel the channel to perform a select against. If this is not
- * a selectable channel, then this method will just return true.
- * @param fptr the fptr that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @return true if the channel became ready for the requested operations, false if
- * it was not selectable.
- */
- public boolean select(Channel channel, OpenFile fptr, int ops) {
- return select(channel, fptr, ops, -1);
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations.
- *
- * @param channel the channel to perform a select against. If this is not
- * a selectable channel, then this method will just return true.
- * @param io the RubyIO that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @return true if the channel became ready for the requested operations, false if
- * it was not selectable.
- */
- public boolean select(Channel channel, RubyIO io, int ops) {
- return select(channel, io == null ? null : io.getOpenFile(), ops, -1);
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations or the given timeout.
- *
- * @param channel the channel to perform a select against. If this is not
- * a selectable channel, then this method will just return true.
- * @param io the RubyIO that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
- * zero selects and returns ready channels or nothing immediately, and
- * greater than zero selects for at most that many ms.
- * @return true if the channel became ready for the requested operations, false if
- * it timed out or was not selectable.
- */
- public boolean select(Channel channel, RubyIO io, int ops, long timeout) {
- return select(channel, io == null ? null : io.getOpenFile(), ops, timeout);
- }
- /**
- * Perform an interruptible select operation on the given channel and fptr,
- * waiting for the requested operations or the given timeout.
- *
- * @param channel the channel to perform a select against. If this is not
- * a selectable channel, then this method will just return true.
- * @param fptr the fptr that contains the channel, for managing blocked threads list.
- * @param ops the operations to wait for, from {@see java.nio.channels.SelectionKey}.
- * @param timeout a timeout in ms to limit the select. Less than zero selects forever,
- * zero selects and returns ready channels or nothing immediately, and
- * greater than zero selects for at most that many ms.
- * @return true if the channel became ready for the requested operations, false if
- * it timed out or was not selectable.
- */
- public boolean select(Channel channel, OpenFile fptr, int ops, long timeout) {
- // Use selectables but only if they're not associated with a file (which has odd select semantics)
- if (channel instanceof SelectableChannel && (fptr == null || !fptr.fd().isNativeFile)) {
- SelectableChannel selectable = (SelectableChannel)channel;
-
- synchronized (selectable.blockingLock()) {
- boolean oldBlocking = selectable.isBlocking();
- SelectionKey key = null;
- try {
- selectable.configureBlocking(false);
- if (fptr != null) fptr.addBlockingThread(this);
- currentSelector = getRuntime().getSelectorPool().get(selectable.provider());
- key = selectable.register(currentSelector, ops);
- beforeBlockingCall();
- int result;
- if (timeout < 0) {
- result = currentSelector.select();
- } else if (timeout == 0) {
- result = currentSelector.selectNow();
- } else {
- result = currentSelector.select(timeout);
- }
- // check for thread events, in case we've been woken up to die
- pollThreadEvents();
- if (result == 1) {
- Set<SelectionKey> keySet = currentSelector.selectedKeys();
- if (keySet.iterator().next() == key) {
- return true;
- }
- }
- return false;
- } catch (IOException ioe) {
- throw getRuntime().newIOErrorFromException(ioe);
- } finally {
- // Note: I don't like ignoring these exceptions, but it's
- // unclear how likely they are to happen or what damage we
- // might do by ignoring them. Note that the pieces are separate
- // so that we can ensure one failing does not affect the others
- // running.
- // clean up the key in the selector
- try {
- if (key != null) key.cancel();
- if (currentSelector != null) currentSelector.selectNow();
- } catch (Exception e) {
- // ignore
- }
- // shut down and null out the selector
- try {
- if (currentSelector != null) {
- getRuntime().getSelectorPool().put(currentSelector);
- }
- } catch (Exception e) {
- // ignore
- } finally {
- currentSelector = null;
- }
- // remove this thread as a blocker against the given IO
- if (fptr != null) fptr.removeBlockingThread(this);
- // go back to previous blocking state on the selectable
- try {
- selectable.configureBlocking(oldBlocking);
- } catch (Exception e) {
- // ignore
- }
- // clear thread state from blocking call
- afterBlockingCall();
- }
- }
- } else {
- // can't select, just have to do a blocking call
- return true;
- }
- }
- @SuppressWarnings("deprecated")
- public synchronized void interrupt() {
- setInterrupt();
- Selector activeSelector = currentSelector;
- if (activeSelector != null) {
- activeSelector.wakeup();
- }
- BlockingIO.Condition iowait = blockingIO;
- if (iowait != null) {
- iowait.cancel();
- }
- Unblocker task = this.unblockFunc;
- if (task != null) {
- task.wakeup(this, unblockArg);
- }
- // deprecated
- {
- BlockingTask t = currentBlockingTask;
- if (t != null) {
- t.wakeup();
- }
- }
- // If this thread is sleeping or stopped, wake it
- notify();
- }
- public void setInterrupt() {
- while (true) {
- int oldFlag = interruptFlag;
- if (INTERRUPT_FLAG_UPDATER.compareAndSet(this, oldFlag, oldFlag | PENDING_INTERRUPT_MASK)) {
- return;
- }
- }
- }
- private volatile BlockingIO.Condition blockingIO = null;
- public boolean waitForIO(ThreadContext context, RubyIO io, int ops) {
- Channel channel = io.getChannel();
- if (!(channel instanceof SelectableChannel)) {
- return true;
- }
- try {
- io.addBlockingThread(this);
- blockingIO = BlockingIO.newCondition(channel, ops);
- boolean ready = blockingIO.await();
-
- // check for thread events, in case we've been woken up to die
- pollThreadEvents();
- return ready;
- } catch (IOException ioe) {
- throw context.runtime.newRuntimeError("Error with selector: " + ioe);
- } catch (InterruptedException ex) {
- // FIXME: not correct exception
- throw context.runtime.newRuntimeError("Interrupted");
- } finally {
- blockingIO = null;
- io.removeBlockingThread(this);
- }
- }
- public void beforeBlockingCall() {
- pollThreadEvents();
- enterSleep();
- }
-
- public void afterBlockingCall() {
- exitSleep();
- pollThreadEvents();
- }
- private void receivedAnException(ThreadContext context, IRubyObject exception) {
- RubyModule kernelModule = getRuntime().getKernel();
- debug(this, "before propagating exception");
- kernelModule.callMethod(context, "raise", exception);
- }
- public boolean wait_timeout(IRubyObject o, Double timeout) throws InterruptedException {
- if ( timeout != null ) {
- long delay_ns = (long)(timeout.doubleValue() * 1000000000.0);
- long start_ns = System.nanoTime();
- if (delay_ns > 0) {
- long delay_ms = delay_ns / 1000000;
- int delay_ns_remainder = (int)( delay_ns % 1000000 );
- executeBlockingTask(new SleepTask(o, delay_ms, delay_ns_remainder));
- }
- long end_ns = System.nanoTime();
- return ( end_ns - start_ns ) <= delay_ns;
- } else {
- executeBlockingTask(new SleepTask(o, 0, 0));
- return true;
- }
- }
-
- public RubyThreadGroup getThreadGroup() {
- return threadGroup;
- }
- @Override
- public boolean equals(Object obj) {
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final RubyThread other = (RubyThread)obj;
- if (this.threadImpl != other.threadImpl && (this.threadImpl == null || !this.threadImpl.equals(other.threadImpl))) {
- return false;
- }
- return true;
- }
- @Override
- public int hashCode() {
- int hash = 3;
- hash = 97 * hash + (this.threadImpl != null ? this.threadImpl.hashCode() : 0);
- return hash;
- }
- @Override
- public String toString() {
- return threadImpl.toString();
- }
-
- /**
- * Acquire the given lock, holding a reference to it for cleanup on thread
- * termination.
- *
- * @param lock the lock to acquire, released on thread termination
- */
- public void lock(Lock lock) {
- assert Thread.currentThread() == getNativeThread();
- lock.lock();
- heldLocks.add(lock);
- }
-
- /**
- * Acquire the given lock interruptibly, holding a reference to it for cleanup
- * on thread termination.
- *
- * @param lock the lock to acquire, released on thread termination
- * @throws InterruptedException if the lock acquisition is interrupted
- */
- public void lockInterruptibly(Lock lock) throws InterruptedException {
- assert Thread.currentThread() == getNativeThread();
- lock.lockInterruptibly();
- heldLocks.add(lock);
- }
-
- /**
- * Try to acquire the given lock, adding it to a list of held locks for cleanup
- * on thread termination if it is acquired. Return immediately if the lock
- * cannot be acquired.
- *
- * @param lock the lock to acquire, released on thread termination
- */
- public boolean tryLock(Lock lock) {
- assert Thread.currentThread() == getNativeThread();
- boolean locked = lock.tryLock();
- if (locked) {
- heldLocks.add(lock);
- }
- return locked;
- }
-
- /**
- * Release the given lock and remove it from the list of locks to be released
- * on thread termination.
- *
- * @param lock the lock to release and dereferences
- */
- public void unlock(Lock lock) {
- assert Thread.currentThread() == getNativeThread();
- lock.unlock();
- heldLocks.remove(lock);
- }
-
- /**
- * Release all locks held.
- */
- public void unlockAll() {
- assert Thread.currentThread() == getNativeThread();
- for (Lock lock : heldLocks) {
- lock.unlock();
- }
- }
- private String identityString() {
- return "0x" + Integer.toHexString(System.identityHashCode(this));
- }
- /**
- * This is intended to be used to raise exceptions in Ruby threads from non-
- * Ruby threads like Timeout's thread.
- *
- * @param args Same args as for Thread#raise
- */
- @Deprecated
- public void internalRaise(IRubyObject[] args) {
- Ruby runtime = getRuntime();
- genericRaise(runtime, args, runtime.getCurrentContext().getThread());
- }
- @Deprecated
- public void receiveMail(ThreadService.Event event) {
- }
- @Deprecated
- public void checkMail(ThreadContext context) {
- }
- @Deprecated
- private volatile BlockingTask currentBlockingTask;
- @Deprecated
- public boolean selectForAccept(RubyIO io) {
- return select(io, SelectionKey.OP_ACCEPT);
- }
- }