Skip to content

Commit 1386465

Browse files
authored
feat: add endpoint to get listening ports in agent (coder#4260)
1 parent bbe2baf commit 1386465

15 files changed

+501
-1
lines changed

agent/agent.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"net"
13+
"net/http"
1314
"net/netip"
1415
"os"
1516
"os/exec"
@@ -206,6 +207,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
206207
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
207208
}
208209
}()
210+
209211
reconnectingPTYListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetReconnectingPTYPort))
210212
if err != nil {
211213
a.logger.Critical(ctx, "listen for reconnecting pty", slog.Error(err))
@@ -240,6 +242,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
240242
go a.handleReconnectingPTY(ctx, msg, conn)
241243
}
242244
}()
245+
243246
speedtestListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetSpeedtestPort))
244247
if err != nil {
245248
a.logger.Critical(ctx, "listen for speedtest", slog.Error(err))
@@ -261,6 +264,31 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
261264
}()
262265
}
263266
}()
267+
268+
statisticsListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetStatisticsPort))
269+
if err != nil {
270+
a.logger.Critical(ctx, "listen for statistics", slog.Error(err))
271+
return
272+
}
273+
go func() {
274+
defer statisticsListener.Close()
275+
server := &http.Server{
276+
Handler: a.statisticsHandler(),
277+
ReadTimeout: 20 * time.Second,
278+
ReadHeaderTimeout: 20 * time.Second,
279+
WriteTimeout: 20 * time.Second,
280+
ErrorLog: slog.Stdlib(ctx, a.logger.Named("statistics_http_server"), slog.LevelInfo),
281+
}
282+
go func() {
283+
<-ctx.Done()
284+
_ = server.Close()
285+
}()
286+
287+
err = server.Serve(statisticsListener)
288+
if err != nil && !xerrors.Is(err, http.ErrServerClosed) && !strings.Contains(err.Error(), "use of closed network connection") {
289+
a.logger.Critical(ctx, "serve statistics HTTP server", slog.Error(err))
290+
}
291+
}()
264292
}
265293

266294
// runCoordinator listens for nodes and updates the self-node as it changes.

agent/ports_supported.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//go:build linux || windows
2+
// +build linux windows
3+
4+
package agent
5+
6+
import (
7+
"time"
8+
9+
"github.com/cakturk/go-netstat/netstat"
10+
"golang.org/x/xerrors"
11+
12+
"github.com/coder/coder/codersdk"
13+
)
14+
15+
func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
16+
lp.mut.Lock()
17+
defer lp.mut.Unlock()
18+
19+
if time.Since(lp.mtime) < time.Second {
20+
// copy
21+
ports := make([]codersdk.ListeningPort, len(lp.ports))
22+
copy(ports, lp.ports)
23+
return ports, nil
24+
}
25+
26+
tabs, err := netstat.TCPSocks(func(s *netstat.SockTabEntry) bool {
27+
return s.State == netstat.Listen
28+
})
29+
if err != nil {
30+
return nil, xerrors.Errorf("scan listening ports: %w", err)
31+
}
32+
33+
seen := make(map[uint16]struct{}, len(tabs))
34+
ports := []codersdk.ListeningPort{}
35+
for _, tab := range tabs {
36+
if tab.LocalAddr == nil || tab.LocalAddr.Port < uint16(codersdk.MinimumListeningPort) {
37+
continue
38+
}
39+
40+
// Don't include ports that we've already seen. This can happen on
41+
// Windows, and maybe on Linux if you're using a shared listener socket.
42+
if _, ok := seen[tab.LocalAddr.Port]; ok {
43+
continue
44+
}
45+
seen[tab.LocalAddr.Port] = struct{}{}
46+
47+
procName := ""
48+
if tab.Process != nil {
49+
procName = tab.Process.Name
50+
}
51+
ports = append(ports, codersdk.ListeningPort{
52+
ProcessName: procName,
53+
Network: codersdk.ListeningPortNetworkTCP,
54+
Port: tab.LocalAddr.Port,
55+
})
56+
}
57+
58+
lp.ports = ports
59+
lp.mtime = time.Now()
60+
61+
// copy
62+
ports = make([]codersdk.ListeningPort, len(lp.ports))
63+
copy(ports, lp.ports)
64+
return ports, nil
65+
}

