Skip to content

Commit e9703fd

Browse files
authored
Add support for stop(cause:). (#388)
* Move `Async::Stop` to `lib/async/stop.rb` and implement compatibility for older Ruby versions.
1 parent 8af530a commit e9703fd

File tree

3 files changed

+140
-32
lines changed

3 files changed

+140
-32
lines changed

lib/async/stop.rb

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
# frozen_string_literal: true
2+
3+
# Released under the MIT License.
4+
# Copyright, 2025, by Samuel Williams.
5+
6+
require "fiber"
7+
require "console"
8+
9+
module Async
10+
# Raised when a task is explicitly stopped.
11+
class Stop < Exception
12+
# Represents the source of the stop operation.
13+
class Cause < Exception
14+
if RUBY_VERSION >= "3.4"
15+
# @returns [Array(Thread::Backtrace::Location)] The backtrace of the caller.
16+
def self.backtrace
17+
caller_locations(2..-1)
18+
end
19+
else
20+
# @returns [Array(String)] The backtrace of the caller.
21+
def self.backtrace
22+
caller(2..-1)
23+
end
24+
end
25+
26+
# Create a new cause of the stop operation, with the given message.
27+
#
28+
# @parameter message [String] The error message.
29+
# @returns [Cause] The cause of the stop operation.
30+
def self.for(message = "Task was stopped")
31+
instance = self.new(message)
32+
instance.set_backtrace(self.backtrace)
33+
return instance
34+
end
35+
end
36+
37+
if RUBY_VERSION < "3.5"
38+
# Create a new stop operation.
39+
#
40+
# This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}
41+
#
42+
# @parameter message [String | Hash] The error message or a hash containing the cause.
43+
def initialize(message = "Task was stopped")
44+
if message.is_a?(Hash)
45+
@cause = message[:cause]
46+
message = "Task was stopped"
47+
end
48+
49+
super(message)
50+
end
51+
52+
# @returns [Exception] The cause of the stop operation.
53+
#
54+
# This is a compatibility method for Ruby versions before 3.5 where cause is not propagated correctly when using {Fiber#raise}, we explicitly capture the cause here.
55+
def cause
56+
super || @cause
57+
end
58+
end
59+
60+
# Used to defer stopping the current task until later.
61+
class Later
62+
# Create a new stop later operation.
63+
#
64+
# @parameter task [Task] The task to stop later.
65+
# @parameter cause [Exception] The cause of the stop operation.
66+
def initialize(task, cause = nil)
67+
@task = task
68+
@cause = cause
69+
end
70+
71+
# @returns [Boolean] Whether the task is alive.
72+
def alive?
73+
true
74+
end
75+
76+
# Transfer control to the operation - this will stop the task.
77+
def transfer
78+
@task.stop(false, cause: @cause)
79+
end
80+
end
81+
end
82+
end

lib/async/task.rb

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,33 +13,11 @@
1313

1414
require_relative "node"
1515
require_relative "condition"
16+
require_relative "stop"
1617

1718
Fiber.attr_accessor :async_task
1819

1920
module Async
20-
# Raised when a task is explicitly stopped.
21-
class Stop < Exception
22-
# Used to defer stopping the current task until later.
23-
class Later
24-
# Create a new stop later operation.
25-
#
26-
# @parameter task [Task] The task to stop later.
27-
def initialize(task)
28-
@task = task
29-
end
30-
31-
# @returns [Boolean] Whether the task is alive.
32-
def alive?
33-
true
34-
end
35-
36-
# Transfer control to the operation - this will stop the task.
37-
def transfer
38-
@task.stop
39-
end
40-
end
41-
end
42-
4321
# Raised if a timeout occurs on a specific Fiber. Handled gracefully by `Task`.
4422
# @public Since *Async v1*.
4523
class TimeoutError < StandardError
@@ -271,7 +249,13 @@ def wait
271249
# If `later` is false, it means that `stop` has been invoked directly. When `later` is true, it means that `stop` is invoked by `stop_children` or some other indirect mechanism. In that case, if we encounter the "current" fiber, we can't stop it right away, as it's currently performing `#stop`. Stopping it immediately would interrupt the current stop traversal, so we need to schedule the stop to occur later.
272250
#
273251
# @parameter later [Boolean] Whether to stop the task later, or immediately.
274-
def stop(later = false)
252+
# @parameter cause [Exception] The cause of the stop operation.
253+
def stop(later = false, cause: $!)
254+
# If no cause is given, we generate one from the current call stack:
255+
unless cause
256+
cause = Stop::Cause.for("Stopping task!")
257+
end
258+
275259
if self.stopped?
276260
# If the task is already stopped, a `stop` state transition re-enters the same state which is a no-op. However, we will also attempt to stop any running children too. This can happen if the children did not stop correctly the first time around. Doing this should probably be considered a bug, but it's better to be safe than sorry.
277261
return stopped!
@@ -285,27 +269,27 @@ def stop(later = false)
285269
# If we are deferring stop...
286270
if @defer_stop == false
287271
# Don't stop now... but update the state so we know we need to stop later.
288-
@defer_stop = true
272+
@defer_stop = cause
289273
return false
290274
end
291275

292276
if self.current?
293277
# If the fiber is current, and later is `true`, we need to schedule the fiber to be stopped later, as it's currently invoking `stop`:
294278
if later
295279
# If the fiber is the current fiber and we want to stop it later, schedule it:
296-
Fiber.scheduler.push(Stop::Later.new(self))
280+
Fiber.scheduler.push(Stop::Later.new(self, cause))
297281
else
298282
# Otherwise, raise the exception directly:
299-
raise Stop, "Stopping current task!"
283+
raise Stop, "Stopping current task!", cause: cause
300284
end
301285
else
302286
# If the fiber is not curent, we can raise the exception directly:
303287
begin
304288
# There is a chance that this will stop the fiber that originally called stop. If that happens, the exception handling in `#stopped` will rescue the exception and re-raise it later.
305-
Fiber.scheduler.raise(@fiber, Stop)
289+
Fiber.scheduler.raise(@fiber, Stop, cause: cause)
306290
rescue FiberError
307291
# In some cases, this can cause a FiberError (it might be resumed already), so we schedule it to be stopped later:
308-
Fiber.scheduler.push(Stop::Later.new(self))
292+
Fiber.scheduler.push(Stop::Later.new(self, cause))
309293
end
310294
end
311295
else
@@ -345,7 +329,7 @@ def defer_stop
345329

346330
# If we were asked to stop, we should do so now:
347331
if defer_stop
348-
raise Stop, "Stopping current task (was deferred)!"
332+
raise Stop, "Stopping current task (was deferred)!", cause: defer_stop
349333
end
350334
end
351335
else
@@ -356,7 +340,7 @@ def defer_stop
356340

357341
# @returns [Boolean] Whether stop has been deferred.
358342
def stop_deferred?
359-
@defer_stop
343+
!!@defer_stop
360344
end
361345

362346
# Lookup the {Task} for the current fiber. Raise `RuntimeError` if none is available.

test/async/task.rb

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,48 @@
552552
expect(transient).to be(:running?)
553553
end.wait
554554
end
555+
556+
it "can stop a task from within with a cause" do
557+
error = nil
558+
559+
cause = Async::Stop::Cause.for("boom")
560+
561+
task = reactor.async do |task|
562+
begin
563+
task.stop(cause: cause)
564+
rescue Async::Stop => error
565+
raise
566+
end
567+
end
568+
569+
reactor.run
570+
571+
expect(task).to be(:stopped?)
572+
expect(error).to be_a(Async::Stop)
573+
expect(error.cause).to be == cause
574+
end
575+
576+
it "can stop a task from outside with a cause" do
577+
error = nil
578+
579+
cause = RuntimeError.new("boom")
580+
581+
task = reactor.async do |task|
582+
begin
583+
task.yield
584+
rescue Async::Stop => error
585+
raise
586+
end
587+
end
588+
589+
task.stop(cause: cause)
590+
591+
reactor.run
592+
593+
expect(task).to be(:stopped?)
594+
expect(error).to be_a(Async::Stop)
595+
expect(error.cause).to be == cause
596+
end
555597
end
556598

557599
with "#sleep" do
@@ -923,7 +965,7 @@ def sleep_forever
923965

924966
reactor.run_once(0)
925967

926-
expect(child_task.stop_deferred?).to be == nil
968+
expect(child_task.stop_deferred?).to be == false
927969
end
928970
end
929971

0 commit comments

Comments
 (0)