diff --git a/README.md b/README.md index cdea7eb..cf246a4 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,13 @@ # QueryBuilder python module -![PyPI](https://img.shields.io/pypi/v/simple-query-builder?color=orange) ![Python 3.7, 3.8, 3.9, 3.10](https://img.shields.io/pypi/pyversions/simple-query-builder?color=blueviolet) ![GitHub Pull Requests](https://img.shields.io/github/issues-pr/co0lc0der/simple-query-builder-python?color=blueviolet) ![License](https://img.shields.io/pypi/l/simple-query-builder?color=blueviolet) ![Forks](https://img.shields.io/github/forks/co0lc0der/simple-query-builder-python?style=social) +[![Latest Version](https://img.shields.io/github/release/co0lc0der/simple-query-builder-python?color=orange&style=flat-square)](https://github.com/co0lc0der/simple-query-builder-python/release) +![GitHub repo size](https://img.shields.io/github/repo-size/co0lc0der/simple-query-builder-python?label=size&style=flat-square) +[![GitHub license](https://img.shields.io/github/license/co0lc0der/simple-query-builder-python?style=flat-square)](https://github.com/co0lc0der/simple-query-builder-python/blob/main/LICENSE.md) +![Python 3.7, 3.8, 3.9, 3.10, 3.11, 3.12](https://img.shields.io/pypi/pyversions/simple-query-builder?color=blueviolet&style=flat-square) +![PyPI](https://img.shields.io/pypi/v/simple-query-builder?color=yellow&style=flat-square) +![PyPI - Downloads](https://img.shields.io/pypi/dm/simple-query-builder?color=darkgreen&style=flat-square) -This is a small easy-to-use component for working with a database. It provides some public methods to compose SQL queries and manipulate data. Each SQL query is prepared and safe. QueryBuilder fetches data to _list_ by default. At present time the component supports SQLite (file or memory). +This is a small easy-to-use module for working with a database. It provides some public methods to compose SQL queries and manipulate data. Each SQL query is prepared and safe. QueryBuilder fetches data to _dictionary_ by default. At present time the component supports SQLite (file or memory). ## Contributing @@ -10,7 +15,7 @@ Bug reports and/or pull requests are welcome ## License -The module is available as open source under the terms of the [MIT](https://github.com/co0lc0der/simple-query-builder-python/blob/main/LICENSE.md) +The module is available as open source under the terms of the [MIT license](https://github.com/co0lc0der/simple-query-builder-python/blob/main/LICENSE.md) ## Installation @@ -25,198 +30,33 @@ Or from Github: pip install https://github.com/co0lc0der/simple-query-builder-python/archive/main.zip ``` ## How to use -### Main public methods -- `get_sql()` returns SQL query string which will be executed -- `get_params()` returns an array of parameters for a query -- `get_result()` returns query's results -- `get_count()` returns results' rows count -- `get_error()` returns `True` if an error is had -- `get_error_message()` returns an error message if an error is had -- `set_error(message)` sets `_error` to `True` and `_error_essage` -- `get_first()` returns the first item of results -- `get_last()` returns the last item of results -- `reset()` resets state to default values (except PDO property) -- `all()` executes SQL query and return all rows of result (`fetchall()`) -- `one()` executes SQL query and return the first row of result (`fetchone()`) -- `column(col_index)` executes SQL query and return the first column of result, `col_index` is `0` by default -- `go()` this method is for non `SELECT` queries. it executes SQL query and return nothing (but returns the last inserted row ID for `INSERT` method) -- `count()` prepares a query with SQL `COUNT()` function -- `query(sql, params, fetch_type, col_index)` executes prepared `sql` with `params`, it can be used for custom queries -- 'SQL' methods are presented in [Usage section](#usage-examples) - ### Import the module and init `QueryBuilder` with `Database()` ```python -from querybuilder import * +from simple_query_builder import * qb = QueryBuilder(DataBase(), 'my_db.db') + +# or DB in memory +qb = QueryBuilder(DataBase(), ':memory:') ``` ### Usage examples -- Select all rows from a table +#### Select all rows from a table ```python results = qb.select('users').all() ``` +Result query ```sql SELECT * FROM `users`; ``` -- Select a row with a condition -```python -results = qb.select('users').where([['id', '=', 10]]).one() -``` -```sql -SELECT * FROM `users` WHERE `id` = 10; -``` -- Select rows with two conditions -```python -results = qb.select('users')\ - .where([['id', '>', 1], 'and', ['group_id', '=', 2]])\ - .all() -``` -```sql -SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2) -``` -- Select a row with a `LIKE` and `NOT LIKE` condition -```python -results = qb.select('users').like(['name', '%John%']).all() -# or -results = qb.select('users').where([['name', 'LIKE', '%John%']]).all() -``` -```sql -SELECT * FROM `users` WHERE (`name` LIKE '%John%') -``` -```python -results = qb.select('users').notLike(['name', '%John%']).all() -# or -results = qb.select('users').where([['name', 'NOT LIKE', '%John%']]).all() -``` -```sql -SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%') -``` -- Select rows with `OFFSET` and `LIMIT` -```python -results = qb.select('posts')\ - .where([['user_id', '=', 3]])\ - .offset(14)\ - .limit(7)\ - .all() -``` -```sql -SELECT * FROM `posts` WHERE (`user_id` = 3) OFFSET 14 LIMIT 7; -``` -- Select custom fields with additional SQL -1. `COUNT()` -```python -results = qb.select('users', {'counter': 'COUNT(*)'}).one() -# or -results = qb.count('users').one() -``` -```sql -SELECT COUNT(*) AS `counter` FROM `users`; -``` -2. `ORDER BY` -```python -results = qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ - .where([['b.id', '>', 1], 'and', ['b.parent_id', '=', 1]])\ - .orderBy('b.id', 'desc')\ - .all() -``` -```sql -SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` -WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) -ORDER BY `b`.`id` DESC; -``` -3. `GROUP BY` and `HAVING` -```python -results = qb.select('posts', ['id', 'category', 'title'])\ - .where([['views', '>=', 1000]])\ - .groupBy('category')\ - .all() -``` -```sql -SELECT `id`, `category`, `title` FROM `posts` -WHERE (`views` >= 1000) GROUP BY `category`; -``` -```python -groups = qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ - .where([['YEAR(`created_at`)', '=', 2020]])\ - .groupBy('month_num')\ - .having([['total', '>', 20000]])\ - .all() -``` -```sql -SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` -FROM `orders` WHERE (YEAR(`created_at`) = 2020) -GROUP BY `month_num` HAVING (`total` > 20000) -``` -4. `JOIN`. Supports `INNER`, `LEFT OUTER`, `RIGHT OUTER`, `FULL OUTER` and `CROSS` joins (`INNER` is by default) +#### Select rows with two conditions ```python -results = qb.select({'u': 'users'}, [ - 'u.id', - 'u.email', - 'u.username', - {'perms': 'groups.permissions'} - ])\ - .join('groups', ['u.group_id', 'groups.id'])\ - .limit(5)\ - .all() +results = qb.select('users').where([['id', '>', 1], 'and', ['group_id', '=', 2]]).all() ``` +Result query ```sql -SELECT `u`.`id`, `u`.`email`, `u`.`username`, `groups`.`permissions` AS `perms` -FROM `users` AS `u` -INNER JOIN `groups` ON `u`.`group_id` = `groups`.`id` -LIMIT 5; +SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2); ``` -```python -results = qb.select({'cp': 'cabs_printers'}, [ - 'cp.id', - 'cp.cab_id', - {'cab_name': 'cb.name'}, - 'cp.printer_id', - {'printer_name': 'p.name'}, - {'cartridge_type': 'c.name'}, - 'cp.comment' - ])\ - .join({'cb': 'cabs'}, ['cp.cab_id', 'cb.id'])\ - .join({'p': 'printer_models'}, ['cp.printer_id', 'p.id'])\ - .join({'c': 'cartridge_types'}, 'p.cartridge_id=c.id')\ - .where([['cp.cab_id', 'in', [11, 12, 13]], 'or', ['cp.cab_id', '=', 5], 'and', ['p.id', '>', 'c.id']])\ - .all() -``` -```sql -SELECT `cp`.`id`, `cp`.`cab_id`, `cb`.`name` AS `cab_name`, `cp`.`printer_id`, - `p`.`name` AS `printer_name`, `c`.`name` AS `cartridge_type`, `cp`.`comment` -FROM `cabs_printers` AS `cp` -INNER JOIN `cabs` AS `cb` ON `cp`.`cab_id` = `cb`.`id` -INNER JOIN `printer_models` AS `p` ON `cp`.`printer_id` = `p`.`id` -INNER JOIN `cartridge_types` AS `c` ON p.cartridge_id=c.id -WHERE (`cp`.`cab_id` IN (11,12,13)) OR (`cp`.`cab_id` = 5) AND (`p`.`id` > `c`.`id`) -``` -- Insert a row -```python -new_id = qb.insert('groups', { - 'name': 'Moderator', - 'permissions': 'moderator' -}).go() -``` -```sql -INSERT INTO `groups` (`name`, `permissions`) VALUES ('Moderator', 'moderator') -``` -- Insert many rows -```python -qb.insert('groups', [['name', 'role'], - ['Moderator', 'moderator'], - ['Moderator2', 'moderator'], - ['User', 'user'], - ['User2', 'user'] -]).go() -``` -```sql -INSERT INTO `groups` (`name`, `role`) -VALUES ('Moderator', 'moderator'), - ('Moderator2', 'moderator'), - ('User', 'user'), - ('User2', 'user') -``` -- Update a row +#### Update a row ```python qb.update('users', { 'username': 'John Doe', @@ -226,50 +66,21 @@ qb.update('users', { .limit()\ .go() ``` +Result query ```sql UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE `id` = 7 LIMIT 1; ``` -- Update rows -```python -qb.update('posts', {'status': 'published'})\ - .where([['YEAR(`updated_at`)', '>', 2020]])\ - .go() -``` -```sql -UPDATE `posts` SET `status` = 'published' -WHERE (YEAR(`updated_at`) > 2020) -``` -- Delete a row -```python -qb.delete('users')\ - .where([['name', '=', 'John']])\ - .limit()\ - .go() -``` -```sql -DELETE FROM `users` WHERE `name` = 'John' LIMIT 1; -``` -- Delete rows -```python -qb.delete('comments')\ - .where([['user_id', '=', 10]])\ - .go() -``` -```sql -DELETE FROM `comments` WHERE `user_id` = 10; -``` -- Truncate a table -```python -qb.truncate('users').go() -``` -```sql -TRUNCATE TABLE `users`; -``` -- Drop a table -```python -qb.drop('temporary').go() -``` -```sql -DROP TABLE IF EXISTS `temporary`; -``` +More examples you can find in [documentation](https://github.com/co0lc0der/simple-query-builder-python/blob/main/docs/index.md) + +## ToDo +I'm going to add the next features into future versions +- write more unit testes +- add subqueries for QueryBuilder +- add `BETWEEN` +- add `WHERE EXISTS` +- add TableBuilder class (for beginning `CREATE TABLE`, move `qb.drop()` and `qb.truncate()` into it) +- add MySQL support +- add PostgreSQL support +- add `WITH` +- and probably something more diff --git a/ToDo.md b/ToDo.md new file mode 100644 index 0000000..fa4dfa5 --- /dev/null +++ b/ToDo.md @@ -0,0 +1,11 @@ +## ToDo +I'm going to add the next features into future versions +- write more unit testes +- add subqueries for QueryBuilder +- add `BETWEEN` +- add `WHERE EXISTS` +- add TableBuilder class (for beginning `CREATE TABLE`, move `qb.drop()` and `qb.truncate()` into it) +- add MySQL support +- add PostgreSQL support +- add `WITH` +- and probably something more diff --git a/docs/Delete.md b/docs/Delete.md new file mode 100644 index 0000000..f411d22 --- /dev/null +++ b/docs/Delete.md @@ -0,0 +1,37 @@ +# DELETE +## Delete a row +```python +qb.delete('users')\ + .where([['name', '=', 'John']])\ + .go() +``` +or since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +qb.delete('users')\ + .where([['name', 'John']])\ + .go() +``` +Result query +```sql +DELETE FROM `users` WHERE `name` = 'John'; +``` +## Delete rows +```python +qb.delete('comments')\ + .where([['user_id', '=', 10]])\ + .go() +``` +or since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +qb.delete('comments')\ + .where([['user_id', 10]])\ + .go() +``` +Result query +```sql +DELETE FROM `comments` WHERE `user_id` = 10; +``` + +To the [TABLE section](Table.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Init.md b/docs/Init.md new file mode 100644 index 0000000..6291e33 --- /dev/null +++ b/docs/Init.md @@ -0,0 +1,21 @@ +# Initialization +## Import the module and init `QueryBuilder` with `Database()` +```python +from simple_query_builder import * + +# if you want to get results as a list of dictionaries (by default since 0.3.5) +qb = QueryBuilder(DataBase(), 'my_db.db') # result_dict=True, print_errors=False + +# or if you want to get results as a list of tuples (since 0.3.5) +qb = QueryBuilder(DataBase(), 'my_db.db', result_dict=False) + +# for printing errors into terminal (since 0.3.5) +qb = QueryBuilder(DataBase(), 'my_db.db', print_errors=True) + +# DB in memory +qb = QueryBuilder(DataBase(), ':memory:') +``` + +To the [Methods section](Methods.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Insert.md b/docs/Insert.md new file mode 100644 index 0000000..1062f4f --- /dev/null +++ b/docs/Insert.md @@ -0,0 +1,33 @@ +# INSERT +## Insert a row +```python +new_id = qb.insert('groups', { + 'name': 'Moderator', + 'permissions': 'moderator' +}).go() +``` +Result query +```sql +INSERT INTO `groups` (`name`, `permissions`) VALUES ('Moderator', 'moderator'); +``` +## Insert many rows +```python +qb.insert('groups', [['name', 'role'], + ['Moderator', 'moderator'], + ['Moderator2', 'moderator'], + ['User', 'user'], + ['User2', 'user'] +]).go() +``` +Result query +```sql +INSERT INTO `groups` (`name`, `role`) +VALUES ('Moderator', 'moderator'), + ('Moderator2', 'moderator'), + ('User', 'user'), + ('User2', 'user'); +``` + +To the [UPDATE section](Update.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Install.md b/docs/Install.md new file mode 100644 index 0000000..01767e5 --- /dev/null +++ b/docs/Install.md @@ -0,0 +1,16 @@ +# Installation + +Install the current version with [PyPI](https://pypi.org/project/simple-query-builder): + +```bash +pip install simple-query-builder +``` + +Or from Github: +```bash +pip install https://github.com/co0lc0der/simple-query-builder-python/archive/main.zip +``` + +To the [Init section](Init.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Methods.md b/docs/Methods.md new file mode 100644 index 0000000..816562d --- /dev/null +++ b/docs/Methods.md @@ -0,0 +1,26 @@ +# Main public methods +## QueryBuilder class +- `query(sql, params, fetch_type, col_index)` executes prepared `sql` with `params`, it can be used for custom queries +- `get_sql()` returns SQL query string which will be executed +- `__str__()` works the same as `get_sql()` +- `get_params()` returns a tuple of parameters for a query +- `get_result()` returns query's result +- `get_count()` returns result's rows count +- `has_error()` returns `True` if an error is had +- `get_error()` returns `True` if an error is had, ***this method will be changed in the next version!*** +- `get_error_message()` returns an error message if an error is had +- `set_error(message)` sets `_error` to `True` and `_error_message` +- `get_first()` returns the first item of results +- `get_last()` returns the last item of results +- `reset()` resets state to default values +- `all()` executes SQL query and returns **all rows** of result (`fetchall()`) +- `one()` executes SQL query and returns **the first row** of result (`fetchone()`) +- `column(col)` executes SQL query and returns the needed column of result by its index or name, `col` is `0` by default +- `pluck(key, col)` executes SQL query and returns a list of tuples/dicts (the key (usually ID) and the needed column of result) by its indexes or names, `key` is `0` and `col` is `1` by default +- `go()` this method is for non `SELECT` queries. it executes SQL query and returns nothing (but returns the last inserted row ID for `INSERT` method) +- `exists()` returns `True` if SQL query has at least one row and `False` if it hasn't +- `count()` prepares a query with SQL `COUNT(*)` function and _executes it_ + +'SQL' methods are presented in the [next section](Select.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Select.md b/docs/Select.md new file mode 100644 index 0000000..77524c3 --- /dev/null +++ b/docs/Select.md @@ -0,0 +1,186 @@ +# SELECT +## Simple queries +### Select all rows from a table +```python +results = qb.select('users').all() +``` +Result query +```sql +SELECT * FROM `users`; +``` +### Select a row with a condition +```python +results = qb.select('users').where([['id', '=', 10]]).one() +``` +It's able not using equals `=` in `WHERE` conditions since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +results = qb.select('users').where([['id', 10]]).one() +``` +Result query +```sql +SELECT * FROM `users` WHERE `id` = 10; +``` +### Select rows with two conditions +```python +results = qb.select('users').where([['id', '>', 1], 'and', ['group_id', '=', 2]]).all() +``` +or since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +results = qb.select('users').where([['id', '>', 1], 'and', ['group_id', 2]]).all() +``` +Result query +```sql +SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2); +``` +### Select a row with a `LIKE` and `NOT LIKE` condition +```python +results = qb.select('users').like(['name', '%John%']).all() + +# or +results = qb.select('users').where([['name', 'LIKE', '%John%']]).all() +``` +or it's able to use two strings in parameters since [v0.3.5](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.5) +```python +results = qb.select('users').like('name', '%John%').all() +``` +Result query +```sql +SELECT * FROM `users` WHERE (`name` LIKE '%John%'); +``` +```python +results = qb.select('users').not_like(['name', '%John%']).all() + +# or +results = qb.select('users').where([['name', 'NOT LIKE', '%John%']]).all() +``` +or it's able to use two strings in parameters since [v0.3.5](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.5) +```python +results = qb.select('users').not_like('name', '%John%').all() +``` +Result query +```sql +SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%'); +``` +### Select a row with a `IS NULL` and `IS NOT NULL` condition +since [v0.3.5](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.5) +```python +results = qb.select('users').is_null('phone').all() + +# or +results = qb.select('users').where([['phone', 'is null']]).all() +``` +Result query +```sql +SELECT * FROM `users` WHERE (`phone` IS NULL); +``` +```python +results = qb.select('customers').is_not_null('address').all() + +# or +results = qb.select('customers').not_null('address').all() + +# or +results = qb.select('customers').where([['address', 'is not null']]).all() +``` +Result query +```sql +SELECT * FROM `customers` WHERE (`address` IS NOT NULL); +``` +### Select rows with `OFFSET` and `LIMIT` +```python +results = qb.select('posts')\ + .where([['user_id', '=', 3]])\ + .offset(14)\ + .limit(7)\ + .all() +``` +It's able not using equals `=` in `WHERE` conditions since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +results = qb.select('posts')\ + .where([['user_id', 3]])\ + .offset(14)\ + .limit(7)\ + .all() +``` +Result query +```sql +SELECT * FROM `posts` WHERE (`user_id` = 3) OFFSET 14 LIMIT 7; +``` +### Select custom fields with additional SQL +#### `COUNT()` +```python +results = qb.select('users', {'counter': 'COUNT(*)'}).one() + +# or +results = qb.count('users').one() +``` +Result query +```sql +SELECT COUNT(*) AS `counter` FROM `users`; +``` +#### `ORDER BY` +```python +results = qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]])\ + .order_by('b.id', 'desc')\ + .all() +``` +It's able not using equals `=` in `WHERE` conditions since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +results = qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]])\ + .order_by('b.id desc')\ + .all() +``` +Result query +```sql +SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` +WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) +ORDER BY `b`.`id` DESC; +``` +#### `DISTINCT` +```python +results = qb.select('customers', ['city', 'country'], True).order_by('country desc').all() +``` +Result query +```sql +SELECT DISTINCT `city`, `country` FROM `customers` ORDER BY `country` DESC; +``` +#### `GROUP BY` and `HAVING` +```python +results = qb.select('posts', ['id', 'category', 'title'])\ + .where([['views', '>=', 1000]])\ + .group_by('category')\ + .all() +``` +Result query +```sql +SELECT `id`, `category`, `title` FROM `posts` +WHERE (`views` >= 1000) GROUP BY `category`; +``` +More complicated example +```python +groups = qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + .where([['YEAR(`created_at`)', '=', 2020]])\ + .group_by('month_num')\ + .having([['total', '=', 20000]])\ + .all() +``` +It's able not using equals `=` in `HAVING` conditions since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +groups = qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + .where([['YEAR(`created_at`)', 2020]])\ + .group_by('month_num')\ + .having([['total', 20000]])\ + .all() +``` +Result query +```sql +SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` +FROM `orders` WHERE (YEAR(`created_at`) = 2020) +GROUP BY `month_num` HAVING (`total` = 20000); +``` + +To the [SELECT extensions section](Select_ext.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Select_ext.md b/docs/Select_ext.md new file mode 100644 index 0000000..f2e1a8d --- /dev/null +++ b/docs/Select_ext.md @@ -0,0 +1,186 @@ +# SELECT extensions +## `JOIN` +SQLite supports `INNER`, `LEFT OUTER` and `CROSS` joins (`INNER` is by default) +### `INNER JOIN` +```python +results = qb.select({'u': 'users'}, [ + 'u.id', + 'u.email', + 'u.username', + {'perms': 'groups.permissions'} + ])\ + .join('groups', ['u.group_id', 'groups.id'])\ + .limit(5)\ + .all() +``` +Result query +```sql +SELECT `u`.`id`, `u`.`email`, `u`.`username`, `groups`.`permissions` AS `perms` +FROM `users` AS `u` +INNER JOIN `groups` ON `u`.`group_id` = `groups`.`id` +LIMIT 5; +``` +More complicated examples +```python +results = qb.select({'cp': 'cabs_printers'}, [ + 'cp.id', + 'cp.cab_id', + {'cab_name': 'cb.name'}, + 'cp.printer_id', + {'printer_name': 'p.name'}, + {'cartridge_type': 'c.name'}, + 'cp.comment' + ])\ + .join({'cb': 'cabs'}, ['cp.cab_id', 'cb.id'])\ + .join({'p': 'printer_models'}, ['cp.printer_id', 'p.id'])\ + .join({'c': 'cartridge_types'}, 'p.cartridge_id=c.id')\ + .where([['cp.cab_id', 'in', [11, 12, 13]], 'or', ['cp.cab_id', '=', 5], 'and', ['p.id', '>', 'c.id']])\ + .all() +``` +Result query +```sql +SELECT `cp`.`id`, `cp`.`cab_id`, `cb`.`name` AS `cab_name`, `cp`.`printer_id`, + `p`.`name` AS `printer_name`, `c`.`name` AS `cartridge_type`, `cp`.`comment` +FROM `cabs_printers` AS `cp` +INNER JOIN `cabs` AS `cb` ON `cp`.`cab_id` = `cb`.`id` +INNER JOIN `printer_models` AS `p` ON `cp`.`printer_id` = `p`.`id` +INNER JOIN `cartridge_types` AS `c` ON p.cartridge_id=c.id +WHERE (`cp`.`cab_id` IN (11, 12, 13)) OR (`cp`.`cab_id` = 5) AND (`p`.`id` > `c`.`id`); +``` +It's able not using equals `=` in `JOIN` conditions since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +results = qb.select({'cp': 'cabs_printers'}, [ + 'cp.id', + 'cp.cab_id', + {'cab_name': 'cb.name'}, + 'cp.printer_id', + {'cartridge_id': 'c.id'}, + {'printer_name': 'p.name'}, + {'cartridge_type': 'c.name'}, + 'cp.comment' + ])\ + .join({'cb': 'cabs'}, ['cp.cab_id', 'cb.id'])\ + .join({'p': 'printer_models'}, ['cp.printer_id', 'p.id'])\ + .join({'c': 'cartridge_types'}, ['p.cartridge_id', 'c.id'])\ + .group_by(['cp.printer_id', 'cartridge_id'])\ + .order_by(['cp.cab_id', 'cp.printer_id desc'])\ + .all() +``` +Result query +```sql +SELECT `cp`.`id`, `cp`.`cab_id`, `cb`.`name` AS `cab_name`, `cp`.`printer_id`, `c`.`id` AS `cartridge_id`, + `p`.`name` AS `printer_name`, `c`.`name` AS `cartridge_type`, `cp`.`comment` +FROM `cabs_printers` AS `cp` +INNER JOIN `cabs` AS `cb` ON `cp`.`cab_id` = `cb`.`id` +INNER JOIN `printer_models` AS `p` ON `cp`.`printer_id` = `p`.`id` +INNER JOIN `cartridge_types` AS `c` ON `p`.`cartridge_id` = `c`.`id` +GROUP BY `cp`.`printer_id`, `cartridge_id` +ORDER BY `cp`.`cab_id` ASC, `cp`.`printer_id` DESC; +``` +### `LEFT [OUTER] JOIN` +```python +# LEFT JOIN +results = qb.select('employees', ['employees.employee_id', 'employees.last_name', 'positions.title'])\ + .join('positions', ['employees.position_id', 'positions.position_id'], join_type="left")\ + .all() + +# or LEFT OUTER JOIN +results = qb.select('employees', ['employees.employee_id', 'employees.last_name', 'positions.title'])\ + .join('positions', ['employees.position_id', 'positions.position_id'], join_type="left outer")\ + .all() +``` +Result query +```sql +SELECT `employees`.`employee_id`, `employees`.`last_name`, `positions`.`title` FROM `employees` +LEFT [OUTER] JOIN `positions` ON `employees`.`position_id` = `positions`.`position_id`; +``` +## `INTERSECT` +Since [v0.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.4) +```python +results = qb.select('departments', ['department_id']).intersect_select('employees').all() +``` +Result query +```sql +SELECT `department_id` FROM `departments` +INTERSECT +SELECT `department_id` FROM `employees`; +``` +One more example +```python +results = qb.select('contacts', ['contact_id', 'last_name', 'first_name']).where([['contact_id', '>', 50]])\ + .intersect()\ + .select('customers', ['customer_id', 'last_name', 'first_name']).where([['last_name', '<>', 'Zagoskin']])\ + .all() +``` +Result query +```sql +SELECT `contact_id`, `last_name`, `first_name` FROM `contacts` WHERE (`contact_id` > 50) +INTERSECT +SELECT `customer_id`, `last_name`, `first_name` FROM `customers` WHERE (`last_name` <> 'Zagoskin'); +``` +## `EXCEPT` +Since [v0.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.4) +```python +results = qb.select('departments', ['department_id']).except_select('employees').all() +``` +Result query +```sql +SELECT `department_id` FROM `departments` +EXCEPT +SELECT `department_id` FROM `employees`; +``` +One more example +```python +results = qb.select('suppliers', ['supplier_id', 'state']).where([['state', 'Nevada']])\ + .excepts()\ + .select('companies', ['company_id', 'state']).where([['company_id', '<', 2000]])\ + .order_by('1 desc').all() +``` +Result query +```sql +SELECT `supplier_id`, `state` FROM `suppliers` WHERE (`state` = 'Nevada') +EXCEPT +SELECT `company_id`, `state` FROM `companies` WHERE (`company_id` < 2000) ORDER BY `1` DESC; +``` +## `UNION` and `UNION ALL` +Since [v0.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.4) +### `UNION` +```python +results = qb.select('clients', ['name', 'age'])\ + .union()\ + .select('employees', ['name', 'age'])\ + .all() + +# or +results = qb.select('clients', ['name', 'age'])\ + .union_select('employees')\ + .all() +``` +Result query +```sql +SELECT `name`, `age` FROM `clients` +UNION +SELECT `name`, `age` FROM `employees`; +``` +### `UNION ALL` +```python +results = qb.select('clients', ['name', 'age'])\ + .union(True)\ + .select('employees', ['name', 'age'])\ + .all() + +# or +results = qb.select('clients', ['name', 'age'])\ + .union_select('employees', True)\ + .all() +``` +Result query +```sql +SELECT `name`, `age` FROM `clients` +UNION ALL +SELECT `name`, `age` FROM `employees`; +``` + +To the [INSERT section](Insert.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Table.md b/docs/Table.md new file mode 100644 index 0000000..0284a51 --- /dev/null +++ b/docs/Table.md @@ -0,0 +1,35 @@ +# TABLE +## TRUNCATE +Truncate a table + +**! This method will be moved into _TableBuilder_ class !** +```python +qb.truncate('users').go() +``` +Result query +```sql +TRUNCATE TABLE `users`; +``` +## DROP +- Drop a table + +**! This method will be moved into _TableBuilder_ class !** +```python +qb.drop('temporary').go() +``` +Result query +```sql +DROP TABLE IF EXISTS `temporary`; +``` +- Without `IF EXISTS` +```python +qb.drop('temporary', False).go() +``` +Result query +```sql +DROP TABLE `temporary`; +``` + +To the [View section](View.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/Update.md b/docs/Update.md new file mode 100644 index 0000000..f97cc97 --- /dev/null +++ b/docs/Update.md @@ -0,0 +1,41 @@ +# UPDATE +## Update a row +```python +qb.update('users', { + 'username': 'John Doe', + 'status': 'new status' + })\ + .where([['id', '=', 7]])\ + .limit()\ + .go() +``` +or since [v0.3.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.3.4) +```python +qb.update('users', { + 'username': 'John Doe', + 'status': 'new status' + })\ + .where([['id', 7]])\ + .limit()\ + .go() +``` +Result query +```sql +UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' +WHERE `id` = 7 LIMIT 1; +``` +## Update rows +```python +qb.update('posts', {'status': 'published'})\ + .where([['YEAR(`updated_at`)', '>', 2020]])\ + .go() +``` +Result query +```sql +UPDATE `posts` SET `status` = 'published' +WHERE (YEAR(`updated_at`) > 2020); +``` + +To the [DELETE section](Delete.md) + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/View.md b/docs/View.md new file mode 100644 index 0000000..d81c6f1 --- /dev/null +++ b/docs/View.md @@ -0,0 +1,55 @@ +# VIEW +Since [v0.4](https://github.com/co0lc0der/simple-query-builder-python/releases/tag/v0.4) +## CREATE +Create a view from SELECT query +```python +qb.select('users')\ + .where([['email', 'is null'], 'or', ['email', '']])\ + .create_view('users_no_email')\ + .go() +``` +Result query +```sql +CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL) OR (`email` = ''); +``` +One more example +```python +qb.select('users')\ + .is_null('email')\ + .create_view('users_no_email')\ + .go() +``` +Result query +```sql +CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL); +``` +- Without `IF EXISTS` +```python +qb.select('users')\ + .is_null('email')\ + .create_view('users_no_email', False)\ + .go() +``` +Result query +```sql +CREATE VIEW `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL); +``` +## DROP +- Drop a view +```python +qb.drop_view('users_no_email').go() +``` +Result query +```sql +DROP VIEW IF EXISTS `users_no_email`; +``` +- Without `IF EXISTS` +```python +qb.drop_view('users_no_email', False).go() +``` +Result query +```sql +DROP VIEW `users_no_email`; +``` + +Back to [doc index](index.md) or [readme](../README.md) diff --git a/docs/index.md b/docs/index.md new file mode 100644 index 0000000..7da2ea2 --- /dev/null +++ b/docs/index.md @@ -0,0 +1,20 @@ +# Index + +[![Latest Version](https://img.shields.io/github/release/co0lc0der/simple-query-builder-python?color=orange&style=flat-square)](https://github.com/co0lc0der/simple-query-builder-python/release) +![Python 3.7, 3.8, 3.9, 3.10, 3.11, 3.12](https://img.shields.io/pypi/pyversions/simple-query-builder?color=blueviolet&style=flat-square) + +- [Install](Install.md) +- [Initialization](Init.md) +- Main [public methods](Methods.md) +- Data manipulation + * [SELECT](Select.md) + + [SELECT extensions](Select_ext.md) (JOIN, UNION etc) + * [INSERT](Insert.md) + * [UPDATE](Update.md) + * [DELETE](Delete.md) +- Tables + * [TABLE](Table.md) +- Views + * [VIEW](View.md) + +Go to [readme](../README.md) diff --git a/example/example.py b/example/example.py new file mode 100644 index 0000000..6d94bb2 --- /dev/null +++ b/example/example.py @@ -0,0 +1,7 @@ +from simple_query_builder import * + +qb = QueryBuilder(DataBase(), 'my_db.db') + +res = qb.select('users').all() +# SELECT * FROM `users` +print(res) diff --git a/setup.cfg b/setup.cfg index 00aca2e..36ff2f5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,3 @@ [egg_info] -tag_build = +tag_build = 0.4 tag_date = 0 diff --git a/setup.py b/setup.py index dcb65f8..8218d85 100644 --- a/setup.py +++ b/setup.py @@ -5,17 +5,17 @@ """ :authors: co0lc0der -:license: MIT, see LICENSE file -:copyright: (c) 2022 co0lc0der +:license: MIT +:copyright: (c) 2022-2024 co0lc0der """ -version = '0.3.0' +version = '0.4' with open('README.md', encoding='utf-8') as f: long_description = f.read() setup( - name='simple-query-builder', + name='simple_query_builder', version=version, author='co0lc0der', @@ -34,15 +34,15 @@ version ), - license='MIT, see LICENSE.md file', + license='MIT', - packages=['simple-query-builder'], - install_requires=['sqlite3', 'Union'], + packages=['simple_query_builder'], classifiers=[ + 'Topic :: Database', + 'Topic :: Database :: Database Engines/Servers', 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', - 'Intended Audience :: End Users/Desktop', 'Intended Audience :: Developers', 'Programming Language :: Python', 'Programming Language :: Python :: 3', @@ -50,7 +50,11 @@ 'Programming Language :: Python :: 3.8', 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', 'Programming Language :: Python :: Implementation :: PyPy', 'Programming Language :: Python :: Implementation :: CPython', + 'Programming Language :: SQL', + 'Typing :: Typed' ] ) diff --git a/simple-query-builder/querybuilder.py b/simple-query-builder/querybuilder.py deleted file mode 100644 index 53bf019..0000000 --- a/simple-query-builder/querybuilder.py +++ /dev/null @@ -1,499 +0,0 @@ -""" -:authors: co0lc0der -:license: MIT License, see LICENSE file -:copyright: (c) 2022 co0lc0der -""" - -import sqlite3 -import traceback -import sys -import inspect -from typing import Union - - -class MetaSingleton(type): - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs) - return cls._instances[cls] - - -class DataBase(metaclass=MetaSingleton): - db_name = 'db.db' - conn = None - cursor = None - - def connect(self, db_name=''): - if db_name != '': - self.db_name = db_name - - if self.conn is None: - self.conn = sqlite3.connect(self.db_name) - self.cursor = self.conn.cursor() - - return self.conn - - def c(self): - return self.cursor - - -class QueryBuilder: - _OPERATORS: list = ['=', '>', '<', '>=', '<=', '!=', 'LIKE', 'NOT LIKE', 'IN', 'NOT IN'] - _LOGICS: list = ['AND', 'OR', 'NOT'] - _SORT_TYPES: list = ['ASC', 'DESC'] - _JOIN_TYPES: list = ['INNER', 'LEFT OUTER', 'RIGHT OUTER', 'FULL OUTER', 'CROSS'] - _NO_FETCH: int = 0 - _FETCH_ONE: int = 1 - _FETCH_ALL: int = 2 - _FETCH_COLUMN: int = 3 - _conn = None - _cur = None - _query = None - _sql: str = '' - _error: bool = False - _error_message: str = '' - _result: Union[tuple, list] = [] - _count: int = -1 - _params: tuple = () - - def __init__(self, database: DataBase, db_name='') -> None: - self._conn = database.connect(db_name) - # self._conn.row_factory = sqlite3.Row - # self._conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) - self._cur = database.c() - - def query(self, sql: str = '', params=(), fetch=2, column=0): - if fetch == 2: - fetch = self._FETCH_ALL - - self.set_error() - - if sql: - self._sql = sql - - self.add_semicolon() - self._sql = self._sql.replace("'NULL'", "NULL") - - if params: - self._params = params - - try: - self._query = self._cur.execute(self._sql, self._params) - - if fetch == self._NO_FETCH: - self._conn.commit() - elif fetch == self._FETCH_ONE: - self._result = self._query.fetchone() - elif fetch == self._FETCH_ALL: - self._result = self._query.fetchall() - elif fetch == self._FETCH_COLUMN: - self._result = [x[column] for x in self._query.fetchall()] - - if self._result: - self._count = len(self._result) - - self.set_error() - except sqlite3.Error as er: - self._error = True - print('SQLite error: %s' % (' '.join(er.args))) - print("Exception class is: ", er.__class__) - print('SQLite traceback: ') - exc_type, exc_value, exc_tb = sys.exc_info() - print(traceback.format_exception(exc_type, exc_value, exc_tb)) - - return self - - def add_semicolon(self, sql: str = '') -> str: - new_sql = self._sql if sql == '' else sql - - if new_sql != '': - new_sql += ';' if new_sql[-1] != ';' else '' - - if sql == '': - self._sql = new_sql - - return new_sql - - def get_sql(self) -> str: - return self._sql - - def get_error(self) -> bool: - return self._error - - def get_error_message(self) -> str: - return self._error_message - - def set_error(self, message: str = '') -> None: - self._error = message != '' - self._error_message = message - - def get_params(self) -> tuple: - return self._params - - def get_result(self) -> Union[tuple, list]: - return self._result - - def get_count(self) -> int: - return self._count - - def reset(self) -> None: - self._sql = '' - self._params = () - self._query = None - self._result = [] - self._count = -1 - self.set_error() - - def all(self) -> Union[list, dict]: - self.query() - return self._result - - def one(self) -> Union[list, dict]: - self.query(self._sql, self._params, self._FETCH_ONE) - return self._result - - def go(self) -> Union[int, None]: - self.query(self._sql, self._params, self._NO_FETCH) - return self._cur.lastrowid - - def column(self, column=0): - self.query('', (), self._FETCH_COLUMN, column) - return self._result - - def count(self, table: Union[str, dict], field: str = ''): - if table == '' or table == {}: - self.set_error(f"Empty table in {inspect.stack()[0][3]} method") - return self - - if field == '': - self.select(table, "COUNT(*) AS `counter`") - else: - field = field.replace('.', '`.`') - self.select(table, f"COUNT(`{field}`) AS `counter`") - - return self.one()[0] - - def get_first(self): - return self.one() - - def get_last(self): - self.all() - return self._result[-1] - - def exists(self) -> bool: - result = self.one() - return self._count > 0 - - def _prepare_aliases(self, items: Union[str, list, dict], as_list: bool = False) -> Union[str, list]: - if items == '' or items == {} or items == []: - self.set_error(f"Empty items in {inspect.stack()[0][3]} method") - return '' - - sql = [] - if isinstance(items, str): - sql.append(items) - elif isinstance(items, list) or isinstance(items, dict): - for item in items: - if isinstance(items, list): - if isinstance(item, str): - new_item = item.replace('.', '`.`') - sql.append(f"`{new_item}`") - elif isinstance(item, dict): - first_item = list(item.values())[0].replace('.', '`.`') - alias = list(item.keys())[0] - if first_item.find('(') > -1 or first_item.find(')') > -1: - sql.append(f"{first_item}" if isinstance(alias, int) else f"{first_item} AS `{alias}`") - else: - sql.append(f"`{first_item}`" if isinstance(alias, int) else f"`{first_item}` AS `{alias}`") - elif isinstance(items, dict): - new_item = items[item].replace('.', '`.`') - if new_item.find('(') > -1 or new_item.find(')') > -1: - sql.append(f"{new_item}" if isinstance(item, int) else f"{new_item} AS `{item}`") - else: - sql.append(f"`{new_item}`" if isinstance(item, int) else f"`{new_item}` AS `{item}`") - else: - self.set_error(f"Incorrect type of items in {inspect.stack()[0][3]} method") - return '' - - return ', '.join(sql) if not as_list else sql - - def _prepare_conditions(self, where: Union[str, list]) -> dict: - result = {'sql': '', 'values': []} - sql = '' - - if not where: - return result - - if isinstance(where, str): - sql += f"{where}" - elif isinstance(where, list): - for cond in where: - if isinstance(cond, list): - if len(cond) == 3: - field = cond[0].replace('.', '`.`') - operator = cond[1].upper() - value = cond[2] - if operator in self._OPERATORS: - if operator == 'IN' and (isinstance(value, list) or isinstance(value, tuple)): - values = ("?," * len(value)).rstrip(',') - sql += f"(`{field}` {operator} ({values}))" - for item in value: - result['values'].append(item) - else: - sql += f"({field} {operator} ?)" if field.find('(') > -1 or field.find(')') > -1 else f"(`{field}` {operator} ?)" - result['values'].append(value) - elif isinstance(cond, str): - upper = cond.upper() - if upper in self._LOGICS: - sql += f" {upper} " - else: - self.set_error(f"Incorrect type of where in {inspect.stack()[0][3]} method") - return result - - result['sql'] = sql - - return result - - def select(self, table: Union[str, dict], fields: Union[str, list, dict] = '*'): - if table == '' or table == {} or fields == '' or fields == [] or fields == {}: - self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") - return self - - self.reset() - - if isinstance(fields, dict) or isinstance(fields, list): - self._sql = f"SELECT {self._prepare_aliases(fields)}" - elif isinstance(fields, str): - self._sql = f"SELECT {fields}" - else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") - return self - - if isinstance(table, dict): - self._sql += f" FROM {self._prepare_aliases(table)}" - elif isinstance(table, str): - self._sql += f" FROM `{table}`" - else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") - return self - - return self - - def where(self, where: Union[str, list], addition: str = ''): - if where == '' or where == []: - self.set_error(f"Empty where in {inspect.stack()[0][3]} method") - return self - - conditions = self._prepare_conditions(where) - - if addition != '': - self._sql += f" WHERE {conditions['sql']} {addition}" - else: - self._sql += f" WHERE {conditions['sql']}" - - if isinstance(conditions['values'], list) and conditions['values'] != []: - self._params += tuple(conditions['values']) - - return self - - def having(self, having: Union[str, list]): - if having == '' or having == []: - self.set_error(f"Empty having in {inspect.stack()[0][3]} method") - return self - - conditions = self._prepare_conditions(having) - - self._sql += f" HAVING {conditions['sql']}" - - if isinstance(conditions['values'], list) and conditions['values'] != []: - self._params += tuple(conditions['values']) - - return self - - def like(self, cond: Union[str, tuple, list] = ()): - if cond: - if isinstance(cond, str): - self.where(cond) - elif isinstance(cond, tuple) or isinstance(cond, list): - self.where([[cond[0], 'LIKE', cond[1]]]) - return self - - def not_like(self, cond: Union[str, tuple, list] = ()): - if cond: - if isinstance(cond, str): - self.where(cond) - elif isinstance(cond, tuple) or isinstance(cond, list): - self.where([[cond[0], 'NOT LIKE', cond[1]]]) - return self - - def limit(self, limit: int = 1): - self._sql += f" LIMIT {limit}" - return self - - def offset(self, offset: int = 0): - self._sql += f" OFFSET {offset}" - return self - - def order_by(self, field: str = '', sort: str = 'ASC'): - if field == '' or sort == '': - self.set_error(f"Empty field or sort in {inspect.stack()[0][3]} method") - return self - - sort = sort.upper() - field = field.replace('.', '`.`') - - if sort in self._SORT_TYPES: - self._sql += f" ORDER BY `{field}` {sort}" - else: - self._sql += f" ORDER BY `{field}`" - return self - - def group_by(self, field: str = ''): - if field == '': - self.set_error(f"Empty field in {inspect.stack()[0][3]} method") - return self - - field = field.replace('.', '`.`') - self._sql += f" GROUP BY `{field}`" - return self - - def delete(self, table: Union[str, dict]): - if table == '' or table == {}: - self.set_error(f"Empty table in {inspect.stack()[0][3]} method") - return self - - if isinstance(table, dict): - table = f"`{self._prepare_aliases(table)}`" - elif isinstance(table, str): - table = f"`{table}`" - else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") - return self - - self.reset() - - self._sql = f"DELETE FROM {table}" - return self - - def insert(self, table: Union[str, dict], fields: Union[list, dict]): - if table == '' or table == {} or fields == [] or fields == {}: - self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") - return self - - if isinstance(table, dict): - table = f"`{self._prepare_aliases(table)}`" - elif isinstance(table, str): - table = f"`{table}`" - else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") - return self - - self.reset() - - if isinstance(fields, dict): - values = ("?," * len(fields)).rstrip(',') - self._sql = f"INSERT INTO {table} (`" + '`, `'.join(list(fields.keys())) + f"`) VALUES ({values})" - self._params = tuple(fields.values()) - elif isinstance(fields, list): - names = fields.pop(0) - value = ("?," * len(names)).rstrip(',') - v = f"({value})," - values = (v * len(fields)).rstrip(',') - self._sql = f"INSERT INTO {table} (`" + '`, `'.join(names) + f"`) VALUES {values}" - params = [] - for item in fields: - if isinstance(item, list): - for subitem in item: - params.append(subitem) - self._params = tuple(params) - else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") - return self - - return self - - def update(self, table: Union[str, dict], fields: Union[list, dict]): - if table == '' or table == {} or fields == [] or fields == {}: - self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") - return self - - if isinstance(table, dict): - table = f"`{self._prepare_aliases(table)}`" - elif isinstance(table, str): - table = f"`{table}`" - else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") - return self - - if isinstance(fields, list) or isinstance(fields, dict): - sets = '' - for item in fields: - sets += f" `{item.replace('.', '`.`')}` = ?," - sets = sets.rstrip(',') - else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") - return self - - self.reset() - - self._sql = f"UPDATE {table} SET{sets}" - self._params = tuple(fields.values()) - - return self - - def join(self, table: Union[str, dict] = '', on: Union[str, tuple, list] = (), join_type: str = 'INNER'): - join_type = join_type.upper() - if join_type == '' or join_type not in self._JOIN_TYPES: - self.set_error(f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method") - return self - - if table == '' or table == {}: - self.set_error(f"Empty table in {inspect.stack()[0][3]} method") - return self - - if isinstance(table, dict): - self._sql += f" {join_type} JOIN {self._prepare_aliases(table)}" - elif isinstance(table, str): - self._sql += f" {join_type} JOIN `{table}`" - else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") - return self - - if on: - if isinstance(on, tuple) or isinstance(on, list): - field1 = f"`{on[0].replace('.', '`.`')}`" - field2 = f"`{on[1].replace('.', '`.`')}`" - self._sql += f" ON {field1} = {field2}" - elif isinstance(on, str): - self._sql += f" ON {on}" - else: - self.set_error(f"Incorrect type of on in {inspect.stack()[0][3]} method. On must be String, Tuple or List") - return self - - self.set_error() - - return self - - def drop(self, table: str, add_exists: bool = True): - if table == '': - self.set_error(f"Empty table in {inspect.stack()[0][3]} method") - return self - - exists = 'IF EXISTS ' if add_exists else '' - - self.reset() - self._sql = f"DROP TABLE {exists}`{table}`" - - return self - - def truncate(self, table: str): - if table == '': - self.set_error(f"Empty table in {inspect.stack()[0][3]} method") - return self - - self.reset() - self._sql = f"TRUNCATE TABLE `{table}`" - - return self diff --git a/simple-query-builder/__init__.py b/simple_query_builder/__init__.py similarity index 56% rename from simple-query-builder/__init__.py rename to simple_query_builder/__init__.py index ff33c44..6fb9ec1 100644 --- a/simple-query-builder/__init__.py +++ b/simple_query_builder/__init__.py @@ -1,11 +1,11 @@ """ :authors: co0lc0der -:license: MIT, see LICENSE file -:copyright: (c) 2022 co0lc0der +:license: MIT +:copyright: (c) 2022-2024 co0lc0der """ from .querybuilder import * __author__ = 'co0lc0der' -__version__ = '0.3.0' +__version__ = '0.4' __email__ = 'c0der@ya.ru' diff --git a/simple_query_builder/database.py b/simple_query_builder/database.py new file mode 100644 index 0000000..41d3923 --- /dev/null +++ b/simple_query_builder/database.py @@ -0,0 +1,42 @@ +""" +:authors: co0lc0der +:license: MIT +:copyright: (c) 2022-2024 co0lc0der +""" + +import sqlite3 + + +class MetaSingleton(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(MetaSingleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] + + +class DataBase(metaclass=MetaSingleton): + _driver = "sqlite" + _db_name = ":memory:" + _conn = None + _cursor = None + + def connect(self, db_name: str = "", uri: bool = False): + if db_name: + self._db_name = db_name + + if self._conn is None: + if self._driver == "sqlite": + self._conn = sqlite3.connect(self._db_name, uri=uri) + self._cursor = self._conn.cursor() + else: + print("Wrong DB driver. At present time it's supported 'sqlite' only") + + return self._conn + + def c(self): + return self._cursor + + def get_driver(self): + return self._driver diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py new file mode 100644 index 0000000..7672103 --- /dev/null +++ b/simple_query_builder/querybuilder.py @@ -0,0 +1,754 @@ +""" +:authors: co0lc0der +:license: MIT +:copyright: (c) 2022-2024 co0lc0der +""" + +import inspect +import sys +import traceback +from typing import Union +from database import * + + +class QueryBuilder: + _COND_OPERATORS: list = [ + "=", + ">", + "<", + ">=", + "<=", + "!=", + "<>", + "LIKE", + "NOT LIKE", + "IN", + "NOT IN", + ] + _FIELD_SPEC_CHARS: list = [ + "+", + "-", + "*", + "/", + "%", + "(", + ")", + "||", + ] + _LOGICS: list = [ + "AND", + "OR", + "NOT" + ] + _SORT_TYPES: list = [ + "ASC", + "DESC" + ] + _JOIN_TYPES: list = [ + "INNER", + "LEFT", + "LEFT OUTER", + "RIGHT OUTER", + "FULL OUTER", + "CROSS" + ] + _SQLITE_JOIN_TYPES: list = [ + "INNER", + "LEFT", + "LEFT OUTER", + "CROSS" + ] + _NO_FETCH: int = 0 + _FETCH_ONE: int = 1 + _FETCH_ALL: int = 2 + _FETCH_COLUMN: int = 3 + _db = None + _conn = None + _cur = None + _query = None + _sql: str = "" + _concat = False + _error: bool = False + _error_message: str = "" + _print_errors: bool = False + _result: Union[tuple, list] = [] + _result_dict = True + _count: int = -1 + _params: tuple = () + _fields = [] + + def __init__(self, database: DataBase, db_name: str = "", result_dict: bool = True, + print_errors: bool = False, uri: bool = False) -> None: + if database: + self._db = database + self._conn = self._db.connect(db_name, uri) + self._set_row_factory(result_dict) + self._cur = self._conn.cursor() + else: + self.set_error(f"Empty database in {inspect.stack()[0][3]} method") + self._print_errors = print_errors + + def _set_row_factory(self, result_dict: bool = True): + self._result_dict = result_dict + # self._conn.row_factory = sqlite3.Row + if self._result_dict: + self._conn.row_factory = lambda c, r: dict( + [(col[0], r[idx]) for idx, col in enumerate(c.description)] + ) + + def query(self, sql: str = "", params: tuple = (), fetch: int = 2, column: Union[str, int] = 0): + if fetch == 2: + fetch = self._FETCH_ALL + + self.set_error() + + if sql: + self._sql = sql + + self.add_semicolon() + self._sql = self._sql.replace("'NULL'", "NULL") + + if params: + self._params = params + + try: + self._query = self._cur.execute(self._sql, self._params) + + if fetch == self._NO_FETCH: + self._conn.commit() + elif fetch == self._FETCH_ONE: + self._result = self._query.fetchone() + elif fetch == self._FETCH_ALL: + self._result = self._query.fetchall() + elif fetch == self._FETCH_COLUMN: + self._result = [x[column] for x in self._query.fetchall()] + + if self._result: + self._count = len(self._result) + + self.set_error() + except sqlite3.Error as er: + self._error = True + print("SQL: %s" % self._sql) + print("Params: %s" % str(self._params)) + print("SQLite error: %s" % (" ".join(er.args))) + print("Exception class is: ", er.__class__) + print("SQLite traceback: ") + exc_type, exc_value, exc_tb = sys.exc_info() + print(traceback.format_exception(exc_type, exc_value, exc_tb)) + + return self + + def add_semicolon(self, sql: str = "") -> str: + new_sql = self._sql if not sql else sql + + if new_sql: + new_sql += ";" if new_sql[-1] != ";" else "" + + if not sql: + self._sql = new_sql + + return new_sql + + def get_sql(self, with_values: bool = True) -> str: + sql = self._sql + params = self._params + if params and with_values: + # Replace ? with markers + for p in params: + if isinstance(p, str): + sql = sql.replace("?", f"'{p}'", 1) + else: + sql = sql.replace("?", str(p), 1) + return sql + + def get_error(self) -> bool: + """ + Logics of this method will be changed in next version, use has_error() instead + """ + return self._error + + def has_error(self) -> bool: + return self._error + + def get_error_message(self) -> str: + if self._print_errors and self._error: + print(self._error_message) + return self._error_message + + def set_error(self, message: str = "") -> None: + self._error = bool(message) + self._error_message = message + if self._print_errors and self._error: + print("QueryBuilder error:", self._error_message) + + def get_params(self) -> tuple: + return self._params + + def get_result(self) -> Union[tuple, list]: + return self._result + + def get_count(self) -> int: + return self._count + + def reset(self) -> bool: + self._sql = "" + self._params = () + self._fields = [] + self._query = None + self._result = [] + self._count = -1 + self.set_error() + self._concat = False + return True + + def all(self) -> Union[tuple, list, dict, None]: + self.query() + return self._result + + def one(self) -> Union[tuple, list, dict, None]: + self.query(self._sql, self._params, self._FETCH_ONE) + return self._result + + def go(self) -> Union[int, None]: + self.query(self._sql, self._params, self._NO_FETCH) + return self._cur.lastrowid + + def column(self, column: Union[str, int] = 0): + if (self._result_dict and isinstance(column, int)) or (not self._result_dict and isinstance(column, str)): + self.set_error(f"Incorrect type of column in {inspect.stack()[0][3]} method. Result dict is {self._result_dict}") + return self + + self.query(fetch=self._FETCH_COLUMN, column=column) + return self._result + + def pluck(self, key: Union[str, int] = 0, column: Union[str, int] = 1): + if (self._result_dict and (isinstance(key, int) or isinstance(column, int))) or\ + (not self._result_dict and (isinstance(key, str) or isinstance(column, str))): + self.set_error(f"Incorrect type of key or column in {inspect.stack()[0][3]} method. Result dict is {self._result_dict}") + return self + + self.query() + return [(x[key], x[column]) for x in self._result] + + def count(self, table: Union[str, dict], field: str = ""): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + if not field: + self.select(table, "COUNT(*) AS `counter`") + else: + field = field.replace(".", "`.`") + self.select(table, f"COUNT(`{field}`) AS `counter`") + + return self.one()[0] + + def get_first(self): + return self.one() + + def get_last(self): + self.all() + return self._result[-1] + + def exists(self) -> bool: + self.one() + return self._count > 0 + + def _prepare_aliases(self, items: Union[str, list, dict], as_list: bool = False) -> Union[str, list]: + if not items: + self.set_error(f"Empty items in {inspect.stack()[0][3]} method") + return "" + + sql = [] + if isinstance(items, str): + sql.append(items) + elif isinstance(items, list) or isinstance(items, dict): + for item in items: + if isinstance(items, list): + if isinstance(item, str): + sql.append(item) + elif isinstance(item, dict): + first_item = list(item.values())[0] + alias = list(item.keys())[0] + sql.append(first_item if isinstance(alias, int) else f"{first_item} AS {alias}") + elif isinstance(items, dict): + new_item = items[item] + sql.append(new_item if isinstance(item, int) else f"{new_item} AS {item}") + else: + self.set_error(f"Incorrect type of items in {inspect.stack()[0][3]} method") + return "" + + return self._prepare_fieldlist(sql) if not as_list else sql + + def _prepare_conditions(self, where: Union[str, list]) -> dict: + result = { + "sql": "", + "values": [] + } + sql = "" + + if not where: + return result + + if isinstance(where, str): + sql += where + elif isinstance(where, list): + for cond in where: + if isinstance(cond, list): + if len(cond) == 2: + field = self._prepare_field(cond[0]) + value = cond[1] + + if isinstance(value, str) and value.lower() == "is null": + operator = "IS NULL" + sql += f"({field} {operator})" + elif isinstance(value, str) and value.lower() == "is not null": + operator = "IS NOT NULL" + sql += f"({field} {operator})" + elif isinstance(value, list) or isinstance(value, tuple): + operator = "IN" + values = ("?," * len(value)).rstrip(",") + sql += f"({field} {operator} ({values}))" + for item in value: + result["values"].append(item) + else: + operator = "=" + sql += f"({field} {operator} ?)" + result["values"].append(value) + elif len(cond) == 3: + field = self._prepare_field(cond[0]) + operator = cond[1].upper() + value = cond[2] + if operator in self._COND_OPERATORS: + if operator == "IN" and ( + isinstance(value, list) or isinstance(value, tuple) + ): + values = ("?," * len(value)).rstrip(",") + sql += f"({field} {operator} ({values}))" + for item in value: + result["values"].append(item) + else: + sql += f"({field} {operator} ?)" + result["values"].append(value) + elif isinstance(cond, str): + upper = cond.upper() + if upper in self._LOGICS: + sql += f" {upper} " + else: + self.set_error(f"Incorrect type of where in {inspect.stack()[0][3]} method") + return result + + result["sql"] = sql + + return result + + def _prepare_tables(self, table: Union[str, list, dict]) -> str: + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return '' + + if isinstance(table, str) and 'select' in table.lower(): + self._concat = True + return f"({table})" + elif isinstance(table, str) and any(x in table for x in self._FIELD_SPEC_CHARS): + # self._fields = table + return f"{table}" + + return self._prepare_aliases(table) + + def select(self, table: Union[str, list, dict], fields: Union[str, list, dict] = "*", dist: bool = False): + if not table or not fields: + self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") + return self + + prepared_table = self._prepare_tables(table) + prepared_fields = self._prepare_aliases(fields) + + if not self._concat: + self.reset() + + sql = 'SELECT ' + sql += 'DISTINCT ' if dist else '' + if isinstance(table, str) and any(x in table for x in self._FIELD_SPEC_CHARS) and fields == '*': + sql += f"{prepared_table}" + self._fields = self._prepare_aliases(table) + else: + self._fields = fields + sql += f"{prepared_fields} FROM {prepared_table}" + + if self._concat: + self._sql += sql + else: + self._sql = sql + + return self + + def where(self, where: Union[str, list], addition: str = ""): + if not where: + self.set_error(f"Empty where in {inspect.stack()[0][3]} method") + return self + + conditions = self._prepare_conditions(where) + + if addition: + self._sql += f" WHERE {conditions['sql']} {addition}" + else: + self._sql += f" WHERE {conditions['sql']}" + + if isinstance(conditions["values"], list) and conditions["values"] != []: + self._params += tuple(conditions["values"]) + + return self + + def having(self, having: Union[str, list]): + if not having: + self.set_error(f"Empty having in {inspect.stack()[0][3]} method") + return self + + conditions = self._prepare_conditions(having) + + self._sql += f" HAVING {conditions['sql']}" + + if isinstance(conditions["values"], list) and conditions["values"] != []: + self._params += tuple(conditions["values"]) + + return self + + def like(self, field: Union[str, tuple, list] = (), value: str = ""): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + + if isinstance(field, str) and field and isinstance(value, str) and value: + self.where([[field, "LIKE", value]]) + elif isinstance(field, str) and not value: + self.where(field) + elif isinstance(field, tuple) or isinstance(field, list): + self.where([[field[0], "LIKE", field[1]]]) + + return self + + def not_like(self, field: Union[str, tuple, list] = (), value: str = ""): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + + if isinstance(field, str) and isinstance(value, str) and value: + self.where([[field, "NOT LIKE", value]]) + elif isinstance(field, str) and not value: + self.where(field) + elif isinstance(field, tuple) or isinstance(field, list): + self.where([[field[0], "NOT LIKE", field[1]]]) + + return self + + def is_null(self, field: str = ""): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + self.where([[field, "IS NULL"]]) + return self + + def is_not_null(self, field: str): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + self.where([[field, "IS NOT NULL"]]) + return self + + def not_null(self, field: str): + self.is_not_null(field) + return self + + def limit(self, limit: int = 1): + if 'DELETE' not in self._sql: + self._sql += f" LIMIT {limit}" + return self + + def offset(self, offset: int = 0): + self._sql += f" OFFSET {offset}" + return self + + def _prepare_sorting(self, field: str = "", sort: str = "") -> tuple: + if field.find(" ") > -1: + splitted = field.split(" ") + field = splitted[0] + sort = splitted[1] + + field = self._prepare_field(field) + + sort = "ASC" if sort == "" else sort.upper() + + return field, sort + + def _prepare_field(self, field: str = "") -> str: + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return "" + + if any(x in field for x in self._FIELD_SPEC_CHARS): + if field.find(" AS ") > -1: + field = field.replace(" AS ", " AS `") + return f"{field}`" + else: + return field + else: + field = field.replace(".", "`.`") + field = field.replace(" AS ", "` AS `") + return f"`{field}`" + + def _prepare_fieldlist(self, fields: Union[str, tuple, list] = ()) -> str: + result = "" + if not fields: + self.set_error(f"Empty fields in {inspect.stack()[0][3]} method") + return result + + if isinstance(fields, str): + result = self._prepare_field(fields) + elif isinstance(fields, tuple) or isinstance(fields, list): + fields = [self._prepare_field(field) for field in fields] + result = ", ".join(fields) + + return result + + def order_by(self, field: Union[str, tuple, list] = (), sort: str = ""): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + + if isinstance(field, str): + field, sort = self._prepare_sorting(field, sort) + + if sort in self._SORT_TYPES: + self._sql += f" ORDER BY {field} {sort}" + else: + self._sql += f" ORDER BY {field}" + elif isinstance(field, tuple) or isinstance(field, list): + new_list = [] + for item in field: + new_item = self._prepare_sorting(item) + new_list.append(f"{new_item[0]} {new_item[1]}") + self._sql += " ORDER BY " + ", ".join(new_list) + + return self + + def group_by(self, field: Union[str, tuple, list] = ()): + if not field: + self.set_error(f"Empty field in {inspect.stack()[0][3]} method") + return self + + self._sql += f" GROUP BY {self._prepare_fieldlist(field)}" + + return self + + def delete(self, table: Union[str, list, dict]): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self.reset() + self._sql = f"DELETE FROM {self._prepare_tables(table)}" + + return self + + def insert(self, table: Union[str, list, dict], fields: Union[list, dict]): + if not table or not fields: + self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") + return self + + self.reset() + table = self._prepare_aliases(table) + self._fields = fields + + if isinstance(fields, dict): + values = ("?," * len(fields)).rstrip(",") + self._sql = f"INSERT INTO {table} (" + self._prepare_fieldlist(list(fields.keys())) + f") VALUES ({values})" + self._params = tuple(fields.values()) + elif isinstance(fields, list): + names = fields.pop(0) + value = ("?," * len(names)).rstrip(",") + v = f"({value})," + values = (v * len(fields)).rstrip(",") + self._sql = f"INSERT INTO {table} (" + self._prepare_fieldlist(names) + f") VALUES {values}" + params = [] + for item in fields: + if isinstance(item, list): + for subitem in item: + params.append(subitem) + self._params = tuple(params) + else: + self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") + return self + + return self + + def update(self, table: Union[str, dict], fields: Union[list, dict]): + if not table or not fields: + self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") + return self + + self._fields = fields + + if isinstance(table, dict) or isinstance(table, str): + table = self._prepare_aliases(table) + else: + self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + return self + + if isinstance(fields, list) or isinstance(fields, dict): + sets = "" + for item in fields: + sets += f" {self._prepare_field(item)} = ?," + sets = sets.rstrip(",") + else: + self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") + return self + + self.reset() + + self._sql = f"UPDATE {table} SET{sets}" + self._params = tuple(fields.values()) + + return self + + def join(self, table: Union[str, dict] = "", on: Union[str, tuple, list] = (), join_type: str = "INNER"): + join_type = join_type.upper() + if self._db.get_driver() == 'sqlite': + if join_type == "" or join_type not in self._SQLITE_JOIN_TYPES: + self.set_error(f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method. Try one of these {self._SQLITE_JOIN_TYPES}") + return self + else: + if join_type == "" or join_type not in self._JOIN_TYPES: + self.set_error(f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method. Try one of these {self._JOIN_TYPES}") + return self + + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + if isinstance(table, dict) or isinstance(table, str): + self._sql += f" {join_type} JOIN {self._prepare_aliases(table)}" + else: + self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + return self + + if on: + if isinstance(on, tuple) or isinstance(on, list): + field1 = self._prepare_field(on[0]) + field2 = self._prepare_field(on[1]) + self._sql += f" ON {field1} = {field2}" + elif isinstance(on, str): + self._sql += f" ON {on}" + else: + self.set_error(f"Incorrect type of on in {inspect.stack()[0][3]} method. On must be String, Tuple or List") + return self + + self.set_error() + return self + + def union(self, union_all: bool = False): + self._concat = True + self._sql += " UNION ALL " if union_all else " UNION " + return self + + def union_select(self, table: Union[str, list, dict], union_all: bool = False): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self._concat = True + fields = self._fields if self._fields else '*' + sql = self._sql + sql += " UNION ALL " if union_all else " UNION " + self._sql = sql + f"SELECT {self._prepare_aliases(fields)} FROM {self._prepare_aliases(table)}" + + return self + + def excepts(self): + self._concat = True + self._sql += " EXCEPT " + return self + + def except_select(self, table: Union[str, list, dict]): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self._concat = True + fields = self._fields if self._fields else '*' + self._sql += f" EXCEPT SELECT {self._prepare_aliases(fields)} FROM {self._prepare_aliases(table)}" + + return self + + def intersect(self): + self._concat = True + self._sql += " INTERSECT " + return self + + def intersect_select(self, table: Union[str, list, dict]): + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self._concat = True + fields = self._fields if self._fields else '*' + self._sql += f" INTERSECT SELECT {self._prepare_aliases(fields)} FROM {self._prepare_aliases(table)}" + + return self + + def __str__(self): + return self.get_sql(False) + + def create_view(self, view_name: str, add_exists: bool = True): + # this method will be moved to another class + if not view_name: + self.set_error(f"Empty view_name in {inspect.stack()[0][3]} method") + return self + + exists = "IF NOT EXISTS " if add_exists else "" + + # self.reset() + if 'select' not in self._sql.lower(): + self.set_error(f"No select found in {inspect.stack()[0][3]} method") + return self + self._sql = f"CREATE VIEW {exists}`{view_name}` AS " + self._sql + + return self + + def drop_view(self, view_name: str, add_exists: bool = True): + # this method will be moved to another class + if not view_name: + self.set_error(f"Empty view_name in {inspect.stack()[0][3]} method") + return self + + exists = "IF EXISTS " if add_exists else "" + + self.reset() + self._sql = f"DROP VIEW {exists}`{view_name}`" + + return self + + def drop(self, table: str, add_exists: bool = True): + # this method will be moved to another class + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + exists = "IF EXISTS " if add_exists else "" + + self.reset() + self._sql = f"DROP TABLE {exists}`{table}`" + + return self + + def truncate(self, table: str): + # this method will be moved to another class + if not table: + self.set_error(f"Empty table in {inspect.stack()[0][3]} method") + return self + + self.reset() + self._sql = f"TRUNCATE TABLE `{table}`" + + return self diff --git a/tests/qb_delete_unittest.py b/tests/qb_delete_unittest.py new file mode 100644 index 0000000..8796ab4 --- /dev/null +++ b/tests/qb_delete_unittest.py @@ -0,0 +1,34 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_delete_unittest.py -v + + +class QBDeleteTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_delete_eq(self): + sql = self.qb.delete('comments').where([['user_id', '=', 10]]).get_sql() + self.assertEqual(sql, "DELETE FROM `comments` WHERE (`user_id` = 10)") + self.assertEqual(self.qb.get_params(), (10, )) + + def test_sql_delete_no_eq(self): + sql = self.qb.delete('comments').where([['user_id', 10]]).get_sql() + self.assertEqual(sql, "DELETE FROM `comments` WHERE (`user_id` = 10)") + self.assertEqual(self.qb.get_params(), (10, )) + + # def test_sql_delete_limit_eq(self): + # sql = self.qb.delete('users').where([['name', '=', 'John']]).limit().get_sql() + # self.assertEqual(sql, "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") + # self.assertEqual(self.qb.get_params(), ('John',)) + # + # def test_sql_delete_limit_no_eq(self): + # sql = self.qb.delete('users').where([['name', 'John']]).limit().get_sql() + # self.assertEqual(sql, "DELETE FROM `users` WHERE (`name` = 'John') LIMIT 1") + # self.assertEqual(self.qb.get_params(), ('John',)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/qb_insert_unittest.py b/tests/qb_insert_unittest.py new file mode 100644 index 0000000..1c08d53 --- /dev/null +++ b/tests/qb_insert_unittest.py @@ -0,0 +1,28 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_insert_unittest.py -v + + +class QBInsertTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_insert(self): + sql = self.qb.insert('groups', {'name': 'Moderator', 'permissions': 'moderator'}).get_sql() + self.assertEqual(sql, "INSERT INTO `groups` (`name`, `permissions`) VALUES ('Moderator','moderator')") + self.assertEqual(self.qb.get_params(), ('Moderator', 'moderator')) + + def test_sql_insert_multiple(self): + sql = self.qb.insert('groups', [ + ['name', 'role'], + ['Moderator', 'moderator'], ['Moderator2', 'moderator'], + ['User', 'user'], ['User2', 'user'] + ]).get_sql() + self.assertEqual(sql, "INSERT INTO `groups` (`name`, `role`) VALUES ('Moderator','moderator'),('Moderator2','moderator'),('User','user'),('User2','user')") + self.assertEqual(self.qb.get_params(), ('Moderator', 'moderator', 'Moderator2', 'moderator', 'User', 'user', 'User2', 'user')) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/qb_select_ext_unittest.py b/tests/qb_select_ext_unittest.py new file mode 100644 index 0000000..13ea47a --- /dev/null +++ b/tests/qb_select_ext_unittest.py @@ -0,0 +1,185 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_select_ext_unittest.py -v + + +class QBSelectExtensionsTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_select_inner_join_list(self): + sql = self.qb.select({'u': 'users'}, ['u.id', 'u.email', 'u.username', {'perms': 'groups.permissions'}])\ + .join('groups', ['u.group_id', 'groups.id']).get_sql() + self.assertEqual(sql, "SELECT `u`.`id`, `u`.`email`, `u`.`username`, `groups`.`permissions` AS `perms` FROM `users` AS `u` INNER JOIN `groups` ON `u`.`group_id` = `groups`.`id`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_inner_join3_str(self): + sql = self.qb.select({'cp': 'cabs_printers'}, [ + 'cp.id', 'cp.cab_id', {'cab_name': 'cb.name'}, 'cp.printer_id', + {'printer_name': 'p.name'}, {'cartridge_type': 'c.name'}, 'cp.comment' + ])\ + .join({'cb': 'cabs'}, ['cp.cab_id', 'cb.id'])\ + .join({'p': 'printer_models'}, ['cp.printer_id', 'p.id'])\ + .join({'c': 'cartridge_types'}, 'p.cartridge_id=c.id')\ + .where([['cp.cab_id', 'in', [11, 12, 13]], 'or', ['cp.cab_id', 5], 'and', ['p.id', '>', 'c.id']]).get_sql() + self.assertEqual(sql, "SELECT `cp`.`id`, `cp`.`cab_id`, `cb`.`name` AS `cab_name`, `cp`.`printer_id`, `p`.`name` AS `printer_name`, `c`.`name` AS `cartridge_type`, `cp`.`comment` FROM `cabs_printers` AS `cp` INNER JOIN `cabs` AS `cb` ON `cp`.`cab_id` = `cb`.`id` INNER JOIN `printer_models` AS `p` ON `cp`.`printer_id` = `p`.`id` INNER JOIN `cartridge_types` AS `c` ON p.cartridge_id=c.id WHERE (`cp`.`cab_id` IN (11,12,13)) OR (`cp`.`cab_id` = 5) AND (`p`.`id` > 'c.id')") + self.assertEqual(self.qb.get_params(), (11, 12, 13, 5, 'c.id')) + + def test_sql_select_inner_join3_groupby_orederby(self): + sql = self.qb.select({'cp': 'cabs_printers'}, [ + 'cp.id', 'cp.cab_id', {'cab_name': 'cb.name'}, + 'cp.printer_id', {'cartridge_id': 'c.id'}, + {'printer_name': 'p.name'}, {'cartridge_type': 'c.name'}, 'cp.comment' + ])\ + .join({'cb': 'cabs'}, ['cp.cab_id', 'cb.id'])\ + .join({'p': 'printer_models'}, ['cp.printer_id', 'p.id'])\ + .join({'c': 'cartridge_types'}, ['p.cartridge_id', 'c.id'])\ + .group_by(['cp.printer_id', 'cartridge_id'])\ + .order_by(['cp.cab_id', 'cp.printer_id desc']).get_sql() + self.assertEqual(sql, "SELECT `cp`.`id`, `cp`.`cab_id`, `cb`.`name` AS `cab_name`, `cp`.`printer_id`, `c`.`id` AS `cartridge_id`, `p`.`name` AS `printer_name`, `c`.`name` AS `cartridge_type`, `cp`.`comment` FROM `cabs_printers` AS `cp` INNER JOIN `cabs` AS `cb` ON `cp`.`cab_id` = `cb`.`id` INNER JOIN `printer_models` AS `p` ON `cp`.`printer_id` = `p`.`id` INNER JOIN `cartridge_types` AS `c` ON `p`.`cartridge_id` = `c`.`id` GROUP BY `cp`.`printer_id`, `cartridge_id` ORDER BY `cp`.`cab_id` ASC, `cp`.`printer_id` DESC") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_left_join(self): + sql = self.qb.select('employees', ['employees.employee_id', 'employees.last_name', 'positions.title'])\ + .join('positions', ['employees.position_id', 'positions.position_id'], join_type="left").get_sql() + self.assertEqual(sql, "SELECT `employees`.`employee_id`, `employees`.`last_name`, `positions`.`title` FROM `employees` LEFT JOIN `positions` ON `employees`.`position_id` = `positions`.`position_id`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_left_outer_join(self): + sql = self.qb.select({'e': 'employees'}, ['e.employee_id', 'e.last_name', 'p.title'])\ + .join({'p': 'positions'}, ['e.position_id', 'p.position_id'], join_type="left outer").get_sql() + self.assertEqual(sql, "SELECT `e`.`employee_id`, `e`.`last_name`, `p`.`title` FROM `employees` AS `e` LEFT OUTER JOIN `positions` AS `p` ON `e`.`position_id` = `p`.`position_id`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_cross_join(self): + sql = self.qb.select('positions').join('departments', join_type="cross").get_sql() + self.assertEqual(sql, "SELECT * FROM `positions` CROSS JOIN `departments`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_except_select(self): + sql = self.qb.select('departments', ['department_id']).except_select('employees').get_sql() + self.assertEqual(sql, "SELECT `department_id` FROM `departments` EXCEPT SELECT `department_id` FROM `employees`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_excepts_where(self): + sql = self.qb.select('contacts', ['contact_id', 'last_name', 'first_name']).where([['contact_id', '>=', 74]])\ + .excepts()\ + .select('employees', ['employee_id', 'last_name', 'first_name']).where([['first_name', 'Sandra']])\ + .get_sql() + self.assertEqual(sql, "SELECT `contact_id`, `last_name`, `first_name` FROM `contacts` WHERE (`contact_id` >= 74) EXCEPT SELECT `employee_id`, `last_name`, `first_name` FROM `employees` WHERE (`first_name` = 'Sandra')") + self.assertEqual(self.qb.get_params(), (74, 'Sandra')) + + def test_sql_select_excepts_where_order_by(self): + sql = self.qb.select('suppliers', ['supplier_id', 'state']).where([['state', 'Nevada']]) \ + .excepts()\ + .select('companies', ['company_id', 'state']).where([['company_id', '<', 2000]]).order_by('1 desc')\ + .get_sql() + self.assertEqual(sql, "SELECT `supplier_id`, `state` FROM `suppliers` WHERE (`state` = 'Nevada') EXCEPT SELECT `company_id`, `state` FROM `companies` WHERE (`company_id` < 2000) ORDER BY `1` DESC") + self.assertEqual(self.qb.get_params(), ('Nevada', 2000)) + + def test_sql_intersect_select(self): + sql = self.qb.select('departments', ['department_id']).intersect_select('employees').get_sql() + self.assertEqual(sql, "SELECT `department_id` FROM `departments` INTERSECT SELECT `department_id` FROM `employees`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_intersect_where(self): + sql = self.qb.select('departments', 'department_id').where([['department_id', '>=', 25]])\ + .intersect()\ + .select('employees', 'department_id').where([['last_name', '<>', 'Petrov']])\ + .get_sql() + self.assertEqual(sql, "SELECT `department_id` FROM `departments` WHERE (`department_id` >= 25) INTERSECT SELECT `department_id` FROM `employees` WHERE (`last_name` <> 'Petrov')") + self.assertEqual(self.qb.get_params(), (25, 'Petrov')) + + def test_sql_select_intersect_where2(self): + sql = self.qb.select('contacts', ['contact_id', 'last_name', 'first_name']).where([['contact_id', '>', 50]])\ + .intersect()\ + .select('customers', ['customer_id', 'last_name', 'first_name']).where([['last_name', '<>', 'Zagoskin']])\ + .get_sql() + self.assertEqual(sql, "SELECT `contact_id`, `last_name`, `first_name` FROM `contacts` WHERE (`contact_id` > 50) INTERSECT SELECT `customer_id`, `last_name`, `first_name` FROM `customers` WHERE (`last_name` <> 'Zagoskin')") + self.assertEqual(self.qb.get_params(), (50, 'Zagoskin')) + + def test_sql_select_intersect_where_orderby(self): + sql = self.qb.select('departments', ['department_id', 'state']).where([['department_id', '>=', 25]]) \ + .intersect()\ + .select('companies', ['company_id', 'state']).like('company_name', 'G%').order_by('1')\ + .get_sql() + self.assertEqual(sql, "SELECT `department_id`, `state` FROM `departments` WHERE (`department_id` >= 25) INTERSECT SELECT `company_id`, `state` FROM `companies` WHERE (`company_name` LIKE 'G%') ORDER BY `1` ASC") + self.assertEqual(self.qb.get_params(), (25, 'G%')) + + def test_sql_select_union_where(self): + sql = self.qb.select('clients', ['name', 'age', {'total_sum': 'account_sum + account_sum * 0.1'}])\ + .where([['account_sum', '<', 3000]])\ + .union()\ + .select('clients', ['name', 'age', {'total_sum': 'account_sum + account_sum * 0.3'}])\ + .where([['account_sum', '>=', 3000]]).get_sql() + self.assertEqual(sql, "SELECT `name`, `age`, account_sum + account_sum * 0.1 AS `total_sum` FROM `clients` WHERE (`account_sum` < 3000) UNION SELECT `name`, `age`, account_sum + account_sum * 0.3 AS `total_sum` FROM `clients` WHERE (`account_sum` >= 3000)") + self.assertEqual(self.qb.get_params(), (3000, 3000)) + + def test_sql_union_select_where(self): + sql = self.qb.select('clients', ['name', 'age']).where([['id', '<', 10]])\ + .union_select('employees').where([['id', 1]]).get_sql() + self.assertEqual(sql, "SELECT `name`, `age` FROM `clients` WHERE (`id` < 10) UNION SELECT `name`, `age` FROM `employees` WHERE (`id` = 1)") + self.assertEqual(self.qb.get_params(), (10, 1)) + + def test_sql_select_union_where_orderby(self): + sql = self.qb.select('departments', ['department_id', 'department_name']).where([['department_id', 'in', [1, 2]]])\ + .union()\ + .select('employees', ['employee_id', 'last_name']).where([['hire_date', '2024-02-08']]).order_by('2')\ + .get_sql() + self.assertEqual(sql, "SELECT `department_id`, `department_name` FROM `departments` WHERE (`department_id` IN (1,2)) UNION SELECT `employee_id`, `last_name` FROM `employees` WHERE (`hire_date` = '2024-02-08') ORDER BY `2` ASC") + self.assertEqual(self.qb.get_params(), (1, 2, '2024-02-08')) + + def test_sql_select_union_all(self): + sql = self.qb.select('clients', ['name', 'age', {'total_sum': 'account_sum + account_sum * 0.1'}])\ + .where([['account_sum', '<', 3000]])\ + .union(True)\ + .select('clients', ['name', 'age', {'total_sum': 'account_sum + account_sum * 0.3'}])\ + .where([['account_sum', '>=', 3000]]).get_sql() + self.assertEqual(sql, "SELECT `name`, `age`, account_sum + account_sum * 0.1 AS `total_sum` FROM `clients` WHERE (`account_sum` < 3000) UNION ALL SELECT `name`, `age`, account_sum + account_sum * 0.3 AS `total_sum` FROM `clients` WHERE (`account_sum` >= 3000)") + self.assertEqual(self.qb.get_params(), (3000, 3000)) + + def test_sql_union_select_all_where(self): + sql = self.qb.select('cabs', ['id', 'name']) \ + .union_select('printer_models', True).where([['id', '<', 10]]) \ + .get_sql() + self.assertEqual(sql, "SELECT `id`, `name` FROM `cabs` UNION ALL SELECT `id`, `name` FROM `printer_models` WHERE (`id` < 10)") + self.assertEqual(self.qb.get_params(), (10, )) + + def test_sql_select_union_all_where_orderby(self): + sql = self.qb.select('departments', ['department_id', 'department_name']).where([['department_id', '>=', 10]])\ + .union(True)\ + .select('employees', ['employee_id', 'last_name']).where([['last_name', 'Rassohin']]).order_by('2')\ + .get_sql() + self.assertEqual(sql, "SELECT `department_id`, `department_name` FROM `departments` WHERE (`department_id` >= 10) UNION ALL SELECT `employee_id`, `last_name` FROM `employees` WHERE (`last_name` = 'Rassohin') ORDER BY `2` ASC") + self.assertEqual(self.qb.get_params(), (10, 'Rassohin')) + + # def test_sql_select_in_select_str_one_param(self): + # sql = self.qb.select(self.qb.select('categories').get_sql(), ['id', 'name']).where([['id', '<=', 5]]).get_sql() + # self.assertEqual(sql, "SELECT `id`, `name` FROM (SELECT * FROM `categories`) WHERE (`id` <= 5)") + # self.assertEqual(self.qb.get_params(), (5, )) + # + # def test_sql_select_in_select_str_two_params(self): + # sql = self.qb.select(self.qb.select('categories').where([['parent_id', 0]]).get_sql(), ['id', 'name']).where([['id', '<=', 5]]).get_sql() + # self.assertEqual(sql, "SELECT `id`, `name` FROM (SELECT * FROM `categories` WHERE (`parent_id` = 0)) WHERE (`id` <= 5)") + # self.assertEqual(self.qb.get_params(), (0, 5)) + # + # def test_sql_select_in_select_add_query(self): + # q1 = self.qb.select('categories').where([['parent_id', 0]]).get_sql() + # sql = self.qb.select(q1, ['id', 'name']).where([['id', '<=', 5]]).get_sql() + # self.assertEqual(sql, "SELECT `id`, `name` FROM (SELECT * FROM `categories` WHERE (`parent_id` = 0)) WHERE (`id` <= 5)") + # self.assertEqual(self.qb.get_params(), (0, 5)) + # + # def test_sql_select_in_select_add_query_format(self): + # q1 = self.qb.select('categories').where([['parent_id', 0]]) + # sql = self.qb.select(f'{q1}', ['id', 'name']).where([['id', '<=', 5]]).get_sql() + # self.assertEqual(sql, "SELECT `id`, `name` FROM (SELECT * FROM `categories` WHERE (`parent_id` = 0)) WHERE (`id` <= 5)") + # self.assertEqual(self.qb.get_params(), (0, 5)) + + def tearDown(self): + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/qb_select_unittest.py b/tests/qb_select_unittest.py new file mode 100644 index 0000000..145db22 --- /dev/null +++ b/tests/qb_select_unittest.py @@ -0,0 +1,203 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_select_unittest.py -v + + +class QBSelectTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_select_all(self): + sql = self.qb.select('users').get_sql() + self.assertEqual(sql, 'SELECT * FROM `users`') + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_where_eq(self): + sql = self.qb.select('users').where([['id', '=', 10]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` = 10)') + self.assertEqual(self.qb.get_params(), (10,)) + + def test_sql_select_where_no_eq(self): + sql = self.qb.select('users').where([['id', 10]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` = 10)') + self.assertEqual(self.qb.get_params(), (10, )) + + def test_sql_select_where_and_eq(self): + sql = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', '=', 2]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') + self.assertEqual(self.qb.get_params(), (1, 2)) + + def test_sql_select_where_and_no_eq(self): + sql = self.qb.select('users').where([['id', '>', 1], 'and', ['group_id', 2]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) AND (`group_id` = 2)') + self.assertEqual(self.qb.get_params(), (1, 2)) + + def test_sql_select_where_or_eq(self): + sql = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', '=', 2]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') + self.assertEqual(self.qb.get_params(), (1, 2)) + + def test_sql_select_where_or_no_eq(self): + sql = self.qb.select('users').where([['id', '>', 1], 'or', ['group_id', 2]]).get_sql() + self.assertEqual(sql, 'SELECT * FROM `users` WHERE (`id` > 1) OR (`group_id` = 2)') + self.assertEqual(self.qb.get_params(), (1, 2)) + + def test_sql_select_where_like(self): + sql = self.qb.select('users').where([['name', 'LIKE', '%John%']]).get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_like_list(self): + sql = self.qb.select('users').like(['name', '%John%']).get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_like_str(self): + sql = self.qb.select('users').like('name', '%John%').get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_where_not_like(self): + sql = self.qb.select('users').where([['name', 'NOT LIKE', '%John%']]).get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_not_like_list(self): + sql = self.qb.select('users').not_like(['name', '%John%']).get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_not_like_str(self): + sql = self.qb.select('users').not_like('name', '%John%').get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`name` NOT LIKE '%John%')") + self.assertEqual(self.qb.get_params(), ('%John%',)) + + def test_sql_select_where_is_null(self): + sql = self.qb.select('users').where([['phone', 'is null']]).get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`phone` IS NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_is_null(self): + sql = self.qb.select('users').is_null('phone').get_sql() + self.assertEqual(sql, "SELECT * FROM `users` WHERE (`phone` IS NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_where_is_not_null(self): + sql = self.qb.select('customers').where([['address', 'is not null']]).get_sql() + self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_not_null(self): + sql = self.qb.select('customers').not_null('address').get_sql() + self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_is_not_null(self): + sql = self.qb.select('customers').is_not_null('address').get_sql() + self.assertEqual(sql, "SELECT * FROM `customers` WHERE (`address` IS NOT NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_offset(self): + sql = self.qb.select('posts').where([['user_id', 3]]).offset(14).get_sql() + self.assertEqual(sql, "SELECT * FROM `posts` WHERE (`user_id` = 3) OFFSET 14") + self.assertEqual(self.qb.get_params(), (3,)) + + def test_sql_select_limit(self): + sql = self.qb.select('posts').where([['id', '>', 42]]).limit(7).get_sql() + self.assertEqual(sql, "SELECT * FROM `posts` WHERE (`id` > 42) LIMIT 7") + self.assertEqual(self.qb.get_params(), (42, )) + + def test_sql_select_counter(self): + sql = self.qb.select('users', {'counter': 'COUNT(*)'}).get_sql() + self.assertEqual(sql, "SELECT COUNT(*) AS `counter` FROM `users`") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_distinct_order_by(self): + sql = self.qb.select('customers', ['city'], True).order_by('city').get_sql() + self.assertEqual(sql, "SELECT DISTINCT `city` FROM `customers` ORDER BY `city` ASC") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_distinct_order_by_2col(self): + sql = self.qb.select('customers', ['city', 'country'], True).order_by('country desc').get_sql() + self.assertEqual(sql, "SELECT DISTINCT `city`, `country` FROM `customers` ORDER BY `country` DESC") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_order_by_two_param(self): + sql = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id', 'desc').get_sql() + self.assertEqual(sql, "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") + self.assertEqual(self.qb.get_params(), (1, 1)) + + def test_sql_select_order_by_one_param(self): + sql = self.qb.select({'b': 'branches'}, ['b.id', 'b.name'])\ + .where([['b.id', '>', 1], 'and', ['b.parent_id', 1]]).order_by('b.id desc').get_sql() + self.assertEqual(sql, "SELECT `b`.`id`, `b`.`name` FROM `branches` AS `b` WHERE (`b`.`id` > 1) AND (`b`.`parent_id` = 1) ORDER BY `b`.`id` DESC") + self.assertEqual(self.qb.get_params(), (1, 1)) + + def test_sql_select_group_by(self): + sql = self.qb.select('posts', ['id', 'category', 'title'])\ + .where([['views', '>=', 1000]]).group_by('category').get_sql() + self.assertEqual(sql, "SELECT `id`, `category`, `title` FROM `posts` WHERE (`views` >= 1000) GROUP BY `category`") + self.assertEqual(self.qb.get_params(), (1000, )) + + def test_sql_select_group_by_having_eq(self): + sql = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + .where([['YEAR(`created_at`)', 2020]]).group_by('month_num')\ + .having([['total', '=', 20000]]).get_sql() + self.assertEqual(sql, "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") + self.assertEqual(self.qb.get_params(), (2020, 20000)) + + def test_sql_select_group_by_having_no_eq_sum(self): + sql = self.qb.select('orders', {'month_num': 'MONTH(`created_at`)', 'total': 'SUM(`total`)'})\ + .where([['YEAR(`created_at`)', 2020]]).group_by('month_num')\ + .having([['total', 20000]]).get_sql() + self.assertEqual(sql, "SELECT MONTH(`created_at`) AS `month_num`, SUM(`total`) AS `total` FROM `orders` WHERE (YEAR(`created_at`) = 2020) GROUP BY `month_num` HAVING (`total` = 20000)") + self.assertEqual(self.qb.get_params(), (2020, 20000)) + + def test_sql_select_group_by_having_max(self): + sql = self.qb.select('employees', ['department', {'Highest salary': 'MAX(`salary`)'}])\ + .where([['favorite_website', 'Google.com']]).group_by('department')\ + .having([['MAX(`salary`)', '>=', 30000]]).get_sql() + self.assertEqual(sql, "SELECT `department`, MAX(`salary`) AS `Highest salary` FROM `employees` WHERE (`favorite_website` = 'Google.com') GROUP BY `department` HAVING (MAX(`salary`) >= 30000)") + self.assertEqual(self.qb.get_params(), ('Google.com', 30000)) + + def test_sql_select_group_by_having_count(self): + sql = self.qb.select('employees', ['department', {'Number of employees': 'COUNT(*)'}])\ + .where([['state', 'Nevada']]).group_by('department')\ + .having([['COUNT(*)', '>', 20]]).get_sql() + self.assertEqual(sql, "SELECT `department`, COUNT(*) AS `Number of employees` FROM `employees` WHERE (`state` = 'Nevada') GROUP BY `department` HAVING (COUNT(*) > 20)") + self.assertEqual(self.qb.get_params(), ('Nevada', 20)) + + def test_sql_select_summ(self): + sql = self.qb.select("1+5 as 'res'").get_sql() + self.assertEqual(sql, "SELECT 1+5 as 'res'") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_sub(self): + sql = self.qb.select("10 - 3 as 'res'").get_sql() + self.assertEqual(sql, "SELECT 10 - 3 as 'res'") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_substr(self): + sql = self.qb.select("substr('Hello world!', 1, 5) as 'str'").get_sql() + self.assertEqual(sql, "SELECT substr('Hello world!', 1, 5) as 'str'") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_sqlite_version(self): + sql = self.qb.select("sqlite_version() as ver").get_sql() + self.assertEqual(sql, "SELECT sqlite_version() as ver") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_select_time(self): + sql = self.qb.select("strftime('%Y-%m-%d %H:%M', 'now')").get_sql() + self.assertEqual(sql, "SELECT strftime('%Y-%m-%d %H:%M', 'now')") + self.assertEqual(self.qb.get_params(), ()) + + def tearDown(self): + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/qb_update_unittest.py b/tests/qb_update_unittest.py new file mode 100644 index 0000000..cb33602 --- /dev/null +++ b/tests/qb_update_unittest.py @@ -0,0 +1,32 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_update_unittest.py -v + + +class QBUpdateTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_update(self): + sql = self.qb.update('posts', {'status': 'published'})\ + .where([['YEAR(`updated_at`)', '>', 2020]]).get_sql() + self.assertEqual(sql, "UPDATE `posts` SET `status` = 'published' WHERE (YEAR(`updated_at`) > 2020)") + self.assertEqual(self.qb.get_params(), ('published', 2020)) + + def test_sql_update_limit_eq(self): + sql = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ + .where([['id', '=', 7]]).limit().get_sql() + self.assertEqual(sql, "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") + self.assertEqual(self.qb.get_params(), ('John Doe', 'new status', 7)) + + def test_sql_update_limit_no_eq(self): + sql = self.qb.update('users', {'username': 'John Doe', 'status': 'new status'})\ + .where([['id', 7]]).limit().get_sql() + self.assertEqual(sql, "UPDATE `users` SET `username` = 'John Doe', `status` = 'new status' WHERE (`id` = 7) LIMIT 1") + self.assertEqual(self.qb.get_params(), ('John Doe', 'new status', 7)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/qb_views_unittest.py b/tests/qb_views_unittest.py new file mode 100644 index 0000000..7bcb1ba --- /dev/null +++ b/tests/qb_views_unittest.py @@ -0,0 +1,39 @@ +import unittest +from querybuilder import * + +# test run +# python .\qb_views_unittest.py -v + + +class QBViewsTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_create_view_exists(self): + sql = self.qb.select('users').is_null('email').create_view('users_no_email').get_sql() + self.assertEqual(sql, "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_create_view_no_exists(self): + sql = self.qb.select('users').is_null('email').create_view('users_no_email', False).get_sql() + self.assertEqual(sql, "CREATE VIEW `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL)") + self.assertEqual(self.qb.get_params(), ()) + + def test_sql_create_view(self): + sql = self.qb.select('users').where([['email', 'is null'], 'or', ['email', '']])\ + .create_view('users_no_email').get_sql() + self.assertEqual(sql, "CREATE VIEW IF NOT EXISTS `users_no_email` AS SELECT * FROM `users` WHERE (`email` IS NULL) OR (`email` = '')") + self.assertEqual(self.qb.get_params(), ('', )) + # self.qb.go() + + def test_sql_drop_view_no_exists(self): + sql = self.qb.drop_view('users_no_email', False).get_sql() + self.assertEqual(sql, "DROP VIEW `users_no_email`") + + def test_sql_drop_view_exists(self): + sql = self.qb.drop_view('users_no_email').get_sql() + self.assertEqual(sql, "DROP VIEW IF EXISTS `users_no_email`") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/tb_unittest.py b/tests/tb_unittest.py new file mode 100644 index 0000000..6fcd298 --- /dev/null +++ b/tests/tb_unittest.py @@ -0,0 +1,27 @@ +import unittest +from simple_query_builder.querybuilder import * + +# test run +# python .\tb_unittest.py -v +# python -m unittest .\tb_unittest.py + + +class TBTestCase(unittest.TestCase): + def setUp(self): + self.qb = QueryBuilder(DataBase(), ":memory:") + + def test_sql_drop_table_no_exists(self): + sql = self.qb.drop('temporary', False).get_sql() + self.assertEqual(sql, "DROP TABLE `temporary`") + + def test_sql_drop_table_exists(self): + sql = self.qb.drop('temporary').get_sql() + self.assertEqual(sql, "DROP TABLE IF EXISTS `temporary`") + + def test_sql_truncate_table(self): + sql = self.qb.truncate('users').get_sql() + self.assertEqual(sql, "TRUNCATE TABLE `users`") + + +if __name__ == "__main__": + unittest.main()