Commit b78d7ba

dguy authored and guozhangwang committed
KAFKA-5152: perform state restoration in poll loop
In onPartitionsAssigned:
- release all locks for non-assigned suspended tasks
- resume any suspended tasks
- create new tasks, but don't attempt to take the state lock
- pause partitions for any new tasks
- set the state to PARTITIONS_ASSIGNED

In StreamThread#runLoop:
1. poll
2. if state is PARTITIONS_ASSIGNED:
   2.1 attempt to initialize any new tasks, i.e., take out the state locks and init state stores
   2.2 restore some data for changelogs, i.e., poll once on the restore consumer and return the partitions that have been fully restored
   2.3 update tasks with restored partitions and move any that have completed restoration to running
   2.4 resume consumption for any tasks where all partitions have been restored
   2.5 if all active tasks are running, transition to RUNNING and assign standby partitions to the restoreConsumer

Author: Damian Guy <damian.guy@gmail.com>
Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes apache#3675 from dguy/kafka-5152
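The message above describes the restoration flow in prose. Below is a minimal, self-contained sketch of that poll-loop shape, assuming simplified stand-in types: RestorationLoopSketch, Task, and String-keyed "partitions" are illustrative names only, not the actual StreamThread/task-manager code, and the real consumer polling is elided. It only mirrors steps 2.1 through 2.5.

// Hypothetical, simplified sketch of the restoration-in-poll-loop idea described above.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class RestorationLoopSketch {

    enum State { PARTITIONS_ASSIGNED, RUNNING }

    /** Stand-in for a stream task that restores its state stores from changelog partitions. */
    interface Task {
        boolean tryInitialize();              // take the state dir lock, open stores; false if lock not yet free
        Set<String> changelogPartitions();    // changelog partitions still being restored
        void markRestored(Set<String> done);  // record partitions that finished restoring
        boolean fullyRestored();
        void resumeConsumption();             // un-pause the task's input partitions
    }

    private State state = State.PARTITIONS_ASSIGNED;
    private final Map<String, Task> newTasks = new HashMap<>();
    private final Map<String, Task> restoringTasks = new HashMap<>();
    private final Map<String, Task> runningTasks = new HashMap<>();

    /** One iteration of the run loop: poll (elided), then advance restoration if a rebalance just finished. */
    void runOnce() {
        // ... poll the main consumer here; records for paused new tasks are not returned ...
        if (state == State.PARTITIONS_ASSIGNED) {
            initializeNewTasks();                              // 2.1 take state locks, init state stores
            final Set<String> restored = pollRestoreConsumerOnce();  // 2.2 restore a chunk of changelog data
            updateRestoredTasks(restored);                     // 2.3 + 2.4
            if (newTasks.isEmpty() && restoringTasks.isEmpty()) {
                state = State.RUNNING;                         // 2.5 everything restored; standbys would be assigned here
            }
        }
    }

    private void initializeNewTasks() {
        for (Map.Entry<String, Task> e : new HashMap<>(newTasks).entrySet()) {
            if (e.getValue().tryInitialize()) {                // may fail while the previous owner still holds the lock
                newTasks.remove(e.getKey());
                restoringTasks.put(e.getKey(), e.getValue());
            }
        }
    }

    private Set<String> pollRestoreConsumerOnce() {
        // The real implementation polls the dedicated restore consumer and writes the returned
        // records into the state stores; here we simply pretend every partition finished.
        final Set<String> done = new HashSet<>();
        for (Task t : restoringTasks.values()) {
            done.addAll(t.changelogPartitions());
        }
        return done;
    }

    private void updateRestoredTasks(final Set<String> restored) {
        for (Map.Entry<String, Task> e : new HashMap<>(restoringTasks).entrySet()) {
            final Task task = e.getValue();
            task.markRestored(restored);
            if (task.fullyRestored()) {
                task.resumeConsumption();                      // 2.4 resume the task's input partitions
                restoringTasks.remove(e.getKey());
                runningTasks.put(e.getKey(), task);
            }
        }
    }
}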
1 parent 343a8ef, commit b78d7ba

27 files changed: +2043 -1089 lines

streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java

Lines changed: 1 addition & 1 deletion
@@ -414,7 +414,7 @@ public synchronized void onChange(final Thread thread,
             threadState.put(thread.getId(), newState);

             if (newState == StreamThread.State.PARTITIONS_REVOKED ||
-                    newState == StreamThread.State.ASSIGNING_PARTITIONS) {
+                    newState == StreamThread.State.PARTITIONS_ASSIGNED) {
                 setState(State.REBALANCING);
             } else if (newState == StreamThread.State.RUNNING && state() != State.RUNNING) {
                 maybeSetRunning();
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java

Lines changed: 45 additions & 1 deletion
@@ -23,7 +23,9 @@
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
@@ -48,6 +50,8 @@ public abstract class AbstractTask implements Task {
     final Consumer consumer;
     final String logPrefix;
     final boolean eosEnabled;
+    boolean taskInitialized;
+    private final StateDirectory stateDirectory;

     InternalProcessorContext processorContext;

@@ -69,6 +73,7 @@ public abstract class AbstractTask implements Task {
         this.topology = topology;
         this.consumer = consumer;
         this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
+        this.stateDirectory = stateDirectory;

         logPrefix = String.format("%s [%s]", isStandby ? "standby-task" : "task", id());

@@ -188,6 +193,20 @@ void flushState() {
     }

     void initializeStateStores() {
+        if (topology.stateStores().isEmpty()) {
+            return;
+        }
+
+        try {
+            if (!stateDirectory.lock(id, 5)) {
+                throw new LockException(String.format("%s Failed to lock the state directory for task %s",
+                                                      logPrefix, id));
+            }
+        } catch (IOException e) {
+            throw new StreamsException(String.format("%s fatal error while trying to lock the state directory for task %s",
+                                                     logPrefix,
+                                                     id));
+        }
         log.trace("{} Initializing state stores", logPrefix);

         // set initial offset limits
@@ -199,13 +218,38 @@ void initializeStateStores() {
         }
     }

+
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
      */
     void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateException {
+        ProcessorStateException exception = null;
         log.trace("{} Closing state manager", logPrefix);
-        stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+        try {
+            stateMgr.close(writeCheckpoint ? recordCollectorOffsets() : null);
+        } catch (final ProcessorStateException e) {
+            exception = e;
+        } finally {
+            try {
+                stateDirectory.unlock(id);
+            } catch (IOException e) {
+                if (exception == null) {
+                    exception = new ProcessorStateException(String.format("%s Failed to release state dir lock", logPrefix), e);
+                }
+            }
+        }
+        if (exception != null) {
+            throw exception;
+        }
     }

+
+    public boolean hasStateStores() {
+        return !topology.stateStores().isEmpty();
+    }
+
+    public Collection<TopicPartition> changelogPartitions() {
+        return stateMgr.changelogPartitions();
+    }
 }
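A note on the locking added above: initializeStateStores() now takes the task's state directory lock (with a bounded number of attempts) and throws LockException when it cannot, while closeStateManager() releases the lock in a finally block even if closing the state manager fails. A plausible way for a caller to treat that lock failure as retriable, rather than fatal, is sketched below; RetryingInitializer, TaskLike and RetriableLockException are hypothetical names, not part of the Kafka Streams API.

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

public class RetryingInitializer {

    /** Minimal stand-in for a task whose initializeStateStores() may fail with a retriable lock error. */
    interface TaskLike {
        void initializeStateStores();
    }

    /** Marker for "the previous owner still holds the state dir lock; try again later". */
    static class RetriableLockException extends RuntimeException {
        RetriableLockException(final String msg) { super(msg); }
    }

    private final Set<TaskLike> uninitialized = new LinkedHashSet<>();

    void add(final TaskLike task) {
        uninitialized.add(task);
    }

    /** Called once per run-loop iteration; returns true when every task has been initialized. */
    boolean initializeRemaining() {
        final Iterator<TaskLike> it = uninitialized.iterator();
        while (it.hasNext()) {
            final TaskLike task = it.next();
            try {
                task.initializeStateStores();  // takes the state directory lock, opens the stores
                it.remove();                   // initialized; the task can start restoring
            } catch (RetriableLockException e) {
                // lock not yet released by the previous owner: keep the task and retry on the next poll
            }
        }
        return uninitialized.isEmpty();
    }
}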
