Skip to content

Commit 6b22424

Browse files
committed
LISTEN/NOTIFY funcionality
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent e0090ec commit 6b22424

File tree

12 files changed

+472
-260
lines changed

12 files changed

+472
-260
lines changed

python/psqlpy/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
IsolationLevel,
88
KeepaliveConfig,
99
Listener,
10+
ListenerNotification,
1011
LoadBalanceHosts,
1112
QueryResult,
1213
ReadVariant,
@@ -27,6 +28,7 @@
2728
"IsolationLevel",
2829
"KeepaliveConfig",
2930
"Listener",
31+
"ListenerNotification",
3032
"LoadBalanceHosts",
3133
"QueryResult",
3234
"ReadVariant",

python/psqlpy/_internal/__init__.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,3 +1751,7 @@ class ConnectionPoolBuilder:
17511751

17521752
class Listener:
17531753
"""Result."""
1754+
1755+
1756+
class ListenerNotification:
1757+
"""Result."""

src/common.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
use deadpool_postgres::Object;
21
use pyo3::{
32
types::{PyAnyMethods, PyModule, PyModuleMethods},
43
Bound, PyAny, PyResult, Python,
54
};
65

76
use crate::{
8-
exceptions::rust_errors::RustPSQLDriverPyResult,
9-
query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult},
10-
value_converter::{convert_parameters, PythonDTO, QueryParameter},
7+
driver::connection::InnerConnection, exceptions::rust_errors::RustPSQLDriverPyResult, query_result::{PSQLDriverPyQueryResult, PSQLDriverSinglePyQueryResult}, value_converter::{convert_parameters, PythonDTO, QueryParameter}
118
};
129

1310
/// Add new module to the parent one.
@@ -55,7 +52,7 @@ pub trait ObjectQueryTrait {
5552
) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
5653
}
5754

