-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathchain.py
45 lines (33 loc) · 892 Bytes
/
chain.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
import nextmv
from nextpipe import FlowSpec, app, needs, step
# >>> Workflow definition
class Flow(FlowSpec):
@step
def prepare(input: dict):
"""Prepares the data."""
input["prepared"] = True
return input
@app(app_id="echo")
@needs(predecessors=[prepare])
@step
def solve():
"""Runs the model."""
pass
@needs(predecessors=[solve])
@step
def enhance(result: dict):
"""Enhances the result."""
output = result["solution"] # Unwrap the solution
output["echo"]["data"]["enhanced"] = True
return output
def main():
# Load input data
input = nextmv.load_local()
# Run workflow
flow = Flow("DecisionFlow", input.data)
flow.run()
# Write out the result
print(json.dumps(flow.get_result(flow.enhance)))
if __name__ == "__main__":
main()