diff --git a/pgml-extension/src/api.rs b/pgml-extension/src/api.rs index 729be53a5..b560ee9d4 100644 --- a/pgml-extension/src/api.rs +++ b/pgml-extension/src/api.rs @@ -13,16 +13,25 @@ use serde_json::json; use crate::bindings::sklearn::package_version; use crate::orm::*; +macro_rules! unwrap_or_error { + ($i:expr) => { + match $i { + Ok(v) => v, + Err(e) => error!("{e}"), + } + }; +} + #[cfg(feature = "python")] #[pg_extern] pub fn activate_venv(venv: &str) -> bool { - crate::bindings::venv::activate_venv(venv) + unwrap_or_error!(crate::bindings::venv::activate_venv(venv)) } #[cfg(feature = "python")] #[pg_extern(immutable, parallel_safe)] pub fn validate_python_dependencies() -> bool { - crate::bindings::venv::activate(); + unwrap_or_error!(crate::bindings::venv::activate()); Python::with_gil(|py| { let sys = PyModule::import(py, "sys").unwrap(); @@ -40,13 +49,12 @@ pub fn validate_python_dependencies() -> bool { } }); - info!( - "Scikit-learn {}, XGBoost {}, LightGBM {}, NumPy {}", - package_version("sklearn"), - package_version("xgboost"), - package_version("lightgbm"), - package_version("numpy"), - ); + let sklearn = unwrap_or_error!(package_version("sklearn")); + let xgboost = unwrap_or_error!(package_version("xgboost")); + let lightgbm = unwrap_or_error!(package_version("lightgbm")); + let numpy = unwrap_or_error!(package_version("numpy")); + + info!("Scikit-learn {sklearn}, XGBoost {xgboost}, LightGBM {lightgbm}, NumPy {numpy}",); true } @@ -58,8 +66,8 @@ pub fn validate_python_dependencies() {} #[cfg(feature = "python")] #[pg_extern] pub fn python_package_version(name: &str) -> String { - crate::bindings::venv::activate(); - package_version(name) + unwrap_or_error!(crate::bindings::venv::activate()); + unwrap_or_error!(package_version(name)) } #[cfg(not(feature = "python"))] @@ -71,9 +79,9 @@ pub fn python_package_version(name: &str) { #[cfg(feature = "python")] #[pg_extern] pub fn python_pip_freeze() -> TableIterator<'static, (name!(package, String),)> { - crate::bindings::venv::activate(); + unwrap_or_error!(crate::bindings::venv::activate()); - let packages = crate::bindings::venv::freeze() + let packages = unwrap_or_error!(crate::bindings::venv::freeze()) .into_iter() .map(|package| (package,)); @@ -99,7 +107,7 @@ pub fn validate_shared_library() { #[cfg(feature = "python")] #[pg_extern] fn python_version() -> String { - crate::bindings::venv::activate(); + unwrap_or_error!(crate::bindings::venv::activate()); let mut version = String::new(); Python::with_gil(|py| { @@ -479,27 +487,31 @@ fn predict_row(project_name: &str, row: pgrx::datum::AnyElement) -> f32 { #[pg_extern(immutable, parallel_safe, strict, name = "predict")] fn predict_model(model_id: i64, features: Vec) -> f32 { - Model::find_cached(model_id).predict(&features) + let model = unwrap_or_error!(Model::find_cached(model_id)); + unwrap_or_error!(model.predict(&features)) } #[pg_extern(immutable, parallel_safe, strict, name = "predict_proba")] fn predict_model_proba(model_id: i64, features: Vec) -> Vec { - Model::find_cached(model_id).predict_proba(&features) + let model = unwrap_or_error!(Model::find_cached(model_id)); + unwrap_or_error!(model.predict_proba(&features)) } #[pg_extern(immutable, parallel_safe, strict, name = "predict_joint")] fn predict_model_joint(model_id: i64, features: Vec) -> Vec { - Model::find_cached(model_id).predict_joint(&features) + let model = unwrap_or_error!(Model::find_cached(model_id)); + unwrap_or_error!(model.predict_joint(&features)) } #[pg_extern(immutable, parallel_safe, strict, name = "predict_batch")] fn predict_model_batch(model_id: i64, features: Vec) -> Vec { - Model::find_cached(model_id).predict_batch(&features) + let model = unwrap_or_error!(Model::find_cached(model_id)); + unwrap_or_error!(model.predict_batch(&features)) } #[pg_extern(immutable, parallel_safe, strict, name = "predict")] fn predict_model_row(model_id: i64, row: pgrx::datum::AnyElement) -> f32 { - let model = Model::find_cached(model_id); + let model = unwrap_or_error!(Model::find_cached(model_id)); let snapshot = &model.snapshot; let numeric_encoded_features = model.numeric_encode_features(&[row]); let features_width = snapshot.features_width(); @@ -514,7 +526,7 @@ fn predict_model_row(model_id: i64, row: pgrx::datum::AnyElement) -> f32 { let column = &snapshot.columns[position.column_position - 1]; column.preprocess(&data, &mut processed, features_width, position.row_position); }); - model.predict(&processed) + unwrap_or_error!(model.predict(&processed)) } #[pg_extern] @@ -617,7 +629,11 @@ pub fn chunk( text: &str, kwargs: default!(JsonB, "'{}'"), ) -> TableIterator<'static, (name!(chunk_index, i64), name!(chunk, String))> { - let chunks = crate::bindings::langchain::chunk(splitter, text, &kwargs.0); + let chunks = match crate::bindings::langchain::chunk(splitter, text, &kwargs.0) { + Ok(chunks) => chunks, + Err(e) => error!("{e}"), + }; + let chunks = chunks .into_iter() .enumerate() @@ -838,28 +854,23 @@ fn tune( #[cfg(feature = "python")] #[pg_extern(name = "sklearn_f1_score")] pub fn sklearn_f1_score(ground_truth: Vec, y_hat: Vec) -> f32 { - crate::bindings::sklearn::f1(&ground_truth, &y_hat) + unwrap_or_error!(crate::bindings::sklearn::f1(&ground_truth, &y_hat)) } #[cfg(feature = "python")] #[pg_extern(name = "sklearn_r2_score")] pub fn sklearn_r2_score(ground_truth: Vec, y_hat: Vec) -> f32 { - crate::bindings::sklearn::r2(&ground_truth, &y_hat) + unwrap_or_error!(crate::bindings::sklearn::r2(&ground_truth, &y_hat)) } #[cfg(feature = "python")] #[pg_extern(name = "sklearn_regression_metrics")] pub fn sklearn_regression_metrics(ground_truth: Vec, y_hat: Vec) -> JsonB { - JsonB( - serde_json::from_str( - &serde_json::to_string(&crate::bindings::sklearn::regression_metrics( - &ground_truth, - &y_hat, - )) - .unwrap(), - ) - .unwrap(), - ) + let metrics = unwrap_or_error!(crate::bindings::sklearn::regression_metrics( + &ground_truth, + &y_hat, + )); + JsonB(json!(metrics)) } #[cfg(feature = "python")] @@ -869,17 +880,13 @@ pub fn sklearn_classification_metrics( y_hat: Vec, num_classes: i64, ) -> JsonB { - JsonB( - serde_json::from_str( - &serde_json::to_string(&crate::bindings::sklearn::classification_metrics( - &ground_truth, - &y_hat, - num_classes as usize, - )) - .unwrap(), - ) - .unwrap(), - ) + let metrics = unwrap_or_error!(crate::bindings::sklearn::classification_metrics( + &ground_truth, + &y_hat, + num_classes as _ + )); + + JsonB(json!(metrics)) } #[pg_extern] diff --git a/pgml-extension/src/bindings/langchain.rs b/pgml-extension/src/bindings/langchain.rs index f5e6f3e1d..7ccaab954 100644 --- a/pgml-extension/src/bindings/langchain.rs +++ b/pgml-extension/src/bindings/langchain.rs @@ -1,37 +1,29 @@ +use anyhow::Result; use once_cell::sync::Lazy; use pgrx::*; use pyo3::prelude::*; use pyo3::types::PyTuple; -static PY_MODULE: Lazy> = Lazy::new(|| { - Python::with_gil(|py| -> Py { - let src = include_str!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/src/bindings/langchain.py" - )); +use crate::{bindings::TracebackError, create_pymodule}; - PyModule::from_code(py, src, "", "").unwrap().into() - }) -}); +create_pymodule!("/src/bindings/langchain.py"); -pub fn chunk(splitter: &str, text: &str, kwargs: &serde_json::Value) -> Vec { - crate::bindings::venv::activate(); +pub fn chunk(splitter: &str, text: &str, kwargs: &serde_json::Value) -> Result> { + crate::bindings::venv::activate()?; let kwargs = serde_json::to_string(kwargs).unwrap(); - Python::with_gil(|py| -> Vec { - let chunk: Py = PY_MODULE.getattr(py, "chunk").unwrap(); + Python::with_gil(|py| -> Result> { + let chunk: Py = get_module!(PY_MODULE).getattr(py, "chunk")?; - chunk + Ok(chunk .call1( py, PyTuple::new( py, &[splitter.into_py(py), text.into_py(py), kwargs.into_py(py)], ), - ) - .unwrap() - .extract(py) - .unwrap() + )? + .extract(py)?) }) } diff --git a/pgml-extension/src/bindings/lightgbm.rs b/pgml-extension/src/bindings/lightgbm.rs index 3862739f7..61a74292a 100644 --- a/pgml-extension/src/bindings/lightgbm.rs +++ b/pgml-extension/src/bindings/lightgbm.rs @@ -2,6 +2,8 @@ use crate::bindings::Bindings; use crate::orm::dataset::Dataset; use crate::orm::task::Task; use crate::orm::Hyperparams; + +use anyhow::Result; use lightgbm; use pgrx::*; use serde_json::json; @@ -22,15 +24,18 @@ impl std::fmt::Debug for Estimator { } } -pub fn fit_regression(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { +pub fn fit_regression(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> { fit(dataset, hyperparams, Task::regression) } -pub fn fit_classification(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { +pub fn fit_classification( + dataset: &Dataset, + hyperparams: &Hyperparams, +) -> Result> { fit(dataset, hyperparams, Task::classification) } -fn fit(dataset: &Dataset, hyperparams: &Hyperparams, task: Task) -> Box { +fn fit(dataset: &Dataset, hyperparams: &Hyperparams, task: Task) -> Result> { let mut hyperparams = hyperparams.clone(); match task { Task::regression => { @@ -65,14 +70,19 @@ fn fit(dataset: &Dataset, hyperparams: &Hyperparams, task: Task) -> Box Vec { - let results = self.predict_proba(features, num_features); - match num_classes { + fn predict( + &self, + features: &[f32], + num_features: usize, + num_classes: usize, + ) -> Result> { + let results = self.predict_proba(features, num_features)?; + Ok(match num_classes { // TODO make lightgbm predict both classes like scikit and xgboost 0 => results, 2 => results.iter().map(|i| i.round()).collect(), @@ -87,47 +97,46 @@ impl Bindings for Estimator { .unwrap() as f32 }) .collect(), - } + }) } // Predict the raw probability of classes for a classifier. - fn predict_proba(&self, features: &[f32], num_features: usize) -> Vec { - self.estimator - .predict(features, num_features as i32) - .unwrap() + fn predict_proba(&self, features: &[f32], num_features: usize) -> Result> { + Ok(self + .estimator + .predict(features, num_features as i32)? .into_iter() .map(|i| i as f32) - .collect() + .collect()) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { + fn to_bytes(&self) -> Result> { let r: u64 = rand::random(); let path = format!("/tmp/pgml_{}.bin", r); - self.estimator.save_file(&path).unwrap(); - let bytes = std::fs::read(&path).unwrap(); - std::fs::remove_file(&path).unwrap(); + self.estimator.save_file(&path)?; + let bytes = std::fs::read(&path)?; + std::fs::remove_file(&path)?; - bytes + Ok(bytes) } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { let r: u64 = rand::random(); let path = format!("/tmp/pgml_{}.bin", r); - std::fs::write(&path, bytes).unwrap(); + std::fs::write(&path, bytes)?; let mut estimator = lightgbm::Booster::from_file(&path); if estimator.is_err() { // backward compatibility w/ 2.0.0 - std::fs::write(&path, &bytes[16..]).unwrap(); + std::fs::write(&path, &bytes[16..])?; estimator = lightgbm::Booster::from_file(&path); } - std::fs::remove_file(&path).unwrap(); - Box::new(Estimator { - estimator: estimator.unwrap(), - }) + std::fs::remove_file(&path)?; + let estimator = estimator?; + Ok(Box::new(Estimator { estimator })) } } diff --git a/pgml-extension/src/bindings/linfa.rs b/pgml-extension/src/bindings/linfa.rs index 8c358e7f6..2470f223d 100644 --- a/pgml-extension/src/bindings/linfa.rs +++ b/pgml-extension/src/bindings/linfa.rs @@ -1,5 +1,6 @@ use std::convert::From; +use anyhow::{bail, Result}; use linfa::prelude::Predict; use linfa::traits::Fit; use ndarray::{ArrayView1, ArrayView2}; @@ -16,7 +17,7 @@ pub struct LinearRegression { } impl LinearRegression { - pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Box + pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> where Self: Sized, { @@ -37,45 +38,49 @@ impl LinearRegression { estimator = estimator .with_intercept(value.as_bool().expect("fit_intercept must be boolean")) } - _ => error!("Unknown {}: {:?}", key.as_str(), value), + _ => bail!("Unknown {}: {:?}", key.as_str(), value), }; } let estimator = estimator.fit(&linfa_dataset).unwrap(); - Box::new(LinearRegression { + Ok(Box::new(LinearRegression { estimator, num_features: dataset.num_features, - }) + })) } } impl Bindings for LinearRegression { /// Predict a novel datapoint. - fn predict(&self, features: &[f32], num_features: usize, _num_classes: usize) -> Vec { + fn predict( + &self, + features: &[f32], + num_features: usize, + _num_classes: usize, + ) -> Result> { let records = - ArrayView2::from_shape((features.len() / num_features, num_features), features) - .unwrap(); - self.estimator.predict(records).targets.into_raw_vec() + ArrayView2::from_shape((features.len() / num_features, num_features), features)?; + Ok(self.estimator.predict(records).targets.into_raw_vec()) } /// Predict a novel datapoint. - fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Vec { - todo!("predict_proba is currently only supported by the Python runtime.") + fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Result> { + bail!("predict_proba is currently only supported by the Python runtime.") } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { - let estimator: LinearRegression = rmp_serde::from_read(bytes).unwrap(); - Box::new(estimator) + let estimator: LinearRegression = rmp_serde::from_read(bytes)?; + Ok(Box::new(estimator)) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { - rmp_serde::to_vec(self).unwrap() + fn to_bytes(&self) -> Result> { + Ok(rmp_serde::to_vec(self)?) } } @@ -88,7 +93,7 @@ pub struct LogisticRegression { } impl LogisticRegression { - pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Box + pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> where Self: Sized, { @@ -127,18 +132,18 @@ impl LogisticRegression { value.as_f64().expect("gradient_tolerance must be a float") as f32, ) } - _ => error!("Unknown {}: {:?}", key.as_str(), value), + _ => bail!("Unknown {}: {:?}", key.as_str(), value), }; } let estimator = estimator.fit(&linfa_dataset).unwrap(); - Box::new(LogisticRegression { + Ok(Box::new(LogisticRegression { estimator_binary: None, estimator_multi: Some(estimator), num_features: dataset.num_features, num_distinct_labels: dataset.num_distinct_labels, - }) + })) } else { let mut estimator = linfa_logistic::LogisticRegression::default(); @@ -162,35 +167,39 @@ impl LogisticRegression { value.as_f64().expect("gradient_tolerance must be a float") as f32, ) } - _ => error!("Unknown {}: {:?}", key.as_str(), value), + _ => bail!("Unknown {}: {:?}", key.as_str(), value), }; } let estimator = estimator.fit(&linfa_dataset).unwrap(); - Box::new(LogisticRegression { + Ok(Box::new(LogisticRegression { estimator_binary: Some(estimator), estimator_multi: None, num_features: dataset.num_features, num_distinct_labels: dataset.num_distinct_labels, - }) + })) } } } impl Bindings for LogisticRegression { - fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Vec { - todo!("predict_proba is currently only supported by the Python runtime.") + fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Result> { + bail!("predict_proba is currently only supported by the Python runtime.") } - fn predict(&self, features: &[f32], _num_features: usize, _num_classes: usize) -> Vec { + fn predict( + &self, + features: &[f32], + _num_features: usize, + _num_classes: usize, + ) -> Result> { let records = ArrayView2::from_shape( (features.len() / self.num_features, self.num_features), features, - ) - .unwrap(); + )?; - if self.num_distinct_labels > 2 { + Ok(if self.num_distinct_labels > 2 { self.estimator_multi .as_ref() .unwrap() @@ -210,21 +219,21 @@ impl Bindings for LogisticRegression { .into_iter() .map(|x| x as f32) .collect() - } + }) } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { - let estimator: LogisticRegression = rmp_serde::from_read(bytes).unwrap(); - Box::new(estimator) + let estimator: LogisticRegression = rmp_serde::from_read(bytes)?; + Ok(Box::new(estimator)) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { - rmp_serde::to_vec(self).unwrap() + fn to_bytes(&self) -> Result> { + Ok(rmp_serde::to_vec(self)?) } } @@ -235,7 +244,7 @@ pub struct Svm { } impl Svm { - pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { + pub fn fit(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> { let records = ArrayView2::from_shape( (dataset.num_train_rows, dataset.num_features), &dataset.x_train, @@ -268,47 +277,51 @@ impl Svm { "poli" => estimator = estimator.polynomial_kernel(3.0, 1.0), // degree = 3, c = 1.0 as per Scikit "linear" => estimator = estimator.linear_kernel(), "rbf" => estimator = estimator.gaussian_kernel(1e-7), // Default eps - value => error!("Unknown kernel: {}", value), + value => bail!("Unknown kernel: {}", value), } } - _ => error!("Unknown {}: {:?}", key, value), + _ => bail!("Unknown {}: {:?}", key, value), } } let estimator = estimator.fit(&linfa_dataset).unwrap(); - Box::new(Svm { + Ok(Box::new(Svm { estimator, num_features: dataset.num_features, - }) + })) } } impl Bindings for Svm { - fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Vec { - todo!("predict_proba is currently only supported by the Python runtime.") + fn predict_proba(&self, _features: &[f32], _num_features: usize) -> Result> { + bail!("predict_proba is currently only supported by the Python runtime.") } /// Predict a novel datapoint. - fn predict(&self, features: &[f32], num_features: usize, _num_classes: usize) -> Vec { + fn predict( + &self, + features: &[f32], + num_features: usize, + _num_classes: usize, + ) -> Result> { let records = - ArrayView2::from_shape((features.len() / num_features, num_features), features) - .unwrap(); + ArrayView2::from_shape((features.len() / num_features, num_features), features)?; - self.estimator.predict(records).targets.into_raw_vec() + Ok(self.estimator.predict(records).targets.into_raw_vec()) } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { - let estimator: Svm = rmp_serde::from_read(bytes).unwrap(); - Box::new(estimator) + let estimator: Svm = rmp_serde::from_read(bytes)?; + Ok(Box::new(estimator)) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { - rmp_serde::to_vec(self).unwrap() + fn to_bytes(&self) -> Result> { + Ok(rmp_serde::to_vec(self)?) } } diff --git a/pgml-extension/src/bindings/mod.rs b/pgml-extension/src/bindings/mod.rs index 5c32608f1..417a2e28a 100644 --- a/pgml-extension/src/bindings/mod.rs +++ b/pgml-extension/src/bindings/mod.rs @@ -7,6 +7,32 @@ use pyo3::{PyResult, Python}; use crate::orm::*; +#[cfg(feature = "python")] +#[macro_export] +macro_rules! create_pymodule { + ($pyfile:literal) => { + pub static PY_MODULE: Lazy>> = Lazy::new(|| { + Python::with_gil(|py| -> anyhow::Result> { + let src = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), $pyfile)); + Ok(PyModule::from_code(py, src, "", "") + .format_traceback(py)? + .into()) + }) + }); + }; +} + +#[cfg(feature = "python")] +#[macro_export] +macro_rules! get_module { + ($module:ident) => { + match $module.as_ref() { + Ok(module) => module, + Err(e) => anyhow::bail!(e), + } + }; +} + #[cfg(feature = "python")] pub mod langchain; pub mod lightgbm; @@ -19,7 +45,7 @@ pub mod transformers; pub mod venv; pub mod xgboost; -pub type Fit = fn(dataset: &Dataset, hyperparams: &Hyperparams) -> Box; +pub type Fit = fn(dataset: &Dataset, hyperparams: &Hyperparams) -> Result>; /// The Bindings trait that has to be implemented by all algorithm /// providers we use in PostgresML. We don't rely on Serde serialization, @@ -28,16 +54,21 @@ pub type Fit = fn(dataset: &Dataset, hyperparams: &Hyperparams) -> Box Vec; + fn predict( + &self, + features: &[f32], + num_features: usize, + num_classes: usize, + ) -> Result>; /// Predict the probability of each class. - fn predict_proba(&self, features: &[f32], num_features: usize) -> Vec; + fn predict_proba(&self, features: &[f32], num_features: usize) -> Result>; /// Serialize self to bytes - fn to_bytes(&self) -> Vec; + fn to_bytes(&self) -> Result>; /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized; } diff --git a/pgml-extension/src/bindings/sklearn.rs b/pgml-extension/src/bindings/sklearn.rs index 75383e5e7..99e9cfe78 100644 --- a/pgml-extension/src/bindings/sklearn.rs +++ b/pgml-extension/src/bindings/sklearn.rs @@ -10,28 +10,20 @@ use pgrx::*; /// defined in `src/bindings/sklearn.py`. use std::collections::HashMap; +use anyhow::Result; use once_cell::sync::Lazy; use pyo3::prelude::*; use pyo3::types::PyTuple; use crate::bindings::Bindings; -use crate::orm::*; +use crate::{bindings::TracebackError, create_pymodule, orm::*}; -static PY_MODULE: Lazy> = Lazy::new(|| { - Python::with_gil(|py| -> Py { - let src = include_str!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/src/bindings/sklearn.py" - )); - - PyModule::from_code(py, src, "", "").unwrap().into() - }) -}); +create_pymodule!("/src/bindings/sklearn.py"); macro_rules! wrap_fit { ($fn_name:tt, $task:literal) => { - pub fn $fn_name(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { + pub fn $fn_name(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> { fit(dataset, hyperparams, $task) } }; @@ -139,56 +131,49 @@ fn fit( dataset: &Dataset, hyperparams: &Hyperparams, algorithm_task: &'static str, -) -> Box { +) -> Result> { let hyperparams = serde_json::to_string(hyperparams).unwrap(); let (estimator, predict, predict_proba) = - Python::with_gil(|py| -> (Py, Py, Py) { - let estimator: Py = PY_MODULE.getattr(py, "estimator").unwrap(); + Python::with_gil(|py| -> Result<(Py, Py, Py)> { + let module = get_module!(PY_MODULE); + + let estimator: Py = module.getattr(py, "estimator")?; - let train: Py = estimator - .call1( + let train: Py = estimator.call1( + py, + PyTuple::new( py, - PyTuple::new( - py, - &[ - String::from(algorithm_task).into_py(py), - dataset.num_features.into_py(py), - dataset.num_labels.into_py(py), - hyperparams.into_py(py), - ], - ), - ) - .unwrap(); - - let estimator: Py = train - .call1(py, PyTuple::new(py, [&dataset.x_train, &dataset.y_train])) - .unwrap(); - - let predict: Py = PY_MODULE - .getattr(py, "predictor") - .unwrap() - .call1(py, PyTuple::new(py, [&estimator])) - .unwrap() - .extract(py) - .unwrap(); - - let predict_proba: Py = PY_MODULE - .getattr(py, "predictor_proba") - .unwrap() - .call1(py, PyTuple::new(py, [&estimator])) - .unwrap() - .extract(py) - .unwrap(); - - (estimator, predict, predict_proba) - }); - - Box::new(Estimator { + &[ + String::from(algorithm_task).into_py(py), + dataset.num_features.into_py(py), + dataset.num_labels.into_py(py), + hyperparams.into_py(py), + ], + ), + )?; + + let estimator: Py = + train.call1(py, PyTuple::new(py, [&dataset.x_train, &dataset.y_train]))?; + + let predict: Py = module + .getattr(py, "predictor")? + .call1(py, PyTuple::new(py, [&estimator]))? + .extract(py)?; + + let predict_proba: Py = module + .getattr(py, "predictor_proba")? + .call1(py, PyTuple::new(py, [&estimator]))? + .extract(py)?; + + Ok((estimator, predict, predict_proba)) + })?; + + Ok(Box::new(Estimator { estimator, predict, predict_proba, - }) + })) } pub struct Estimator { @@ -211,139 +196,125 @@ impl std::fmt::Debug for Estimator { impl Bindings for Estimator { /// Predict a novel datapoint. - fn predict(&self, features: &[f32], _num_features: usize, _num_classes: usize) -> Vec { - Python::with_gil(|py| -> Vec { - self.predict - .call1(py, PyTuple::new(py, [features])) - .unwrap() - .extract(py) - .unwrap() + fn predict( + &self, + features: &[f32], + _num_features: usize, + _num_classes: usize, + ) -> Result> { + Python::with_gil(|py| { + Ok(self + .predict + .call1(py, PyTuple::new(py, [features]))? + .extract(py)?) }) } - fn predict_proba(&self, features: &[f32], _num_features: usize) -> Vec { - Python::with_gil(|py| -> Vec { - self.predict_proba - .call1(py, PyTuple::new(py, [features])) - .unwrap() - .extract(py) - .unwrap() + fn predict_proba(&self, features: &[f32], _num_features: usize) -> Result> { + Python::with_gil(|py| { + Ok(self + .predict_proba + .call1(py, PyTuple::new(py, [features]))? + .extract(py)?) }) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { - Python::with_gil(|py| -> Vec { - let save = PY_MODULE.getattr(py, "save").unwrap(); - save.call1(py, PyTuple::new(py, [&self.estimator])) - .unwrap() - .extract(py) - .unwrap() + fn to_bytes(&self) -> Result> { + Python::with_gil(|py| { + let save = get_module!(PY_MODULE).getattr(py, "save")?; + Ok(save + .call1(py, PyTuple::new(py, [&self.estimator]))? + .extract(py)?) }) } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { - Python::with_gil(|py| -> Box { - let load = PY_MODULE.getattr(py, "load").unwrap(); - let estimator: Py = load - .call1(py, PyTuple::new(py, [bytes])) - .unwrap() - .extract(py) - .unwrap(); - - let predict: Py = PY_MODULE - .getattr(py, "predictor") - .unwrap() - .call1(py, PyTuple::new(py, [&estimator])) - .unwrap() - .extract(py) - .unwrap(); - - let predict_proba: Py = PY_MODULE - .getattr(py, "predictor_proba") - .unwrap() - .call1(py, PyTuple::new(py, [&estimator])) - .unwrap() - .extract(py) - .unwrap(); - - Box::new(Estimator { + Python::with_gil(|py| -> Result> { + let module = get_module!(PY_MODULE); + + let load = module.getattr(py, "load")?; + let estimator: Py = load.call1(py, PyTuple::new(py, [bytes]))?.extract(py)?; + + let predict: Py = module + .getattr(py, "predictor")? + .call1(py, PyTuple::new(py, [&estimator]))? + .extract(py)?; + + let predict_proba: Py = module + .getattr(py, "predictor_proba")? + .call1(py, PyTuple::new(py, [&estimator]))? + .extract(py)?; + + Ok(Box::new(Estimator { estimator, predict, predict_proba, - }) + })) }) } } -fn sklearn_metric(name: &str, ground_truth: &[f32], y_hat: &[f32]) -> f32 { - Python::with_gil(|py| -> f32 { - let calculate_metric = PY_MODULE.getattr(py, "calculate_metric").unwrap(); - let wrapper: Py = calculate_metric - .call1(py, PyTuple::new(py, [name])) - .unwrap() - .extract(py) +fn sklearn_metric(name: &str, ground_truth: &[f32], y_hat: &[f32]) -> Result { + Python::with_gil(|py| { + let calculate_metric = get_module!(PY_MODULE) + .getattr(py, "calculate_metric") .unwrap(); + let wrapper: Py = calculate_metric + .call1(py, PyTuple::new(py, [name]))? + .extract(py)?; let score: f32 = wrapper - .call1(py, PyTuple::new(py, [ground_truth, y_hat])) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, PyTuple::new(py, [ground_truth, y_hat]))? + .extract(py)?; - score + Ok(score) }) } -pub fn f1(ground_truth: &[f32], y_hat: &[f32]) -> f32 { +pub fn f1(ground_truth: &[f32], y_hat: &[f32]) -> Result { sklearn_metric("f1", ground_truth, y_hat) } -pub fn r2(ground_truth: &[f32], y_hat: &[f32]) -> f32 { +pub fn r2(ground_truth: &[f32], y_hat: &[f32]) -> Result { sklearn_metric("r2", ground_truth, y_hat) } -pub fn precision(ground_truth: &[f32], y_hat: &[f32]) -> f32 { +pub fn precision(ground_truth: &[f32], y_hat: &[f32]) -> Result { sklearn_metric("precision", ground_truth, y_hat) } -pub fn recall(ground_truth: &[f32], y_hat: &[f32]) -> f32 { +pub fn recall(ground_truth: &[f32], y_hat: &[f32]) -> Result { sklearn_metric("recall", ground_truth, y_hat) } -pub fn confusion_matrix(ground_truth: &[f32], y_hat: &[f32]) -> Vec> { - Python::with_gil(|py| -> Vec> { - let calculate_metric = PY_MODULE.getattr(py, "calculate_metric").unwrap(); +pub fn confusion_matrix(ground_truth: &[f32], y_hat: &[f32]) -> Result>> { + Python::with_gil(|py| { + let calculate_metric = get_module!(PY_MODULE).getattr(py, "calculate_metric")?; let wrapper: Py = calculate_metric - .call1(py, PyTuple::new(py, ["confusion_matrix"])) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, PyTuple::new(py, ["confusion_matrix"]))? + .extract(py)?; let matrix: Vec> = wrapper - .call1(py, PyTuple::new(py, [ground_truth, y_hat])) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, PyTuple::new(py, [ground_truth, y_hat]))? + .extract(py)?; - matrix + Ok(matrix) }) } -pub fn regression_metrics(ground_truth: &[f32], y_hat: &[f32]) -> HashMap { - Python::with_gil(|py| -> HashMap { - let calculate_metric = PY_MODULE.getattr(py, "regression_metrics").unwrap(); +pub fn regression_metrics(ground_truth: &[f32], y_hat: &[f32]) -> Result> { + Python::with_gil(|py| { + let calculate_metric = get_module!(PY_MODULE).getattr(py, "regression_metrics")?; let scores: HashMap = calculate_metric - .call1(py, PyTuple::new(py, [ground_truth, y_hat])) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, PyTuple::new(py, [ground_truth, y_hat]))? + .extract(py)?; - scores + Ok(scores) }) } @@ -351,47 +322,43 @@ pub fn classification_metrics( ground_truth: &[f32], y_hat: &[f32], num_classes: usize, -) -> HashMap { - let mut scores = Python::with_gil(|py| -> HashMap { - let calculate_metric = PY_MODULE.getattr(py, "classification_metrics").unwrap(); +) -> Result> { + let mut scores = Python::with_gil(|py| -> Result> { + let calculate_metric = get_module!(PY_MODULE).getattr(py, "classification_metrics")?; let scores: HashMap = calculate_metric - .call1(py, PyTuple::new(py, [ground_truth, y_hat])) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, PyTuple::new(py, [ground_truth, y_hat]))? + .extract(py)?; - scores - }); + Ok(scores) + })?; if num_classes == 2 { - let roc_auc = sklearn_metric("roc_auc", ground_truth, y_hat); + let roc_auc = sklearn_metric("roc_auc", ground_truth, y_hat)?; scores.insert("roc_auc".to_string(), roc_auc); } - scores + Ok(scores) } pub fn cluster_metrics( num_features: usize, inputs: &[f32], labels: &[f32], -) -> HashMap { - Python::with_gil(|py| -> HashMap { - let calculate_metric = PY_MODULE.getattr(py, "cluster_metrics").unwrap(); +) -> Result> { + Python::with_gil(|py| { + let calculate_metric = get_module!(PY_MODULE).getattr(py, "cluster_metrics")?; let scores: HashMap = calculate_metric - .call1(py, (num_features, PyTuple::new(py, [inputs, labels]))) - .unwrap() - .extract(py) - .unwrap(); + .call1(py, (num_features, PyTuple::new(py, [inputs, labels])))? + .extract(py)?; - scores + Ok(scores) }) } -pub fn package_version(name: &str) -> String { - Python::with_gil(|py| -> String { - let package = py.import(name).unwrap(); - package.getattr("__version__").unwrap().extract().unwrap() +pub fn package_version(name: &str) -> Result { + Python::with_gil(|py| { + let package = py.import(name)?; + Ok(package.getattr("__version__")?.extract()?) }) } diff --git a/pgml-extension/src/bindings/transformers/mod.rs b/pgml-extension/src/bindings/transformers/mod.rs index c5fffa1c6..7621a2b3e 100644 --- a/pgml-extension/src/bindings/transformers/mod.rs +++ b/pgml-extension/src/bindings/transformers/mod.rs @@ -10,29 +10,21 @@ use pyo3::prelude::*; use pyo3::types::PyTuple; use serde_json::Value; +use crate::create_pymodule; use crate::orm::{Task, TextDataset}; use super::TracebackError; pub mod whitelist; -static PY_MODULE: Lazy> = Lazy::new(|| { - Python::with_gil(|py| -> Py { - let src = include_str!(concat!( - env!("CARGO_MANIFEST_DIR"), - "/src/bindings/transformers/transformers.py" - )); - - PyModule::from_code(py, src, "", "").unwrap().into() - }) -}); +create_pymodule!("/src/bindings/transformers/transformers.py"); pub fn transform( task: &serde_json::Value, args: &serde_json::Value, inputs: Vec<&str>, ) -> Result { - crate::bindings::venv::activate(); + crate::bindings::venv::activate()?; whitelist::verify_task(task)?; @@ -41,7 +33,9 @@ pub fn transform( let inputs = serde_json::to_string(&inputs)?; let results = Python::with_gil(|py| -> Result { - let transform: Py = PY_MODULE.getattr(py, "transform").format_traceback(py)?; + let transform: Py = get_module!(PY_MODULE) + .getattr(py, "transform") + .format_traceback(py)?; let output = transform .call1( @@ -61,7 +55,7 @@ pub fn transform( pub fn get_model_from(task: &Value) -> Result { Ok(Python::with_gil(|py| -> Result { - let get_model_from = PY_MODULE + let get_model_from = get_module!(PY_MODULE) .getattr(py, "get_model_from") .format_traceback(py)?; let model = get_model_from @@ -76,11 +70,13 @@ pub fn embed( inputs: Vec<&str>, kwargs: &serde_json::Value, ) -> Result>> { - crate::bindings::venv::activate(); + crate::bindings::venv::activate()?; let kwargs = serde_json::to_string(kwargs)?; Python::with_gil(|py| -> Result>> { - let embed: Py = PY_MODULE.getattr(py, "embed").format_traceback(py)?; + let embed: Py = get_module!(PY_MODULE) + .getattr(py, "embed") + .format_traceback(py)?; let output = embed .call1( py, @@ -105,13 +101,15 @@ pub fn tune( hyperparams: &JsonB, path: &Path, ) -> Result> { - crate::bindings::venv::activate(); + crate::bindings::venv::activate()?; let task = task.to_string(); let hyperparams = serde_json::to_string(&hyperparams.0)?; Python::with_gil(|py| -> Result> { - let tune = PY_MODULE.getattr(py, "tune").format_traceback(py)?; + let tune = get_module!(PY_MODULE) + .getattr(py, "tune") + .format_traceback(py)?; let path = path.to_string_lossy(); let output = tune .call1( @@ -133,10 +131,12 @@ pub fn tune( } pub fn generate(model_id: i64, inputs: Vec<&str>, config: JsonB) -> Result> { - crate::bindings::venv::activate(); + crate::bindings::venv::activate()?; Python::with_gil(|py| -> Result> { - let generate = PY_MODULE.getattr(py, "generate").format_traceback(py)?; + let generate = get_module!(PY_MODULE) + .getattr(py, "generate") + .format_traceback(py)?; let config = serde_json::to_string(&config.0)?; // cloning inputs in case we have to re-call on error is rather unfortunate here // similarly, using a json string to pass kwargs is also unfortunate extra parsing @@ -161,7 +161,7 @@ pub fn generate(model_id: i64, inputs: Vec<&str>, config: JsonB) -> Result, kwargs: &serde_json::Value, ) -> Result { - crate::bindings::venv::activate(); + crate::bindings::venv::activate()?; let kwargs = serde_json::to_string(kwargs)?; let dataset = Python::with_gil(|py| -> Result { - let load_dataset: Py = PY_MODULE.getattr(py, "load_dataset").format_traceback(py)?; + let load_dataset: Py = get_module!(PY_MODULE) + .getattr(py, "load_dataset") + .format_traceback(py)?; Ok(load_dataset .call1( py, @@ -374,10 +376,10 @@ pub fn load_dataset( } pub fn clear_gpu_cache(memory_usage: Option) -> Result { - crate::bindings::venv::activate(); + crate::bindings::venv::activate().unwrap(); Python::with_gil(|py| -> Result { - let clear_gpu_cache: Py = PY_MODULE + let clear_gpu_cache: Py = get_module!(PY_MODULE) .getattr(py, "clear_gpu_cache") .format_traceback(py)?; let success = clear_gpu_cache diff --git a/pgml-extension/src/bindings/venv.rs b/pgml-extension/src/bindings/venv.rs index 4dcb47501..458803a08 100644 --- a/pgml-extension/src/bindings/venv.rs +++ b/pgml-extension/src/bindings/venv.rs @@ -1,45 +1,40 @@ //! Use virtualenv. +use anyhow::Result; use once_cell::sync::Lazy; use pgrx::*; use pyo3::prelude::*; use pyo3::types::PyTuple; use crate::config::get_config; +use crate::{bindings::TracebackError, create_pymodule}; static CONFIG_NAME: &str = "pgml.venv"; -static PY_MODULE: Lazy> = Lazy::new(|| { - Python::with_gil(|py| -> Py { - let src = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/src/bindings/venv.py")); +create_pymodule!("/src/bindings/venv.py"); - PyModule::from_code(py, src, "", "").unwrap().into() - }) -}); - -pub fn activate_venv(venv: &str) -> bool { - Python::with_gil(|py| -> bool { - let activate_venv: Py = PY_MODULE.getattr(py, "activate_venv").unwrap(); - let result: Py = activate_venv - .call1(py, PyTuple::new(py, &[venv.to_string().into_py(py)])) - .unwrap(); +pub fn activate_venv(venv: &str) -> Result { + Python::with_gil(|py| { + let activate_venv: Py = get_module!(PY_MODULE).getattr(py, "activate_venv")?; + let result: Py = + activate_venv.call1(py, PyTuple::new(py, &[venv.to_string().into_py(py)]))?; - result.extract(py).unwrap() + Ok(result.extract(py)?) }) } -pub fn activate() -> bool { +pub fn activate() -> Result { match get_config(CONFIG_NAME) { Some(venv) => activate_venv(&venv), - None => false, + None => Ok(false), } } -pub fn freeze() -> Vec { - Python::with_gil(|py| -> Vec { - let freeze: Py = PY_MODULE.getattr(py, "freeze").unwrap(); - let result: Py = freeze.call0(py).unwrap(); +pub fn freeze() -> Result> { + Python::with_gil(|py| { + let freeze = get_module!(PY_MODULE).getattr(py, "freeze")?; + let result = freeze.call0(py)?; - result.extract(py).unwrap() + Ok(result.extract(py)?) }) } diff --git a/pgml-extension/src/bindings/xgboost.rs b/pgml-extension/src/bindings/xgboost.rs index 482f53243..3521560a2 100644 --- a/pgml-extension/src/bindings/xgboost.rs +++ b/pgml-extension/src/bindings/xgboost.rs @@ -4,6 +4,7 @@ /// that are very effective on real-world datasets. /// /// It uses its own dense matrix. +use anyhow::{anyhow, Result}; use xgboost::parameters::tree::*; use xgboost::parameters::*; use xgboost::{Booster, DMatrix}; @@ -136,11 +137,14 @@ fn get_tree_params(hyperparams: &Hyperparams) -> tree::TreeBoosterParameters { params.build().unwrap() } -pub fn fit_regression(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { +pub fn fit_regression(dataset: &Dataset, hyperparams: &Hyperparams) -> Result> { fit(dataset, hyperparams, learning::Objective::RegLinear) } -pub fn fit_classification(dataset: &Dataset, hyperparams: &Hyperparams) -> Box { +pub fn fit_classification( + dataset: &Dataset, + hyperparams: &Hyperparams, +) -> Result> { fit( dataset, hyperparams, @@ -152,7 +156,7 @@ fn fit( dataset: &Dataset, hyperparams: &Hyperparams, objective: learning::Objective, -) -> Box { +) -> Result> { // split the train/test data into DMatrix let mut dtrain = DMatrix::from_dense(&dataset.x_train, dataset.num_train_rows).unwrap(); let mut dtest = DMatrix::from_dense(&dataset.x_test, dataset.num_test_rows).unwrap(); @@ -216,7 +220,7 @@ fn fit( // train model, and print evaluation data let booster = Booster::train(¶ms).unwrap(); - Box::new(Estimator { estimator: booster }) + Ok(Box::new(Estimator { estimator: booster })) } pub struct Estimator { @@ -236,10 +240,15 @@ impl std::fmt::Debug for Estimator { } impl Bindings for Estimator { - fn predict(&self, features: &[f32], num_features: usize, num_classes: usize) -> Vec { - let x = DMatrix::from_dense(features, features.len() / num_features).unwrap(); - let y = self.estimator.predict(&x).unwrap(); - match num_classes { + fn predict( + &self, + features: &[f32], + num_features: usize, + num_classes: usize, + ) -> Result> { + let x = DMatrix::from_dense(features, features.len() / num_features)?; + let y = self.estimator.predict(&x)?; + Ok(match num_classes { 0 => y, _ => y .chunks(num_classes) @@ -252,26 +261,26 @@ impl Bindings for Estimator { .unwrap() as f32 }) .collect::>(), - } + }) } - fn predict_proba(&self, features: &[f32], num_features: usize) -> Vec { - let x = DMatrix::from_dense(features, features.len() / num_features).unwrap(); - self.estimator.predict(&x).unwrap() + fn predict_proba(&self, features: &[f32], num_features: usize) -> Result> { + let x = DMatrix::from_dense(features, features.len() / num_features)?; + Ok(self.estimator.predict(&x)?) } /// Serialize self to bytes - fn to_bytes(&self) -> Vec { + fn to_bytes(&self) -> Result> { let r: u64 = rand::random(); let path = format!("/tmp/pgml_{}.bin", r); - self.estimator.save(std::path::Path::new(&path)).unwrap(); - let bytes = std::fs::read(&path).unwrap(); - std::fs::remove_file(&path).unwrap(); - bytes + self.estimator.save(std::path::Path::new(&path))?; + let bytes = std::fs::read(&path)?; + std::fs::remove_file(&path)?; + Ok(bytes) } /// Deserialize self from bytes, with additional context - fn from_bytes(bytes: &[u8]) -> Box + fn from_bytes(bytes: &[u8]) -> Result> where Self: Sized, { @@ -281,7 +290,7 @@ impl Bindings for Estimator { estimator = Booster::load_buffer(&bytes[16..]); } - let mut estimator = estimator.unwrap(); + let mut estimator = estimator?; // Get concurrency setting let concurrency: i64 = Spi::get_one( @@ -290,14 +299,13 @@ impl Bindings for Estimator { current_setting('pgml.predict_concurrency', true), '2' )::bigint", - ) - .unwrap() + )? .unwrap(); estimator .set_param("nthread", &concurrency.to_string()) - .expect("could not set nthread XGBoost parameter"); + .map_err(|e| anyhow!("could not set nthread XGBoost parameter: {e}"))?; - Box::new(Estimator { estimator }) + Ok(Box::new(Estimator { estimator })) } } diff --git a/pgml-extension/src/orm/file.rs b/pgml-extension/src/orm/file.rs index 6485b0709..142b6f77a 100644 --- a/pgml-extension/src/orm/file.rs +++ b/pgml-extension/src/orm/file.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use parking_lot::Mutex; use std::collections::HashMap; use std::str::FromStr; @@ -17,12 +18,12 @@ static DEPLOYED_ESTIMATORS_BY_MODEL_ID: Lazy Arc> { +pub fn find_deployed_estimator_by_model_id(model_id: i64) -> Result>> { // Get the estimator from process memory, if we already loaded it. { let estimators = DEPLOYED_ESTIMATORS_BY_MODEL_ID.lock(); if let Some(estimator) = estimators.get(&model_id) { - return estimator.clone(); + return Ok(estimator.clone()); } } @@ -93,31 +94,31 @@ pub fn find_deployed_estimator_by_model_id(model_id: i64) -> Arc = match runtime { Runtime::rust => { match algorithm { - Algorithm::xgboost => crate::bindings::xgboost::Estimator::from_bytes(&data), - Algorithm::lightgbm => crate::bindings::lightgbm::Estimator::from_bytes(&data), + Algorithm::xgboost => crate::bindings::xgboost::Estimator::from_bytes(&data)?, + Algorithm::lightgbm => crate::bindings::lightgbm::Estimator::from_bytes(&data)?, Algorithm::linear => match task { - Task::regression => crate::bindings::linfa::LinearRegression::from_bytes(&data), + Task::regression => crate::bindings::linfa::LinearRegression::from_bytes(&data)?, Task::classification => { - crate::bindings::linfa::LogisticRegression::from_bytes(&data) + crate::bindings::linfa::LogisticRegression::from_bytes(&data)? } _ => error!("Rust runtime only supports `classification` and `regression` task types for linear algorithms."), }, - Algorithm::svm => crate::bindings::linfa::Svm::from_bytes(&data), + Algorithm::svm => crate::bindings::linfa::Svm::from_bytes(&data)?, _ => todo!(), //smartcore_load(&data, task, algorithm, &hyperparams), } } #[cfg(feature = "python")] - Runtime::python => crate::bindings::sklearn::Estimator::from_bytes(&data), + Runtime::python => crate::bindings::sklearn::Estimator::from_bytes(&data)?, #[cfg(not(feature = "python"))] Runtime::python => { - error!("Python runtime not supported, recompile with `--features python`") + anyhow::bail!("Python runtime not supported, recompile with `--features python`") } }; // Cache the estimator in process memory. let mut estimators = DEPLOYED_ESTIMATORS_BY_MODEL_ID.lock(); estimators.insert(model_id, Arc::new(bindings)); - estimators.get(&model_id).unwrap().clone() + Ok(estimators.get(&model_id).unwrap().clone()) } diff --git a/pgml-extension/src/orm/model.rs b/pgml-extension/src/orm/model.rs index eb1653832..43ed2cd99 100644 --- a/pgml-extension/src/orm/model.rs +++ b/pgml-extension/src/orm/model.rs @@ -1,3 +1,4 @@ +use anyhow::{anyhow, bail, Result}; use parking_lot::Mutex; use std::collections::HashMap; use std::fmt::{Display, Error, Formatter}; @@ -88,7 +89,7 @@ impl Model { }; if runtime == Runtime::python { - crate::bindings::venv::activate(); + crate::bindings::venv::activate().unwrap(); } let dataset = snapshot.tabular_dataset(); @@ -271,7 +272,7 @@ impl Model { model } - fn find(id: i64) -> Model { + fn find(id: i64) -> Result { let mut model = None; // Create the model record. Spi::connect(|client| { @@ -308,31 +309,31 @@ impl Model { Runtime::rust => { match algorithm { Algorithm::xgboost => { - crate::bindings::xgboost::Estimator::from_bytes(&data) + crate::bindings::xgboost::Estimator::from_bytes(&data)? } Algorithm::lightgbm => { - crate::bindings::lightgbm::Estimator::from_bytes(&data) + crate::bindings::lightgbm::Estimator::from_bytes(&data)? } Algorithm::linear => match project.task { Task::regression => { - crate::bindings::linfa::LinearRegression::from_bytes(&data) + crate::bindings::linfa::LinearRegression::from_bytes(&data)? } Task::classification => { - crate::bindings::linfa::LogisticRegression::from_bytes(&data) + crate::bindings::linfa::LogisticRegression::from_bytes(&data)? } - _ => error!("No default runtime available for tasks other than `classification` and `regression` when using a linear algorithm."), + _ => bail!("No default runtime available for tasks other than `classification` and `regression` when using a linear algorithm."), }, - Algorithm::svm => crate::bindings::linfa::Svm::from_bytes(&data), + Algorithm::svm => crate::bindings::linfa::Svm::from_bytes(&data)?, _ => todo!(), //smartcore_load(&data, task, algorithm, &hyperparams), } } #[cfg(feature = "python")] - Runtime::python => crate::bindings::sklearn::Estimator::from_bytes(&data), + Runtime::python => crate::bindings::sklearn::Estimator::from_bytes(&data)?, #[cfg(not(feature = "python"))] Runtime::python => { - error!("Python runtime not supported, recompile with `--features python`") + bail!("Python runtime not supported, recompile with `--features python`") } }; @@ -366,29 +367,31 @@ impl Model { num_features, }); } - }); - model.unwrap_or_else(|| { - error!( + Ok(()) + })?; + + Ok(model.ok_or_else(|| { + anyhow!( "pgml.models WHERE id = {:?} could not be loaded. Does it exist?", id ) - }) + })?) } - pub fn find_cached(id: i64) -> Arc { + pub fn find_cached(id: i64) -> Result> { { let models = DEPLOYED_MODELS_BY_ID.lock(); if let Some(model) = models.get(&id) { - return model.clone(); + return Ok(model.clone()); } } info!("Model cache miss {:?}", id); - let model = Model::find(id); + let model = Arc::new(Model::find(id)?); let mut models = DEPLOYED_MODELS_BY_ID.lock(); - models.insert(id, Arc::new(model)); - models.get(&id).unwrap().clone() + models.insert(id, Arc::clone(&model)); + Ok(model) } fn get_fit_function(&self) -> crate::bindings::Fit { @@ -563,7 +566,7 @@ impl Model { fn test(&self, dataset: &Dataset) -> IndexMap { info!("Testing {:?} estimator {:?}", self.project.task, self); // Test the estimator on the data - let y_hat = self.predict_batch(&dataset.x_test); + let y_hat = self.predict_batch(&dataset.x_test).unwrap(); let y_test = &dataset.y_test; // Calculate metrics to evaluate this estimator and its hyperparams @@ -573,7 +576,7 @@ impl Model { #[cfg(all(feature = "python", any(test, feature = "pg_test")))] { let sklearn_metrics = - crate::bindings::sklearn::regression_metrics(y_test, &y_hat); + crate::bindings::sklearn::regression_metrics(y_test, &y_hat).unwrap(); metrics.insert("sklearn_r2".to_string(), sklearn_metrics["r2"]); metrics.insert( "sklearn_mean_absolute_error".to_string(), @@ -605,7 +608,8 @@ impl Model { y_test, &y_hat, dataset.num_distinct_labels, - ); + ) + .unwrap(); if dataset.num_distinct_labels == 2 { metrics.insert("sklearn_roc_auc".to_string(), sklearn_metrics["roc_auc"]); @@ -671,7 +675,8 @@ impl Model { dataset.num_features, &dataset.x_test, &y_hat, - ); + ) + .unwrap(); metrics.insert("silhouette".to_string(), sklearn_metrics["silhouette"]); } } @@ -693,7 +698,7 @@ impl Model { let fit = self.get_fit_function(); let now = Instant::now(); - self.bindings = Some(fit(dataset, hyperparams)); + self.bindings = Some(fit(dataset, hyperparams).unwrap()); let fit_time = now.elapsed(); let now = Instant::now(); @@ -1134,23 +1139,23 @@ impl Model { features } - pub fn predict(&self, features: &[f32]) -> f32 { - self.predict_batch(features)[0] + pub fn predict(&self, features: &[f32]) -> Result { + Ok(self.predict_batch(features)?[0]) } - pub fn predict_proba(&self, features: &[f32]) -> Vec { + pub fn predict_proba(&self, features: &[f32]) -> Result> { match self.project.task { - Task::regression => error!("You can't predict probabilities for a regression model"), + Task::regression => bail!("You can't predict probabilities for a regression model"), Task::classification => self .bindings .as_ref() .unwrap() .predict_proba(features, self.num_features), - _ => error!("no predict_proba for huggingface"), + _ => bail!("no predict_proba for huggingface"), } } - pub fn predict_joint(&self, features: &[f32]) -> Vec { + pub fn predict_joint(&self, features: &[f32]) -> Result> { match self.project.task { Task::regression => self.bindings.as_ref().unwrap().predict( features, @@ -1158,13 +1163,13 @@ impl Model { self.num_classes, ), Task::classification => { - error!("You can't predict joint probabilities for a classification model") + bail!("You can't predict joint probabilities for a classification model") } - _ => error!("no predict_joint for huggingface"), + _ => bail!("no predict_joint for huggingface"), } } - pub fn predict_batch(&self, features: &[f32]) -> Vec { + pub fn predict_batch(&self, features: &[f32]) -> Result> { self.bindings .as_ref() .unwrap()