Skip to content

Commit f12a9d8

Browse files
KAFKA-19464: Remove unnecessary update for find next fetch offset (#20315)
The PR removes unnecessary updates for find next fetch offset. When the state is in transition and not yet completed then anyways respective offsets should not be considered for acquisition. The find next fetch offset is updated finally when transition is completed. Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>, Abhinav Dixit <adixit@confluent.io>
1 parent 2329def commit f12a9d8

File tree

1 file changed

+6
-27
lines changed

1 file changed

+6
-27
lines changed

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,7 @@ public ShareAcquiredRecords acquire(
803803
}
804804

805805
InFlightState updateResult = inFlightBatch.tryUpdateBatchState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE, maxDeliveryCount, memberId);
806-
if (updateResult == null) {
806+
if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
807807
log.info("Unable to acquire records for the batch: {} in share partition: {}-{}",
808808
inFlightBatch, groupId, topicIdPartition);
809809
continue;
@@ -1009,12 +1009,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
10091009
updatedStates.add(updateResult);
10101010
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
10111011
updateResult.state().id(), (short) updateResult.deliveryCount()));
1012-
1013-
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1014-
// This should not change the next fetch offset because the record is not available for acquisition
1015-
if (updateResult.state() != RecordState.ARCHIVED) {
1016-
updateFindNextFetchOffset(true);
1017-
}
1012+
// Do not update the next fetch offset as the offset has not completed the transition yet.
10181013
}
10191014
}
10201015
return Optional.empty();
@@ -1054,12 +1049,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
10541049
updatedStates.add(updateResult);
10551050
stateBatches.add(new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
10561051
updateResult.state().id(), (short) updateResult.deliveryCount()));
1057-
1058-
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1059-
// This should not change the next fetch offset because the record is not available for acquisition
1060-
if (updateResult.state() != RecordState.ARCHIVED) {
1061-
updateFindNextFetchOffset(true);
1062-
}
1052+
// Do not update the next fetch offset as the batch has not completed the transition yet.
10631053
}
10641054
return Optional.empty();
10651055
}
@@ -1641,7 +1631,7 @@ private int acquireSubsetBatchRecords(
16411631

16421632
InFlightState updateResult = offsetState.getValue().tryUpdateState(RecordState.ACQUIRED, DeliveryCountOps.INCREASE,
16431633
maxDeliveryCount, memberId);
1644-
if (updateResult == null) {
1634+
if (updateResult == null || updateResult.state() != RecordState.ACQUIRED) {
16451635
log.trace("Unable to acquire records for the offset: {} in batch: {}"
16461636
+ " for the share partition: {}-{}", offsetState.getKey(), inFlightBatch,
16471637
groupId, topicIdPartition);
@@ -1941,12 +1931,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
19411931
updatedStates.add(updateResult);
19421932
stateBatches.add(new PersisterStateBatch(offsetState.getKey(), offsetState.getKey(),
19431933
updateResult.state().id(), (short) updateResult.deliveryCount()));
1944-
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1945-
// This should not change the next fetch offset because the record is not available for acquisition
1946-
if (recordState == RecordState.AVAILABLE
1947-
&& updateResult.state() != RecordState.ARCHIVED) {
1948-
updateFindNextFetchOffset(true);
1949-
}
1934+
// Do not update the nextFetchOffset as the offset has not completed the transition yet.
19501935
}
19511936
} finally {
19521937
lock.writeLock().unlock();
@@ -1996,13 +1981,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
19961981
stateBatches.add(
19971982
new PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
19981983
updateResult.state().id(), (short) updateResult.deliveryCount()));
1999-
2000-
// If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
2001-
// This should not change the nextFetchOffset because the record is not available for acquisition
2002-
if (recordState == RecordState.AVAILABLE
2003-
&& updateResult.state() != RecordState.ARCHIVED) {
2004-
updateFindNextFetchOffset(true);
2005-
}
1984+
// Do not update the next fetch offset as the batch has not completed the transition yet.
20061985
} finally {
20071986
lock.writeLock().unlock();
20081987
}

0 commit comments

Comments
 (0)