# Copyright 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
import sys
import stat
from dateutil.parser import parse
from dateutil.tz import tzlocal
from botocore.exceptions import ClientError
from awscli.customizations.s3.utils import find_bucket_key, get_file_stat
from awscli.customizations.s3.utils import BucketLister, create_warning, \
find_dest_path_comp_key, EPOCH_TIME
from awscli.compat import queue
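

# ``_open`` below is kept as a module-level alias for the builtin ``open``;
# presumably this makes file opening easy to patch in tests without touching
# the builtin itself (the source does not state the reason).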
_open = open


def is_special_file(path):
"""
    This function checks to see if a file is a special file.  It checks
    whether the file is a character special device, block special device,
    FIFO, or socket.
"""
mode = os.stat(path).st_mode
# Character special device.
if stat.S_ISCHR(mode):
return True
# Block special device
if stat.S_ISBLK(mode):
return True
# FIFO.
if stat.S_ISFIFO(mode):
return True
# Socket.
if stat.S_ISSOCK(mode):
return True
return False
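

# Illustrative example (hypothetical paths, POSIX only): the check above can
# be exercised against a FIFO, which stat reports as a special file.
#
#     import os, tempfile
#     fifo_path = os.path.join(tempfile.mkdtemp(), 'example.fifo')
#     os.mkfifo(fifo_path)
#     assert is_special_file(fifo_path)       # FIFO -> True
#     assert not is_special_file(__file__)    # regular file -> False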


def is_readable(path):
"""
This function checks to see if a file or a directory can be read.
This is tested by performing an operation that requires read access
on the file or the directory.
"""
if os.path.isdir(path):
try:
os.listdir(path)
except (OSError, IOError):
return False
else:
try:
with _open(path, 'r') as fd:
pass
except (OSError, IOError):
return False
return True
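

# Illustrative example (hypothetical paths, POSIX, non-root): is_readable()
# probes access by actually listing or opening, so it reflects permissions at
# call time.
#
#     import os, tempfile
#     d = tempfile.mkdtemp()
#     assert is_readable(d)
#     os.chmod(d, 0o000)     # drop read permission
#     assert not is_readable(d)
#     os.chmod(d, 0o755)     # restore so the directory can be cleaned up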


# This class exists primarily to provide a detailed error message.
class FileDecodingError(Exception):
"""Raised when there was an issue decoding the file."""
ADVICE = (
"Please check your locale settings. The filename was decoded as: %s\n"
"On posix platforms, check the LC_CTYPE environment variable."
% (sys.getfilesystemencoding())
)
def __init__(self, directory, filename):
self.directory = directory
self.file_name = filename
self.error_message = (
            'There was an error trying to decode the file %s in '
'directory "%s". \n%s' % (repr(self.file_name),
self.directory,
self.ADVICE)
)
super(FileDecodingError, self).__init__(self.error_message)


class FileStat(object):
def __init__(self, src, dest=None, compare_key=None, size=None,
last_update=None, src_type=None, dest_type=None,
operation_name=None, response_data=None):
self.src = src
self.dest = dest
self.compare_key = compare_key
self.size = size
self.last_update = last_update
self.src_type = src_type
self.dest_type = dest_type
self.operation_name = operation_name
self.response_data = response_data


class FileGenerator(object):
"""
    This is a class that creates a generator to yield files based on
    information returned from the ``FileFormat`` class.  It is universal in
    the sense that it will handle s3 files, local files, local directories,
    and s3 objects under the same common prefix.  The generator yields
    corresponding ``FileStat`` objects to send to a ``Comparator`` or
    ``S3Handler``.
"""

    def __init__(self, client, operation_name, follow_symlinks=True,
page_size=None, result_queue=None, request_parameters=None):
self._client = client
self.operation_name = operation_name
self.follow_symlinks = follow_symlinks
self.page_size = page_size
self.result_queue = result_queue
if not result_queue:
self.result_queue = queue.Queue()
self.request_parameters = {}
if request_parameters is not None:
self.request_parameters = request_parameters

    def call(self, files):
"""
        This is the generalized function to yield the ``FileStat`` objects.
        The ``dir_op`` and ``use_src_name`` flags affect which files are used
        and ensure the proper destination paths and compare keys are formed.
"""
function_table = {'s3': self.list_objects, 'local': self.list_files}
source = files['src']['path']
src_type = files['src']['type']
dest_type = files['dest']['type']
file_iterator = function_table[src_type](source, files['dir_op'])
for src_path, extra_information in file_iterator:
dest_path, compare_key = find_dest_path_comp_key(files, src_path)
file_stat_kwargs = {
'src': src_path, 'dest': dest_path, 'compare_key': compare_key,
'src_type': src_type, 'dest_type': dest_type,
'operation_name': self.operation_name
}
self._inject_extra_information(file_stat_kwargs, extra_information)
yield FileStat(**file_stat_kwargs)
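
    # Illustrative example: ``call`` expects the ``files`` mapping produced by
    # ``FileFormat``.  Judging from the keys read above, a minimal local->s3
    # mapping would look roughly like this (bucket name and paths are
    # hypothetical):
    #
    #     files = {
    #         'src': {'path': '/tmp/data/', 'type': 'local'},
    #         'dest': {'path': 'mybucket/data/', 'type': 's3'},
    #         'dir_op': True, 'use_src_name': True,
    #     }
    #     for file_stat in generator.call(files):
    #         print(file_stat.src, '->', file_stat.dest)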

    def _inject_extra_information(self, file_stat_kwargs, extra_information):
src_type = file_stat_kwargs['src_type']
file_stat_kwargs['size'] = extra_information['Size']
file_stat_kwargs['last_update'] = extra_information['LastModified']
        # S3 objects require the response data retrieved from HeadObject
        # and ListObjectsV2.
if src_type == 's3':
file_stat_kwargs['response_data'] = extra_information

    def list_files(self, path, dir_op):
"""
        This function yields the appropriate local file or local files
        under a directory depending on whether the operation is on a
        directory.  For directories, a depth first search is implemented in
        order to follow the same sorted pattern that an s3 list objects
        operation outputs.  It yields the file's source path, size, and
        last update.
"""
join, isdir, isfile = os.path.join, os.path.isdir, os.path.isfile
error, listdir = os.error, os.listdir
if not self.should_ignore_file(path):
if not dir_op:
stats = self._safely_get_file_stats(path)
if stats:
yield stats
else:
                # We need to list files in byte order based on the full
                # expanded path of the key: 'test/1/2/3.txt'.  However,
                # listdir() will only give us the contents of a single
                # directory at a time, so we'll get 'test'.  At the same time
                # we don't want to load the entire list of files into memory.
                # This is handled by first going through the current
                # directory contents and adding the directory separator to
                # any directories.  We can then sort the contents, and ensure
                # byte order.
listdir_names = listdir(path)
names = []
for name in listdir_names:
if not self.should_ignore_file_with_decoding_warnings(
path, name):
file_path = join(path, name)
if isdir(file_path):
name = name + os.path.sep
names.append(name)
self.normalize_sort(names, os.sep, '/')
for name in names:
file_path = join(path, name)
if isdir(file_path):
# Anything in a directory will have a prefix of
# this current directory and will come before the
# remaining contents in this directory. This
# means we need to recurse into this sub directory
# before yielding the rest of this directory's
# contents.
for x in self.list_files(file_path, dir_op):
yield x
else:
stats = self._safely_get_file_stats(file_path)
if stats:
yield stats
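
    # Appending the path separator to directory names before sorting is what
    # keeps the local listing in S3's byte order.  A rough illustration with
    # hypothetical names:
    #
    #     sorted(['foo/', 'foo-bar'])   # -> ['foo-bar', 'foo/']
    #
    # which matches S3 listing the key 'foo-bar' before 'foo/baz'; without the
    # trailing separator, 'foo' would sort first and the recursion would yield
    # 'foo/baz' too early.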

    def _safely_get_file_stats(self, file_path):
try:
size, last_update = get_file_stat(file_path)
except (OSError, ValueError):
self.triggers_warning(file_path)
else:
last_update = self._validate_update_time(last_update, file_path)
return file_path, {'Size': size, 'LastModified': last_update}

    def _validate_update_time(self, update_time, path):
# If the update time is None we know we ran into an invalid timestamp.
if update_time is None:
warning = create_warning(
path=path,
error_message="File has an invalid timestamp. Passing epoch "
"time as timestamp.",
skip_file=False)
self.result_queue.put(warning)
return EPOCH_TIME
return update_time

    def normalize_sort(self, names, os_sep, character):
"""
        The purpose of this function is to ensure that the same path
        separator is used when sorting.  On Windows, the path separator is a
        backslash as opposed to a forward slash, which can lead to
        differences in sorting between s3 and a Windows machine.
"""
names.sort(key=lambda item: item.replace(os_sep, character))
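
    # Illustrative example (hypothetical names): on Windows the sort-key
    # substitution above makes backslash-separated paths compare the way
    # their S3 keys would.
    #
    #     names = ['a\\b.txt', 'a0.txt']
    #     names.sort(key=lambda item: item.replace('\\', '/'))
    #     # -> ['a\\b.txt', 'a0.txt']; without the substitution the backslash
    #     #    (0x5c) sorts after '0' (0x30), unlike the S3 keys 'a/b.txt'
    #     #    and 'a0.txt'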

    def should_ignore_file_with_decoding_warnings(self, dirname, filename):
"""
We can get a UnicodeDecodeError if we try to listdir(<unicode>) and
can't decode the contents with sys.getfilesystemencoding(). In this
case listdir() returns the bytestring, which means that
        join(<unicode>, <str>) could raise a UnicodeDecodeError.  When this
        happens we warn using a FileDecodingError that provides more
        information about what's going on.
"""
if not isinstance(filename, str):
decoding_error = FileDecodingError(dirname, filename)
warning = create_warning(repr(filename),
decoding_error.error_message)
self.result_queue.put(warning)
return True
path = os.path.join(dirname, filename)
return self.should_ignore_file(path)

    def should_ignore_file(self, path):
"""
This function checks whether a file should be ignored in the
file generation process. This includes symlinks that are not to be
followed and files that generate warnings.
"""
if not self.follow_symlinks:
if os.path.isdir(path) and path.endswith(os.sep):
# Trailing slash must be removed to check if it is a symlink.
path = path[:-1]
if os.path.islink(path):
return True
warning_triggered = self.triggers_warning(path)
if warning_triggered:
return True
return False

    def triggers_warning(self, path):
"""
        This function checks the specific types and properties of a file.
        If the file would cause trouble, the function adds a warning to the
        result queue to be printed out and returns a boolean value indicating
        whether the file caused a warning to be generated.  Files that
        generate warnings are skipped.  Currently, this function checks for
        files that do not exist and files that the user does not have read
        access to.
"""
if not os.path.exists(path):
warning = create_warning(path, "File does not exist.")
self.result_queue.put(warning)
return True
if is_special_file(path):
warning = create_warning(path,
("File is character special device, "
"block special device, FIFO, or "
"socket."))
self.result_queue.put(warning)
return True
if not is_readable(path):
warning = create_warning(path, "File/Directory is not readable.")
self.result_queue.put(warning)
return True
return False

    def list_objects(self, s3_path, dir_op):
"""
        This function yields the appropriate object or objects under a
        common prefix depending on whether the operation is on objects under
        a common prefix.  It yields the file's source path, size, and last
        update.
"""
# Short circuit path: if we are not recursing into the s3
# bucket and a specific path was given, we can just yield
# that path and not have to call any operation in s3.
bucket, prefix = find_bucket_key(s3_path)
if not dir_op and prefix:
yield self._list_single_object(s3_path)
else:
lister = BucketLister(self._client)
extra_args = self.request_parameters.get('ListObjectsV2', {})
for key in lister.list_objects(bucket=bucket, prefix=prefix,
page_size=self.page_size,
extra_args=extra_args):
source_path, response_data = key
if response_data['Size'] == 0 and source_path.endswith('/'):
if self.operation_name == 'delete':
                        # This is to filter out manually created folders
                        # in S3.  They have a size of zero and would be
                        # undesirably downloaded.  Local directories are
                        # automatically created when they do not exist
                        # locally, but the user should still be able to
                        # delete them.
yield source_path, response_data
elif not dir_op and s3_path != source_path:
pass
else:
yield source_path, response_data
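
    # The Size == 0 / trailing '/' branch above skips S3 "folder" placeholder
    # keys (e.g. a hypothetical 'mybucket/photos/') for every operation except
    # delete, so they are never downloaded as empty local files but can still
    # be removed explicitly.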

    def _list_single_object(self, s3_path):
# When we know we're dealing with a single object, we can avoid
# a ListObjects operation (which causes concern for anyone setting
# IAM policies with the smallest set of permissions needed) and
# instead use a HeadObject request.
if self.operation_name == 'delete':
# If the operation is just a single remote delete, there is
# no need to run HeadObject on the S3 object as none of the
# information gained from HeadObject is required to delete the
# object.
return s3_path, {'Size': None, 'LastModified': None}
bucket, key = find_bucket_key(s3_path)
try:
params = {'Bucket': bucket, 'Key': key}
params.update(self.request_parameters.get('HeadObject', {}))
response = self._client.head_object(**params)
except ClientError as e:
# We want to try to give a more helpful error message.
# This is what the customer is going to see so we want to
# give as much detail as we have.
if not e.response['Error']['Code'] == '404':
raise
# The key does not exist so we'll raise a more specific
# error message here.
response = e.response.copy()
response['Error']['Message'] = 'Key "%s" does not exist' % key
raise ClientError(response, 'HeadObject')
response['Size'] = int(response.pop('ContentLength'))
last_update = parse(response['LastModified'])
response['LastModified'] = last_update.astimezone(tzlocal())
return s3_path, response
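

# Illustrative usage sketch: in the AWS CLI itself FileGenerator is driven by
# the ``aws s3`` subcommands, but a rough standalone invocation could look
# like the following.  The client construction is standard botocore; the
# bucket name, prefix, and local path are hypothetical.
#
#     import botocore.session
#     client = botocore.session.get_session().create_client('s3')
#     generator = FileGenerator(client, operation_name='download')
#     files = {
#         'src': {'path': 'mybucket/prefix/', 'type': 's3'},
#         'dest': {'path': '/tmp/prefix/', 'type': 'local'},
#         'dir_op': True, 'use_src_name': True,
#     }
#     for file_stat in generator.call(files):
#         print(file_stat.src, file_stat.size, file_stat.last_update)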