// ObjectProxyCache.java
package org.jruby.javasupport.util;
import org.jruby.util.log.Logger;
import org.jruby.util.log.LoggerFactory;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.concurrent.locks.ReentrantLock;
/**
* Maps Java objects to their proxies. Combines elements of WeakHashMap and
* ConcurrentHashMap to permit unsynchronized reads. May be configured to
* use either Weak (the default) or Soft references.<p>
*
* Note that both Java objects and their proxies are held by weak/soft
* references; because proxies (currently) keep strong references to their
* Java objects, if we kept strong references to them the Java objects would
* never be gc'ed. This presents a problem in the case where a user passes
* a Rubified Java object out to Java but keeps no reference in Ruby to the
* proxy; if the object is returned to Ruby after its proxy has been gc'ed,
* a new (and possibly very wrong, in the case of JRuby-defined subclasses)
* proxy will be created. Use of soft references may help reduce the
* likelihood of this occurring; users may be advised to keep Ruby-side
* references to prevent it from occurring altogether.
*
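* <p>
* Illustrative usage sketch (a hypothetical subclass; a real subclass
* would construct an actual proxy object in {@code allocateProxy}):
* <pre>{@code
* ObjectProxyCache<String, Object> cache =
*     new ObjectProxyCache<String, Object>(ReferenceType.WEAK) {
*         public String allocateProxy(Object javaObject, Object allocator) {
*             return "proxy-" + System.identityHashCode(javaObject);
*         }
*     };
* Object javaObject = new Object();
* String proxy = cache.getOrCreate(javaObject, new Object()); // allocates and caches
* String again = cache.get(javaObject);                       // returns the same cached proxy
* }</pre>
*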
* @author <a href="mailto:bill.dortch@gmail.com">Bill Dortch</a>
*
*/
public abstract class ObjectProxyCache<T,A> {
private static final Logger LOG = LoggerFactory.getLogger("ObjectProxyCache");
public static enum ReferenceType { WEAK, SOFT }
private static final int DEFAULT_SEGMENTS = 16; // must be power of 2
private static final int DEFAULT_SEGMENT_SIZE = 8; // must be power of 2
private static final float DEFAULT_LOAD_FACTOR = 0.75f;
private static final int MAX_CAPACITY = 1 << 30;
private static final int MAX_SEGMENTS = 1 << 16;
private static final int VULTURE_RUN_FREQ_SECONDS = 5;
private static int _nextId = 0;
private static synchronized int nextId() {
return ++_nextId;
}
private final ReferenceType referenceType;
private final Segment<T,A>[] segments;
private final int segmentShift;
private final int segmentMask;
private Thread vulture;
private final int id;
public ObjectProxyCache() {
this(DEFAULT_SEGMENTS, DEFAULT_SEGMENT_SIZE, ReferenceType.WEAK);
}
public ObjectProxyCache(ReferenceType refType) {
this(DEFAULT_SEGMENTS, DEFAULT_SEGMENT_SIZE, refType);
}
public ObjectProxyCache(int numSegments, int initialSegCapacity, ReferenceType refType) {
if (numSegments <= 0 || initialSegCapacity <= 0 || refType == null) {
throw new IllegalArgumentException("numSegments and initialSegCapacity must be positive, refType must be non-null");
}
this.id = nextId();
this.referenceType = refType;
if (numSegments > MAX_SEGMENTS) numSegments = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < numSegments) {
++sshift;
ssize <<= 1;
}
// note segmentShift differs from ConcurrentHashMap's calculation due to
// issues with System.identityHashCode (upper n bits always 0, at least
// under Java 1.6 / WinXP)
this.segmentShift = 24 - sshift;
this.segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialSegCapacity > MAX_CAPACITY) {
initialSegCapacity = MAX_CAPACITY;
}
int cap = 1;
while (cap < initialSegCapacity) cap <<= 1;
for (int i = ssize; --i >= 0; ) {
segments[i] = new Segment<T,A>(cap, this);
}
// vulture thread will periodically expunge dead
// entries. entries are also expunged during 'put'
// operations; this is designed to cover the case where
// many objects are created initially, followed by limited
// put activity.
//
// FIXME: DISABLED (below) pending resolution of finalization issue
//
try {
this.vulture = new Thread("ObjectProxyCache "+id+" vulture") {
public void run() {
for ( ;; ) {
try {
sleep(VULTURE_RUN_FREQ_SECONDS * 1000);
} catch (InterruptedException e) { /* ignore and proceed to expunge */ }
boolean dump = size() > 200;
if (dump) {
LOG.debug("***Vulture {} waking, stats:", id);
LOG.debug(stats());
}
for (int i = segments.length; --i >= 0; ) {
Segment<T,A> seg = segments[i];
seg.lock();
try {
seg.expunge();
} finally {
seg.unlock();
}
yield();
}
if (dump) {
LOG.debug("***Vulture {} sleeping, stats:", id);
LOG.debug(stats());
}
}
}
};
vulture.setDaemon(true);
} catch (SecurityException e) {
this.vulture = null;
}
// FIXME: vulture daemon thread prevents finalization,
// find alternative approach.
// vulture.start();
// System.err.println("***ObjectProxyCache " + id + " started at "+ new java.util.Date());
}
// protected void finalize() throws Throwable {
// System.err.println("***ObjectProxyCache " + id + " finalized at "+ new java.util.Date());
// }
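/**
* Allocates a new proxy for the given Java object using the supplied
* allocator. Invoked by {@link #getOrCreate} (under the segment lock)
* when no live proxy is cached for the object.
*/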
public abstract T allocateProxy(Object javaObject, A allocator);
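/**
* Returns the cached proxy for the given Java object, or null if no
* entry exists or the proxy has been collected.
*/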
public T get(Object javaObject) {
if (javaObject == null) return null;
int hash = hash(javaObject);
return segmentFor(hash).get(javaObject, hash);
}
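/**
* Returns the cached proxy for the given Java object, allocating and
* caching a new one via {@link #allocateProxy} if no live proxy is
* present. Returns null if either argument is null.
*/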
public T getOrCreate(Object javaObject, A allocator) {
if (javaObject == null || allocator == null) return null;
int hash = hash(javaObject);
return segmentFor(hash).getOrCreate(javaObject, hash, allocator);
}
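/**
* Caches the given proxy for the given Java object, replacing any
* existing entry whose proxy differs (e.g. because it was collected).
*/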
public void put(Object javaObject, T proxy) {
if (javaObject == null || proxy == null) return;
int hash = hash(javaObject);
segmentFor(hash).put(javaObject, hash, proxy);
}
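// spreads System.identityHashCode using the same supplemental hash
// function as java.util.HashMap, to reduce collisions when the hash
// is masked down to a table index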
private static int hash(Object javaObject) {
int h = System.identityHashCode(javaObject);
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}
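// selects the segment using the high-order bits of the spread hash
// (see the segmentShift note in the constructor)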
private Segment<T,A> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
/**
* Returns the approximate size (elements in use) of the cache. The
* sizes of the segments are summed. No effort is made to synchronize
* across segments, so the value returned may differ from the actual
* size at any point in time.
*
* @return the approximate number of entries currently in the cache
*/
public int size() {
int size = 0;
for (Segment<T,A> seg : segments) {
size += seg.tableSize;
}
return size;
}
public String stats() {
StringBuilder b = new StringBuilder();
int n = 0;
int size = 0;
int alloc = 0;
b.append("Segments: ").append(segments.length).append("\n");
for (Segment<T,A> seg : segments) {
int ssize = 0;
int salloc = 0;
seg.lock();
try {
ssize = seg.count();
salloc = seg.entryTable.length;
} finally {
seg.unlock();
}
size += ssize;
alloc += salloc;
b.append("seg[").append(n++).append("]: size: ").append(ssize)
.append(" alloc: ").append(salloc).append("\n");
}
b.append("Total: size: ").append(size)
.append(" alloc: ").append(alloc).append("\n");
return b.toString();
}
// EntryRefs include hash with key to facilitate lookup by Segment#expunge
// after ref is removed from ReferenceQueue
private static interface EntryRef<T> {
T get();
int hash();
}
private static final class WeakEntryRef<T> extends WeakReference<T> implements EntryRef<T> {
final int hash;
WeakEntryRef(int hash, T rawObject, ReferenceQueue<Object> queue) {
super(rawObject, queue);
this.hash = hash;
}
public int hash() {
return hash;
}
}
private static final class SoftEntryRef<T> extends SoftReference<T> implements EntryRef<T> {
final int hash;
SoftEntryRef(int hash, T rawObject, ReferenceQueue<Object> queue) {
super(rawObject, queue);
this.hash = hash;
}
public int hash() {
return hash;
}
}
// Unlike WeakHashMap, our Entry does not subclass WeakReference, but rather
// makes it a final field. The theory is that doing so should force a happens-before
// relationship WRT the WeakReference constructor, guaranteeing that the key will be
// visible to other threads (unless it's been GC'ed). See JLS 17.5 (final fields) and
// 17.4.5 (Happens-before order) to confirm or refute my reasoning here.
static class Entry<T> {
final EntryRef<Object> objectRef;
final int hash;
final EntryRef<T> proxyRef;
final Entry<T> next;
Entry(Object object, int hash, T proxy, ReferenceType type, Entry<T> next, ReferenceQueue<Object> queue) {
this.hash = hash;
this.next = next;
// references to the Java object and its proxy will either both be
// weak or both be soft, since the proxy contains a strong reference
// to the object, so it wouldn't make sense for the reference types
// to differ.
if (type == ReferenceType.WEAK) {
this.objectRef = new WeakEntryRef<Object>(hash, object, queue);
this.proxyRef = new WeakEntryRef<T>(hash, proxy, queue);
} else {
this.objectRef = new SoftEntryRef<Object>(hash, object, queue);
this.proxyRef = new SoftEntryRef<T>(hash, proxy, queue);
}
}
// ctor used by remove/rehash
Entry(EntryRef<Object> objectRef, int hash, EntryRef<T> proxyRef, Entry<T> next) {
this.objectRef = objectRef;
this.hash = hash;
this.proxyRef = proxyRef;
this.next = next;
}
@SuppressWarnings("unchecked")
static final <T> Entry<T>[] newArray(int size) {
return new Entry[size];
}
}
// lame generics issues: making Segment class static and manually
// inserting cache reference to work around various problems generically
// referencing methods/vars across classes.
static class Segment<T,A> extends ReentrantLock {
final ObjectProxyCache<T,A> cache;
final ReferenceQueue<Object> referenceQueue = new ReferenceQueue<Object>();
volatile Entry<T>[] entryTable;
int tableSize;
int threshold;
Segment(int capacity, ObjectProxyCache<T,A> cache) {
threshold = (int)(capacity * DEFAULT_LOAD_FACTOR);
entryTable = Entry.newArray(capacity);
this.cache = cache;
}
// must be called under lock
private void expunge() {
Entry<T>[] table = entryTable;
ReferenceQueue<Object> queue = referenceQueue;
EntryRef<?> ref;
// note that we'll potentially see the refs for both the java object and
// proxy -- whichever we see first will cause the entry to be removed;
// the other will not match an entry and will be ignored.
while ((ref = (EntryRef<?>) queue.poll()) != null) {
int hash;
for (Entry<T> e = table[(hash = ref.hash()) & (table.length - 1)]; e != null; e = e.next) {
if (hash == e.hash && (ref == e.objectRef || ref == e.proxyRef)) {
remove(table, hash, e);
break;
}
}
}
}
// must be called under lock
private void remove(Entry<T>[] table, int hash, Entry<T> e) {
int index = hash & (table.length - 1);
Entry<T> first = table[index];
for (Entry<T> n = first; n != null; n = n.next) {
if (n == e) {
Entry<T> newFirst = n.next;
for (Entry<T> p = first; p != n; p = p.next) {
newFirst = new Entry<T>(p.objectRef, p.hash, p.proxyRef, newFirst);
}
table[index] = newFirst;
tableSize--;
entryTable = table; // write-volatile
return;
}
}
}
// temp method to verify tableSize value
// must be called under lock
private int count() {
int count = 0;
for (Entry<T> e : entryTable) {
while (e != null) {
count++;
e = e.next;
}
}
return count;
}
// must be called under lock
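// doubles the table capacity and redistributes entries, reusing the
// trailing run of entries that map to the same new slot (the same
// technique used by ConcurrentHashMap's rehash)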
private Entry<T>[] rehash() {
assert tableSize == count() : "tableSize "+tableSize+" != count() "+count();
Entry<T>[] oldTable = entryTable; // read-volatile
int oldCapacity;
if ((oldCapacity = oldTable.length) >= MAX_CAPACITY) {
return oldTable;
}
int newCapacity = oldCapacity << 1;
int sizeMask = newCapacity - 1;
threshold = (int)(newCapacity * DEFAULT_LOAD_FACTOR);
Entry<T>[] newTable = Entry.newArray(newCapacity);
Entry<T> e;
for (int i = oldCapacity; --i >= 0; ) {
if ((e = oldTable[i]) != null) {
int idx = e.hash & sizeMask;
Entry<T> next;
if ((next = e.next) == null) {
// Single node in list
newTable[idx] = e;
} else {
// Reuse trailing consecutive sequence at same slot
int lastIdx = idx;
Entry<T> lastRun = e;
for (Entry<T> last = next; last != null; last = last.next) {
int k;
if ((k = last.hash & sizeMask) != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone all remaining nodes
for (Entry<T> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
Entry<T> m = new Entry<T>(p.objectRef, p.hash, p.proxyRef, newTable[k]);
newTable[k] = m;
}
}
}
}
entryTable = newTable; // write-volatile
return newTable;
}
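// inserts or replaces the mapping for the given object under the
// segment lock, expunging dead entries first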
void put(Object object, int hash, T proxy) {
lock();
try {
expunge();
Entry<T>[] table;
int potentialNewSize;
if ((potentialNewSize = tableSize + 1) > threshold) {
table = rehash(); // indirect read-/write- volatile
} else {
table = entryTable; // read-volatile
}
int index;
Entry<T> e;
for (e = table[index = hash & (table.length - 1)]; e != null; e = e.next) {
if (hash == e.hash && object == e.objectRef.get()) {
if (proxy == e.proxyRef.get()) return;
// entry exists, proxy doesn't match. replace.
// this could happen if old proxy was gc'ed
// TODO: raise exception if stored proxy is non-null? (not gc'ed)
remove(table, hash, e);
potentialNewSize--;
break;
}
}
e = new Entry<T>(object, hash, proxy, cache.referenceType, table[index], referenceQueue);
table[index] = e;
tableSize = potentialNewSize;
entryTable = table; // write-volatile
} finally {
unlock();
}
}
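// optimistically tries an unsynchronized read of the (volatile) table;
// if no live proxy is found, falls back to the locked path, which
// expunges dead entries and allocates/inserts a new proxy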
T getOrCreate(Object object, int hash, A allocator) {
Entry<T>[] table;
T proxy;
for (Entry<T> e = (table = entryTable)[hash & (table.length - 1)]; e != null; e = e.next) {
if (hash == e.hash && object == e.objectRef.get()) {
if ((proxy = e.proxyRef.get()) != null) return proxy;
break;
}
}
lock();
try {
expunge();
int potentialNewSize;
if ((potentialNewSize = tableSize + 1) > threshold) {
table = rehash(); // indirect read-/write- volatile
} else {
table = entryTable; // read-volatile
}
int index;
Entry<T> e;
for (e = table[index = hash & (table.length - 1)]; e != null; e = e.next) {
if (hash == e.hash && object == e.objectRef.get()) {
if ((proxy = e.proxyRef.get()) != null) return proxy;
// entry exists, proxy has been gc'ed. replace entry.
remove(table, hash, e);
potentialNewSize--;
break;
}
}
proxy = cache.allocateProxy(object, allocator);
e = new Entry<T>(object, hash, proxy, cache.referenceType, table[index], referenceQueue);
table[index] = e;
tableSize = potentialNewSize;
entryTable = table; // write-volatile
return proxy;
} finally {
unlock();
}
}
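// unsynchronized read; visibility relies on the volatile entryTable
// and the final fields in Entry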
T get(Object object, int hash) {
Entry<T>[] table;
for (Entry<T> e = (table = entryTable)[hash & (table.length - 1)]; e != null; e = e.next) {
if (hash == e.hash && object == e.objectRef.get()) {
return e.proxyRef.get();
}
}
return null;
}
@SuppressWarnings("unchecked")
static final <T,A> Segment<T,A>[] newArray(int size) {
return new Segment[size];
}
}
}