From 4db17a8754d67bb45cf797ebe60cb571e2d8f39d Mon Sep 17 00:00:00 2001 From: Dylan Simon Date: Sun, 3 Nov 2013 17:45:01 -0500 Subject: [PATCH 1/3] Make PostgresSQLConnection defensively thread safe Race conditions should be properly caught, but will still throw a ConnectionStillRunningQueryException. Basically this just involves making setQueryPromise check for an existing promise and reordering operations to happen within this mutex. readyForQuery is no longer necessary as it is fully redundant with queryPromise. For Issue #59. --- ...ConnectionStillRunningQueryException.scala | 6 +-- .../db/postgresql/PostgreSQLConnection.scala | 48 ++++++++----------- 2 files changed, 24 insertions(+), 30 deletions(-) diff --git a/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionStillRunningQueryException.scala b/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionStillRunningQueryException.scala index aaf028d6..a6302410 100644 --- a/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionStillRunningQueryException.scala +++ b/db-async-common/src/main/scala/com/github/mauricio/async/db/exceptions/ConnectionStillRunningQueryException.scala @@ -16,8 +16,8 @@ package com.github.mauricio.async.db.exceptions -class ConnectionStillRunningQueryException( connectionCount : Long, readyForQuery : Boolean ) - extends DatabaseException ( "[%s] - There is a query still being run here - readyForQuery -> %s".format( +class ConnectionStillRunningQueryException( connectionCount : Long, caughtRace : Boolean) + extends DatabaseException ( "[%s] - There is a query still being run here - race -> %s".format( connectionCount, - readyForQuery + caughtRace )) diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala index f442950b..88ff733f 100644 --- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala +++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala @@ -25,7 +25,7 @@ import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRe import com.github.mauricio.async.db.postgresql.exceptions._ import com.github.mauricio.async.db.util._ import com.github.mauricio.async.db.{Configuration, Connection} -import java.util.concurrent.atomic._ +import java.util.concurrent.atomic.{AtomicLong,AtomicInteger,AtomicReference} import messages.backend._ import messages.frontend._ import scala.Some @@ -65,7 +65,6 @@ class PostgreSQLConnection private final val preparedStatementsCounter = new AtomicInteger() private final implicit val internalExecutionContext = executionContext - private var readyForQuery = false private val parameterStatus = new scala.collection.mutable.HashMap[String, String]() private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]() private var authenticated = false @@ -80,7 +79,7 @@ class PostgreSQLConnection private var queryResult: Option[QueryResult] = None - def isReadyForQuery: Boolean = this.readyForQuery + def isReadyForQuery: Boolean = this.queryPromise.isEmpty def connect: Future[Connection] = { this.connectionHandler.connect.onFailure { @@ -98,7 +97,6 @@ class PostgreSQLConnection override def sendQuery(query: String): Future[QueryResult] = { validateQuery(query) - this.readyForQuery = false val promise = Promise[QueryResult]() this.setQueryPromise(promise) @@ -132,7 +130,6 @@ class PostgreSQLConnection throw new InsufficientParametersException(paramsCount, values) } - this.readyForQuery = false val promise = Promise[QueryResult]() this.setQueryPromise(promise) this.currentPreparedStatement = Some(query) @@ -172,19 +169,15 @@ class PostgreSQLConnection this.disconnect } - this.failQueryPromise(e) - this.currentPreparedStatement = None + this.failQueryPromise(e) } override def onReadyForQuery() { this.connectionFuture.trySuccess(this) - queryResult.map(this.succeedQueryPromise) - - this.queryResult = None this.recentError = false - this.readyForQuery = true + queryResult.map(this.succeedQueryPromise) } override def onError(m: ErrorMessage) { @@ -269,15 +262,18 @@ class PostgreSQLConnection authenticationMessage.challengeType) } } + + private[this] def notReadyForQueryError(errorMessage : String, race : Boolean) = { + log.error(errorMessage) + throw new ConnectionStillRunningQueryException( + this.currentCount, + race + ) + } def validateIfItIsReadyForQuery(errorMessage: String) = - if (this.queryPromise.isDefined) { - log.error(errorMessage) - throw new ConnectionStillRunningQueryException( - this.currentCount, - this.readyForQuery - ) - } + if (this.queryPromise.isDefined) + notReadyForQueryError(errorMessage, false) private def validateQuery(query: String) { this.validateIfItIsReadyForQuery("Can't run query because there is one query pending already") @@ -290,7 +286,8 @@ class PostgreSQLConnection private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get() private def setQueryPromise(promise: Promise[QueryResult]) { - this.queryPromiseReference.set(Some(promise)) + if (!this.queryPromiseReference.compareAndSet(None, Some(promise))) + notReadyForQueryError("Can't run query due to a race with another started query", true) } private def clearQueryPromise { @@ -298,21 +295,18 @@ class PostgreSQLConnection } private def failQueryPromise(t: Throwable) { - val promise = this.queryPromise - - if (promise.isDefined) { + this.queryPromise.foreach { promise => this.clearQueryPromise log.error("Setting error on future {}", promise) - promise.get.failure(t) + promise.failure(t) } } private def succeedQueryPromise(result: QueryResult) { - val promise = this.queryPromise - - if (promise.isDefined) { + this.queryResult = None + this.queryPromise.foreach { promise => this.clearQueryPromise - promise.get.success(result) + promise.success(result) } } From 25892ec95cfa829e42a51fc1e9058875ca46e9c9 Mon Sep 17 00:00:00 2001 From: Dylan Simon Date: Sun, 3 Nov 2013 18:02:12 -0500 Subject: [PATCH 2/3] Make MySQLConnection defensively thread-safe Use the same approach as in PostgreSQLConnection, where queryPromise is an AtomicReference. Previously it would check queryPromise.isComplete. However, there was no way that a non-null promise could be complete as the only two places it was completed (in {succeed,fail}QueryPromise) immediately set it to null beforehand. This should be reviewed/tested. Issue #59. Updated clearQueryPromise in both to return the old promise, in order to avoid racing completions. --- .../async/db/mysql/MySQLConnection.scala | 37 +++++++++++-------- .../db/postgresql/PostgreSQLConnection.scala | 12 +++--- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala index 851615da..8a8bf773 100644 --- a/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala +++ b/mysql-async/src/main/scala/com/github/mauricio/async/db/mysql/MySQLConnection.scala @@ -25,7 +25,7 @@ import com.github.mauricio.async.db.mysql.message.server._ import com.github.mauricio.async.db.mysql.util.CharsetMapper import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture import com.github.mauricio.async.db.util._ -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{AtomicLong,AtomicReference} import scala.Some import scala.concurrent.{ExecutionContext, Promise, Future} import scala.util.Failure @@ -78,7 +78,7 @@ class MySQLConnection( private final val connectionPromise = Promise[Connection]() private final val disconnectionPromise = Promise[Connection]() - private var queryPromise: Promise[QueryResult] = null + private val queryPromiseReference = new AtomicReference[Option[Promise[QueryResult]]](None) private var connected = false private var _lastException : Throwable = null private var serverVersion : Version = null @@ -185,33 +185,28 @@ class MySQLConnection( def sendQuery(query: String): Future[QueryResult] = { this.validateIsReadyForQuery() val promise = Promise[QueryResult] - this.queryPromise = promise + this.setQueryPromise(promise) this.connectionHandler.write(new QueryMessage(query)) promise.future } private def failQueryPromise(t: Throwable) { - if (this.isQuerying) { - val promise = this.queryPromise - this.queryPromise = null - - promise.tryFailure(t) + this.clearQueryPromise.foreach { + _.tryFailure(t) } } private def succeedQueryPromise(queryResult: QueryResult) { - if (this.isQuerying) { - val promise = this.queryPromise - this.queryPromise = null - promise.success(queryResult) + this.clearQueryPromise.foreach { + _.success(queryResult) } } - def isQuerying: Boolean = this.queryPromise != null && !this.queryPromise.isCompleted + def isQuerying: Boolean = this.queryPromise.isDefined def onResultSet(resultSet: ResultSet, message: EOFMessage) { if (this.isQuerying) { @@ -239,15 +234,25 @@ class MySQLConnection( throw new InsufficientParametersException(totalParameters, values) } val promise = Promise[QueryResult] - this.queryPromise = promise + this.setQueryPromise(promise) this.connectionHandler.write(new PreparedStatementMessage(query, values)) promise.future } private def validateIsReadyForQuery() { - if ( this.queryPromise != null && !this.queryPromise.isCompleted ) { - throw new ConnectionStillRunningQueryException(this.connectionCount, false ) + if ( isQuerying ) { + throw new ConnectionStillRunningQueryException(this.connectionCount, false) } } + private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get() + + private def setQueryPromise(promise: Promise[QueryResult]) { + if (!this.queryPromiseReference.compareAndSet(None, Some(promise))) + throw new ConnectionStillRunningQueryException(this.connectionCount, true) + } + + private def clearQueryPromise : Option[Promise[QueryResult]] = { + this.queryPromiseReference.getAndSet(None) + } } diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala index 88ff733f..17057bc6 100644 --- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala +++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala @@ -290,13 +290,12 @@ class PostgreSQLConnection notReadyForQueryError("Can't run query due to a race with another started query", true) } - private def clearQueryPromise { - this.queryPromiseReference.set(None) + private def clearQueryPromise : Option[Promise[QueryResult]] = { + this.queryPromiseReference.getAndSet(None) } private def failQueryPromise(t: Throwable) { - this.queryPromise.foreach { promise => - this.clearQueryPromise + this.clearQueryPromise.foreach { promise => log.error("Setting error on future {}", promise) promise.failure(t) } @@ -304,9 +303,8 @@ class PostgreSQLConnection private def succeedQueryPromise(result: QueryResult) { this.queryResult = None - this.queryPromise.foreach { promise => - this.clearQueryPromise - promise.success(result) + this.clearQueryPromise.foreach { + _.success(result) } } From f754c1b72524974a8d011b458daac9bd4140e590 Mon Sep 17 00:00:00 2001 From: Dylan Simon Date: Sun, 3 Nov 2013 18:11:27 -0500 Subject: [PATCH 3/3] Minor optimization: use foreach rather than map --- .../mauricio/async/db/postgresql/PostgreSQLConnection.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala index 17057bc6..4fbdfaaf 100644 --- a/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala +++ b/postgresql-async/src/main/scala/com/github/mauricio/async/db/postgresql/PostgreSQLConnection.scala @@ -177,7 +177,7 @@ class PostgreSQLConnection this.connectionFuture.trySuccess(this) this.recentError = false - queryResult.map(this.succeedQueryPromise) + queryResult.foreach(this.succeedQueryPromise) } override def onError(m: ErrorMessage) {