Commit 0b00ce1

Implement memory performance fixes for downloads to non-seekable streams
2 parents 67f9820 + 27173b3

3 files changed: +90 additions, -17 deletions
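
The substance of the change is in DeferQueue (diffed in s3transfer/download.py below): the write heap used to carry whole (offset, data) tuples, with a set tracking queued offsets; it now orders bare offsets while a dict holds each pending payload exactly once. A minimal sketch of that bookkeeping change, reusing the names from the diff:

    import heapq

    # Before: heap entries carried the payload bytes themselves.
    old_writes = []
    heapq.heappush(old_writes, (0, b'foo'))

    # After: the heap orders bare offsets; the payload lives once in a
    # dict keyed by offset, so a retried chunk carrying more data can
    # replace the queued copy in place instead of stacking a second
    # buffer of the same bytes.
    new_writes = []
    pending_offsets = {}
    heapq.heappush(new_writes, 0)
    pending_offsets[0] = b'foo'
    pending_offsets[0] = b'foo bar'  # a longer retry overwrites in place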
Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+{
+  "type": "enhancement",
+  "category": "``s3``",
+  "description": "Implement memory performance fixes for downloads to non-seekable streams"
+}

s3transfer/download.py

Lines changed: 39 additions & 13 deletions

@@ -750,7 +750,7 @@ class DeferQueue:

     def __init__(self):
         self._writes = []
-        self._pending_offsets = set()
+        self._pending_offsets = {}
         self._next_offset = 0

     def request_writes(self, offset, data):
@@ -766,23 +766,49 @@ def request_writes(self, offset, data):
         each method call.

         """
-        if offset < self._next_offset:
+        if offset + len(data) <= self._next_offset:
             # This is a request for a write that we've already
             # seen. This can happen in the event of a retry
             # where if we retry at offset N/2, we'll requeue
             # offsets 0-N/2 again.
             return []
         writes = []
+        if offset < self._next_offset:
+            # This is a special case where the write request contains
+            # both seen AND unseen data. This can happen when we queue
+            # part of a chunk due to an incomplete read, then pop the
+            # incomplete data for writing, and then receive the retry
+            # for the incomplete read, which contains both the previously
+            # seen partial chunk and the rest of the chunk (unseen).
+            #
+            # In this case, we discard the bytes of the data we've already
+            # queued before, and only queue the unseen bytes.
+            seen_bytes = self._next_offset - offset
+            data = data[seen_bytes:]
+            offset = self._next_offset
         if offset in self._pending_offsets:
-            # We've already queued this offset so this request is
-            # a duplicate. In this case we should ignore
-            # this request and prefer what's already queued.
-            return []
-        heapq.heappush(self._writes, (offset, data))
-        self._pending_offsets.add(offset)
-        while self._writes and self._writes[0][0] == self._next_offset:
-            next_write = heapq.heappop(self._writes)
-            writes.append({'offset': next_write[0], 'data': next_write[1]})
-            self._pending_offsets.remove(next_write[0])
-            self._next_offset += len(next_write[1])
+            queued_data = self._pending_offsets[offset]
+            if len(data) <= len(queued_data):
+                # We already have a write request queued with the same
+                # offset and at least as much data as is present in this
+                # request. In this case we should ignore this request
+                # and prefer what's already queued.
+                return []
+            else:
+                # We have a write request queued with the same offset,
+                # but this request contains more data. This can happen
+                # in the case of a retried request due to an incomplete
+                # read, followed by a retry containing the full response
+                # body. In this case, we should overwrite the queued
+                # request with this one since it contains more data.
+                self._pending_offsets[offset] = data
+        else:
+            heapq.heappush(self._writes, offset)
+            self._pending_offsets[offset] = data
+        while self._writes and self._writes[0] == self._next_offset:
+            next_write_offset = heapq.heappop(self._writes)
+            next_write = self._pending_offsets[next_write_offset]
+            writes.append({'offset': next_write_offset, 'data': next_write})
+            del self._pending_offsets[next_write_offset]
+            self._next_offset += len(next_write)
         return writes
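
A rough usage sketch of the new trimming behavior (mirroring the unit tests in tests/unit/test_download.py below; assumes the DeferQueue imported from s3transfer.download at this commit):

    from s3transfer.download import DeferQueue

    q = DeferQueue()

    # The first chunk is contiguous with offset 0, so it is returned
    # for writing immediately and the next expected offset becomes 3.
    assert q.request_writes(offset=0, data='foo') == [
        {'offset': 0, 'data': 'foo'}
    ]

    # A retry re-sends the already-written bytes plus the rest of the
    # chunk; only the unseen tail is returned, so seen bytes are never
    # buffered a second time.
    assert q.request_writes(offset=0, data='foo bar') == [
        {'offset': 3, 'data': ' bar'}
    ]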

tests/unit/test_download.py

Lines changed: 46 additions & 4 deletions

@@ -963,7 +963,7 @@ def test_data_queued_in_order(self):
         writes = self.q.request_writes(offset=11, data='hello again')
         self.assertEqual(writes, [{'offset': 11, 'data': 'hello again'}])

-    def test_writes_below_min_offset_are_ignored(self):
+    def test_writes_with_last_byte_below_min_offset_are_ignored(self):
         self.q.request_writes(offset=0, data='a')
         self.q.request_writes(offset=1, data='b')
         self.q.request_writes(offset=2, data='c')
@@ -978,13 +978,36 @@ def test_writes_below_min_offset_are_ignored(self):
             [{'offset': 3, 'data': 'd'}],
         )

-    def test_duplicate_writes_are_ignored(self):
+    def test_writes_below_min_offset_with_last_byte_above_min_offset_are_queued(
+        self,
+    ):
+        self.assertEqual(
+            self.q.request_writes(offset=0, data='foo'),
+            [{'offset': 0, 'data': 'foo'}],
+        )
+
+        # Even though a partial write of 'foo' was completed at offset 0,
+        # a subsequent request to the same offset with a longer
+        # length will write a substring of the data starting at
+        # index next_offset.
+        self.assertEqual(
+            self.q.request_writes(offset=0, data='foo bar'),
+            [
+                # Note we are writing a substring of the data starting at
+                # index 3 since the previous write to offset 0 had length 3.
+                {'offset': 3, 'data': ' bar'},
+            ],
+        )
+
+    def test_duplicate_writes_same_length_are_ignored(self):
         self.q.request_writes(offset=2, data='c')
         self.q.request_writes(offset=1, data='b')

         # We're still waiting for offset=0, but if
-        # a duplicate write comes in for offset=2/offset=1
-        # it's ignored. This gives "first one wins" behavior.
+        # a duplicate write with the same length comes in
+        # for offset=2/offset=1 it's ignored.
+        # This gives "largest one wins" behavior with ties
+        # broken via "first one wins".
         self.assertEqual(self.q.request_writes(offset=2, data='X'), [])
         self.assertEqual(self.q.request_writes(offset=1, data='Y'), [])

@@ -997,3 +1020,22 @@ def test_duplicate_writes_are_ignored(self):
                 {'offset': 2, 'data': 'c'},
             ],
         )
+
+    def test_duplicate_writes_longer_length_update_queue(self):
+        self.q.request_writes(offset=1, data='b')
+
+        # We're still waiting for offset=0, but if
+        # a write comes in for the same offset it
+        # updates the queue if the request contains more data.
+        # This gives "largest one wins" behavior with ties
+        # broken via "first one wins".
+        self.assertEqual(self.q.request_writes(offset=1, data='bar'), [])
+
+        self.assertEqual(
+            self.q.request_writes(offset=0, data='a'),
+            [
+                {'offset': 0, 'data': 'a'},
+                # Note we're seeing 'bar', and not 'b', since len(bar) > len(b).
+                {'offset': 1, 'data': 'bar'},
+            ],
+        )
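
And a companion sketch of the "largest one wins" path exercised by the last test above, under the same assumption:

    from s3transfer.download import DeferQueue

    q = DeferQueue()

    # offset=1 is held while we wait for offset=0; a longer retry for
    # the same offset replaces the queued payload rather than adding a
    # duplicate entry.
    assert q.request_writes(offset=1, data='b') == []
    assert q.request_writes(offset=1, data='bar') == []

    # Once offset=0 arrives, the queue flushes in order, and the longer
    # payload ('bar', not 'b') is what gets written.
    assert q.request_writes(offset=0, data='a') == [
        {'offset': 0, 'data': 'a'},
        {'offset': 1, 'data': 'bar'},
    ]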
