From e8a91cf21a3aee05ff9cdeda9503c303de9867b1 Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Fri, 22 Dec 2023 14:20:52 +0000 Subject: [PATCH 1/4] Add `events` method to ContainerClient --- .../utils/container_utils/container_client.py | 20 +++++- .../container_utils/docker_cmd_client.py | 13 +++- .../container_utils/docker_sdk_client.py | 5 +- tests/integration/docker_utils/test_docker.py | 66 ++++++++++++++++++- 4 files changed, 100 insertions(+), 4 deletions(-) diff --git a/localstack/utils/container_utils/container_client.py b/localstack/utils/container_utils/container_client.py index 9cdb3d6c700a8..bcff49035a0b3 100644 --- a/localstack/utils/container_utils/container_client.py +++ b/localstack/utils/container_utils/container_client.py @@ -11,7 +11,18 @@ from abc import ABCMeta, abstractmethod from enum import Enum, unique from pathlib import Path -from typing import Dict, List, Literal, NamedTuple, Optional, Protocol, Tuple, Union, get_args +from typing import ( + Dict, + Generator, + List, + Literal, + NamedTuple, + Optional, + Protocol, + Tuple, + Union, + get_args, +) from localstack import config from localstack.utils.collections import HashableList, ensure_list @@ -670,6 +681,13 @@ def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> s def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: """Returns a blocking generator you can iterate over to retrieve log output as it happens.""" + @abstractmethod + def events(self) -> Generator[Dict, None, None]: + """Returns a generator over events generated by the container runtime. + + More information: https://docs.docker.com/engine/reference/commandline/events + """ + @abstractmethod def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]: """Get detailed attributes of a container. diff --git a/localstack/utils/container_utils/docker_cmd_client.py b/localstack/utils/container_utils/docker_cmd_client.py index 6072dfd7f1169..417eea629933d 100644 --- a/localstack/utils/container_utils/docker_cmd_client.py +++ b/localstack/utils/container_utils/docker_cmd_client.py @@ -6,7 +6,7 @@ import re import shlex import subprocess -from typing import Dict, List, Optional, Tuple, Union +from typing import Dict, Generator, List, Optional, Tuple, Union from localstack import config from localstack.utils.collections import ensure_list @@ -408,6 +408,17 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: return CancellableProcessStream(process) + def events(self) -> Generator[Dict, None, None]: + cmd = self._docker_cmd() + cmd += ["events", "--format", "{{json .}}"] + + process: subprocess.Popen = run( + cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT + ) + + for msg in CancellableProcessStream(process): + yield json.loads(msg) + def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]: cmd = self._docker_cmd() cmd += ["inspect", "--format", "{{json .}}", object_name_or_id] diff --git a/localstack/utils/container_utils/docker_sdk_client.py b/localstack/utils/container_utils/docker_sdk_client.py index 2e31db850afb4..c3fcb4aa6fd32 100644 --- a/localstack/utils/container_utils/docker_sdk_client.py +++ b/localstack/utils/container_utils/docker_sdk_client.py @@ -7,7 +7,7 @@ import socket import threading from time import sleep -from typing import Dict, List, Optional, Tuple, Union, cast +from typing import Dict, Generator, List, Optional, Tuple, Union, cast from urllib.parse import quote import docker @@ -361,6 +361,9 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: except APIError as e: raise ContainerException() from e + def events(self) -> Generator[Dict, None, None]: + yield from self.client().events(decode=True) + def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]: try: return self.client().containers.get(container_name_or_id).attrs diff --git a/tests/integration/docker_utils/test_docker.py b/tests/integration/docker_utils/test_docker.py index 064904a6ddfdf..add4fe59bc8de 100644 --- a/tests/integration/docker_utils/test_docker.py +++ b/tests/integration/docker_utils/test_docker.py @@ -2,6 +2,7 @@ import ipaddress import logging import os +import queue import re import time from typing import NamedTuple, Type @@ -39,7 +40,7 @@ ) from localstack.utils.net import Port, PortNotAvailableException, get_free_tcp_port from localstack.utils.strings import to_bytes -from localstack.utils.threads import FuncThread +from localstack.utils.threads import FuncThread, start_thread from tests.integration.docker_utils.conftest import is_podman_test, skip_for_podman ContainerInfo = NamedTuple( @@ -1048,6 +1049,69 @@ def test_stream_logs(self, docker_client: ContainerClient): finally: docker_client.remove_container(container_name) + def test_events(self, docker_client: ContainerClient): + # create background thread watching for events + q = queue.Queue() + + should_stop = False + + def stream_messages(*_): + for event in docker_client.events(): + if should_stop: + break + q.put(event) + + start_thread(stream_messages, name="docker-events-poller") + + # run a container to generate some events + container_name = _random_container_name() + id = docker_client.create_container( + "alpine", + name=container_name, + detach=False, + command=["sh", "-c", "sleep 1"], + ) + try: + docker_client.start_container( + id, + attach=True, + ) + finally: + docker_client.remove_container(container_name) + + should_stop = True + + # flags to indicate that expected messages have been observed + # running a container and then removing the container should at least + # contain the following: + # + # - {"status": "create", ...} + # - {"status": "destroy", ...} + received_create = False + received_destroy = False + + max_messages = 50 + for _ in range(max_messages): + if received_create and received_destroy: + break + + msg = q.get() + + # filter out only events for this container + if msg.get("id") != id: + continue + + # update test state based on message content + match msg.get("status"): + case "create": + received_create = True + case "destroy": + received_destroy = True + + q.task_done() + + assert received_create and received_destroy + @markers.skip_offline def test_pull_docker_image(self, docker_client: ContainerClient): try: From 0d71e4e9cc96790581ca47bc234e9d30410188a7 Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Fri, 22 Dec 2023 14:56:17 +0000 Subject: [PATCH 2/4] Include event filter flags --- .../utils/container_utils/container_client.py | 8 ++++++- .../container_utils/docker_cmd_client.py | 21 ++++++++++++++++++- .../container_utils/docker_sdk_client.py | 10 +++++++-- 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/localstack/utils/container_utils/container_client.py b/localstack/utils/container_utils/container_client.py index bcff49035a0b3..68de78748e0af 100644 --- a/localstack/utils/container_utils/container_client.py +++ b/localstack/utils/container_utils/container_client.py @@ -9,6 +9,7 @@ import tarfile import tempfile from abc import ABCMeta, abstractmethod +from datetime import datetime from enum import Enum, unique from pathlib import Path from typing import ( @@ -682,7 +683,12 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: """Returns a blocking generator you can iterate over to retrieve log output as it happens.""" @abstractmethod - def events(self) -> Generator[Dict, None, None]: + def events( + self, + since: Optional[Union[datetime, int]] = None, + until: Optional[Union[datetime, int]] = None, + filters: Optional[Dict] = None, + ) -> Generator[Dict, None, None]: """Returns a generator over events generated by the container runtime. More information: https://docs.docker.com/engine/reference/commandline/events diff --git a/localstack/utils/container_utils/docker_cmd_client.py b/localstack/utils/container_utils/docker_cmd_client.py index 417eea629933d..7b1e94285b429 100644 --- a/localstack/utils/container_utils/docker_cmd_client.py +++ b/localstack/utils/container_utils/docker_cmd_client.py @@ -6,6 +6,7 @@ import re import shlex import subprocess +from datetime import datetime from typing import Dict, Generator, List, Optional, Tuple, Union from localstack import config @@ -408,10 +409,28 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: return CancellableProcessStream(process) - def events(self) -> Generator[Dict, None, None]: + def events( + self, + since: Optional[Union[datetime, int]] = None, + until: Optional[Union[datetime, int]] = None, + filters: Optional[Dict] = None, + ) -> Generator[Dict, None, None]: cmd = self._docker_cmd() cmd += ["events", "--format", "{{json .}}"] + def _to_timestamp(value: Union[datetime, int]): + if isinstance(value, datetime): + return value.timestamp() + else: + return value + + if since: + cmd += [_to_timestamp(since)] + if until: + cmd += [_to_timestamp(until)] + if filters: + cmd += [json.dumps(filters)] + process: subprocess.Popen = run( cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT ) diff --git a/localstack/utils/container_utils/docker_sdk_client.py b/localstack/utils/container_utils/docker_sdk_client.py index c3fcb4aa6fd32..0f3049e7ce281 100644 --- a/localstack/utils/container_utils/docker_sdk_client.py +++ b/localstack/utils/container_utils/docker_sdk_client.py @@ -6,6 +6,7 @@ import re import socket import threading +from datetime import datetime from time import sleep from typing import Dict, Generator, List, Optional, Tuple, Union, cast from urllib.parse import quote @@ -361,8 +362,13 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream: except APIError as e: raise ContainerException() from e - def events(self) -> Generator[Dict, None, None]: - yield from self.client().events(decode=True) + def events( + self, + since: Optional[Union[datetime, int]] = None, + until: Optional[Union[datetime, int]] = None, + filters: Optional[Dict] = None, + ) -> Generator[Dict, None, None]: + yield from self.client().events(since=since, until=until, filters=filters, decode=True) def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]: try: From d7e0d1faef286d47aaf56bdf1aec6a3673fcc00e Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Fri, 22 Dec 2023 15:17:40 +0000 Subject: [PATCH 3/4] Fix filter usage and add to tests --- localstack/utils/container_utils/docker_cmd_client.py | 10 +++++++--- tests/integration/docker_utils/test_docker.py | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/localstack/utils/container_utils/docker_cmd_client.py b/localstack/utils/container_utils/docker_cmd_client.py index 7b1e94285b429..dbf7f4cbd7b1f 100644 --- a/localstack/utils/container_utils/docker_cmd_client.py +++ b/localstack/utils/container_utils/docker_cmd_client.py @@ -427,16 +427,20 @@ def _to_timestamp(value: Union[datetime, int]): if since: cmd += [_to_timestamp(since)] if until: - cmd += [_to_timestamp(until)] + cmd += ["--until", _to_timestamp(until)] if filters: - cmd += [json.dumps(filters)] + filter_pairs = [f"{key}={value}" for (key, value) in filters.items()] + cmd += ["--filter", ",".join(filter_pairs)] process: subprocess.Popen = run( cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT ) for msg in CancellableProcessStream(process): - yield json.loads(msg) + try: + yield json.loads(msg) + except json.JSONDecodeError as e: + LOG.warning("Error decoding docker event %s: %s", msg, e) def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]: cmd = self._docker_cmd() diff --git a/tests/integration/docker_utils/test_docker.py b/tests/integration/docker_utils/test_docker.py index add4fe59bc8de..8fd8cf4daf82c 100644 --- a/tests/integration/docker_utils/test_docker.py +++ b/tests/integration/docker_utils/test_docker.py @@ -1054,9 +1054,10 @@ def test_events(self, docker_client: ContainerClient): q = queue.Queue() should_stop = False + container_name = _random_container_name() def stream_messages(*_): - for event in docker_client.events(): + for event in docker_client.events(filters={"id": "container_name"}): if should_stop: break q.put(event) @@ -1064,7 +1065,6 @@ def stream_messages(*_): start_thread(stream_messages, name="docker-events-poller") # run a container to generate some events - container_name = _random_container_name() id = docker_client.create_container( "alpine", name=container_name, From 2af4b6e8b31806f979e9d07a415871d4e8f6ed6e Mon Sep 17 00:00:00 2001 From: Simon Walker Date: Fri, 5 Jan 2024 16:53:47 +0000 Subject: [PATCH 4/4] Ensure events stream is cancellable --- .../container_utils/docker_cmd_client.py | 24 +++++++++++++------ 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/localstack/utils/container_utils/docker_cmd_client.py b/localstack/utils/container_utils/docker_cmd_client.py index dbf7f4cbd7b1f..440f472eb4b26 100644 --- a/localstack/utils/container_utils/docker_cmd_client.py +++ b/localstack/utils/container_utils/docker_cmd_client.py @@ -7,7 +7,7 @@ import shlex import subprocess from datetime import datetime -from typing import Dict, Generator, List, Optional, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Union from localstack import config from localstack.utils.collections import ensure_list @@ -38,10 +38,14 @@ class CancellableProcessStream(CancellableStream): process: subprocess.Popen + map_fn: Callable[[str], Any] - def __init__(self, process: subprocess.Popen) -> None: + def __init__( + self, process: subprocess.Popen, map_fn: Union[Callable[[str], Any], None] = None + ) -> None: super().__init__() self.process = process + self.map_fn = map_fn def __iter__(self): return self @@ -50,7 +54,11 @@ def __next__(self): line = self.process.stdout.readline() if not line: raise StopIteration - return line + + if self.map_fn is not None: + return self.map_fn(line) + else: + return line def close(self): return self.process.terminate() @@ -414,7 +422,7 @@ def events( since: Optional[Union[datetime, int]] = None, until: Optional[Union[datetime, int]] = None, filters: Optional[Dict] = None, - ) -> Generator[Dict, None, None]: + ) -> CancellableStream: cmd = self._docker_cmd() cmd += ["events", "--format", "{{json .}}"] @@ -436,11 +444,13 @@ def _to_timestamp(value: Union[datetime, int]): cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT ) - for msg in CancellableProcessStream(process): + def decode_fn(line: str): try: - yield json.loads(msg) + return json.loads(line) except json.JSONDecodeError as e: - LOG.warning("Error decoding docker event %s: %s", msg, e) + LOG.warning("Error decoding docker event %s: %s", line, e) + + return CancellableProcessStream(process, map_fn=decode_fn) def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]: cmd = self._docker_cmd()