Skip to content

Commit 90bfc52

Browse files
committed
analyze system activity
1 parent 4a762de commit 90bfc52

File tree

6 files changed

+295
-180
lines changed

6 files changed

+295
-180
lines changed

pkg/estimator/monitor.go

Lines changed: 243 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,243 @@
1+
/*
2+
2021 © Postgres.ai
3+
*/
4+
5+
package estimator
6+
7+
import (
8+
"bufio"
9+
"bytes"
10+
"context"
11+
"fmt"
12+
"io"
13+
"os/exec"
14+
"regexp"
15+
"strconv"
16+
"sync/atomic"
17+
18+
"github.com/pkg/errors"
19+
20+
"gitlab.com/postgres-ai/database-lab/v2/pkg/log"
21+
)
22+
23+
const (
24+
regExp = "^[.0-9]+\\s+\\S+\\s+(\\d+)\\s+\\w+\\s+(W|R)\\s+\\d+\\s+(\\d+)\\s+[.0-9]+$"
25+
countMatches = 4
26+
expectedMappingParts = 2
27+
)
28+
29+
var (
30+
r = regexp.MustCompile(regExp)
31+
nsPrefix = []byte("NSpid:")
32+
)
33+
34+
// Monitor observes processes and system activity.
35+
type Monitor struct {
36+
pid int
37+
container string
38+
pidMapping map[int]int
39+
profiler *Profiler
40+
}
41+
42+
// NewMonitor creates a new monitor.
43+
func NewMonitor(pid int, container string, profiler *Profiler) *Monitor {
44+
return &Monitor{
45+
pid: pid,
46+
container: container,
47+
profiler: profiler,
48+
pidMapping: make(map[int]int),
49+
}
50+
}
51+
52+
// InspectIOBlocks counts physically read blocks.
53+
func (m *Monitor) InspectIOBlocks(ctx context.Context) error {
54+
log.Dbg("Run read physical")
55+
56+
cmd := exec.Command("biosnoop")
57+
58+
r, err := cmd.StdoutPipe()
59+
if err != nil {
60+
return err
61+
}
62+
63+
cmd.Stderr = cmd.Stdout
64+
65+
go m.scanOutput(ctx, r)
66+
67+
if err := cmd.Start(); err != nil {
68+
return errors.Wrap(err, "failed to run")
69+
}
70+
71+
<-m.profiler.exitChan
72+
73+
log.Dbg("End read physical")
74+
75+
return nil
76+
}
77+
78+
type bytesEntry struct {
79+
pid int
80+
totalBytes uint64
81+
}
82+
83+
func (m *Monitor) scanOutput(ctx context.Context, r io.Reader) {
84+
scanner := bufio.NewScanner(r)
85+
86+
for scanner.Scan() {
87+
scanBytes := scanner.Bytes()
88+
89+
if !bytes.Contains(scanBytes, []byte("postgres")) && !bytes.Contains(scanBytes, []byte("psql")) {
90+
continue
91+
}
92+
93+
bytesEntry := m.parseReadBytes(scanBytes)
94+
if bytesEntry == nil || bytesEntry.totalBytes == 0 {
95+
continue
96+
}
97+
98+
pid, ok := m.pidMapping[bytesEntry.pid]
99+
if !ok {
100+
hostPID, err := m.filterPID(bytesEntry.pid)
101+
m.pidMapping[bytesEntry.pid] = hostPID
102+
103+
if err != nil {
104+
// log.Dbg("failed to get PID mapping: ", err)
105+
continue
106+
}
107+
108+
pid = hostPID
109+
}
110+
111+
if pid != m.pid {
112+
continue
113+
}
114+
115+
log.Dbg("read bytes: ", bytesEntry.totalBytes)
116+
117+
atomic.AddUint64(&m.profiler.readBytes, bytesEntry.totalBytes)
118+
119+
select {
120+
case <-ctx.Done():
121+
log.Dbg("context")
122+
return
123+
124+
case <-m.profiler.exitChan:
125+
log.Dbg("exit chan")
126+
return
127+
128+
default:
129+
}
130+
}
131+
}
132+
133+
func getContainerHash(pid int) (string, error) {
134+
procParallel, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/cgroup", pid)).Output()
135+
if err != nil {
136+
return "", err
137+
}
138+
139+
return isInside(procParallel), nil
140+
}
141+
142+
func isInside(procParallel []byte) string {
143+
sc := bufio.NewScanner(bytes.NewBuffer(procParallel))
144+
145+
for sc.Scan() {
146+
line := sc.Bytes()
147+
148+
if !bytes.HasPrefix(line, []byte("1:name")) {
149+
continue
150+
}
151+
152+
res := bytes.SplitN(line, []byte("/docker/"), 2)
153+
154+
if len(res) == 1 {
155+
return ""
156+
}
157+
158+
return string(res[1])
159+
}
160+
161+
return ""
162+
}
163+
164+
func (m *Monitor) isValidContainer(hash string) bool {
165+
return m.container == hash
166+
}
167+
168+
func (m *Monitor) filterPID(pid int) (int, error) {
169+
hash, err := getContainerHash(pid)
170+
if err != nil {
171+
return 0, err
172+
}
173+
174+
if !m.isValidContainer(hash) {
175+
return 0, nil
176+
}
177+
178+
procParallel, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/cmdline", pid)).Output()
179+
if err != nil {
180+
return 0, err
181+
}
182+
183+
if bytes.Contains(procParallel, []byte("postgres")) &&
184+
bytes.Contains(procParallel, []byte("parallel worker for PID "+strconv.Itoa(m.pid))) {
185+
return m.pid, nil
186+
}
187+
188+
procStatus, err := exec.Command("cat", fmt.Sprintf("/host_proc/%d/status", pid)).Output()
189+
if err != nil {
190+
return 0, err
191+
}
192+
193+
return m.parsePIDMapping(procStatus)
194+
}
195+
196+
func (m *Monitor) parsePIDMapping(procStatus []byte) (int, error) {
197+
sc := bufio.NewScanner(bytes.NewBuffer(procStatus))
198+
199+
for sc.Scan() {
200+
line := sc.Bytes()
201+
if !bytes.HasPrefix(line, nsPrefix) {
202+
continue
203+
}
204+
205+
nsPID := bytes.TrimSpace(bytes.TrimPrefix(line, nsPrefix))
206+
207+
pidValues := bytes.Fields(nsPID)
208+
if len(pidValues) < expectedMappingParts {
209+
return 0, nil
210+
}
211+
212+
hostPID, err := strconv.Atoi(string(bytes.TrimSpace(pidValues[1])))
213+
if err != nil {
214+
return 0, err
215+
}
216+
217+
return hostPID, nil
218+
}
219+
220+
return 0, nil
221+
}
222+
223+
func (m *Monitor) parseReadBytes(line []byte) *bytesEntry {
224+
submatch := r.FindSubmatch(line)
225+
if len(submatch) != countMatches {
226+
return nil
227+
}
228+
229+
totalBytes, err := strconv.ParseUint(string(submatch[3]), 10, 64)
230+
if err != nil {
231+
return nil
232+
}
233+
234+
pid, err := strconv.Atoi(string(submatch[1]))
235+
if err != nil {
236+
return nil
237+
}
238+
239+
return &bytesEntry{
240+
pid: pid,
241+
totalBytes: totalBytes,
242+
}
243+
}

