22
22
23
23
import static io .netty .buffer .PooledByteBufAllocator .DEFAULT ;
24
24
import static java .lang .Class .forName ;
25
- import static java .lang .ThreadLocal .withInitial ;
26
25
import static org .lmdbjava .UnsafeAccess .UNSAFE ;
27
26
28
27
import java .lang .reflect .Field ;
29
- import java .util .ArrayDeque ;
30
28
31
29
import io .netty .buffer .ByteBuf ;
30
+ import io .netty .buffer .PooledByteBufAllocator ;
32
31
import jnr .ffi .Pointer ;
33
32
34
33
/**
@@ -47,37 +46,34 @@ public final class ByteBufProxy extends BufferProxy<ByteBuf> {
47
46
*/
48
47
public static final BufferProxy <ByteBuf > PROXY_NETTY = new ByteBufProxy ();
49
48
50
- private static final long ADDRESS_OFFSET ;
51
-
52
- /**
53
- * A thread-safe pool for a given length. If the buffer found is bigger then
54
- * the buffer in the pool creates a new buffer. If no buffer is found creates
55
- * a new buffer.
56
- */
57
- private static final ThreadLocal <ArrayDeque <ByteBuf >> BUFFERS = withInitial (()
58
- -> new ArrayDeque <>(16 ));
59
-
60
49
private static final int BUFFER_RETRIES = 10 ;
61
50
private static final String FIELD_NAME_ADDRESS = "memoryAddress" ;
62
51
private static final String FIELD_NAME_LENGTH = "length" ;
63
- private static final long LENGTH_OFFSET ;
64
52
private static final String NAME = "io.netty.buffer.PooledUnsafeDirectByteBuf" ;
53
+ private final long lengthOffset ;
54
+ private final long addressOffset ;
55
+
56
+ private final PooledByteBufAllocator nettyAllocator ;
57
+
58
+ private ByteBufProxy () {
59
+ this (DEFAULT );
60
+ }
61
+
62
+ public ByteBufProxy (final PooledByteBufAllocator allocator ) {
63
+ this .nettyAllocator = allocator ;
65
64
66
- static {
67
65
try {
68
- createBuffer ();
66
+ final ByteBuf initBuf = this .allocate ();
67
+ initBuf .release ();
69
68
final Field address = findField (NAME , FIELD_NAME_ADDRESS );
70
69
final Field length = findField (NAME , FIELD_NAME_LENGTH );
71
- ADDRESS_OFFSET = UNSAFE .objectFieldOffset (address );
72
- LENGTH_OFFSET = UNSAFE .objectFieldOffset (length );
70
+ addressOffset = UNSAFE .objectFieldOffset (address );
71
+ lengthOffset = UNSAFE .objectFieldOffset (length );
73
72
} catch (final SecurityException e ) {
74
73
throw new LmdbException ("Field access error" , e );
75
74
}
76
75
}
77
76
78
- private ByteBufProxy () {
79
- }
80
-
81
77
static Field findField (final String c , final String name ) {
82
78
Class <?> clazz ;
83
79
try {
@@ -97,39 +93,27 @@ static Field findField(final String c, final String name) {
97
93
throw new LmdbException (name + " not found" );
98
94
}
99
95
100
- private static ByteBuf createBuffer () {
96
+ @ Override
97
+ protected ByteBuf allocate () {
101
98
for (int i = 0 ; i < BUFFER_RETRIES ; i ++) {
102
- final ByteBuf bb = DEFAULT .directBuffer (0 );
99
+ final ByteBuf bb = nettyAllocator .directBuffer ();
103
100
if (NAME .equals (bb .getClass ().getName ())) {
104
101
return bb ;
102
+ } else {
103
+ bb .release ();
105
104
}
106
105
}
107
106
throw new IllegalStateException ("Netty buffer must be " + NAME );
108
107
}
109
108
110
- @ Override
111
- protected ByteBuf allocate () {
112
- final ArrayDeque <ByteBuf > queue = BUFFERS .get ();
113
- final ByteBuf buffer = queue .poll ();
114
-
115
- if (buffer != null && buffer .capacity () >= 0 ) {
116
- return buffer ;
117
- } else {
118
- return createBuffer ();
119
- }
120
- }
121
-
122
109
@ Override
123
110
protected int compare (final ByteBuf o1 , final ByteBuf o2 ) {
124
111
return o1 .compareTo (o2 );
125
112
}
126
113
127
114
@ Override
128
115
protected void deallocate (final ByteBuf buff ) {
129
- final ArrayDeque <ByteBuf > queue = BUFFERS .get ();
130
- if (!queue .offer (buff )) {
131
- buff .release ();
132
- }
116
+ buff .release ();
133
117
}
134
118
135
119
@ Override
@@ -161,8 +145,8 @@ protected ByteBuf out(final ByteBuf buffer, final Pointer ptr,
161
145
final long ptrAddr ) {
162
146
final long addr = UNSAFE .getLong (ptrAddr + STRUCT_FIELD_OFFSET_DATA );
163
147
final long size = UNSAFE .getLong (ptrAddr + STRUCT_FIELD_OFFSET_SIZE );
164
- UNSAFE .putLong (buffer , ADDRESS_OFFSET , addr );
165
- UNSAFE .putInt (buffer , LENGTH_OFFSET , (int ) size );
148
+ UNSAFE .putLong (buffer , addressOffset , addr );
149
+ UNSAFE .putInt (buffer , lengthOffset , (int ) size );
166
150
buffer .writerIndex ((int ) size ).readerIndex (0 );
167
151
return buffer ;
168
152
}
0 commit comments