File tree Expand file tree Collapse file tree 2 files changed +13
-8
lines changed
src/main/java/com/zendesk/maxwell Expand file tree Collapse file tree 2 files changed +13
-8
lines changed Original file line number Diff line number Diff line change @@ -160,11 +160,11 @@ private RowMapBuffer getTransactionRows() throws Exception {
160
160
161
161
RowMapBuffer buffer = new RowMapBuffer (MAX_TX_ELEMENTS );
162
162
163
- // currently to satisfy the test interface, the contract is to return null
164
- // if the queue is empty. should probably just replace this with an optional timeout...
165
-
166
163
while ( true ) {
167
164
v4Event = pollV4EventFromQueue ();
165
+
166
+ // currently to satisfy the test interface, the contract is to return null
167
+ // if the queue is empty. should probably just replace this with an optional timeout...
168
168
if (v4Event == null ) {
169
169
ensureReplicatorThread ();
170
170
continue ;
Original file line number Diff line number Diff line change @@ -31,14 +31,14 @@ public void add(T element) throws IOException {
31
31
if ( file == null ) {
32
32
file = File .createTempFile ("maxwell" , "events" );
33
33
file .deleteOnExit ();
34
- os = new ObjectOutputStream (new FileOutputStream (file ));
35
- is = new ObjectInputStream (new FileInputStream (file ));
34
+ os = new ObjectOutputStream (new BufferedOutputStream (new FileOutputStream (file )));
36
35
}
37
36
38
37
if ( elementsInFile == 0 )
39
38
LOGGER .debug ("Overflowed in-memory buffer, spilling over into " + file );
40
39
41
- os .writeObject (this .list .removeFirst ());
40
+ os .writeUnshared (this .list .removeFirst ());
41
+
42
42
elementsInFile ++;
43
43
}
44
44
}
@@ -52,8 +52,13 @@ public T getLast() {
52
52
}
53
53
54
54
public T removeFirst () throws IOException , ClassNotFoundException {
55
- if ( elementsInFile > 0 && is != null ) {
56
- T element = (T ) is .readObject ();
55
+ if ( elementsInFile > 0 ) {
56
+ if ( is == null ) {
57
+ os .flush ();
58
+ is = new ObjectInputStream (new BufferedInputStream (new FileInputStream (file )));
59
+ }
60
+
61
+ T element = (T ) is .readUnshared ();
57
62
elementsInFile --;
58
63
return element ;
59
64
} else {
You can’t perform that action at this time.
0 commit comments