ARTEMIS-2945 Artemis native JNI code can be replaced by Java #9
franz1981 wants to merge 2 commits into apache:master
I've noticed something I'd like to raise here, to be sure I'll remember it in the future :) I see in the original code that on

    // The GlobalRef will be deleted when poll is called. this is done so
    // the vm wouldn't crash if the Callback passed by the user is GCed between submission
    // and callback.
    // also as the real intention is to hold the reference until the life cycle is complete
    iocb->data = (void *) (*env)->NewGlobalRef(env, callback);

But I see that isn't happening for

    // This will allocate a buffer, aligned by alignment.
    // Buffers created here need to be manually destroyed by destroyBuffer, or this would leak on the process heap away of Java's GC managed memory
    // NOTE: this buffer will contain non initialized data, you must fill it up properly

Hence I can simplify the Java code here a bit, because the
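
For illustration only, the lifetime guarantee that `NewGlobalRef` gives on the native side can be obtained in plain Java by keeping a strong reference to the callback in a table until the completion is polled. The sketch below is an assumption about how that could look, not code from this PR: `PendingCallbacks`, `register` and `complete` are hypothetical names, and the returned id stands in for the value stored in the iocb `data` field.

```java
/**
 * Hypothetical sketch: keeps submitted callbacks strongly reachable until
 * their completion is observed, playing the role of a JNI GlobalRef.
 */
final class PendingCallbacks {
    private final Runnable[] slots;

    PendingCallbacks(int capacity) {
        slots = new Runnable[capacity];
    }

    /** Stores the callback and returns its slot id, or -1 if every slot is in use. */
    int register(Runnable callback) {
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] == null) {
                slots[i] = callback; // strong reference: the GC cannot collect it while pending
                return i;
            }
        }
        return -1;
    }

    /** Releases the slot (the analogue of DeleteGlobalRef) and returns the callback. */
    Runnable complete(int id) {
        final Runnable callback = slots[id];
        slots[id] = null;
        return callback;
    }
}
```

The id returned by `register` would travel with the request, and the poll loop would call `complete(id)` and then run the callback.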
@clebertsuconic @michaelandrepearce Let's explain what
This nice mechanism raises a legitimate question:
Assuming that reads are rare on Artemis, they can still happen during compaction, and a slow read means that compaction would suffer as well: on
There are several ways to fix this issue and, TBH, the first commit of this same PR already takes care of it, but it assumes a smart submitter of writes/reads that correctly batches writes and reads in user space (i.e. Java land: the Artemis TimedBuffer). I'm not sure it is the right path, but it is a nice way to show how easy it is to bring in new behaviours :)
Just FYI, I've run some tests on older hardware and I still get the best performance with a lower number of producers/consumers (without compaction in the mix) from

    int lastFile = -1;
    for (int i = 0; i < events; i++) {
        final IoEventArray.IoEvent ioEvent = ioEventArray.get(i);
        assert ioEvent.obj() != 0;
        final int id = (int) ioEvent.data();
        final PooledIOCB pooledIOCB = iocbArray[id];
        assert ioEvent.obj() == pooledIOCB.address;
        assert IoCb.aioData(pooledIOCB.bytes) == id;
        final SubmitInfo submitInfo = pooledIOCB.submitInfo;
        if (submitInfo != null) {
            pooledIOCB.submitInfo = null;
        }
        final long res = ioEvent.res();
        if (res >= 0) {
            final int fd = IoCb.aioFildes(pooledIOCB.bytes);
            if (fd != dumbFD) {
                if (useFdatasync) {
                    // sync only when the file changes between consecutive completions
                    if (lastFile != fd) {
                        lastFile = fd;
                        fdatasync(fd);
                    }
                }
            } else {
                stop = true;
            }
        }
        iocbPool.add(pooledIOCB);
        if (ioSpace != null) {
            ioSpace.release();
        }
        if (submitInfo != null) {
            if (res >= 0) {
                submitInfo.done();
            } else {
                // TODO the error string can be cached?
                submitInfo.onError((int) -res, strError(res));
            }
        }
    }

With a higher number of producers/consumers, both commits on this PR (i.e. not batching async fdatasync and batching async fdatasync) perform better than the original logic (in C or Java).
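
To make the `lastFile` check in the loop above easier to see in isolation, here is a minimal, self-contained sketch. It assumes the completions are reduced to a plain array of file descriptors and it records which descriptors would be synced instead of calling `fdatasync`; `FdatasyncCoalescer` and `syncsFor` are hypothetical names, not part of this PR.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "sync once per run of the same fd" logic. */
final class FdatasyncCoalescer {

    /** Returns, in order, the file descriptors that would get an fdatasync call. */
    static List<Integer> syncsFor(int[] completedFds) {
        final List<Integer> synced = new ArrayList<>();
        int lastFile = -1; // -1 is never a valid file descriptor
        for (int fd : completedFds) {
            // consecutive completions on the same file share a single sync
            if (fd != lastFile) {
                lastFile = fd;
                synced.add(fd); // the real loop would call fdatasync(fd) here
            }
        }
        return synced;
    }
}
```

For completions on descriptors `3, 3, 3, 4, 4, 3` this yields three syncs (`3`, `4`, `3`) rather than six. Note that only consecutive completions are coalesced, so a file that reappears after another one is synced again.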
@clebertsuconic @michaelandrepearce I've just noticed that, thanks to the AioRing abstraction, we could create an AioRing.CompletionCallback interface and use it to be truly zero-copy, decoding IoEvents on the fly into their primitive fields (data, obj, res, res2) while looping through completions: with the simplified code in #9 (comment) this should be very simple to implement and effective, given that a long fdatasync would likely let some more completions accumulate while looping. This is what I mean:

    public interface AioRingCompletionCallback {
        void handle(long data, long obj, long res, long res2);
    }

    public int poll(AioRingCompletionCallback callback, int min, int max) {
        final long nrAddress = this.nrAddress;
        final long headAddress = this.headAddress;
        final long tailAddress = this.tailAddress;
        final long ioEventsAddress = this.ioEventsAddress;
        final int nr = UNSAFE.getInt(nrAddress);
        int head = UNSAFE.getInt(headAddress);
        // no explicit membar needed here because Unsafe::getIntVolatile already provides it
        final int tail = UNSAFE.getIntVolatile(null, tailAddress);
        int available = tail - head;
        if (available < 0) {
            // a wrap has occurred
            available += nr;
        }
        if (available < min) {
            return 0;
        }
        if (available == 0) {
            return 0;
        }
        // this is to mitigate a RHEL BUG: see native code for more info
        if (available > nr) {
            return -1;
        }
        available = Math.min(available, max);
        for (int i = 0; i < available; i++) {
            final long ioEvent = ioEventsAddress + (head * SIZE_OF_IO_EVENT_STRUCT);
            final long data = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.DATA_OFFSET);
            final long obj = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.OBJ_OFFSET);
            final long res = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.RES_OFFSET);
            final long res2 = UNSAFE.getLong(ioEvent + IoEventArray.IoEvent.RES2_OFFSET);
            head++;
            head = head >= nr ? 0 : head;
            // publish the new head before invoking the callback
            UNSAFE.putOrderedInt(null, headAddress, head);
            callback.handle(data, obj, res, res2);
        }
        return available;
    }
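
The head/tail arithmetic in `poll` can be tried out without `Unsafe` or a real AIO context. The sketch below is a simplified model under stated assumptions: a `long[]` stands in for the `io_event` slots, each event is reduced to its `data` field, and the `offer` method is a hypothetical producer added only so the example can run on its own (in the real ring the kernel advances the tail). `RingPollSketch` is not a class from this PR.

```java
import java.util.function.LongConsumer;

/** Hypothetical single-threaded model of the ring's head/tail bookkeeping. */
final class RingPollSketch {
    private final long[] events; // stands in for the io_event slots
    private int head;            // next slot to consume
    private int tail;            // next slot to fill

    RingPollSketch(int nr) {
        events = new long[nr];
    }

    /** Producer side for the sketch; returns false when the ring is full. */
    boolean offer(long data) {
        final int next = tail + 1 == events.length ? 0 : tail + 1;
        if (next == head) {
            return false; // one slot stays empty so that head == tail means "empty"
        }
        events[tail] = data;
        tail = next;
        return true;
    }

    /** Consumes up to max events if at least min are available; returns the count. */
    int poll(LongConsumer callback, int min, int max) {
        final int nr = events.length;
        int available = tail - head;
        if (available < 0) {
            available += nr; // the tail has wrapped around the end of the array
        }
        if (available == 0 || available < min) {
            return 0;
        }
        available = Math.min(available, max);
        for (int i = 0; i < available; i++) {
            final long data = events[head];
            head = head + 1 >= nr ? 0 : head + 1; // advance head before the callback runs
            callback.accept(data);
        }
        return available;
    }
}
```

With a ring of four slots, offering `1, 2, 3`, polling two events, then offering `4, 5` makes the tail wrap; the next poll still delivers `3, 4, 5` in order, which is the case the `available += nr` branch handles.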
I'm going to close this and will later create a bug fix for the read fairness instead: #10 is a more straightforward PR that just tries to replace what we already have.
https://issues.apache.org/jira/browse/ARTEMIS-2945