Skip to content

Commit 3a56c04

Browse files
committed
Add support for custom types
1 parent 76432d9 commit 3a56c04

File tree

3 files changed

+71
-51
lines changed

3 files changed

+71
-51
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,17 @@ Asynchronous PostgreSQL client for Clojure
5151

5252
(close-db! db)
5353
; nil
54+
55+
;; Extension points for custom column types
56+
(require '[cheshire.core :as json])
57+
58+
(defmethod from-pg-value Oid/JSON [oid value]
59+
(json/parse-string value))
60+
61+
(extend-protocol IPgParameter
62+
clojure.lang.IPersistentMap
63+
(to-pg-value [value]
64+
(.getBytes (json/generate-string value))))
65+
5466
```
5567

src/clj_postgres_async/core.clj

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,17 @@
22
(:require [clojure.string :as string]
33
[clojure.core.async :refer [chan put! go <!] :as async])
44
(:import [com.github.pgasync ConnectionPoolBuilder]
5-
[com.github.pgasync.callback ErrorHandler ResultHandler
6-
TransactionHandler TransactionCompletedHandler])
7-
(:gen-class))
5+
[com.github.pgasync.impl Oid]
6+
[com.github.pgasync.impl.conversion DataConverter]
7+
[java.util.function Consumer]))
8+
9+
(defmacro ^:private consumer-fn [[param] body]
10+
`(reify Consumer (accept [_# ~param]
11+
(~@body))))
12+
13+
(defmulti from-pg-value (fn [oid value] oid))
14+
(defprotocol IPgParameter
15+
(to-pg-value [value]))
816

917
(defn open-db [{:keys [hostname port username password database pool-size]}]
1018
(-> (ConnectionPoolBuilder.)
@@ -14,6 +22,11 @@
1422
(.username username)
1523
(.password password)
1624
(.poolSize (or pool-size 25))
25+
(.dataConverter (proxy [DataConverter] []
26+
(toConvertable [oid value]
27+
(from-pg-oid oid value))
28+
(fromConvertable [value]
29+
(to-pg-value value))))
1730
(.build)))
1831

1932
(defn close-db! [db]
@@ -32,12 +45,11 @@
3245
{:updated (.updatedRows result) :rows (into [] rows)}))
3346

3447
(defn execute! [db [sql & params] f]
35-
(let [handler (reify
36-
ResultHandler (onResult [_ r]
37-
(f [(result->map r) nil]))
38-
ErrorHandler (onError [_ t]
39-
(f [nil t])))]
40-
(.query db sql params handler handler)))
48+
(.query db sql params
49+
(consumer-fn [rs]
50+
(f [(result->map rs) nil]))
51+
(consumer-fn [exception]
52+
(f [nil exception]))))
4153

4254
(defn query! [db sql f]
4355
(execute! db sql (fn [[rs err]]
@@ -76,28 +88,25 @@
7688
f))
7789

7890
(defn begin! [db f]
79-
(let [handler (reify
80-
TransactionHandler (onBegin [_ tx]
81-
(f [tx nil]))
82-
ErrorHandler (onError [_ t]
83-
(f [nil t])))]
84-
(.begin db handler handler)))
85-
86-
(defn- complete-tx [tx completion-fn f]
87-
(let [handler (reify
88-
TransactionCompletedHandler (onComplete [_]
89-
(f [true nil]))
90-
ErrorHandler (onError [_ t]
91-
(f [nil t])))]
92-
(completion-fn handler)))
91+
(.begin db
92+
(consumer-fn [tx]
93+
(f [tx nil]))
94+
(consumer-fn [exception]
95+
(f [nil exception]))))
9396

