From 1c3b0efa3c693097fa6a1a0b586604c159845f8b Mon Sep 17 00:00:00 2001 From: Cristopher Pinzon Date: Thu, 22 May 2025 11:16:10 -0500 Subject: [PATCH 1/2] add small fixes/improvements to Firehose.CreateDeliveryStream --- .../localstack/services/firehose/provider.py | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/localstack-core/localstack/services/firehose/provider.py b/localstack-core/localstack/services/firehose/provider.py index c678d0647c076..54487efa20776 100644 --- a/localstack-core/localstack/services/firehose/provider.py +++ b/localstack-core/localstack/services/firehose/provider.py @@ -63,6 +63,7 @@ RedshiftDestinationConfiguration, RedshiftDestinationDescription, RedshiftDestinationUpdate, + ResourceInUseException, ResourceNotFoundException, S3DestinationConfiguration, S3DestinationDescription, @@ -261,7 +262,7 @@ def create_delivery_stream( self, context: RequestContext, delivery_stream_name: DeliveryStreamName, - delivery_stream_type: DeliveryStreamType = None, + delivery_stream_type: DeliveryStreamType = DeliveryStreamType.DirectPut, direct_put_source_configuration: DirectPutSourceConfiguration = None, kinesis_stream_source_configuration: KinesisStreamSourceConfiguration = None, delivery_stream_encryption_configuration_input: DeliveryStreamEncryptionConfigurationInput = None, @@ -283,6 +284,17 @@ def create_delivery_stream( # TODO add support for database_source_configuration and direct_put_source_configuration store = self.get_store(context.account_id, context.region) + delivery_stream_arn = firehose_stream_arn( + stream_name=delivery_stream_name, + account_id=context.account_id, + region_name=context.region, + ) + + if delivery_stream_name in store.delivery_streams.keys(): + raise ResourceInUseException( + f"Firehose {delivery_stream_name} under accountId {context.account_id} already exists" + ) + destinations: DestinationDescriptionList = [] if elasticsearch_destination_configuration: destinations.append( @@ -344,11 +356,7 @@ def create_delivery_stream( stream = DeliveryStreamDescription( DeliveryStreamName=delivery_stream_name, - DeliveryStreamARN=firehose_stream_arn( - stream_name=delivery_stream_name, - account_id=context.account_id, - region_name=context.region, - ), + DeliveryStreamARN=delivery_stream_arn, DeliveryStreamStatus=DeliveryStreamStatus.ACTIVE, DeliveryStreamType=delivery_stream_type, HasMoreDestinations=False, @@ -358,8 +366,6 @@ def create_delivery_stream( Source=convert_source_config_to_desc(kinesis_stream_source_configuration), ) delivery_stream_arn = stream["DeliveryStreamARN"] - store.TAGS.tag_resource(delivery_stream_arn, tags) - store.delivery_streams[delivery_stream_name] = stream if delivery_stream_type == DeliveryStreamType.KinesisStreamAsSource: if not kinesis_stream_source_configuration: @@ -396,6 +402,10 @@ def _startup(): stream["DeliveryStreamStatus"] = DeliveryStreamStatus.CREATING_FAILED run_for_max_seconds(25, _startup) + + store.TAGS.tag_resource(delivery_stream_arn, tags) + store.delivery_streams[delivery_stream_name] = stream + return CreateDeliveryStreamOutput(DeliveryStreamARN=stream["DeliveryStreamARN"]) def delete_delivery_stream( From ca8214fe431742de13abe8d93c1238d348383130 Mon Sep 17 00:00:00 2001 From: Cristopher Pinzon Date: Thu, 22 May 2025 14:24:17 -0500 Subject: [PATCH 2/2] fix linting --- localstack-core/localstack/services/firehose/provider.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/localstack-core/localstack/services/firehose/provider.py b/localstack-core/localstack/services/firehose/provider.py index 54487efa20776..18142ae80d88b 100644 --- a/localstack-core/localstack/services/firehose/provider.py +++ b/localstack-core/localstack/services/firehose/provider.py @@ -262,7 +262,7 @@ def create_delivery_stream( self, context: RequestContext, delivery_stream_name: DeliveryStreamName, - delivery_stream_type: DeliveryStreamType = DeliveryStreamType.DirectPut, + delivery_stream_type: DeliveryStreamType = None, direct_put_source_configuration: DirectPutSourceConfiguration = None, kinesis_stream_source_configuration: KinesisStreamSourceConfiguration = None, delivery_stream_encryption_configuration_input: DeliveryStreamEncryptionConfigurationInput = None, @@ -283,6 +283,7 @@ def create_delivery_stream( ) -> CreateDeliveryStreamOutput: # TODO add support for database_source_configuration and direct_put_source_configuration store = self.get_store(context.account_id, context.region) + delivery_stream_type = delivery_stream_type or DeliveryStreamType.DirectPut delivery_stream_arn = firehose_stream_arn( stream_name=delivery_stream_name,