Skip to content

Step Functions: Improve Logging and Error Detection for Unsupported Service Integrations #12223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import abc
import copy
import json
import logging
from typing import Any, Final, Optional, Union

from botocore.model import ListShape, OperationModel, Shape, StringShape, StructureShape
Expand Down Expand Up @@ -39,12 +40,15 @@
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.state_task import (
StateTask,
)
from localstack.services.stepfunctions.asl.component.state.state_props import StateProps
from localstack.services.stepfunctions.asl.eval.environment import Environment
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
from localstack.services.stepfunctions.asl.utils.encoding import to_json_str
from localstack.services.stepfunctions.quotas import is_within_size_quota
from localstack.utils.strings import camel_to_snake_case, snake_to_camel_case, to_bytes, to_str

LOG = logging.getLogger(__name__)


class StateTaskService(StateTask, abc.ABC):
resource: ServiceResource
Expand All @@ -54,6 +58,20 @@ class StateTaskService(StateTask, abc.ABC):
"states": "stepfunctions",
}

def from_state_props(self, state_props: StateProps) -> None:
super().from_state_props(state_props=state_props)
# Validate the service integration is supported on program creation.
self._validate_service_integration_is_supported()

def _validate_service_integration_is_supported(self):
# Validate the service integration is supported.
supported_parameters = self._get_supported_parameters()
if supported_parameters is None:
raise ValueError(
f"The resource provided {self.resource.resource_arn} not recognized. "
"The value is not a valid resource ARN, or the resource is not available in this region."
)

def _get_sfn_resource(self) -> str:
return self.resource.api_action

Expand Down Expand Up @@ -110,6 +128,13 @@ def _to_boto_request_value(self, request_value: Any, value_shape: Shape) -> Any:
return boto_request_value

