Skip to content

Commit 77d873d

Browse files
authored
improve kinesis startup routine and add KINESIS_MOCK_FORCE_JAVA config option (localstack#4344)
1 parent 815ef41 commit 77d873d

File tree

5 files changed

+63
-17
lines changed

5 files changed

+63
-17
lines changed

localstack/services/install.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import requests
1313

1414
from localstack import config
15-
from localstack.config import KINESIS_PROVIDER
15+
from localstack.config import KINESIS_PROVIDER, is_env_true
1616
from localstack.constants import (
1717
DEFAULT_SERVICE_PORTS,
1818
DYNAMODB_JAR_URL,
@@ -224,7 +224,11 @@ def install_kinesis_mock():
224224
is_probably_m1 = system == "darwin" and ("arm64" in version or "arm32" in version)
225225

226226
LOG.debug("getting kinesis-mock for %s %s", system, machine)
227-
if (machine == "x86_64" or machine == "amd64") and not is_probably_m1:
227+
228+
if is_env_true("KINESIS_MOCK_FORCE_JAVA"):
229+
# sometimes the static binaries may have problems, and we want to fal back to Java
230+
bin_file = "kinesis-mock.jar"
231+
elif (machine == "x86_64" or machine == "amd64") and not is_probably_m1:
228232
if system == "windows":
229233
bin_file = "kinesis-mock-mostly-static.exe"
230234
elif system == "linux":

localstack/services/kinesis/kinesis_starter.py

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import logging
2+
import threading
23

34
from localstack import config
45
from localstack.constants import MODULE_MAIN_PATH
56
from localstack.services import install
67
from localstack.services.infra import do_run, log_startup_message, start_proxy_for_service
78
from localstack.utils.aws import aws_stack
8-
from localstack.utils.common import chmod_r, get_free_tcp_port, mkdir, replace_in_file
9+
from localstack.utils.common import chmod_r, get_free_tcp_port, mkdir, replace_in_file, start_thread
910

1011
LOGGER = logging.getLogger(__name__)
1112

13+
# event to indicate that the kinesis backend service has stopped (the terminal command has returned)
14+
kinesis_stopped = threading.Event()
15+
1216

1317
def apply_patches_kinesalite():
1418
files = [
@@ -33,6 +37,31 @@ def start_kinesis(port=None, asynchronous=False, update_listener=None):
3337
raise Exception('Unsupported Kinesis provider "%s"' % config.KINESIS_PROVIDER)
3438

3539

40+
def _run_proxy_and_command(cmd, port, backend_port, update_listener, asynchronous):
41+
log_startup_message("Kinesis")
42+
start_proxy_for_service("kinesis", port, backend_port, update_listener)
43+
44+
# TODO: generalize into service manager once it is introduced
45+
try:
46+
kinesis_cmd = do_run(cmd, asynchronous)
47+
finally:
48+
if asynchronous:
49+
50+
def _return_listener(*_):
51+
try:
52+
ret_code = kinesis_cmd.result_future.result()
53+
if ret_code != 0:
54+
LOGGER.error("kinesis terminated with return code %s", ret_code)
55+
finally:
56+
kinesis_stopped.set()
57+
58+
start_thread(_return_listener)
59+
else:
60+
kinesis_stopped.set()
61+
62+
return kinesis_cmd
63+
64+
3665
def start_kinesis_mock(port=None, asynchronous=False, update_listener=None):
3766
kinesis_mock_bin = install.install_kinesis_mock()
3867

@@ -95,9 +124,14 @@ def start_kinesis_mock(port=None, asynchronous=False, update_listener=None):
95124
initialize_streams_param,
96125
kinesis_mock_bin,
97126
)
98-
LOGGER.info("starting kinesis-mock proxy %d:%d with cmd: %s", port, backend_port, cmd)
99-
start_proxy_for_service("kinesis", port, backend_port, update_listener)
100-
return do_run(cmd, asynchronous)
127+
128+
return _run_proxy_and_command(
129+
cmd=cmd,
130+
port=port,
131+
backend_port=backend_port,
132+
update_listener=update_listener,
133+
asynchronous=asynchronous,
134+
)
101135

102136

103137
def start_kinesalite(port=None, asynchronous=False, update_listener=None):
@@ -125,20 +159,30 @@ def start_kinesalite(port=None, asynchronous=False, update_listener=None):
125159
latency,
126160
kinesis_data_dir_param,
127161
)
128-
log_startup_message("Kinesis")
129-
start_proxy_for_service("kinesis", port, backend_port, update_listener)
130-
return do_run(cmd, asynchronous)
162+
163+
return _run_proxy_and_command(
164+
cmd=cmd,
165+
port=port,
166+
backend_port=backend_port,
167+
update_listener=update_listener,
168+
asynchronous=asynchronous,
169+
)
131170

132171

133172
def check_kinesis(expect_shutdown=False, print_error=False):
173+
if expect_shutdown is False and kinesis_stopped.is_set():
174+
raise AssertionError("kinesis backend has stopped")
175+
134176
out = None
135177
try:
136178
# check Kinesis
137179
out = aws_stack.connect_to_service(service_name="kinesis").list_streams()
138180
except Exception as e:
139181
if print_error:
140182
LOGGER.exception("Kinesis health check failed: %s", e)
183+
141184
if expect_shutdown:
142-
assert out is None
185+
assert out is None or kinesis_stopped.is_set()
143186
else:
144-
assert isinstance(out["StreamNames"], list)
187+
assert not kinesis_stopped.is_set()
188+
assert out and isinstance(out.get("StreamNames"), list)

localstack/services/plugins.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import json
22
import logging
33
import time
4-
import traceback
54
from concurrent.futures import ThreadPoolExecutor
65

76
import requests
@@ -110,10 +109,7 @@ def check_infra(retries=10, expect_shutdown=False, apis=None, additional_checks=
110109
additional(expect_shutdown=expect_shutdown)
111110
except Exception as e:
112111
if retries <= 0:
113-
LOG.error(
114-
"Error checking state of local environment (after some retries): %s"
115-
% traceback.format_exc()
116-
)
112+
LOG.exception("Error checking state of local environment (after some retries)")
117113
raise e
118114
time.sleep(3)
119115
check_infra(

localstack/utils/bootstrap.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,7 @@ def run(self):
789789
kwargs["_thread"] = self
790790
result = self.func(self.params, **kwargs)
791791
except Exception as e:
792+
self.result_future.set_exception(e)
792793
result = e
793794
if not self.quiet:
794795
LOG.info(

localstack/utils/common.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def run_cmd(self, params):
142142
or not self.process
143143
or self.process.returncode == 0
144144
):
145-
return
145+
return self.process.returncode if self.process else None
146146
LOG.info(
147147
"Restarting process (received exit code %s): %s"
148148
% (self.process.returncode, self.cmd)
@@ -199,6 +199,7 @@ def filter_line(line):
199199
else:
200200
self.process.communicate()
201201
except Exception as e:
202+
self.result_future.set_exception(e)
202203
if self.process and not self.quiet:
203204
LOG.warning('Shell command error "%s": %s' % (e, self.cmd))
204205
if self.process and not self.quiet and self.process.returncode != 0:

0 commit comments

Comments
 (0)