diff --git a/python/psqlpy/__init__.py b/python/psqlpy/__init__.py index 41ede3fe..fbaf123d 100644 --- a/python/psqlpy/__init__.py +++ b/python/psqlpy/__init__.py @@ -13,7 +13,6 @@ ReadVariant, SingleQueryResult, SslMode, - SynchronousCommit, TargetSessionAttrs, Transaction, connect, @@ -35,7 +34,6 @@ "ReadVariant", "SingleQueryResult", "SslMode", - "SynchronousCommit", "TargetSessionAttrs", "Transaction", "connect", diff --git a/python/psqlpy/_internal/__init__.pyi b/python/psqlpy/_internal/__init__.pyi index 8cf394b7..d900d228 100644 --- a/python/psqlpy/_internal/__init__.pyi +++ b/python/psqlpy/_internal/__init__.pyi @@ -150,38 +150,6 @@ class SingleQueryResult: Type that return passed function. """ -class SynchronousCommit(Enum): - """ - Synchronous_commit option for transactions. - - ### Variants: - - `On`: The meaning may change based on whether you have - a synchronous standby or not. - If there is a synchronous standby, - setting the value to on will result in waiting till “remote flush”. - - `Off`: As the name indicates, the commit acknowledgment can come before - flushing the records to disk. - This is generally called as an asynchronous commit. - If the PostgreSQL instance crashes, - the last few asynchronous commits might be lost. - - `Local`: WAL records are written and flushed to local disks. - In this case, the commit will be acknowledged after the - local WAL Write and WAL flush completes. - - `RemoteWrite`: WAL records are successfully handed over to - remote instances which acknowledged back - about the write (not flush). - - `RemoteApply`: This will result in commits waiting until replies from the - current synchronous standby(s) indicate they have received - the commit record of the transaction and applied it so - that it has become visible to queries on the standby(s). - """ - - On = 1 - Off = 2 - Local = 3 - RemoteWrite = 4 - RemoteApply = 5 - class IsolationLevel(Enum): """Isolation Level for transactions.""" @@ -1117,7 +1085,6 @@ class Connection: isolation_level: IsolationLevel | None = None, read_variant: ReadVariant | None = None, deferrable: bool | None = None, - synchronous_commit: SynchronousCommit | None = None, ) -> Transaction: """Create new transaction. @@ -1125,7 +1092,6 @@ class Connection: - `isolation_level`: configure isolation level of the transaction. - `read_variant`: configure read variant of the transaction. - `deferrable`: configure deferrable of the transaction. - - `synchronous_commit`: configure synchronous_commit option for transaction. """ def cursor( self: Self, diff --git a/python/tests/test_transaction.py b/python/tests/test_transaction.py index 280d21be..a6dfd191 100644 --- a/python/tests/test_transaction.py +++ b/python/tests/test_transaction.py @@ -8,7 +8,6 @@ Cursor, IsolationLevel, ReadVariant, - SynchronousCommit, ) from psqlpy.exceptions import ( InterfaceError, @@ -362,29 +361,3 @@ async def test_execute_batch_method(psql_pool: ConnectionPool) -> None: await transaction.execute(querystring="SELECT * FROM execute_batch2") connection.back_to_pool() - - -@pytest.mark.parametrize( - "synchronous_commit", - [ - SynchronousCommit.On, - SynchronousCommit.Off, - SynchronousCommit.Local, - SynchronousCommit.RemoteWrite, - SynchronousCommit.RemoteApply, - ], -) -async def test_synchronous_commit( - synchronous_commit: SynchronousCommit, - psql_pool: ConnectionPool, - table_name: str, - number_database_records: int, -) -> None: - async with psql_pool.acquire() as conn, conn.transaction( - synchronous_commit=synchronous_commit, - ) as trans: - res = await trans.execute( - f"SELECT * FROM {table_name}", - ) - - assert len(res.result()) == number_database_records diff --git a/src/connection/impls.rs b/src/connection/impls.rs new file mode 100644 index 00000000..50b195a0 --- /dev/null +++ b/src/connection/impls.rs @@ -0,0 +1,485 @@ +use bytes::Buf; +use pyo3::{PyAny, Python}; +use tokio_postgres::{CopyInSink, Row, Statement, ToStatement}; + +use crate::{ + exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, + options::{IsolationLevel, ReadVariant}, + query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, + statement::{statement::PsqlpyStatement, statement_builder::StatementBuilder}, + value_converter::to_python::postgres_to_py, +}; + +use super::{ + structs::{PSQLPyConnection, PoolConnection, SingleConnection}, + traits::{Connection, Transaction}, +}; + +impl Transaction for T +where + T: Connection, +{ + async fn start( + &self, + isolation_level: Option, + read_variant: Option, + deferrable: Option, + ) -> PSQLPyResult<()> { + let start_qs = self.build_start_qs(isolation_level, read_variant, deferrable); + self.batch_execute(start_qs.as_str()).await.map_err(|err| { + RustPSQLDriverError::TransactionBeginError( + format!("Cannot start transaction due to - {err}").into(), + ) + })?; + + Ok(()) + } + + async fn commit(&self) -> PSQLPyResult<()> { + self.batch_execute("COMMIT;").await.map_err(|err| { + RustPSQLDriverError::TransactionCommitError(format!( + "Cannot execute COMMIT statement, error - {err}" + )) + })?; + Ok(()) + } + + async fn rollback(&self) -> PSQLPyResult<()> { + self.batch_execute("ROLLBACK;").await.map_err(|err| { + RustPSQLDriverError::TransactionRollbackError(format!( + "Cannot execute ROLLBACK statement, error - {err}" + )) + })?; + Ok(()) + } +} + +impl Connection for SingleConnection { + async fn prepare(&self, query: &str, prepared: bool) -> PSQLPyResult { + let prepared_stmt = self.connection.prepare(query).await?; + + if !prepared { + self.drop_prepared(&prepared_stmt).await?; + } + return Ok(prepared_stmt); + } + + async fn drop_prepared(&self, stmt: &Statement) -> PSQLPyResult<()> { + let deallocate_query = format!("DEALLOCATE PREPARE {}", stmt.name()); + + Ok(self.connection.batch_execute(&deallocate_query).await?) + } + + async fn query( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult> + where + T: ?Sized + ToStatement, + { + Ok(self.connection.query(statement, params).await?) + } + + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn postgres_types::ToSql + Sync), postgres_types::Type)], + ) -> PSQLPyResult> { + Ok(self.connection.query_typed(statement, params).await?) + } + + async fn batch_execute(&self, query: &str) -> PSQLPyResult<()> { + Ok(self.connection.batch_execute(query).await?) + } + + async fn query_one( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult + where + T: ?Sized + ToStatement, + { + Ok(self.connection.query_one(statement, params).await?) + } +} + +// impl Transaction for SingleConnection { +// async fn start( +// &self, +// isolation_level: Option, +// read_variant: Option, +// deferrable: Option, +// ) -> PSQLPyResult<()> { +// let start_qs = self.build_start_qs(isolation_level, read_variant, deferrable); +// self.batch_execute(start_qs.as_str()).await.map_err(|err| { +// RustPSQLDriverError::TransactionBeginError( +// format!("Cannot start transaction due to - {err}").into(), +// ) +// })?; + +// Ok(()) +// } + +// async fn commit(&self) -> PSQLPyResult<()> { +// self.batch_execute("COMMIT;").await.map_err(|err| { +// RustPSQLDriverError::TransactionCommitError(format!( +// "Cannot execute COMMIT statement, error - {err}" +// )) +// })?; +// Ok(()) +// } + +// async fn rollback(&self) -> PSQLPyResult<()> { +// self.batch_execute("ROLLBACK;").await.map_err(|err| { +// RustPSQLDriverError::TransactionRollbackError(format!( +// "Cannot execute ROLLBACK statement, error - {err}" +// )) +// })?; +// Ok(()) +// } +// } + +impl Connection for PoolConnection { + async fn prepare(&self, query: &str, prepared: bool) -> PSQLPyResult { + if prepared { + return Ok(self.connection.prepare_cached(query).await?); + } + + let prepared = self.connection.prepare(query).await?; + self.drop_prepared(&prepared).await?; + return Ok(prepared); + } + + async fn drop_prepared(&self, stmt: &Statement) -> PSQLPyResult<()> { + let deallocate_query = format!("DEALLOCATE PREPARE {}", stmt.name()); + + Ok(self.connection.batch_execute(&deallocate_query).await?) + } + + async fn query( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult> + where + T: ?Sized + ToStatement, + { + Ok(self.connection.query(statement, params).await?) + } + + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn postgres_types::ToSql + Sync), postgres_types::Type)], + ) -> PSQLPyResult> { + Ok(self.connection.query_typed(statement, params).await?) + } + + async fn batch_execute(&self, query: &str) -> PSQLPyResult<()> { + Ok(self.connection.batch_execute(query).await?) + } + + async fn query_one( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult + where + T: ?Sized + ToStatement, + { + Ok(self.connection.query_one(statement, params).await?) + } +} + +// impl Transaction for PoolConnection { +// async fn start( +// &self, +// isolation_level: Option, +// read_variant: Option, +// deferrable: Option, +// ) -> PSQLPyResult<()> { +// let start_qs = self.build_start_qs(isolation_level, read_variant, deferrable); +// self.batch_execute(start_qs.as_str()).await.map_err(|err| { +// RustPSQLDriverError::TransactionBeginError( +// format!("Cannot start transaction due to - {err}").into(), +// ) +// })?; + +// Ok(()) +// } + +// async fn commit(&self) -> PSQLPyResult<()> { +// self.batch_execute("COMMIT;").await.map_err(|err| { +// RustPSQLDriverError::TransactionCommitError(format!( +// "Cannot execute COMMIT statement, error - {err}" +// )) +// })?; +// Ok(()) +// } + +// async fn rollback(&self) -> PSQLPyResult<()> { +// self.batch_execute("ROLLBACK;").await.map_err(|err| { +// RustPSQLDriverError::TransactionRollbackError(format!( +// "Cannot execute ROLLBACK statement, error - {err}" +// )) +// })?; +// Ok(()) +// } +// } + +impl Connection for PSQLPyConnection { + async fn prepare(&self, query: &str, prepared: bool) -> PSQLPyResult { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.prepare(query, prepared).await, + PSQLPyConnection::SingleConnection(s_conn) => s_conn.prepare(query, prepared).await, + } + } + + async fn drop_prepared(&self, stmt: &Statement) -> PSQLPyResult<()> { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.drop_prepared(stmt).await, + PSQLPyConnection::SingleConnection(s_conn) => s_conn.drop_prepared(stmt).await, + } + } + + async fn query( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult> + where + T: ?Sized + ToStatement, + { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.query(statement, params).await, + PSQLPyConnection::SingleConnection(s_conn) => s_conn.query(statement, params).await, + } + } + + async fn query_typed( + &self, + statement: &str, + params: &[(&(dyn postgres_types::ToSql + Sync), postgres_types::Type)], + ) -> PSQLPyResult> { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.query_typed(statement, params).await, + PSQLPyConnection::SingleConnection(s_conn) => { + s_conn.query_typed(statement, params).await + } + } + } + + async fn batch_execute(&self, query: &str) -> PSQLPyResult<()> { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.batch_execute(query).await, + PSQLPyConnection::SingleConnection(s_conn) => s_conn.batch_execute(query).await, + } + } + + async fn query_one( + &self, + statement: &T, + params: &[&(dyn postgres_types::ToSql + Sync)], + ) -> PSQLPyResult + where + T: ?Sized + ToStatement, + { + match self { + PSQLPyConnection::PoolConn(p_conn) => p_conn.query_one(statement, params).await, + PSQLPyConnection::SingleConnection(s_conn) => s_conn.query_one(statement, params).await, + } + } +} + +// impl Transaction for PSQLPyConnection { +// async fn start( +// &self, +// isolation_level: Option, +// read_variant: Option, +// deferrable: Option, +// ) -> PSQLPyResult<()> { +// match self { +// PSQLPyConnection::PoolConn(p_conn) => p_conn.start(isolation_level, read_variant, deferrable).await, +// PSQLPyConnection::SingleConnection(s_conn) => s_conn.start(isolation_level, read_variant, deferrable).await, +// } +// } + +// async fn commit(&self) -> PSQLPyResult<()> { +// self.batch_execute("COMMIT;").await.map_err(|err| { +// RustPSQLDriverError::TransactionCommitError(format!( +// "Cannot execute COMMIT statement, error - {err}" +// )) +// })?; +// Ok(()) +// } + +// async fn rollback(&self) -> PSQLPyResult<()> { +// self.batch_execute("ROLLBACK;").await.map_err(|err| { +// RustPSQLDriverError::TransactionRollbackError(format!( +// "Cannot execute ROLLBACK statement, error - {err}" +// )) +// })?; +// Ok(()) +// } +// } + +impl PSQLPyConnection { + pub async fn execute( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> PSQLPyResult { + let statement = StatementBuilder::new(querystring, parameters, self, prepared) + .build() + .await?; + + let prepared = prepared.unwrap_or(true); + + let result = match prepared { + true => self + .query(statement.statement_query()?, &statement.params()) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement, error - {err}" + )) + })?, + false => self + .query_typed(statement.raw_query(), &statement.params_typed()) + .await + .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))?, + }; + + Ok(PSQLDriverPyQueryResult::new(result)) + } + + pub async fn execute_many( + &self, + querystring: String, + parameters: Option>>, + prepared: Option, + ) -> PSQLPyResult<()> { + let mut statements: Vec = vec![]; + if let Some(parameters) = parameters { + for vec_of_py_any in parameters { + // TODO: Fix multiple qs creation + let statement = + StatementBuilder::new(querystring.clone(), Some(vec_of_py_any), self, prepared) + .build() + .await?; + + statements.push(statement); + } + } + + let prepared = prepared.unwrap_or(true); + + for statement in statements { + let querystring_result = if prepared { + let prepared_stmt = &self.prepare(&statement.raw_query(), true).await; + if let Err(error) = prepared_stmt { + return Err(RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement in execute_many, operation rolled back {error}", + ))); + } + self.query( + &self.prepare(&statement.raw_query(), true).await?, + &statement.params(), + ) + .await + } else { + self.query(statement.raw_query(), &statement.params()).await + }; + + if let Err(error) = querystring_result { + return Err(RustPSQLDriverError::ConnectionExecuteError(format!( + "Error occured in `execute_many` statement: {error}" + ))); + } + } + + return Ok(()); + } + + pub async fn fetch_row_raw( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> PSQLPyResult { + let statement = StatementBuilder::new(querystring, parameters, self, prepared) + .build() + .await?; + + let prepared = prepared.unwrap_or(true); + + let result = if prepared { + self.query_one( + &self + .prepare(&statement.raw_query(), true) + .await + .map_err(|err| { + RustPSQLDriverError::ConnectionExecuteError(format!( + "Cannot prepare statement, error - {err}" + )) + })?, + &statement.params(), + ) + .await + .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? + } else { + self.query_one(statement.raw_query(), &statement.params()) + .await + .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? + }; + + return Ok(result); + } + + pub async fn fetch_row( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> PSQLPyResult { + let result = self + .fetch_row_raw(querystring, parameters, prepared) + .await?; + + return Ok(PSQLDriverSinglePyQueryResult::new(result)); + } + + pub async fn fetch_val( + &self, + querystring: String, + parameters: Option>, + prepared: Option, + ) -> PSQLPyResult> { + let result = self + .fetch_row_raw(querystring, parameters, prepared) + .await?; + + return Python::with_gil(|gil| match result.columns().first() { + Some(first_column) => postgres_to_py(gil, &result, first_column, 0, &None), + None => Ok(gil.None()), + }); + } + + pub async fn copy_in(&self, statement: &T) -> PSQLPyResult> + where + T: ?Sized + ToStatement, + U: Buf + 'static + Send, + { + match self { + PSQLPyConnection::PoolConn(pconn) => { + return Ok(pconn.connection.copy_in(statement).await?) + } + PSQLPyConnection::SingleConnection(sconn) => { + return Ok(sconn.connection.copy_in(statement).await?) + } + } + } +} diff --git a/src/connection/mod.rs b/src/connection/mod.rs new file mode 100644 index 00000000..c8f176fa --- /dev/null +++ b/src/connection/mod.rs @@ -0,0 +1,3 @@ +pub mod impls; +pub mod structs; +pub mod traits; diff --git a/src/connection/structs.rs b/src/connection/structs.rs new file mode 100644 index 00000000..9e713bfd --- /dev/null +++ b/src/connection/structs.rs @@ -0,0 +1,15 @@ +use deadpool_postgres::Object; +use tokio_postgres::Client; + +pub struct PoolConnection { + pub connection: Object, +} + +pub struct SingleConnection { + pub connection: Client, +} + +pub enum PSQLPyConnection { + PoolConn(PoolConnection), + SingleConnection(SingleConnection), +} diff --git a/src/connection/traits.rs b/src/connection/traits.rs new file mode 100644 index 00000000..9428eb70 --- /dev/null +++ b/src/connection/traits.rs @@ -0,0 +1,87 @@ +use postgres_types::{ToSql, Type}; +use tokio_postgres::{Row, Statement, ToStatement}; + +use crate::exceptions::rust_errors::PSQLPyResult; + +use crate::options::{IsolationLevel, ReadVariant}; + +pub trait Connection { + fn prepare( + &self, + query: &str, + prepared: bool, + ) -> impl std::future::Future> + Send; + + fn drop_prepared( + &self, + stmt: &Statement, + ) -> impl std::future::Future> + Send; + + fn query( + &self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> impl std::future::Future>> + where + T: ?Sized + ToStatement; + + fn query_typed( + &self, + statement: &str, + params: &[(&(dyn ToSql + Sync), Type)], + ) -> impl std::future::Future>>; + + fn batch_execute( + &self, + query: &str, + ) -> impl std::future::Future> + Send; + + fn query_one( + &self, + statement: &T, + params: &[&(dyn ToSql + Sync)], + ) -> impl std::future::Future> + where + T: ?Sized + ToStatement; +} + +pub trait Transaction { + fn build_start_qs( + &self, + isolation_level: Option, + read_variant: Option, + deferrable: Option, + ) -> String { + let mut querystring = "START TRANSACTION".to_string(); + + if let Some(level) = isolation_level { + let level = &level.to_str_level(); + querystring.push_str(format!(" ISOLATION LEVEL {level}").as_str()); + }; + + querystring.push_str(match read_variant { + Some(ReadVariant::ReadOnly) => " READ ONLY", + Some(ReadVariant::ReadWrite) => " READ WRITE", + None => "", + }); + + querystring.push_str(match deferrable { + Some(true) => " DEFERRABLE", + Some(false) => " NOT DEFERRABLE", + None => "", + }); + + querystring + } + + fn start( + &self, + isolation_level: Option, + read_variant: Option, + deferrable: Option, + ) -> impl std::future::Future>; + + fn commit(&self) -> impl std::future::Future>; + + fn rollback(&self) -> impl std::future::Future>; +} diff --git a/src/driver/connection.rs b/src/driver/connection.rs index 2210e303..3b5a4ab5 100644 --- a/src/driver/connection.rs +++ b/src/driver/connection.rs @@ -6,8 +6,13 @@ use std::{collections::HashSet, net::IpAddr, sync::Arc}; use tokio_postgres::{binary_copy::BinaryCopyInWriter, config::Host, Config}; use crate::{ + connection::{ + structs::{PSQLPyConnection, PoolConnection}, + traits::Connection as _, + }, exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, format_helpers::quote_ident, + options::{IsolationLevel, ReadVariant}, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, runtime::tokio_runtime, }; @@ -16,9 +21,7 @@ use super::{ common_options::{LoadBalanceHosts, SslMode, TargetSessionAttrs}, connection_pool::connect_pool, cursor::Cursor, - inner_connection::PsqlpyConnection, transaction::Transaction, - transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, }; /// Make new connection pool. @@ -118,30 +121,27 @@ pub async fn connect( #[pyclass(subclass)] #[derive(Clone)] pub struct Connection { - db_client: Option>, + db_client: Option>, db_pool: Option, pg_config: Arc, - prepare: bool, } impl Connection { #[must_use] pub fn new( - db_client: Option>, + db_client: Option>, db_pool: Option, pg_config: Arc, - prepare: bool, ) -> Self { Connection { db_client, db_pool, pg_config, - prepare, } } #[must_use] - pub fn db_client(&self) -> Option> { + pub fn db_client(&self) -> Option> { self.db_client.clone() } @@ -153,7 +153,7 @@ impl Connection { impl Default for Connection { fn default() -> Self { - Connection::new(None, None, Arc::new(Config::default()), true) + Connection::new(None, None, Arc::new(Config::default())) } } @@ -237,13 +237,9 @@ impl Connection { } async fn __aenter__<'a>(self_: Py) -> PSQLPyResult> { - let (db_client, db_pool, prepare) = pyo3::Python::with_gil(|gil| { + let (db_client, db_pool) = pyo3::Python::with_gil(|gil| { let self_ = self_.borrow(gil); - ( - self_.db_client.clone(), - self_.db_pool.clone(), - self_.prepare, - ) + (self_.db_client.clone(), self_.db_pool.clone()) }); if db_client.is_some() { @@ -258,8 +254,9 @@ impl Connection { .await??; pyo3::Python::with_gil(|gil| { let mut self_ = self_.borrow_mut(gil); - self_.db_client = - Some(Arc::new(PsqlpyConnection::PoolConn(db_connection, prepare))); + self_.db_client = Some(Arc::new(PSQLPyConnection::PoolConn(PoolConnection { + connection: db_connection, + }))); }); return Ok(self_); } @@ -459,14 +456,12 @@ impl Connection { isolation_level=None, read_variant=None, deferrable=None, - synchronous_commit=None, ))] pub fn transaction( &self, isolation_level: Option, read_variant: Option, deferrable: Option, - synchronous_commit: Option, ) -> PSQLPyResult { if let Some(db_client) = &self.db_client { return Ok(Transaction::new( @@ -475,7 +470,6 @@ impl Connection { false, false, isolation_level, - synchronous_commit, read_variant, deferrable, HashSet::new(), diff --git a/src/driver/connection_pool.rs b/src/driver/connection_pool.rs index a764cea3..c66be3da 100644 --- a/src/driver/connection_pool.rs +++ b/src/driver/connection_pool.rs @@ -1,4 +1,7 @@ -use crate::runtime::tokio_runtime; +use crate::{ + connection::structs::{PSQLPyConnection, PoolConnection}, + runtime::tokio_runtime, +}; use deadpool_postgres::{Manager, ManagerConfig, Pool, RecyclingMethod}; use postgres_types::Type; use pyo3::{pyclass, pyfunction, pymethods, Py, PyAny}; @@ -10,7 +13,6 @@ use crate::exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}; use super::{ common_options::{ConnRecyclingMethod, LoadBalanceHosts, SslMode, TargetSessionAttrs}, connection::Connection, - inner_connection::PsqlpyConnection, listener::core::Listener, utils::{build_connection_config, build_manager, build_tls}, }; @@ -243,13 +245,11 @@ impl ConnectionPool { let connection = self.pool.get().await?; Ok(Connection::new( - Some(Arc::new(PsqlpyConnection::PoolConn( + Some(Arc::new(PSQLPyConnection::PoolConn(PoolConnection { connection, - self.pool_conf.prepare, - ))), + }))), None, self.pg_config.clone(), - self.pool_conf.prepare, )) } @@ -392,12 +392,7 @@ impl ConnectionPool { #[must_use] pub fn acquire(&self) -> Connection { - Connection::new( - None, - Some(self.pool.clone()), - self.pg_config.clone(), - self.pool_conf.prepare, - ) + Connection::new(None, Some(self.pool.clone()), self.pg_config.clone()) } #[must_use] @@ -408,12 +403,7 @@ impl ConnectionPool { (b_gil.pg_config.clone(), b_gil.pool_conf.clone()) }); - Listener::new( - pg_config, - pool_conf.ca_file, - pool_conf.ssl_mode, - pool_conf.prepare, - ) + Listener::new(pg_config, pool_conf.ca_file, pool_conf.ssl_mode) } /// Return new single connection. @@ -421,28 +411,22 @@ impl ConnectionPool { /// # Errors /// May return Err Result if cannot get new connection from the pool. pub async fn connection(self_: pyo3::Py) -> PSQLPyResult { - let (db_pool, pg_config, pool_conf) = pyo3::Python::with_gil(|gil| { + let (db_pool, pg_config) = pyo3::Python::with_gil(|gil| { let slf = self_.borrow(gil); - ( - slf.pool.clone(), - slf.pg_config.clone(), - slf.pool_conf.clone(), - ) + (slf.pool.clone(), slf.pg_config.clone()) }); - let db_connection = tokio_runtime() + let connection = tokio_runtime() .spawn(async move { Ok::(db_pool.get().await?) }) .await??; Ok(Connection::new( - Some(Arc::new(PsqlpyConnection::PoolConn( - db_connection, - pool_conf.prepare, - ))), + Some(Arc::new(PSQLPyConnection::PoolConn(PoolConnection { + connection, + }))), None, pg_config, - pool_conf.prepare, )) } diff --git a/src/driver/cursor.rs b/src/driver/cursor.rs index 54aee852..7229c6ee 100644 --- a/src/driver/cursor.rs +++ b/src/driver/cursor.rs @@ -6,13 +6,12 @@ use pyo3::{ use tokio_postgres::{config::Host, Config}; use crate::{ + connection::structs::PSQLPyConnection, exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, query_result::PSQLDriverPyQueryResult, runtime::rustdriver_future, }; -use super::inner_connection::PsqlpyConnection; - /// Additional implementation for the `Object` type. #[allow(clippy::ref_option)] trait CursorObjectTrait { @@ -28,7 +27,7 @@ trait CursorObjectTrait { async fn cursor_close(&self, closed: &bool, cursor_name: &str) -> PSQLPyResult<()>; } -impl CursorObjectTrait for PsqlpyConnection { +impl CursorObjectTrait for PSQLPyConnection { /// Start the cursor. /// /// Execute `DECLARE` command with parameters. @@ -90,7 +89,7 @@ impl CursorObjectTrait for PsqlpyConnection { #[pyclass(subclass)] pub struct Cursor { - db_transaction: Option>, + db_transaction: Option>, pg_config: Arc, querystring: String, parameters: Option>, @@ -105,7 +104,7 @@ pub struct Cursor { impl Cursor { #[must_use] pub fn new( - db_transaction: Arc, + db_transaction: Arc, pg_config: Arc, querystring: String, parameters: Option>, diff --git a/src/driver/inner_connection.rs b/src/driver/inner_connection.rs deleted file mode 100644 index bb591de7..00000000 --- a/src/driver/inner_connection.rs +++ /dev/null @@ -1,347 +0,0 @@ -use bytes::Buf; -use deadpool_postgres::{Object, Transaction}; -use postgres_types::{ToSql, Type}; -use pyo3::{pyclass, Py, PyAny, Python}; -use std::vec; -use tokio_postgres::{Client, CopyInSink, Row, Statement, ToStatement}; - -use crate::{ - exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, - query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, - statement::{statement::PsqlpyStatement, statement_builder::StatementBuilder}, - value_converter::to_python::postgres_to_py, -}; - -#[allow(clippy::module_name_repetitions)] -pub enum PsqlpyConnection { - PoolConn(Object, bool), - SingleConn(Client), -} - -// #[pyclass] -// struct Portal { -// trans: Transaction<'static>, -// } - -impl PsqlpyConnection { - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot prepare statement. - pub async fn prepare(&self, query: &str, prepared: bool) -> PSQLPyResult { - match self { - PsqlpyConnection::PoolConn(pconn, _) => { - if prepared { - return Ok(pconn.prepare_cached(query).await?); - } else { - let prepared = pconn.prepare(query).await?; - self.drop_prepared(&prepared).await?; - return Ok(prepared); - } - } - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.prepare(query).await?), - } - } - - // pub async fn transaction(&mut self) -> Portal { - // match self { - // PsqlpyConnection::PoolConn(pconn, _) => { - // let b = unsafe { - // std::mem::transmute::, Transaction<'static>>(pconn.transaction().await.unwrap()) - // }; - // Portal {trans: b} - // // let c = b.bind("SELECT 1", &[]).await.unwrap(); - // // b.query_portal(&c, 1).await; - // } - // PsqlpyConnection::SingleConn(sconn) => { - // let b = unsafe { - // std::mem::transmute::, Transaction<'static>>(sconn.transaction().await.unwrap()) - // }; - // Portal {trans: b} - // }, - // } - // } - - /// Delete prepared statement. - /// - /// # Errors - /// May return Err if cannot prepare statement. - pub async fn drop_prepared(&self, stmt: &Statement) -> PSQLPyResult<()> { - let deallocate_query = format!("DEALLOCATE PREPARE {}", stmt.name()); - match self { - PsqlpyConnection::PoolConn(pconn, _) => { - let res = Ok(pconn.batch_execute(&deallocate_query).await?); - res - } - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.batch_execute(&deallocate_query).await?) - } - } - } - - /// Execute statement with parameters. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn query( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> PSQLPyResult> - where - T: ?Sized + ToStatement, - { - match self { - PsqlpyConnection::PoolConn(pconn, _) => { - return Ok(pconn.query(statement, params).await?) - } - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.query(statement, params).await?) - } - } - } - - /// Execute statement with parameters. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn query_typed( - &self, - statement: &str, - params: &[(&(dyn ToSql + Sync), Type)], - ) -> PSQLPyResult> { - match self { - PsqlpyConnection::PoolConn(pconn, _) => { - return Ok(pconn.query_typed(statement, params).await?) - } - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.query_typed(statement, params).await?) - } - } - } - - /// Batch execute statement. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn batch_execute(&self, query: &str) -> PSQLPyResult<()> { - match self { - PsqlpyConnection::PoolConn(pconn, _) => return Ok(pconn.batch_execute(query).await?), - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.batch_execute(query).await?), - } - } - - /// Prepare cached statement. - /// - /// # Errors - /// May return Err if cannot execute copy data. - pub async fn copy_in(&self, statement: &T) -> PSQLPyResult> - where - T: ?Sized + ToStatement, - U: Buf + 'static + Send, - { - match self { - PsqlpyConnection::PoolConn(pconn, _) => return Ok(pconn.copy_in(statement).await?), - PsqlpyConnection::SingleConn(sconn) => return Ok(sconn.copy_in(statement).await?), - } - } - - /// Executes a statement which returns a single row, returning it. - /// - /// # Errors - /// May return Err if cannot execute statement. - pub async fn query_one( - &self, - statement: &T, - params: &[&(dyn ToSql + Sync)], - ) -> PSQLPyResult - where - T: ?Sized + ToStatement, - { - match self { - PsqlpyConnection::PoolConn(pconn, _) => { - return Ok(pconn.query_one(statement, params).await?) - } - PsqlpyConnection::SingleConn(sconn) => { - return Ok(sconn.query_one(statement, params).await?) - } - } - } - - pub async fn cursor_execute( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> PSQLPyResult { - let statement = StatementBuilder::new(querystring, parameters, self, prepared) - .build() - .await?; - - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - self.query( - &self - .prepare(&statement.raw_query(), true) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - &statement.params(), - ) - .await - .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? - } else { - self.query(statement.raw_query(), &statement.params()) - .await - .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? - }; - - Ok(PSQLDriverPyQueryResult::new(result)) - } - - pub async fn execute( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> PSQLPyResult { - let statement = StatementBuilder::new(querystring, parameters, self, prepared) - .build() - .await?; - - let prepared = prepared.unwrap_or(true); - - let result = match prepared { - true => self - .query(statement.statement_query()?, &statement.params()) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - false => self - .query_typed(statement.raw_query(), &statement.params_typed()) - .await - .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))?, - }; - - Ok(PSQLDriverPyQueryResult::new(result)) - } - - pub async fn execute_many( - &self, - querystring: String, - parameters: Option>>, - prepared: Option, - ) -> PSQLPyResult<()> { - let mut statements: Vec = vec![]; - if let Some(parameters) = parameters { - for vec_of_py_any in parameters { - // TODO: Fix multiple qs creation - let statement = - StatementBuilder::new(querystring.clone(), Some(vec_of_py_any), self, prepared) - .build() - .await?; - - statements.push(statement); - } - } - - let prepared = prepared.unwrap_or(true); - - for statement in statements { - let querystring_result = if prepared { - let prepared_stmt = &self.prepare(&statement.raw_query(), true).await; - if let Err(error) = prepared_stmt { - return Err(RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement in execute_many, operation rolled back {error}", - ))); - } - self.query( - &self.prepare(&statement.raw_query(), true).await?, - &statement.params(), - ) - .await - } else { - self.query(statement.raw_query(), &statement.params()).await - }; - - if let Err(error) = querystring_result { - return Err(RustPSQLDriverError::ConnectionExecuteError(format!( - "Error occured in `execute_many` statement: {error}" - ))); - } - } - - return Ok(()); - } - - pub async fn fetch_row_raw( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> PSQLPyResult { - let statement = StatementBuilder::new(querystring, parameters, self, prepared) - .build() - .await?; - - let prepared = prepared.unwrap_or(true); - - let result = if prepared { - self.query_one( - &self - .prepare(&statement.raw_query(), true) - .await - .map_err(|err| { - RustPSQLDriverError::ConnectionExecuteError(format!( - "Cannot prepare statement, error - {err}" - )) - })?, - &statement.params(), - ) - .await - .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? - } else { - self.query_one(statement.raw_query(), &statement.params()) - .await - .map_err(|err| RustPSQLDriverError::ConnectionExecuteError(format!("{err}")))? - }; - - return Ok(result); - } - - pub async fn fetch_row( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> PSQLPyResult { - let result = self - .fetch_row_raw(querystring, parameters, prepared) - .await?; - - return Ok(PSQLDriverSinglePyQueryResult::new(result)); - } - - pub async fn fetch_val( - &self, - querystring: String, - parameters: Option>, - prepared: Option, - ) -> PSQLPyResult> { - let result = self - .fetch_row_raw(querystring, parameters, prepared) - .await?; - - return Python::with_gil(|gil| match result.columns().first() { - Some(first_column) => postgres_to_py(gil, &result, first_column, 0, &None), - None => Ok(gil.None()), - }); - } -} diff --git a/src/driver/inner_transaction.rs b/src/driver/inner_transaction.rs deleted file mode 100644 index a23f0536..00000000 --- a/src/driver/inner_transaction.rs +++ /dev/null @@ -1,94 +0,0 @@ -use deadpool_postgres::Transaction as dp_Transaction; -use postgres_types::ToSql; -use tokio_postgres::{Portal, Row, ToStatement, Transaction as tp_Transaction}; - -use crate::exceptions::rust_errors::PSQLPyResult; - -pub enum PsqlpyTransaction { - PoolTrans(dp_Transaction<'static>), - SingleConnTrans(tp_Transaction<'static>), -} - -impl PsqlpyTransaction { - async fn commit(self) -> PSQLPyResult<()> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.commit().await?), - PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.commit().await?), - } - } - - async fn rollback(self) -> PSQLPyResult<()> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.rollback().await?), - PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.rollback().await?), - } - } - - async fn savepoint(&mut self, sp_name: &str) -> PSQLPyResult<()> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => { - p_txid.savepoint(sp_name).await?; - Ok(()) - } - PsqlpyTransaction::SingleConnTrans(s_txid) => { - s_txid.savepoint(sp_name).await?; - Ok(()) - } - } - } - - async fn release_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => { - p_txid - .batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()) - .await?; - Ok(()) - } - PsqlpyTransaction::SingleConnTrans(s_txid) => { - s_txid - .batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()) - .await?; - Ok(()) - } - } - } - - async fn rollback_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => { - p_txid - .batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()) - .await?; - Ok(()) - } - PsqlpyTransaction::SingleConnTrans(s_txid) => { - s_txid - .batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()) - .await?; - Ok(()) - } - } - } - - async fn bind(&self, statement: &T, params: &[&(dyn ToSql + Sync)]) -> PSQLPyResult - where - T: ?Sized + ToStatement, - { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.bind(statement, params).await?), - PsqlpyTransaction::SingleConnTrans(s_txid) => { - Ok(s_txid.bind(statement, params).await?) - } - } - } - - pub async fn query_portal(&self, portal: &Portal, size: i32) -> PSQLPyResult> { - match self { - PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.query_portal(portal, size).await?), - PsqlpyTransaction::SingleConnTrans(s_txid) => { - Ok(s_txid.query_portal(portal, size).await?) - } - } - } -} diff --git a/src/driver/listener/core.rs b/src/driver/listener/core.rs index 4a9580af..7837478f 100644 --- a/src/driver/listener/core.rs +++ b/src/driver/listener/core.rs @@ -12,10 +12,13 @@ use tokio::{ use tokio_postgres::{AsyncMessage, Config}; use crate::{ + connection::{ + structs::{PSQLPyConnection, SingleConnection}, + traits::Connection as _, + }, driver::{ common_options::SslMode, connection::Connection, - inner_connection::PsqlpyConnection, utils::{build_tls, is_coroutine_function, ConfiguredTLS}, }, exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, @@ -42,19 +45,14 @@ pub struct Listener { impl Listener { #[must_use] - pub fn new( - pg_config: Arc, - ca_file: Option, - ssl_mode: Option, - prepare: bool, - ) -> Self { + pub fn new(pg_config: Arc, ca_file: Option, ssl_mode: Option) -> Self { Listener { pg_config: pg_config.clone(), ca_file, ssl_mode, channel_callbacks: Arc::default(), listen_abort_handler: Option::default(), - connection: Connection::new(None, None, pg_config.clone(), prepare), + connection: Connection::new(None, None, pg_config.clone()), receiver: Option::default(), listen_query: Arc::default(), is_listened: Arc::new(RwLock::new(false)), @@ -224,10 +222,11 @@ impl Listener { self.receiver = Some(Arc::new(RwLock::new(receiver))); self.connection = Connection::new( - Some(Arc::new(PsqlpyConnection::SingleConn(client))), + Some(Arc::new(PSQLPyConnection::SingleConnection( + SingleConnection { connection: client }, + ))), None, self.pg_config.clone(), - false, ); self.is_started = true; @@ -356,7 +355,7 @@ async fn dispatch_callback( async fn execute_listen( is_listened: &Arc>, listen_query: &Arc>, - client: &Arc, + client: &Arc, ) -> PSQLPyResult<()> { let mut write_is_listened = is_listened.write().await; diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 416bfa97..1cff9f57 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -3,10 +3,7 @@ pub mod connection; pub mod connection_pool; pub mod connection_pool_builder; pub mod cursor; -pub mod inner_connection; -pub mod inner_transaction; pub mod listener; pub mod portal; pub mod transaction; -pub mod transaction_options; pub mod utils; diff --git a/src/driver/portal.rs b/src/driver/portal.rs index d90138b0..0c280637 100644 --- a/src/driver/portal.rs +++ b/src/driver/portal.rs @@ -1,52 +1,87 @@ -use std::sync::Arc; - -use pyo3::{pyclass, pymethods}; -use tokio_postgres::Portal as tp_Portal; - -use crate::{exceptions::rust_errors::PSQLPyResult, query_result::PSQLDriverPyQueryResult}; - -use super::inner_transaction::PsqlpyTransaction; - -#[pyclass] -struct Portal { - transaction: Arc, - inner: tp_Portal, - array_size: i32, -} - -impl Portal { - async fn query_portal(&self, size: i32) -> PSQLPyResult { - let result = self.transaction.query_portal(&self.inner, size).await?; - Ok(PSQLDriverPyQueryResult::new(result)) - } -} - -#[pymethods] -impl Portal { - #[getter] - fn get_array_size(&self) -> i32 { - self.array_size - } - - #[setter] - fn set_array_size(&mut self, value: i32) { - self.array_size = value; - } - - async fn fetch_one(&self) -> PSQLPyResult { - self.query_portal(1).await - } - - #[pyo3(signature = (size=None))] - async fn fetch_many(&self, size: Option) -> PSQLPyResult { - self.query_portal(size.unwrap_or(self.array_size)).await - } - - async fn fetch_all(&self) -> PSQLPyResult { - self.query_portal(-1).await - } - - async fn close(&mut self) { - let _ = Arc::downgrade(&self.transaction); - } -} +// use std::sync::Arc; + +// use pyo3::{pyclass, pymethods, Py, PyObject, Python}; +// use tokio_postgres::Portal as tp_Portal; + +// use crate::{ +// exceptions::rust_errors::PSQLPyResult, query_result::PSQLDriverPyQueryResult, +// runtime::rustdriver_future, +// }; + +// use super::inner_transaction::PsqlpyTransaction; + +// #[pyclass] +// pub struct Portal { +// transaction: Arc, +// inner: tp_Portal, +// array_size: i32, +// } + +// impl Portal { +// pub fn new(transaction: Arc, inner: tp_Portal, array_size: i32) -> Self { +// Self { +// transaction, +// inner, +// array_size, +// } +// } + +// async fn query_portal(&self, size: i32) -> PSQLPyResult { +// let result = self.transaction.query_portal(&self.inner, size).await?; +// Ok(PSQLDriverPyQueryResult::new(result)) +// } +// } + +// #[pymethods] +// impl Portal { +// #[getter] +// fn get_array_size(&self) -> i32 { +// self.array_size +// } + +// #[setter] +// fn set_array_size(&mut self, value: i32) { +// self.array_size = value; +// } + +// fn __aiter__(slf: Py) -> Py { +// slf +// } + +// fn __await__(slf: Py) -> Py { +// slf +// } + +// fn __anext__(&self) -> PSQLPyResult> { +// let transaction = self.transaction.clone(); +// let portal = self.inner.clone(); +// let size = self.array_size.clone(); + +// let py_future = Python::with_gil(move |gil| { +// rustdriver_future(gil, async move { +// let result = transaction.query_portal(&portal, size).await?; + +// Ok(PSQLDriverPyQueryResult::new(result)) +// }) +// }); + +// Ok(Some(py_future?)) +// } + +// async fn fetch_one(&self) -> PSQLPyResult { +// self.query_portal(1).await +// } + +// #[pyo3(signature = (size=None))] +// async fn fetch_many(&self, size: Option) -> PSQLPyResult { +// self.query_portal(size.unwrap_or(self.array_size)).await +// } + +// async fn fetch_all(&self) -> PSQLPyResult { +// self.query_portal(-1).await +// } + +// async fn close(&mut self) { +// let _ = Arc::downgrade(&self.transaction); +// } +// } diff --git a/src/driver/transaction.rs b/src/driver/transaction.rs index 60f054b7..50bbfb74 100644 --- a/src/driver/transaction.rs +++ b/src/driver/transaction.rs @@ -9,108 +9,27 @@ use pyo3::{ use tokio_postgres::{binary_copy::BinaryCopyInWriter, config::Host, Config}; use crate::{ + connection::{ + structs::PSQLPyConnection, + traits::{Connection as _, Transaction as _}, + }, exceptions::rust_errors::{PSQLPyResult, RustPSQLDriverError}, format_helpers::quote_ident, + options::{IsolationLevel, ReadVariant}, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, }; -use super::{ - cursor::Cursor, - inner_connection::PsqlpyConnection, - transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit}, -}; +use super::cursor::Cursor; use std::{collections::HashSet, net::IpAddr, sync::Arc}; -#[allow(clippy::module_name_repetitions)] -pub trait TransactionObjectTrait { - fn start_transaction( - &self, - isolation_level: Option, - read_variant: Option, - defferable: Option, - synchronous_commit: Option, - ) -> impl std::future::Future> + Send; - fn commit(&self) -> impl std::future::Future> + Send; - fn rollback(&self) -> impl std::future::Future> + Send; -} - -impl TransactionObjectTrait for PsqlpyConnection { - async fn start_transaction( - &self, - isolation_level: Option, - read_variant: Option, - deferrable: Option, - synchronous_commit: Option, - ) -> PSQLPyResult<()> { - let mut querystring = "START TRANSACTION".to_string(); - - if let Some(level) = isolation_level { - let level = &level.to_str_level(); - querystring.push_str(format!(" ISOLATION LEVEL {level}").as_str()); - }; - - querystring.push_str(match read_variant { - Some(ReadVariant::ReadOnly) => " READ ONLY", - Some(ReadVariant::ReadWrite) => " READ WRITE", - None => "", - }); - - querystring.push_str(match deferrable { - Some(true) => " DEFERRABLE", - Some(false) => " NOT DEFERRABLE", - None => "", - }); - - self.batch_execute(&querystring).await.map_err(|err| { - RustPSQLDriverError::TransactionBeginError(format!( - "Cannot execute statement to start transaction, err - {err}" - )) - })?; - - if let Some(synchronous_commit) = synchronous_commit { - let str_synchronous_commit = synchronous_commit.to_str_level(); - - let synchronous_commit_query = - format!("SET LOCAL synchronous_commit = '{str_synchronous_commit}'"); - - self.batch_execute(&synchronous_commit_query) - .await - .map_err(|err| { - RustPSQLDriverError::TransactionBeginError(format!( - "Cannot set synchronous_commit parameter, err - {err}" - )) - })?; - } - - Ok(()) - } - async fn commit(&self) -> PSQLPyResult<()> { - self.batch_execute("COMMIT;").await.map_err(|err| { - RustPSQLDriverError::TransactionCommitError(format!( - "Cannot execute COMMIT statement, error - {err}" - )) - })?; - Ok(()) - } - async fn rollback(&self) -> PSQLPyResult<()> { - self.batch_execute("ROLLBACK;").await.map_err(|err| { - RustPSQLDriverError::TransactionRollbackError(format!( - "Cannot execute ROLLBACK statement, error - {err}" - )) - })?; - Ok(()) - } -} - #[pyclass(subclass)] pub struct Transaction { - pub db_client: Option>, + pub db_client: Option>, pg_config: Arc, is_started: bool, is_done: bool, isolation_level: Option, - synchronous_commit: Option, read_variant: Option, deferrable: Option, @@ -121,12 +40,11 @@ impl Transaction { #[allow(clippy::too_many_arguments)] #[must_use] pub fn new( - db_client: Arc, + db_client: Arc, pg_config: Arc, is_started: bool, is_done: bool, isolation_level: Option, - synchronous_commit: Option, read_variant: Option, deferrable: Option, savepoints_map: HashSet, @@ -137,7 +55,6 @@ impl Transaction { is_started, is_done, isolation_level, - synchronous_commit, read_variant, deferrable, savepoints_map, @@ -243,26 +160,18 @@ impl Transaction { } async fn __aenter__<'a>(self_: Py) -> PSQLPyResult> { - let ( - is_started, - is_done, - isolation_level, - synchronous_commit, - read_variant, - deferrable, - db_client, - ) = pyo3::Python::with_gil(|gil| { - let self_ = self_.borrow(gil); - ( - self_.is_started, - self_.is_done, - self_.isolation_level, - self_.synchronous_commit, - self_.read_variant, - self_.deferrable, - self_.db_client.clone(), - ) - }); + let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) = + pyo3::Python::with_gil(|gil| { + let self_ = self_.borrow(gil); + ( + self_.is_started, + self_.is_done, + self_.isolation_level, + self_.read_variant, + self_.deferrable, + self_.db_client.clone(), + ) + }); if is_started { return Err(RustPSQLDriverError::TransactionBeginError( @@ -278,12 +187,7 @@ impl Transaction { if let Some(db_client) = db_client { db_client - .start_transaction( - isolation_level, - read_variant, - deferrable, - synchronous_commit, - ) + .start(isolation_level, read_variant, deferrable) .await?; Python::with_gil(|gil| { @@ -565,26 +469,18 @@ impl Transaction { /// 2) Transaction is done. /// 3) Cannot execute `BEGIN` command. pub async fn begin(self_: Py) -> PSQLPyResult<()> { - let ( - is_started, - is_done, - isolation_level, - synchronous_commit, - read_variant, - deferrable, - db_client, - ) = pyo3::Python::with_gil(|gil| { - let self_ = self_.borrow(gil); - ( - self_.is_started, - self_.is_done, - self_.isolation_level, - self_.synchronous_commit, - self_.read_variant, - self_.deferrable, - self_.db_client.clone(), - ) - }); + let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) = + pyo3::Python::with_gil(|gil| { + let self_ = self_.borrow(gil); + ( + self_.is_started, + self_.is_done, + self_.isolation_level, + self_.read_variant, + self_.deferrable, + self_.db_client.clone(), + ) + }); if let Some(db_client) = db_client { if is_started { @@ -599,12 +495,7 @@ impl Transaction { )); } db_client - .start_transaction( - isolation_level, - read_variant, - deferrable, - synchronous_commit, - ) + .start(isolation_level, read_variant, deferrable) .await?; pyo3::Python::with_gil(|gil| { diff --git a/src/driver/transaction_options.rs b/src/driver/transaction_options.rs deleted file mode 100644 index 281b9a71..00000000 --- a/src/driver/transaction_options.rs +++ /dev/null @@ -1,81 +0,0 @@ -use pyo3::pyclass; - -#[pyclass(eq, eq_int)] -#[derive(Clone, Copy, PartialEq)] -pub enum IsolationLevel { - ReadUncommitted, - ReadCommitted, - RepeatableRead, - Serializable, -} - -impl IsolationLevel { - /// Return isolation level as String literal. - #[must_use] - pub fn to_str_level(&self) -> String { - match self { - IsolationLevel::ReadUncommitted => "READ UNCOMMITTED".into(), - IsolationLevel::ReadCommitted => "READ COMMITTED".into(), - IsolationLevel::RepeatableRead => "REPEATABLE READ".into(), - IsolationLevel::Serializable => "SERIALIZABLE".into(), - } - } -} - -#[pyclass(eq, eq_int)] -#[derive(Clone, Copy, PartialEq)] -pub enum ReadVariant { - ReadOnly, - ReadWrite, -} - -#[pyclass(eq, eq_int)] -#[derive(Clone, Copy, PartialEq)] -pub enum SynchronousCommit { - /// As the name indicates, the commit acknowledgment can come before - /// flushing the records to disk. - /// This is generally called as an asynchronous commit. - /// If the PostgreSQL instance crashes, - /// the last few asynchronous commits might be lost. - Off, - /// WAL records are written and flushed to local disks. - /// In this case, the commit will be acknowledged after the - /// local WAL Write and WAL flush completes. - Local, - /// WAL records are successfully handed over to - /// remote instances which acknowledged back - /// about the write (not flush). - RemoteWrite, - /// The meaning may change based on whether you have - /// a synchronous standby or not. - /// If there is a synchronous standby, - /// setting the value to on will result in waiting till “remote flush”. - On, - /// This will result in commits waiting until replies from the - /// current synchronous standby(s) indicate they have received - /// the commit record of the transaction and applied it so - /// that it has become visible to queries on the standby(s). - RemoteApply, -} - -impl SynchronousCommit { - /// Return isolation level as String literal. - #[must_use] - pub fn to_str_level(&self) -> String { - match self { - SynchronousCommit::Off => "off".into(), - SynchronousCommit::Local => "local".into(), - SynchronousCommit::RemoteWrite => "remote_write".into(), - SynchronousCommit::On => "on".into(), - SynchronousCommit::RemoteApply => "remote_apply".into(), - } - } -} - -#[derive(Clone, Copy, PartialEq)] -pub struct ListenerTransactionConfig { - isolation_level: Option, - read_variant: Option, - deferrable: Option, - synchronous_commit: Option, -} diff --git a/src/lib.rs b/src/lib.rs index d6ae473a..33ec678c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,10 @@ pub mod common; +pub mod connection; pub mod driver; pub mod exceptions; pub mod extra_types; pub mod format_helpers; +pub mod options; pub mod query_result; pub mod row_factories; pub mod runtime; @@ -35,9 +37,8 @@ fn psqlpy(py: Python<'_>, pymod: &Bound<'_, PyModule>) -> PyResult<()> { pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; - pymod.add_class::()?; - pymod.add_class::()?; - pymod.add_class::()?; + pymod.add_class::()?; + pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; pymod.add_class::()?; diff --git a/src/options.rs b/src/options.rs new file mode 100644 index 00000000..bd8ad511 --- /dev/null +++ b/src/options.rs @@ -0,0 +1,221 @@ +use std::time::Duration; + +use deadpool_postgres::RecyclingMethod; +use pyo3::{pyclass, pymethods}; + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum ConnRecyclingMethod { + Fast, + Verified, + Clean, +} + +impl ConnRecyclingMethod { + #[must_use] + pub fn to_internal(&self) -> RecyclingMethod { + match self { + ConnRecyclingMethod::Fast => RecyclingMethod::Fast, + ConnRecyclingMethod::Verified => RecyclingMethod::Verified, + ConnRecyclingMethod::Clean => RecyclingMethod::Clean, + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum LoadBalanceHosts { + /// Make connection attempts to hosts in the order provided. + Disable, + /// Make connection attempts to hosts in a random order. + Random, +} + +impl LoadBalanceHosts { + #[must_use] + pub fn to_internal(&self) -> tokio_postgres::config::LoadBalanceHosts { + match self { + LoadBalanceHosts::Disable => tokio_postgres::config::LoadBalanceHosts::Disable, + LoadBalanceHosts::Random => tokio_postgres::config::LoadBalanceHosts::Random, + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum TargetSessionAttrs { + /// No special properties are required. + Any, + /// The session must allow writes. + ReadWrite, + /// The session allow only reads. + ReadOnly, +} + +impl TargetSessionAttrs { + #[must_use] + pub fn to_internal(&self) -> tokio_postgres::config::TargetSessionAttrs { + match self { + TargetSessionAttrs::Any => tokio_postgres::config::TargetSessionAttrs::Any, + TargetSessionAttrs::ReadWrite => tokio_postgres::config::TargetSessionAttrs::ReadWrite, + TargetSessionAttrs::ReadOnly => tokio_postgres::config::TargetSessionAttrs::ReadOnly, + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq, Debug)] +pub enum SslMode { + /// Do not use TLS. + Disable, + /// Pay the overhead of encryption if the server insists on it. + Allow, + /// Attempt to connect with TLS but allow sessions without. + Prefer, + /// Require the use of TLS. + Require, + /// I want my data encrypted, + /// and I accept the overhead. + /// I want to be sure that I connect to a server that I trust. + VerifyCa, + /// I want my data encrypted, + /// and I accept the overhead. + /// I want to be sure that I connect to a server I trust, + /// and that it's the one I specify. + VerifyFull, +} + +impl SslMode { + #[must_use] + pub fn to_internal(&self) -> tokio_postgres::config::SslMode { + match self { + SslMode::Disable => tokio_postgres::config::SslMode::Disable, + SslMode::Allow => tokio_postgres::config::SslMode::Allow, + SslMode::Prefer => tokio_postgres::config::SslMode::Prefer, + SslMode::Require => tokio_postgres::config::SslMode::Require, + SslMode::VerifyCa => tokio_postgres::config::SslMode::VerifyCa, + SslMode::VerifyFull => tokio_postgres::config::SslMode::VerifyFull, + } + } +} + +#[pyclass] +#[derive(Clone, Copy)] +pub struct KeepaliveConfig { + pub idle: Duration, + pub interval: Option, + pub retries: Option, +} + +#[pymethods] +impl KeepaliveConfig { + #[new] + #[pyo3(signature = (idle, interval=None, retries=None))] + fn build_config(idle: u64, interval: Option, retries: Option) -> Self { + let interval_internal = interval.map(Duration::from_secs); + KeepaliveConfig { + idle: Duration::from_secs(idle), + interval: interval_internal, + retries, + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum CopyCommandFormat { + TEXT, + CSV, + BINARY, +} + +impl CopyCommandFormat { + #[must_use] + pub fn to_internal(&self) -> String { + match self { + CopyCommandFormat::TEXT => "text".into(), + CopyCommandFormat::CSV => "csv".into(), + CopyCommandFormat::BINARY => "binary".into(), + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum IsolationLevel { + ReadUncommitted, + ReadCommitted, + RepeatableRead, + Serializable, +} + +impl IsolationLevel { + /// Return isolation level as String literal. + #[must_use] + pub fn to_str_level(&self) -> String { + match self { + IsolationLevel::ReadUncommitted => "READ UNCOMMITTED".into(), + IsolationLevel::ReadCommitted => "READ COMMITTED".into(), + IsolationLevel::RepeatableRead => "REPEATABLE READ".into(), + IsolationLevel::Serializable => "SERIALIZABLE".into(), + } + } +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum ReadVariant { + ReadOnly, + ReadWrite, +} + +#[pyclass(eq, eq_int)] +#[derive(Clone, Copy, PartialEq)] +pub enum SynchronousCommit { + /// As the name indicates, the commit acknowledgment can come before + /// flushing the records to disk. + /// This is generally called as an asynchronous commit. + /// If the PostgreSQL instance crashes, + /// the last few asynchronous commits might be lost. + Off, + /// WAL records are written and flushed to local disks. + /// In this case, the commit will be acknowledged after the + /// local WAL Write and WAL flush completes. + Local, + /// WAL records are successfully handed over to + /// remote instances which acknowledged back + /// about the write (not flush). + RemoteWrite, + /// The meaning may change based on whether you have + /// a synchronous standby or not. + /// If there is a synchronous standby, + /// setting the value to on will result in waiting till “remote flush”. + On, + /// This will result in commits waiting until replies from the + /// current synchronous standby(s) indicate they have received + /// the commit record of the transaction and applied it so + /// that it has become visible to queries on the standby(s). + RemoteApply, +} + +impl SynchronousCommit { + /// Return isolation level as String literal. + #[must_use] + pub fn to_str_level(&self) -> String { + match self { + SynchronousCommit::Off => "off".into(), + SynchronousCommit::Local => "local".into(), + SynchronousCommit::RemoteWrite => "remote_write".into(), + SynchronousCommit::On => "on".into(), + SynchronousCommit::RemoteApply => "remote_apply".into(), + } + } +} + +#[derive(Clone, Copy, PartialEq)] +pub struct ListenerTransactionConfig { + isolation_level: Option, + read_variant: Option, + deferrable: Option, + synchronous_commit: Option, +} diff --git a/src/statement/statement_builder.rs b/src/statement/statement_builder.rs index 5954f88c..c909f68d 100644 --- a/src/statement/statement_builder.rs +++ b/src/statement/statement_builder.rs @@ -2,7 +2,10 @@ use pyo3::PyObject; use tokio::sync::RwLockWriteGuard; use tokio_postgres::Statement; -use crate::{driver::inner_connection::PsqlpyConnection, exceptions::rust_errors::PSQLPyResult}; +use crate::{ + connection::{structs::PSQLPyConnection, traits::Connection}, + exceptions::rust_errors::PSQLPyResult, +}; use super::{ cache::{StatementCacheInfo, StatementsCache, STMTS_CACHE}, @@ -14,7 +17,7 @@ use super::{ pub struct StatementBuilder<'a> { querystring: String, parameters: Option, - inner_conn: &'a PsqlpyConnection, + inner_conn: &'a PSQLPyConnection, prepared: bool, } @@ -22,7 +25,7 @@ impl<'a> StatementBuilder<'a> { pub fn new( querystring: String, parameters: Option, - inner_conn: &'a PsqlpyConnection, + inner_conn: &'a PSQLPyConnection, prepared: Option, ) -> Self { Self {