agent/ports_unsupported.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//go:build !linux && !windows
2+
// +build !linux,!windows
3+
4+
package agent
5+
6+
import "github.com/coder/coder/codersdk"
7+
8+
func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
9+
// Can't scan for ports on non-linux or non-windows systems at the moment.
10+
// The UI will not show any "no ports found" message to the user, so the
11+
// user won't suspect a thing.
12+
return []codersdk.ListeningPort{}, nil
13+
}

agent/statsendpoint.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package agent
2+
3+
import (
4+
"net/http"
5+
"sync"
6+
"time"
7+
8+
"github.com/go-chi/chi"
9+
10+
"github.com/coder/coder/coderd/httpapi"
11+
"github.com/coder/coder/codersdk"
12+
)
13+
14+
func (*agent) statisticsHandler() http.Handler {
15+
r := chi.NewRouter()
16+
r.Get("/", func(rw http.ResponseWriter, r *http.Request) {
17+
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.Response{
18+
Message: "Hello from the agent!",
19+
})
20+
})
21+
22+
lp := &listeningPortsHandler{}
23+
r.Get("/api/v0/listening-ports", lp.handler)
24+
25+
return r
26+
}
27+
28+
type listeningPortsHandler struct {
29+
mut sync.Mutex
30+
ports []codersdk.ListeningPort
31+
mtime time.Time
32+
}
33+
34+
// handler returns a list of listening ports. This is tested by coderd's
35+
// TestWorkspaceAgentListeningPorts test.
36+
func (lp *listeningPortsHandler) handler(rw http.ResponseWriter, r *http.Request) {
37+
ports, err := lp.getListeningPorts()
38+
if err != nil {
39+
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
40+
Message: "Could not scan for listening ports.",
41+
Detail: err.Error(),
42+
})
43+
return
44+
}
45+
46+
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.ListeningPortsResponse{
47+
Ports: ports,
48+
})
49+
}

coderd/coderd.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ func New(options *Options) *API {
438438
)
439439
r.Get("/", api.workspaceAgent)
440440
r.Get("/pty", api.workspaceAgentPTY)
441+
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
441442
r.Get("/connection", api.workspaceAgentConnection)
442443
r.Get("/coordinate", api.workspaceAgentClientCoordinate)
443444
// TODO: This can be removed in October. It allows for a friendly

coderd/workspaceagents.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,52 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
219219
_, _ = io.Copy(ptNetConn, wsNetConn)
220220
}
221221