9497
(defn commit! [tx f]
95-
(complete-tx tx #(.commit tx %1 %1) f))
98+
(.commit tx
99+
#(f [true nil])
100+
(consumer-fn [exception]
101+
(f [nil exception]))))
96102

97103
(defn rollback! [tx f]
98-
(complete-tx tx #(.rollback tx %1 %1) f))
104+
(.rollback tx
105+
#(f [true nil])
106+
(consumer-fn [exception]
107+
(f [nil exception]))))
99108

100-
(defmacro defasync [name args]
109+
(defmacro ^:private defasync [name args]
101110
`(defn ~name [~@args]
102111
(let [c# (chan 1)]
103112
(~(symbol (subs (str name) 1)) ~@args #(put! c# %))

test/clj_postgres_async/core_test.clj

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,52 +5,51 @@
55

66
(def ^:private ^:dynamic *db*)
77

8-
(defn- await [channel]
8+
(defn- wait [channel]
99
(let [[r err] (<!! channel)]
1010
(if err
1111
(throw err)
1212
r)))
1313

1414
(defn- create-tables [db]
15-
(await (<execute! db ["drop table if exists clj_pg_test"]))
16-
(await (<execute! db ["create table clj_pg_test (
15+
(wait (<execute! db ["drop table if exists clj_pg_test"]))
16+
(wait (<execute! db ["create table clj_pg_test (
1717
id serial, t varchar(10))"])))
1818

19-
(defn- env [name default]
20-
(if-let [value (System/getenv name)]
21-
value
22-
default))
23-
2419
(defn- db-fixture [f]
25-
(binding [*db* (open-db {:hostname (env "PG_HOST" "localhost")
26-
:port (env "PG_PORT" 5432)
27-
:database (env "PG_DB" "postgres")
28-
:username (env "PG_USER" "postgres")
29-
:password (env "PG_PASSWORD" "postgres")
30-
:pool-size 1})]
31-
(try
32-
(create-tables *db*)
33-
(f)
34-
(finally (close-db! *db*)))))
20+
(letfn [(env [name default]
21+
(or (System/getenv name) default))]
22+
(binding [*db* (open-db {:hostname (env "PG_HOST" "localhost")
23+
:port (env "PG_PORT" 5432)
24+
:database (env "PG_DB" "postgres")
25+
:username (env "PG_USER" "postgres")
26+
:password (env "PG_PASSWORD" "postgres")
27+
:pool-size 1})]
28+
(try
29+
(create-tables *db*)
30+
(f)
31+
(finally (close-db! *db*))))))
3532

3633
(use-fixtures :each db-fixture)
3734

3835
(deftest queries
39-
(testing "<query! returns rows as map"
40-
(let [rs (await (<query! *db* ["select 1 as x"]))]
36+
(testing "query returns rows as map"
37+
(let [rs (wait (<query! *db* ["select 1 as x"]))]
4138
(is (= 1 (get-in rs [0 :x]))))))
4239

4340
(deftest inserts
4441
(testing "insert return row count"
45-
(let [rs (await (<insert! *db* {:table "clj_pg_test"} {:t "x"}))]
42+
(let [rs (wait (<insert! *db* {:table "clj_pg_test"} {:t "x"}))]
4643
(is (= 1 (:updated rs)))))
4744
(testing "insert with returning returns generated keys"
48-
(let [rs (await (<insert! *db* {:table "clj_pg_test" :returning "id"} {:t "y"}))]
45+
(let [rs (wait (<insert! *db* {:table "clj_pg_test" :returning "id"} {:t "y"}))]
4946
(is (get-in rs [:rows 0 :id])))))
5047

5148
(deftest sql-macro
5249
(testing "dosql returns last form"
53-
(is (= "123" (await (go (dosql
54-
[rs (<query! *db* ["select 123 as x"])
55-
rs (<query! *db* ["select $1::text as t" (:x (first rs))])]
50+
(is (= "123" (wait (go (dosql
51+
[tx (<begin! *db*)
52+
rs (<query! tx ["select 123 as x"])
53+
rs (<query! tx ["select $1::text as t" (:x (first rs))])
54+
_ (<commit! tx)]
5655
(:t (first rs)))))))))

0 commit comments

Comments
 (0)