RubyFiber.java
/*
* Copyright (c) 2013, 2014 Oracle and/or its affiliates. All rights reserved. This
* code is released under a tri EPL/GPL/LGPL license. You can use it,
* redistribute it and/or modify it under the terms of the:
*
* Eclipse Public License version 1.0
* GNU General Public License version 2
* GNU Lesser General Public License version 2.1
*/
package org.jruby.truffle.runtime.core;
import com.oracle.truffle.api.nodes.ControlFlowException;
import org.jruby.truffle.nodes.RubyNode;
import org.jruby.truffle.nodes.objects.Allocator;
import org.jruby.truffle.runtime.RubyContext;
import org.jruby.truffle.runtime.subsystems.FiberManager;
import org.jruby.truffle.runtime.subsystems.ThreadManager;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Represents the Ruby {@code Fiber} class. The current implementation uses Java threads and message
* passing. Note that the relationship between Java threads, Ruby threads and Ruby fibers is
* complex. A Java thread might be running a fiber that on difference resumptions is representing
* different Ruby threads. Take note of the lock contracts on {@link #waitForResume} and
* {@link #resume}.
*/
public class RubyFiber extends RubyBasicObject {
private interface FiberMessage {
}
private static class FiberResumeMessage implements FiberMessage {
private final RubyThread thread;
private final RubyFiber sendingFiber;
private final Object arg;
public FiberResumeMessage(RubyThread thread, RubyFiber sendingFiber, Object arg) {
this.thread = thread;
this.sendingFiber = sendingFiber;
this.arg = arg;
}
public RubyThread getThread() {
return thread;
}
public RubyFiber getSendingFiber() {
return sendingFiber;
}
public Object getArg() {
return arg;
}
}
private static class FiberExitMessage implements FiberMessage {
}
public static class FiberExitException extends ControlFlowException {
private static final long serialVersionUID = 1522270454305076317L;
}
private final FiberManager fiberManager;
private final ThreadManager threadManager;
private BlockingQueue<FiberMessage> messageQueue = new ArrayBlockingQueue<>(1);
public RubyFiber lastResumedByFiber = null;
public RubyFiber(RubyClass rubyClass, FiberManager fiberManager, ThreadManager threadManager) {
super(rubyClass);
this.fiberManager = fiberManager;
this.threadManager = threadManager;
}
public void initialize(RubyProc block) {
RubyNode.notDesignedForCompilation();
final RubyFiber finalFiber = this;
final RubyProc finalBlock = block;
new Thread(new Runnable() {
@Override
public void run() {
fiberManager.registerFiber(finalFiber);
try {
try {
final Object arg = finalFiber.waitForResume();
final Object result = finalBlock.rootCall(arg);
finalFiber.lastResumedByFiber.resume(finalFiber, result);
} catch (FiberExitException e) {
// Naturally exit the thread on catching this
}
} finally {
fiberManager.unregisterFiber(finalFiber);
}
}
}).start();
}
/**
* Send the Java thread that represents this fiber to sleep until it recieves a resume or exit
* message. On entry, assumes that the GIL is not held. On exit, holding the GIL.
*/
public Object waitForResume() {
RubyNode.notDesignedForCompilation();
FiberMessage message = null;
do {
try {
// TODO(cs) what is a suitable timeout?
message = messageQueue.poll(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// Poll again
}
} while (message == null);
if (message instanceof FiberExitMessage) {
throw new FiberExitException();
}
final FiberResumeMessage resumeMessage = (FiberResumeMessage) message;
threadManager.enterGlobalLock(resumeMessage.getThread());
fiberManager.setCurrentFiber(this);
lastResumedByFiber = resumeMessage.getSendingFiber();
return resumeMessage.getArg();
}
/**
* Send a message to a fiber by posting into a message queue. Doesn't explicitly notify the Java
* thread (although the queue implementation may) and doesn't wait for the message to be
* received. On entry, assumes the the GIL is held. On exit, not holding the GIL.
*/
public void resume(RubyFiber sendingFiber, Object... args) {
RubyNode.notDesignedForCompilation();
Object arg;
if (args.length == 0) {
arg = getContext().getCoreLibrary().getNilObject();
} else if (args.length == 1) {
arg = args[0];
} else {
arg = RubyArray.fromObjects(getContext().getCoreLibrary().getArrayClass(), args);
}
final RubyThread runningThread = threadManager.leaveGlobalLock();
messageQueue.add(new FiberResumeMessage(runningThread, sendingFiber, arg));
}
public void shutdown() {
RubyNode.notDesignedForCompilation();
messageQueue.add(new FiberExitMessage());
}
public static class FiberAllocator implements Allocator {
@Override
public RubyBasicObject allocate(RubyContext context, RubyClass rubyClass, RubyNode currentNode) {
return new RubyFiber(rubyClass, context.getFiberManager(), context.getThreadManager());
}
}
}