Skip to content

Commit 0bf36b0

Browse files
authored
Merge pull request #126 from psqlpy-python/feature/oltp_support
Prepared psqlpy for OTLP
2 parents 2e478e2 + 444464f commit 0bf36b0

22 files changed

+488
-473
lines changed

README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,10 @@ async def main() -> None:
5555
max_db_pool_size=2,
5656
)
5757

58-
res: QueryResult = await db_pool.execute(
59-
"SELECT * FROM users",
60-
)
58+
async with db_pool.acquire() as conn:
59+
res: QueryResult = await conn.execute(
60+
"SELECT * FROM users",
61+
)
6162

6263
print(res.result())
6364
db_pool.close()

docs/components/connection_pool.md

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -178,55 +178,6 @@ It has 4 parameters:
178178
- `available` - available connection in the connection pool.
179179
- `waiting` - waiting requests to retrieve connection from connection pool.
180180

181-
### Execute
182-
183-
#### Parameters:
184-
185-
- `querystring`: Statement string.
186-
- `parameters`: List of parameters for the statement string.
187-
- `prepared`: Prepare statement before execution or not.
188-
189-
You can execute any query directly from Connection Pool.
190-
This method supports parameters, each parameter must be marked as `$<number>` (number starts with 1).
191-
Parameters must be passed as list after querystring.
192-
::: caution
193-
You must use `ConnectionPool.execute` method in high-load production code wisely!
194-
It pulls connection from the pool each time you execute query.
195-
Preferable way to execute statements with [Connection](./../components/connection.md) or [Transaction](./../components/transaction.md)
196-
:::
197-
198-
```python
199-
async def main() -> None:
200-
...
201-
results: QueryResult = await db_pool.execute(
202-
"SELECT * FROM users WHERE id = $1 and username = $2",
203-
[100, "Alex"],
204-
)
205-
206-
dict_results: list[dict[str, Any]] = results.result()
207-
```
208-
209-
### Fetch
210-
211-
#### Parameters:
212-
213-
- `querystring`: Statement string.
214-
- `parameters`: List of parameters for the statement string.
215-
- `prepared`: Prepare statement before execution or not.
216-
217-
The same as the `execute` method, for some people this naming is preferable.
218-
219-
```python
220-
async def main() -> None:
221-
...
222-
results: QueryResult = await db_pool.fetch(
223-
"SELECT * FROM users WHERE id = $1 and username = $2",
224-
[100, "Alex"],
225-
)
226-
227-
dict_results: list[dict[str, Any]] = results.result()
228-
```
229-
230181
### Acquire
231182

232183
Get single connection for async context manager.

