Skip to content

feat: add endpoint to get listening ports in agent #4260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net"
"net/http"
"net/netip"
"os"
"os/exec"
Expand Down Expand Up @@ -206,6 +207,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
go a.sshServer.HandleConn(a.stats.wrapConn(conn))
}
}()

reconnectingPTYListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetReconnectingPTYPort))
if err != nil {
a.logger.Critical(ctx, "listen for reconnecting pty", slog.Error(err))
Expand Down Expand Up @@ -240,6 +242,7 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
go a.handleReconnectingPTY(ctx, msg, conn)
}
}()

speedtestListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetSpeedtestPort))
if err != nil {
a.logger.Critical(ctx, "listen for speedtest", slog.Error(err))
Expand All @@ -261,6 +264,31 @@ func (a *agent) runTailnet(ctx context.Context, derpMap *tailcfg.DERPMap) {
}()
}
}()

statisticsListener, err := a.network.Listen("tcp", ":"+strconv.Itoa(codersdk.TailnetStatisticsPort))
if err != nil {
a.logger.Critical(ctx, "listen for statistics", slog.Error(err))
return
}
go func() {
defer statisticsListener.Close()
server := &http.Server{
Handler: a.statisticsHandler(),
ReadTimeout: 20 * time.Second,
ReadHeaderTimeout: 20 * time.Second,
WriteTimeout: 20 * time.Second,
ErrorLog: slog.Stdlib(ctx, a.logger.Named("statistics_http_server"), slog.LevelInfo),
}
go func() {
<-ctx.Done()
_ = server.Close()
}()

err = server.Serve(statisticsListener)
if err != nil && !xerrors.Is(err, http.ErrServerClosed) && !strings.Contains(err.Error(), "use of closed network connection") {
a.logger.Critical(ctx, "serve statistics HTTP server", slog.Error(err))
}
}()
}

// runCoordinator listens for nodes and updates the self-node as it changes.
Expand Down
65 changes: 65 additions & 0 deletions agent/ports_supported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//go:build linux || windows
// +build linux windows

package agent

import (
"time"

"github.com/cakturk/go-netstat/netstat"
"golang.org/x/xerrors"

"github.com/coder/coder/codersdk"
)

func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
lp.mut.Lock()
defer lp.mut.Unlock()

if time.Since(lp.mtime) < time.Second {
// copy
ports := make([]codersdk.ListeningPort, len(lp.ports))
copy(ports, lp.ports)
return ports, nil
}

tabs, err := netstat.TCPSocks(func(s *netstat.SockTabEntry) bool {
return s.State == netstat.Listen
})
if err != nil {
return nil, xerrors.Errorf("scan listening ports: %w", err)
}

seen := make(map[uint16]struct{}, len(tabs))
ports := []codersdk.ListeningPort{}
for _, tab := range tabs {
if tab.LocalAddr == nil || tab.LocalAddr.Port < uint16(codersdk.MinimumListeningPort) {
continue
}

// Don't include ports that we've already seen. This can happen on
// Windows, and maybe on Linux if you're using a shared listener socket.
if _, ok := seen[tab.LocalAddr.Port]; ok {
continue
}
seen[tab.LocalAddr.Port] = struct{}{}

procName := ""
if tab.Process != nil {
procName = tab.Process.Name
}
ports = append(ports, codersdk.ListeningPort{
ProcessName: procName,
Network: codersdk.ListeningPortNetworkTCP,
Port: tab.LocalAddr.Port,
})
}

lp.ports = ports
lp.mtime = time.Now()

// copy
ports = make([]codersdk.ListeningPort, len(lp.ports))
copy(ports, lp.ports)
return ports, nil
}
13 changes: 13 additions & 0 deletions agent/ports_unsupported.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//go:build !linux && !windows
// +build !linux,!windows

package agent

import "github.com/coder/coder/codersdk"

func (lp *listeningPortsHandler) getListeningPorts() ([]codersdk.ListeningPort, error) {
// Can't scan for ports on non-linux or non-windows systems at the moment.
// The UI will not show any "no ports found" message to the user, so the
// user won't suspect a thing.
return []codersdk.ListeningPort{}, nil
}
49 changes: 49 additions & 0 deletions agent/statsendpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package agent

