-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreader.go
268 lines (250 loc) · 7.68 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"bufio"
"fmt"
"io"
"slices"
"strings"
"internal/trace/internal/tracev1"
"internal/trace/tracev2"
"internal/trace/version"
)
// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty the Reader always produces a Sync
// event as the first event, and a Sync event as the last event.
// (There may also be any number of Sync events in the middle, too.)
//
// A Reader is created with NewReader and drained by calling ReadEvent
// until it returns io.EOF.
type Reader struct {
	// version is the trace version read from the header. NewReader only
	// sets it for v2 (Go 1.22+) traces.
	version version.Version
	// r is the buffered input that v2 generations are read from.
	r *bufio.Reader
	// lastTs is the timestamp of the most recent event returned on the
	// v2 path. ReadEvent bumps any event whose time is <= lastTs to
	// lastTs+1 before returning it.
	lastTs Time
	// gen is the generation currently being emitted; nil until the
	// first successful readGeneration call.
	gen *generation
	// spill is the spilled batch returned by the previous readGeneration
	// call and passed back in on the next one. ReadEvent treats a nil
	// spill (with a non-nil gen) as "no further generation".
	spill        *spilledBatch
	spillErr     error // error from reading spill
	spillErrSync bool  // whether we emitted a Sync before reporting spillErr
	// frontier is a min-heap of per-M batch cursors for the current
	// generation, ordered by the timestamp of each cursor's next event.
	frontier []*batchCursor
	// cpuSamples holds the CPU samples of the current generation that
	// have not been emitted yet; ReadEvent consumes it from the front.
	cpuSamples []cpuSample
	// order holds the event-ordering state and the queue of events that
	// are ready to be returned.
	order ordering
	// syncs counts the Sync events emitted so far; the updated count is
	// passed to syncEvent each time one is produced.
	syncs int
	// done is set once the final Sync event has been emitted. After
	// that, ReadEvent returns only io.EOF.
	done bool
	// v1Events is non-nil only for traces in a v1 format (Go 1.11, 1.19
	// and 1.21), in which case ReadEvent serves events from it instead
	// of running the v2 algorithm.
	v1Events *traceV1Converter
}
// NewReader constructs a Reader that consumes the execution trace in r.
//
// The version header is read immediately through a buffered wrapper
// around r, and the rest depends on the version found:
//
//   - Go 1.22 and 1.23 traces: nothing beyond the header is consumed here.
//     The Reader keeps the buffered stream together with fresh ordering
//     state.
//   - Go 1.11, 1.19 and 1.21 traces: the buffered stream is handed to
//     tracev1.Parse and the result is wrapped with convertV1Trace.
//
// A header error, a tracev1.Parse error, or any other version results in
// a nil Reader and a non-nil error.
func NewReader(r io.Reader) (*Reader, error) {
	input := bufio.NewReader(r)
	ver, err := version.ReadHeader(input)
	if err != nil {
		return nil, err
	}

	switch ver {
	case version.Go122, version.Go123:
		// Only the stream and empty ordering tables are set up here.
		rd := &Reader{
			version: ver,
			r:       input,
			order: ordering{
				traceVer:    ver,
				mStates:     make(map[ThreadID]*mState),
				pStates:     make(map[ProcID]*pState),
				gStates:     make(map[GoID]*gState),
				activeTasks: make(map[TaskID]taskState),
			},
		}
		return rd, nil

	case version.Go111, version.Go119, version.Go121:
		parsed, parseErr := tracev1.Parse(input, ver)
		if parseErr != nil {
			return nil, parseErr
		}
		return &Reader{v1Events: convertV1Trace(parsed)}, nil
	}

	// Every supported version returned above.
	return nil, fmt.Errorf("unknown or unsupported version go 1.%d", ver)
}
// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid event and io.EOF.
//
// For v1 traces the events come from the converter built by NewReader,
// bracketed by a leading and a trailing Sync event. For v2 traces the
// stream is processed one generation at a time; a Sync event is emitted
// each time a generation is loaded and once more at the end of the trace.
//
// On the v2 path every event returned without error has passed
// validateTableIDs and carries a timestamp strictly greater than that of
// the previously returned event (see the deferred function below).
//
// A non-nil error other than io.EOF is returned with an invalid event.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Return only io.EOF if we're done.
	if r.done {
		return Event{}, io.EOF
	}
	// Handle v1 execution traces.
	if r.v1Events != nil {
		if r.syncs == 0 {
			// Always emit a sync event first, if we have any events at all.
			// The event is only peeked at, not consumed, and the Sync is
			// stamped one tick before it.
			ev, ok := r.v1Events.events.Peek()
			if ok {
				r.syncs++
				return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
			}
		}
		ev, err := r.v1Events.next()
		if err == io.EOF {
			// Always emit a sync event at the end.
			// Setting done makes every later call return io.EOF.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
		} else if err != nil {
			return Event{}, err
		}
		return ev, nil
	}
	// Trace v2 parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	//
	// This runs on every v2 return path below and works through the named
	// results: it leaves error returns untouched, turns a table-ID
	// validation failure into the returned error, and otherwise bumps the
	// event's time to r.lastTs+1 if it is not already later than r.lastTs.
	defer func() {
		if err != nil {
			return
		}
		if err = e.validateTableIDs(); err != nil {
			return
		}
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()
	// Consume any events in the ordering first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}
	// Check if we need to refresh the generation.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		if r.spillErr != nil {
			// The previous readGeneration call reported an error. Emit one
			// Sync event first (closing off the events returned so far),
			// then report the error itself on the following call.
			if r.spillErrSync {
				return Event{}, r.spillErr
			}
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		if r.gen != nil && r.spill == nil {
			// If we have a generation from the last read,
			// and there's nothing left in the frontier, and
			// there's no spilled batch, indicating that there's
			// no further generation, it means we're done.
			// Emit the final sync event.
			r.done = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		// Read the next generation.
		r.gen, r.spill, r.spillErr = readGeneration(r.r, r.spill)
		if r.gen == nil {
			// No generation was produced. Emit a Sync now and mark it as
			// sent, so that the next call takes the r.spillErr branch above
			// and returns the error directly.
			// NOTE(review): this relies on readGeneration returning a
			// non-nil error whenever it returns a nil generation; confirm.
			r.spillErrSync = true
			r.syncs++
			return syncEvent(nil, r.lastTs, r.syncs), nil
		}
		// Reset CPU samples cursor.
		r.cpuSamples = r.gen.cpuSamples
		// Reset frontier.
		// Each M listed in batchMs gets one cursor, primed with its first event.
		for _, m := range r.gen.batchMs {
			batches := r.gen.batches[m]
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}
		r.syncs++
		// If no timestamp has been recorded yet, seed lastTs from the
		// generation's minimum timestamp, scaled by its frequency.
		if r.lastTs == 0 {
			r.lastTs = r.gen.freq.mul(r.gen.minTs)
		}
		// Always emit a sync event at the beginning of the generation.
		return syncEvent(r.gen.evTable, r.lastTs, r.syncs), nil
	}
	// tryAdvance offers the pending event of r.frontier[i] to the ordering.
	// If the ordering rejects it or fails, that result is returned as-is
	// and the frontier is left unchanged. Otherwise the cursor is moved to
	// its next event and the heap is fixed up (or the cursor is removed
	// when it has no events left), and tryAdvance reports true.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]
		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}
		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// If we successfully refreshed, update the heap.
			heapUpdate(r.frontier, i)
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	// A sample is emitted when no cursors remain, or when it is strictly
	// earlier than the event at the head of the frontier.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		// Index 0 is skipped: the head of the frontier was attempted above.
		// The loop stops at the first cursor that advances, so at most one
		// frontier mutation happens per call.
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}
	// Pick off the next event on the queue. At this point, one must exist.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}
// dumpFrontier renders the pending event of every cursor in frontier as
// text, one line per cursor, for use in "broken trace" error messages.
//
// Each line has the form
//
//	M <m> [<event name> time=<time> <arg>=<value> ...]
//
// The first entry of the event spec's Args is skipped when labelling the
// values; the remaining names are paired in order with the event's args
// starting at index 0. An empty frontier yields the empty string.
func dumpFrontier(frontier []*batchCursor) string {
	var out strings.Builder
	for _, cursor := range frontier {
		pending := &cursor.ev
		spec := tracev2.Specs()[pending.typ]
		fmt.Fprintf(&out, "M %d [%s time=%d", cursor.m, spec.Name, pending.time)

		argNames := spec.Args[1:]
		for idx := range argNames {
			fmt.Fprintf(&out, " %s=%d", argNames[idx], pending.args[idx])
		}
		out.WriteString("]\n")
	}
	return out.String()
}