
Commit 4ebc61e

Merge pull request #2068 from FedML-AI/dev/v0.7.0
Dev/v0.7.0
2 parents 2500cbe + bb39ae3 commit 4ebc61e

4 files changed, 67 additions and 6 deletions

python/fedml/computing/scheduler/comm_utils/container_utils.py

Lines changed: 33 additions & 0 deletions
@@ -1,6 +1,8 @@
 import logging
 import os
 import traceback
+import datetime
+from dateutil.parser import isoparse

 import docker
 from docker import errors
@@ -363,3 +365,34 @@ def gpu_stats(gpu_ids):
             logging.error(f"Failed to get GPU stats: {e}")

         return gpu_stats_map
+
+    @staticmethod
+    def get_container_deploy_time_offset(container_name) -> int:
+        """
+        Diff between the host machine's time and the container's time, in seconds
+        """
+        time_diff = 0
+        try:
+            client = docker.from_env()
+            container = client.containers.get(container_name)
+            logs_content = container.logs(stdout=True, stderr=True, stream=False, follow=False, timestamps=True)
+            logs_content = sys_utils.decode_our_err_result(logs_content)
+            line_of_logs = logs_content.split("\n")
+
+            for line in line_of_logs:
+                if line == "":
+                    continue
+
+                container_time = line.split(" ")[0]
+                nano_second_str = container_time.split(".")[1][:9]
+                t_container_datetime_obj = isoparse(container_time)
+                curr_host_time = datetime.datetime.now()
+
+                # Calculate the time difference between the container time and the host time
+                # The time difference is in seconds
+                time_diff = (curr_host_time - t_container_datetime_obj.replace(tzinfo=None)).total_seconds()
+                break
+        except Exception as e:
+            logging.error(f"Failed to get container deploy time offset: {e}")
+
+        return time_diff
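For reference, a minimal standalone sketch (not part of this commit; the sample log line is hypothetical) of the calculation get_container_deploy_time_offset() performs: take the RFC 3339 timestamp that container.logs(..., timestamps=True) prefixes to each line, parse it with dateutil, and subtract it from the host clock.

import datetime
from dateutil.parser import isoparse

# Hypothetical line as returned by container.logs(..., timestamps=True)
sample_line = "2024-05-01T12:00:00.123456789Z Starting model server..."

container_time = sample_line.split(" ")[0]   # the timestamp prefix
t_container = isoparse(container_time)       # timezone-aware (UTC) datetime
host_now = datetime.datetime.now()           # naive host-local time, as in the commit
offset_sec = (host_now - t_container.replace(tzinfo=None)).total_seconds()
print(f"container deploy time offset: {offset_sec:.0f} s")

Because the container timestamp is UTC and datetime.now() is host-local, the offset also absorbs the host's UTC offset; job_monitor.py below appears to rely on this to shift container log timestamps onto the host's wall clock.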

python/fedml/computing/scheduler/comm_utils/job_monitor.py

Lines changed: 24 additions & 4 deletions
@@ -3,9 +3,9 @@
 import os
 import time
 import traceback
+import datetime

 import re
-from datetime import datetime
 from dateutil.parser import isoparse

 from urllib.parse import urlparse
@@ -1056,6 +1056,13 @@ def monitor_endpoint_logs(self):
                 endpoint_logs = ContainerUtils.get_instance().get_container_logs(endpoint_container_name,
                                                                                  timestamps=True)

+                # Sync Time by setting the offset
+                if endpoint_logs is not None:
+                    t_sec_offset = ContainerUtils.get_instance().get_container_deploy_time_offset(
+                        endpoint_container_name)
+                    self.replica_log_channels[job.job_id][job.edge_id][i]["deploy_container_t_offset"] = (
+                        t_sec_offset)
+
                 if (endpoint_logs is None or endpoint_logs == "\n" or endpoint_logs == "\r\n" or
                         endpoint_logs == "\r" or endpoint_logs == "" or endpoint_logs == " "):
                     continue
@@ -1066,13 +1073,26 @@
                 with open(log_file_path, "a") as f:
                     line_of_logs = endpoint_logs.split("\n")

+                    # Add NTP offset
+                    channel_info = self.replica_log_channels[job.job_id][job.edge_id][i]
+                    t_sec_offset = channel_info.get("deploy_container_t_offset", None)
+
                     for line in line_of_logs:
                         if line == "":
                             continue

-                        container_time = line.split(" ")[0]
-                        nano_second_str = container_time.split(".")[1][:9]
-                        t_datetime_obj = isoparse(container_time)
+                        try:
+                            container_time = line.split(" ")[0]
+                            nano_second_str = container_time.split(".")[1][:9]
+                            t_datetime_obj = isoparse(container_time)
+
+                            if t_sec_offset is not None:
+                                t_datetime_obj = t_datetime_obj + datetime.timedelta(seconds=t_sec_offset)
+                        except Exception as e:
+                            logging.error(f"Exception when parsing the container log time {e}")
+                            t_datetime_obj = datetime.datetime.now()
+                            nano_second_str = "000000000"

                         t_sec = t_datetime_obj.strftime("%a, %d %b %Y %H:%M:%S")
                         t_nano_sec = f"[{t_sec}.{nano_second_str}]"
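A minimal sketch (standalone; the offset value and log line are hypothetical) of how the stored deploy_container_t_offset is applied when a container log timestamp is rewritten for the replica log file:

import datetime
from dateutil.parser import isoparse

t_sec_offset = 7200.0                               # assumed stored offset, in seconds
line = "2024-05-01T12:00:00.123456789Z POST /predict HTTP/1.1 200"

container_time = line.split(" ")[0]
nano_second_str = container_time.split(".")[1][:9]  # keep the nanosecond digits verbatim
t_obj = isoparse(container_time) + datetime.timedelta(seconds=t_sec_offset)

t_sec = t_obj.strftime("%a, %d %b %Y %H:%M:%S")
print(f"[{t_sec}.{nano_second_str}]")               # [Wed, 01 May 2024 14:00:00.123456789]

The try/except added around the parsing also means a log line without a leading timestamp no longer raises out of the loop; it falls back to the current host time and a zeroed nanosecond field.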

python/fedml/computing/scheduler/model_scheduler/device_model_db.py

Lines changed: 6 additions & 0 deletions
@@ -272,6 +272,12 @@ def open_job_db(self):
         except Exception as e:
             pass

+        try:
+            # Also for current_latency = Column(Float)
+            self.db_connection.execute(text("ALTER TABLE end_point_metrics ADD current_latency FLOAT default 1;"))
+        except Exception as e:
+            pass
+
     def close_job_db(self):
         if self.db_connection is not None:
             self.db_connection.close()
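The new block reuses the best-effort migration pattern already present in open_job_db: attempt the ALTER TABLE and swallow the error if the column already exists, so reopening the database stays idempotent. A minimal standalone sketch of that pattern (SQLite in-memory; the table is created here only so the example runs):

from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE IF NOT EXISTS end_point_metrics (id INTEGER PRIMARY KEY);"))
    try:
        # Add the column introduced above; the failure is ignored if it already exists.
        conn.execute(text("ALTER TABLE end_point_metrics ADD current_latency FLOAT default 1;"))
    except Exception:
        pass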

python/fedml/computing/scheduler/model_scheduler/device_model_inference.py

Lines changed: 4 additions & 2 deletions
@@ -282,14 +282,16 @@ async def send_inference_request(idle_device, endpoint_id, inference_url, input_
         http_infer_available = False

     if http_infer_available:
-        response_ok = await FedMLHttpInference.is_inference_ready(inference_url, timeout=5)
+        response_ok = await FedMLHttpInference.is_inference_ready(
+            inference_url, timeout=os.getenv("FEDML_GATEWAY_HTTP_READY_TIMEOUT", 20))
         if response_ok:
             response_ok, inference_response = await FedMLHttpInference.run_http_inference_with_curl_request(
                 inference_url, input_list, output_list, inference_type=inference_type)
             logging.info(f"Use http inference. return {response_ok}")
             return inference_response

-    response_ok = await FedMLHttpProxyInference.is_inference_ready(inference_url, timeout=10)
+    response_ok = await FedMLHttpProxyInference.is_inference_ready(
+        inference_url, timeout=os.getenv("FEDML_GATEWAY_HTTP_PROXY_READY_TIMEOUT", 20))
     if response_ok:
         response_ok, inference_response = await FedMLHttpProxyInference.run_http_proxy_inference_with_request(
             endpoint_id, inference_url, input_list, output_list, inference_type=inference_type)
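One thing to keep in mind with the new configurable timeouts: os.getenv returns a string whenever the variable is actually set, and the int default 20 only when it is not, so depending on how the HTTP client consumes the value an explicit cast may be wanted. A small sketch of the same pattern with a uniform return type (the helper name is illustrative, not part of the codebase):

import os

def env_timeout(name: str, default: int = 20) -> int:
    """Read a timeout from the environment, falling back to a default, always as an int."""
    return int(os.getenv(name, default))

ready_timeout = env_timeout("FEDML_GATEWAY_HTTP_READY_TIMEOUT")
proxy_timeout = env_timeout("FEDML_GATEWAY_HTTP_PROXY_READY_TIMEOUT")
print(ready_timeout, proxy_timeout)  # 20 20 unless the variables are set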
