Skip to content

Docker client: reimplement the events API #12486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import tarfile
import tempfile
from abc import ABCMeta, abstractmethod
from datetime import datetime
from enum import Enum, unique
from pathlib import Path
from typing import (
Dict,
Generator,
List,
Literal,
NamedTuple,
Expand Down Expand Up @@ -743,6 +745,17 @@ def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> s
def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
"""Returns a blocking generator you can iterate over to retrieve log output as it happens."""

@abstractmethod
def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> Generator[Dict, None, None]:
"""Returns a generator over events generated by the container runtime.
More information: https://docs.docker.com/engine/reference/commandline/events
"""

@abstractmethod
def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
"""Get detailed attributes of a container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import re
import shlex
import subprocess
from typing import Dict, List, Optional, Tuple, Union
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from localstack import config
from localstack.utils.collections import ensure_list
Expand Down Expand Up @@ -39,10 +40,14 @@

class CancellableProcessStream(CancellableStream):
process: subprocess.Popen
map_fn: Optional[Callable[[str], Any]]

def __init__(self, process: subprocess.Popen) -> None:
def __init__(
self, process: subprocess.Popen, map_fn: Optional[Callable[[str], Any]] = None
) -> None:
super().__init__()
self.process = process
self.map_fn = map_fn

def __iter__(self):
return self
Expand All @@ -51,7 +56,10 @@ def __next__(self):
line = self.process.stdout.readline()
if not line:
raise StopIteration
return line
if self.map_fn:
return self.map_fn(line)
else:
return line

def close(self):
return self.process.terminate()
Expand Down Expand Up @@ -478,6 +486,41 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:

return CancellableProcessStream(process)

def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> CancellableStream:
cmd = self._docker_cmd()
cmd += ["events", "--format", "{{json .}}"]

def _to_timestamp(value: Union[datetime, int]):
if isinstance(value, datetime):
return value.timestamp()
else:
return value

if since:
cmd += [_to_timestamp(since)]
if until:
cmd += ["--until", _to_timestamp(until)]
if filters:
filter_pairs = [f"{key}={value}" for (key, value) in filters.items()]
cmd += ["--filter", ",".join(filter_pairs)]

process: subprocess.Popen = run(
cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT
)

def decode_fn(line: str):
try:
return json.loads(line)
except json.JSONDecodeError as e:
LOG.warning("Error decoding docker event %s: %s", line, e)

return CancellableProcessStream(process, map_fn=decode_fn)

def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]:
cmd = self._docker_cmd()
cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
import re
import socket
import threading
from datetime import datetime
from functools import lru_cache
from time import sleep
from typing import Dict, List, Optional, Tuple, Union, cast
from typing import Dict, Generator, List, Optional, Tuple, Union, cast
from urllib.parse import quote

import docker
Expand Down Expand Up @@ -451,6 +452,14 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
except APIError as e:
raise ContainerException() from e

def events(
self,
since: Optional[Union[datetime, int]] = None,
until: Optional[Union[datetime, int]] = None,
filters: Optional[Dict] = None,
) -> Generator[Dict, None, None]:
yield from self.client().events(since=since, until=until, filters=filters, decode=True)

def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
try:
return self.client().containers.get(container_name_or_id).attrs
Expand Down
69 changes: 68 additions & 1 deletion tests/integration/docker_utils/test_docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import queue
import re
import textwrap
import time
Expand Down Expand Up @@ -45,7 +46,7 @@
from localstack.utils.net import Port, PortNotAvailableException, get_free_tcp_port
from localstack.utils.strings import to_bytes
from localstack.utils.sync import retry
from localstack.utils.threads import FuncThread
from localstack.utils.threads import FuncThread, start_thread
from tests.integration.docker_utils.conftest import is_podman_test, skip_for_podman

ContainerInfo = NamedTuple(
Expand Down Expand Up @@ -1051,6 +1052,72 @@ def test_get_logs_non_existent_container(self, docker_client: ContainerClient):
"container_hopefully_does_not_exist", safe=True
)

def test_events(self, docker_client: ContainerClient):
# create background thread watching for events
q = queue.Queue()

should_stop = False
container_name = _random_container_name()

def stream_messages(*_):
stream = docker_client.events(filters={"container": container_name})
for event in stream:
if should_stop:
break
q.put(event)

start_thread(stream_messages, name="docker-events-poller")

# run a container to generate some events
id = docker_client.create_container(
"alpine",
name=container_name,
detach=False,
command=["sh", "-c", "sleep 1"],
)
try:
docker_client.start_container(
id,
attach=True,
)
except Exception as e:
raise AssertionError("Error when starting container") from e
finally:
docker_client.remove_container(container_name)

should_stop = True

# flags to indicate that expected messages have been observed
# running a container and then removing the container should at least
# contain the following:
#
# - {"status": "create", ...}
# - {"status": "destroy", ...}
received_create = False
received_destroy = False

max_messages = 50
for _ in range(max_messages):
if received_create and received_destroy:
break

msg = q.get()

# filter out only events for this container
if msg.get("id") != id:
continue

# update test state based on message content
match msg.get("status"):
case "create":
received_create = True
case "destroy":
received_destroy = True

q.task_done()

assert received_create and received_destroy

def test_get_logs(self, docker_client: ContainerClient):
container_name = _random_container_name()
try:
Expand Down
Loading