Skip to content

Commit 727a2fc

Browse files
authored
Show running thread in async registry (#21776)
1 parent 9af8f2b commit 727a2fc

File tree

7 files changed

+116
-34
lines changed

7 files changed

+116
-34
lines changed

CHANGELOG

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
devel
22
-----
33

4+
* Fix a possible crash when requesting the async registry:
5+
The registry no longer requests the thread name of a potentially destroyed thread.
6+
47
* Add detection of two threads working on an ExecutionBlock concurrently.
58

69
* Add detection of two threads waiting for a PrefetchTask concurrently.

arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/asyncregistry/gdb_data.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ class Thread:
2626
# TODO is there a way to get the thread name?
2727

2828
@classmethod
29-
def from_gdb(cls, value: gdb.Value):
29+
def from_gdb(cls, value: gdb.Value | None):
30+
if not value:
31+
return None
3032
return cls(value['posix_id'], value['kernel_id'])
3133

3234
def __str__(self):
33-
return "thread " + str(self.lwpid)
35+
return f"LWPID {self.lwpid} (pthread {self.posix_id})"
3436

3537
@dataclass
3638
class SourceLocation:
@@ -79,7 +81,7 @@ def __str__(self):
7981
@dataclass
8082
class Promise:
8183
id: PromiseId
82-
thread: Thread
84+
thread: Optional[Thread]
8385
source_location: SourceLocation
8486
requester: Requester
8587
state: State
@@ -88,7 +90,7 @@ class Promise:
8890
def from_gdb(cls, ptr: gdb.Value, value: gdb.Value):
8991
return cls(
9092
PromiseId(ptr),
91-
Thread.from_gdb(value["thread"]),
93+
Thread.from_gdb(GdbOptional.from_gdb(value["running_thread"])._value),
9294
SourceLocation.from_gdb(value["source_location"]),
9395
Requester.from_gdb(value["requester"]["_M_i"]),
9496
State.from_gdb(value["state"])
@@ -98,8 +100,22 @@ def is_valid(self):
98100
return not self.state.is_deleted()
99101

100102
def __str__(self):
101-
return str(self.source_location) + ", " + str(self.thread) + ", " + str(self.state)
103+
thread_str = f" on {self.thread}" if self.thread else ""
104+
return str(self.source_location) + ", " + str(self.state) + thread_str
105+
106+
@dataclass
107+
class GdbOptional:
108+
_value: Optional[gdb.Value]
102109

110+
@classmethod
111+
def from_gdb(cls, value: gdb.Value):
112+
payload = value["_M_i"]["_M_payload"]
113+
engaged = payload["_M_engaged"]
114+
if not engaged:
115+
return cls(None)
116+
internal_value = payload["_M_payload"]["_M_value"]
117+
return cls(internal_value)
118+
103119
@dataclass
104120
class GdbAtomicList:
105121
_head_ptr: gdb.Value

arangod/SystemMonitor/AsyncRegistry/PrettyPrinter/src/pretty-printer.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
from asyncregistry.stacktrace import Stacktrace
1616

1717
class Thread(object):
18-
def __init__(self, name: str, id: int):
19-
self.name = name
18+
def __init__(self, id: int, posix_id: int):
2019
self.id = id
20+
self.posix_id = posix_id
2121
@classmethod
2222
def from_json(cls, blob: dict):
23-
return cls(blob["name"], blob["LWPID"])
23+
return cls(blob["LWPID"], blob["posix_id"])
2424
def __str__(self):
25-
return self.name + "(" + str(self.id) + ")"
25+
return f"LWPID {self.id} (pthread {self.posix_id})"
2626

2727
class SourceLocation(object):
2828
def __init__(self, file_name: str, line: int, function_name: str):
@@ -58,18 +58,19 @@ def __str__(self):
5858
return ""
5959

6060
class Data(object):
61-
def __init__(self, owning_thread: Thread, source_location: SourceLocation, id: int, state: str, requester: Requester):
62-
self.owning_thread = owning_thread
61+
def __init__(self, running_thread: Optional[Thread], source_location: SourceLocation, id: int, state: str, requester: Requester):
62+
self.running_thread = running_thread
6363
self.source_location = source_location
6464
self.id = id
6565
self.waiter = requester
6666
self.state = state
6767
@classmethod
6868
def from_json(cls, blob: dict):
69-
return cls(Thread.from_json(blob["owning_thread"]), SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"]))
69+
return cls(Thread.from_json(blob["running_thread"]) if "running_thread" in blob else None, SourceLocation.from_json(blob["source_location"]), blob["id"], blob["state"], Requester.from_json(blob["requester"]))
7070
def __str__(self):
7171
waiter_str = str(self.waiter) if self.waiter != None else ""
72-
return str(self.source_location) + ", " + str(self.owning_thread) + ", " + self.state + waiter_str
72+
thread_str = f" on {self.running_thread}" if self.running_thread else ""
73+
return str(self.source_location) + ", " + self.state + thread_str + waiter_str
7374

7475
class Promise(object):
7576
def __init__(self, hierarchy: int, data: Data):

lib/Async/Registry/promise.cpp

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
using namespace arangodb::async_registry;
3030

3131
Promise::Promise(Requester requester, std::source_location entry_point)
32-
: thread{basics::ThreadId::current()},
32+
: owning_thread{basics::ThreadId::current()},
33+
requester{requester},
34+
state{State::Running},
35+
running_thread{basics::ThreadId::current()},
3336
source_location{entry_point.file_name(), entry_point.function_name(),
34-
entry_point.line()},
35-
requester{requester} {}
37+
entry_point.line()} {}
3638

3739
auto arangodb::async_registry::get_current_coroutine() noexcept -> Requester* {
3840
struct Guard {
@@ -73,6 +75,13 @@ auto AddToAsyncRegistry::update_source_location(std::source_location loc)
7375
}
7476
auto AddToAsyncRegistry::update_state(State state) -> std::optional<State> {
7577
if (node_in_registry != nullptr) {
78+
if (state == State::Running) {
79+
node_in_registry->data.running_thread.store(basics::ThreadId::current(),
80+
std::memory_order_release);
81+
} else {
82+
node_in_registry->data.running_thread.store(std::nullopt,
83+
std::memory_order_release);
84+
}
7685
return node_in_registry->data.state.exchange(state);
7786
} else {
7887
return std::nullopt;

lib/Async/Registry/promise.h

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,18 @@ auto inspect(Inspector& f, Requester& x) {
102102

103103
struct PromiseSnapshot {
104104
void* id;
105-
basics::ThreadId thread;
106-
basics::SourceLocationSnapshot source_location;
107105
Requester requester;
108106
State state;
107+
std::optional<basics::ThreadId> thread;
108+
basics::SourceLocationSnapshot source_location;
109109
bool operator==(PromiseSnapshot const&) const = default;
110110
};
111111
template<typename Inspector>
112112
auto inspect(Inspector& f, PromiseSnapshot& x) {
113-
return f.object(x).fields(f.field("owning_thread", x.thread),
114-
f.field("source_location", x.source_location),
115-
f.field("id", fmt::format("{}", x.id)),
116-
f.field("requester", x.requester),
117-
f.field("state", x.state));
113+
return f.object(x).fields(
114+
f.field("id", fmt::format("{}", x.id)), f.field("requester", x.requester),
115+
f.field("state", x.state), f.field("running_thread", x.thread),
116+
f.field("source_location", x.source_location));
118117
}
119118

120119
/**
@@ -127,20 +126,22 @@ struct Promise {
127126

128127
auto id() -> void* { return this; }
129128
auto snapshot() -> Snapshot {
130-
return PromiseSnapshot{.id = id(),
131-
.thread = thread,
132-
.source_location = source_location.snapshot(),
133-
.requester = requester.load(),
134-
.state = state.load()};
129+
return PromiseSnapshot{
130+
.id = id(),
131+
.requester = requester.load(),
132+
.state = state.load(),
133+
.thread = running_thread.load(std::memory_order_acquire),
134+
.source_location = source_location.snapshot()};
135135
}
136136
auto set_to_deleted() -> void {
137137
state.store(State::Deleted, std::memory_order_relaxed);
138138
}
139139

140-
basics::ThreadId thread;
141-
basics::VariableSourceLocation source_location;
140+
basics::ThreadId owning_thread;
142141
std::atomic<Requester> requester;
143142
std::atomic<State> state = State::Running;
143+
std::atomic<std::optional<basics::ThreadId>> running_thread;
144+
basics::VariableSourceLocation source_location;
144145
};
145146

146147
/**

lib/Containers/Concurrent/thread.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ struct ThreadId {
3737
template<typename Inspector>
3838
auto inspect(Inspector& f, ThreadId& x) {
3939
return f.object(x).fields(f.field("LWPID", x.kernel_id),
40-
f.field("name", x.name()));
40+
f.field("posix_id", x.posix_id));
4141
}
4242

4343
} // namespace arangodb::basics

tests/Async/Registry/RegistryTest.cpp

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
////////////////////////////////////////////////////////////////////////////////
2323
#include "Async/Registry/promise.h"
2424
#include "Async/Registry/registry_variable.h"
25+
#include "thread.h"
2526

2627
#include <gtest/gtest.h>
28+
#include <optional>
2729
#include <source_location>
2830
#include <thread>
2931

@@ -48,10 +50,10 @@ struct MyPromise : public AddToAsyncRegistry {
4850
thread{basics::ThreadId::current()} {}
4951
auto snapshot(State state = State::Running) -> PromiseSnapshot {
5052
return PromiseSnapshot{.id = id(),
51-
.thread = thread,
52-
.source_location = source_location,
5353
.requester = {thread},
54-
.state = state};
54+
.state = state,
55+
.thread = thread,
56+
.source_location = source_location};
5557
}
5658
};
5759

@@ -132,3 +134,53 @@ TEST_F(
132134
get_thread_registry().garbage_collect();
133135
EXPECT_EQ(promises_in_registry(), (std::vector<PromiseSnapshot>{}));
134136
}
137+
138+
TEST_F(AsyncRegistryTest, sets_running_thread_to_current_thread_when_running) {
139+
auto promise = MyPromise{};
140+
auto all_promises = promises_in_registry();
141+
EXPECT_EQ(all_promises.size(), 1);
142+
EXPECT_EQ(all_promises[0].state, State::Running);
143+
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());
144+
145+
promise.update_state(State::Suspended);
146+
all_promises = promises_in_registry();
147+
EXPECT_EQ(all_promises[0].state, State::Suspended);
148+
EXPECT_EQ(all_promises[0].thread, std::nullopt);
149+
150+
promise.update_state(State::Running);
151+
all_promises = promises_in_registry();
152+
EXPECT_EQ(all_promises[0].state, State::Running);
153+
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());
154+
155+
promise.update_state(State::Resolved);
156+
all_promises = promises_in_registry();
157+
EXPECT_EQ(all_promises[0].state, State::Resolved);
158+
EXPECT_EQ(all_promises[0].thread, std::nullopt);
159+
160+
promise.update_state(State::Running);
161+
all_promises = promises_in_registry();
162+
EXPECT_EQ(all_promises[0].state, State::Running);
163+
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());
164+
165+
promise.update_state(State::Deleted);
166+
all_promises = promises_in_registry();
167+
EXPECT_EQ(all_promises[0].state, State::Deleted);
168+
EXPECT_EQ(all_promises[0].thread, std::nullopt);
169+
170+
promise.update_state(State::Running);
171+
all_promises = promises_in_registry();
172+
EXPECT_EQ(all_promises[0].state, State::Running);
173+
EXPECT_EQ(all_promises[0].thread, basics::ThreadId::current());
174+
}
175+
176+
TEST_F(AsyncRegistryTest, inpection_works_on_after_thread_was_deleted) {
177+
PromiseSnapshot promise_snapshot;
178+
std::ignore = std::jthread([&promise_snapshot]() {
179+
auto promise = MyPromise{};
180+
promise_snapshot = promise.snapshot();
181+
});
182+
183+
// we just make sure that we can still inspect the promise (and it does not
184+
// crash the system), although the thread the promise was created on is gone
185+
EXPECT_NE(fmt::format("{}", inspection::json(promise_snapshot)), "");
186+
}

0 commit comments

Comments
 (0)