diff --git a/localstack-core/localstack/services/firehose/provider.py b/localstack-core/localstack/services/firehose/provider.py index c678d0647c076..18142ae80d88b 100644 --- a/localstack-core/localstack/services/firehose/provider.py +++ b/localstack-core/localstack/services/firehose/provider.py @@ -63,6 +63,7 @@ RedshiftDestinationConfiguration, RedshiftDestinationDescription, RedshiftDestinationUpdate, + ResourceInUseException, ResourceNotFoundException, S3DestinationConfiguration, S3DestinationDescription, @@ -282,6 +283,18 @@ def create_delivery_stream( ) -> CreateDeliveryStreamOutput: # TODO add support for database_source_configuration and direct_put_source_configuration store = self.get_store(context.account_id, context.region) + delivery_stream_type = delivery_stream_type or DeliveryStreamType.DirectPut + + delivery_stream_arn = firehose_stream_arn( + stream_name=delivery_stream_name, + account_id=context.account_id, + region_name=context.region, + ) + + if delivery_stream_name in store.delivery_streams.keys(): + raise ResourceInUseException( + f"Firehose {delivery_stream_name} under accountId {context.account_id} already exists" + ) destinations: DestinationDescriptionList = [] if elasticsearch_destination_configuration: @@ -344,11 +357,7 @@ def create_delivery_stream( stream = DeliveryStreamDescription( DeliveryStreamName=delivery_stream_name, - DeliveryStreamARN=firehose_stream_arn( - stream_name=delivery_stream_name, - account_id=context.account_id, - region_name=context.region, - ), + DeliveryStreamARN=delivery_stream_arn, DeliveryStreamStatus=DeliveryStreamStatus.ACTIVE, DeliveryStreamType=delivery_stream_type, HasMoreDestinations=False, @@ -358,8 +367,6 @@ def create_delivery_stream( Source=convert_source_config_to_desc(kinesis_stream_source_configuration), ) delivery_stream_arn = stream["DeliveryStreamARN"] - store.TAGS.tag_resource(delivery_stream_arn, tags) - store.delivery_streams[delivery_stream_name] = stream if delivery_stream_type == DeliveryStreamType.KinesisStreamAsSource: if not kinesis_stream_source_configuration: @@ -396,6 +403,10 @@ def _startup(): stream["DeliveryStreamStatus"] = DeliveryStreamStatus.CREATING_FAILED run_for_max_seconds(25, _startup) + + store.TAGS.tag_resource(delivery_stream_arn, tags) + store.delivery_streams[delivery_stream_name] = stream + return CreateDeliveryStreamOutput(DeliveryStreamARN=stream["DeliveryStreamARN"]) def delete_delivery_stream(