Skip to content

Commit 41f178d

Browse files
committed
Parallel analyzer
op variant `median(t_ns)` <fct> <fct> <dbl> 1 Linker: Compute reachability main 323617749 2 Linker: Compute reachability parallel 163542533 3 Refiner: Compute reachability main 255269676 4 Refiner: Compute reachability parallel 170701844
1 parent 8885107 commit 41f178d

File tree

1 file changed

+21
-51
lines changed
  • linker/shared/src/main/scala/org/scalajs/linker/analyzer

1 file changed

+21
-51
lines changed

linker/shared/src/main/scala/org/scalajs/linker/analyzer/Analyzer.scala

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
6565

6666
private[this] val _errors = new AtomicReference[List[Error]](Nil)
6767

68-
private var workQueue: WorkQueue = _
68+
private var workTracker: WorkTracker = _
6969

7070
private val fromAnalyzer = FromCore("analyzer")
7171

@@ -78,13 +78,14 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
7878

7979
infoLoader.update(logger)
8080

81-
workQueue = new WorkQueue(ec)
81+
workTracker = new WorkTracker
8282
classLoader = new ClassLoader
8383

8484
loadObjectClass(() => loadEverything(moduleInitializers, symbolRequirements))
8585

86-
workQueue.join()
87-
.map(_ => postLoad(moduleInitializers, logger))(ec)
86+
workTracker
87+
.future
88+
.map(_ => postLoad(moduleInitializers, logger))
8889
.andThen { case _ => infoLoader.cleanAfterRun() }
8990
}
9091

@@ -93,7 +94,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
9394

9495
private def resetState(): Unit = {
9596
objectClassInfo = null
96-
workQueue = null
97+
workTracker = null
9798
_errors.set(Nil)
9899
classLoader = null
99100
_topLevelExportInfos.clear()
@@ -331,7 +332,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
331332

332333
private def lookupClass(className: ClassName)(
333334
onSuccess: ClassInfo => Unit)(implicit from: From): Unit = {
334-
workQueue.enqueue(classLoader.lookupClass(className)) {
335+
workTracker.track(classLoader.lookupClass(className)) {
335336
case info: ClassInfo =>
336337
info.link()
337338
onSuccess(info)
@@ -862,13 +863,13 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
862863
()
863864

864865
case onlyCandidate :: Nil =>
865-
// Fast path that does not require workQueue.enqueue
866+
// Fast path that does not require workTracker.track
866867
val proxy = createReflProxy(proxyName, onlyCandidate.methodName)
867868
onSuccess(proxy)
868869

869870
case _ =>
870871
val targetFuture = computeMostSpecificProxyMatch(candidates)
871-
workQueue.enqueue(targetFuture) { reflectiveTarget =>
872+
workTracker.track(targetFuture) { reflectiveTarget =>
872873
val proxy = createReflProxy(proxyName, reflectiveTarget.methodName)
873874
onSuccess(proxy)
874875
}
@@ -934,7 +935,7 @@ final class Analyzer(config: CommonPhaseConfig, initial: Boolean,
934935

935936
// Starting here, we just do data juggling, so it can run on any thread.
936937
locally {
937-
implicit val iec = workQueue.ec
938+
implicit val iec = workTracker.ec
938939

939940
val hasMoreSpecific = Future.traverse(specificityChecks)(
940941
checks => Future.sequence(checks).map(_.contains(true)))
@@ -1555,58 +1556,27 @@ object Analyzer {
15551556
private val getSuperclassMethodName =
15561557
MethodName("getSuperclass", Nil, ClassRef(ClassClass))
15571558

1558-
private class WorkQueue(val ec: ExecutionContext) {
1559-
private val queue = new ConcurrentLinkedQueue[() => Unit]()
1560-
private val working = new AtomicBoolean(false)
1559+
private class WorkTracker(implicit val ec: ExecutionContext) {
15611560
private val pending = new AtomicInteger(0)
15621561
private val promise = Promise[Unit]()
15631562

1564-
def enqueue[T](fut: Future[T])(onSuccess: T => Unit): Unit = {
1563+
def track[T](fut: Future[T])(onSuccess: T => Unit): Unit = {
15651564
val got = pending.incrementAndGet()
15661565
assert(got > 0)
15671566

1568-
fut.onComplete {
1569-
case Success(r) =>
1570-
queue.add(() => onSuccess(r))
1571-
tryDoWork()
1572-
1573-
case Failure(t) =>
1574-
promise.tryFailure(t)
1575-
} (ec)
1576-
}
1577-
1578-
def join(): Future[Unit] = {
1579-
tryDoWork()
1580-
promise.future
1567+
fut.map(onSuccess).onComplete {
1568+
case Success(_) => taskDone()
1569+
case Failure(t) => promise.tryFailure(t)
1570+
}
15811571
}
15821572

1583-
@tailrec
1584-
private def tryDoWork(): Unit = {
1585-
if (!working.getAndSet(true)) {
1586-
while (!queue.isEmpty) {
1587-
try {
1588-
val work = queue.poll()
1589-
work()
1590-
} catch {
1591-
case t: Throwable => promise.tryFailure(t)
1592-
}
1593-
1594-
pending.decrementAndGet()
1595-
}
1596-
1597-
if (pending.compareAndSet(0, -1)) {
1598-
assert(queue.isEmpty)
1573+
private def taskDone(): Unit = {
1574+
if (pending.decrementAndGet() == 0) {
1575+
if (pending.compareAndSet(0, -1))
15991576
promise.trySuccess(())
1600-
}
1601-
1602-
working.set(false)
1603-
1604-
/* Another thread might have inserted work in the meantime but not yet
1605-
* seen that we released the lock. Try and work steal again if this
1606-
* happens.
1607-
*/
1608-
if (!queue.isEmpty) tryDoWork()
16091577
}
16101578
}
1579+
1580+
def future: Future[Unit] = promise.future
16111581
}
16121582
}

0 commit comments

Comments
 (0)