@@ -803,7 +803,7 @@ public ShareAcquiredRecords acquire(
803
803
}
804
804
805
805
InFlightState updateResult = inFlightBatch .tryUpdateBatchState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE , maxDeliveryCount , memberId );
806
- if (updateResult == null ) {
806
+ if (updateResult == null || updateResult . state () != RecordState . ACQUIRED ) {
807
807
log .info ("Unable to acquire records for the batch: {} in share partition: {}-{}" ,
808
808
inFlightBatch , groupId , topicIdPartition );
809
809
continue ;
@@ -1009,12 +1009,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
1009
1009
updatedStates .add (updateResult );
1010
1010
stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
1011
1011
updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1012
-
1013
- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1014
- // This should not change the next fetch offset because the record is not available for acquisition
1015
- if (updateResult .state () != RecordState .ARCHIVED ) {
1016
- updateFindNextFetchOffset (true );
1017
- }
1012
+ // Do not update the next fetch offset as the offset has not completed the transition yet.
1018
1013
}
1019
1014
}
1020
1015
return Optional .empty ();
@@ -1054,12 +1049,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
1054
1049
updatedStates .add (updateResult );
1055
1050
stateBatches .add (new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
1056
1051
updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1057
-
1058
- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1059
- // This should not change the next fetch offset because the record is not available for acquisition
1060
- if (updateResult .state () != RecordState .ARCHIVED ) {
1061
- updateFindNextFetchOffset (true );
1062
- }
1052
+ // Do not update the next fetch offset as the batch has not completed the transition yet.
1063
1053
}
1064
1054
return Optional .empty ();
1065
1055
}
@@ -1641,7 +1631,7 @@ private int acquireSubsetBatchRecords(
1641
1631
1642
1632
InFlightState updateResult = offsetState .getValue ().tryUpdateState (RecordState .ACQUIRED , DeliveryCountOps .INCREASE ,
1643
1633
maxDeliveryCount , memberId );
1644
- if (updateResult == null ) {
1634
+ if (updateResult == null || updateResult . state () != RecordState . ACQUIRED ) {
1645
1635
log .trace ("Unable to acquire records for the offset: {} in batch: {}"
1646
1636
+ " for the share partition: {}-{}" , offsetState .getKey (), inFlightBatch ,
1647
1637
groupId , topicIdPartition );
@@ -1941,12 +1931,7 @@ private Optional<Throwable> acknowledgePerOffsetBatchRecords(
1941
1931
updatedStates .add (updateResult );
1942
1932
stateBatches .add (new PersisterStateBatch (offsetState .getKey (), offsetState .getKey (),
1943
1933
updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1944
- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
1945
- // This should not change the next fetch offset because the record is not available for acquisition
1946
- if (recordState == RecordState .AVAILABLE
1947
- && updateResult .state () != RecordState .ARCHIVED ) {
1948
- updateFindNextFetchOffset (true );
1949
- }
1934
+ // Do not update the nextFetchOffset as the offset has not completed the transition yet.
1950
1935
}
1951
1936
} finally {
1952
1937
lock .writeLock ().unlock ();
@@ -1996,13 +1981,7 @@ private Optional<Throwable> acknowledgeCompleteBatch(
1996
1981
stateBatches .add (
1997
1982
new PersisterStateBatch (inFlightBatch .firstOffset (), inFlightBatch .lastOffset (),
1998
1983
updateResult .state ().id (), (short ) updateResult .deliveryCount ()));
1999
-
2000
- // If the maxDeliveryCount limit has been exceeded, the record will be transitioned to ARCHIVED state.
2001
- // This should not change the nextFetchOffset because the record is not available for acquisition
2002
- if (recordState == RecordState .AVAILABLE
2003
- && updateResult .state () != RecordState .ARCHIVED ) {
2004
- updateFindNextFetchOffset (true );
2005
- }
1984
+ // Do not update the next fetch offset as the batch has not completed the transition yet.
2006
1985
} finally {
2007
1986
lock .writeLock ().unlock ();
2008
1987
}
0 commit comments