@@ -25,7 +25,7 @@ import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRe
25
25
import com .github .mauricio .async .db .postgresql .exceptions ._
26
26
import com .github .mauricio .async .db .util ._
27
27
import com .github .mauricio .async .db .{Configuration , Connection }
28
- import java .util .concurrent .atomic ._
28
+ import java .util .concurrent .atomic .{ AtomicLong , AtomicInteger , AtomicReference }
29
29
import messages .backend ._
30
30
import messages .frontend ._
31
31
import scala .Some
@@ -65,7 +65,6 @@ class PostgreSQLConnection
65
65
private final val preparedStatementsCounter = new AtomicInteger ()
66
66
private final implicit val internalExecutionContext = executionContext
67
67
68
- private var readyForQuery = false
69
68
private val parameterStatus = new scala.collection.mutable.HashMap [String , String ]()
70
69
private val parsedStatements = new scala.collection.mutable.HashMap [String , PreparedStatementHolder ]()
71
70
private var authenticated = false
@@ -80,7 +79,7 @@ class PostgreSQLConnection
80
79
81
80
private var queryResult : Option [QueryResult ] = None
82
81
83
- def isReadyForQuery : Boolean = this .readyForQuery
82
+ def isReadyForQuery : Boolean = this .queryPromise.isEmpty
84
83
85
84
def connect : Future [Connection ] = {
86
85
this .connectionHandler.connect.onFailure {
@@ -98,7 +97,6 @@ class PostgreSQLConnection
98
97
99
98
override def sendQuery (query : String ): Future [QueryResult ] = {
100
99
validateQuery(query)
101
- this .readyForQuery = false
102
100
103
101
val promise = Promise [QueryResult ]()
104
102
this .setQueryPromise(promise)
@@ -132,7 +130,6 @@ class PostgreSQLConnection
132
130
throw new InsufficientParametersException (paramsCount, values)
133
131
}
134
132
135
- this .readyForQuery = false
136
133
val promise = Promise [QueryResult ]()
137
134
this .setQueryPromise(promise)
138
135
this .currentPreparedStatement = Some (query)
@@ -172,19 +169,15 @@ class PostgreSQLConnection
172
169
this .disconnect
173
170
}
174
171
175
- this .failQueryPromise(e)
176
-
177
172
this .currentPreparedStatement = None
173
+ this .failQueryPromise(e)
178
174
}
179
175
180
176
override def onReadyForQuery () {
181
177
this .connectionFuture.trySuccess(this )
182
178
183
- queryResult.map(this .succeedQueryPromise)
184
-
185
- this .queryResult = None
186
179
this .recentError = false
187
- this .readyForQuery = true
180
+ queryResult.foreach( this .succeedQueryPromise)
188
181
}
189
182
190
183
override def onError (m : ErrorMessage ) {
@@ -269,15 +262,18 @@ class PostgreSQLConnection
269
262
authenticationMessage.challengeType)
270
263
}
271
264
}
265
+
266
+ private [this ] def notReadyForQueryError (errorMessage : String , race : Boolean ) = {
267
+ log.error(errorMessage)
268
+ throw new ConnectionStillRunningQueryException (
269
+ this .currentCount,
270
+ race
271
+ )
272
+ }
272
273
273
274
def validateIfItIsReadyForQuery (errorMessage : String ) =
274
- if (this .queryPromise.isDefined) {
275
- log.error(errorMessage)
276
- throw new ConnectionStillRunningQueryException (
277
- this .currentCount,
278
- this .readyForQuery
279
- )
280
- }
275
+ if (this .queryPromise.isDefined)
276
+ notReadyForQueryError(errorMessage, false )
281
277
282
278
private def validateQuery (query : String ) {
283
279
this .validateIfItIsReadyForQuery(" Can't run query because there is one query pending already" )
@@ -290,29 +286,25 @@ class PostgreSQLConnection
290
286
private def queryPromise : Option [Promise [QueryResult ]] = queryPromiseReference.get()
291
287
292
288
private def setQueryPromise (promise : Promise [QueryResult ]) {
293
- this .queryPromiseReference.set(Some (promise))
289
+ if (! this .queryPromiseReference.compareAndSet(None , Some (promise)))
290
+ notReadyForQueryError(" Can't run query due to a race with another started query" , true )
294
291
}
295
292
296
- private def clearQueryPromise {
297
- this .queryPromiseReference.set (None )
293
+ private def clearQueryPromise : Option [ Promise [ QueryResult ]] = {
294
+ this .queryPromiseReference.getAndSet (None )
298
295
}
299
296
300
297
private def failQueryPromise (t : Throwable ) {
301
- val promise = this .queryPromise
302
-
303
- if (promise.isDefined) {
304
- this .clearQueryPromise
298
+ this .clearQueryPromise.foreach { promise =>
305
299
log.error(" Setting error on future {}" , promise)
306
- promise.get. failure(t)
300
+ promise.failure(t)
307
301
}
308
302
}
309
303
310
304
private def succeedQueryPromise (result : QueryResult ) {
311
- val promise = this .queryPromise
312
-
313
- if (promise.isDefined) {
314
- this .clearQueryPromise
315
- promise.get.success(result)
305
+ this .queryResult = None
306
+ this .clearQueryPromise.foreach {
307
+ _.success(result)
316
308
}
317
309
}
318
310
0 commit comments