def _to_boto_request(self, parameters: dict, structure_shape: StructureShape) -> None:
if not isinstance(structure_shape, StructureShape):
LOG.warning(
"Step Functions could not normalise the request for integration '%s' due to the unexpected request template value of type '%s'",
self.resource.resource_arn,
type(structure_shape),
)
return
shape_members = structure_shape.members
norm_member_binds: dict[str, tuple[str, StructureShape]] = {
camel_to_snake_case(member_key): (member_key, member_value)
Expand Down Expand Up @@ -148,6 +173,14 @@ def _from_boto_response(self, response: Any, structure_shape: StructureShape) ->
if not isinstance(response, dict):
return

if not isinstance(structure_shape, StructureShape):
LOG.warning(
"Step Functions could not normalise the response of integration '%s' due to the unexpected request template value of type '%s'",
self.resource.resource_arn,
type(structure_shape),
)
return

shape_members = structure_shape.members
response_bind_keys: list[str] = list(response.keys())
for response_key in response_bind_keys:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from localstack.services.stepfunctions.asl.component.state.state_execution.state_task.service.state_task_service_callback import (
StateTaskServiceCallback,
)
from localstack.services.stepfunctions.asl.component.state.state_props import StateProps
from localstack.services.stepfunctions.asl.eval.environment import Environment
from localstack.services.stepfunctions.asl.eval.event.event_detail import EventDetails
from localstack.services.stepfunctions.asl.utils.boto_client import boto_client_for
Expand All @@ -38,8 +37,9 @@ class StateTaskServiceAwsSdk(StateTaskServiceCallback):
def __init__(self):
super().__init__(supported_integration_patterns=_SUPPORTED_INTEGRATION_PATTERNS)

def from_state_props(self, state_props: StateProps) -> None:
super().from_state_props(state_props=state_props)
def _validate_service_integration_is_supported(self):
# As no aws-sdk support catalog is available, allow invalid aws-sdk integration to fail at runtime.
pass

def _get_sfn_resource_type(self) -> str:
return f"{self.resource.service_name}:{self.resource.api_name}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class StateTaskServiceUnsupported(StateTaskServiceCallback):
def __init__(self):
super().__init__(supported_integration_patterns=_SUPPORTED_INTEGRATION_PATTERNS)

def _validate_service_integration_is_supported(self):
# Attempts to execute any derivation; logging this incident on creation.
self._log_unsupported_warning()

def _log_unsupported_warning(self):
# Logs that the optimised service integration is not supported,
# however the request is being forwarded to the service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,25 +717,35 @@ def sfn_sqs_integration():
def stepfunctions_api():
return [
JsonpathTransformer(
"$..SdkHttpMetadata.AllHttpHeaders.Date",
"$..SdkHttpMetadata..Date",
"date",
replace_reference=False,
),
JsonpathTransformer(
"$..SdkHttpMetadata.AllHttpHeaders.X-Amzn-Trace-Id",
"X-Amzn-Trace-Id",
"$..SdkResponseMetadata..RequestId",
"RequestId",
replace_reference=False,
),
JsonpathTransformer(
"$..SdkHttpMetadata.HttpHeaders.Date",
"date",
"$..X-Amzn-Trace-Id",
"X-Amzn-Trace-Id",
replace_reference=False,
),
JsonpathTransformer(
"$..SdkHttpMetadata.HttpHeaders.X-Amzn-Trace-Id",
"$..X-Amzn-Trace-Id",
"X-Amzn-Trace-Id",
replace_reference=False,
),
JsonpathTransformer(
"$..x-amz-crc32",
"x-amz-crc32",
replace_reference=False,
),
JsonpathTransformer(
"$..x-amzn-RequestId",
"x-amzn-RequestId",
replace_reference=False,
),
KeyValueBasedTransformer(_transform_stepfunctions_cause_details, "json-input"),
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class ServicesTemplates(TemplateLoader):
DYNAMODB_PUT_UPDATE_GET_ITEM: Final[str] = os.path.join(
_THIS_FOLDER, "statemachines/dynamodb_put_update_get_item.json5"
)
DYNAMODB_PUT_QUERY: Final[str] = os.path.join(
_THIS_FOLDER, "statemachines/dynamodb_put_query.json5"
)
INVALID_INTEGRATION_DYNAMODB_QUERY: Final[str] = os.path.join(
_THIS_FOLDER, "statemachines/invalid_integration_dynamodb_query.json5"
)
# Lambda Functions.
LAMBDA_ID_FUNCTION: Final[str] = os.path.join(_THIS_FOLDER, "lambdafunctions/id_function.py")
LAMBDA_RETURN_BYTES_STR: Final[str] = os.path.join(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"StartAt": "PutItem",
"States": {
"PutItem": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:putItem",
"Parameters": {
"TableName.$": "$.TableName",
"Item.$": "$.Item"
},
"ResultPath": "$.putItemOutput",
"Next": "QueryItems"
},
"QueryItems": {
"Type": "Task",
// Use aws-sdk for the query call: see AWS's limitations
// of the ddb optimised service integration.
"Resource": "arn:aws:states:::aws-sdk:dynamodb:query",
"ResultPath": "$.queryOutput",
"Parameters": {
"TableName.$": "$.TableName",
"KeyConditionExpression": "id = :id",
"ExpressionAttributeValues": {
":id.$": "$.Item.id"
}
},
"End": true
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
{
"StartAt": "Query",
"States": {
"Query": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:query",
"ResultPath": "$.queryItemOutput",
"Parameters": {
"TableName.$": "$.TableName",
"KeyConditionExpression": "id = :id",
"ExpressionAttributeValues": {
":id": {
"S.$": "$.Item.id.S"
}
}
},
"End": true
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import json

import pytest
from localstack_snapshot.snapshots.transformer import RegexTransformer

from localstack.testing.pytest import markers
from localstack.testing.pytest.stepfunctions.utils import (
create_and_record_execution,
create_state_machine_with_iam_role,
)
from localstack.utils.strings import short_uid
from tests.aws.services.stepfunctions.templates.services.services_templates import (
Expand All @@ -18,61 +22,45 @@
]
)
class TestTaskServiceDynamoDB:
@markers.aws.needs_fixing
def test_put_get_item(
@markers.aws.validated
@pytest.mark.parametrize(
"template_path",
[
ST.DYNAMODB_PUT_GET_ITEM,
ST.DYNAMODB_PUT_DELETE_ITEM,
ST.DYNAMODB_PUT_UPDATE_GET_ITEM,
ST.DYNAMODB_PUT_QUERY,
],
ids=[
"DYNAMODB_PUT_GET_ITEM",
"DYNAMODB_PUT_DELETE_ITEM",
"DYNAMODB_PUT_UPDATE_GET_ITEM",
"DYNAMODB_PUT_QUERY",
],
)
def test_base_integrations(
self,
aws_client,
create_state_machine_iam_role,
create_state_machine,
dynamodb_create_table,
sfn_snapshot,
template_path,
):
sfn_snapshot.add_transformer(sfn_snapshot.transform.dynamodb_api())

table_name = f"sfn_test_table_{short_uid()}"
dynamodb_create_table(table_name=table_name, partition_key="id", client=aws_client.dynamodb)
sfn_snapshot.add_transformer(RegexTransformer(table_name, "table-name"))

template = ST.load_sfn_template(ST.DYNAMODB_PUT_GET_ITEM)
definition = json.dumps(template)

exec_input = json.dumps(
{
"TableName": table_name,
"Item": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
"Key": {"id": {"S": "id1"}},
}
)
create_and_record_execution(
aws_client,
create_state_machine_iam_role,
create_state_machine,
sfn_snapshot,
definition,
exec_input,
)

@markers.aws.needs_fixing
def test_put_delete_item(
self,
aws_client,
create_state_machine_iam_role,
create_state_machine,
dynamodb_create_table,
sfn_snapshot,
):
sfn_snapshot.add_transformer(sfn_snapshot.transform.dynamodb_api())

table_name = f"sfn_test_table_{short_uid()}"
dynamodb_create_table(table_name=table_name, partition_key="id", client=aws_client.dynamodb)

template = ST.load_sfn_template(ST.DYNAMODB_PUT_DELETE_ITEM)
template = ST.load_sfn_template(template_path)
definition = json.dumps(template)

exec_input = json.dumps(
{
"TableName": table_name,
"Item": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
"Key": {"id": {"S": "id1"}},
"UpdateExpression": "set S=:r",
"ExpressionAttributeValues": {":r": {"S": "HelloWorldUpdated"}},
}
)
create_and_record_execution(
Expand All @@ -84,37 +72,25 @@ def test_put_delete_item(
exec_input,
)

@markers.aws.needs_fixing
def test_put_update_get_item(
@markers.aws.validated
@markers.snapshot.skip_snapshot_verify(paths=["$..exception_value"])
def test_invalid_integration(
self,
aws_client,
create_state_machine_iam_role,
create_state_machine,
dynamodb_create_table,
sfn_snapshot,
):
sfn_snapshot.add_transformer(sfn_snapshot.transform.dynamodb_api())

table_name = f"sfn_test_table_{short_uid()}"
dynamodb_create_table(table_name=table_name, partition_key="id", client=aws_client.dynamodb)

template = ST.load_sfn_template(ST.DYNAMODB_PUT_UPDATE_GET_ITEM)
template = ST.load_sfn_template(ST.INVALID_INTEGRATION_DYNAMODB_QUERY)
definition = json.dumps(template)

exec_input = json.dumps(
{
"TableName": table_name,
"Item": {"data": {"S": "HelloWorld"}, "id": {"S": "id1"}},
"Key": {"id": {"S": "id1"}},
"UpdateExpression": "set S=:r",
"ExpressionAttributeValues": {":r": {"S": "HelloWorldUpdated"}},
}
)
create_and_record_execution(
aws_client,
create_state_machine_iam_role,
create_state_machine,
sfn_snapshot,
definition,
exec_input,
with pytest.raises(Exception) as ex:
create_state_machine_with_iam_role(
aws_client,
create_state_machine_iam_role,
create_state_machine,
sfn_snapshot,
definition,
)
sfn_snapshot.match(
"exception", {"exception_typename": ex.typename, "exception_value": ex.value}
)
Loading
Loading