@@ -65,7 +65,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
65
65
66
66
private [this ] val _errors = new GrowingList [Error ]
67
67
68
- private var workQueue : WorkQueue = _
68
+ private var workTracker : WorkTracker = _
69
69
70
70
private val fromAnalyzer = FromCore (" analyzer" )
71
71
@@ -78,19 +78,20 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
78
78
79
79
infoLoader.update(logger)
80
80
81
- workQueue = new WorkQueue (ec)
81
+ workTracker = new WorkTracker
82
82
classLoader = new ClassLoader
83
83
84
84
loadObjectClass(() => loadEverything(moduleInitializers, symbolRequirements))
85
85
86
- workQueue.join()
87
- .map(_ => postLoad(moduleInitializers, logger))(ec)
86
+ workTracker
87
+ .future
88
+ .map(_ => postLoad(moduleInitializers, logger))
88
89
.andThen { case _ => infoLoader.cleanAfterRun() }
89
90
}
90
91
91
92
private def resetState (): Unit = {
92
93
objectClassInfo = null
93
- workQueue = null
94
+ workTracker = null
94
95
_errors.clear()
95
96
classLoader = null
96
97
_topLevelExportInfos.clear()
@@ -330,7 +331,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
330
331
331
332
private def lookupClass (className : ClassName )(
332
333
onSuccess : ClassInfo => Unit )(implicit from : From ): Unit = {
333
- workQueue.enqueue (classLoader.lookupClass(className)) {
334
+ workTracker.track (classLoader.lookupClass(className)) {
334
335
case info : ClassInfo =>
335
336
info.link()
336
337
onSuccess(info)
@@ -871,13 +872,13 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
871
872
()
872
873
873
874
case onlyCandidate :: Nil =>
874
- // Fast path that does not require workQueue.enqueue
875
+ // Fast path that does not require workTracker.track
875
876
val proxy = createReflProxy(proxyName, onlyCandidate.methodName)
876
877
onSuccess(proxy)
877
878
878
879
case _ =>
879
880
val targetFuture = computeMostSpecificProxyMatch(candidates)
880
- workQueue.enqueue (targetFuture) { reflectiveTarget =>
881
+ workTracker.track (targetFuture) { reflectiveTarget =>
881
882
val proxy = createReflProxy(proxyName, reflectiveTarget.methodName)
882
883
onSuccess(proxy)
883
884
}
@@ -943,7 +944,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
943
944
944
945
// Starting here, we just do data juggling, so it can run on any thread.
945
946
locally {
946
- implicit val iec = workQueue .ec
947
+ implicit val iec = workTracker .ec
947
948
948
949
val hasMoreSpecific = Future .traverse(specificityChecks)(
949
950
checks => Future .sequence(checks).map(_.contains(true )))
@@ -1566,59 +1567,28 @@ object Analyzer {
1566
1567
private val getSuperclassMethodName =
1567
1568
MethodName (" getSuperclass" , Nil , ClassRef (ClassClass ))
1568
1569
1569
- private class WorkQueue (val ec : ExecutionContext ) {
1570
- private val queue = new ConcurrentLinkedQueue [() => Unit ]()
1571
- private val working = new AtomicBoolean (false )
1570
+ private class WorkTracker (implicit val ec : ExecutionContext ) {
1572
1571
private val pending = new AtomicInteger (0 )
1573
1572
private val promise = Promise [Unit ]()
1574
1573
1575
- def enqueue [T ](fut : Future [T ])(onSuccess : T => Unit ): Unit = {
1574
+ def track [T ](fut : Future [T ])(onSuccess : T => Unit ): Unit = {
1576
1575
val got = pending.incrementAndGet()
1577
1576
assert(got > 0 )
1578
1577
1579
- fut.onComplete {
1580
- case Success (r) =>
1581
- queue.add(() => onSuccess(r))
1582
- tryDoWork()
1583
-
1584
- case Failure (t) =>
1585
- promise.tryFailure(t)
1586
- } (ec)
1587
- }
1588
-
1589
- def join (): Future [Unit ] = {
1590
- tryDoWork()
1591
- promise.future
1578
+ fut.map(onSuccess).onComplete {
1579
+ case Success (_) => taskDone()
1580
+ case Failure (t) => promise.tryFailure(t)
1581
+ }
1592
1582
}
1593
1583
1594
- @ tailrec
1595
- private def tryDoWork (): Unit = {
1596
- if (! working.getAndSet(true )) {
1597
- while (! queue.isEmpty) {
1598
- try {
1599
- val work = queue.poll()
1600
- work()
1601
- } catch {
1602
- case t : Throwable => promise.tryFailure(t)
1603
- }
1604
-
1605
- pending.decrementAndGet()
1606
- }
1607
-
1608
- if (pending.compareAndSet(0 , - 1 )) {
1609
- assert(queue.isEmpty)
1584
+ private def taskDone (): Unit = {
1585
+ if (pending.decrementAndGet() == 0 ) {
1586
+ if (pending.compareAndSet(0 , - 1 ))
1610
1587
promise.trySuccess(())
1611
- }
1612
-
1613
- working.set(false )
1614
-
1615
- /* Another thread might have inserted work in the meantime but not yet
1616
- * seen that we released the lock. Try and work steal again if this
1617
- * happens.
1618
- */
1619
- if (! queue.isEmpty) tryDoWork()
1620
1588
}
1621
1589
}
1590
+
1591
+ def future : Future [Unit ] = promise.future
1622
1592
}
1623
1593
1624
1594
private final class GrowingList [A ] {
0 commit comments