58-
impl ObjectQueryTrait for Object {
55+
impl ObjectQueryTrait for InnerConnection {
5956
async fn psqlpy_query_one(
6057
&self,
6158
querystring: String,

src/driver/connection.rs

Lines changed: 89 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
use bytes::BytesMut;
1+
use bytes::{Buf, BytesMut};
22
use deadpool_postgres::{Object, Pool};
33
use futures_util::pin_mut;
4+
use postgres_types::ToSql;
45
use pyo3::{buffer::PyBuffer, pyclass, pymethods, Py, PyAny, PyErr, Python};
56
use std::{collections::HashSet, sync::Arc, vec};
6-
use tokio_postgres::binary_copy::BinaryCopyInWriter;
7+
use tokio_postgres::{binary_copy::BinaryCopyInWriter, Client, CopyInSink, Row, Statement, ToStatement};
78

89
use crate::{
910
exceptions::rust_errors::{RustPSQLDriverError, RustPSQLDriverPyResult},
@@ -19,110 +20,115 @@ use super::{
1920
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2021
};
2122

22-
/// Format OPTS parameter for Postgres COPY command.
23-
///
24-
/// # Errors
25-
/// May return Err Result if cannot format parameter.
26-
#[allow(clippy::too_many_arguments)]
27-
pub fn _format_copy_opts(
28-
format: Option<String>,
29-
freeze: Option<bool>,
30-
delimiter: Option<String>,
31-
null: Option<String>,
32-
header: Option<String>,
33-
quote: Option<String>,
34-
escape: Option<String>,
35-
force_quote: Option<Py<PyAny>>,
36-
force_not_null: Option<Vec<String>>,
37-
force_null: Option<Vec<String>>,
38-
encoding: Option<String>,
39-
) -> RustPSQLDriverPyResult<String> {
40-
let mut opts: Vec<String> = vec![];
41-
42-
if let Some(format) = format {
43-
opts.push(format!("FORMAT {format}"));
44-
}
23+
pub enum InnerConnection {
24+
PoolConn(Object),
25+
SingleConn(Client),
26+
}
4527

46-
if let Some(freeze) = freeze {
47-
if freeze {
48-
opts.push("FREEZE TRUE".into());
49-
} else {
50-
opts.push("FREEZE FALSE".into());
28+
impl InnerConnection {
29+
pub async fn prepare_cached(
30+
&self,
31+
query: &str
32+
) -> RustPSQLDriverPyResult<Statement> {
33+
match self {
34+
InnerConnection::PoolConn(pconn) => {
35+
return Ok(pconn.prepare_cached(query).await?)
36+
}
37+
InnerConnection::SingleConn(sconn) => {
38+
return Ok(sconn.prepare(query).await?)
39+
}
5140
}
5241
}
53-
54-
if let Some(delimiter) = delimiter {
55-
opts.push(format!("DELIMITER {delimiter}"));
56-
}
57-
58-
if let Some(null) = null {
59-
opts.push(format!("NULL {}", quote_ident(&null)));
60-
}
61-
62-
if let Some(header) = header {
63-
opts.push(format!("HEADER {header}"));
64-
}
65-
66-
if let Some(quote) = quote {
67-
opts.push(format!("QUOTE {quote}"));
68-
}
69-
70-
if let Some(escape) = escape {
71-
opts.push(format!("ESCAPE {escape}"));
72-
}
73-
74-
if let Some(force_quote) = force_quote {
75-
let boolean_force_quote: Result<bool, PyErr> =
76-
Python::with_gil(|gil| force_quote.extract::<bool>(gil));
77-
78-
if let Ok(force_quote) = boolean_force_quote {
79-
if force_quote {
80-
opts.push("FORCE_QUOTE *".into());
42+
43+
pub async fn query<T>(
44+
&self,
45+
statement: &T,
46+
params: &[&(dyn ToSql + Sync)],
47+
) -> RustPSQLDriverPyResult<Vec<Row>>
48+
where T: ?Sized + ToStatement {
49+
match self {
50+
InnerConnection::PoolConn(pconn) => {
51+
return Ok(pconn.query(statement, params).await?)
8152
}
82-
} else {
83-
let sequence_force_quote: Result<Vec<String>, PyErr> =
84-
Python::with_gil(|gil| force_quote.extract::<Vec<String>>(gil));
85-
86-
if let Ok(force_quote) = sequence_force_quote {
87-
opts.push(format!("FORCE_QUOTE ({})", force_quote.join(", ")));
53+
InnerConnection::SingleConn(sconn) => {
54+
return Ok(sconn.query(statement, params).await?)
8855
}
89-
90-
return Err(RustPSQLDriverError::PyToRustValueConversionError(
91-
"force_quote parameter must be boolean or sequence of str's.".into(),
92-
));
9356
}
9457
}
9558

96-
if let Some(force_not_null) = force_not_null {
97-
opts.push(format!("FORCE_NOT_NULL ({})", force_not_null.join(", ")));
98-
}
99-
100-
if let Some(force_null) = force_null {
101-
opts.push(format!("FORCE_NULL ({})", force_null.join(", ")));
59+
pub async fn batch_execute(&self, query: &str) -> RustPSQLDriverPyResult<()> {
60+
match self {
61+
InnerConnection::PoolConn(pconn) => {
62+
return Ok(pconn.batch_execute(query).await?)
63+
}
64+
InnerConnection::SingleConn(sconn) => {
65+
return Ok(sconn.batch_execute(query).await?)
66+
}
67+
}
10268
}
10369

104-
if let Some(encoding) = encoding {
105-
opts.push(format!("ENCODING {}", quote_ident(&encoding)));
70+
pub async fn query_one<T>(
71+
&self,
72+
statement: &T,
73+
params: &[&(dyn ToSql + Sync)],
74+
) -> RustPSQLDriverPyResult<Row>
75+
where T: ?Sized + ToStatement
76+
{
77+
match self {
78+
InnerConnection::PoolConn(pconn) => {
79+
return Ok(pconn.query_one(statement, params).await?)
80+
}
81+
InnerConnection::SingleConn(sconn) => {
82+
return Ok(sconn.query_one(statement, params).await?)
83+
}
84+
}
10685
}
10786

108-
if opts.is_empty() {
109-
Ok(String::new())
110-
} else {
111-
Ok(format!("({})", opts.join(", ")))
87+
pub async fn copy_in<T, U>(
88+
&self,
89+
statement: &T
90+
) -> RustPSQLDriverPyResult<CopyInSink<U>>
91+
where
92+
T: ?Sized + ToStatement,
93+
U: Buf + 'static + Send
94+
{
95+
match self {
96+
InnerConnection::PoolConn(pconn) => {
97+
return Ok(pconn.copy_in(statement).await?)
98+
}
99+
InnerConnection::SingleConn(sconn) => {
100+
return Ok(sconn.copy_in(statement).await?)
101+
}
102+
}
112103
}
113104
}
114105

115106
#[pyclass(subclass)]
107+
#[derive(Clone)]
116108
pub struct Connection {
117-
db_client: Option<Arc<Object>>,
109+
db_client: Option<Arc<InnerConnection>>,
118110
db_pool: Option<Pool>,
119111
}
120112

121113
impl Connection {
122114
#[must_use]
123-
pub fn new(db_client: Option<Arc<Object>>, db_pool: Option<Pool>) -> Self {
115+
pub fn new(db_client: Option<Arc<InnerConnection>>, db_pool: Option<Pool>) -> Self {
124116
Connection { db_client, db_pool }
125117
}
118+
119+
pub fn db_client(&self) -> Option<Arc<InnerConnection>> {
120+
return self.db_client.clone()
121+
}
122+
123+
pub fn db_pool(&self) -> Option<Pool> {
124+
return self.db_pool.clone()
125+
}
126+
}
127+
128+
impl Default for Connection {
129+
fn default() -> Self {
130+
Connection::new(None, None)
131+
}
126132
}
127133

128134
#[pymethods]
@@ -145,7 +151,7 @@ impl Connection {
145151
.await??;
146152
pyo3::Python::with_gil(|gil| {
147153
let mut self_ = self_.borrow_mut(gil);
148-
self_.db_client = Some(Arc::new(db_connection));
154+
self_.db_client = Some(Arc::new(InnerConnection::PoolConn(db_connection)));
149155
});
150156
return Ok(self_);
151157
}

src/driver/connection_pool.rs

Lines changed: 2 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use crate::runtime::tokio_runtime;
22
use deadpool_postgres::{Manager, ManagerConfig, Object, Pool, RecyclingMethod};
3-
use futures::{FutureExt, StreamExt, TryStreamExt};
43
use pyo3::{pyclass, pyfunction, pymethods, Py, PyAny};
54
use std::{sync::Arc, vec};
65
use tokio_postgres::Config;
@@ -13,7 +12,7 @@ use crate::{
1312

1413
use super::{
1514
common_options::{ConnRecyclingMethod, LoadBalanceHosts, SslMode, TargetSessionAttrs},
16-
connection::Connection,
15+
connection::{Connection, InnerConnection},
1716
listener::Listener,
1817
utils::{build_connection_config, build_manager, build_tls},
1918
};
@@ -503,7 +502,6 @@ impl ConnectionPool {
503502

504503
pub async fn add_listener(
505504
self_: pyo3::Py<Self>,
506-
callback: Py<PyAny>,
507505
) -> RustPSQLDriverPyResult<Listener> {
508506
let (pg_config, ca_file, ssl_mode) = pyo3::Python::with_gil(|gil| {
509507
let b_gil = self_.borrow(gil);
@@ -514,58 +512,6 @@ impl ConnectionPool {
514512
)
515513
});
516514

517-
// let tls_ = build_tls(&ca_file, Some(SslMode::Disable)).unwrap();
518-
519-
// match tls_ {
520-
// ConfiguredTLS::NoTls => {
521-
// let a = pg_config.connect(NoTls).await.unwrap();
522-
// },
523-
// ConfiguredTLS::TlsConnector(connector) => {
524-
// let a = pg_config.connect(connector).await.unwrap();
525-
// }
526-
// }
527-
528-
// let (client, mut connection) = tokio_runtime()
529-
// .spawn(async move { pg_config.connect(NoTls).await.unwrap() })
530-
// .await?;
531-
532-
// // Make transmitter and receiver.
533-
// let (tx, mut rx) = futures_channel::mpsc::unbounded();
534-
// let stream =
535-
// stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!("{}", e));
536-
// let connection = stream.forward(tx).map(|r| r.unwrap());
537-
// tokio_runtime().spawn(connection);
538-
539-
// // Wait for notifications in separate thread.
540-
// tokio_runtime().spawn(async move {
541-
// client
542-
// .batch_execute(
543-
// "LISTEN test_notifications;
544-
// LISTEN test_notifications2;",
545-
// )
546-
// .await
547-
// .unwrap();
548-
549-
// loop {
550-
// let next_element = rx.next().await;
551-
// client.batch_execute("LISTEN test_notifications3;").await.unwrap();
552-
// match next_element {
553-
// Some(n) => {
554-
// match n {
555-
// tokio_postgres::AsyncMessage::Notification(n) => {
556-
// Python::with_gil(|gil| {
557-
// callback.call0(gil);
558-
// });
559-
// println!("Notification {:?}", n);
560-
// },
561-
// _ => {println!("in_in {:?}", n)}
562-
// }
563-
// },
564-
// _ => {println!("in {:?}", next_element)}
565-
// }
566-
// }
567-
// });
568-
569515
Ok(Listener::new(pg_config, ca_file, ssl_mode))
570516
}
571517

@@ -581,7 +527,7 @@ impl ConnectionPool {
581527
})
582528
.await??;
583529

584-
Ok(Connection::new(Some(Arc::new(db_connection)), None))
530+
Ok(Connection::new(Some(Arc::new(InnerConnection::PoolConn(db_connection))), None))
585531
}
586532

587533
/// Close connection pool.

0 commit comments

Comments
 (0)