Skip to content

Commit 519c4d4

Browse files
committed
Added Portal impl
1 parent b260ba3 commit 519c4d4

File tree

3 files changed

+81
-16
lines changed

3 files changed

+81
-16
lines changed

src/driver/inner_transaction.rs

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ use crate::exceptions::rust_errors::PSQLPyResult;
66

77
pub enum PsqlpyTransaction {
88
PoolTrans(dp_Transaction<'static>),
9-
SingleConnTrans(tp_Transaction<'static>)
9+
SingleConnTrans(tp_Transaction<'static>),
1010
}
1111

1212
impl PsqlpyTransaction {
1313
async fn commit(self) -> PSQLPyResult<()> {
1414
match self {
1515
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.commit().await?),
16-
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.commit().await?)
16+
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.commit().await?),
1717
}
1818
}
1919

2020
async fn rollback(self) -> PSQLPyResult<()> {
2121
match self {
2222
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.rollback().await?),
23-
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.rollback().await?)
23+
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.rollback().await?),
2424
}
2525
}
2626

@@ -29,7 +29,7 @@ impl PsqlpyTransaction {
2929
PsqlpyTransaction::PoolTrans(p_txid) => {
3030
p_txid.savepoint(sp_name).await?;
3131
Ok(())
32-
},
32+
}
3333
PsqlpyTransaction::SingleConnTrans(s_txid) => {
3434
s_txid.savepoint(sp_name).await?;
3535
Ok(())
@@ -40,11 +40,15 @@ impl PsqlpyTransaction {
4040
async fn release_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> {
4141
match self {
4242
PsqlpyTransaction::PoolTrans(p_txid) => {
43-
p_txid.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()).await?;
43+
p_txid
44+
.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str())
45+
.await?;
4446
Ok(())
45-
},
47+
}
4648
PsqlpyTransaction::SingleConnTrans(s_txid) => {
47-
s_txid.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str()).await?;
49+
s_txid
50+
.batch_execute(format!("RELEASE SAVEPOINT {sp_name}").as_str())
51+
.await?;
4852
Ok(())
4953
}
5054
}
@@ -53,31 +57,38 @@ impl PsqlpyTransaction {
5357
async fn rollback_savepoint(&self, sp_name: &str) -> PSQLPyResult<()> {
5458
match self {
5559
PsqlpyTransaction::PoolTrans(p_txid) => {
56-
p_txid.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()).await?;
60+
p_txid
61+
.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str())
62+
.await?;
5763
Ok(())
58-
},
64+
}
5965
PsqlpyTransaction::SingleConnTrans(s_txid) => {
60-
s_txid.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str()).await?;
66+
s_txid
67+
.batch_execute(format!("ROLLBACK TO SAVEPOINT {sp_name}").as_str())
68+
.await?;
6169
Ok(())
6270
}
6371
}
6472
}
6573

6674
async fn bind<T>(&self, statement: &T, params: &[&(dyn ToSql + Sync)]) -> PSQLPyResult<Portal>
6775
where
68-
T: ?Sized + ToStatement {
76+
T: ?Sized + ToStatement,
77+
{
6978
match self {
7079
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.bind(statement, params).await?),
71-
PsqlpyTransaction::SingleConnTrans(s_txid) => Ok(s_txid.bind(statement, params).await?)
80+
PsqlpyTransaction::SingleConnTrans(s_txid) => {
81+
Ok(s_txid.bind(statement, params).await?)
82+
}
7283
}
7384
}
7485

7586
pub async fn query_portal(&self, portal: &Portal, size: i32) -> PSQLPyResult<Vec<Row>> {
7687
match self {
77-
PsqlpyTransaction::PoolTrans(p_txid)
78-
=> Ok(p_txid.query_portal(portal, size).await?),
79-
PsqlpyTransaction::SingleConnTrans(s_txid)
80-
=> Ok(s_txid.query_portal(portal, size).await?)
88+
PsqlpyTransaction::PoolTrans(p_txid) => Ok(p_txid.query_portal(portal, size).await?),
89+
PsqlpyTransaction::SingleConnTrans(s_txid) => {
90+
Ok(s_txid.query_portal(portal, size).await?)
91+
}
8192
}
8293
}
8394
}

src/driver/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ pub mod connection_pool;
44
pub mod connection_pool_builder;
55
pub mod cursor;
66
pub mod inner_connection;
7+
pub mod inner_transaction;
78
pub mod listener;
9+
pub mod portal;
810
pub mod transaction;
911
pub mod transaction_options;
1012
pub mod utils;

src/driver/portal.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
use std::sync::Arc;
2+
3+
use pyo3::{pyclass, pymethods};
4+
use tokio_postgres::Portal as tp_Portal;
5+
6+
use crate::{exceptions::rust_errors::PSQLPyResult, query_result::PSQLDriverPyQueryResult};
7+
8+
use super::inner_transaction::PsqlpyTransaction;
9+
10+
#[pyclass]
11+
struct Portal {
12+
transaction: Arc<PsqlpyTransaction>,
13+
inner: tp_Portal,
14+
array_size: i32,
15+
}
16+
17+
impl Portal {
18+
async fn query_portal(&self, size: i32) -> PSQLPyResult<PSQLDriverPyQueryResult> {
19+
let result = self.transaction.query_portal(&self.inner, size).await?;
20+
Ok(PSQLDriverPyQueryResult::new(result))
21+
}
22+
}
23+
24+
#[pymethods]
25+
impl Portal {
26+
#[getter]
27+
fn get_array_size(&self) -> i32 {
28+
self.array_size
29+
}
30+
31+
#[setter]
32+
fn set_array_size(&mut self, value: i32) {
33+
self.array_size = value;
34+
}
35+
36+
async fn fetch_one(&self) -> PSQLPyResult<PSQLDriverPyQueryResult> {
37+
self.query_portal(1).await
38+
}
39+
40+
#[pyo3(signature = (size=None))]
41+
async fn fetch_many(&self, size: Option<i32>) -> PSQLPyResult<PSQLDriverPyQueryResult> {
42+
self.query_portal(size.unwrap_or(self.array_size)).await
43+
}
44+
45+
async fn fetch_all(&self) -> PSQLPyResult<PSQLDriverPyQueryResult> {
46+
self.query_portal(-1).await
47+
}
48+
49+
async fn close(&mut self) {
50+
let _ = Arc::downgrade(&self.transaction);
51+
}
52+
}

0 commit comments

Comments
 (0)