Skip to content

Commit d6180b5

Browse files
committed
Added check to SingleThreadedAsyncObjectPool to ensure returned objects
came from that pool. Fixed not destroying invalidated objects during test cycle. Fixed exception on multiple close attempts in SingleThreadedAsyncObjectPool to make consistent with simultaneous request execution path. Added generic spec for testing an AsyncObjectPool implementation, and applied it to SingleThreadedAsyncObjectPool to guard against the above problems reappearing. Added mock capabilities back for specs2.
1 parent 5e24cb0 commit d6180b5

File tree

3 files changed

+277
-25
lines changed

3 files changed

+277
-25
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/pool/SingleThreadedAsyncObjectPool.scala

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package com.github.mauricio.async.db.pool
1818

19+
import java.util.concurrent.RejectedExecutionException
20+
1921
import com.github.mauricio.async.db.util.{Log, Worker}
2022
import java.util.concurrent.atomic.AtomicLong
21-
import java.util.{TimerTask, Timer}
23+
import java.util.{Timer, TimerTask}
24+
2225
import scala.collection.mutable.{ArrayBuffer, Queue, Stack}
23-
import scala.concurrent.{Promise, Future}
26+
import scala.concurrent.{Future, Promise}
2427
import scala.util.{Failure, Success}
2528