222+
func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Request) {
223+
ctx := r.Context()
224+
workspace := httpmw.WorkspaceParam(r)
225+
workspaceAgent := httpmw.WorkspaceAgentParam(r)
226+
if !api.Authorize(r, rbac.ActionRead, workspace) {
227+
httpapi.ResourceNotFound(rw)
228+
return
229+
}
230+
231+
apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
232+
if err != nil {
233+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
234+
Message: "Internal error reading workspace agent.",
235+
Detail: err.Error(),
236+
})
237+
return
238+
}
239+
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
240+
httpapi.Write(ctx, rw, http.StatusPreconditionRequired, codersdk.Response{
241+
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
242+
})
243+
return
244+
}
245+
246+
agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
247+
if err != nil {
248+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
249+
Message: "Internal error dialing workspace agent.",
250+
Detail: err.Error(),
251+
})
252+
return
253+
}
254+
defer release()
255+
256+
portsResponse, err := agentConn.ListeningPorts(ctx)
257+
if err != nil {
258+
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
259+
Message: "Internal error fetching listening ports.",
260+
Detail: err.Error(),
261+
})
262+
return
263+
}
264+
265+
httpapi.Write(ctx, rw, http.StatusOK, portsResponse)
266+
}
267+
222268
func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*codersdk.AgentConn, error) {
223269
clientConn, serverConn := net.Pipe()
224270
go func() {

coderd/workspaceagents_test.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"bufio"
55
"context"
66
"encoding/json"
7+
"net"
78
"runtime"
9+
"strconv"
810
"strings"
911
"testing"
1012
"time"
@@ -363,6 +365,133 @@ func TestWorkspaceAgentPTY(t *testing.T) {
363365
expectLine(matchEchoOutput)
364366
}
365367

368+
func TestWorkspaceAgentListeningPorts(t *testing.T) {
369+
t.Parallel()
370+
client := coderdtest.New(t, &coderdtest.Options{
371+
IncludeProvisionerDaemon: true,
372+
})
373+
coderdPort, err := strconv.Atoi(client.URL.Port())
374+
require.NoError(t, err)
375+
376+
user := coderdtest.CreateFirstUser(t, client)
377+
authToken := uuid.NewString()
378+
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
379+
Parse: echo.ParseComplete,
380+
ProvisionDryRun: echo.ProvisionComplete,
381+
Provision: []*proto.Provision_Response{{
382+
Type: &proto.Provision_Response_Complete{
383+
Complete: &proto.Provision_Complete{
384+
Resources: []*proto.Resource{{
385+
Name: "example",
386+
Type: "aws_instance",
387+
Agents: []*proto.Agent{{
388+
Id: uuid.NewString(),
389+
Auth: &proto.Agent_Token{
390+
Token: authToken,
391+
},
392+
}},
393+
}},
394+
},
395+
},
396+
}},
397+
})
398+
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
399+
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
400+
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
401+
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)
402+
403+
agentClient := codersdk.New(client.URL)
404+
agentClient.SessionToken = authToken
405+
agentCloser := agent.New(agent.Options{
406+
FetchMetadata: agentClient.WorkspaceAgentMetadata,
407+
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
408+
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
409+
})
410+
t.Cleanup(func() {
411+
_ = agentCloser.Close()
412+
})
413+
resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)
414+
415+
t.Run("LinuxAndWindows", func(t *testing.T) {
416+
t.Parallel()
417+
if runtime.GOOS != "linux" && runtime.GOOS != "windows" {
418+
t.Skip("only runs on linux and windows")
419+
return
420+
}
421+
422+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
423+
defer cancel()
424+
425+
// Create a TCP listener on a random port that we expect to see in the
426+
// response.
427+
l, err := net.Listen("tcp", "localhost:0")
428+
require.NoError(t, err)
429+
defer l.Close()
430+
tcpAddr, _ := l.Addr().(*net.TCPAddr)
431+
432+
// List ports and ensure that the port we expect to see is there.
433+
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
434+
require.NoError(t, err)
435+
436+
var (
437+
expected = map[uint16]bool{
438+
// expect the listener we made
439+
uint16(tcpAddr.Port): false,
440+
// expect the coderdtest server
441+
uint16(coderdPort): false,
442+
}
443+
)
444+
for _, port := range res.Ports {
445+
if port.Network == codersdk.ListeningPortNetworkTCP {
446+
if val, ok := expected[port.Port]; ok {
447+
if val {
448+
t.Fatalf("expected to find TCP port %d only once in response", port.Port)
449+
}
450+
}
451+
expected[port.Port] = true
452+
}
453+
}
454+
for port, found := range expected {
455+
if !found {
456+
t.Fatalf("expected to find TCP port %d in response", port)
457+
}
458+
}
459+
460+
// Close the listener and check that the port is no longer in the response.
461+
require.NoError(t, l.Close())
462+
time.Sleep(2 * time.Second) // avoid cache
463+
res, err = client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
464+
require.NoError(t, err)
465+
466+
for _, port := range res.Ports {
467+
if port.Network == codersdk.ListeningPortNetworkTCP && port.Port == uint16(tcpAddr.Port) {
468+
t.Fatalf("expected to not find TCP port %d in response", tcpAddr.Port)
469+
}
470+
}
471+
})
472+
473+
t.Run("Darwin", func(t *testing.T) {
474+
t.Parallel()
475+
if runtime.GOOS != "darwin" {
476+
t.Skip("only runs on darwin")
477+
return
478+
}
479+
480+
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
481+
defer cancel()
482+
483+
// Create a TCP listener on a random port.
484+
l, err := net.Listen("tcp", "localhost:0")
485+
require.NoError(t, err)
486+
defer l.Close()
487+
488+
// List ports and ensure that the list is empty because we're on darwin.
489+
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
490+
require.NoError(t, err)
491+
require.Len(t, res.Ports, 0)
492+
})
493+
}
494+
366495
func TestWorkspaceAgentAppHealth(t *testing.T) {
367496
t.Parallel()
368497
client := coderdtest.New(t, &coderdtest.Options{

0 commit comments

Comments
 (0)