Skip to content

Commit ec27241

Browse files
committed
Revert "Add events method to docker clients (#9932)"
This reverts commit 94f9ddf.
1 parent a5713c8 commit ec27241

File tree

4 files changed

+6
-147
lines changed

4 files changed

+6
-147
lines changed

localstack/utils/container_utils/container_client.py

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,9 @@
99
import tarfile
1010
import tempfile
1111
from abc import ABCMeta, abstractmethod
12-
from datetime import datetime
1312
from enum import Enum, unique
1413
from pathlib import Path
15-
from typing import (
16-
Dict,
17-
Generator,
18-
List,
19-
Literal,
20-
NamedTuple,
21-
Optional,
22-
Protocol,
23-
Tuple,
24-
Union,
25-
get_args,
26-
)
14+
from typing import Dict, List, Literal, NamedTuple, Optional, Protocol, Tuple, Union, get_args
2715

2816
from localstack import config
2917
from localstack.utils.collections import HashableList, ensure_list
@@ -682,18 +670,6 @@ def get_container_logs(self, container_name_or_id: str, safe: bool = False) -> s
682670
def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
683671
"""Returns a blocking generator you can iterate over to retrieve log output as it happens."""
684672

685-
@abstractmethod
686-
def events(
687-
self,
688-
since: Optional[Union[datetime, int]] = None,
689-
until: Optional[Union[datetime, int]] = None,
690-
filters: Optional[Dict] = None,
691-
) -> Generator[Dict, None, None]:
692-
"""Returns a generator over events generated by the container runtime.
693-
694-
More information: https://docs.docker.com/engine/reference/commandline/events
695-
"""
696-
697673
@abstractmethod
698674
def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
699675
"""Get detailed attributes of a container.

localstack/utils/container_utils/docker_cmd_client.py

Lines changed: 3 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@
66
import re
77
import shlex
88
import subprocess
9-
from datetime import datetime
10-
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
9+
from typing import Dict, List, Optional, Tuple, Union
1110

1211
from localstack import config
1312
from localstack.utils.collections import ensure_list
@@ -38,14 +37,10 @@
3837

3938
class CancellableProcessStream(CancellableStream):
4039
process: subprocess.Popen
41-
map_fn: Callable[[str], Any]
4240

43-
def __init__(
44-
self, process: subprocess.Popen, map_fn: Union[Callable[[str], Any], None] = None
45-
) -> None:
41+
def __init__(self, process: subprocess.Popen) -> None:
4642
super().__init__()
4743
self.process = process
48-
self.map_fn = map_fn
4944

5045
def __iter__(self):
5146
return self
@@ -54,11 +49,7 @@ def __next__(self):
5449
line = self.process.stdout.readline()
5550
if not line:
5651
raise StopIteration
57-
58-
if self.map_fn is not None:
59-
return self.map_fn(line)
60-
else:
61-
return line
52+
return line
6253

6354
def close(self):
6455
return self.process.terminate()
@@ -417,41 +408,6 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
417408

418409
return CancellableProcessStream(process)
419410

420-
def events(
421-
self,
422-
since: Optional[Union[datetime, int]] = None,
423-
until: Optional[Union[datetime, int]] = None,
424-
filters: Optional[Dict] = None,
425-
) -> CancellableStream:
426-
cmd = self._docker_cmd()
427-
cmd += ["events", "--format", "{{json .}}"]
428-
429-
def _to_timestamp(value: Union[datetime, int]):
430-
if isinstance(value, datetime):
431-
return value.timestamp()
432-
else:
433-
return value
434-
435-
if since:
436-
cmd += [_to_timestamp(since)]
437-
if until:
438-
cmd += ["--until", _to_timestamp(until)]
439-
if filters:
440-
filter_pairs = [f"{key}={value}" for (key, value) in filters.items()]
441-
cmd += ["--filter", ",".join(filter_pairs)]
442-
443-
process: subprocess.Popen = run(
444-
cmd, asynchronous=True, outfile=subprocess.PIPE, stderr=subprocess.STDOUT
445-
)
446-
447-
def decode_fn(line: str):
448-
try:
449-
return json.loads(line)
450-
except json.JSONDecodeError as e:
451-
LOG.warning("Error decoding docker event %s: %s", line, e)
452-
453-
return CancellableProcessStream(process, map_fn=decode_fn)
454-
455411
def _inspect_object(self, object_name_or_id: str) -> Dict[str, Union[dict, list, str]]:
456412
cmd = self._docker_cmd()
457413
cmd += ["inspect", "--format", "{{json .}}", object_name_or_id]

localstack/utils/container_utils/docker_sdk_client.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@
66
import re
77
import socket
88
import threading
9-
from datetime import datetime
109
from time import sleep
11-
from typing import Dict, Generator, List, Optional, Tuple, Union, cast
10+
from typing import Dict, List, Optional, Tuple, Union, cast
1211
from urllib.parse import quote
1312

1413
import docker
@@ -362,14 +361,6 @@ def stream_container_logs(self, container_name_or_id: str) -> CancellableStream:
362361
except APIError as e:
363362
raise ContainerException() from e
364363

365-
def events(
366-
self,
367-
since: Optional[Union[datetime, int]] = None,
368-
until: Optional[Union[datetime, int]] = None,
369-
filters: Optional[Dict] = None,
370-
) -> Generator[Dict, None, None]:
371-
yield from self.client().events(since=since, until=until, filters=filters, decode=True)
372-
373364
def inspect_container(self, container_name_or_id: str) -> Dict[str, Union[Dict, str]]:
374365
try:
375366
return self.client().containers.get(container_name_or_id).attrs

tests/integration/docker_utils/test_docker.py

Lines changed: 1 addition & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import ipaddress
33
import logging
44
import os
5-
import queue
65
import re
76
import time
87
from typing import NamedTuple, Type
@@ -40,7 +39,7 @@
4039
)
4140
from localstack.utils.net import Port, PortNotAvailableException, get_free_tcp_port
4241
from localstack.utils.strings import to_bytes
43-
from localstack.utils.threads import FuncThread, start_thread
42+
from localstack.utils.threads import FuncThread
4443
from tests.integration.docker_utils.conftest import is_podman_test, skip_for_podman
4544

4645
ContainerInfo = NamedTuple(
@@ -1049,69 +1048,6 @@ def test_stream_logs(self, docker_client: ContainerClient):
10491048
finally:
10501049
docker_client.remove_container(container_name)
10511050

1052-
def test_events(self, docker_client: ContainerClient):
1053-
# create background thread watching for events
1054-
q = queue.Queue()
1055-
1056-
should_stop = False
1057-
container_name = _random_container_name()
1058-
1059-
def stream_messages(*_):
1060-
for event in docker_client.events(filters={"id": "container_name"}):
1061-
if should_stop:
1062-
break
1063-
q.put(event)
1064-
1065-
start_thread(stream_messages, name="docker-events-poller")
1066-
1067-
# run a container to generate some events
1068-
id = docker_client.create_container(
1069-
"alpine",
1070-
name=container_name,
1071-
detach=False,
1072-
command=["sh", "-c", "sleep 1"],
1073-
)
1074-
try:
1075-
docker_client.start_container(
1076-
id,
1077-
attach=True,
1078-
)
1079-
finally:
1080-
docker_client.remove_container(container_name)
1081-
1082-
should_stop = True
1083-
1084-
# flags to indicate that expected messages have been observed
1085-
# running a container and then removing the container should at least
1086-
# contain the following:
1087-
#
1088-
# - {"status": "create", ...}
1089-
# - {"status": "destroy", ...}
1090-
received_create = False
1091-
received_destroy = False
1092-
1093-
max_messages = 50
1094-
for _ in range(max_messages):
1095-
if received_create and received_destroy:
1096-
break
1097-
1098-
msg = q.get()
1099-
1100-
# filter out only events for this container
1101-
if msg.get("id") != id:
1102-
continue
1103-
1104-
# update test state based on message content
1105-
match msg.get("status"):
1106-
case "create":
1107-
received_create = True
1108-
case "destroy":
1109-
received_destroy = True
1110-
1111-
q.task_done()
1112-
1113-
assert received_create and received_destroy
1114-
11151051
@markers.skip_offline
11161052
def test_pull_docker_image(self, docker_client: ContainerClient):
11171053
try:

0 commit comments

Comments
 (0)