import (
"net/http"
"sync"
"time"

"github.com/go-chi/chi"

"github.com/coder/coder/coderd/httpapi"
"github.com/coder/coder/codersdk"
)

func (*agent) statisticsHandler() http.Handler {
r := chi.NewRouter()
r.Get("/", func(rw http.ResponseWriter, r *http.Request) {
httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.Response{
Message: "Hello from the agent!",
})
})

lp := &listeningPortsHandler{}
r.Get("/api/v0/listening-ports", lp.handler)
Comment on lines +22 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating an API for statistics, we should just handle this single port for right now on a handler. We probably don't even need Chi and can just directly serve it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@f0ssel and I would like to convert the stats websocket code in the agent to be in this webserver too, which is why it has path based routing. I've only reserved 8 ports for Coder so we can't just create a new http server for each function


return r
}

type listeningPortsHandler struct {
mut sync.Mutex
ports []codersdk.ListeningPort
mtime time.Time
}

// handler returns a list of listening ports. This is tested by coderd's
// TestWorkspaceAgentListeningPorts test.
func (lp *listeningPortsHandler) handler(rw http.ResponseWriter, r *http.Request) {
ports, err := lp.getListeningPorts()
if err != nil {
httpapi.Write(r.Context(), rw, http.StatusInternalServerError, codersdk.Response{
Message: "Could not scan for listening ports.",
Detail: err.Error(),
})
return
}

httpapi.Write(r.Context(), rw, http.StatusOK, codersdk.ListeningPortsResponse{
Ports: ports,
})
}
1 change: 1 addition & 0 deletions coderd/coderd.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func New(options *Options) *API {
)
r.Get("/", api.workspaceAgent)
r.Get("/pty", api.workspaceAgentPTY)
r.Get("/listening-ports", api.workspaceAgentListeningPorts)
r.Get("/connection", api.workspaceAgentConnection)
r.Get("/coordinate", api.workspaceAgentClientCoordinate)
// TODO: This can be removed in October. It allows for a friendly
Expand Down
46 changes: 46 additions & 0 deletions coderd/workspaceagents.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,52 @@ func (api *API) workspaceAgentPTY(rw http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(ptNetConn, wsNetConn)
}

func (api *API) workspaceAgentListeningPorts(rw http.ResponseWriter, r *http.Request) {
ctx := r.Context()
workspace := httpmw.WorkspaceParam(r)
workspaceAgent := httpmw.WorkspaceAgentParam(r)
if !api.Authorize(r, rbac.ActionRead, workspace) {
httpapi.ResourceNotFound(rw)
return
}

apiAgent, err := convertWorkspaceAgent(api.DERPMap, api.TailnetCoordinator, workspaceAgent, nil, api.AgentInactiveDisconnectTimeout)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error reading workspace agent.",
Detail: err.Error(),
})
return
}
if apiAgent.Status != codersdk.WorkspaceAgentConnected {
httpapi.Write(ctx, rw, http.StatusPreconditionRequired, codersdk.Response{
Message: fmt.Sprintf("Agent state is %q, it must be in the %q state.", apiAgent.Status, codersdk.WorkspaceAgentConnected),
})
return
}

agentConn, release, err := api.workspaceAgentCache.Acquire(r, workspaceAgent.ID)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error dialing workspace agent.",
Detail: err.Error(),
})
return
}
defer release()

portsResponse, err := agentConn.ListeningPorts(ctx)
if err != nil {
httpapi.Write(ctx, rw, http.StatusInternalServerError, codersdk.Response{
Message: "Internal error fetching listening ports.",
Detail: err.Error(),
})
return
}
Comment on lines +246 to +263
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about pushing port data instead of pulling it? This seems like it could lead to a lot of workspace traffic with page reloads, and we've learned from v1 that dynamic data like this (especially on main pages) can be sketchy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only be loaded when people click the port forward button. There will be more traffic if workspaces push it to coderd rather than if we load it when the user clicks the button which won't be that often.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To add to this, I talked with dean and I think the syscall overhead is way too much for a responsive push system for such a small feature like this. Because we cache the response for 1 second on the agent side this already has built in protection for the workspace.


httpapi.Write(ctx, rw, http.StatusOK, portsResponse)
}

