-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcommunication.py
74 lines (59 loc) · 2.44 KB
/
communication.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import time
from threading import Lock
from typing import Optional
from serial import Serial
class LockingSerial:
"""
Serial connection with a lock object for use across multiple objects.
"""
def __init__(
self,
connection: Serial,
throughput_step_size: float
):
"""
Initialize the serial connection.
:param connection: Serial connection.
:param throughput_step_size: Step size in (0.0, 1.0] used to estimate throughput. Smaller step sizes create less
variance but more lag in the estimate, whereas larger step sizes create more variance but less lag.
"""
self.connection = connection
self.throughput_step_size = throughput_step_size
self.lock = Lock()
self.throughput_time_epoch_seconds: Optional[float] = None
self.bytes_read_per_second = 0.0
self.bytes_written_per_second = 0.0
def write_then_read(
self,
data: bytes,
read_length: int
) -> bytes:
"""
Write bytes and then read response.
:param data: Bytes to write.
:param read_length: Number of bytes to read.
:return: Bytes that were read.
"""
with self.lock:
self.connection.write(data)
if read_length == 0:
bytes_read = bytes()
else:
bytes_read = self.connection.read(read_length)
num_bytes_read = len(bytes_read)
if num_bytes_read != read_length:
raise ValueError(f'Expected to read {read_length} byte(s) but read {num_bytes_read}.')
# update throughput estimates
current_time_epoch_seconds = time.time()
if self.throughput_time_epoch_seconds is not None:
elapsed_seconds = current_time_epoch_seconds - self.throughput_time_epoch_seconds
self.bytes_written_per_second = (
(1.0 - self.throughput_step_size) * self.bytes_written_per_second +
self.throughput_step_size * (len(data) / elapsed_seconds)
)
self.bytes_read_per_second = (
(1.0 - self.throughput_step_size) * self.bytes_read_per_second +
self.throughput_step_size * (num_bytes_read / elapsed_seconds)
)
self.throughput_time_epoch_seconds = current_time_epoch_seconds
return bytes_read