-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy paths3events.py
121 lines (101 loc) · 4.68 KB
/
s3events.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Add S3 specific event streaming output arg."""
from awscli.arguments import CustomArgument
# Help text used for the ``outfile`` argument that is added to the
# select-object-content argument table in this module.
STREAM_HELP_TEXT = 'Filename where the records will be saved'
class DocSectionNotFoundError(Exception):
    """Raised when an expected section is missing from a command's docs."""
def register_event_stream_arg(event_handlers):
    """Register the select-object-content customizations.

    ``add_event_stream_output_arg`` is registered for the event emitted
    while the command's argument table is built, and
    ``replace_event_stream_docs`` is registered (via ``register_last``)
    for the command's doc-output event.

    :param event_handlers: object exposing ``register`` and
        ``register_last``.
    """
    argument_table_event = (
        'building-argument-table.s3api.select-object-content'
    )
    doc_output_event = 'doc-output.s3api.select-object-content'

    event_handlers.register(
        argument_table_event, add_event_stream_output_arg
    )
    event_handlers.register_last(
        doc_output_event, replace_event_stream_docs
    )
def register_document_expires_string(event_handlers):
    """Register ``document_expires_string`` for the s3api doc-output event.

    The handler is added with ``register_last``.

    :param event_handlers: object exposing ``register_last``.
    """
    doc_output_event = 'doc-output.s3api'
    event_handlers.register_last(doc_output_event, document_expires_string)
def add_event_stream_output_arg(argument_table, operation_model,
                                session, **kwargs):
    """Add the positional ``outfile`` argument to ``argument_table``.

    The stream key handed to the argument is the ``payload`` entry of the
    operation's output shape serialization.

    :param argument_table: mapping that receives the ``outfile`` entry.
    :param operation_model: model whose ``output_shape.serialization``
        provides the ``payload`` key.
    :param session: session passed through to the new argument.
    """
    payload_key = operation_model.output_shape.serialization['payload']
    outfile_argument = S3SelectStreamOutputArgument(
        name='outfile',
        help_text=STREAM_HELP_TEXT,
        cli_type_name='string',
        positional_arg=True,
        stream_key=payload_key,
        session=session,
    )
    argument_table['outfile'] = outfile_argument
def replace_event_stream_docs(help_command, **kwargs):
    """Replace the generated "Output" docs with a note about the outfile.

    Writes are removed from the help document with ``pop_write`` until the
    "Output" section header has been popped. The header is then written
    back, followed by a sentence stating that the command generates no
    output.

    :param help_command: object whose ``doc`` attribute is the document
        being edited.
    :raises DocSectionNotFoundError: if ``pop_write`` raises ``IndexError``
        before the "Output" header is found.
    """
    output_header = '======\nOutput\n======'
    document = help_command.doc
    try:
        # Keep discarding writes; the loop ends once the popped write is
        # the section header itself.
        while document.pop_write() != output_header:
            pass
    except IndexError:
        # This should never happen, but in the rare case that it does
        # the failure should carry a helpful error message.
        raise DocSectionNotFoundError(
            'Could not find the "output" section for the command: %s'
            % help_command)
    document.write('======\nOutput\n======\n')
    document.write("This command generates no output. The selected "
                   "object content is written to the specified outfile.\n")
def document_expires_string(help_command, **kwargs):
    """Document ``ExpiresString`` next to the ``Expires`` output member.

    Looks up the last ``Expires -> (timestamp)`` write in the help
    document. When one exists, a deprecation note for that member and an
    entry for ``ExpiresString`` are inserted after it; otherwise the
    document is left unchanged.

    :param help_command: object whose ``doc`` attribute is the document
        being edited.
    """
    doc = help_command.doc
    expires_position = doc.find_last_write('Expires -> (timestamp)')
    if expires_position is None:
        return

    note_and_member_writes = (
        f'\n\n\n{" " * doc.style.indentation * doc.style.indent_width}',
        '.. note::',
        f'\n\n\n{" " * (doc.style.indentation + 1) * doc.style.indent_width}',
        'This member has been deprecated. Please use `ExpiresString` instead.\n',
        f'\n\n{" " * doc.style.indentation * doc.style.indent_width}',
        f'\n\n{" " * doc.style.indentation * doc.style.indent_width}',
        'ExpiresString -> (string)\n\n',
        '\tThe raw, unparsed value of the ``Expires`` field.',
        f'\n\n{" " * doc.style.indentation * doc.style.indent_width}',
    )

    # Insertion starts 4 positions past the expires field name because
    # each field in the output section consists of exactly 4 elements.
    first_target = expires_position + 4
    for target, content in enumerate(note_and_member_writes,
                                     start=first_target):
        doc.insert_write(target, content)
class S3SelectStreamOutputArgument(CustomArgument):
    """CLI argument that writes a response's streamed records to a file."""

    _DOCUMENT_AS_REQUIRED = True

    def __init__(self, stream_key, session, **kwargs):
        """Set up the argument.

        :param stream_key: key in the response body where the streamed
            contents can be found.
        :param session: object whose ``register`` method is used to hook
            ``save_file`` into the after-call event.
        :param kwargs: forwarded unchanged to the base class initializer.
        """
        super().__init__(**kwargs)
        self._session = session
        self._stream_key = stream_key
        # Filled in by add_to_params once the argument value is known.
        self._output_file = None

    def add_to_params(self, parameters, value):
        """Remember the output filename and register the save handler.

        ``parameters`` is not modified; ``value`` is kept as the path that
        ``save_file`` writes to.
        """
        self._output_file = value
        self._session.register(
            'after-call.s3.SelectObjectContent', self.save_file
        )

    def save_file(self, parsed, **kwargs):
        """Write each ``Records`` payload of the event stream to the file.

        After the stream has been consumed, the stream entry is removed
        from ``parsed``.

        :param parsed: parsed response; may or may not contain the stream
            key.
        """
        stream_key = self._stream_key
        # This handler is hooked into after-call, which fires before the
        # error checking happens in the client. A response without the
        # stream key is therefore left alone so the default error
        # handling can take over.
        if stream_key not in parsed:
            return
        records_stream = parsed[stream_key]
        with open(self._output_file, 'wb') as output:
            for stream_event in records_stream:
                if 'Records' not in stream_event:
                    continue
                output.write(stream_event['Records']['Payload'])
        # The streaming member must not be part of the returned response;
        # it is not JSON serializable.
        del parsed[stream_key]