-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathvalidation.py
905 lines (800 loc) · 39.2 KB
/
validation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
# Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import base64
import binascii
import json
import hashlib
import logging
import re
import sys
import zlib
from zlib import error as ZLibError
from datetime import datetime, timedelta
from dateutil import tz, parser
from pyasn1.error import PyAsn1Error
import rsa
from awscli.customizations.cloudtrail.utils import get_trail_by_arn, \
get_account_id_from_arn
from awscli.customizations.commands import BasicCommand
from botocore.exceptions import ClientError
from awscli.schema import ParameterRequiredError
LOG = logging.getLogger(__name__)
DATE_FORMAT = '%Y%m%dT%H%M%SZ'
DISPLAY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
def format_date(date):
"""Returns a formatted date string in a CloudTrail date format"""
return date.strftime(DATE_FORMAT)
def format_display_date(date):
"""Returns a formatted date string meant for CLI output"""
return date.strftime(DISPLAY_DATE_FORMAT)
def normalize_date(date):
"""Returns a normalized date using a UTC timezone"""
return date.replace(tzinfo=tz.tzutc())
def extract_digest_key_date(digest_s3_key):
"""Extract the timestamp portion of a manifest file.
Manifest file names take the following form:
AWSLogs/{account}/CloudTrail-Digest/{region}/{ymd}/{account}_CloudTrail \
-Digest_{region}_{name}_region_{date}.json.gz
"""
return digest_s3_key[-24:-8]
def parse_date(date_string):
try:
return parser.parse(date_string)
except ValueError:
raise ValueError('Unable to parse date value: %s' % date_string)
def assert_cloudtrail_arn_is_valid(trail_arn):
"""Ensures that the arn looks correct.
ARNs look like: arn:aws:cloudtrail:us-east-1:123456789012:trail/foo"""
pattern = re.compile(r'arn:.+:cloudtrail:.+:\d{12}:trail/.+')
if not pattern.match(trail_arn):
raise ValueError('Invalid trail ARN provided: %s' % trail_arn)
def create_digest_traverser(
cloudtrail_client,
organization_client,
s3_client_provider,
trail_arn,
trail_source_region=None,
on_invalid=None,
on_gap=None,
on_missing=None,
bucket=None,
prefix=None,
account_id=None,
):
"""Creates a CloudTrail DigestTraverser and its object graph.
:type cloudtrail_client: botocore.client.CloudTrail
:param cloudtrail_client: Client used to connect to CloudTrail
:type organization_client: botocore.client.organizations
:param organization_client: Client used to connect to Organizations
:type s3_client_provider: S3ClientProvider
:param s3_client_provider: Used to create Amazon S3 client per/region.
:param trail_arn: CloudTrail trail ARN
:param trail_source_region: The scanned region of a trail.
:param on_invalid: Callback that is invoked when validating a digest fails.
:param on_gap: Callback that is invoked when a digest has no link to the
previous digest, but there are more digests to validate. This can
happen when a trail is disabled for a period of time.
:param on_missing: Callback that is invoked when a digest file has been
deleted from Amazon S3 but is supposed to be present.
:param bucket: Amazon S3 bucket of the trail if it is different than the
bucket that is currently associated with the trail.
:param prefix: bucket: Key prefix prepended to each digest and log placed
in the Amazon S3 bucket if it is different than the prefix that is
currently associated with the trail.
:param account_id: The account id for which the digest files are
validated. For normal trails this is the caller account, for
organization trails it is the member accout.
``on_gap``, ``on_invalid``, and ``on_missing`` callbacks are invoked with
the following named arguments:
- ``bucket`: The next S3 bucket.
- ``next_key``: (optional) Next digest key that was found in the bucket.
- ``next_end_date``: (optional) End date of the next found digest.
- ``last_key``: The last digest key that was found.
- ``last_start_date``: (optional) Start date of last found digest.
- ``message``: (optional) Message string about the notification.
"""
assert_cloudtrail_arn_is_valid(trail_arn)
organization_id = None
if bucket is None:
# Determine the bucket and prefix based on the trail arn.
trail_info = get_trail_by_arn(cloudtrail_client, trail_arn)
LOG.debug('Loaded trail info: %s', trail_info)
bucket = trail_info['S3BucketName']
prefix = trail_info.get('S3KeyPrefix', None)
is_org_trail = trail_info.get('IsOrganizationTrail')
if is_org_trail:
if not account_id:
raise ParameterRequiredError(
"Missing required parameter for organization "
"trail: '--account-id'")
organization_id = organization_client.describe_organization()[
'Organization']['Id']
# Determine the region from the ARN (e.g., arn:aws:cloudtrail:REGION:...)
trail_region = trail_arn.split(':')[3]
# Determine the name from the ARN (the last part after "/")
trail_name = trail_arn.split('/')[-1]
# If account id is not specified parse it from trail ARN
if not account_id:
account_id = get_account_id_from_arn(trail_arn)
digest_provider = DigestProvider(
account_id=account_id, trail_name=trail_name,
s3_client_provider=s3_client_provider,
trail_source_region=trail_source_region,
trail_home_region=trail_region,
organization_id=organization_id)
return DigestTraverser(
digest_provider=digest_provider, starting_bucket=bucket,
starting_prefix=prefix, on_invalid=on_invalid, on_gap=on_gap,
on_missing=on_missing,
public_key_provider=PublicKeyProvider(cloudtrail_client))
class S3ClientProvider(object):
"""Creates Amazon S3 clients and determines the region name of a client.
This class will cache the location constraints of previously requested
buckets and cache previously created clients for the same region.
"""
def __init__(self, session, get_bucket_location_region='us-east-1'):
self._session = session
self._get_bucket_location_region = get_bucket_location_region
self._client_cache = {}
self._region_cache = {}
def get_client(self, bucket_name):
"""Creates an S3 client that can work with the given bucket name"""
region_name = self._get_bucket_region(bucket_name)
return self._create_client(region_name)
def _get_bucket_region(self, bucket_name):
"""Returns the region of a bucket"""
if bucket_name not in self._region_cache:
client = self._create_client(self._get_bucket_location_region)
result = client.get_bucket_location(Bucket=bucket_name)
region = result['LocationConstraint'] or 'us-east-1'
self._region_cache[bucket_name] = region
return self._region_cache[bucket_name]
def _create_client(self, region_name):
"""Creates an Amazon S3 client for the given region name"""
if region_name not in self._client_cache:
client = self._session.create_client('s3', region_name)
# Remove the CLI error event that prevents exceptions.
self._client_cache[region_name] = client
return self._client_cache[region_name]
class DigestError(ValueError):
"""Exception raised when a digest fails to validate"""
pass
class DigestSignatureError(DigestError):
"""Exception raised when a digest signature is invalid"""
def __init__(self, bucket, key):
message = ('Digest file\ts3://%s/%s\tINVALID: signature verification '
'failed') % (bucket, key)
super(DigestSignatureError, self).__init__(message)
class InvalidDigestFormat(DigestError):
"""Exception raised when a digest has an invalid format"""
def __init__(self, bucket, key):
message = 'Digest file\ts3://%s/%s\tINVALID: invalid format' % (bucket,
key)
super(InvalidDigestFormat, self).__init__(message)
class PublicKeyProvider(object):
"""Retrieves public keys from CloudTrail within a date range."""
def __init__(self, cloudtrail_client):
self._cloudtrail_client = cloudtrail_client
def get_public_keys(self, start_date, end_date):
"""Loads public keys in a date range into a returned dict.
:type start_date: datetime
:param start_date: Start date of a date range.
:type end_date: datetime
:param end_date: End date of a date range.
:rtype: dict
:return: Returns a dict where each key is the fingerprint of the
public key, and each value is a dict of public key data.
"""
public_keys = self._cloudtrail_client.list_public_keys(
StartTime=start_date, EndTime=end_date)
public_keys_in_range = public_keys['PublicKeyList']
LOG.debug('Loaded public keys in range: %s', public_keys_in_range)
return dict((key['Fingerprint'], key) for key in public_keys_in_range)
class DigestProvider(object):
"""
Retrieves digest keys and digests from Amazon S3.
This class is responsible for determining the full list of digest files
in a bucket and loading digests from the bucket into a JSON decoded
dict. This class is not responsible for validation or iterating from
one digest to the next.
"""
def __init__(
self,
s3_client_provider,
account_id,
trail_name,
trail_home_region,
trail_source_region=None,
organization_id=None,
):
self._client_provider = s3_client_provider
self.trail_name = trail_name
self.account_id = account_id
self.trail_home_region = trail_home_region
self.trail_source_region = trail_source_region or trail_home_region
self.organization_id = organization_id
def load_digest_keys_in_range(self, bucket, prefix, start_date, end_date):
"""Returns a list of digest keys in the date range.
This method uses a list_objects API call and provides a Marker
parameter that is calculated based on the start_date provided.
Amazon S3 then returns all keys in the bucket that start after
the given key (non-inclusive). We then iterate over the keys
until the date extracted from the yielded keys is greater than
the given end_date.
"""
digests = []
marker = self._create_digest_key(start_date, prefix)
client = self._client_provider.get_client(bucket)
paginator = client.get_paginator('list_objects')
page_iterator = paginator.paginate(Bucket=bucket, Marker=marker)
key_filter = page_iterator.search('Contents[*].Key')
# Create a target start end end date
target_start_date = format_date(normalize_date(start_date))
# Add one hour to the end_date to get logs that spilled over to next.
target_end_date = format_date(
normalize_date(end_date + timedelta(hours=1)))
# Ensure digests are from the same trail.
digest_key_regex = re.compile(self._create_digest_key_regex(prefix))
for key in key_filter:
if digest_key_regex.match(key):
# Use a lexicographic comparison to know when to stop.
extracted_date = extract_digest_key_date(key)
if extracted_date > target_end_date:
break
# Only append digests after the start date.
if extracted_date >= target_start_date:
digests.append(key)
return digests
def fetch_digest(self, bucket, key):
"""Loads a digest by key from S3.
Returns the JSON decode data and GZIP inflated raw content.
"""
client = self._client_provider.get_client(bucket)
result = client.get_object(Bucket=bucket, Key=key)
try:
digest = zlib.decompress(result['Body'].read(),
zlib.MAX_WBITS | 16)
digest_data = json.loads(digest.decode())
except (ValueError, ZLibError):
# Cannot gzip decode or JSON parse.
raise InvalidDigestFormat(bucket, key)
# Add the expected digest signature and algorithm to the dict.
if 'signature' not in result['Metadata'] \
or 'signature-algorithm' not in result['Metadata']:
raise DigestSignatureError(bucket, key)
digest_data['_signature'] = result['Metadata']['signature']
digest_data['_signature_algorithm'] = \
result['Metadata']['signature-algorithm']
return digest_data, digest
def _create_digest_key(self, start_date, key_prefix):
"""Computes an Amazon S3 key based on the provided data.
The computed is what would have been placed in the S3 bucket if
a log digest were created at a specific time. This computed key
does not have to actually exist as it will only be used to as
a Marker parameter in a list_objects call.
:return: Returns a computed key as a string.
"""
# Subtract one minute to ensure the dates are inclusive.
date = start_date - timedelta(minutes=1)
template = 'AWSLogs/'
template_params = {
'account_id': self.account_id,
'date': format_date(date),
'ymd': date.strftime('%Y/%m/%d'),
'source_region': self.trail_source_region,
'home_region': self.trail_home_region,
'name': self.trail_name
}
if self.organization_id:
template += '{organization_id}/'
template_params['organization_id'] = self.organization_id
template += (
'{account_id}/CloudTrail-Digest/{source_region}/'
'{ymd}/{account_id}_CloudTrail-Digest_{source_region}_{name}_'
'{home_region}_{date}.json.gz'
)
key = template.format(**template_params)
if key_prefix:
key = key_prefix + '/' + key
return key
def _create_digest_key_regex(self, key_prefix):
"""Creates a regular expression used to match against S3 keys"""
template = 'AWSLogs/'
template_params = {
'account_id': re.escape(self.account_id),
'source_region': re.escape(self.trail_source_region),
'home_region': re.escape(self.trail_home_region),
'name': re.escape(self.trail_name)
}
if self.organization_id:
template += '{organization_id}/'
template_params['organization_id'] = self.organization_id
template += (
'{account_id}/CloudTrail\\-Digest/{source_region}/'
'\\d+/\\d+/\\d+/{account_id}_CloudTrail\\-Digest_'
'{source_region}_{name}_{home_region}_.+\\.json\\.gz'
)
key = template.format(**template_params)
if key_prefix:
key = re.escape(key_prefix) + '/' + key
return '^' + key + '$'
class DigestTraverser(object):
"""Retrieves and validates digests within a date range."""
# These keys are required to be present before validating the contents
# of a digest.
required_digest_keys = ['digestPublicKeyFingerprint', 'digestS3Bucket',
'digestS3Object', 'previousDigestSignature',
'digestEndTime', 'digestStartTime']
def __init__(self, digest_provider, starting_bucket, starting_prefix,
public_key_provider, digest_validator=None,
on_invalid=None, on_gap=None, on_missing=None):
"""
:type digest_provider: DigestProvider
:param digest_provider: DigestProvider object
:param starting_bucket: S3 bucket where the digests are stored.
:param starting_prefix: An optional prefix applied to each S3 key.
:param public_key_provider: Provides public keys for a range.
:param digest_validator: Validates digest using a validate method.
:param on_invalid: Callback invoked when a digest is invalid.
:param on_gap: Callback invoked when a digest has no parent, but
there are still more digests to validate.
:param on_missing: Callback invoked when a digest file is missing.
"""
self.starting_bucket = starting_bucket
self.starting_prefix = starting_prefix
self.digest_provider = digest_provider
self._public_key_provider = public_key_provider
self._on_gap = on_gap
self._on_invalid = on_invalid
self._on_missing = on_missing
if digest_validator is None:
digest_validator = Sha256RSADigestValidator()
self._digest_validator = digest_validator
def traverse(self, start_date, end_date=None):
"""Creates and returns a generator that yields validated digest data.
Each yielded digest dictionary contains information about the digest
and the log file associated with the digest. Digest files are validated
before they are yielded. Whether or not the digest is successfully
validated is stated in the "isValid" key value pair of the yielded
dictionary.
:type start_date: datetime
:param start_date: Date to start validating from (inclusive).
:type start_date: datetime
:param end_date: Date to stop validating at (inclusive).
"""
if end_date is None:
end_date = datetime.utcnow()
end_date = normalize_date(end_date)
start_date = normalize_date(start_date)
bucket = self.starting_bucket
prefix = self.starting_prefix
digests = self._load_digests(bucket, prefix, start_date, end_date)
public_keys = self._load_public_keys(start_date, end_date)
key, end_date = self._get_last_digest(digests)
last_start_date = end_date
while key and start_date <= last_start_date:
try:
digest, end_date = self._load_and_validate_digest(
public_keys, bucket, key)
last_start_date = normalize_date(
parse_date(digest['digestStartTime']))
previous_bucket = digest.get('previousDigestS3Bucket', None)
yield digest
if previous_bucket is None:
# The chain is broken, so find next in digest store.
key, end_date = self._find_next_digest(
digests=digests, bucket=bucket, last_key=key,
last_start_date=last_start_date, cb=self._on_gap,
is_cb_conditional=True)
else:
key = digest['previousDigestS3Object']
if previous_bucket != bucket:
bucket = previous_bucket
# The bucket changed so reload the digest list.
digests = self._load_digests(
bucket, prefix, start_date, end_date)
except ClientError as e:
if e.response['Error']['Code'] != 'NoSuchKey':
raise e
key, end_date = self._find_next_digest(
digests=digests, bucket=bucket, last_key=key,
last_start_date=last_start_date, cb=self._on_missing,
message=str(e))
except DigestError as e:
key, end_date = self._find_next_digest(
digests=digests, bucket=bucket, last_key=key,
last_start_date=last_start_date, cb=self._on_invalid,
message=str(e))
except Exception as e:
# Any other unexpected errors.
key, end_date = self._find_next_digest(
digests=digests, bucket=bucket, last_key=key,
last_start_date=last_start_date, cb=self._on_invalid,
message='Digest file\ts3://%s/%s\tINVALID: %s'
% (bucket, key, str(e)))
def _load_digests(self, bucket, prefix, start_date, end_date):
return self.digest_provider.load_digest_keys_in_range(
bucket=bucket, prefix=prefix,
start_date=start_date, end_date=end_date)
def _find_next_digest(self, digests, bucket, last_key, last_start_date,
cb=None, is_cb_conditional=False, message=None):
"""Finds the next digest in the bucket and invokes any callback."""
next_key, next_end_date = self._get_last_digest(digests, last_key)
if cb and (not is_cb_conditional or next_key):
cb(bucket=bucket, next_key=next_key, last_key=last_key,
next_end_date=next_end_date, last_start_date=last_start_date,
message=message)
return next_key, next_end_date
def _get_last_digest(self, digests, before_key=None):
"""Finds the previous digest key (either the last or before before_key)
If no key is provided, the last digest is used. If a digest is found,
the end date of the provider is adjusted to match the found key's end
date.
"""
if not digests:
return None, None
elif before_key is None:
next_key = digests.pop()
next_key_date = normalize_date(
parse_date(extract_digest_key_date(next_key)))
return next_key, next_key_date
# find a key before the given key.
before_key_date = parse_date(extract_digest_key_date(before_key))
while digests:
next_key = digests.pop()
next_key_date = normalize_date(
parse_date(extract_digest_key_date(next_key)))
if next_key_date < before_key_date:
LOG.debug("Next found key: %s", next_key)
return next_key, next_key_date
return None, None
def _load_and_validate_digest(self, public_keys, bucket, key):
"""Loads and validates a digest from S3.
:param public_keys: Public key dictionary of fingerprint to dict.
:return: Returns a tuple of the digest data as a dict and end_date
:rtype: tuple
"""
digest_data, digest = self.digest_provider.fetch_digest(bucket, key)
for required_key in self.required_digest_keys:
if required_key not in digest_data:
raise InvalidDigestFormat(bucket, key)
# Ensure the bucket and key are the same as what's expected.
if digest_data['digestS3Bucket'] != bucket \
or digest_data['digestS3Object'] != key:
raise DigestError(
('Digest file\ts3://%s/%s\tINVALID: has been moved from its '
'original location') % (bucket, key))
# Get the public keys in the given time range.
fingerprint = digest_data['digestPublicKeyFingerprint']
if fingerprint not in public_keys:
raise DigestError(
('Digest file\ts3://%s/%s\tINVALID: public key not found in '
'region %s for fingerprint %s') %
(bucket, key, self.digest_provider.trail_home_region,
fingerprint))
public_key_hex = public_keys[fingerprint]['Value']
self._digest_validator.validate(
bucket, key, public_key_hex, digest_data, digest)
end_date = normalize_date(parse_date(digest_data['digestEndTime']))
return digest_data, end_date
def _load_public_keys(self, start_date, end_date):
public_keys = self._public_key_provider.get_public_keys(
start_date, end_date)
if not public_keys:
raise RuntimeError(
'No public keys found between %s and %s' %
(format_display_date(start_date),
format_display_date(end_date)))
return public_keys
class Sha256RSADigestValidator(object):
"""
Validates SHA256withRSA signed digests.
The result of validating the digest is inserted into the digest_data
dictionary using the isValid key value pair.
"""
def validate(self, bucket, key, public_key, digest_data, inflated_digest):
"""Validates a digest file.
Throws a DigestError when the digest is invalid.
:param bucket: Bucket of the digest file
:param key: Key of the digest file
:param public_key: Public key bytes.
:param digest_data: Dict of digest data returned when JSON
decoding a manifest.
:param inflated_digest: Inflated digest file contents as bytes.
"""
try:
decoded_key = base64.b64decode(public_key)
public_key = rsa.PublicKey.load_pkcs1(decoded_key, format='DER')
to_sign = self._create_string_to_sign(digest_data, inflated_digest)
signature_bytes = binascii.unhexlify(digest_data['_signature'])
rsa.verify(to_sign, signature_bytes, public_key)
except PyAsn1Error:
raise DigestError(
('Digest file\ts3://%s/%s\tINVALID: Unable to load PKCS #1 key'
' with fingerprint %s')
% (bucket, key, digest_data['digestPublicKeyFingerprint']))
except rsa.pkcs1.VerificationError:
# Note from the Python-RSA docs: Never display the stack trace of
# a rsa.pkcs1.VerificationError exception. It shows where in the
# code the exception occurred, and thus leaks information about
# the key.
raise DigestSignatureError(bucket, key)
def _create_string_to_sign(self, digest_data, inflated_digest):
previous_signature = digest_data['previousDigestSignature']
if previous_signature is None:
# The value must be 'null' to match the Java implementation.
previous_signature = 'null'
string_to_sign = "%s\n%s/%s\n%s\n%s" % (
digest_data['digestEndTime'],
digest_data['digestS3Bucket'],
digest_data['digestS3Object'],
hashlib.sha256(inflated_digest).hexdigest(),
previous_signature)
LOG.debug('Digest string to sign: %s', string_to_sign)
return string_to_sign.encode()
class CloudTrailValidateLogs(BasicCommand):
"""
Validates log digests and log files, optionally saving them to disk.
"""
NAME = 'validate-logs'
DESCRIPTION = """
Validates CloudTrail logs for a given period of time.
This command uses the digest files delivered to your S3 bucket to perform
the validation.
The AWS CLI allows you to detect the following types of changes:
- Modification or deletion of CloudTrail log files.
- Modification or deletion of CloudTrail digest files.
To validate log files with the AWS CLI, the following preconditions must
be met:
- You must have online connectivity to AWS.
- You must have read access to the S3 bucket that contains the digest and
log files.
- The digest and log files must not have been moved from the original S3
location where CloudTrail delivered them.
- For organization trails you must have access to describe-organization to
validate digest files
When you disable Log File Validation, the chain of digest files is broken
after one hour. CloudTrail will not digest log files that were delivered
during a period in which the Log File Validation feature was disabled.
For example, if you enable Log File Validation on January 1, disable it
on January 2, and re-enable it on January 10, digest files will not be
created for the log files delivered from January 3 to January 9. The same
applies whenever you stop CloudTrail logging or delete a trail.
.. note::
Log files that have been downloaded to local disk cannot be validated
with the AWS CLI. The CLI will download all log files each time this
command is executed.
.. note::
This command requires that the role executing the command has
permission to call ListObjects, GetObject, and GetBucketLocation for
each bucket referenced by the trail.
"""
ARG_TABLE = [
{'name': 'trail-arn', 'required': True, 'cli_type_name': 'string',
'help_text': 'Specifies the ARN of the trail to be validated'},
{'name': 'start-time', 'required': True, 'cli_type_name': 'string',
'help_text': ('Specifies that log files delivered on or after the '
'specified UTC timestamp value will be validated. '
'Example: "2015-01-08T05:21:42Z".')},
{'name': 'end-time', 'cli_type_name': 'string',
'help_text': ('Optionally specifies that log files delivered on or '
'before the specified UTC timestamp value will be '
'validated. The default value is the current time. '
'Example: "2015-01-08T12:31:41Z".')},
{'name': 's3-bucket', 'cli_type_name': 'string',
'help_text': ('Optionally specifies the S3 bucket where the digest '
'files are stored. If a bucket name is not specified, '
'the CLI will retrieve it by calling describe_trails')},
{'name': 's3-prefix', 'cli_type_name': 'string',
'help_text': ('Optionally specifies the optional S3 prefix where the '
'digest files are stored. If not specified, the CLI '
'will determine the prefix automatically by calling '
'describe_trails.')},
{'name': 'account-id', 'cli_type_name': 'string',
'help_text': ('Optionally specifies the account for validating logs. '
'This parameter is needed for organization trails '
'for validating logs for specific account inside an '
'organization')},
{'name': 'verbose', 'cli_type_name': 'boolean',
'action': 'store_true',
'help_text': 'Display verbose log validation information'}
]
def __init__(self, session):
super(CloudTrailValidateLogs, self).__init__(session)
self.trail_arn = None
self.is_verbose = False
self.start_time = None
self.end_time = None
self.s3_bucket = None
self.s3_prefix = None
self.s3_client_provider = None
self.cloudtrail_client = None
self.account_id = None
self._source_region = None
self._valid_digests = 0
self._invalid_digests = 0
self._valid_logs = 0
self._invalid_logs = 0
self._is_last_status_double_space = True
self._found_start_time = None
self._found_end_time = None
def _run_main(self, args, parsed_globals):
self.handle_args(args)
self.setup_services(parsed_globals)
self._call()
if self._invalid_digests > 0 or self._invalid_logs > 0:
return 1
return 0
def handle_args(self, args):
self.trail_arn = args.trail_arn
self.is_verbose = args.verbose
self.s3_bucket = args.s3_bucket
self.s3_prefix = args.s3_prefix
self.account_id = args.account_id
self.start_time = normalize_date(parse_date(args.start_time))
if args.end_time:
self.end_time = normalize_date(parse_date(args.end_time))
else:
self.end_time = normalize_date(datetime.utcnow())
if self.start_time > self.end_time:
raise ValueError(('Invalid time range specified: start-time must '
'occur before end-time'))
# Found start time always defaults to the given start time. This value
# may change if the earliest found digest is after the given start
# time. Note that the summary output report of what date ranges were
# actually found is only shown if a valid digest is encountered,
# thereby setting self._found_end_time to a value.
self._found_start_time = self.start_time
def setup_services(self, parsed_globals):
self._source_region = parsed_globals.region
# Use the the same region as the region of the CLI to get locations.
self.s3_client_provider = S3ClientProvider(
self._session, self._source_region)
client_args = {'region_name': parsed_globals.region,
'verify': parsed_globals.verify_ssl}
self.organization_client = self._session.create_client(
'organizations', **client_args)
if parsed_globals.endpoint_url is not None:
client_args['endpoint_url'] = parsed_globals.endpoint_url
self.cloudtrail_client = self._session.create_client(
'cloudtrail', **client_args)
def _call(self):
traverser = create_digest_traverser(
trail_arn=self.trail_arn, cloudtrail_client=self.cloudtrail_client,
organization_client=self.organization_client,
trail_source_region=self._source_region,
s3_client_provider=self.s3_client_provider, bucket=self.s3_bucket,
prefix=self.s3_prefix, on_missing=self._on_missing_digest,
on_invalid=self._on_invalid_digest, on_gap=self._on_digest_gap,
account_id=self.account_id)
self._write_startup_text()
digests = traverser.traverse(self.start_time, self.end_time)
for digest in digests:
# Only valid digests are yielded and only valid digests can adjust
# the found times that are reported in the CLI output summary.
self._track_found_times(digest)
self._valid_digests += 1
self._write_status(
'Digest file\ts3://%s/%s\tvalid'
% (digest['digestS3Bucket'], digest['digestS3Object']))
if not digest['logFiles']:
continue
for log in digest['logFiles']:
self._download_log(log)
self._write_summary_text()
def _track_found_times(self, digest):
# Track the earliest found start time, but do not use a date before
# the user supplied start date.
digest_start_time = parse_date(digest['digestStartTime'])
if digest_start_time > self.start_time:
self._found_start_time = digest_start_time
# Only use the last found end time if it is less than the
# user supplied end time (or the current date).
if not self._found_end_time:
digest_end_time = parse_date(digest['digestEndTime'])
self._found_end_time = min(digest_end_time, self.end_time)
def _download_log(self, log):
""" Download a log, decompress, and compare SHA256 checksums"""
try:
# Create a client that can work with this bucket.
client = self.s3_client_provider.get_client(log['s3Bucket'])
response = client.get_object(
Bucket=log['s3Bucket'], Key=log['s3Object'])
gzip_inflater = zlib.decompressobj(zlib.MAX_WBITS | 16)
rolling_hash = hashlib.sha256()
for chunk in iter(lambda: response['Body'].read(2048), b""):
data = gzip_inflater.decompress(chunk)
rolling_hash.update(data)
remaining_data = gzip_inflater.flush()
if remaining_data:
rolling_hash.update(remaining_data)
computed_hash = rolling_hash.hexdigest()
if computed_hash != log['hashValue']:
self._on_log_invalid(log)
else:
self._valid_logs += 1
self._write_status(('Log file\ts3://%s/%s\tvalid'
% (log['s3Bucket'], log['s3Object'])))
except ClientError as e:
if e.response['Error']['Code'] != 'NoSuchKey':
raise
self._on_missing_log(log)
except Exception:
self._on_invalid_log_format(log)
def _write_status(self, message, is_error=False):
if is_error:
if self._is_last_status_double_space:
sys.stderr.write("%s\n\n" % message)
else:
sys.stderr.write("\n%s\n\n" % message)
self._is_last_status_double_space = True
elif self.is_verbose:
self._is_last_status_double_space = False
sys.stdout.write("%s\n" % message)
def _write_startup_text(self):
sys.stdout.write(
'Validating log files for trail %s between %s and %s\n\n'
% (self.trail_arn, format_display_date(self.start_time),
format_display_date(self.end_time)))
def _write_summary_text(self):
if not self._is_last_status_double_space:
sys.stdout.write('\n')
sys.stdout.write('Results requested for %s to %s\n'
% (format_display_date(self.start_time),
format_display_date(self.end_time)))
if not self._valid_digests and not self._invalid_digests:
sys.stdout.write('No digests found\n')
return
if not self._found_start_time or not self._found_end_time:
sys.stdout.write('No valid digests found in range\n')
else:
sys.stdout.write('Results found for %s to %s:\n'
% (format_display_date(self._found_start_time),
format_display_date(self._found_end_time)))
self._write_ratio(self._valid_digests, self._invalid_digests, 'digest')
self._write_ratio(self._valid_logs, self._invalid_logs, 'log')
sys.stdout.write('\n')
def _write_ratio(self, valid, invalid, name):
total = valid + invalid
if total > 0:
sys.stdout.write('\n%d/%d %s files valid' % (valid, total, name))
if invalid > 0:
sys.stdout.write(', %d/%d %s files INVALID' % (invalid, total,
name))
def _on_missing_digest(self, bucket, last_key, **kwargs):
self._invalid_digests += 1
self._write_status('Digest file\ts3://%s/%s\tINVALID: not found'
% (bucket, last_key), True)
def _on_digest_gap(self, **kwargs):
self._write_status(
'No log files were delivered by CloudTrail between %s and %s'
% (format_display_date(kwargs['next_end_date']),
format_display_date(kwargs['last_start_date'])), True)
def _on_invalid_digest(self, message, **kwargs):
self._invalid_digests += 1
self._write_status(message, True)
def _on_invalid_log_format(self, log_data):
self._invalid_logs += 1
self._write_status(
('Log file\ts3://%s/%s\tINVALID: invalid format'
% (log_data['s3Bucket'], log_data['s3Object'])), True)
def _on_log_invalid(self, log_data):
self._invalid_logs += 1
self._write_status(
"Log file\ts3://%s/%s\tINVALID: hash value doesn't match"
% (log_data['s3Bucket'], log_data['s3Object']), True)
def _on_missing_log(self, log_data):
self._invalid_logs += 1
self._write_status(
'Log file\ts3://%s/%s\tINVALID: not found'
% (log_data['s3Bucket'], log_data['s3Object']), True)