-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathsessionmanager.py
170 lines (147 loc) · 6.88 KB
/
sessionmanager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import logging
import json
import errno
import os
import re
from subprocess import check_call, check_output
from awscli.compat import ignore_user_entered_signals
from awscli.clidriver import ServiceOperation, CLIOperationCaller
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Fragments of the error text used when the session-manager-plugin
# executable cannot be found. The pieces are kept in a tuple and are
# concatenated with ''.join(ERROR_MESSAGE) at the point where the error
# is raised.
ERROR_MESSAGE = (
'SessionManagerPlugin is not found. ',
'Please refer to SessionManager Documentation here: ',
'http://docs.aws.amazon.com/console/systems-manager/',
'session-manager-plugin-not-found'
)
def register_ssm_session(event_handlers):
    """Subscribe the custom start-session hook to the ssm command table.

    Registers ``add_custom_start_session`` on ``event_handlers`` for the
    ``building-command-table.ssm`` event.
    """
    ssm_table_event = 'building-command-table.ssm'
    event_handlers.register(ssm_table_event, add_custom_start_session)
def add_custom_start_session(session, command_table, **kwargs):
    """Install the custom ``start-session`` command in ``command_table``.

    Looks up the ``StartSession`` operation model of the ``ssm`` service
    through ``session`` and stores a ``StartSessionCommand`` wired to a
    ``StartSessionCaller`` under the ``'start-session'`` key. Any extra
    keyword arguments are accepted and not used.
    """
    ssm_service_model = session.get_service_model('ssm')
    start_session_model = ssm_service_model.operation_model('StartSession')
    plugin_caller = StartSessionCaller(session)
    command_table['start-session'] = StartSessionCommand(
        name='start-session',
        parent_name='ssm',
        session=session,
        operation_model=start_session_model,
        operation_caller=plugin_caller,
    )
class VersionRequirement:
    """Decide whether a plugin version is strictly newer than a baseline.

    ``min_version`` is a dot-separated numeric version string. A candidate
    satisfies the requirement only when it compares strictly greater than
    ``min_version``; an equal version does not.
    """

    # Matches any run of whitespace (spaces, tabs, newlines).
    WHITESPACE_REGEX = re.compile(r"\s+")
    # One to four dot-separated groups of digits, e.g. "1", "1.2", "1.2.3.4".
    SSM_SESSION_PLUGIN_VERSION_REGEX = re.compile(r"^\d+(\.\d+){0,3}$")

    def __init__(self, min_version):
        self.min_version = min_version

    def meets_requirement(self, version):
        """Return True when ``version`` is valid and above ``min_version``.

        All whitespace is removed from ``version`` before it is checked.
        A string that does not match the accepted version pattern yields
        False rather than raising.
        """
        candidate = self._sanitize_plugin_version(version)
        if not self._is_valid_version(candidate):
            return False
        candidate_parts, baseline_parts = self._normalize(
            candidate, self.min_version
        )
        # Lists of ints compare element by element, left to right.
        return candidate_parts > baseline_parts

    def _sanitize_plugin_version(self, plugin_version):
        """Return ``plugin_version`` with every whitespace run removed."""
        return self.WHITESPACE_REGEX.sub("", plugin_version)

    def _is_valid_version(self, plugin_version):
        """Return True when ``plugin_version`` matches the version pattern."""
        found = self.SSM_SESSION_PLUGIN_VERSION_REGEX.match(plugin_version)
        return found is not None

    def _normalize(self, v1, v2):
        """Convert both versions to int lists of equal length.

        The shorter list is padded on the right with zeros so the two can
        be compared position by position.
        """
        left = [int(piece) for piece in v1.split(".")]
        right = [int(piece) for piece in v2.split(".")]
        width = max(len(left), len(right))
        left.extend([0] * (width - len(left)))
        right.extend([0] * (width - len(right)))
        return left, right
class StartSessionCommand(ServiceOperation):
    """Service operation used for the custom ``start-session`` command."""

    def create_help_command(self):
        """Return the parent's help command after clearing the output shape.

        The help command is built by the parent class first; the operation
        model's ``output_shape`` is then set to ``None``.
        """
        base_help = super().create_help_command()
        # This command provides no output, so the operation model must not
        # advertise an output shape.
        self._operation_model.output_shape = None
        return base_help
