Commit 334a30b

KAFKA-5730; Consumer should invoke async commit callback before sync commit returns
Author: Jason Gustafson <jason@confluent.io>
Reviewers: Randall Hauch <rhauch@gmail.com>, Ewen Cheslack-Postava <me@ewencp.org>
Closes apache#3666 from hachikuji/KAFKA-5730
1 parent efe4f65 commit 334a30b

9 files changed: +147 −13 lines

clients/src/main/java/org/apache/kafka/clients/KafkaClient.java

Lines changed: 1 addition & 1 deletion
@@ -86,7 +86,7 @@ public interface KafkaClient extends Closeable {
     List<ClientResponse> poll(long timeout, long now);

     /**
-     * Diconnects the connection to a particular node, if there is one.
+     * Disconnects the connection to a particular node, if there is one.
      * Any pending ClientRequests for this connection will receive disconnections.
      *
      * @param nodeId The id of the node

clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

Lines changed: 8 additions & 0 deletions
@@ -226,6 +226,11 @@ public boolean ready(Node node, long now) {
         return false;
     }

+    // Visible for testing
+    boolean canConnect(Node node, long now) {
+        return connectionStates.canConnect(node.idString(), now);
+    }
+
     /**
      * Disconnects the connection to a particular node, if there is one.
      * Any pending ClientRequests for this connection will receive disconnections.
@@ -234,6 +239,9 @@ public boolean ready(Node node, long now) {
      */
     @Override
     public void disconnect(String nodeId) {
+        if (connectionStates.isDisconnected(nodeId))
+            return;
+
         selector.close(nodeId);
         List<ApiKeys> requestTypes = new ArrayList<>();
         long now = time.milliseconds();

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 16 additions & 0 deletions
@@ -1119,6 +1119,9 @@ public boolean shouldBlock() {
      * <p>
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
      *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
@@ -1152,6 +1155,9 @@ public void commitSync() {
      * <p>
      * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
      * encountered (in which case it is thrown to the caller).
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
      *
      * @param offsets A map of offsets by partition with associated metadata
      * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
@@ -1194,6 +1200,11 @@ public void commitAsync() {
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
+     * <p>
+     * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
+     * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
+     * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
+     * (and variants) returns.
      *
      * @param callback Callback to invoke when the commit completes
      */
@@ -1217,6 +1228,11 @@ public void commitAsync(OffsetCommitCallback callback) {
      * <p>
      * This is an asynchronous call and will not block. Any errors encountered are either passed to the callback
      * (if provided) or discarded.
+     * <p>
+     * Offsets committed through multiple calls to this API are guaranteed to be sent in the same order as
+     * the invocations. Corresponding commit callbacks are also invoked in the same order. Additionally note that
+     * offsets committed through this API are guaranteed to complete before a subsequent call to {@link #commitSync()}
+     * (and variants) returns.
      *
      * @param offsets A map of offsets by partition with associate metadata. This map will be copied internally, so it
      *                is safe to mutate the map after returning.
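
The ordering guarantee documented above is easiest to see from the application side. The following is a minimal sketch, not part of this commit: it assumes an already-configured, subscribed KafkaConsumer named consumer, and the topic, partition, and offsets are made up for illustration.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper (not from the commit) illustrating the documented guarantee.
static void commitBothWays(KafkaConsumer<String, String> consumer) {
    final TopicPartition partition = new TopicPartition("my-topic", 0); // hypothetical topic/partition

    // Asynchronous commit: returns immediately; the callback fires on a later poll/commit call.
    consumer.commitAsync(Collections.singletonMap(partition, new OffsetAndMetadata(5L)),
            new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    // Per the Javadoc above, this callback is guaranteed to have been invoked
                    // before the commitSync() call below returns.
                    System.out.println("async commit completed: " + offsets + ", error: " + exception);
                }
            });

    // Synchronous commit: blocks until it completes, draining any completed async commit callbacks first.
    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(10L)));
}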

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java

Lines changed: 5 additions & 5 deletions
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -59,7 +59,6 @@
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-import org.apache.kafka.common.errors.InterruptException;

 /**
  * AbstractCoordinator implements group management for a single group member by interacting with
@@ -247,8 +246,6 @@ protected synchronized RequestFuture<Void> lookupCoordinator() {
         // find a node to ask about the coordinator
         Node node = this.client.leastLoadedNode();
         if (node == null) {
-            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
-            // from configuration?
             log.debug("No broker available to send GroupCoordinator request for group {}", groupId);
             return RequestFuture.noBrokersAvailable();
         } else
@@ -651,7 +648,10 @@ protected synchronized Node coordinator() {
     protected synchronized void coordinatorDead() {
         if (this.coordinator != null) {
             log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
-            client.failUnsentRequests(this.coordinator, CoordinatorNotAvailableException.INSTANCE);
+
+            // Disconnect from the coordinator to ensure that there are no in-flight requests remaining.
+            // Pending callbacks will be invoked with a DisconnectException.
+            client.disconnect(this.coordinator);
             this.coordinator = null;
         }
     }

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java

Lines changed: 5 additions & 0 deletions
@@ -604,6 +604,11 @@ public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets,
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future, remainingMs);

+            // We may have had in-flight offset commits when the synchronous commit began. If so, ensure that
+            // the corresponding callbacks are invoked prior to returning in order to preserve the order that
+            // the offset commits were applied.
+            invokeCompletedOffsetCommitCallbacks();
+
             if (future.succeeded()) {
                 if (interceptors != null)
                     interceptors.onCommit(offsets);

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java

Lines changed: 12 additions & 3 deletions
@@ -263,8 +263,7 @@ public void poll(long timeout, long now, PollCondition pollCondition, boolean di
     }

     /**
-     * Poll for network IO and return immediately. This will not trigger wakeups,
-     * nor will it execute any delayed tasks.
+     * Poll for network IO and return immediately. This will not trigger wakeups.
      */
     public void pollNoWakeup() {
         poll(0, time.milliseconds(), null, true);
@@ -374,6 +373,16 @@ private void checkDisconnects(long now) {
         }
     }

+    public void disconnect(Node node) {
+        synchronized (this) {
+            failUnsentRequests(node, DisconnectException.INSTANCE);
+            client.disconnect(node.idString());
+        }
+
+        // We need to poll to ensure callbacks from in-flight requests on the disconnected socket are fired
+        pollNoWakeup();
+    }
+
     private void failExpiredRequests(long now) {
         // clear all expired unsent requests and fail their corresponding futures
         Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs);
@@ -383,7 +392,7 @@ private void failExpiredRequests(long now) {
         }
     }

-    public void failUnsentRequests(Node node, RuntimeException e) {
+    private void failUnsentRequests(Node node, RuntimeException e) {
         // clear unsent requests to node and fail their corresponding futures
         synchronized (this) {
             Collection<ClientRequest> unsentRequests = unsent.remove(node);
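
The new disconnect(Node) is what the AbstractCoordinator change above relies on when it marks the coordinator dead. A rough caller-side sketch, with a hypothetical wrapper method name (only ConsumerNetworkClient.disconnect(Node) comes from this commit):

// Hypothetical wrapper showing the intended call pattern from coordinator code.
// Unsent requests to the node are failed immediately with DisconnectException; the trailing
// pollNoWakeup() inside disconnect() ensures in-flight requests to that node also complete with it.
void failAllRequestsToDeadCoordinator(ConsumerNetworkClient client, Node coordinator) {
    if (coordinator != null)
        client.disconnect(coordinator);
}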

clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java

Lines changed: 10 additions & 4 deletions
@@ -22,14 +22,13 @@
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.DelayedReceive;
 import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
@@ -270,14 +269,21 @@ public void testDisconnectDuringUserMetadataRequest() {
     public void testCallDisconnect() throws Exception {
         awaitReady(client, node);
         assertTrue("Expected NetworkClient to be ready to send to node " + node.idString(),
-                client.isReady(node, Time.SYSTEM.milliseconds()));
+                client.isReady(node, time.milliseconds()));
         assertFalse("Did not expect connection to node " + node.idString() + " to be failed",
                 client.connectionFailed(node));
         client.disconnect(node.idString());
         assertFalse("Expected node " + node.idString() + " to be disconnected.",
-                client.isReady(node, Time.SYSTEM.milliseconds()));
+                client.isReady(node, time.milliseconds()));
         assertTrue("Expected connection to node " + node.idString() + " to be failed after disconnect",
                 client.connectionFailed(node));
+        assertFalse(client.canConnect(node, time.milliseconds()));
+
+        // ensure disconnect does not reset blackout period if already disconnected
+        time.sleep(reconnectBackoffMsTest);
+        assertTrue(client.canConnect(node, time.milliseconds()));
+        client.disconnect(node.idString());
+        assertTrue(client.canConnect(node, time.milliseconds()));
     }

     private static class TestCallbackHandler implements RequestCompletionHandler {

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java

Lines changed: 68 additions & 0 deletions
@@ -58,6 +58,7 @@
 import org.junit.Test;

 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -950,6 +951,37 @@ public void testCommitOffsetOnly() {
         assertEquals(100L, subscriptions.committed(t1p).offset());
     }

+    @Test
+    public void testCoordinatorDisconnectAfterNotCoordinatorError() {
+        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.NOT_COORDINATOR);
+    }
+
+    @Test
+    public void testCoordinatorDisconnectAfterCoordinatorNotAvailableError() {
+        testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors.COORDINATOR_NOT_AVAILABLE);
+    }
+
+    private void testInFlightRequestsFailedAfterCoordinatorMarkedDead(Errors error) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        // Send two async commits and fail the first one with an error.
+        // This should cause a coordinator disconnect which will cancel the second request.
+
+        MockCommitCallback firstCommitCallback = new MockCommitCallback();
+        MockCommitCallback secondCommitCallback = new MockCommitCallback();
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
+
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error)));
+        consumerClient.pollNoWakeup();
+        coordinator.invokeCompletedOffsetCommitCallbacks();
+
+        assertTrue(coordinator.coordinatorUnknown());
+        assertTrue(firstCommitCallback.exception instanceof RetriableCommitFailedException);
+        assertTrue(secondCommitCallback.exception instanceof RetriableCommitFailedException);
+    }
+
     @Test
     public void testAutoCommitDynamicAssignment() {
         final String consumerId = "consumer";
@@ -1212,6 +1244,42 @@ public void testCommitOffsetSyncCoordinatorDisconnected() {
         coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }

+    @Test
+    public void testAsyncCommitCallbacksInvokedPriorToSyncCommitCompletion() throws Exception {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+
+        final List<OffsetAndMetadata> committedOffsets = Collections.synchronizedList(new ArrayList<OffsetAndMetadata>());
+        final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
+        final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
+
+        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, firstOffset), new OffsetCommitCallback() {
+            @Override
+            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                committedOffsets.add(firstOffset);
+            }
+        });
+
+        // Do a synchronous commit in the background so that we can send both responses at the same time
+        Thread thread = new Thread() {
+            @Override
+            public void run() {
+                coordinator.commitOffsetsSync(Collections.singletonMap(t1p, secondOffset), 10000);
+                committedOffsets.add(secondOffset);
+            }
+        };
+
+        thread.start();
+
+        client.waitForRequests(2, 5000);
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+
+        thread.join();
+
+        assertEquals(Arrays.asList(firstOffset, secondOffset), committedOffsets);
+    }
+
     @Test(expected = KafkaException.class)
     public void testCommitUnknownTopicOrPartition() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java

Lines changed: 22 additions & 0 deletions
@@ -22,6 +22,7 @@
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.HeartbeatRequest;
@@ -80,6 +81,27 @@ public void multiSend() {
         assertTrue(future2.succeeded());
     }

+    @Test
+    public void testDisconnectWithUnsentRequests() {
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        assertTrue(consumerClient.hasPendingRequests(node));
+        assertFalse(client.hasInFlightRequests(node.idString()));
+        consumerClient.disconnect(node);
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
+    @Test
+    public void testDisconnectWithInFlightRequests() {
+        RequestFuture<ClientResponse> future = consumerClient.send(node, heartbeat());
+        consumerClient.pollNoWakeup();
+        assertTrue(consumerClient.hasPendingRequests(node));
+        assertTrue(client.hasInFlightRequests(node.idString()));
+        consumerClient.disconnect(node);
+        assertTrue(future.failed());
+        assertTrue(future.exception() instanceof DisconnectException);
+    }
+
     @Test
     public void doNotBlockIfPollConditionIsSatisfied() {
         NetworkClient mockNetworkClient = EasyMock.mock(NetworkClient.class);