Skip to content

Commit c94b0ad

Browse files
committed
Merge pull request mauricio#167 from SattaiLanfear/master
AsyncObjectPool Tests + Fixes for Returning to Wrong Pool, Multiple Close, Missed Destruction
2 parents 5e24cb0 + d6180b5 commit c94b0ad

File tree

3 files changed

+277
-25
lines changed

3 files changed

+277
-25
lines changed

db-async-common/src/main/scala/com/github/mauricio/async/db/pool/SingleThreadedAsyncObjectPool.scala

Lines changed: 47 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
package com.github.mauricio.async.db.pool
1818

19+
import java.util.concurrent.RejectedExecutionException
20+
1921
import com.github.mauricio.async.db.util.{Log, Worker}
2022
import java.util.concurrent.atomic.AtomicLong
21-
import java.util.{TimerTask, Timer}
23+
import java.util.{Timer, TimerTask}
24+
2225
import scala.collection.mutable.{ArrayBuffer, Queue, Stack}
23-
import scala.concurrent.{Promise, Future}
26+
import scala.concurrent.{Future, Promise}
2427
import scala.util.{Failure, Success}
2528

2629
object SingleThreadedAsyncObjectPool {
@@ -93,15 +96,30 @@ class SingleThreadedAsyncObjectPool[T](
9396
def giveBack(item: T): Future[AsyncObjectPool[T]] = {
9497
val promise = Promise[AsyncObjectPool[T]]()
9598
this.mainPool.action {
96-
this.checkouts -= item
97-
this.factory.validate(item) match {
98-
case Success(item) => {
99-
this.addBack(item, promise)
99+
// Ensure it came from this pool
100+
val idx = this.checkouts.indexOf(item)
101+
if(idx >= 0) {
102+
this.checkouts.remove(idx)
103+
this.factory.validate(item) match {
104+
case Success(item) => {
105+
this.addBack(item, promise)
106+
}
107+
case Failure(e) => {
108+
this.factory.destroy(item)
109+
promise.failure(e)
110+
}
100111
}
101-
case Failure(e) => {
102-
this.checkouts -= item
103-
this.factory.destroy(item)
104-
promise.failure(e)
112+
} else {
113+
// It's already a failure but lets doublecheck why
114+
val isFromOurPool = (item match {
115+
case x: AnyRef => this.poolables.find(holder => x eq holder.item.asInstanceOf[AnyRef])
116+
case _ => this.poolables.find(holder => item == holder.item)
117+
}).isDefined
118+
119+
if(isFromOurPool) {
120+
promise.failure(new IllegalStateException("This item has already been returned"))
121+
} else {
122+
promise.failure(new IllegalArgumentException("The returned item did not come from this pool."))
105123
}
106124
}
107125
}
@@ -112,25 +130,28 @@ class SingleThreadedAsyncObjectPool[T](
112130
def isFull: Boolean = this.poolables.isEmpty && this.checkouts.size == configuration.maxObjects
113131

114132
def close: Future[AsyncObjectPool[T]] = {
115-
val promise = Promise[AsyncObjectPool[T]]()
116-
117-
this.mainPool.action {
118-
if (!this.closed) {
119-
try {
120-
this.timer.cancel()
121-
this.mainPool.shutdown
122-
this.closed = true
123-
(this.poolables.map(i => i.item) ++ this.checkouts).foreach(item => factory.destroy(item))
133+
try {
134+
val promise = Promise[AsyncObjectPool[T]]()
135+
this.mainPool.action {
136+
if (!this.closed) {
137+
try {
138+
this.timer.cancel()
139+
this.mainPool.shutdown
140+
this.closed = true
141+
(this.poolables.map(i => i.item) ++ this.checkouts).foreach(item => factory.destroy(item))
142+
promise.success(this)
143+
} catch {
144+
case e: Exception => promise.failure(e)
145+
}
146+
} else {
124147
promise.success(this)
125-
} catch {
126-
case e: Exception => promise.failure(e)
127148
}
128-
} else {
129-
promise.success(this)
130149
}
150+
promise.future
151+
} catch {
152+
case e: RejectedExecutionException if this.closed =>
153+
Future.successful(this)
131154
}
132-
133-
promise.future
134155
}
135156

136157
def availables: Traversable[T] = this.poolables.map(item => item.item)
@@ -238,6 +259,7 @@ class SingleThreadedAsyncObjectPool[T](
238259
case Failure(e) => {
239260
log.error("Failed to validate object", e)
240261
removals += poolable
262+
factory.destroy(poolable.item)
241263
}
242264
}
243265
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
package com.github.mauricio.async.db.pool
2+
3+
import com.github.mauricio.async.db.pool.AbstractAsyncObjectPoolSpec.Widget
4+
import org.mockito.Mockito.reset
5+
import org.specs2.mock.Mockito
6+
import org.specs2.mutable.Specification
7+
8+
import scala.concurrent.{Await, Future}
9+
import scala.util.Failure
10+
11+
import scala.reflect.runtime.universe.TypeTag
12+
import scala.util.Try
13+
import scala.concurrent.duration.{Duration, SECONDS}
14+
15+
/**
16+
* This spec is designed abstract to allow testing of any implementation of AsyncObjectPool, against the common
17+
* requirements the interface expects.
18+
*
19+
* @tparam T the AsyncObjectPool being tested.
20+
*/
21+
abstract class AbstractAsyncObjectPoolSpec[T <: AsyncObjectPool[Widget]](implicit tag: TypeTag[T])
22+
extends Specification
23+
with Mockito {
24+
25+
import AbstractAsyncObjectPoolSpec._
26+
27+
protected def pool(factory: ObjectFactory[Widget] = new TestWidgetFactory, conf: PoolConfiguration = PoolConfiguration.Default): T
28+
29+
// Evaluates to the type of AsyncObjectPool
30+
s"the ${tag.tpe.erasure} variant of AsyncObjectPool" should {
31+
32+
"successfully retrieve and return a Widget" in {
33+
val p = pool()
34+
val widget = Await.result(p.take, Duration.Inf)
35+
36+
widget must not beNull
37+
38+
val thePool = Await.result(p.giveBack(widget), Duration.Inf)
39+
thePool must be(p)
40+
}
41+
42+
"reject Widgets that did not come from it" in {
43+
val p = pool()
44+
45+
Await.result(p.giveBack(Widget(null)), Duration.Inf) must throwAn[IllegalArgumentException]
46+
}
47+
48+
"scale contents" >> {
49+
sequential
50+
51+
val factory = spy(new TestWidgetFactory)
52+
53+
val p = pool(
54+
factory = factory,
55+
conf = PoolConfiguration(
56+
maxObjects = 5,
57+
maxIdle = 2,
58+
maxQueueSize = 5,
59+
validationInterval = 2000
60+
))
61+
62+
63+
64+
var taken = Seq.empty[Widget]
65+
"can take up to maxObjects" in {
66+
taken = Await.result(Future.sequence(for (i <- 1 to 5) yield p.take), Duration.Inf)
67+
68+
taken must have size 5
69+
taken.head must not beNull;
70+
taken(1) must not beNull;
71+
taken(2) must not beNull;
72+
taken(3) must not beNull;
73+
taken(4) must not beNull
74+
}
75+
76+
"does not attempt to expire taken items" in {
77+
// Wait 3 seconds to ensure idle check has run at least once
78+
there was after(3.seconds).no(factory).destroy(any[Widget])
79+
}
80+
81+
reset(factory) // Considered bad form, but necessary as we depend on previous state in these tests
82+
"takes maxObjects back" in {
83+
val returns = Await.result(Future.sequence(for (widget <- taken) yield p.giveBack(widget)), Duration.Inf)
84+
85+
returns must have size 5
86+
87+
returns.head must be(p)
88+
returns(1) must be(p)
89+
returns(2) must be(p)
90+
returns(3) must be(p)
91+
returns(4) must be(p)
92+
}
93+
94+
"protest returning an item that was already returned" in {
95+
val resultFuture = p.giveBack(taken.head)
96+
97+
Await.result(resultFuture, Duration.Inf) must throwAn[IllegalStateException]
98+
}
99+
100+
"destroy down to maxIdle widgets" in {
101+
Thread.sleep(3000)
102+
there were 5.times(factory).destroy(any[Widget])
103+
}
104+
}
105+
106+
"queue requests after running out" in {
107+
val p = pool(conf = PoolConfiguration.Default.copy(maxObjects = 2, maxQueueSize = 1))
108+
109+
val widgets = Await.result(Future.sequence(for (i <- 1 to 2) yield p.take), Duration.Inf)
110+
111+
val future = p.take
112+
113+
// Wait five seconds
114+
Thread.sleep(5000)
115+
116+
val failedFuture = p.take
117+
118+
// Cannot be done, would exceed maxObjects
119+
future.isCompleted must beFalse
120+
121+
Await.result(failedFuture, Duration.Inf) must throwA[PoolExhaustedException]
122+
123+
Await.result(p.giveBack(widgets.head), Duration.Inf) must be(p)
124+
125+
Await.result(future, Duration(5, SECONDS)) must be(widgets.head)
126+
}
127+
128+
"refuse to allow take after being closed" in {
129+
val p = pool()
130+
131+
Await.result(p.close, Duration.Inf) must be(p)
132+
133+
Await.result(p.take, Duration.Inf) must throwA[PoolAlreadyTerminatedException]
134+
}
135+
136+
"allow being closed more than once" in {
137+
val p = pool()
138+
139+
Await.result(p.close, Duration.Inf) must be(p)
140+
141+
Await.result(p.close, Duration.Inf) must be(p)
142+
}
143+
144+
145+
"destroy a failed widget" in {
146+
val factory = spy(new TestWidgetFactory)
147+
val p = pool(factory = factory)
148+
149+
val widget = Await.result(p.take, Duration.Inf)
150+
151+
widget must not beNull
152+
153+
factory.validate(widget) returns Failure(new RuntimeException("This is a bad widget!"))
154+
155+
Await.result(p.giveBack(widget), Duration.Inf) must throwA[RuntimeException](message = "This is a bad widget!")
156+
157+
there was atLeastOne(factory).destroy(widget)
158+
}
159+
160+
"clean up widgets that die in the pool" in {
161+
val factory = spy(new TestWidgetFactory)
162+
// Deliberately make it impossible to expire (nearly)
163+
val p = pool(factory = factory, conf = PoolConfiguration.Default.copy(maxIdle = Long.MaxValue, validationInterval = 2000))
164+
165+
val widget = Await.result(p.take, Duration.Inf)
166+
167+
widget must not beNull
168+
169+
Await.result(p.giveBack(widget), Duration.Inf) must be(p)
170+
171+
there was atLeastOne(factory).validate(widget)
172+
there were no(factory).destroy(widget)
173+
174+
there was after(3.seconds).atLeastTwo(factory).validate(widget)
175+
176+
factory.validate(widget) returns Failure(new RuntimeException("Test Exception, Not an Error"))
177+
178+
there was after(3.seconds).one(factory).destroy(widget)
179+
180+
Await.ready(p.take, Duration.Inf)
181+
182+
there was two(factory).create
183+
}
184+
185+
}
186+
187+
}
188+
189+
object AbstractAsyncObjectPoolSpec {
190+
191+
case class Widget(factory: TestWidgetFactory)
192+
193+
class TestWidgetFactory extends ObjectFactory[Widget] {
194+
195+
override def create: Widget = Widget(this)
196+
197+
override def destroy(item: Widget) = {}
198+
199+
override def validate(item: Widget): Try[Widget] = Try {
200+
if (item.factory eq this)
201+
item
202+
else
203+
throw new IllegalArgumentException("Not our item")
204+
}
205+
}
206+
207+
}
208+
209+
210+
class SingleThreadedAsyncObjectPoolSpec extends AbstractAsyncObjectPoolSpec[SingleThreadedAsyncObjectPool[Widget]] {
211+
212+
import AbstractAsyncObjectPoolSpec._
213+
214+
override protected def pool(factory: ObjectFactory[Widget], conf: PoolConfiguration) =
215+
new SingleThreadedAsyncObjectPool(factory, conf)
216+
217+
"SingleThreadedAsyncObjectPool" should {
218+
"successfully record a closed state" in {
219+
val p = pool()
220+
221+
Await.result(p.close, Duration.Inf) must be(p)
222+
223+
p.isClosed must beTrue
224+
}
225+
226+
}
227+
228+
}

project/Build.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ object Configuration {
5555

5656
val specs2Dependency = "org.specs2" %% "specs2-core" % specs2Version % "test"
5757
val specs2JunitDependency = "org.specs2" %% "specs2-junit" % specs2Version % "test"
58+
val specs2MockDependency = "org.specs2" %% "specs2-mock" % specs2Version % "test"
5859
val logbackDependency = "ch.qos.logback" % "logback-classic" % "1.1.6" % "test"
5960

6061
val commonDependencies = Seq(
@@ -65,6 +66,7 @@ object Configuration {
6566
"org.javassist" % "javassist" % "3.20.0-GA",
6667
specs2Dependency,
6768
specs2JunitDependency,
69+
specs2MockDependency,
6870
logbackDependency
6971
)
7072

0 commit comments

Comments
 (0)