-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtest_uplink.py
103 lines (92 loc) · 2.48 KB
/
test_uplink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import random
import time
import unittest
import nextmv.cloud
from nextpipe import FlowSpec, app, needs, step
from nextpipe.uplink import FlowDTO, FlowUpdateDTO, NodeDTO, StepDTO, UplinkClient
class Flow(FlowSpec):
@step
def prepare(input: dict):
"""Prepares the data."""
return input
@app(app_id="echo")
@needs(predecessors=[prepare])
@step
def solve():
"""Runs the model."""
pass
@needs(predecessors=[solve])
@step
def enhance(result: dict):
"""Enhances the result."""
return result
def _create_example_flow() -> FlowUpdateDTO:
steps = [
StepDTO(
id="prepare",
app_id=None,
docs="Prepares the data.",
predecessors=[],
),
StepDTO(
id="solve",
app_id="echo",
docs="Runs the model.",
predecessors=["prepare"],
),
StepDTO(
id="enhance",
app_id=None,
predecessors=["solve"],
),
]
nodes = [
NodeDTO(
id="prepare_0",
parent_id="prepare",
predecessor_ids=[],
status="succeeded",
run_id=None,
),
NodeDTO(
id="solve_0",
parent_id="solve",
predecessor_ids=["prepare_0"],
status="succeeded",
run_id="run-123",
),
NodeDTO(
id="solve_1",
parent_id="solve",
predecessor_ids=["prepare_0"],
status="succeeded",
run_id="run-124",
),
NodeDTO(
id="enhance_0",
parent_id="enhance",
predecessor_ids=["solve_0", "solve_1"],
status="succeeded",
run_id=None,
),
]
flow = FlowUpdateDTO(
pipeline_graph=FlowDTO(steps=steps, nodes=nodes),
updated_at="2023-10-01T12:00:00Z",
)
return flow
class TestLogger(unittest.TestCase):
def test_no_uplink(self):
flow = _create_example_flow()
client = nextmv.cloud.Client(
api_key="unavailable",
max_retries=0,
url=f"https://unavailable.url/{random.randint(0, 1000)}",
)
# Make sure that unavailable uplink connection does not break a run.
uplink = UplinkClient(client=client, config=None)
uplink.run_async()
uplink.submit_update(flow)
time.sleep(0.5)
uplink.terminate()
time.sleep(0.5)