2629
object SingleThreadedAsyncObjectPool {
@@ -93,15 +96,30 @@ class SingleThreadedAsyncObjectPool[T](
9396
def giveBack(item: T): Future[AsyncObjectPool[T]] = {
9497
val promise = Promise[AsyncObjectPool[T]]()
9598
this.mainPool.action {
96-
this.checkouts -= item
97-
this.factory.validate(item) match {
98-
case Success(item) => {
99-
this.addBack(item, promise)
99+
// Ensure it came from this pool
100+
val idx = this.checkouts.indexOf(item)
101+
if(idx >= 0) {
102+
this.checkouts.remove(idx)
103+
this.factory.validate(item) match {
104+
case Success(item) => {
105+
this.addBack(item, promise)
106+
}
107+
case Failure(e) => {
108+
this.factory.destroy(item)
109+
promise.failure(e)
110+
}
100111
}
101-
case Failure(e) => {
102-
this.checkouts -= item
103-
this.factory.destroy(item)
104-
promise.failure(e)
112+
} else {
113+
// It's already a failure but lets doublecheck why
114+
val isFromOurPool = (item match {
115+
case x: AnyRef => this.poolables.find(holder => x eq holder.item.asInstanceOf[AnyRef])
116+
case _ => this.poolables.find(holder => item == holder.item)
117+
}).isDefined
118+
119+
if(isFromOurPool) {
120+
promise.failure(new IllegalStateException("This item has already been returned"))
121+
} else {
122+
promise.failure(new IllegalArgumentException("The returned item did not come from this pool."))
105123
}
106124
}
107125
}
@@ -112,25 +130,28 @@ class SingleThreadedAsyncObjectPool[T](
112130
def isFull: Boolean = this.poolables.isEmpty && this.checkouts.size == configuration.maxObjects
113131

114132
def close: Future[AsyncObjectPool[T]] = {
115-
val promise = Promise[AsyncObjectPool[T]]()
116-
117-
this.mainPool.action {
118-
if (!this.closed) {
119-
try {
120-
this.timer.cancel()
121-
this.mainPool.shutdown
122-
this.closed = true
123-
(this.poolables.map(i => i.item) ++ this.checkouts).foreach(item => factory.destroy(item))
133+
try {
134+
val promise = Promise[AsyncObjectPool[T]]()
135+
this.mainPool.action {
136+
if (!this.closed) {
137+
try {
138+
this.timer.cancel()
139+
this.mainPool.shutdown
140+
this.closed = true
141+
(this.poolables.map(i => i.item) ++ this.checkouts).foreach(item => factory.destroy(item))
142+
promise.success(this)
143+
} catch {
144+
case e: Exception => promise.failure(e)
145+
}
146+
} else {
124147
promise.success(this)
125-
} catch {
126-
case e: Exception => promise.failure(e)
127148
}
128-
} else {
129-
promise.success(this)
130149
}
150+
promise.future
151+
} catch {
152+
case e: RejectedExecutionException if this.closed =>
153+
Future.successful(this)
131154
}
132-
133-
promise.future
134155
}
135156

136157
def availables: Traversable[T] = this.poolables.map(item => item.item)
@@ -238,6 +259,7 @@ class SingleThreadedAsyncObjectPool[T](
238259
case Failure(e) => {
239260
log.error("Failed to validate object", e)
240261
removals += poolable
262+
factory.destroy(poolable.item)
241263
}
242264
}
243265
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package com.github.mauricio.async.db.pool
2+
3+
import com.github.mauricio.async.db.pool.AbstractAsyncObjectPoolSpec.Widget
4+
import org.mockito.Mockito.reset
5+
import org.specs2.mock.Mockito
6+
import org.specs2.mutable.Specification
7+
8+
import scala.concurrent.{Await, Future}
9+
import scala.util.Failure
10+
11+
import scala.reflect.runtime.universe.TypeTag
12+
import scala.util.Try
13+
import scala.concurrent.duration.{Duration, SECONDS}
14+
15+
/**
16+
* This spec is designed abstract to allow testing of any implementation of AsyncObjectPool, against the common
17+
* requirements the interface expects.
18+
*
19+
* @tparam T the AsyncObjectPool being tested.
20+
*/
21+
abstract class AbstractAsyncObjectPoolSpec[T <: AsyncObjectPool[Widget]](implicit tag: TypeTag[T])
22+
extends Specification
23+
with Mockito {
24+
25+
import AbstractAsyncObjectPoolSpec._
26+
27+
protected def pool(factory: ObjectFactory[Widget] = new TestWidgetFactory, conf: PoolConfiguration = PoolConfiguration.Default): T
28+
29+
// Evaluates to the type of AsyncObjectPool
30+
s"the ${tag.tpe.erasure} variant of AsyncObjectPool" should {
31+
32+
"successfully retrieve and return a Widget" in {
33+
val p = pool()
34+
val widget = Await.result(p.take, Duration.Inf)
35+
36+
widget must not beNull
37+
38+
val thePool = Await.result(p.giveBack(widget), Duration.Inf)
39+
thePool must be(p)
40+
}
41+
42+
"reject Widgets that did not come from it" in {
43+
val p = pool()
44+
45+
Await.result(p.giveBack(Widget(null)), Duration.Inf) must throwAn[IllegalArgumentException]
46+
}
47+
48+
"scale contents" >> {
49+
sequential
50+
51+
val factory = spy(new TestWidgetFactory)
52+
53+
val p = pool(
54+
factory = factory,
55+
conf = PoolConfiguration(
56+
maxObjects = 5,
57+
maxIdle = 2,
58+
maxQueueSize = 5,
59+
validationInterval = 2000
60+
))
61+
62+
63+
64+
var taken = Seq.empty[Widget]
65+
"can take up to maxObjects" in {
66+
taken = Await.result(Future.sequence(for (i <- 1 to 5) yield p.take), Duration.Inf)
67+
68+
taken must have size 5
69+
taken.head must not beNull;
70+
taken(1) must not beNull;
71+
taken(2) must not beNull;
72+
taken(3) must not beNull;
73+
taken(4) must not beNull
74+
}
75+
76+
"does not attempt to expire taken items" in {
77+
// Wait 3 seconds to ensure idle check has run at least once
78+
there was after(3.seconds).no(factory).destroy(any[Widget])
79+
}
80+
81+
reset(factory) // Considered bad form, but necessary as we depend on previous state in these tests
82+
"takes maxObjects back" in {
83+
val returns = Await.result(Future.sequence(for (widget <- taken) yield p.giveBack(widget)), Duration.Inf)
84+
85+
returns must have size 5
86+
87+
returns.head must be(p)
88+
returns(1) must be(p)
89+
returns(2) must be(p)
90+
returns(3) must be(p)
91+
returns(4) must be(p)
92+
}
93+
94+
"protest returning an item that was already returned" in {
95+
val resultFuture = p.giveBack(taken.head)
96+
97+
Await.result(resultFuture, Duration.Inf) must throwAn[IllegalStateException]
98+
}
99+
100+
"destroy down to maxIdle widgets" in {
101+
Thread.sleep(3000)
102+
there were 5.times(factory).destroy(any[Widget])
103+
}
104+
}
105+
106+
"queue requests after running out" in {
107+
val p = pool(conf = PoolConfiguration.Default.copy(maxObjects = 2, maxQueueSize = 1))
108+
109+
val widgets = Await.result(Future.sequence(for (i <- 1 to 2) yield p.take), Duration.Inf)
110+
111+
val future = p.take
112+
113+
// Wait five seconds
114+
Thread.sleep(5000)
115+
116+
val failedFuture = p.take
117+
118+
// Cannot be done, would exceed maxObjects
119+
future.isCompleted must beFalse
120+
121+
Await.result(failedFuture, Duration.Inf) must throwA[PoolExhaustedException]
122+
123+
Await.result(p.giveBack(widgets.head), Duration.Inf) must be(p)
124+
125+
Await.result(future, Duration(5, SECONDS)) must be(widgets.head)
126+
}
127+
128+
"refuse to allow take after being closed" in {
129+
val p = pool()
130+
131+
Await.result(p.close, Duration.Inf) must be(p)
132+
133+
Await.result(p.take, Duration.Inf) must throwA[PoolAlreadyTerminatedException]
134+
}
135+
136+
"allow being closed more than once" in {
137+
val p = pool()
138+
139+
Await.result(p.close, Duration.Inf) must be(p)
140+
141+
Await.result(p.close, Duration.Inf) must be(p)
142+
}
143+
144+
145+
"destroy a failed widget" in {
146+
val factory = spy(new TestWidgetFactory)
147+
val p = pool(factory = factory)
148+
149+
val widget = Await.result(p.take, Duration.Inf)
150+
151+
widget must not beNull
152+
153+
factory.validate(widget) returns Failure(new RuntimeException("This is a bad widget!"))
154+
155+
Await.result(p.giveBack(widget), Duration.Inf) must throwA[RuntimeException](message = "This is a bad widget!")
156+
157+
there was atLeastOne(factory).destroy(widget)
158+
}
159+
160+
"clean up widgets that die in the pool" in {
161+
val factory = spy(new TestWidgetFactory)
162+
// Deliberately make it impossible to expire (nearly)
163+
val p = pool(factory = factory, conf = PoolConfiguration.Default.copy(maxIdle = Long.MaxValue, validationInterval = 2000))
164+
165+
val widget = Await.result(p.take, Duration.Inf)
166+
167+
widget must not beNull
168+
169+
Await.result(p.giveBack(widget), Duration.Inf) must be(p)
170+
171+
there was atLeastOne(factory).validate(widget)
172+
there were no(factory).destroy(widget)
173+
174+
there was after(3.seconds).atLeastTwo(factory).validate(widget)
175+
176+
factory.validate(widget) returns Failure(new RuntimeException("Test Exception, Not an Error"))
177+
178+
there was after(3.seconds).one(factory).destroy(widget)
179+
180+
Await.ready(p.take, Duration.Inf)
181+
182+
there was two(factory).create
183+
}
184+
185+
}
186+
187+
}
188+
189+
object AbstractAsyncObjectPoolSpec {
190+
191+
case class Widget(factory: TestWidgetFactory)
192+
193+
class TestWidgetFactory extends ObjectFactory[Widget] {
194+
195+
override def create: Widget = Widget(this)
196+
197+
override def destroy(item: Widget) = {}
198+
199+
override def validate(item: Widget): Try[Widget] = Try {
200+
if (item.factory eq this)
201+
item
202+
else
203+
throw new IllegalArgumentException("Not our item")
204+
}
205+
}
206+
207+
}
208+
209+
210+
class SingleThreadedAsyncObjectPoolSpec extends AbstractAsyncObjectPoolSpec[SingleThreadedAsyncObjectPool[Widget]] {
211+
212+
import AbstractAsyncObjectPoolSpec._
213+
214+
override protected def pool(factory: ObjectFactory[Widget], conf: PoolConfiguration) =
215+
new SingleThreadedAsyncObjectPool(factory, conf)
216+
217+
"SingleThreadedAsyncObjectPool" should {
218+
"successfully record a closed state" in {
219+
val p = pool()
220+
221+
Await.result(p.close, Duration.Inf) must be(p)
222+
223+
p.isClosed must beTrue
224+
}
225+
226+
}
227+
228+
}

project/Build.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ object Configuration {
5555

5656
val specs2Dependency = "org.specs2" %% "specs2-core" % specs2Version % "test"
5757
val specs2JunitDependency = "org.specs2" %% "specs2-junit" % specs2Version % "test"
58+
val specs2MockDependency = "org.specs2" %% "specs2-mock" % specs2Version % "test"
5859
val logbackDependency = "ch.qos.logback" % "logback-classic" % "1.1.6" % "test"
5960

6061
val commonDependencies = Seq(
@@ -65,6 +66,7 @@ object Configuration {
6566
"org.javassist" % "javassist" % "3.20.0-GA",
6667
specs2Dependency,
6768
specs2JunitDependency,
69+
specs2MockDependency,
6870
logbackDependency
6971
)
7072

0 commit comments

Comments
 (0)