Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion localstack/utils/container_utils/container_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,21 @@
import tarfile
import tempfile
from abc import ABCMeta, abstractmethod
from datetime import datetime
from enum import Enum, unique
from pathlib import Path
from typing import Dict, List, Literal, NamedTuple, Optional, Protocol, Tuple, Union, get_args
from typing import (
Dict,
Generator,
List,
Literal,
NamedTuple,
Optional,
Protocol,
Tuple,
Union,
get_args,
)

from localstack import config
from localstack.utils.collections import HashableList, ensure_list
Expand Down Expand Up @@ -670,6 +682,18 @@ def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> s
def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
"""Returns a blocking generator you can iterate over to retrieve log output as it happens."""

@abstractmethod
def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> Generator[Dict, None, None]:
"""Returns a generator over events generated by the container runtime.

More information: https://docs.docker.com/engine/reference/commandline/events
"""

@abstractmethod
def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
"""Get detailed attributes of a container.
Expand Down
50 changes: 47 additions & 3 deletions localstack/utils/container_utils/docker_cmd_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import re
import shlex
import subprocess
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from localstack import config
from localstack.utils.collections import ensure_list
Expand Down Expand Up @@ -37,10 +38,14 @@

class CancellableProcessStream(CancellableStream):
process: subprocess.Popen
map_fn: Callable[[str], Any]

def __init__(self, process: subprocess.Popen) -> None:
def __init__(
self, process: subprocess.Popen, map_fn: Union[Callable[[str], Any], None] = None
) -> None:
super().__init__()
self.process = process
self.map_fn = map_fn

def __iter__(self):
return self
Expand All @@ -49,7 +54,11 @@ def __next__(self):
line = self.process.stdout.readline()
if not line:
raise StopIteration
return line

if self.map_fn is not None:
return self.map_fn(line)
else:
return line

def close(self):
return self.process.terminate()
Expand Down Expand Up @@ -408,6 +417,41 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:

return CancellableProcessStream(process)

def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> CancellableStream:
cmd = self._docker_cmd()
cmd += ["events", "--format", "{{json .}}"]

def _to_timestamp(value: Union[datetime, int]):
if isinstance(value, datetime):
return value.timestamp()
else:
return value

if since:
cmd += [_to_timestamp(since)]
if until:
cmd += ["--until", _to_timestamp(until)]
if filters:
filter_pairs = [f"{key}={value}" for (key, value) in filters.items()]
cmd += ["--filter", ",".join(filter_pairs)]

process: subprocess.Popen = run(
cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT
)

def decode_fn(line: str):
try:
return json.loads(line)
except json.JSONDecodeError as e:
LOG.warning("Error decoding docker event %s: %s", line, e)

return CancellableProcessStream(process, map_fn=decode_fn)

def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]:
cmd = self._docker_cmd()
cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]
Expand Down
11 changes: 10 additions & 1 deletion localstack/utils/container_utils/docker_sdk_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import re
import socket
import threading
from datetime import datetime
from time import sleep
from typing import Dict, List, Optional, Tuple, Union, cast
from typing import Dict, Generator, List, Optional, Tuple, Union, cast
from urllib.parse import quote

import docker
Expand Down Expand Up @@ -361,6 +362,14 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
except APIError as e:
raise ContainerException() from e

def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> Generator[Dict, None, None]:
yield from self.client().events(since=since, until=until, filters=filters, decode=True)

def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
try:
return self.client().containers.get(container_name_or_id).attrs
Expand Down
66 changes: 65 additions & 1 deletion tests/integration/docker_utils/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import ipaddress
import logging
import os
import queue
import re
import time
from typing import NamedTuple, Type
Expand Down Expand Up @@ -39,7 +40,7 @@
)
from localstack.utils.net import Port, PortNotAvailableException, get_free_tcp_port
from localstack.utils.strings import to_bytes
from localstack.utils.threads import FuncThread
from localstack.utils.threads import FuncThread, start_thread
from tests.integration.docker_utils.conftest import is_podman_test, skip_for_podman

ContainerInfo = NamedTuple(
Expand Down Expand Up @@ -1048,6 +1049,69 @@ def test_stream_logs(self, docker_client: ContainerClient):
finally:
docker_client.remove_container(container_name)

def test_events(self, docker_client: ContainerClient):
# create background thread watching for events
q = queue.Queue()

should_stop = False
container_name = _random_container_name()

def stream_messages(*_):
for event in docker_client.events(filters={"id": "container_name"}):
if should_stop:
break
q.put(event)

start_thread(stream_messages, name="docker-events-poller")

# run a container to generate some events
id = docker_client.create_container(
"alpine",
name=container_name,
detach=False,
command=["sh", "-c", "sleep 1"],
)
try:
docker_client.start_container(
id,
attach=True,
)
finally:
docker_client.remove_container(container_name)

should_stop = True

# flags to indicate that expected messages have been observed
# running a container and then removing the container should at least
# contain the following:
#
# - {"status": "create", ...}
# - {"status": "destroy", ...}
received_create = False
received_destroy = False

max_messages = 50
for _ in range(max_messages):
if received_create and received_destroy:
break

msg = q.get()

# filter out only events for this container
if msg.get("id") != id:
continue

# update test state based on message content
match msg.get("status"):
case "create":
received_create = True
case "destroy":
received_destroy = True

q.task_done()

assert received_create and received_destroy

@markers.skip_offline
def test_pull_docker_image(self, docker_client: ContainerClient):
try:
Expand Down