Skip to content

Commit 1ce6bdd

Browse files
committed
Parallel analyzer
op variant `median(t_ns)` <fct> <fct> <dbl> 1 Linker: Compute reachability main 323617749 2 Linker: Compute reachability parallel 163542533 3 Refiner: Compute reachability main 255269676 4 Refiner: Compute reachability parallel 170701844
1 parent 355ca93 commit 1ce6bdd

File tree

1 file changed

+21
-51
lines changed
  • linker/shared/src/main/scala/org/scalajs/linker/analyzer

1 file changed

+21
-51
lines changed

linker/shared/src/main/scala/org/scalajs/linker/analyzer/Analyzer.scala

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
6565

6666
private[this] val _errors = new GrowingList[Error]
6767

68-
private var workQueue: WorkQueue = _
68+
private var workTracker: WorkTracker = _
6969

7070
private val fromAnalyzer = FromCore("analyzer")
7171

@@ -78,19 +78,20 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
7878

7979
infoLoader.update(logger)
8080

81-
workQueue = new WorkQueue(ec)
81+
workTracker = new WorkTracker
8282
classLoader = new ClassLoader
8383

8484
loadObjectClass(() => loadEverything(moduleInitializers, symbolRequirements))
8585

86-
workQueue.join()
87-
.map(_ => postLoad(moduleInitializers, logger))(ec)
86+
workTracker
87+
.future
88+
.map(_ => postLoad(moduleInitializers, logger))
8889
.andThen { case _ => infoLoader.cleanAfterRun() }
8990
}
9091

9192
private def resetState(): Unit = {
9293
objectClassInfo = null
93-
workQueue = null
94+
workTracker = null
9495
_errors.clear()
9596
classLoader = null
9697
_topLevelExportInfos.clear()
@@ -330,7 +331,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
330331

331332
private def lookupClass(className: ClassName)(
332333
onSuccess: ClassInfo => Unit)(implicit from: From): Unit = {
333-
workQueue.enqueue(classLoader.lookupClass(className)) {
334+
workTracker.track(classLoader.lookupClass(className)) {
334335
case info: ClassInfo =>
335336
info.link()
336337
onSuccess(info)
@@ -871,13 +872,13 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
871872
()
872873

873874
case onlyCandidate :: Nil =>
874-
// Fast path that does not require workQueue.enqueue
875+
// Fast path that does not require workTracker.track
875876
val proxy = createReflProxy(proxyName, onlyCandidate.methodName)
876877
onSuccess(proxy)
877878

878879
case _ =>
879880
val targetFuture = computeMostSpecificProxyMatch(candidates)
880-
workQueue.enqueue(targetFuture) { reflectiveTarget =>
881+
workTracker.track(targetFuture) { reflectiveTarget =>
881882
val proxy = createReflProxy(proxyName, reflectiveTarget.methodName)
882883
onSuccess(proxy)
883884
}
@@ -943,7 +944,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
943944

944945
// Starting here, we just do data juggling, so it can run on any thread.
945946
locally {
946-
implicit val iec = workQueue.ec
947+
implicit val iec = workTracker.ec
947948

948949
val hasMoreSpecific = Future.traverse(specificityChecks)(
949950
checks => Future.sequence(checks).map(_.contains(true)))
@@ -1566,59 +1567,28 @@ object Analyzer {
15661567
private val getSuperclassMethodName =
15671568
MethodName("getSuperclass", Nil, ClassRef(ClassClass))
15681569

1569-
private class WorkQueue(val ec: ExecutionContext) {
1570-
private val queue = new ConcurrentLinkedQueue[() => Unit]()
1571-
private val working = new AtomicBoolean(false)
1570+
private class WorkTracker(implicit val ec: ExecutionContext) {
15721571
private val pending = new AtomicInteger(0)
15731572
private val promise = Promise[Unit]()
15741573

1575-
def enqueue[T](fut: Future[T])(onSuccess: T => Unit): Unit = {
1574+
def track[T](fut: Future[T])(onSuccess: T => Unit): Unit = {
15761575
val got = pending.incrementAndGet()
15771576
assert(got > 0)
15781577

1579-
fut.onComplete {
1580-
case Success(r) =>
1581-
queue.add(() => onSuccess(r))
1582-
tryDoWork()
1583-
1584-
case Failure(t) =>
1585-
promise.tryFailure(t)
1586-
} (ec)
1587-
}
1588-
1589-
def join(): Future[Unit] = {
1590-
tryDoWork()
1591-
promise.future
1578+
fut.map(onSuccess).onComplete {
1579+
case Success(_) => taskDone()
1580+
case Failure(t) => promise.tryFailure(t)
1581+
}
15921582
}
15931583

1594-
@tailrec
1595-
private def tryDoWork(): Unit = {
1596-
if (!working.getAndSet(true)) {
1597-
while (!queue.isEmpty) {
1598-
try {
1599-
val work = queue.poll()
1600-
work()
1601-
} catch {
1602-
case t: Throwable => promise.tryFailure(t)
1603-
}
1604-
1605-
pending.decrementAndGet()
1606-
}
1607-
1608-
if (pending.compareAndSet(0, -1)) {
1609-
assert(queue.isEmpty)
1584+
private def taskDone(): Unit = {
1585+
if (pending.decrementAndGet() == 0) {
1586+
if (pending.compareAndSet(0, -1))
16101587
promise.trySuccess(())
1611-
}
1612-
1613-
working.set(false)
1614-
1615-
/* Another thread might have inserted work in the meantime but not yet
1616-
* seen that we released the lock. Try and work steal again if this
1617-
* happens.
1618-
*/
1619-
if (!queue.isEmpty) tryDoWork()
16201588
}
16211589
}
1590+
1591+
def future: Future[Unit] = promise.future
16221592
}
16231593

16241594
private final class GrowingList[A] {

0 commit comments

Comments
 (0)