Skip to content

Commit f7b5b31

Browse files
committed
Merge pull request scala#1962 from phaller/issue/6932-2.9.x
Backport of SI-6932 to 2.9.x
2 parents 53d4ec0 + 4897063 commit f7b5b31

File tree

4 files changed

+128
-3
lines changed

4 files changed

+128
-3
lines changed

build.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,6 +401,7 @@ LOCAL REFERENCE BUILD (LOCKER)
401401
<exclude name="scala/concurrent/Promise.scala"/>
402402
<exclude name="scala/concurrent/ExecutionContext.scala"/>
403403
<exclude name="scala/concurrent/BlockContext.scala"/>
404+
<exclude name="scala/concurrent/BatchingExecutor.scala"/>
404405
<exclude name="scala/concurrent/impl/Future.scala"/>
405406
<exclude name="scala/concurrent/impl/Promise.scala"/>
406407
<exclude name="scala/concurrent/impl/ExecutionContextImpl.scala"/>
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/* __ *\
2+
** ________ ___ / / ___ Scala API **
3+
** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
4+
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
5+
** /____/\___/_/ |_/____/_/ | | **
6+
** |/ **
7+
\* */
8+
9+
package scala.concurrent
10+
11+
import java.util.concurrent.Executor
12+
import scala.annotation.tailrec
13+
14+
/**
15+
* Mixin trait for an Executor
16+
* which groups multiple nested `Runnable.run()` calls
17+
* into a single Runnable passed to the original
18+
* Executor. This can be a useful optimization
19+
* because it bypasses the original context's task
20+
* queue and keeps related (nested) code on a single
21+
* thread which may improve CPU affinity. However,
22+
* if tasks passed to the Executor are blocking
23+
* or expensive, this optimization can prevent work-stealing
24+
* and make performance worse. Also, some ExecutionContext
25+
* may be fast enough natively that this optimization just
26+
* adds overhead.
27+
* The default ExecutionContext.global is already batching
28+
* or fast enough not to benefit from it; while
29+
* `fromExecutor` and `fromExecutorService` do NOT add
30+
* this optimization since they don't know whether the underlying
31+
* executor will benefit from it.
32+
* A batching executor can create deadlocks if code does
33+
* not use `scala.concurrent.blocking` when it should,
34+
* because tasks created within other tasks will block
35+
* on the outer task completing.
36+
* This executor may run tasks in any order, including LIFO order.
37+
* There are no ordering guarantees.
38+
*
39+
* WARNING: The underlying Executor's execute-method must not execute the submitted Runnable
40+
* in the calling thread synchronously. It must enqueue/handoff the Runnable.
41+
*/
42+
private[concurrent] trait BatchingExecutor extends Executor {
43+
44+
// invariant: if "_tasksLocal.get ne null" then we are inside BatchingRunnable.run; if it is null, we are outside
45+
private val _tasksLocal = new ThreadLocal[List[Runnable]]()
46+
47+
private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
48+
private var parentBlockContext: BlockContext = _
49+
// this method runs in the delegate ExecutionContext's thread
50+
override def run(): Unit = {
51+
require(_tasksLocal.get eq null)
52+
53+
val prevBlockContext = BlockContext.current
54+
BlockContext.withBlockContext(this) {
55+
try {
56+
parentBlockContext = prevBlockContext
57+
58+
@tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
59+
case Nil => ()
60+
case head :: tail =>
61+
_tasksLocal set tail
62+
try {
63+
head.run()
64+
} catch {
65+
case t: Throwable =>
66+
// if one task throws, move the
67+
// remaining tasks to another thread
68+
// so we can throw the exception
69+
// up to the invoking executor
70+
val remaining = _tasksLocal.get
71+
_tasksLocal set Nil
72+
unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
73+
throw t // rethrow
74+
}
75+
processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
76+
}
77+
78+
processBatch(initial)
79+
} finally {
80+
_tasksLocal.remove()
81+
parentBlockContext = null
82+
}
83+
}
84+
}
85+
86+
override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
87+
// if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
88+
{
89+
val tasks = _tasksLocal.get
90+
_tasksLocal set Nil
91+
if ((tasks ne null) && tasks.nonEmpty)
92+
unbatchedExecute(new Batch(tasks))
93+
}
94+
95+
// now delegate the blocking to the previous BC
96+
require(parentBlockContext ne null)
97+
parentBlockContext.blockOn(thunk)
98+
}
99+
}
100+
101+
protected def unbatchedExecute(r: Runnable): Unit
102+
103+
override def execute(runnable: Runnable): Unit = {
104+
if (batchable(runnable)) { // If we can batch the runnable
105+
_tasksLocal.get match {
106+
case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
107+
case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
108+
}
109+
} else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
110+
}
111+
112+
/** Override this to define which runnables will be batched. */
113+
def batchable(runnable: Runnable): Boolean = runnable match {
114+
case _: OnCompleteRunnable => true
115+
case _ => false
116+
}
117+
}

src/library/scala/concurrent/Future.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -672,9 +672,9 @@ object Future {
672672
// by just not ever using it itself. scala.concurrent
673673
// doesn't need to create defaultExecutionContext as
674674
// a side effect.
675-
private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
676-
override def execute(runnable: Runnable): Unit =
677-
runnable.run()
675+
private[concurrent] object InternalCallbackExecutor extends ExecutionContext with BatchingExecutor {
676+
override protected def unbatchedExecute(r: Runnable): Unit =
677+
r.run()
678678
override def reportFailure(t: Throwable): Unit =
679679
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
680680
}

test/files/jvm/scala-concurrent-tck.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,12 @@ trait FutureCallbacks extends TestBase {
141141
assert(false)
142142
}
143143
}
144+
145+
def testThatNestedCallbacksDoNotYieldStackOverflow(): Unit = {
146+
val promise = Promise[Int]
147+
(0 to 10000).map(Future(_)).foldLeft(promise.future)((f1, f2) => f2.flatMap(i => f1))
148+
promise.success(-1)
149+
}
144150

145151
testOnSuccess()
146152
testOnSuccessWhenCompleted()
@@ -150,6 +156,7 @@ trait FutureCallbacks extends TestBase {
150156
// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { })
151157
//TODO: this test is currently problematic, because NonFatal does not match InterruptedException
152158
//testOnFailureWhenSpecialThrowable(7, new InterruptedException)
159+
testThatNestedCallbacksDoNotYieldStackOverflow()
153160
testOnFailureWhenTimeoutException()
154161

155162
}

0 commit comments

Comments
 (0)