Skip to content

Commit 23877c7

Browse files
authored
fix DynamoDB stream events for UPDATE operations (localstack#484)
1 parent e40826d commit 23877c7

File tree

8 files changed

+41
-21
lines changed

8 files changed

+41
-21
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ install: ## Install dependencies in virtualenv
1212
(test `which virtualenv` || $(PIP_CMD) install --user virtualenv) && \
1313
(test -e $(VENV_DIR) || virtualenv $(VENV_OPTS) $(VENV_DIR)) && \
1414
($(VENV_RUN) && $(PIP_CMD) install --upgrade pip) && \
15-
(test ! -e requirements.txt || ($(VENV_RUN); $(PIP_CMD) install six==1.10.0 ; $(PIP_CMD) install -r requirements.txt) && \
15+
(test ! -e requirements.txt || ($(VENV_RUN); $(PIP_CMD) install six==1.10.0 ; $(PIP_CMD) -q install -r requirements.txt) && \
1616
$(VENV_RUN); PYTHONPATH=. exec python localstack/services/install.py testlibs)
1717

1818
install-web: ## Install npm dependencies for dashboard Web UI

localstack/services/dynamodb/dynamodb_listener.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def return_response(self, method, path, data, headers, response):
7777
record['eventName'] = 'MODIFY'
7878
record['dynamodb']['Keys'] = data['Key']
7979
record['dynamodb']['NewImage'] = new_item['Item']
80+
record['dynamodb']['SizeBytes'] = len(json.dumps(new_item['Item']))
8081
elif action == '%s.BatchWriteItem' % ACTION_PREFIX:
8182
records = []
8283
for table_name, requests in data['RequestItems'].items():
@@ -99,6 +100,7 @@ def return_response(self, method, path, data, headers, response):
99100
return keys
100101
record['dynamodb']['Keys'] = keys
101102
record['dynamodb']['NewImage'] = data['Item']
103+
record['dynamodb']['SizeBytes'] = len(json.dumps(data['Item']))
102104
elif action == '%s.GetItem' % ACTION_PREFIX:
103105
if response.status_code == 200:
104106
content = json.loads(to_str(response.content))
@@ -132,10 +134,11 @@ def return_response(self, method, path, data, headers, response):
132134
# nothing to do
133135
return
134136

135-
if 'TableName' in data:
136-
record['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])
137-
forward_to_lambda(records)
138-
forward_to_ddb_stream(records)
137+
if len(records) > 0 and 'eventName' in records[0]:
138+
if 'TableName' in data:
139+
records[0]['eventSourceARN'] = aws_stack.dynamodb_table_arn(data['TableName'])
140+
forward_to_lambda(records)
141+
forward_to_ddb_stream(records)
139142

140143

141144
# instantiate listener

localstack/services/dynamodb/dynamodb_starter.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import time
21
import logging
32
import traceback
43
from localstack.config import PORT_DYNAMODB, DATA_DIR
54
from localstack.constants import DEFAULT_PORT_DYNAMODB_BACKEND
65
from localstack.utils.aws import aws_stack
7-
from localstack.utils.common import mkdir, is_port_open
6+
from localstack.utils.common import mkdir, wait_for_port_open
87
from localstack.services import install
98
from localstack.services.infra import get_service_protocol, start_proxy_for_service, do_run
109
from localstack.services.install import ROOT_PATH
@@ -16,7 +15,7 @@ def check_dynamodb(expect_shutdown=False, print_error=False):
1615
out = None
1716
try:
1817
# wait for port to be opened
19-
wait_for_port_open()
18+
wait_for_port_open(DEFAULT_PORT_DYNAMODB_BACKEND)
2019
# check DynamoDB
2120
out = aws_stack.connect_to_service(service_name='dynamodb').list_tables()
2221
except Exception as e:
@@ -28,13 +27,6 @@ def check_dynamodb(expect_shutdown=False, print_error=False):
2827
assert isinstance(out['TableNames'], list)
2928

3029

31-
def wait_for_port_open():
32-
for i in range(0, 8):
33-
if is_port_open(DEFAULT_PORT_DYNAMODB_BACKEND):
34-
break
35-
time.sleep(0.5)
36-
37-
3830
def start_dynamodb(port=PORT_DYNAMODB, async=False, update_listener=None):
3931
install.install_dynamodb_local()
4032
backend_port = DEFAULT_PORT_DYNAMODB_BACKEND

localstack/services/s3/s3_starter.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import logging
22
import traceback
3+
from localstack.constants import DEFAULT_PORT_S3_BACKEND
34
from localstack.utils.aws import aws_stack
5+
from localstack.utils.common import wait_for_port_open
46

57
LOGGER = logging.getLogger(__name__)
68

79

810
def check_s3(expect_shutdown=False, print_error=False):
911
out = None
1012
try:
13+
# wait for port to be opened
14+
wait_for_port_open(DEFAULT_PORT_S3_BACKEND)
1115
# check S3
1216
out = aws_stack.connect_to_service(service_name='s3').list_buckets()
1317
except Exception as e:

localstack/utils/aws/aws_responses.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from flask import jsonify, make_response
1+
import json
2+
from flask import Response
23

34

45
def flask_error_response(msg, code=500, error_type='InternalFailure'):
@@ -8,4 +9,6 @@ def flask_error_response(msg, code=500, error_type='InternalFailure'):
89
'__type': error_type
910
}
1011
headers = {'x-amzn-errortype': error_type}
11-
return make_response((jsonify(result), code, headers))
12+
# Note: don't use flask's make_response(..) or jsonify(..) here as they
13+
# can lead to "RuntimeError: working outside of application context".
14+
return Response(json.dumps(result), status=code, headers=headers)

localstack/utils/common.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,13 @@ def is_port_open(port_or_url):
251251
return result == 0
252252

253253

254+
def wait_for_port_open(port, retries=10, sleep_time=0.5):
255+
for i in range(0, retries):
256+
if is_port_open(port):
257+
break
258+
time.sleep(sleep_time)
259+
260+
254261
def timestamp(time=None, format=TIMESTAMP_FORMAT):
255262
if not time:
256263
time = datetime.utcnow()

tests/integration/lambdas/lambda_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ def deserialize_event(event):
7575
ddb = event.get('dynamodb')
7676
if ddb:
7777
ddb_deserializer = TypeDeserializer()
78-
return ddb_deserializer.deserialize({'M': ddb['NewImage']})
78+
return ddb_deserializer.deserialize({'M': ddb.get('NewImage')})
7979
kinesis = event.get('kinesis')
8080
if kinesis:
8181
assert kinesis['sequenceNumber']

tests/integration/test_integration.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,10 +122,13 @@ def process_records(records, shard_id):
122122
zip_file=zip_file, event_source_arn=kinesis_event_source_arn, runtime=LAMBDA_RUNTIME_PYTHON27)
123123

124124
# put items to table
125-
num_events_ddb = 10
125+
num_events_ddb = 15
126+
num_put_items = 7
127+
num_batch_items = 3
128+
num_updates_ddb = num_events_ddb - num_put_items - num_batch_items
126129
LOGGER.info('Putting %s items to table...' % num_events_ddb)
127130
table = dynamodb.Table(TEST_TABLE_NAME)
128-
for i in range(0, num_events_ddb - 3):
131+
for i in range(0, num_put_items):
129132
table.put_item(Item={
130133
PARTITION_KEY: 'testId%s' % i,
131134
'data': 'foobar123'
@@ -136,6 +139,14 @@ def process_records(records, shard_id):
136139
{'PutRequest': {'Item': {PARTITION_KEY: short_uid(), 'data': 'foobar123 £'}}},
137140
{'PutRequest': {'Item': {PARTITION_KEY: short_uid(), 'data': 'foobar123 ¢'}}}
138141
]})
142+
# update some items, which also triggers notification events
143+
for i in range(0, num_updates_ddb):
144+
dynamodb_service.update_item(TableName=TEST_TABLE_NAME,
145+
Key={PARTITION_KEY: {'S': 'testId%s' % i}},
146+
AttributeUpdates={'data': {
147+
'Action': 'PUT',
148+
'Value': {'S': 'foobar123_updated'}
149+
}})
139150

140151
# put items to stream
141152
num_events_kinesis = 10
@@ -186,7 +197,7 @@ def check_events():
186197
stats2 = get_lambda_metrics(TEST_LAMBDA_NAME_STREAM, 'Errors')
187198
assert len(stats2['Datapoints']) == 1
188199
stats3 = get_lambda_metrics(TEST_LAMBDA_NAME_DDB)
189-
assert len(stats3['Datapoints']) == 10
200+
assert len(stats3['Datapoints']) == num_events_ddb
190201

191202

192203
def test_kinesis_lambda_forward_chain():

0 commit comments

Comments
 (0)