From 73845ade35e36301e2dc7e8d7173fa0cbdfa41b5 Mon Sep 17 00:00:00 2001 From: Louis Mandel Date: Thu, 26 Jun 2025 09:43:12 -0400 Subject: [PATCH 1/3] refactor: make the event loop thread part of the interpreter state (#990) Signed-off-by: Louis Mandel --- src/pdl/pdl_granite_io.py | 8 +++---- src/pdl/pdl_interpreter.py | 14 +++++++++++-- src/pdl/pdl_llms.py | 43 ++++++++++++-------------------------- src/pdl/pdl_scheduler.py | 14 +++++++++++++ 4 files changed, 42 insertions(+), 37 deletions(-) diff --git a/src/pdl/pdl_granite_io.py b/src/pdl/pdl_granite_io.py index 6b31dbcbe..4f14dca6e 100644 --- a/src/pdl/pdl_granite_io.py +++ b/src/pdl/pdl_granite_io.py @@ -1,5 +1,5 @@ # pylint: disable=import-outside-toplevel -from asyncio import run_coroutine_threadsafe +from asyncio import AbstractEventLoop, run_coroutine_threadsafe from typing import Any, Optional from granite_io.types import ChatCompletionInputs @@ -13,7 +13,6 @@ PDLRuntimeError, ) from .pdl_lazy import PdlConst, PdlLazy, lazy_apply -from .pdl_llms import _LOOP from .pdl_utils import value_of_expr @@ -113,15 +112,14 @@ async def async_generate_text( @staticmethod def generate_text( - block: GraniteioModelBlock, - messages: ModelInput, + block: GraniteioModelBlock, messages: ModelInput, event_loop: AbstractEventLoop ) -> tuple[LazyMessage, PdlLazy[Any]]: future = run_coroutine_threadsafe( GraniteioModel.async_generate_text( block, messages, ), - _LOOP, + event_loop, ) pdl_future: PdlLazy[tuple[dict[str, Any], Any]] = PdlConst(future) message = lazy_apply((lambda x: x[0]), pdl_future) diff --git a/src/pdl/pdl_interpreter.py b/src/pdl/pdl_interpreter.py index 9b1fee1b3..85fe87041 100644 --- a/src/pdl/pdl_interpreter.py +++ b/src/pdl/pdl_interpreter.py @@ -10,6 +10,7 @@ # TODO: temporarily disabling warnings to mute a pydantic warning from liteLLM import warnings +from asyncio import AbstractEventLoop from functools import partial from os import getenv @@ -31,7 +32,7 @@ ) from jinja2.nodes import TemplateData # noqa: E402 from jinja2.runtime import Undefined # noqa: E402 -from pydantic import BaseModel # noqa: E402 +from pydantic import BaseModel, ConfigDict, Field # noqa: E402 from .pdl_ast import ( # noqa: E402 AdvancedBlockType, @@ -110,7 +111,11 @@ from .pdl_location_utils import append, get_loc_string # noqa: E402 from .pdl_parser import PDLParseError, parse_file, parse_str # noqa: E402 from .pdl_python_repl import PythonREPL # noqa: E402 -from .pdl_scheduler import yield_background, yield_result # noqa: E402 +from .pdl_scheduler import ( # noqa: E402 + create_event_loop_thread, + yield_background, + yield_result, +) from .pdl_schema_utils import get_json_schema # noqa: E402 from .pdl_schema_validator import type_check_args, type_check_spec # noqa: E402 from .pdl_utils import ( # noqa: E402 @@ -127,6 +132,8 @@ class InterpreterState(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + yield_result: bool = False yield_background: bool = False batch: int = 1 @@ -136,6 +143,7 @@ class InterpreterState(BaseModel): cwd: Path = Path.cwd() # background_tasks = {} id_stack: list[str] = [] + event_loop: AbstractEventLoop = Field(default_factory=create_event_loop_thread) def with_yield_result(self: "InterpreterState", b: bool) -> "InterpreterState": return self.model_copy(update={"yield_result": b}) @@ -1638,6 +1646,7 @@ def generate_client_response_single( model_id=value_of_expr(block.model), messages=model_input, parameters=litellm_parameters_to_dict(parameters), + event_loop=state.event_loop, ) case GraniteioModelBlock(): from .pdl_granite_io import GraniteioModel @@ -1645,6 +1654,7 @@ def generate_client_response_single( message, response = GraniteioModel.generate_text( block=block, messages=model_input, + event_loop=state.event_loop, ) case _: assert False diff --git a/src/pdl/pdl_llms.py b/src/pdl/pdl_llms.py index f9d569654..ec78c1272 100644 --- a/src/pdl/pdl_llms.py +++ b/src/pdl/pdl_llms.py @@ -1,10 +1,8 @@ # pylint: disable=import-outside-toplevel -import asyncio -import threading -from concurrent.futures import Future +from asyncio import AbstractEventLoop, run_coroutine_threadsafe from os import environ from sys import stderr -from typing import Any, Callable, Generator, Optional, TypeVar +from typing import Any, Generator, Optional, TypeVar import httpx from dotenv import load_dotenv @@ -25,19 +23,6 @@ load_dotenv() -def _start_background_loop(loop): - asyncio.set_event_loop(loop) - loop.run_forever() - - -_LOOP = asyncio.new_event_loop() -_LOOP_THREAD = threading.Thread( - target=_start_background_loop, args=(_LOOP,), daemon=True -) -_LOOP_THREAD.start() -# _BACKGROUND_TASKS = set() - - class LitellmModel: @staticmethod async def async_generate_text( @@ -88,21 +73,19 @@ def generate_text( model_id: str, messages: ModelInput, parameters: dict[str, Any], + event_loop: AbstractEventLoop, ) -> tuple[LazyMessage, PdlLazy[Any]]: if "PDL_VERBOSE_ASYNC" in environ: print(f"Asynchronous model call started to {model_id}", file=stderr) - # global _BACKGROUND_TASKS - future = asyncio.run_coroutine_threadsafe( + future = run_coroutine_threadsafe( LitellmModel.async_generate_text( block, model_id, messages, parameters, ), - _LOOP, + event_loop, ) - # _BACKGROUND_TASKS.add(future) - # future.add_done_callback(_BACKGROUND_TASKS.discard) pdl_future: PdlLazy[tuple[dict[str, Any], Any]] = PdlConst(future) message = lazy_apply((lambda x: x[0]), pdl_future) response = lazy_apply((lambda x: x[1]), pdl_future) @@ -213,13 +196,13 @@ def set_structured_decoding_parameters( MapOutputT = TypeVar("MapOutputT") -def map_future( - f: Callable[[MapInputT], MapOutputT], x: Future[MapInputT] -) -> Future[MapOutputT]: - future = asyncio.run_coroutine_threadsafe(_async_call(f, x), _LOOP) - return future +# def map_future( +# f: Callable[[MapInputT], MapOutputT], x: Future[MapInputT] +# ) -> Future[MapOutputT]: +# future = asyncio.run_coroutine_threadsafe(_async_call(f, x), _LOOP) +# return future -async def _async_call(f, x): - v = x.result() - return f(v) +# async def _async_call(f, x): +# v = x.result() +# return f(v) diff --git a/src/pdl/pdl_scheduler.py b/src/pdl/pdl_scheduler.py index 2ce0d0d56..5e126ae7f 100644 --- a/src/pdl/pdl_scheduler.py +++ b/src/pdl/pdl_scheduler.py @@ -1,3 +1,5 @@ +from asyncio import AbstractEventLoop, new_event_loop, set_event_loop +from threading import Thread from typing import Any, Optional from termcolor import colored @@ -6,6 +8,18 @@ from .pdl_utils import stringify +def _start_background_loop(loop): + set_event_loop(loop) + loop.run_forever() + + +def create_event_loop_thread() -> AbstractEventLoop: + loop = new_event_loop() + loop_thread = Thread(target=_start_background_loop, args=(loop,), daemon=True) + loop_thread.start() + return loop + + def color_of(kind: BlockKind): color: Optional[str] match kind: From 4ca39e0d5c14392872053e80339cfc55f70bf48c Mon Sep 17 00:00:00 2001 From: Louis Mandel Date: Fri, 27 Jun 2025 22:32:17 -0400 Subject: [PATCH 2/3] fix: contribute directly a value to the context (#991) Signed-off-by: Louis Mandel --- src/pdl/pdl_context.py | 61 ++++++++++++++++++++++---------------- src/pdl/pdl_interpreter.py | 2 +- 2 files changed, 36 insertions(+), 27 deletions(-) diff --git a/src/pdl/pdl_context.py b/src/pdl/pdl_context.py index 941df2abd..be9648f02 100644 --- a/src/pdl/pdl_context.py +++ b/src/pdl/pdl_context.py @@ -40,8 +40,11 @@ def __mul__(self, value: "PDLContext"): class SingletonContext(PDLContext): message: PdlLazy[dict[str, Any]] - def __init__(self, message: PdlLazy[dict[str, Any]]): - self.message = message + def __init__(self, message: PdlLazy[dict[str, Any]] | dict[str, Any]): + if isinstance(message, PdlLazy): + self.message = message + else: + self.message = PdlConst(message) def serialize(self, mode: SerializeMode) -> list[dict[str, Any]]: result = self.message.result() @@ -60,20 +63,23 @@ def __repr__(self): # pyright: ignore class IndependentContext(PDLContext): context: PdlLazy[list[PDLContext]] - def __init__(self, context: list[PDLContext]): + def __init__(self, context: list[PDLContext | dict[str, Any]]): ret: list[PDLContext] = [] for item in context: - if isinstance(item, IndependentContext): - ret += item.context.data - elif isinstance(item, SingletonContext): - ret += [item] - elif isinstance(item, DependentContext) and len(item) == 0: - pass - else: - # Not all elements of the list are Independent, so return - self.context = PdlList(context) - return - # All elements of the list are Independent + match item: + case IndependentContext(): + ret += item.context.data + case SingletonContext(): + ret += [item] + case DependentContext(): + if len(item) == 0: + pass + else: + ret += [item] + case dict(): + ret += [SingletonContext(item)] + case _: + assert False self.context = PdlList(ret) def serialize(self, mode: SerializeMode) -> list[dict[str, Any]]: @@ -99,20 +105,23 @@ def __repr__(self): # pyright: ignore class DependentContext(PDLContext): context: PdlLazy[list[PDLContext]] - def __init__(self, context: list[PDLContext]): + def __init__(self, context: list[PDLContext | dict[str, Any]]): ret: list[PDLContext] = [] for item in context: - if isinstance(item, DependentContext): - ret += item.context.data - elif isinstance(item, SingletonContext): - ret += [item] - elif isinstance(item, IndependentContext) and len(item) == 0: - pass - else: - # Not all elements of the list are Dependent, so return - self.context = PdlList(context) - return - # All elements of the list are Dependent + match item: + case DependentContext(): + ret += item.context.data + case SingletonContext(): + ret += [item] + case IndependentContext(): + if len(item) == 0: + pass + else: + ret += [item] + case dict(): + ret += [SingletonContext(item)] + case _: + assert False self.context = PdlList(ret) def serialize(self, mode: SerializeMode) -> list[dict[str, Any]]: diff --git a/src/pdl/pdl_interpreter.py b/src/pdl/pdl_interpreter.py index 85fe87041..d5b0f76e5 100644 --- a/src/pdl/pdl_interpreter.py +++ b/src/pdl/pdl_interpreter.py @@ -485,7 +485,7 @@ def process_advanced_block( background = DependentContext([]) contribute_value, trace = process_contribute(trace, new_scope, loc) if contribute_value is not None: - background = DependentContext([contribute_value]) + background = DependentContext(contribute_value) return result, background, new_scope, trace From 0e12c98eb15201bb741f783d416b2fb8549e9290 Mon Sep 17 00:00:00 2001 From: Louis Mandel Date: Mon, 30 Jun 2025 11:00:15 -0400 Subject: [PATCH 3/3] chore: bump ui to 0.7.1 (#992) Signed-off-by: Louis Mandel --- pdl-live-react/package-lock.json | 4 ++-- pdl-live-react/package.json | 2 +- pdl-live-react/src-tauri/Cargo.lock | 2 +- pdl-live-react/src-tauri/Cargo.toml | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pdl-live-react/package-lock.json b/pdl-live-react/package-lock.json index 32c174516..8779f9ac8 100644 --- a/pdl-live-react/package-lock.json +++ b/pdl-live-react/package-lock.json @@ -1,12 +1,12 @@ { "name": "PDL", - "version": "0.7.0", + "version": "0.7.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "PDL", - "version": "0.7.0", + "version": "0.7.1", "dependencies": { "@patternfly/react-code-editor": "^6.1.0", "@patternfly/react-core": "^6.1.0", diff --git a/pdl-live-react/package.json b/pdl-live-react/package.json index 13bc21606..cf5adcb8a 100644 --- a/pdl-live-react/package.json +++ b/pdl-live-react/package.json @@ -1,7 +1,7 @@ { "name": "PDL", "private": true, - "version": "0.7.0", + "version": "0.7.1", "type": "module", "scripts": { "prod:mac:1": "npm run tauri build -- --no-bundle --target=universal-apple-darwin", diff --git a/pdl-live-react/src-tauri/Cargo.lock b/pdl-live-react/src-tauri/Cargo.lock index bb0461cd5..c6c6192f2 100644 --- a/pdl-live-react/src-tauri/Cargo.lock +++ b/pdl-live-react/src-tauri/Cargo.lock @@ -3545,7 +3545,7 @@ checksum = "df94ce210e5bc13cb6651479fa48d14f601d9858cfe0467f43ae157023b938d3" [[package]] name = "pdl" -version = "0.7.0" +version = "0.7.1" dependencies = [ "async-openai", "async-recursion", diff --git a/pdl-live-react/src-tauri/Cargo.toml b/pdl-live-react/src-tauri/Cargo.toml index 2ac9ef147..c477183ea 100644 --- a/pdl-live-react/src-tauri/Cargo.toml +++ b/pdl-live-react/src-tauri/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pdl" -version = "0.7.0" +version = "0.7.1" description = "Prompt Declaration Language" authors = ["nickm@us.ibm.com"] edition = "2024"