class StartSessionCaller(CLIOperationCaller):
    """Operation caller that starts an SSM session and runs the plugin.

    ``invoke`` calls the ``StartSession`` API and then executes the
    ``session-manager-plugin`` program with the API response.
    """

    # Plugin versions strictly newer than this one are given the
    # start-session response through an environment variable; this version
    # and older (or unparsable) versions receive it as a command argument.
    LAST_PLUGIN_VERSION_WITHOUT_ENV_VAR = "1.2.497.0"
    # Name of the environment variable that carries the JSON-encoded
    # start-session response for plugins that support it.
    DEFAULT_SSM_ENV_NAME = "AWS_SSM_START_SESSION_RESPONSE"

    def invoke(self, service_name, operation_name, parameters,
               parsed_globals):
        """Start a session and run session-manager-plugin for it.

        :param service_name: Service name used to create the client.
        :param operation_name: Not used by this method.
        :param parameters: Keyword arguments for ``client.start_session``;
            also passed JSON-encoded to the plugin.
        :param parsed_globals: Parsed global CLI options; ``region``,
            ``endpoint_url``, ``verify_ssl`` and ``profile`` are read.
        :returns: ``0`` once the plugin process has exited successfully.
        :raises ValueError: If the plugin executable cannot be found
            (``errno.ENOENT``). The started session is terminated first.
        :raises OSError: For any other operating-system error raised while
            launching the plugin (previously these were silently dropped).

        Errors other than ``OSError`` raised by the subprocess calls are
        not handled here and propagate to the caller.
        """
        client = self._session.create_client(
            service_name, region_name=parsed_globals.region,
            endpoint_url=parsed_globals.endpoint_url,
            verify=parsed_globals.verify_ssl)
        response = client.start_session(**parameters)
        session_id = response['SessionId']
        region_name = client.meta.region_name
        # The profile name is passed on to the session manager plugin so
        # that it can fetch the same profile's credentials for its own API
        # calls. If --profile is configured, pass it through; otherwise
        # pass an empty string.
        profile_name = parsed_globals.profile \
            if parsed_globals.profile is not None else ''
        endpoint_url = client.meta.endpoint_url
        ssm_env_name = self.DEFAULT_SSM_ENV_NAME

        try:
            session_parameters = {
                "SessionId": response["SessionId"],
                "TokenValue": response["TokenValue"],
                "StreamUrl": response["StreamUrl"],
            }
            start_session_response = json.dumps(session_parameters)

            plugin_version = check_output(
                ["session-manager-plugin", "--version"], text=True
            )
            env = os.environ.copy()

            # Check whether this plugin supports receiving the start
            # session response through an environment variable. If it
            # does, the response is stored in the variable named by
            # DEFAULT_SSM_ENV_NAME and the variable's name is passed to
            # the plugin instead. If the plugin version is invalid or not
            # newer than LAST_PLUGIN_VERSION_WITHOUT_ENV_VAR, fall back to
            # passing the start_session response directly.
            version_requirement = VersionRequirement(
                min_version=self.LAST_PLUGIN_VERSION_WITHOUT_ENV_VAR
            )
            if version_requirement.meets_requirement(plugin_version):
                env[ssm_env_name] = start_session_response
                start_session_response = ssm_env_name
            # ignore_user_entered_signals ignores these signals because,
            # if signals that kill the process were not captured, they
            # would kill the foreground process but not the background
            # one. Capturing them prevents this process from being killed;
            # the signals are input to the plugin and are handled there.
            with ignore_user_entered_signals():
                # call executable with necessary input
                check_call(["session-manager-plugin",
                            start_session_response,
                            region_name,
                            "StartSession",
                            profile_name,
                            json.dumps(parameters),
                            endpoint_url], env=env)
            return 0
        except OSError as ex:
            if ex.errno == errno.ENOENT:
                logger.debug('SessionManagerPlugin is not present',
                             exc_info=True)
                # The start-session API call has already started the
                # session on ssm-agent, and its response is meant to be
                # forwarded to session-manager-plugin. If the plugin is
                # not present, terminate the session so that the service
                # and ssm-agent do not keep a zombie session active until
                # the default self-terminate time.
                client.terminate_session(SessionId=session_id)
                raise ValueError(''.join(ERROR_MESSAGE))
            # Any other OS-level failure (e.g. the plugin file exists but
            # cannot be executed) must not be swallowed: without this the
            # method would fall through and return None as if nothing had
            # gone wrong.
            raise