diff --git a/README.md b/README.md index b404fb2..4c6b1bc 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,15 @@ postgres.async ============== +[![Clojars Project](https://img.shields.io/clojars/v/alaisi/postgres.async.svg)](https://clojars.org/alaisi/postgres.async) + Asynchronous PostgreSQL client for Clojure. ## Download Add the following to your [Leiningen](http://github.com/technomancy/leiningen) `project.clj`: -![latest postgres.async version](https://clojars.org/alaisi/postgres.async/latest-version.svg) - +[![latest postgres.async version](https://clojars.org/alaisi/postgres.async/latest-version.svg)](https://clojars.org/alaisi/postgres.async) ## Setting up a connection pool @@ -103,7 +104,7 @@ See composition below for example. ## Composition -Channel-returning functions can be composed with `dosql` macro that returns result of last form of first exception. +Channel-returning functions can be composed with `dosql` macro that returns result of last form or first exception. ```clojure ( (.queryRows db sql (into-array params)) + (.subscribe (pg/row-observer c))) + c)) + (defn insert! "Executes an sql insert and returns update count and returned rows. Spec format is diff --git a/src/postgres/async/impl.clj b/src/postgres/async/impl.clj index cb68264..924b9bc 100644 --- a/src/postgres/async/impl.clj +++ b/src/postgres/async/impl.clj @@ -17,20 +17,36 @@ (apply f (concat args [callback])) channel)) -(defn column->value [^Object value] +(defn- column->value [^Object value] (if (and value (-> value .getClass .isArray)) (vec (map column->value value)) value)) +(defn- get-columns [^PgRow row] + (keys (.getColumns row))) + +(defn- row->map [^PgRow row ^Object rowmap ^String col] + (assoc rowmap + (keyword (.toLowerCase col)) + (column->value (.get row col)))) + (defn result->map [^ResultSet result] - (let [columns (.getColumns result) - row->map (fn [^PgRow row rowmap ^String col] - (assoc rowmap (keyword (.toLowerCase col)) - (column->value (.get row col))))] + (let [columns (.getColumns result)] {:updated (.updatedRows result) :rows (vec (map (fn [row] - (reduce (partial row->map row) {} columns)) - result))})) + (reduce (partial row->map row) {} columns)) + result))})) + +(defn ^rx.Observer row-observer [channel] + (reify rx.Observer + (onNext [_ row] + (put! channel (reduce (partial row->map row) + {} (get-columns row)))) + (onError [_ err] + (put! channel err) + (close! channel)) + (onCompleted [_] + (close! channel)))) (defn- list-columns [data] (if (map? data) @@ -68,3 +84,5 @@ " WHERE " (first where) (when returning (str " RETURNING " returning)))) + + diff --git a/test/postgres/async_test.clj b/test/postgres/async_test.clj index df39264..cc240ee 100644 --- a/test/postgres/async_test.clj +++ b/test/postgres/async_test.clj @@ -1,7 +1,8 @@ (ns postgres.async-test (:require [clojure.test :refer :all] [clojure.core.async :refer ['::xml"]) + ^Throwable ex (