Open
Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
Listing Stream Shards with ShardFilter set AT_TRIM_HORIZON
incorrectly returns all Shards that exist in the Stream.
Expected Behavior
Only Shards that were open AT_TRIM_HORIZON
should be returned.
How are you starting LocalStack?
With a docker run
command
Steps To Reproduce
Using localstack:4.6.0 in docker with java's testcontainers.
Java test repro:
class ListShardsRepro {
private static final DockerImageName LOCALSTACK_IMAGE = DockerImageName.parse("localstack/localstack:4.6.0");
private static final LocalStackContainer localstack = new LocalStackContainer(LOCALSTACK_IMAGE)
.withServices(LocalStackContainer.Service.KINESIS);
private static KinesisClient kinesisClient;
@BeforeAll
static void oneTimeSetup() {
localstack.start();
final AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(
AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())
);
kinesisClient = KinesisClient.builder()
.endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS))
.credentialsProvider(credentialsProvider)
.region(Region.of(localstack.getRegion()))
.build();
}
@Test
void test() {
String streamName = "test-stream";
// Create a stream with 1 shard and wait to become active.
kinesisClient.createStream(req -> req.streamName(streamName).shardCount(1));
while(true) {
final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
final StreamStatus status = response.streamDescription().streamStatus();
if (status == StreamStatus.ACTIVE) {
break;
}
}
// Update shard count to 2 and wait for stream to become active.
kinesisClient.updateShardCount(req -> req.streamName(streamName).targetShardCount(2).scalingType(ScalingType.UNIFORM_SCALING));
while(true) {
final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
final StreamStatus status = response.streamDescription().streamStatus();
if (status == StreamStatus.ACTIVE) {
break;
}
}
// Get all shards that were open at TRIM_HORIZON.
ShardFilter filter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
ListShardsResponse shards = kinesisClient.listShards(req -> req.streamName(streamName).shardFilter(filter));
// Only shard-0 was open AT_TRIM_HORIZON.
List<String> expectedShards = List.of("shardId-000000000000");
List<String> actualShards = shards.shards().stream().map(Shard::shardId).toList();
// Returns shard-0, shard-1, shard-2 in localstack, but only shard-0 in AWS.
assertEquals(expectedShards, actualShards);
}
}
Environment
- OS: MacOS 15.5
- LocalStack:
LocalStack version: 4.6.0
LocalStack Docker image sha: sha256:5a97e0f9917a3f0d9630bb13b9d8ccf10cbe52f33252807d3b4e21418cc21348
LocalStack build date:
LocalStack build git hash:
Anything else?
No response