Skip to content
This repository was archived by the owner on Dec 3, 2019. It is now read-only.

Connection mutex improvements for issue #59 #63

Merged
merged 3 commits into from
Nov 6, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.github.mauricio.async.db.exceptions

class ConnectionStillRunningQueryException( connectionCount : Long, readyForQuery : Boolean )
extends DatabaseException ( "[%s] - There is a query still being run here - readyForQuery -> %s".format(
class ConnectionStillRunningQueryException( connectionCount : Long, caughtRace : Boolean)
extends DatabaseException ( "[%s] - There is a query still being run here - race -> %s".format(
connectionCount,
readyForQuery
caughtRace
))
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.github.mauricio.async.db.mysql.message.server._
import com.github.mauricio.async.db.mysql.util.CharsetMapper
import com.github.mauricio.async.db.util.ChannelFutureTransformer.toFuture
import com.github.mauricio.async.db.util._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{AtomicLong,AtomicReference}
import scala.Some
import scala.concurrent.{ExecutionContext, Promise, Future}
import scala.util.Failure
Expand Down Expand Up @@ -78,7 +78,7 @@ class MySQLConnection(
private final val connectionPromise = Promise[Connection]()
private final val disconnectionPromise = Promise[Connection]()

private var queryPromise: Promise[QueryResult] = null
private val queryPromiseReference = new AtomicReference[Option[Promise[QueryResult]]](None)
private var connected = false
private var _lastException : Throwable = null
private var serverVersion : Version = null
Expand Down Expand Up @@ -185,33 +185,28 @@ class MySQLConnection(
def sendQuery(query: String): Future[QueryResult] = {
this.validateIsReadyForQuery()
val promise = Promise[QueryResult]
this.queryPromise = promise
this.setQueryPromise(promise)
this.connectionHandler.write(new QueryMessage(query))
promise.future
}

private def failQueryPromise(t: Throwable) {

if (this.isQuerying) {
val promise = this.queryPromise
this.queryPromise = null

promise.tryFailure(t)
this.clearQueryPromise.foreach {
_.tryFailure(t)
}

}

private def succeedQueryPromise(queryResult: QueryResult) {

if (this.isQuerying) {
val promise = this.queryPromise
this.queryPromise = null
promise.success(queryResult)
this.clearQueryPromise.foreach {
_.success(queryResult)
}

}

def isQuerying: Boolean = this.queryPromise != null && !this.queryPromise.isCompleted
def isQuerying: Boolean = this.queryPromise.isDefined

def onResultSet(resultSet: ResultSet, message: EOFMessage) {
if (this.isQuerying) {
Expand Down Expand Up @@ -239,15 +234,25 @@ class MySQLConnection(
throw new InsufficientParametersException(totalParameters, values)
}
val promise = Promise[QueryResult]
this.queryPromise = promise
this.setQueryPromise(promise)
this.connectionHandler.write(new PreparedStatementMessage(query, values))
promise.future
}

private def validateIsReadyForQuery() {
if ( this.queryPromise != null && !this.queryPromise.isCompleted ) {
throw new ConnectionStillRunningQueryException(this.connectionCount, false )
if ( isQuerying ) {
throw new ConnectionStillRunningQueryException(this.connectionCount, false)
}
}

private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get()

private def setQueryPromise(promise: Promise[QueryResult]) {
if (!this.queryPromiseReference.compareAndSet(None, Some(promise)))
throw new ConnectionStillRunningQueryException(this.connectionCount, true)
}

private def clearQueryPromise : Option[Promise[QueryResult]] = {
this.queryPromiseReference.getAndSet(None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.github.mauricio.async.db.postgresql.column.{PostgreSQLColumnDecoderRe
import com.github.mauricio.async.db.postgresql.exceptions._
import com.github.mauricio.async.db.util._
import com.github.mauricio.async.db.{Configuration, Connection}
import java.util.concurrent.atomic._
import java.util.concurrent.atomic.{AtomicLong,AtomicInteger,AtomicReference}
import messages.backend._
import messages.frontend._
import scala.Some
Expand Down Expand Up @@ -65,7 +65,6 @@ class PostgreSQLConnection
private final val preparedStatementsCounter = new AtomicInteger()
private final implicit val internalExecutionContext = executionContext

private var readyForQuery = false
private val parameterStatus = new scala.collection.mutable.HashMap[String, String]()
private val parsedStatements = new scala.collection.mutable.HashMap[String, PreparedStatementHolder]()
private var authenticated = false
Expand All @@ -80,7 +79,7 @@ class PostgreSQLConnection

private var queryResult: Option[QueryResult] = None

def isReadyForQuery: Boolean = this.readyForQuery
def isReadyForQuery: Boolean = this.queryPromise.isEmpty

def connect: Future[Connection] = {
this.connectionHandler.connect.onFailure {
Expand All @@ -98,7 +97,6 @@ class PostgreSQLConnection

override def sendQuery(query: String): Future[QueryResult] = {
validateQuery(query)
this.readyForQuery = false

val promise = Promise[QueryResult]()
this.setQueryPromise(promise)
Expand Down Expand Up @@ -132,7 +130,6 @@ class PostgreSQLConnection
throw new InsufficientParametersException(paramsCount, values)
}

this.readyForQuery = false
val promise = Promise[QueryResult]()
this.setQueryPromise(promise)
this.currentPreparedStatement = Some(query)
Expand Down Expand Up @@ -172,19 +169,15 @@ class PostgreSQLConnection
this.disconnect
}

this.failQueryPromise(e)

this.currentPreparedStatement = None
this.failQueryPromise(e)
}

override def onReadyForQuery() {
this.connectionFuture.trySuccess(this)

queryResult.map(this.succeedQueryPromise)

this.queryResult = None
this.recentError = false
this.readyForQuery = true
queryResult.foreach(this.succeedQueryPromise)
}

override def onError(m: ErrorMessage) {
Expand Down Expand Up @@ -269,15 +262,18 @@ class PostgreSQLConnection
authenticationMessage.challengeType)
}
}

private[this] def notReadyForQueryError(errorMessage : String, race : Boolean) = {
log.error(errorMessage)
throw new ConnectionStillRunningQueryException(
this.currentCount,
race
)
}

def validateIfItIsReadyForQuery(errorMessage: String) =
if (this.queryPromise.isDefined) {
log.error(errorMessage)
throw new ConnectionStillRunningQueryException(
this.currentCount,
this.readyForQuery
)
}
if (this.queryPromise.isDefined)
notReadyForQueryError(errorMessage, false)

private def validateQuery(query: String) {
this.validateIfItIsReadyForQuery("Can't run query because there is one query pending already")
Expand All @@ -290,29 +286,25 @@ class PostgreSQLConnection
private def queryPromise: Option[Promise[QueryResult]] = queryPromiseReference.get()

private def setQueryPromise(promise: Promise[QueryResult]) {
this.queryPromiseReference.set(Some(promise))
if (!this.queryPromiseReference.compareAndSet(None, Some(promise)))
notReadyForQueryError("Can't run query due to a race with another started query", true)
}

private def clearQueryPromise {
this.queryPromiseReference.set(None)
private def clearQueryPromise : Option[Promise[QueryResult]] = {
this.queryPromiseReference.getAndSet(None)
}

private def failQueryPromise(t: Throwable) {
val promise = this.queryPromise

if (promise.isDefined) {
this.clearQueryPromise
this.clearQueryPromise.foreach { promise =>
log.error("Setting error on future {}", promise)
promise.get.failure(t)
promise.failure(t)
}
}

private def succeedQueryPromise(result: QueryResult) {
val promise = this.queryPromise

if (promise.isDefined) {
this.clearQueryPromise
promise.get.success(result)
this.queryResult = None
this.clearQueryPromise.foreach {
_.success(result)
}
}

Expand Down