docs/integrations/opentelemetry.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
title: Integration with OpenTelemetry
3+
---
4+
5+
# OTLP-PSQLPy
6+
7+
There is a library for OpenTelemetry support.
8+
Please follow the [link](https://github.com/psqlpy-python/otlp-psqlpy)

docs/integrations/taskiq.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
title: Integration with TaskIQ
3+
---
4+
5+
# TaskIQ-PSQLPy
6+
7+
There is integration with [TaskIQ](https://github.com/taskiq-python/taskiq-psqlpy).
8+
You can use PSQLPy for result backend.

docs/introduction/lets_start.md

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,16 @@ async def main() -> None:
4949
# It uses default connection parameters
5050
db_pool: Final = ConnectionPool()
5151

52-
results: Final[QueryResult] = await db_pool.execute(
53-
"SELECT * FROM users WHERE id = $1",
54-
[2],
55-
)
52+
async with db_pool.acquire() as conn:
53+
results: Final[QueryResult] = await conn.execute(
54+
"SELECT * FROM users WHERE id = $1",
55+
[2],
56+
)
5657

5758
dict_results: Final[list[dict[Any, Any]]] = results.result()
5859
db_pool.close()
5960
```
6061

6162
::: tip
62-
You must call `close()` on database pool when you application is shutting down.
63-
:::
64-
::: caution
65-
You must not use `ConnectionPool.execute` method in high-load production code!
66-
It pulls new connection from connection pull each call.
67-
Recommended way to make queries is executing them with `Connection`, `Transaction` or `Cursor`.
63+
It's better to call `close()` on database pool when you application is shutting down.
6864
:::

psqlpy-stress/psqlpy_stress/mocker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ def get_pool() -> psqlpy.ConnectionPool:
1717
async def fill_users() -> None:
1818
pool = get_pool()
1919
users_amount = 10000000
20+
connection = await pool.connection()
2021
for _ in range(users_amount):
21-
await pool.execute(
22+
await connection.execute(
2223
querystring="INSERT INTO users (username) VALUES($1)",
2324
parameters=[str(uuid.uuid4())],
2425
)
@@ -35,8 +36,9 @@ def generate_random_dict() -> dict[str, str]:
3536
async def fill_big_table() -> None:
3637
pool = get_pool()
3738
big_table_amount = 10000000
39+
connection = await pool.connection()
3840
for _ in range(big_table_amount):
39-
await pool.execute(
41+
await connection.execute(
4042
"INSERT INTO big_table (string_field, integer_field, json_field, array_field) VALUES($1, $2, $3, $4)",
4143
parameters=[
4244
str(uuid.uuid4()),

python/psqlpy/_internal/__init__.pyi

Lines changed: 22 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,16 @@ class Cursor:
288288
It can be used as an asynchronous iterator.
289289
"""
290290

291+
cursor_name: str
292+
querystring: str
293+
parameters: Sequence[Any]
294+
prepared: bool | None
295+
conn_dbname: str | None
296+
user: str | None
297+
host_addrs: list[str]
298+
hosts: list[str]
299+
ports: list[int]
300+
291301
def __aiter__(self: Self) -> Self: ...
292302
async def __anext__(self: Self) -> QueryResult: ...
293303
async def __aenter__(self: Self) -> Self: ...
@@ -424,6 +434,12 @@ class Transaction:
424434
`.transaction()`.
425435
"""
426436

437+
conn_dbname: str | None
438+
user: str | None
439+
host_addrs: list[str]
440+
hosts: list[str]
441+
ports: list[int]
442+
427443
async def __aenter__(self: Self) -> Self: ...
428444
async def __aexit__(
429445
self: Self,
@@ -874,6 +890,12 @@ class Connection:
874890
It can be created only from connection pool.
875891
"""
876892

893+
conn_dbname: str | None
894+
user: str | None
895+
host_addrs: list[str]
896+
hosts: list[str]
897+
ports: list[int]
898+
877899
async def __aenter__(self: Self) -> Self: ...
878900
async def __aexit__(
879901
self: Self,
@@ -1284,60 +1306,6 @@ class ConnectionPool:
12841306
### Parameters:
12851307
- `new_max_size`: new size for the connection pool.
12861308
"""
1287-
async def execute(
1288-
self: Self,
1289-
querystring: str,
1290-
parameters: Sequence[Any] | None = None,
1291-
prepared: bool = True,
1292-
) -> QueryResult:
1293-
"""Execute the query.
1294-
1295-
Querystring can contain `$<number>` parameters
1296-
for converting them in the driver side.
1297-
1298-
### Parameters:
1299-
- `querystring`: querystring to execute.
1300-
- `parameters`: list of parameters to pass in the query.
1301-
- `prepared`: should the querystring be prepared before the request.
1302-
By default any querystring will be prepared.
1303-
1304-
### Example:
1305-
```python
1306-
import asyncio
1307-
1308-
from psqlpy import PSQLPool, QueryResult
1309-
1310-
async def main() -> None:
1311-
db_pool = PSQLPool()
1312-
query_result: QueryResult = await psqlpy.execute(
1313-
"SELECT username FROM users WHERE id = $1",
1314-
[100],
1315-
)
1316-
dict_result: List[Dict[Any, Any]] = query_result.result()
1317-
# you don't need to close the pool,
1318-
# it will be dropped on Rust side.
1319-
```
1320-
"""
1321-
async def fetch(
1322-
self: Self,
1323-
querystring: str,
1324-
parameters: Sequence[Any] | None = None,
1325-
prepared: bool = True,
1326-
) -> QueryResult:
1327-
"""Fetch the result from database.
1328-
1329-
It's the same as `execute` method, we made it because people are used
1330-
to `fetch` method name.
1331-
1332-
Querystring can contain `$<number>` parameters
1333-
for converting them in the driver side.
1334-
1335-
### Parameters:
1336-
- `querystring`: querystring to execute.
1337-
- `parameters`: list of parameters to pass in the query.
1338-
- `prepared`: should the querystring be prepared before the request.
1339-
By default any querystring will be prepared.
1340-
"""
13411309
async def connection(self: Self) -> Connection:
13421310
"""Create new connection.
13431311

python/tests/conftest.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,19 @@ async def create_default_data_for_tests(
126126
table_name: str,
127127
number_database_records: int,
128128
) -> AsyncGenerator[None, None]:
129-
await psql_pool.execute(
129+
connection = await psql_pool.connection()
130+
await connection.execute(
130131
f"CREATE TABLE {table_name} (id SERIAL, name VARCHAR(255))",
131132
)
132133

133134
for table_id in range(1, number_database_records + 1):
134135
new_name = random_string()
135-
await psql_pool.execute(
136+
await connection.execute(
136137
querystring=f"INSERT INTO {table_name} VALUES ($1, $2)",
137138
parameters=[table_id, new_name],
138139
)
139140
yield
140-
await psql_pool.execute(
141+
await connection.execute(
141142
f"DROP TABLE {table_name}",
142143
)
143144

@@ -147,14 +148,15 @@ async def create_table_for_listener_tests(
147148
psql_pool: ConnectionPool,
148149
listener_table_name: str,
149150
) -> AsyncGenerator[None, None]:
150-
await psql_pool.execute(
151+
connection = await psql_pool.connection()
152+
await connection.execute(
151153
f"CREATE TABLE {listener_table_name}"
152154
f"(id SERIAL, payload VARCHAR(255),"
153155
f"channel VARCHAR(255), process_id INT)",
154156
)
155157

156158
yield
157-
await psql_pool.execute(
159+
await connection.execute(
158160
f"DROP TABLE {listener_table_name}",
159161
)
160162

python/tests/test_binary_copy.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@ async def test_binary_copy_to_table_in_connection(
1515
) -> None:
1616
"""Test binary copy in connection."""
1717
table_name: typing.Final = "cars"
18-
await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
19-
await psql_pool.execute(
18+
connection = await psql_pool.connection()
19+
await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
20+
await connection.execute(
2021
"""
2122
CREATE TABLE IF NOT EXISTS cars (
2223
model VARCHAR,
@@ -46,17 +47,16 @@ async def test_binary_copy_to_table_in_connection(
4647
buf.write(encoder.finish())
4748
buf.seek(0)
4849

49-
async with psql_pool.acquire() as connection:
50-
inserted_rows = await connection.binary_copy_to_table(
51-
source=buf,
52-
table_name=table_name,
53-
)
50+
inserted_rows = await connection.binary_copy_to_table(
51+
source=buf,
52+
table_name=table_name,
53+
)
5454

5555
expected_inserted_row: typing.Final = 32
5656

5757
assert inserted_rows == expected_inserted_row
5858

59-
real_table_rows: typing.Final = await psql_pool.execute(
59+
real_table_rows: typing.Final = await connection.execute(
6060
f"SELECT COUNT(*) AS rows_count FROM {table_name}",
6161
)
6262
assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row
@@ -67,8 +67,10 @@ async def test_binary_copy_to_table_in_transaction(
6767
) -> None:
6868
"""Test binary copy in transaction."""
6969
table_name: typing.Final = "cars"
70-
await psql_pool.execute(f"DROP TABLE IF EXISTS {table_name}")
71-
await psql_pool.execute(
70+
71+
connection = await psql_pool.connection()
72+
await connection.execute(f"DROP TABLE IF EXISTS {table_name}")
73+
await connection.execute(
7274
"""
7375
CREATE TABLE IF NOT EXISTS cars (
7476
model VARCHAR,
@@ -108,7 +110,8 @@ async def test_binary_copy_to_table_in_transaction(
108110

109111
assert inserted_rows == expected_inserted_row
110112

111-
real_table_rows: typing.Final = await psql_pool.execute(
113+
connection = await psql_pool.connection()
114+
real_table_rows: typing.Final = await connection.execute(
112115
f"SELECT COUNT(*) AS rows_count FROM {table_name}",
113116
)
114117
assert real_table_rows.result()[0]["rows_count"] == expected_inserted_row

python/tests/test_connection.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,9 @@ async def test_closed_connection_error(
180180

181181
async def test_execute_batch_method(psql_pool: ConnectionPool) -> None:
182182
"""Test `execute_batch` method."""
183-
await psql_pool.execute(querystring="DROP TABLE IF EXISTS execute_batch")
184-
await psql_pool.execute(querystring="DROP TABLE IF EXISTS execute_batch2")
183+
connection = await psql_pool.connection()
184+
await connection.execute(querystring="DROP TABLE IF EXISTS execute_batch")
185+
await connection.execute(querystring="DROP TABLE IF EXISTS execute_batch2")
185186
query = "CREATE TABLE execute_batch (name VARCHAR);CREATE TABLE execute_batch2 (name VARCHAR);"
186187
async with psql_pool.acquire() as conn:
187188
await conn.execute_batch(querystring=query)

0 commit comments

Comments
 (0)