Skip to content

Commit 3a0a702

Browse files
authored
Merge pull request #162 from Maithem/pooledAllocator
Support User Defined PooledByteBufAllocator
2 parents 26d76be + cb19f59 commit 3a0a702

File tree

1 file changed

+24
-40
lines changed

1 file changed

+24
-40
lines changed

src/main/java/org/lmdbjava/ByteBufProxy.java

Lines changed: 24 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,12 @@
2222

2323
import static io.netty.buffer.PooledByteBufAllocator.DEFAULT;
2424
import static java.lang.Class.forName;
25-
import static java.lang.ThreadLocal.withInitial;
2625
import static org.lmdbjava.UnsafeAccess.UNSAFE;
2726

2827
import java.lang.reflect.Field;
29-
import java.util.ArrayDeque;
3028

3129
import io.netty.buffer.ByteBuf;
30+
import io.netty.buffer.PooledByteBufAllocator;
3231
import jnr.ffi.Pointer;
3332

3433
/**
@@ -47,37 +46,34 @@ public final class ByteBufProxy extends BufferProxy<ByteBuf> {
4746
*/
4847
public static final BufferProxy<ByteBuf> PROXY_NETTY = new ByteBufProxy();
4948

50-
private static final long ADDRESS_OFFSET;
51-
52-
/**
53-
* A thread-safe pool for a given length. If the buffer found is bigger then
54-
* the buffer in the pool creates a new buffer. If no buffer is found creates
55-
* a new buffer.
56-
*/
57-
private static final ThreadLocal<ArrayDeque<ByteBuf>> BUFFERS = withInitial(()
58-
-> new ArrayDeque<>(16));
59-
6049
private static final int BUFFER_RETRIES = 10;
6150
private static final String FIELD_NAME_ADDRESS = "memoryAddress";
6251
private static final String FIELD_NAME_LENGTH = "length";
63-
private static final long LENGTH_OFFSET;
6452
private static final String NAME = "io.netty.buffer.PooledUnsafeDirectByteBuf";
53+
private final long lengthOffset;
54+
private final long addressOffset;
55+
56+
private final PooledByteBufAllocator nettyAllocator;
57+
58+
private ByteBufProxy() {
59+
this(DEFAULT);
60+
}
61+
62+
public ByteBufProxy(final PooledByteBufAllocator allocator) {
63+
this.nettyAllocator = allocator;
6564

66-
static {
6765
try {
68-
createBuffer();
66+
final ByteBuf initBuf = this.allocate();
67+
initBuf.release();
6968
final Field address = findField(NAME, FIELD_NAME_ADDRESS);
7069
final Field length = findField(NAME, FIELD_NAME_LENGTH);
71-
ADDRESS_OFFSET = UNSAFE.objectFieldOffset(address);
72-
LENGTH_OFFSET = UNSAFE.objectFieldOffset(length);
70+
addressOffset = UNSAFE.objectFieldOffset(address);
71+
lengthOffset = UNSAFE.objectFieldOffset(length);
7372
} catch (final SecurityException e) {
7473
throw new LmdbException("Field access error", e);
7574
}
7675
}
7776

78-
private ByteBufProxy() {
79-
}
80-
8177
static Field findField(final String c, final String name) {
8278
Class<?> clazz;
8379
try {
@@ -97,39 +93,27 @@ static Field findField(final String c, final String name) {
9793
throw new LmdbException(name + " not found");
9894
}
9995

100-
private static ByteBuf createBuffer() {
96+
@Override
97+
protected ByteBuf allocate() {
10198
for (int i = 0; i < BUFFER_RETRIES; i++) {
102-
final ByteBuf bb = DEFAULT.directBuffer(0);
99+
final ByteBuf bb = nettyAllocator.directBuffer();
103100
if (NAME.equals(bb.getClass().getName())) {
104101
return bb;
102+
} else {
103+
bb.release();
105104
}
106105
}
107106
throw new IllegalStateException("Netty buffer must be " + NAME);
108107
}
109108

110-
@Override
111-
protected ByteBuf allocate() {
112-
final ArrayDeque<ByteBuf> queue = BUFFERS.get();
113-
final ByteBuf buffer = queue.poll();
114-
115-
if (buffer != null && buffer.capacity() >= 0) {
116-
return buffer;
117-
} else {
118-
return createBuffer();
119-
}
120-
}
121-
122109
@Override
123110
protected int compare(final ByteBuf o1, final ByteBuf o2) {
124111
return o1.compareTo(o2);
125112
}
126113

127114
@Override
128115
protected void deallocate(final ByteBuf buff) {
129-
final ArrayDeque<ByteBuf> queue = BUFFERS.get();
130-
if (!queue.offer(buff)) {
131-
buff.release();
132-
}
116+
buff.release();
133117
}
134118

135119
@Override
@@ -161,8 +145,8 @@ protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
161145
final long ptrAddr) {
162146
final long addr = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_DATA);
163147
final long size = UNSAFE.getLong(ptrAddr + STRUCT_FIELD_OFFSET_SIZE);
164-
UNSAFE.putLong(buffer, ADDRESS_OFFSET, addr);
165-
UNSAFE.putInt(buffer, LENGTH_OFFSET, (int) size);
148+
UNSAFE.putLong(buffer, addressOffset, addr);
149+
UNSAFE.putInt(buffer, lengthOffset, (int) size);
166150
buffer.writerIndex((int) size).readerIndex(0);
167151
return buffer;
168152
}

0 commit comments

Comments
 (0)