Skip to content

Commit 585962d

Browse files
moodyjoneukreign
authored andcommitted
Make stop(), stop_tasks() consistently async routines, and have stop_tasks()
wait for file_output_task completion. This fixes a problem with test_download_stop_resume_delete.
1 parent ea4fba3 commit 585962d

File tree

6 files changed

+18
-17
lines changed

6 files changed

+18
-17
lines changed

lbry/file/source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ async def stop(self, finished: bool = False):
6767
async def save_file(self, file_name: Optional[str] = None, download_directory: Optional[str] = None):
6868
raise NotImplementedError()
6969

70-
def stop_tasks(self):
70+
async def stop_tasks(self):
7171
raise NotImplementedError()
7272

7373
def set_claim(self, claim_info: typing.Dict, claim: 'Claim'):

lbry/file/source_manager.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
5959
def add(self, source: ManagedDownloadSource):
6060
self._sources[source.identifier] = source
6161

62-
def remove(self, source: ManagedDownloadSource):
62+
async def remove(self, source: ManagedDownloadSource):
6363
if source.identifier not in self._sources:
6464
return
6565
self._sources.pop(source.identifier)
66-
source.stop_tasks()
66+
await source.stop_tasks()
6767

6868
async def initialize_from_database(self):
6969
raise NotImplementedError()
@@ -72,18 +72,18 @@ async def start(self):
7272
await self.initialize_from_database()
7373
self.started.set()
7474

75-
def stop(self):
75+
async def stop(self):
7676
while self._sources:
7777
_, source = self._sources.popitem()
78-
source.stop_tasks()
78+
await source.stop_tasks()
7979
self.started.clear()
8080

8181
async def create(self, file_path: str, key: Optional[bytes] = None,
8282
iv_generator: Optional[typing.Generator[bytes, None, None]] = None) -> ManagedDownloadSource:
8383
raise NotImplementedError()
8484

8585
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):
86-
self.remove(source)
86+
await self.remove(source)
8787
if delete_file and source.output_file_exists:
8888
os.remove(source.full_path)
8989

lbry/stream/managed_stream.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ async def stop(self, finished: bool = False):
191191
Stop any running save/stream tasks as well as the downloader and update the status in the database
192192
"""
193193

194-
self.stop_tasks()
194+
await self.stop_tasks()
195195
if (finished and self.status != self.STATUS_FINISHED) or self.status == self.STATUS_RUNNING:
196196
await self.update_status(self.STATUS_FINISHED if finished else self.STATUS_STOPPED)
197197

@@ -324,12 +324,13 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O
324324
await asyncio.wait_for(self.started_writing.wait(), self.config.download_timeout)
325325
except asyncio.TimeoutError:
326326
log.warning("timeout starting to write data for lbry://%s#%s", self.claim_name, self.claim_id)
327-
self.stop_tasks()
327+
await self.stop_tasks()
328328
await self.update_status(ManagedStream.STATUS_STOPPED)
329329

330-
def stop_tasks(self):
330+
async def stop_tasks(self):
331331
if self.file_output_task and not self.file_output_task.done():
332332
self.file_output_task.cancel()
333+
await asyncio.gather(self.file_output_task, return_exceptions=True)
333334
self.file_output_task = None
334335
while self.streaming_responses:
335336
req, response = self.streaming_responses.pop()

lbry/stream/stream_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,8 @@ async def start(self):
196196
await super().start()
197197
self.re_reflect_task = self.loop.create_task(self.reflect_streams())
198198

199-
def stop(self):
200-
super().stop()
199+
async def stop(self):
200+
await super().stop()
201201
if self.resume_saving_task and not self.resume_saving_task.done():
202202
self.resume_saving_task.cancel()
203203
if self.re_reflect_task and not self.re_reflect_task.done():
@@ -260,7 +260,7 @@ async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool
260260
return
261261
if source.identifier in self.running_reflector_uploads:
262262
self.running_reflector_uploads[source.identifier].cancel()
263-
source.stop_tasks()
263+
await source.stop_tasks()
264264
if source.identifier in self.streams:
265265
del self.streams[source.identifier]
266266
blob_hashes = [source.identifier] + [b.blob_hash for b in source.descriptor.blobs[:-1]]

lbry/torrent/torrent_manager.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def torrent_name(self):
7474
def bt_infohash(self):
7575
return self.identifier
7676

77-
def stop_tasks(self):
77+
async def stop_tasks(self):
7878
pass
7979

8080
@property
@@ -118,8 +118,8 @@ async def initialize_from_database(self):
118118
async def start(self):
119119
await super().start()
120120

121-
def stop(self):
122-
super().stop()
121+
async def stop(self):
122+
await super().stop()
123123
log.info("finished stopping the torrent manager")
124124

125125
async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False):

tests/unit/stream/test_stream_manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,7 @@ async def test_non_head_data_timeout(self):
424424
self.assertIsNone(stream.full_path)
425425
self.assertEqual(0, stream.written_bytes)
426426

427-
self.stream_manager.stop()
427+
await self.stream_manager.stop()
428428
await self.stream_manager.start()
429429
self.assertEqual(1, len(self.stream_manager.streams))
430430
stream = list(self.stream_manager.streams.values())[0]
@@ -449,7 +449,7 @@ async def check_post(event):
449449
stream = await self.file_manager.download_from_uri(self.uri, self.exchange_rate_manager)
450450
await stream.finished_writing.wait()
451451
await asyncio.sleep(0)
452-
self.stream_manager.stop()
452+
await self.stream_manager.stop()
453453
self.client_blob_manager.stop()
454454
# partial removal, only sd blob is missing.
455455
# in this case, we recover the sd blob while the other blobs are kept untouched as 'finished'

0 commit comments

Comments
 (0)