1
1
import logging
2
+ import threading
2
3
3
4
from localstack import config
4
5
from localstack .constants import MODULE_MAIN_PATH
5
6
from localstack .services import install
6
7
from localstack .services .infra import do_run , log_startup_message , start_proxy_for_service
7
8
from localstack .utils .aws import aws_stack
8
- from localstack .utils .common import chmod_r , get_free_tcp_port , mkdir , replace_in_file
9
+ from localstack .utils .common import chmod_r , get_free_tcp_port , mkdir , replace_in_file , start_thread
9
10
10
11
LOGGER = logging .getLogger (__name__ )
11
12
13
+ # event to indicate that the kinesis backend service has stopped (the terminal command has returned)
14
+ kinesis_stopped = threading .Event ()
15
+
12
16
13
17
def apply_patches_kinesalite ():
14
18
files = [
@@ -33,6 +37,31 @@ def start_kinesis(port=None, asynchronous=False, update_listener=None):
33
37
raise Exception ('Unsupported Kinesis provider "%s"' % config .KINESIS_PROVIDER )
34
38
35
39
40
+ def _run_proxy_and_command (cmd , port , backend_port , update_listener , asynchronous ):
41
+ log_startup_message ("Kinesis" )
42
+ start_proxy_for_service ("kinesis" , port , backend_port , update_listener )
43
+
44
+ # TODO: generalize into service manager once it is introduced
45
+ try :
46
+ kinesis_cmd = do_run (cmd , asynchronous )
47
+ finally :
48
+ if asynchronous :
49
+
50
+ def _return_listener (* _ ):
51
+ try :
52
+ ret_code = kinesis_cmd .result_future .result ()
53
+ if ret_code != 0 :
54
+ LOGGER .error ("kinesis terminated with return code %s" , ret_code )
55
+ finally :
56
+ kinesis_stopped .set ()
57
+
58
+ start_thread (_return_listener )
59
+ else :
60
+ kinesis_stopped .set ()
61
+
62
+ return kinesis_cmd
63
+
64
+
36
65
def start_kinesis_mock (port = None , asynchronous = False , update_listener = None ):
37
66
kinesis_mock_bin = install .install_kinesis_mock ()
38
67
@@ -95,9 +124,14 @@ def start_kinesis_mock(port=None, asynchronous=False, update_listener=None):
95
124
initialize_streams_param ,
96
125
kinesis_mock_bin ,
97
126
)
98
- LOGGER .info ("starting kinesis-mock proxy %d:%d with cmd: %s" , port , backend_port , cmd )
99
- start_proxy_for_service ("kinesis" , port , backend_port , update_listener )
100
- return do_run (cmd , asynchronous )
127
+
128
+ return _run_proxy_and_command (
129
+ cmd = cmd ,
130
+ port = port ,
131
+ backend_port = backend_port ,
132
+ update_listener = update_listener ,
133
+ asynchronous = asynchronous ,
134
+ )
101
135
102
136
103
137
def start_kinesalite (port = None , asynchronous = False , update_listener = None ):
@@ -125,20 +159,30 @@ def start_kinesalite(port=None, asynchronous=False, update_listener=None):
125
159
latency ,
126
160
kinesis_data_dir_param ,
127
161
)
128
- log_startup_message ("Kinesis" )
129
- start_proxy_for_service ("kinesis" , port , backend_port , update_listener )
130
- return do_run (cmd , asynchronous )
162
+
163
+ return _run_proxy_and_command (
164
+ cmd = cmd ,
165
+ port = port ,
166
+ backend_port = backend_port ,
167
+ update_listener = update_listener ,
168
+ asynchronous = asynchronous ,
169
+ )
131
170
132
171
133
172
def check_kinesis (expect_shutdown = False , print_error = False ):
173
+ if expect_shutdown is False and kinesis_stopped .is_set ():
174
+ raise AssertionError ("kinesis backend has stopped" )
175
+
134
176
out = None
135
177
try :
136
178
# check Kinesis
137
179
out = aws_stack .connect_to_service (service_name = "kinesis" ).list_streams ()
138
180
except Exception as e :
139
181
if print_error :
140
182
LOGGER .exception ("Kinesis health check failed: %s" , e )
183
+
141
184
if expect_shutdown :
142
- assert out is None
185
+ assert out is None or kinesis_stopped . is_set ()
143
186
else :
144
- assert isinstance (out ["StreamNames" ], list )
187
+ assert not kinesis_stopped .is_set ()
188
+ assert out and isinstance (out .get ("StreamNames" ), list )
0 commit comments