diff --git a/crates/rune/src/cli/run.rs b/crates/rune/src/cli/run.rs index ee1f1573f..b3574eb78 100644 --- a/crates/rune/src/cli/run.rs +++ b/crates/rune/src/cli/run.rs @@ -1,12 +1,13 @@ use std::io::Write; use std::path::PathBuf; +use std::pin::pin; use std::sync::Arc; use std::time::Instant; use anyhow::{anyhow, Result}; use crate::cli::{AssetKind, CommandBase, Config, ExitCode, Io, SharedFlags}; -use crate::runtime::{UnitStorage, VmError, VmExecution, VmResult}; +use crate::runtime::{budget, GeneratorState, UnitStorage, VmError, VmExecution, VmResult}; use crate::{Context, Hash, Sources, Unit, Value, Vm}; mod cli { @@ -365,7 +366,7 @@ where T: AsRef + AsMut, { let mut current_frame_len = execution.vm().call_frames().len(); - let mut result = VmResult::Ok(None); + let mut result = VmResult::Ok(GeneratorState::Yielded(Value::unit())); while limit > 0 { let vm = execution.vm(); @@ -443,7 +444,7 @@ where match result { VmResult::Ok(result) => { - if let Some(result) = result { + if let GeneratorState::Complete(result) = result { return Ok(result); } } @@ -452,7 +453,7 @@ where } } - result = execution.async_step().await; + result = pin!(budget::with(1, execution.async_resume())).await; result = match result { VmResult::Err(error) => { diff --git a/crates/rune/src/runtime/awaited.rs b/crates/rune/src/runtime/awaited.rs index 137730077..d4e408c4c 100644 --- a/crates/rune/src/runtime/awaited.rs +++ b/crates/rune/src/runtime/awaited.rs @@ -1,8 +1,15 @@ +use core::future::Future as _; +use core::pin::Pin; +use core::task::Poll; +use core::task::{ready, Context}; + use crate::runtime::{Future, Output, Select, Vm, VmResult}; +use super::future_vm_try; + /// A stored await task. #[derive(Debug)] -pub(crate) enum Awaited { +pub(super) enum Awaited { /// A future to be awaited. Future(Future, Output), /// A select to be awaited. @@ -10,20 +17,28 @@ pub(crate) enum Awaited { } impl Awaited { - /// Wait for the given awaited into the specified virtual machine. - pub(crate) async fn into_vm(self, vm: &mut Vm) -> VmResult<()> { - match self { - Self::Future(future, out) => { - let value = vm_try!(future.await.with_vm(vm)); - vm_try!(out.store(vm.stack_mut(), value)); + /// Poll the inner thing being awaited. + pub(super) fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + vm: &mut Vm, + ) -> Poll> { + let this = self.get_mut(); + + match this { + Awaited::Future(future, out) => { + let future = Pin::new(future); + let value = future_vm_try!(ready!(future.poll(cx)).with_vm(vm)); + future_vm_try!(out.store(vm.stack_mut(), value)); } - Self::Select(select, value_addr) => { - let (ip, value) = vm_try!(select.await.with_vm(vm)); + Awaited::Select(select, value_addr) => { + let select = Pin::new(select); + let (ip, value) = future_vm_try!(ready!(select.poll(cx)).with_vm(vm)); vm.set_ip(ip); - vm_try!(value_addr.store(vm.stack_mut(), || value)); + future_vm_try!(value_addr.store(vm.stack_mut(), || value)); } } - VmResult::Ok(()) + Poll::Ready(VmResult::Ok(())) } } diff --git a/crates/rune/src/runtime/function.rs b/crates/rune/src/runtime/function.rs index 6c35f371c..1bac3a0f0 100644 --- a/crates/rune/src/runtime/function.rs +++ b/crates/rune/src/runtime/function.rs @@ -195,7 +195,7 @@ impl Function { /// /// A stop reason will be returned in case the function call results in /// a need to suspend the execution. - pub(crate) fn call_with_vm( + pub(super) fn call_with_vm( &self, vm: &mut Vm, addr: InstAddress, @@ -607,7 +607,7 @@ where /// /// A stop reason will be returned in case the function call results in /// a need to suspend the execution. - pub(crate) fn call_with_vm( + pub(super) fn call_with_vm( &self, vm: &mut Vm, addr: InstAddress, diff --git a/crates/rune/src/runtime/generator.rs b/crates/rune/src/runtime/generator.rs index dc9ee71fd..53dbb5145 100644 --- a/crates/rune/src/runtime/generator.rs +++ b/crates/rune/src/runtime/generator.rs @@ -58,11 +58,7 @@ where return VmResult::Ok(None); }; - let state = if execution.is_resumed() { - vm_try!(execution.resume_with(Value::empty())) - } else { - vm_try!(execution.resume()) - }; + let state = vm_try!(execution.resume_with(Value::empty())); VmResult::Ok(match state { GeneratorState::Yielded(value) => Some(value), @@ -80,11 +76,7 @@ where .as_mut() .ok_or(VmErrorKind::GeneratorComplete)); - let state = if execution.is_resumed() { - vm_try!(execution.resume_with(value)) - } else { - vm_try!(execution.resume()) - }; + let state = vm_try!(execution.resume_with(value)); if state.is_complete() { self.execution = None; diff --git a/crates/rune/src/runtime/macros.rs b/crates/rune/src/runtime/macros.rs index 80dc693c8..6743eb773 100644 --- a/crates/rune/src/runtime/macros.rs +++ b/crates/rune/src/runtime/macros.rs @@ -139,3 +139,16 @@ macro_rules! double_ended_range_iter { } }; } + +macro_rules! __future_vm_try { + ($expr:expr) => { + match $crate::runtime::try_result($expr) { + $crate::runtime::VmResult::Ok(value) => value, + $crate::runtime::VmResult::Err(err) => { + return core::task::Poll::Ready($crate::runtime::VmResult::Err(err)); + } + } + }; +} + +pub(super) use __future_vm_try as future_vm_try; diff --git a/crates/rune/src/runtime/mod.rs b/crates/rune/src/runtime/mod.rs index d9e7b864b..ff3f91afb 100644 --- a/crates/rune/src/runtime/mod.rs +++ b/crates/rune/src/runtime/mod.rs @@ -5,6 +5,7 @@ mod tests; #[macro_use] mod macros; +use self::macros::future_vm_try; mod steps_between; use self::steps_between::StepsBetween; @@ -32,7 +33,7 @@ pub use self::args::{Args, FixedArgs}; pub(crate) use self::args::{DynArgs, DynArgsUsed, DynGuardedArgs}; mod awaited; -pub(crate) use self::awaited::Awaited; +use self::awaited::Awaited; pub mod budget; @@ -192,11 +193,11 @@ pub use self::vm_error::{try_result, RuntimeError, TryFromResult, VmError, VmRes pub(crate) use self::vm_error::{VmErrorKind, VmIntegerRepr}; mod vm_execution; -pub(crate) use self::vm_execution::ExecutionState; pub use self::vm_execution::{VmExecution, VmSendExecution}; mod vm_halt; -pub(crate) use self::vm_halt::{VmHalt, VmHaltInfo}; +use self::vm_halt::VmHalt; +pub use self::vm_halt::VmHaltInfo; mod fmt; pub use self::fmt::Formatter; diff --git a/crates/rune/src/runtime/stream.rs b/crates/rune/src/runtime/stream.rs index 53edc7fc1..b6ca7e8ff 100644 --- a/crates/rune/src/runtime/stream.rs +++ b/crates/rune/src/runtime/stream.rs @@ -60,11 +60,7 @@ where return VmResult::Ok(None); }; - let state = if execution.is_resumed() { - vm_try!(execution.async_resume_with(Value::empty()).await) - } else { - vm_try!(execution.async_resume().await) - }; + let state = vm_try!(execution.async_resume_with(Value::empty()).await); VmResult::Ok(match state { GeneratorState::Yielded(value) => Some(value), @@ -82,11 +78,7 @@ where .as_mut() .ok_or(VmErrorKind::GeneratorComplete)); - let state = if execution.is_resumed() { - vm_try!(execution.async_resume_with(value).await) - } else { - vm_try!(execution.async_resume().await) - }; + let state = vm_try!(execution.async_resume_with(value).await); if state.is_complete() { self.execution = None; diff --git a/crates/rune/src/runtime/vm.rs b/crates/rune/src/runtime/vm.rs index 07c3ba61f..3bf493043 100644 --- a/crates/rune/src/runtime/vm.rs +++ b/crates/rune/src/runtime/vm.rs @@ -3018,7 +3018,7 @@ impl Vm { } /// Evaluate a single instruction. - pub(crate) fn run(&mut self, diagnostics: Option<&mut dyn VmDiagnostics>) -> VmResult { + pub(super) fn run(&mut self, diagnostics: Option<&mut dyn VmDiagnostics>) -> VmResult { let mut vm_diagnostics_obj; let diagnostics = match diagnostics { diff --git a/crates/rune/src/runtime/vm_error.rs b/crates/rune/src/runtime/vm_error.rs index 94588b68a..a12b02627 100644 --- a/crates/rune/src/runtime/vm_error.rs +++ b/crates/rune/src/runtime/vm_error.rs @@ -13,8 +13,8 @@ use crate::{Any, Hash, ItemBuf}; use super::{ AccessError, AccessErrorKind, AnyObjError, AnyObjErrorKind, AnyTypeInfo, BoxedPanic, CallFrame, - DynArgsUsed, DynamicTakeError, ExecutionState, MaybeTypeOf, Panic, Protocol, SliceError, - StackError, StaticString, TypeInfo, TypeOf, Unit, Vm, VmHaltInfo, + DynArgsUsed, DynamicTakeError, MaybeTypeOf, Panic, Protocol, SliceError, StackError, + StaticString, TypeInfo, TypeOf, Unit, Vm, VmHaltInfo, }; /// A virtual machine error which includes tracing information. @@ -659,7 +659,6 @@ pub(crate) enum VmErrorKind { Panic { reason: Panic, }, - NoRunningVm, Halted { halt: VmHaltInfo, }, @@ -834,12 +833,6 @@ pub(crate) enum VmErrorKind { actual: TypeInfo, }, MissingInterfaceEnvironment, - ExpectedExecutionState { - actual: ExecutionState, - }, - ExpectedExitedExecutionState { - actual: ExecutionState, - }, GeneratorComplete, FutureCompleted, // Used in rune-macros. @@ -888,7 +881,6 @@ impl fmt::Display for VmErrorKind { VmErrorKind::BadJump { error } => error.fmt(f), VmErrorKind::DynArgsUsed { error } => error.fmt(f), VmErrorKind::Panic { reason } => write!(f, "Panicked: {reason}"), - VmErrorKind::NoRunningVm {} => write!(f, "No running virtual machines"), VmErrorKind::Halted { halt } => write!(f, "Halted for unexpected reason `{halt}`"), VmErrorKind::Overflow {} => write!(f, "Numerical overflow"), VmErrorKind::Underflow {} => write!(f, "Numerical underflow"), @@ -1052,12 +1044,6 @@ impl fmt::Display for VmErrorKind { VmErrorKind::MissingInterfaceEnvironment {} => { write!(f, "Missing interface environment") } - VmErrorKind::ExpectedExecutionState { actual } => { - write!(f, "Expected resume execution state, but was {actual}") - } - VmErrorKind::ExpectedExitedExecutionState { actual } => { - write!(f, "Expected exited execution state, but was {actual}") - } VmErrorKind::GeneratorComplete {} => { write!(f, "Cannot resume a generator that has completed") } diff --git a/crates/rune/src/runtime/vm_execution.rs b/crates/rune/src/runtime/vm_execution.rs index 67c68b731..f18c8c973 100644 --- a/crates/rune/src/runtime/vm_execution.rs +++ b/crates/rune/src/runtime/vm_execution.rs @@ -1,18 +1,29 @@ use core::fmt; use core::future::Future; use core::mem::{replace, take}; +use core::pin::{pin, Pin}; +use core::task::{ready, Context, Poll}; use ::rust_alloc::sync::Arc; use crate::alloc::prelude::*; -use crate::runtime::budget; use crate::runtime::{ - Generator, GeneratorState, InstAddress, Output, RuntimeContext, Stream, Unit, Value, Vm, - VmErrorKind, VmHalt, VmHaltInfo, VmResult, + Generator, GeneratorState, Output, RuntimeContext, Stream, Unit, Value, Vm, VmErrorKind, + VmHalt, VmHaltInfo, VmResult, }; use crate::shared::AssertSend; -use super::VmDiagnostics; +use super::{future_vm_try, Awaited, VmDiagnostics}; + +use core::ptr; +use core::task::{RawWaker, RawWakerVTable, Waker}; + +const NOOP_RAW_WAKER: RawWaker = { + const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| NOOP_RAW_WAKER, |_| {}, |_| {}, |_| {}); + RawWaker::new(ptr::null(), &VTABLE) +}; + +static NOOP_WAKER: Waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; /// The state of an execution. We keep track of this because it's important to /// correctly interact with functions that yield (like generators and streams) @@ -20,24 +31,20 @@ use super::VmDiagnostics; /// onto the stack. #[derive(Debug, Clone, Copy, PartialEq)] #[non_exhaustive] -pub(crate) enum ExecutionState { - /// The initial state of an execution. - Initial, - /// execution is waiting. - Resumed(Output), - /// Suspended execution. - Suspended, - /// Execution exited. - Exited(Option), +enum InnerExecutionState { + /// The execution is running. + Running, + /// Execution has stopped running for yielding and expect output to be + /// written to the given output. + Yielded(Output), } -impl fmt::Display for ExecutionState { +impl fmt::Display for InnerExecutionState { + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - ExecutionState::Initial => write!(f, "initial"), - ExecutionState::Resumed(out) => write!(f, "resumed({out})"), - ExecutionState::Suspended => write!(f, "suspended"), - ExecutionState::Exited(..) => write!(f, "exited"), + InnerExecutionState::Running => write!(f, "running"), + InnerExecutionState::Yielded(out) => write!(f, "yielded({out})"), } } } @@ -60,7 +67,7 @@ where /// The current head vm which holds the execution. head: T, /// The state of an execution. - state: ExecutionState, + state: InnerExecutionState, /// Indicates the current stack of suspended contexts. states: Vec, } @@ -70,19 +77,15 @@ where T: AsRef + AsMut, { /// Construct an execution from a virtual machine. + #[inline] pub(crate) fn new(head: T) -> Self { Self { head, - state: ExecutionState::Initial, + state: InnerExecutionState::Running, states: Vec::new(), } } - /// Test if the current execution state is resumed. - pub(crate) fn is_resumed(&self) -> bool { - matches!(self.state, ExecutionState::Resumed(..)) - } - /// Coerce the current execution into a generator if appropriate. /// /// ``` @@ -112,6 +115,7 @@ where /// } /// # Ok::<_, rune::support::Error>(()) /// ``` + #[inline] pub fn into_generator(self) -> Generator { Generator::from_execution(self) } @@ -148,16 +152,19 @@ where /// # })?; /// # Ok::<_, rune::support::Error>(()) /// ``` + #[inline] pub fn into_stream(self) -> Stream { Stream::from_execution(self) } /// Get a reference to the current virtual machine. + #[inline] pub fn vm(&self) -> &Vm { self.head.as_ref() } /// Get a mutable reference the current virtual machine. + #[inline] pub fn vm_mut(&mut self) -> &mut Vm { self.head.as_mut() } @@ -165,19 +172,16 @@ where /// Complete the current execution without support for async instructions. /// /// This will error if the execution is suspended through yielding. - pub async fn async_complete(&mut self) -> VmResult { - match vm_try!(self.async_resume().await) { - GeneratorState::Complete(value) => VmResult::Ok(value), - GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted { - halt: VmHaltInfo::Yielded, - }), - } + #[inline] + pub fn async_complete(&mut self) -> Complete<'_, '_, T> { + self.inner_poll(None, ResumeState::Default).into_complete() } /// Complete the current execution without support for async instructions. /// /// If any async instructions are encountered, this will error. This will /// also error if the execution is suspended through yielding. + #[inline] pub fn complete(&mut self) -> VmResult { self.complete_with_diagnostics(None) } @@ -186,116 +190,46 @@ where /// /// If any async instructions are encountered, this will error. This will /// also error if the execution is suspended through yielding. + #[inline] pub fn complete_with_diagnostics( &mut self, diagnostics: Option<&mut dyn VmDiagnostics>, ) -> VmResult { - match vm_try!(self.resume_with_diagnostics(diagnostics)) { - GeneratorState::Complete(value) => VmResult::Ok(value), - GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted { - halt: VmHaltInfo::Yielded, - }), - } + vm_try!(self.inner_resume(diagnostics, ResumeState::Resume(Value::empty()))).complete() } /// Resume the current execution with the given value and resume /// asynchronous execution. - pub async fn async_resume_with(&mut self, value: Value) -> VmResult { - let state = replace(&mut self.state, ExecutionState::Suspended); - - let ExecutionState::Resumed(out) = state else { - return VmResult::err(VmErrorKind::ExpectedExecutionState { actual: state }); - }; - - vm_try!(out.store(self.head.as_mut().stack_mut(), value)); - self.inner_async_resume(None).await + pub fn async_resume_with(&mut self, value: Value) -> Resume<'_, '_, T> { + self.inner_poll(None, ResumeState::Resume(value)) + .into_resume() } /// Resume the current execution with support for async instructions. /// /// If the function being executed is a generator or stream this will resume /// it while returning a unit from the current `yield`. - pub async fn async_resume(&mut self) -> VmResult { - self.async_resume_with_diagnostics(None).await + pub fn async_resume(&mut self) -> Resume<'_, '_, T> { + self.async_resume_with_diagnostics(None) } /// Resume the current execution with support for async instructions. /// /// If the function being executed is a generator or stream this will resume /// it while returning a unit from the current `yield`. - pub async fn async_resume_with_diagnostics( - &mut self, - diagnostics: Option<&mut dyn VmDiagnostics>, - ) -> VmResult { - if let ExecutionState::Resumed(out) = self.state { - vm_try!(out.store(self.head.as_mut().stack_mut(), Value::unit)); - } - - self.inner_async_resume(diagnostics).await - } - - async fn inner_async_resume( - &mut self, - mut diagnostics: Option<&mut dyn VmDiagnostics>, - ) -> VmResult { - loop { - let vm = self.head.as_mut(); - - match vm_try!(vm - .run(match diagnostics { - Some(ref mut value) => Some(&mut **value), - None => None, - }) - .with_vm(vm)) - { - VmHalt::Exited(addr) => { - self.state = ExecutionState::Exited(addr); - } - VmHalt::Awaited(awaited) => { - vm_try!(awaited.into_vm(vm).await); - continue; - } - VmHalt::VmCall(vm_call) => { - vm_try!(vm_call.into_execution(self)); - continue; - } - VmHalt::Yielded(addr, out) => { - let value = match addr { - Some(addr) => vm.stack().at(addr).clone(), - None => Value::unit(), - }; - - self.state = ExecutionState::Resumed(out); - return VmResult::Ok(GeneratorState::Yielded(value)); - } - halt => { - return VmResult::err(VmErrorKind::Halted { - halt: halt.into_info(), - }) - } - } - - if self.states.is_empty() { - let value = vm_try!(self.end()); - return VmResult::Ok(GeneratorState::Complete(value)); - } - - vm_try!(self.pop_state()); - } + pub fn async_resume_with_diagnostics<'this, 'diag>( + &'this mut self, + diagnostics: Option<&'diag mut dyn VmDiagnostics>, + ) -> Resume<'this, 'diag, T> { + self.inner_poll(diagnostics, ResumeState::Resume(Value::empty())) + .into_resume() } /// Resume the current execution with the given value and resume synchronous /// execution. #[tracing::instrument(skip_all, fields(?value))] pub fn resume_with(&mut self, value: Value) -> VmResult { - let state = replace(&mut self.state, ExecutionState::Suspended); - - let ExecutionState::Resumed(out) = state else { - return VmResult::err(VmErrorKind::ExpectedExecutionState { actual: state }); - }; - - vm_try!(out.store(self.head.as_mut().stack_mut(), value)); - self.inner_resume(None) + vm_try!(self.inner_resume(None, ResumeState::Resume(value))).generator_state() } /// Resume the current execution without support for async instructions. @@ -305,7 +239,7 @@ where /// /// If any async instructions are encountered, this will error. pub fn resume(&mut self) -> VmResult { - self.resume_with_diagnostics(None) + vm_try!(self.inner_resume(None, ResumeState::Resume(Value::empty()))).generator_state() } /// Resume the current execution without support for async instructions. @@ -315,148 +249,40 @@ where /// /// If any async instructions are encountered, this will error. #[tracing::instrument(skip_all, fields(diagnostics=diagnostics.is_some()))] + #[inline] pub fn resume_with_diagnostics( &mut self, diagnostics: Option<&mut dyn VmDiagnostics>, ) -> VmResult { - if let ExecutionState::Resumed(out) = replace(&mut self.state, ExecutionState::Suspended) { - vm_try!(out.store(self.head.as_mut().stack_mut(), Value::unit())); - } - - self.inner_resume(diagnostics) + vm_try!(self.inner_resume(diagnostics, ResumeState::Resume(Value::empty()))) + .generator_state() } + #[inline] fn inner_resume( &mut self, mut diagnostics: Option<&mut dyn VmDiagnostics>, - ) -> VmResult { - loop { - let len = self.states.len(); - let vm = self.head.as_mut(); - - match vm_try!(vm - .run(match diagnostics { - Some(ref mut value) => Some(&mut **value), - None => None, - }) - .with_vm(vm)) - { - VmHalt::Exited(addr) => { - self.state = ExecutionState::Exited(addr); - } - VmHalt::VmCall(vm_call) => { - vm_try!(vm_call.into_execution(self)); - continue; - } - VmHalt::Yielded(addr, out) => { - let value = match addr { - Some(addr) => vm.stack().at(addr).clone(), - None => Value::unit(), - }; - - self.state = ExecutionState::Resumed(out); - return VmResult::Ok(GeneratorState::Yielded(value)); - } - halt => { - return VmResult::err(VmErrorKind::Halted { - halt: halt.into_info(), - }); - } - } - - if len == 0 { - let value = vm_try!(self.end()); - return VmResult::Ok(GeneratorState::Complete(value)); - } - - vm_try!(self.pop_state()); - } + state: ResumeState, + ) -> VmResult { + pin!(self.inner_poll(diagnostics.take(), state)).once() } - /// Step the single execution for one step without support for async - /// instructions. - /// - /// If any async instructions are encountered, this will error. - pub fn step(&mut self) -> VmResult> { - let len = self.states.len(); - let vm = self.head.as_mut(); - - match vm_try!(budget::with(1, || vm.run(None).with_vm(vm)).call()) { - VmHalt::Exited(addr) => { - self.state = ExecutionState::Exited(addr); - } - VmHalt::VmCall(vm_call) => { - vm_try!(vm_call.into_execution(self)); - return VmResult::Ok(None); - } - VmHalt::Limited => return VmResult::Ok(None), - halt => { - return VmResult::err(VmErrorKind::Halted { - halt: halt.into_info(), - }) - } - } - - if len == 0 { - let value = vm_try!(self.end()); - return VmResult::Ok(Some(value)); - } - - vm_try!(self.pop_state()); - VmResult::Ok(None) - } - - /// Step the single execution for one step with support for async - /// instructions. - pub async fn async_step(&mut self) -> VmResult> { - let vm = self.head.as_mut(); - - match vm_try!(budget::with(1, || vm.run(None).with_vm(vm)).call()) { - VmHalt::Exited(addr) => { - self.state = ExecutionState::Exited(addr); - } - VmHalt::Awaited(awaited) => { - vm_try!(awaited.into_vm(vm).await); - return VmResult::Ok(None); - } - VmHalt::VmCall(vm_call) => { - vm_try!(vm_call.into_execution(self)); - return VmResult::Ok(None); - } - VmHalt::Limited => return VmResult::Ok(None), - halt => { - return VmResult::err(VmErrorKind::Halted { - halt: halt.into_info(), - }); - } - } - - if self.states.is_empty() { - let value = vm_try!(self.end()); - return VmResult::Ok(Some(value)); + #[inline] + fn inner_poll<'this, 'diag>( + &'this mut self, + diagnostics: Option<&'diag mut dyn VmDiagnostics>, + state: ResumeState, + ) -> Execution<'this, 'diag, T> { + Execution { + this: self, + diagnostics, + state, } - - vm_try!(self.pop_state()); - VmResult::Ok(None) - } - - /// End execution and perform debug checks. - pub(crate) fn end(&mut self) -> VmResult { - let ExecutionState::Exited(addr) = self.state else { - return VmResult::err(VmErrorKind::ExpectedExitedExecutionState { actual: self.state }); - }; - - let value = match addr { - Some(addr) => self.head.as_ref().stack().at(addr).clone(), - None => Value::unit(), - }; - - debug_assert!(self.states.is_empty(), "Execution states should be empty"); - VmResult::Ok(value) } /// Push a virtual machine state onto the execution. #[tracing::instrument(skip_all)] + #[inline] pub(crate) fn push_state(&mut self, state: VmExecutionState) -> VmResult<()> { tracing::trace!("pushing suspended state"); let vm = self.head.as_mut(); @@ -465,26 +291,6 @@ where vm_try!(self.states.try_push(VmExecutionState { context, unit })); VmResult::Ok(()) } - - /// Pop a virtual machine state from the execution and transfer the top of - /// the stack from the popped machine. - #[tracing::instrument(skip_all)] - fn pop_state(&mut self) -> VmResult<()> { - tracing::trace!("popping suspended state"); - - let state = vm_try!(self.states.pop().ok_or(VmErrorKind::NoRunningVm)); - let vm = self.head.as_mut(); - - if let Some(context) = state.context { - *vm.context_mut() = context; - } - - if let Some(unit) = state.unit { - *vm.unit_mut() = unit; - } - - VmResult::Ok(()) - } } impl VmExecution<&mut Vm> { @@ -523,14 +329,8 @@ impl VmSendExecution { /// values from escaping from the virtual machine. pub fn async_complete(mut self) -> impl Future> + Send + 'static { let future = async move { - let result = vm_try!(self.0.async_resume().await); - - match result { - GeneratorState::Complete(value) => VmResult::Ok(value), - GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted { - halt: VmHaltInfo::Yielded, - }), - } + let future = self.0.inner_poll(None, ResumeState::Default); + vm_try!(future.await).complete() }; // Safety: we wrap all APIs around the [VmExecution], preventing values @@ -543,19 +343,13 @@ impl VmSendExecution { /// This requires that the result of the Vm is converted into a /// [crate::FromValue] that also implements [Send], which prevents non-Send /// values from escaping from the virtual machine. - pub fn async_complete_with_diagnostics( + pub async fn async_complete_with_diagnostics( mut self, diagnostics: Option<&mut dyn VmDiagnostics>, ) -> impl Future> + Send + '_ { let future = async move { - let result = vm_try!(self.0.async_resume_with_diagnostics(diagnostics).await); - - match result { - GeneratorState::Complete(value) => VmResult::Ok(value), - GeneratorState::Yielded(..) => VmResult::err(VmErrorKind::Halted { - halt: VmHaltInfo::Yielded, - }), - } + let future = self.0.inner_poll(diagnostics, ResumeState::Default); + vm_try!(future.await).complete() }; // Safety: we wrap all APIs around the [VmExecution], preventing values @@ -577,3 +371,219 @@ where }) } } + +enum ResumeState { + Default, + Resume(Value), + Await(Awaited), +} + +/// The future when a virtual machine is resumed. +pub struct Resume<'a, 'b, T> +where + T: AsRef + AsMut, +{ + inner: Execution<'a, 'b, T>, +} + +impl Future for Resume<'_, '_, T> +where + T: AsRef + AsMut, +{ + type Output = VmResult; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = unsafe { Pin::map_unchecked_mut(self, |this| &mut this.inner) }; + let result = future_vm_try!(ready!(inner.poll(cx))).generator_state(); + Poll::Ready(result) + } +} + +/// The future when a value is produced. +pub struct Complete<'a, 'b, T> +where + T: AsRef + AsMut, +{ + inner: Execution<'a, 'b, T>, +} + +impl Future for Complete<'_, '_, T> +where + T: AsRef + AsMut, +{ + type Output = VmResult; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = unsafe { Pin::map_unchecked_mut(self, |this| &mut this.inner) }; + let result = future_vm_try!(ready!(inner.poll(cx))).complete(); + Poll::Ready(result) + } +} + +/// The full outcome of an execution. +/// +/// This includes whether or not the execution was limited. +pub enum ExecutionState { + Complete(Value), + Yielded(Value), + Limited, +} + +impl ExecutionState { + #[inline] + fn complete(self) -> VmResult { + match self { + ExecutionState::Complete(value) => VmResult::Ok(value), + ExecutionState::Yielded(..) => VmResult::err(VmErrorKind::Halted { + halt: VmHaltInfo::Yielded, + }), + ExecutionState::Limited => VmResult::err(VmErrorKind::Halted { + halt: VmHaltInfo::Limited, + }), + } + } + + #[inline] + fn generator_state(self) -> VmResult { + match self { + ExecutionState::Complete(value) => VmResult::Ok(GeneratorState::Complete(value)), + ExecutionState::Yielded(value) => VmResult::Ok(GeneratorState::Yielded(value)), + ExecutionState::Limited => VmResult::err(VmErrorKind::Halted { + halt: VmHaltInfo::Limited, + }), + } + } +} + +/// The future when a virtual machine has been polled. +pub struct Execution<'this, 'diag, T> +where + T: AsRef + AsMut, +{ + this: &'this mut VmExecution, + diagnostics: Option<&'diag mut dyn VmDiagnostics>, + state: ResumeState, +} + +impl<'this, 'diag, T> Execution<'this, 'diag, T> +where + T: AsRef + AsMut, +{ + /// Consume and poll the future once. + /// + /// This will produce an error if the future is not ready, which implies + /// that some async operation was involved that needed to be awaited. + #[inline] + pub fn once(self: Pin<&mut Self>) -> VmResult { + let mut cx = Context::from_waker(&NOOP_WAKER); + + match self.poll(&mut cx) { + Poll::Ready(result) => result, + Poll::Pending => VmResult::err(VmErrorKind::Halted { + halt: VmHaltInfo::Awaited, + }), + } + } + + #[inline] + fn into_resume(self) -> Resume<'this, 'diag, T> { + Resume { inner: self } + } + + #[inline] + fn into_complete(self) -> Complete<'this, 'diag, T> { + Complete { inner: self } + } +} + +impl Future for Execution<'_, '_, T> +where + T: AsRef + AsMut, +{ + type Output = VmResult; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + this, + diagnostics, + state, + } = self.get_mut(); + + loop { + let vm = this.head.as_mut(); + + match state { + ResumeState::Resume(..) => { + let ResumeState::Resume(value) = replace(state, ResumeState::Default) else { + unreachable!(); + }; + + if let InnerExecutionState::Yielded(out) = + replace(&mut this.state, InnerExecutionState::Running) + { + future_vm_try!(out.store(vm.stack_mut(), value)); + } + } + ResumeState::Await(awaited) => { + // SAFETY: We are polling from within a pinned type. + let awaited = unsafe { Pin::new_unchecked(awaited) }; + future_vm_try!(ready!(awaited.poll(cx, vm))); + *state = ResumeState::Default; + } + ResumeState::Default => { + let result = vm + .run(match diagnostics { + Some(ref mut value) => Some(&mut **value), + None => None, + }) + .with_vm(vm); + + match future_vm_try!(result) { + VmHalt::Exited(addr) => { + let Some(state) = this.states.pop() else { + let value = match addr { + Some(addr) => replace( + future_vm_try!(vm.stack_mut().at_mut(addr)), + Value::empty(), + ), + None => Value::unit(), + }; + + return Poll::Ready(VmResult::Ok(ExecutionState::Complete(value))); + }; + + if let Some(context) = state.context { + *vm.context_mut() = context; + } + + if let Some(unit) = state.unit { + *vm.unit_mut() = unit; + } + } + VmHalt::Awaited(new) => { + *state = ResumeState::Await(new); + } + VmHalt::VmCall(vm_call) => { + future_vm_try!(vm_call.into_execution(this)); + } + VmHalt::Yielded(addr, out) => { + let value = match addr { + Some(addr) => vm.stack().at(addr).clone(), + None => Value::unit(), + }; + + this.state = InnerExecutionState::Yielded(out); + return Poll::Ready(VmResult::Ok(ExecutionState::Yielded(value))); + } + VmHalt::Limited => { + return Poll::Ready(VmResult::Ok(ExecutionState::Limited)); + } + } + } + } + } + } +} diff --git a/crates/rune/src/runtime/vm_halt.rs b/crates/rune/src/runtime/vm_halt.rs index 3b544b48a..cb8c08918 100644 --- a/crates/rune/src/runtime/vm_halt.rs +++ b/crates/rune/src/runtime/vm_halt.rs @@ -4,7 +4,7 @@ use crate::runtime::{Awaited, InstAddress, Output, VmCall}; /// The reason why the virtual machine execution stopped. #[derive(Debug)] -pub(crate) enum VmHalt { +pub(super) enum VmHalt { /// The virtual machine exited by running out of call frames, returning the given value. Exited(Option), /// The virtual machine exited because it ran out of execution quota. @@ -17,22 +17,10 @@ pub(crate) enum VmHalt { VmCall(VmCall), } -impl VmHalt { - /// Convert into cheap info enum which only described the reason. - pub(crate) fn into_info(self) -> VmHaltInfo { - match self { - Self::Exited(..) => VmHaltInfo::Exited, - Self::Limited => VmHaltInfo::Limited, - Self::Yielded(..) => VmHaltInfo::Yielded, - Self::Awaited(..) => VmHaltInfo::Awaited, - Self::VmCall(..) => VmHaltInfo::VmCall, - } - } -} - /// The reason why the virtual machine execution stopped. #[derive(Debug, Clone, Copy, PartialEq)] -pub(crate) enum VmHaltInfo { +#[non_exhaustive] +pub enum VmHaltInfo { /// The virtual machine exited by running out of call frames. Exited, /// The virtual machine exited because it ran out of execution quota.