diff --git a/examples/pipeline-chain/requirements.txt b/examples/pipeline-chain/requirements.txt index 7162272..d330bdb 100644 --- a/examples/pipeline-chain/requirements.txt +++ b/examples/pipeline-chain/requirements.txt @@ -1 +1 @@ -nextpipe==0.1.5 +nextpipe==0.2.0 diff --git a/examples/pipeline-complex/requirements.txt b/examples/pipeline-complex/requirements.txt index 7162272..d330bdb 100644 --- a/examples/pipeline-complex/requirements.txt +++ b/examples/pipeline-complex/requirements.txt @@ -1 +1 @@ -nextpipe==0.1.5 +nextpipe==0.2.0 diff --git a/examples/pipeline-ensemble/requirements.txt b/examples/pipeline-ensemble/requirements.txt index 7162272..d330bdb 100644 --- a/examples/pipeline-ensemble/requirements.txt +++ b/examples/pipeline-ensemble/requirements.txt @@ -1 +1 @@ -nextpipe==0.1.5 +nextpipe==0.2.0 diff --git a/examples/pipeline-foreach/requirements.txt b/examples/pipeline-foreach/requirements.txt index 7162272..d330bdb 100644 --- a/examples/pipeline-foreach/requirements.txt +++ b/examples/pipeline-foreach/requirements.txt @@ -1 +1 @@ -nextpipe==0.1.5 +nextpipe==0.2.0 diff --git a/examples/pipeline-preprocess/requirements.txt b/examples/pipeline-preprocess/requirements.txt index 7162272..d330bdb 100644 --- a/examples/pipeline-preprocess/requirements.txt +++ b/examples/pipeline-preprocess/requirements.txt @@ -1 +1 @@ -nextpipe==0.1.5 +nextpipe==0.2.0 diff --git a/nextpipe/__about__.py b/nextpipe/__about__.py index c606576..fc0dbfe 100644 --- a/nextpipe/__about__.py +++ b/nextpipe/__about__.py @@ -1 +1 @@ -__version__ = "v0.1.5" +__version__ = "v0.2.0" diff --git a/nextpipe/config.py b/nextpipe/config.py index 7abc2fd..e21bad2 100644 --- a/nextpipe/config.py +++ b/nextpipe/config.py @@ -1,24 +1,8 @@ -from dataclasses import dataclass, field +from dataclasses import dataclass from dataclasses_json import dataclass_json -@dataclass_json -@dataclass -class AppPollingOptions: - """Options for polling the platform for the status of an app.""" - - timeout: float 
= 1800 - """ - Timeout in seconds for polling the platform. - This is used for example when waiting for results of an app run. - """ - max_backoff: float = 30 - """ - Maximum backoff time in seconds. - """ - - @dataclass_json @dataclass class Configuration: @@ -36,7 +20,3 @@ class Configuration: of inputs used when a step has multiple predecessors which are themselves repeated or foreach steps. """ - app_polling: AppPollingOptions = field(default_factory=AppPollingOptions) - """ - Options for polling the platform for the status of an app. - """ diff --git a/nextpipe/decorators.py b/nextpipe/decorators.py index a12cf18..2a75167 100644 --- a/nextpipe/decorators.py +++ b/nextpipe/decorators.py @@ -1,6 +1,9 @@ +import typing +from collections.abc import Callable from enum import Enum from functools import wraps -from typing import Callable + +from nextmv import cloud from . import utils @@ -68,6 +71,29 @@ def is_join(self): def step(function): + """ + Decorator to mark a function as a step in the pipeline. This is the most + basic decorator. This decorator doesn’t require any parameters or the use + of parentheses. + + Example + ------- + A simple example shows that a step is executed. + ``` + from nextpipe import FlowSpec, log, step + + + class Flow(FlowSpec): + @step + def my_step() -> None: + log("Some code is executed here") + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + @wraps(function) def wrapper(*args, **kwargs): utils.log_internal(f"Entering {function.__name__}") @@ -81,14 +107,53 @@ def wrapper(*args, **kwargs): class Needs: - def __init__(self, predecessors: list[callable]): + def __init__(self, predecessors: list[Callable]): self.predecessors = predecessors def __repr__(self): return f"StepNeeds({','.join([p.step.get_id() for p in self.predecessors])})" -def needs(predecessors: list[callable]): +def needs(predecessors: list[Callable]): + """ + Decorator to mark the predecessors of a step. 
This is used to + determine the order in which the steps are executed. The predecessors + are the steps that need to be executed before this actual step can be + run. + + Parameters + ---------- + predecessors : list[Callable] + The list of predecessors + + Example + ------- + In this example steps `step1` and `step2` are executed before `step3`. + + ``` + from nextpipe import FlowSpec, log, needs, step + + + class Flow(FlowSpec): + @step + def step1() -> None: + log("Execute step 1") + + @step + def step2() -> None: + log("Execute step 2") + + @needs(predecessors=[step1, step2]) + @step + def step3() -> None: + log("Execute step 3 after steps 1 and 2") + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + def decorator(function): function.step.needs = Needs(predecessors) return function @@ -105,6 +170,42 @@ def __repr__(self): def optional(condition: Callable[[Step], bool]): + """ + Decorator to mark a step as optional. This is used to determine + whether the step should be executed or not. The condition is a + callable that takes the step as an argument and returns a boolean + indicating whether the step should be executed or not. + The condition is evaluated at runtime, so it can depend on the + runtime state of the pipeline. + + Parameters + ---------- + condition : Callable[[Step], bool] + The condition to evaluate. This is a callable that takes the step + as an argument and returns a boolean indicating whether the step + should be executed or not. + + Example + ------- + In this example the step `step1` is executed given that the condition is + true. 
+ + ``` + from nextpipe import FlowSpec, log, optional, step + + + class Flow(FlowSpec): + @optional(condition=lambda step: step.get_id() == "step1") + @step + def step1() -> None: + log("Execute optional step 1") + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + def decorator(function): function.step.optional = Optional(condition) return function @@ -121,6 +222,35 @@ def __repr__(self): def repeat(repetitions: int): + """ + Decorator to make a step be repeated a number of times. The number of + repetitions determines how many times the step will be run. + + Parameters + ---------- + repetitions : int + The number of times to repeat the step. + + Example + ------- + In this example the step `step1` is repeated 3 times. + + ``` + from nextpipe import FlowSpec, log, repeat, step + + + class Flow(FlowSpec): + @repeat(repetitions=3) + @step + def step1() -> None: + log("Hello, world.") + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + def decorator(function): function.step.repeat = Repeat(repetitions) return function @@ -137,6 +267,43 @@ def __repr__(self): def foreach(f: Callable = None): + """ + Decorator to perform a "fanout", which means creating multiple parallel + steps out of a single step. The function that is decorated should return a + list of some sort. Each element of the list is consumed as an input by the + successor step. When using this decorator, use parentheses without any + parameters. + + Parameters + ---------- + None. Use this decorator with no parameters. + + Example + ------- + In this example the step `step2` is executed for each element in the list + returned by `step1`. The input to `step2` is the element of the list. 
+ + ``` + from nextpipe import FlowSpec, foreach, log, needs, step + + + class Flow(FlowSpec): + @foreach() + @step + def step1() -> list[dict[str, any]]: + return [{"input": 1}, {"input": 2}, {"input": 3}] + + @needs(predecessors=[step1]) + @step + def step2(data: dict) -> None: + log(data) + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + def decorator(function): function.step.foreach = Foreach() return function @@ -153,6 +320,49 @@ def __repr__(self): def join(f: Callable = None): + """ + Decorator to perform a "join", which means collecting the results of + multiple parallel predecessor steps into a single step. The outputs of the + predecessor steps should be received as a list. The order of the elements + in the list is the same as the order of the predecessor steps. Unpack the + list to obtain the results and perform processing on them as needed. When + using this decorator, use parentheses without any parameters. + + Parameters + ---------- + None. Use this decorator with no parameters. + + Example + ------- + In this example the step `step3` is executed after `step1` and `step2`. + The input to `step3` is a list containing the outputs of `step1` and + `step2`. 
+ + ``` + from nextpipe import FlowSpec, join, log, needs, step + + + class Flow(FlowSpec): + @step + def step1() -> dict[str, any]: + return {"input": 1} + + @step + def step2() -> dict[str, any]: + return {"input": 2} + + @join() + @needs(predecessors=[step1, step2]) + @step + def step3(data: list[dict[str, any]]) -> None: + log(data) + + + flow = Flow("DecisionFlow", None) + flow.run() + ``` + """ + def decorator(function): function.step.join = Join() return function @@ -160,6 +370,10 @@ def decorator(function): return decorator +_DEFAULT_POLLING_OPTIONS: cloud.PollingOptions = cloud.PollingOptions() +"""Default polling options to use when polling for a run result.""" + + class App: def __init__( self, @@ -168,12 +382,14 @@ def __init__( input_type: InputType = InputType.JSON, parameters: dict[str, any] = None, full_result: bool = False, + polling_options: typing.Optional[cloud.PollingOptions] = _DEFAULT_POLLING_OPTIONS, ): self.app_id = app_id self.instance_id = instance_id self.parameters = parameters if parameters else {} self.input_type = input_type self.full_result = full_result + self.polling_options = polling_options def __repr__(self): return f"StepRun({self.app_id}, {self.instance_id}, {self.parameters}, {self.input_type}, {self.full_result})" @@ -185,7 +401,77 @@ def app( parameters: dict[str, any] = None, input_type: InputType = InputType.JSON, full_result: bool = False, + polling_options: typing.Optional[cloud.PollingOptions] = _DEFAULT_POLLING_OPTIONS, ): + """ + Decorator to mark a step as a Nextmv Application (external application) + step. If this decorator is used, an external application will be run, using + the specified parameters. You need to have a valid Nextmv account and + Application before you can use this decorator. Make sure the + `NEXTMV_API_KEY` environment variable is set as well. + + Parameters + ---------- + app_id : str + The ID of the application to run. + instance_id : str + The ID of the instance to run. Default is "devint". 
+ parameters : dict[str, any] + The parameters to pass to the application. This is a dictionary of + parameter names and values. The values must be JSON serializable. + input_type : InputType + The type of input to pass to the application. This can be either + JSON or FILES. Default is JSON. + full_result : bool + Whether to return the full result of the application run. If this is + set to `True`, the full result (with metadata) will be returned. If + this is set to `False`, only the output of the application will be + returned. + polling_options : Optional[cloud.PollingOptions] + Options for polling for the results of the app run. This is used to + configure the polling behavior, such as the timeout and backoff + options. Defaults to `cloud.PollingOptions()` with its own default + values. Please note that the `.initial_delay` attribute will be + overridden internally, as a strategy to stagger multiple parallel runs + and avoid overloading the Platform. + + Example + ------- + In this example the step `pre_process` is executed first. After + pre-processing is completed, the result is passed to the `solve` step. This + step runs a Nextmv Application with the ID `echo`. The result of the + application run is passed to the final step `post_process`, which + post-processes the result. 
+ ``` + from nextpipe import FlowSpec, app, log, needs, step + + + class Flow(FlowSpec): + @step + def pre_process(input: dict[str, any]) -> dict[str, any]: + log("You can pre-process your data here.") + return input + + @app(app_id="echo") + @needs(predecessors=[pre_process]) + @step + def solve(): + pass + + @needs(predecessors=[solve]) + @step + def post_process(result: dict[str, any]) -> dict[str, any]: + log("You can post-process your data here.") + return result + + + data = {"foo": "bar"} + flow = Flow("DecisionFlow", data) + flow.run() + log(flow.get_result(flow.post_process)) + ``` + """ + def decorator(function): @wraps(function) def wrapper(*args, **kwargs): @@ -202,6 +488,7 @@ def wrapper(*args, **kwargs): parameters=converted_parameters, input_type=input_type, full_result=full_result, + polling_options=polling_options, ) wrapper.step.type = StepType.APP diff --git a/nextpipe/flow.py b/nextpipe/flow.py index 4438655..b87a220 100644 --- a/nextpipe/flow.py +++ b/nextpipe/flow.py @@ -1,16 +1,19 @@ import ast import base64 +import copy import inspect import io +import random import threading import time from importlib.metadata import version from itertools import product from typing import Any, Optional, Union -from nextmv.cloud import Application, Client, StatusV2 +from nextmv.cloud import Application, Client from . 
import config, decorators, graph, schema, threads, uplink, utils +from .__about__ import __version__ STATUS_PENDING = "pending" STATUS_RUNNING = "running" @@ -218,7 +221,7 @@ def _to_uplink_dto(self) -> uplink.FlowUpdateDTO: def __debug_print(self): utils.log_internal(f"Flow: {self.flow_spec.__class__.__name__}") - utils.log_internal(f"nextpipe: {version('nextpipe')}") + utils.log_internal(f"nextpipe: {__version__}") utils.log_internal(f"nextmv: {version('nextmv')}") utils.log_internal("Flow graph steps:") for step in self.steps: @@ -355,34 +358,46 @@ def __run_step(node: FlowNode, inputs: list[object], client: Client) -> Union[li # If the input is not AppRunConfig, we use it directly. input = inputs[0] options = app_step.parameters + + # Modify the polling options set for the step (by default or by the + # user) so that the initial delay is randomized and the stopping + # callback is configured as the node being cancelled if the user + # doesn’t want to override it. + polling_options = copy.deepcopy(app_step.polling_options) + delay = random.uniform(0, 5) # For lack of a better idea... + polling_options.initial_delay = delay + if polling_options.stop is None: + polling_options.stop = lambda: node.cancel + run_args = ( [], # No nameless arguments { # We use the named arguments to pass the user arguments to the run function "input": input, - "options": options, + "run_options": options, + "polling_options": polling_options, }, ) + # Prepare the application itself. 
app = Application( client=client, id=app_step.app_id, default_instance_id=app_step.instance_id, ) + # Run the application - run_id = app.new_run(*run_args[0], **run_args[1]) + result = app.new_run_with_result( + *run_args[0], + **run_args[1], + ) + run_id = result.id node.run_id = run_id - output = utils.wait_for_runs( - app=app, - run_ids=[run_id], - stop_waiting=lambda: node.cancel, - )[0] - # Check if all runs were successful - if output.metadata.status_v2 != StatusV2.succeeded: - raise Exception(f"Node {node.id} failed with status {output.metadata.status_v2}: {output.error_log}") + # Return result (do not unwrap if full result is requested) if app_step.full_result: - return output - return output.output + return result + return result.output + else: spec = inspect.getfullargspec(node.parent.definition.function) if len(spec.args) == 0: diff --git a/nextpipe/threads.py b/nextpipe/threads.py index 90eb2ad..4c2b178 100644 --- a/nextpipe/threads.py +++ b/nextpipe/threads.py @@ -1,6 +1,7 @@ import multiprocessing import threading -from typing import Callable, Optional +from collections.abc import Callable +from typing import Optional thread_local = threading.local() diff --git a/nextpipe/utils.py b/nextpipe/utils.py index 4a81751..d39021a 100644 --- a/nextpipe/utils.py +++ b/nextpipe/utils.py @@ -1,14 +1,9 @@ import ast import importlib import inspect -import random import sys import threading -import time from functools import wraps -from typing import Callable - -from nextmv.cloud import Application, RunResult, StatusV2 THREAD_NAME_PREFIX = "nextpipe-" @@ -59,58 +54,6 @@ def convert_to_string_values(input_dict: dict[str, any]) -> dict[str, str]: return {key: str(value) for key, value in input_dict.items()} -_INFINITE_TIMEOUT = sys.maxsize - - -def wait_for_runs( - app: Application, - run_ids: list[str], - timeout: float = _INFINITE_TIMEOUT, - max_backoff: float = 30, - stop_waiting: Callable[[], bool] = lambda: False, -) -> list[RunResult]: - """ - Wait until all 
runs with the given IDs are finished. - """ - # Wait until all runs are finished or the timeout is reached - jitter = random.random() * 2.0 - missing = set(run_ids) - backoff = 2.0 + jitter # With base and jitter we aim for a backoff start between 2 and 4 seconds - next_check = time.time() + backoff # First check with some delay as external runs are not that fast - internal_poll_interval = 0.5 - start_time = time.time() - while missing and time.time() - start_time < timeout: - # Check if the user wants to stop waiting - if stop_waiting(): - raise RuntimeError("The job was canceled.") - - # Check whether it is time to check the status of the runs. This allows quicker - # early termination if cancelled. - time.sleep(internal_poll_interval) - now = time.time() - if now < next_check: - continue - backoff = min(backoff * 2, max_backoff) - next_check = now + backoff - - # Check if all runs are finished - for run_id in missing.copy(): - run_info = app.run_metadata(run_id=run_id) - if run_info.metadata.status_v2 == StatusV2.succeeded: - missing.remove(run_id) - continue - if run_info.metadata.status_v2 in [ - StatusV2.failed, - StatusV2.canceled, - ]: - raise RuntimeError(f"Run {run_id} {run_info.metadata.status_v2}: {run_info.metadata.error}") - - if missing: - raise TimeoutError(f"Timeout of {timeout} seconds reached while waiting.") - - return [app.run_result(run_id=run_id) for run_id in run_ids] - - def __is_running_in_notebook(): """ Check if the code is running in a Jupyter notebook. diff --git a/pyproject.toml b/pyproject.toml index 01f3025..46a20ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,7 +17,7 @@ classifiers = [ ] dependencies = [ "requests>=2.31.0", - "nextmv>=0.20.1", + "nextmv>=0.22.0", "dataclasses-json>=0.6.7", ] description = "Framework for Decision Pipeline modeling and execution" @@ -70,6 +70,6 @@ path = "nextpipe/__about__.py" [project.optional-dependencies] dev = [ - "ruff>=0.6.4", + "ruff>=0.11.6", "goldie>=0.1.7", ]