@@ -65,7 +65,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
65
65
66
66
private [this ] val _errors = new AtomicReference [List [Error ]](Nil )
67
67
68
- private var workQueue : WorkQueue = _
68
+ private var workTracker : WorkTracker = _
69
69
70
70
private val fromAnalyzer = FromCore (" analyzer" )
71
71
@@ -78,13 +78,14 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
78
78
79
79
infoLoader.update(logger)
80
80
81
- workQueue = new WorkQueue (ec)
81
+ workTracker = new WorkTracker
82
82
classLoader = new ClassLoader
83
83
84
84
loadObjectClass(() => loadEverything(moduleInitializers, symbolRequirements))
85
85
86
- workQueue.join()
87
- .map(_ => postLoad(moduleInitializers, logger))(ec)
86
+ workTracker
87
+ .future
88
+ .map(_ => postLoad(moduleInitializers, logger))
88
89
.andThen { case _ => infoLoader.cleanAfterRun() }
89
90
}
90
91
@@ -93,7 +94,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
93
94
94
95
private def resetState (): Unit = {
95
96
objectClassInfo = null
96
- workQueue = null
97
+ workTracker = null
97
98
_errors.set(Nil )
98
99
classLoader = null
99
100
_topLevelExportInfos.clear()
@@ -331,7 +332,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
331
332
332
333
private def lookupClass (className : ClassName )(
333
334
onSuccess : ClassInfo => Unit )(implicit from : From ): Unit = {
334
- workQueue.enqueue (classLoader.lookupClass(className)) {
335
+ workTracker.track (classLoader.lookupClass(className)) {
335
336
case info : ClassInfo =>
336
337
info.link()
337
338
onSuccess(info)
@@ -862,13 +863,13 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
862
863
()
863
864
864
865
case onlyCandidate :: Nil =>
865
- // Fast path that does not require workQueue.enqueue
866
+ // Fast path that does not require workTracker.track
866
867
val proxy = createReflProxy(proxyName, onlyCandidate.methodName)
867
868
onSuccess(proxy)
868
869
869
870
case _ =>
870
871
val targetFuture = computeMostSpecificProxyMatch(candidates)
871
- workQueue.enqueue (targetFuture) { reflectiveTarget =>
872
+ workTracker.track (targetFuture) { reflectiveTarget =>
872
873
val proxy = createReflProxy(proxyName, reflectiveTarget.methodName)
873
874
onSuccess(proxy)
874
875
}
@@ -934,7 +935,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
934
935
935
936
// Starting here, we just do data juggling, so it can run on any thread.
936
937
locally {
937
- implicit val iec = workQueue .ec
938
+ implicit val iec = workTracker .ec
938
939
939
940
val hasMoreSpecific = Future .traverse(specificityChecks)(
940
941
checks => Future .sequence(checks).map(_.contains(true )))
@@ -1555,58 +1556,27 @@ object Analyzer {
1555
1556
private val getSuperclassMethodName =
1556
1557
MethodName (" getSuperclass" , Nil , ClassRef (ClassClass ))
1557
1558
1558
- private class WorkQueue (val ec : ExecutionContext ) {
1559
- private val queue = new ConcurrentLinkedQueue [() => Unit ]()
1560
- private val working = new AtomicBoolean (false )
1559
+ private class WorkTracker (implicit val ec : ExecutionContext ) {
1561
1560
private val pending = new AtomicInteger (0 )
1562
1561
private val promise = Promise [Unit ]()
1563
1562
1564
- def enqueue [T ](fut : Future [T ])(onSuccess : T => Unit ): Unit = {
1563
+ def track [T ](fut : Future [T ])(onSuccess : T => Unit ): Unit = {
1565
1564
val got = pending.incrementAndGet()
1566
1565
assert(got > 0 )
1567
1566
1568
- fut.onComplete {
1569
- case Success (r) =>
1570
- queue.add(() => onSuccess(r))
1571
- tryDoWork()
1572
-
1573
- case Failure (t) =>
1574
- promise.tryFailure(t)
1575
- } (ec)
1576
- }
1577
-
1578
- def join (): Future [Unit ] = {
1579
- tryDoWork()
1580
- promise.future
1567
+ fut.map(onSuccess).onComplete {
1568
+ case Success (_) => taskDone()
1569
+ case Failure (t) => promise.tryFailure(t)
1570
+ }
1581
1571
}
1582
1572
1583
- @ tailrec
1584
- private def tryDoWork (): Unit = {
1585
- if (! working.getAndSet(true )) {
1586
- while (! queue.isEmpty) {
1587
- try {
1588
- val work = queue.poll()
1589
- work()
1590
- } catch {
1591
- case t : Throwable => promise.tryFailure(t)
1592
- }
1593
-
1594
- pending.decrementAndGet()
1595
- }
1596
-
1597
- if (pending.compareAndSet(0 , - 1 )) {
1598
- assert(queue.isEmpty)
1573
+ private def taskDone (): Unit = {
1574
+ if (pending.decrementAndGet() == 0 ) {
1575
+ if (pending.compareAndSet(0 , - 1 ))
1599
1576
promise.trySuccess(())
1600
- }
1601
-
1602
- working.set(false )
1603
-
1604
- /* Another thread might have inserted work in the meantime but not yet
1605
- * seen that we released the lock. Try and work steal again if this
1606
- * happens.
1607
- */
1608
- if (! queue.isEmpty) tryDoWork()
1609
1577
}
1610
1578
}
1579
+
1580
+ def future : Future [Unit ] = promise.future
1611
1581
}
1612
1582
}
0 commit comments