pkg/estimator/profile_test.go renamed to pkg/estimator/monitor_test.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package estimator
33
import (
44
"bytes"
55
"context"
6-
"strconv"
76
"testing"
87

98
"github.com/stretchr/testify/assert"
@@ -88,18 +87,14 @@ func TestOutputScanner(t *testing.T) {
8887

8988
for _, tc := range testCases {
9089
r := bytes.NewReader([]byte(sample))
91-
p := Profiler{
92-
pidMapping: map[string]int{
93-
strconv.Itoa(tc.pid): tc.pid,
94-
},
95-
opts: TraceOptions{
96-
Pid: tc.pid,
97-
},
90+
monitor := NewMonitor(tc.pid, "", &Profiler{})
91+
monitor.pidMapping = map[int]int{
92+
tc.pid: tc.pid,
9893
}
9994

100-
p.scanOutput(context.TODO(), r)
95+
monitor.scanOutput(context.TODO(), r)
10196

102-
assert.Equal(t, tc.readBytes, p.readBytes)
97+
assert.Equal(t, tc.readBytes, monitor.profiler.readBytes)
10398
}
10499
}
105100

@@ -124,10 +119,31 @@ VmSize: 2315104 kB
124119
`
125120

126121
func TestProcStatParsing(t *testing.T) {
127-
p := Profiler{}
122+
monitor := Monitor{}
128123

129-
hostPID, err := p.parsePIDMapping([]byte(procStatus))
124+
hostPID, err := monitor.parsePIDMapping([]byte(procStatus))
130125

131126
require.Nil(t, err)
132127
assert.Equal(t, 674, hostPID)
133128
}
129+
130+
const procCgroup = `
131+
12:rdma:/
132+
11:pids:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
133+
10:cpuset:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
134+
9:perf_event:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
135+
8:blkio:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
136+
7:freezer:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
137+
6:cpu,cpuacct:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
138+
5:memory:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
139+
4:net_cls,net_prio:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
140+
3:devices:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
141+
2:hugetlb:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
142+
1:name=systemd:/docker/ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b
143+
`
144+
145+
func TestIsInside(t *testing.T) {
146+
containerHash := isInside([]byte(procCgroup))
147+
148+
assert.Equal(t, "ad63ab82fdb32dd384ac76ab5a9d20fb7cb48f53be4d4cac52924e920c4a967b", containerHash)
149+
}

0 commit comments

Comments
 (0)