func (api *API) dialWorkspaceAgentTailnet(r *http.Request, agentID uuid.UUID) (*codersdk.AgentConn, error) {
clientConn, serverConn := net.Pipe()
go func() {
Expand Down
129 changes: 129 additions & 0 deletions coderd/workspaceagents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"bufio"
"context"
"encoding/json"
"net"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -363,6 +365,133 @@ func TestWorkspaceAgentPTY(t *testing.T) {
expectLine(matchEchoOutput)
}

func TestWorkspaceAgentListeningPorts(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
IncludeProvisionerDaemon: true,
})
coderdPort, err := strconv.Atoi(client.URL.Port())
require.NoError(t, err)

user := coderdtest.CreateFirstUser(t, client)
authToken := uuid.NewString()
version := coderdtest.CreateTemplateVersion(t, client, user.OrganizationID, &echo.Responses{
Parse: echo.ParseComplete,
ProvisionDryRun: echo.ProvisionComplete,
Provision: []*proto.Provision_Response{{
Type: &proto.Provision_Response_Complete{
Complete: &proto.Provision_Complete{
Resources: []*proto.Resource{{
Name: "example",
Type: "aws_instance",
Agents: []*proto.Agent{{
Id: uuid.NewString(),
Auth: &proto.Agent_Token{
Token: authToken,
},
}},
}},
},
},
}},
})
template := coderdtest.CreateTemplate(t, client, user.OrganizationID, version.ID)
coderdtest.AwaitTemplateVersionJob(t, client, version.ID)
workspace := coderdtest.CreateWorkspace(t, client, user.OrganizationID, template.ID)
coderdtest.AwaitWorkspaceBuildJob(t, client, workspace.LatestBuild.ID)

agentClient := codersdk.New(client.URL)
agentClient.SessionToken = authToken
agentCloser := agent.New(agent.Options{
FetchMetadata: agentClient.WorkspaceAgentMetadata,
CoordinatorDialer: agentClient.ListenWorkspaceAgentTailnet,
Logger: slogtest.Make(t, nil).Named("agent").Leveled(slog.LevelDebug),
})
t.Cleanup(func() {
_ = agentCloser.Close()
})
resources := coderdtest.AwaitWorkspaceAgents(t, client, workspace.ID)

t.Run("LinuxAndWindows", func(t *testing.T) {
t.Parallel()
if runtime.GOOS != "linux" && runtime.GOOS != "windows" {
t.Skip("only runs on linux and windows")
return
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

// Create a TCP listener on a random port that we expect to see in the
// response.
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
defer l.Close()
tcpAddr, _ := l.Addr().(*net.TCPAddr)

// List ports and ensure that the port we expect to see is there.
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)

var (
expected = map[uint16]bool{
// expect the listener we made
uint16(tcpAddr.Port): false,
// expect the coderdtest server
uint16(coderdPort): false,
}
)
for _, port := range res.Ports {
if port.Network == codersdk.ListeningPortNetworkTCP {
if val, ok := expected[port.Port]; ok {
if val {
t.Fatalf("expected to find TCP port %d only once in response", port.Port)
}
}
expected[port.Port] = true
}
}
for port, found := range expected {
if !found {
t.Fatalf("expected to find TCP port %d in response", port)
}
}

// Close the listener and check that the port is no longer in the response.
require.NoError(t, l.Close())
time.Sleep(2 * time.Second) // avoid cache
res, err = client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)

for _, port := range res.Ports {
if port.Network == codersdk.ListeningPortNetworkTCP && port.Port == uint16(tcpAddr.Port) {
t.Fatalf("expected to not find TCP port %d in response", tcpAddr.Port)
}
}
})

t.Run("Darwin", func(t *testing.T) {
t.Parallel()
if runtime.GOOS != "darwin" {
t.Skip("only runs on darwin")
return
}

ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitLong)
defer cancel()

// Create a TCP listener on a random port.
l, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
defer l.Close()

// List ports and ensure that the list is empty because we're on darwin.
res, err := client.WorkspaceAgentListeningPorts(ctx, resources[0].Agents[0].ID)
require.NoError(t, err)
require.Len(t, res.Ports, 0)
})
}

func TestWorkspaceAgentAppHealth(t *testing.T) {
t.Parallel()
client := coderdtest.New(t, &coderdtest.Options{
Expand Down
Loading