Skip to content

Commit 11ad66e

Browse files
committed
Added synchronous_commit option for transactions
Signed-off-by: chandr-andr (Kiselev Aleksandr) <chandr@chandr.net>
1 parent 969aadc commit 11ad66e

File tree

3 files changed

+120
-28
lines changed

3 files changed

+120
-28
lines changed

src/driver/connection.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::{
1616
use super::{
1717
cursor::Cursor,
1818
transaction::Transaction,
19-
transaction_options::{IsolationLevel, ReadVariant},
19+
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2020
};
2121

2222
/// Format OPTS parameter for Postgres COPY command.
@@ -594,13 +594,15 @@ impl Connection {
594594
isolation_level: Option<IsolationLevel>,
595595
read_variant: Option<ReadVariant>,
596596
deferrable: Option<bool>,
597+
synchronous_commit: Option<SynchronousCommit>,
597598
) -> RustPSQLDriverPyResult<Transaction> {
598599
if let Some(db_client) = &self.db_client {
599600
return Ok(Transaction::new(
600601
db_client.clone(),
601602
false,
602603
false,
603604
isolation_level,
605+
synchronous_commit,
604606
read_variant,
605607
deferrable,
606608
HashSet::new(),

src/driver/transaction.rs

Lines changed: 74 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818

1919
use super::{
2020
cursor::Cursor,
21-
transaction_options::{IsolationLevel, ReadVariant},
21+
transaction_options::{IsolationLevel, ReadVariant, SynchronousCommit},
2222
};
2323
use crate::common::ObjectQueryTrait;
2424
use std::{collections::HashSet, sync::Arc};
@@ -30,6 +30,7 @@ pub trait TransactionObjectTrait {
3030
isolation_level: Option<IsolationLevel>,
3131
read_variant: Option<ReadVariant>,
3232
defferable: Option<bool>,
33+
synchronous_commit: Option<SynchronousCommit>,
3334
) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
3435
fn commit(&self) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
3536
fn rollback(&self) -> impl std::future::Future<Output = RustPSQLDriverPyResult<()>> + Send;
@@ -41,6 +42,7 @@ impl TransactionObjectTrait for Object {
4142
isolation_level: Option<IsolationLevel>,
4243
read_variant: Option<ReadVariant>,
4344
deferrable: Option<bool>,
45+
synchronous_commit: Option<SynchronousCommit>,
4446
) -> RustPSQLDriverPyResult<()> {
4547
let mut querystring = "START TRANSACTION".to_string();
4648

@@ -60,12 +62,28 @@ impl TransactionObjectTrait for Object {
6062
Some(false) => " NOT DEFERRABLE",
6163
None => "",
6264
});
65+
6366
self.batch_execute(&querystring).await.map_err(|err| {
6467
RustPSQLDriverError::TransactionBeginError(format!(
6568
"Cannot execute statement to start transaction, err - {err}"
6669
))
6770
})?;
6871

72+
if let Some(synchronous_commit) = synchronous_commit {
73+
let str_synchronous_commit = synchronous_commit.to_str_level();
74+
75+
let synchronous_commit_query =
76+
format!("SET LOCAL synchronous_commit = '{str_synchronous_commit}'");
77+
78+
self.batch_execute(&synchronous_commit_query)
79+
.await
80+
.map_err(|err| {
81+
RustPSQLDriverError::TransactionBeginError(format!(
82+
"Cannot set synchronous_commit parameter, err - {err}"
83+
))
84+
})?;
85+
}
86+
6987
Ok(())
7088
}
7189
async fn commit(&self) -> RustPSQLDriverPyResult<()> {
@@ -93,6 +111,7 @@ pub struct Transaction {
93111
is_done: bool,
94112

95113
isolation_level: Option<IsolationLevel>,
114+
synchronous_commit: Option<SynchronousCommit>,
96115
read_variant: Option<ReadVariant>,
97116
deferrable: Option<bool>,
98117

@@ -107,6 +126,7 @@ impl Transaction {
107126
is_started: bool,
108127
is_done: bool,
109128
isolation_level: Option<IsolationLevel>,
129+
synchronous_commit: Option<SynchronousCommit>,
110130
read_variant: Option<ReadVariant>,
111131
deferrable: Option<bool>,
112132
savepoints_map: HashSet<String>,
@@ -116,6 +136,7 @@ impl Transaction {
116136
is_started,
117137
is_done,
118138
isolation_level,
139+
synchronous_commit,
119140
read_variant,
120141
deferrable,
121142
savepoints_map,
@@ -149,18 +170,26 @@ impl Transaction {
149170
}
150171

151172
async fn __aenter__<'a>(self_: Py<Self>) -> RustPSQLDriverPyResult<Py<Self>> {
152-
let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) =
153-
pyo3::Python::with_gil(|gil| {
154-
let self_ = self_.borrow(gil);
155-
(
156-
self_.is_started,
157-
self_.is_done,
158-
self_.isolation_level,
159-
self_.read_variant,
160-
self_.deferrable,
161-
self_.db_client.clone(),
162-
)
163-
});
173+
let (
174+
is_started,
175+
is_done,
176+
isolation_level,
177+
synchronous_commit,
178+
read_variant,
179+
deferrable,
180+
db_client,
181+
) = pyo3::Python::with_gil(|gil| {
182+
let self_ = self_.borrow(gil);
183+
(
184+
self_.is_started,
185+
self_.is_done,
186+
self_.isolation_level,
187+
self_.synchronous_commit,
188+
self_.read_variant,
189+
self_.deferrable,
190+
self_.db_client.clone(),
191+
)
192+
});
164193

165194
if is_started {
166195
return Err(RustPSQLDriverError::TransactionBeginError(
@@ -176,7 +205,12 @@ impl Transaction {
176205

177206
if let Some(db_client) = db_client {
178207
db_client
179-
.start_transaction(isolation_level, read_variant, deferrable)
208+
.start_transaction(
209+
isolation_level,
210+
read_variant,
211+
deferrable,
212+
synchronous_commit,
213+
)
180214
.await?;
181215

182216
Python::with_gil(|gil| {
@@ -558,18 +592,26 @@ impl Transaction {
558592
/// 2) Transaction is done.
559593
/// 3) Cannot execute `BEGIN` command.
560594
pub async fn begin(self_: Py<Self>) -> RustPSQLDriverPyResult<()> {
561-
let (is_started, is_done, isolation_level, read_variant, deferrable, db_client) =
562-
pyo3::Python::with_gil(|gil| {
563-
let self_ = self_.borrow(gil);
564-
(
565-
self_.is_started,
566-
self_.is_done,
567-
self_.isolation_level,
568-
self_.read_variant,
569-
self_.deferrable,
570-
self_.db_client.clone(),
571-
)
572-
});
595+
let (
596+
is_started,
597+
is_done,
598+
isolation_level,
599+
synchronous_commit,
600+
read_variant,
601+
deferrable,
602+
db_client,
603+
) = pyo3::Python::with_gil(|gil| {
604+
let self_ = self_.borrow(gil);
605+
(
606+
self_.is_started,
607+
self_.is_done,
608+
self_.isolation_level,
609+
self_.synchronous_commit,
610+
self_.read_variant,
611+
self_.deferrable,
612+
self_.db_client.clone(),
613+
)
614+
});
573615

574616
if let Some(db_client) = db_client {
575617
if is_started {
@@ -584,7 +626,12 @@ impl Transaction {
584626
));
585627
}
586628
db_client
587-
.start_transaction(isolation_level, read_variant, deferrable)
629+
.start_transaction(
630+
isolation_level,
631+
read_variant,
632+
deferrable,
633+
synchronous_commit,
634+
)
588635
.await?;
589636

590637
pyo3::Python::with_gil(|gil| {

src/driver/transaction_options.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,46 @@ pub enum ReadVariant {
2828
ReadOnly,
2929
ReadWrite,
3030
}
31+
32+
#[pyclass]
33+
#[derive(Clone, Copy)]
34+
pub enum SynchronousCommit {
35+
/// As the name indicates, the commit acknowledgment can come before
36+
/// flushing the records to disk.
37+
/// This is generally called as an asynchronous commit.
38+
/// If the PostgreSQL instance crashes,
39+
/// the last few asynchronous commits might be lost.
40+
Off,
41+
/// WAL records are written and flushed to local disks.
42+
/// In this case, the commit will be acknowledged after the
43+
/// local WAL Write and WAL flush completes.
44+
Local,
45+
/// WAL records are successfully handed over to
46+
/// remote instances which acknowledged back
47+
/// about the write (not flush).
48+
RemoteWrite,
49+
/// The meaning may change based on whether you have
50+
/// a synchronous standby or not.
51+
/// If there is a synchronous standby,
52+
/// setting the value to on will result in waiting till “remote flush”.
53+
On,
54+
/// This will result in commits waiting until replies from the
55+
/// current synchronous standby(s) indicate they have received
56+
/// the commit record of the transaction and applied it so
57+
/// that it has become visible to queries on the standby(s).
58+
RemoteApply,
59+
}
60+
61+
impl SynchronousCommit {
62+
/// Return isolation level as String literal.
63+
#[must_use]
64+
pub fn to_str_level(&self) -> String {
65+
match self {
66+
SynchronousCommit::Off => "off".into(),
67+
SynchronousCommit::Local => "local".into(),
68+
SynchronousCommit::RemoteWrite => "remote_write".into(),
69+
SynchronousCommit::On => "on".into(),
70+
SynchronousCommit::RemoteApply => "remote_apply".into(),
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)