/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2013, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.concurrent

import java.util.concurrent.Executor
import scala.annotation.tailrec

/**
 * Mixin trait for an Executor
 * which groups multiple nested `Runnable.run()` calls
 * into a single Runnable passed to the original
 * Executor. This can be a useful optimization
 * because it bypasses the original context's task
 * queue and keeps related (nested) code on a single
 * thread, which may improve CPU affinity. However,
 * if tasks passed to the Executor are blocking
 * or expensive, this optimization can prevent work-stealing
 * and make performance worse. Also, some ExecutionContexts
 * may be fast enough natively that this optimization just
 * adds overhead.
 * The default ExecutionContext.global is already batching
 * or fast enough not to benefit from it, while
 * `fromExecutor` and `fromExecutorService` do NOT add
 * this optimization since they don't know whether the underlying
 * executor will benefit from it.
 * A batching executor can create deadlocks if code does
 * not use `scala.concurrent.blocking` when it should,
 * because tasks created within other tasks will block
 * on the outer task completing.
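 *
 * For example (an illustrative sketch; `batchingEc` stands for any ExecutionContext
 * that mixes in this trait), a batched task that waits on a nested task must wrap
 * the wait in `blocking`:
 * {{{
 *   batchingEc.execute(new Runnable with OnCompleteRunnable {
 *     def run(): Unit = {
 *       val latch = new java.util.concurrent.CountDownLatch(1)
 *       // the nested task is only appended to this thread's current batch
 *       batchingEc.execute(new Runnable with OnCompleteRunnable {
 *         def run(): Unit = latch.countDown()
 *       })
 *       // a bare latch.await() would never return, because the nested task runs
 *       // only after this task finishes; `blocking` flushes the batch to another
 *       // thread before parking
 *       blocking { latch.await() }
 *     }
 *   })
 * }}}
 *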
 * This executor may run tasks in any order, including LIFO order.
 * There are no ordering guarantees.
 *
 * WARNING: The underlying Executor's `execute` method must not run the submitted
 * Runnable synchronously on the calling thread; it must enqueue/hand off the Runnable.
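 * For instance (illustrative only), an underlying executor like
 * {{{
 *   new Executor { def execute(r: Runnable): Unit = r.run() } // runs on the calling thread
 * }}}
 * is NOT acceptable, whereas any executor that hands the Runnable over to a queue
 * or to another thread is.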
 */
private[concurrent] trait BatchingExecutor extends Executor {

  // invariant: if "_tasksLocal.get ne null" then we are inside Batch.run; if it is null, we are outside
  private val _tasksLocal = new ThreadLocal[List[Runnable]]()

  private class Batch(val initial: List[Runnable]) extends Runnable with BlockContext {
    private var parentBlockContext: BlockContext = _
    // this method runs in the delegate ExecutionContext's thread
    override def run(): Unit = {
      require(_tasksLocal.get eq null)

      val prevBlockContext = BlockContext.current
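      // install this Batch as the BlockContext while the batch runs, so that nested
      // `blocking {}` calls reach blockOn below and flush any queued tasks first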
      BlockContext.withBlockContext(this) {
        try {
          parentBlockContext = prevBlockContext

          @tailrec def processBatch(batch: List[Runnable]): Unit = batch match {
            case Nil => ()
            case head :: tail =>
              _tasksLocal set tail
              try {
                head.run()
              } catch {
                case t: Throwable =>
                  // if one task throws, move the
                  // remaining tasks to another thread
                  // so we can throw the exception
                  // up to the invoking executor
                  val remaining = _tasksLocal.get
                  _tasksLocal set Nil
                  unbatchedExecute(new Batch(remaining)) //TODO what if this submission fails?
                  throw t // rethrow
              }
              processBatch(_tasksLocal.get) // since head.run() can add entries, always do _tasksLocal.get here
          }

          processBatch(initial)
        } finally {
          _tasksLocal.remove()
          parentBlockContext = null
        }
      }
    }

    override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
      // if we know there will be blocking, we don't want to keep tasks queued up because it could deadlock.
      {
        val tasks = _tasksLocal.get
        _tasksLocal set Nil
        if ((tasks ne null) && tasks.nonEmpty)
          unbatchedExecute(new Batch(tasks))
      }

      // now delegate the blocking to the previous BlockContext
      require(parentBlockContext ne null)
      parentBlockContext.blockOn(thunk)
    }
  }

  protected def unbatchedExecute(r: Runnable): Unit

  override def execute(runnable: Runnable): Unit = {
    if (batchable(runnable)) { // If we can batch the runnable
      _tasksLocal.get match {
        case null => unbatchedExecute(new Batch(List(runnable))) // If we aren't in batching mode yet, enqueue batch
        case some => _tasksLocal.set(runnable :: some) // If we are already in batching mode, add to batch
      }
    } else unbatchedExecute(runnable) // If not batchable, just delegate to underlying
  }

  /** Override this to define which runnables will be batched. */
  def batchable(runnable: Runnable): Boolean = runnable match {
    case _: OnCompleteRunnable => true
    case _ => false
  }
}
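
/** Illustrative sketch (hypothetical; the class name, `underlying`, and `reporter`
 *  are assumptions, not part of the library): a minimal ExecutionContext built on
 *  this trait. `underlying` must enqueue/hand off Runnables (see the WARNING above),
 *  never run them on the calling thread.
 */
private[concurrent] class BatchedExecutionContextSketch(underlying: Executor, reporter: Throwable => Unit)
  extends ExecutionContextExecutor with BatchingExecutor {
  // every batch (and every non-batchable task) goes straight to the wrapped executor
  override protected def unbatchedExecute(r: Runnable): Unit = underlying.execute(r)
  // required by ExecutionContext; forwards failure reports to the supplied reporter
  override def reportFailure(t: Throwable): Unit = reporter(t)
}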