Asynchronous PostgreSQL client for Clojure.
Add the following to your Leiningen project.clj:
A pool of connections to the PostgreSQL backend is created with open-db. Each connection pool starts a single I/O thread that is used for communicating with the PostgreSQL backend.
(require '[postgres.async :refer :all])
(def db (open-db {:hostname "db.example.com"
                  :port 5432 ; default
                  :database "exampledb"
                  :username "user"
                  :password "pass"
                  :pool-size 25})) ; default
The connection pool is closed with close-db!. This closes all open connections and stops the pool I/O thread.
(close-db! db)
Queries are executed with the callback-based functions query!, insert!, update! and execute!, and with the core.async channel-based functions <query!, <insert!, <update! and <execute!.
Channel-based functions return a channel onto which the query result is put as a vector of [result-set exception].
All other query functions delegate to execute!. It takes a db, a vector containing an SQL string followed by its parameters, and a callback of arity two.
;; callback
(execute! db ["select $1::text" "hello world"] (fn [rs err]
                                                 (println rs err)))
; nil
;; async channel
(<!! (<execute! db ["select name, price from products where id = $1" 1001]))
; [{:updated 0, :rows [{:id 1001, :name "hammer", :price 10}]} nil]
(<!! (<execute! db ["select * from foobar"]))
; [nil #<SqlException com.github.pgasync.SqlException: ERROR: SQLSTATE=42P01, MESSAGE=relation "foobar" does not exist>]
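When a blocking result is wanted from the callback style without core.async, the two-argument callback can be bridged to a promise. The following is a minimal sketch, not part of postgres.async: `await-callback` is a hypothetical helper, and `fake-execute!` is a stand-in that imitates the `(fn [rs err] ...)` callback contract so the example runs without a database.

```clojure
;; Hypothetical helper (not part of postgres.async): blocks the calling
;; thread until a two-argument callback has been invoked.
(defn await-callback
  "Calls (f callback) and returns [result error] once the callback fires."
  [f]
  (let [p (promise)]
    (f (fn [rs err] (deliver p [rs err])))
    @p))

;; Stand-in for a callback-based query function. It invokes the callback
;; from another thread, as an asynchronous driver would.
(defn fake-execute! [sql-and-params callback]
  (.start (Thread. (fn []
                     (callback {:updated 0
                                :rows [{:text (second sql-and-params)}]}
                               nil)))))

(await-callback #(fake-execute! ["select $1::text" "hello world"] %))
;; => [{:updated 0, :rows [{:text "hello world"}]} nil]
```

Assuming execute! honours the callback contract described above, the same helper could be used as `(await-callback #(execute! db ["select 1"] %))`; this has not been verified against the library.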
query! passes only :rows to the callback.
(<!! (<query! db ["select name, price from products"]))
; [[{:id 1000, :name "screwdriver", :price 15} {:id 1001, :name "hammer", :price 10}] nil]
(<!! (<query! db ["select name, price from products where id = $1" 1001]))
; [[{:id 1001, :name "hammer", :price 10}] nil]
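Because every channel-based function yields a `[result exception]` pair, callers usually destructure it right away. A small sketch of one way to do that follows; `result-or-throw` is a hypothetical helper name, not something the library provides.

```clojure
;; Hypothetical helper: turns a [result exception] pair into either the
;; result or a thrown exception.
(defn result-or-throw
  [[result err]]
  (if err
    (throw err)
    result))

;; Success: the result half of the pair is returned unchanged.
(result-or-throw [[{:id 1001, :name "hammer", :price 10}] nil])
;; => [{:id 1001, :name "hammer", :price 10}]

;; Failure: the exception half is rethrown and can be caught as usual.
(try
  (result-or-throw [nil (ex-info "relation does not exist" {})])
  (catch Exception e
    (.getMessage e)))
;; => "relation does not exist"
```

With core.async loaded this would typically wrap a take, for example `(result-or-throw (<!! (<query! db ["select 1"])))`.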
Insert is executed with an sql-spec that supports the keys :table and :returning.
(<!! (<insert! db {:table "products"} {:name "screwdriver" :price 15}))
; [{:updated 1, :rows []} nil]
(<!! (<insert! db {:table "products" :returning "id"} {:name "hammer" :price 5}))
; [{:updated 1, :rows [{:id 1001}]} nil]
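To make the role of the sql-spec concrete, here is a sketch of how a spec and a data map could be turned into a parameterized statement. This is an illustration under assumptions, not the library's actual implementation: `insert-sql` is a hypothetical function, and the exact SQL that insert! generates may differ.

```clojure
(require '[clojure.string :as str])

;; Hypothetical sketch: builds an INSERT statement with $n placeholders
;; from an sql-spec ({:table ... :returning ...}) and a single row map.
(defn insert-sql
  [{:keys [table returning]} row]
  (let [cols         (keys row)
        placeholders (map #(str "$" %) (range 1 (inc (count cols))))]
    {:sql    (str "INSERT INTO " table
                  " (" (str/join ", " (map name cols)) ")"
                  " VALUES (" (str/join ", " placeholders) ")"
                  ;; RETURNING is appended only when the spec asks for it
                  (when returning (str " RETURNING " returning)))
     :params (vec (vals row))}))

(insert-sql {:table "products" :returning "id"} {:name "hammer" :price 5})
;; => {:sql "INSERT INTO products (name, price) VALUES ($1, $2) RETURNING id",
;;     :params ["hammer" 5]}
```

Column names and values travel separately, so the values are always sent as statement parameters rather than spliced into the SQL string.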
Multiple rows can be inserted by passing a sequence to insert!.
(<!! (<insert! db {:table "products" :returning "id"}
[{:name "hammer" :price 5}
{:name "nail" :price 1}]))
; [{:updated 2, :rows [{:id 1001} {:id 1002}]} nil]
Update is executed with an sql-spec that supports the keys :table, :returning and :where.
(<!! (<update! db {:table "users" :where ["id = $1" 1001]} {:price 6}))
; [{:updated 1, :rows []} nil]
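The :where vector already uses `$1` for its own parameter, so the placeholders for the updated columns have to be numbered after it. The sketch below shows one way that numbering could work; `update-sql` is a hypothetical function written for illustration, and the statement update! really produces may be shaped differently.

```clojure
(require '[clojure.string :as str])

;; Hypothetical sketch: builds an UPDATE statement where the SET
;; placeholders continue numbering after the :where parameters.
(defn update-sql
  [{:keys [table returning where]} data]
  (let [[where-sql & where-params] where
        offset      (count where-params)
        assignments (map-indexed
                      (fn [i col] (str (name col) " = $" (+ offset i 1)))
                      (keys data))]
    {:sql    (str "UPDATE " table
                  " SET " (str/join ", " assignments)
                  (when where-sql (str " WHERE " where-sql))
                  (when returning (str " RETURNING " returning)))
     ;; where parameters come first because they own the lowest numbers
     :params (vec (concat where-params (vals data)))}))

(update-sql {:table "users" :where ["id = $1" 1001]} {:price 6})
;; => {:sql "UPDATE users SET price = $2 WHERE id = $1", :params [1001 6]}
```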
Starting a transaction with begin! borrows a connection from the connection pool until commit!, rollback! or a query failure. Transactional operations must be issued to the transaction instead of db.
See the composition example below.
Channel-returning functions can be composed with the dosql macro, which returns [result-of-body first-exception].
(<!! (go
       (dosql [tx (<begin! db)
               rs (<insert! tx {:table "products" :returning "id"} {:name "saw"})
               _  (<insert! tx {:table "promotions"} {:product_id (get-in rs [:rows 0 :id])})
               rs (<query! tx ["select * from promotions"])
               _  (<commit! tx)]
         {:now-promoting rs})))
; [{:now-promoting [{:id 1, :product_id 1002}]} nil]
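The "first exception wins" behaviour can be pictured with a synchronous analogy. The sketch below is not the dosql macro and uses no channels: `run-steps` is a hypothetical function in which each step returns a `[result error]` pair, and evaluation stops at the first error.

```clojure
;; Hypothetical, synchronous analogy of short-circuiting composition.
;; Each step receives the results so far and returns [result error].
(defn run-steps
  "Returns [results nil] when every step succeeds, or [nil first-error]."
  [steps]
  (reduce (fn [[results _] step]
            (let [[rs err] (step results)]
              (if err
                (reduced [nil err])       ; stop: later steps never run
                [(conj results rs) nil])))
          [[] nil]
          steps))

;; All steps succeed: results are collected in order.
(run-steps [(fn [_] [1002 nil])
            (fn [[id]] [{:product_id id} nil])])
;; => [[1002 {:product_id 1002}] nil]

;; The second step fails, so the third is skipped.
(run-steps [(fn [_] [1002 nil])
            (fn [_] [nil :insert-failed])
            (fn [_] (throw (Exception. "never reached")))])
;; => [nil :insert-failed]
```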
Support for custom types can be added by extending the IPgParameter protocol and the from-pg-value multimethod.
(require '[cheshire.core :as json])
(extend-protocol IPgParameter
clojure.lang.IPersistentMap
(to-pg-value [value]
(.getBytes (json/generate-string value))))
(defmethod from-pg-value com.github.pgasync.impl.Oid/JSON [oid value]
  (json/parse-string (String. value)))
- postgres-async-driver
- core.async
- Java 8