bug: Kinesis ListShards ignores ShardFilter #12833

Open
@awelless

Description

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

Calling ListShards with a ShardFilter of type AT_TRIM_HORIZON returns every shard that exists in the stream, including shards that were only created by a later reshard.

Expected Behavior

Only the shards that were open at TRIM_HORIZON should be returned, as on AWS.
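
To make "open at TRIM_HORIZON" concrete, here is a minimal sketch of the filter semantics as I understand them. It is not LocalStack or AWS code. The `ShardWindow` record and its `openedAt`/`closedAt` timestamps are hypothetical (the real Kinesis `Shard` type does not expose them), and `Instant.EPOCH` is just an arbitrary stand-in for the stream creation time.

```java
import java.time.Instant;
import java.util.List;

class TrimHorizonFilterSketch {

    // Hypothetical model of a shard's lifetime; closedAt == null means still open.
    record ShardWindow(String shardId, Instant openedAt, Instant closedAt) {
        // A shard is open at an instant if it already existed and was not yet closed.
        boolean isOpenAt(Instant instant) {
            return !openedAt.isAfter(instant)
                    && (closedAt == null || closedAt.isAfter(instant));
        }
    }

    // Returns the IDs of the shards that were open at the given instant.
    static List<String> openAt(List<ShardWindow> shards, Instant instant) {
        return shards.stream()
                .filter(s -> s.isOpenAt(instant))
                .map(ShardWindow::shardId)
                .toList();
    }

    public static void main(String[] args) {
        Instant created = Instant.EPOCH;              // arbitrary stand-in for TRIM_HORIZON
        Instant resharded = created.plusSeconds(60);  // arbitrary time of the 1 -> 2 reshard

        List<ShardWindow> shards = List.of(
                new ShardWindow("shardId-000000000000", created, resharded),
                new ShardWindow("shardId-000000000001", resharded, null),
                new ShardWindow("shardId-000000000002", resharded, null));

        System.out.println(openAt(shards, created));    // prints [shardId-000000000000]
        System.out.println(openAt(shards, resharded));  // prints [shardId-000000000001, shardId-000000000002]
    }
}
```

With this model, the scenario from the repro (one shard, then a reshard to two) yields only the original shard at the trim horizon, which is the result reported for AWS.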

How are you starting LocalStack?

With a docker run command

Steps To Reproduce

Using the localstack/localstack:4.6.0 image in Docker via Testcontainers for Java.

Java test repro:

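import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.List;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

class ListShardsRepro {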

    private static final DockerImageName LOCALSTACK_IMAGE = DockerImageName.parse("localstack/localstack:4.6.0");

    private static final LocalStackContainer localstack = new LocalStackContainer(LOCALSTACK_IMAGE)
            .withServices(LocalStackContainer.Service.KINESIS);

    private static KinesisClient kinesisClient;

    @BeforeAll
    static void oneTimeSetup() {
        localstack.start();

        final AwsCredentialsProvider credentialsProvider = StaticCredentialsProvider.create(
                AwsBasicCredentials.create(localstack.getAccessKey(), localstack.getSecretKey())
        );

        kinesisClient = KinesisClient.builder()
                .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.KINESIS))
                .credentialsProvider(credentialsProvider)
                .region(Region.of(localstack.getRegion()))
                .build();
    }

    @Test
    void test() {
        String streamName = "test-stream";
        
        // Create a stream with 1 shard and wait for it to become active.
        kinesisClient.createStream(req -> req.streamName(streamName).shardCount(1));
        while(true) {
            final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
            final StreamStatus status = response.streamDescription().streamStatus();
            if (status == StreamStatus.ACTIVE) {
                break;
            }
        }

        // Update shard count to 2 and wait for stream to become active.
        kinesisClient.updateShardCount(req -> req.streamName(streamName).targetShardCount(2).scalingType(ScalingType.UNIFORM_SCALING));
        while(true) {
            final DescribeStreamResponse response = kinesisClient.describeStream(req -> req.streamName(streamName));
            final StreamStatus status = response.streamDescription().streamStatus();
            if (status == StreamStatus.ACTIVE) {
                break;
            }
        }

        // Get all shards that were open at TRIM_HORIZON.
        ShardFilter filter = ShardFilter.builder().type(ShardFilterType.AT_TRIM_HORIZON).build();
        ListShardsResponse shards = kinesisClient.listShards(req -> req.streamName(streamName).shardFilter(filter));

        // Only shard-0 was open AT_TRIM_HORIZON.
        List<String> expectedShards = List.of("shardId-000000000000");
        List<String> actualShards = shards.shards().stream().map(Shard::shardId).toList();

        // LocalStack returns shard-0, shard-1, and shard-2; AWS returns only shard-0.
        assertEquals(expectedShards, actualShards);
    }
}
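
The two `while(true)` loops in the repro poll `describeStream` without a delay or an upper bound, so the test hangs if the stream never becomes ACTIVE. A bounded polling helper along these lines could replace them. This is only a sketch: `PollingSketch` and `awaitCondition` are names made up for illustration, not part of any SDK. The AWS SDK for Java v2 also ships a Kinesis waiter (`kinesisClient.waiter().waitUntilStreamExists(...)`) that may fit as well; I have not checked how it behaves against LocalStack.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class PollingSketch {

    // Polls `condition` every `interval` until it returns true or `timeout` elapses.
    // Returns true if the condition was met, false on timeout or interruption.
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for the "stream status is ACTIVE" check: becomes true on the third poll.
        int[] calls = {0};
        boolean active = awaitCondition(() -> ++calls[0] >= 3,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println(active + " after " + calls[0] + " polls");  // prints true after 3 polls
    }
}
```

In the repro, the condition would be `() -> kinesisClient.describeStream(req -> req.streamName(streamName)).streamDescription().streamStatus() == StreamStatus.ACTIVE`, with the test failing if the helper returns false.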

Environment

- OS: macOS 15.5
- LocalStack:
  LocalStack version: 4.6.0
  LocalStack Docker image sha: sha256:5a97e0f9917a3f0d9630bb13b9d8ccf10cbe52f33252807d3b4e21418cc21348
  LocalStack build date:
  LocalStack build git hash:

Anything else?

No response
