8
8
"fmt"
9
9
"hash/fnv"
10
10
"io"
11
+ "net"
11
12
"net/http"
12
13
"net/netip"
13
14
"os"
@@ -28,6 +29,7 @@ import (
28
29
"golang.org/x/exp/slices"
29
30
"golang.org/x/sync/errgroup"
30
31
"golang.org/x/xerrors"
32
+ "google.golang.org/protobuf/types/known/timestamppb"
31
33
"tailscale.com/net/speedtest"
32
34
"tailscale.com/tailcfg"
33
35
"tailscale.com/types/netlogtype"
@@ -90,6 +92,7 @@ type Options struct {
90
92
ContainerLister agentcontainers.Lister
91
93
92
94
ExperimentalContainersEnabled bool
95
+ ExperimentalConnectionReports bool
93
96
}
94
97
95
98
type Client interface {
@@ -177,6 +180,7 @@ func New(options Options) Agent {
177
180
lifecycleUpdate : make (chan struct {}, 1 ),
178
181
lifecycleReported : make (chan codersdk.WorkspaceAgentLifecycle , 1 ),
179
182
lifecycleStates : []agentsdk.PostLifecycleRequest {{State : codersdk .WorkspaceAgentLifecycleCreated }},
183
+ reportConnectionsUpdate : make (chan struct {}, 1 ),
180
184
ignorePorts : options .IgnorePorts ,
181
185
portCacheDuration : options .PortCacheDuration ,
182
186
reportMetadataInterval : options .ReportMetadataInterval ,
@@ -192,6 +196,7 @@ func New(options Options) Agent {
192
196
lister : options .ContainerLister ,
193
197
194
198
experimentalDevcontainersEnabled : options .ExperimentalContainersEnabled ,
199
+ experimentalConnectionReports : options .ExperimentalConnectionReports ,
195
200
}
196
201
// Initially, we have a closed channel, reflecting the fact that we are not initially connected.
197
202
// Each time we connect we replace the channel (while holding the closeMutex) with a new one
@@ -252,6 +257,10 @@ type agent struct {
252
257
lifecycleStates []agentsdk.PostLifecycleRequest
253
258
lifecycleLastReportedIndex int // Keeps track of the last lifecycle state we successfully reported.
254
259
260
+ reportConnectionsUpdate chan struct {}
261
+ reportConnectionsMu sync.Mutex
262
+ reportConnections []* proto.ReportConnectionRequest
263
+
255
264
network * tailnet.Conn
256
265
statsReporter * statsReporter
257
266
logSender * agentsdk.LogSender
@@ -264,6 +273,7 @@ type agent struct {
264
273
lister agentcontainers.Lister
265
274
266
275
experimentalDevcontainersEnabled bool
276
+ experimentalConnectionReports bool
267
277
}
268
278
269
279
func (a * agent ) TailnetConn () * tailnet.Conn {
@@ -279,6 +289,24 @@ func (a *agent) init() {
279
289
UpdateEnv : a .updateCommandEnv ,
280
290
WorkingDirectory : func () string { return a .manifest .Load ().Directory },
281
291
BlockFileTransfer : a .blockFileTransfer ,
292
+ ReportConnection : func (id uuid.UUID , magicType agentssh.MagicSessionType , ip string ) func (code int , reason string ) {
293
+ var connectionType proto.Connection_Type
294
+ switch magicType {
295
+ case agentssh .MagicSessionTypeSSH :
296
+ connectionType = proto .Connection_SSH
297
+ case agentssh .MagicSessionTypeVSCode :
298
+ connectionType = proto .Connection_VSCODE
299
+ case agentssh .MagicSessionTypeJetBrains :
300
+ connectionType = proto .Connection_JETBRAINS
301
+ case agentssh .MagicSessionTypeUnknown :
302
+ connectionType = proto .Connection_TYPE_UNSPECIFIED
303
+ default :
304
+ a .logger .Error (a .hardCtx , "unhandled magic session type when reporting connection" , slog .F ("magic_type" , magicType ))
305
+ connectionType = proto .Connection_TYPE_UNSPECIFIED
306
+ }
307
+
308
+ return a .reportConnection (id , connectionType , ip )
309
+ },
282
310
})
283
311
if err != nil {
284
312
panic (err )
@@ -301,6 +329,9 @@ func (a *agent) init() {
301
329
a .reconnectingPTYServer = reconnectingpty .NewServer (
302
330
a .logger .Named ("reconnecting-pty" ),
303
331
a .sshServer ,
332
+ func (id uuid.UUID , ip string ) func (code int , reason string ) {
333
+ return a .reportConnection (id , proto .Connection_RECONNECTING_PTY , ip )
334
+ },
304
335
a .metrics .connectionsTotal , a .metrics .reconnectingPTYErrors ,
305
336
a .reconnectingPTYTimeout ,
306
337
func (s * reconnectingpty.Server ) {
@@ -713,6 +744,129 @@ func (a *agent) setLifecycle(state codersdk.WorkspaceAgentLifecycle) {
713
744
}
714
745
}
715
746
747
// reportConnectionsLoop reports connections to the agent API for auditing.
//
// It blocks until new reports are signaled via reportConnectionsUpdate, then
// drains the reportConnections buffer in FIFO order, sending each payload via
// aAPI.ReportConnection. On send failure the payload is left at the head of
// the buffer so it can be retried the next time this loop is (re)started.
// Returns ctx.Err() when the context is canceled.
func (a *agent) reportConnectionsLoop(ctx context.Context, aAPI proto.DRPCAgentClient24) error {
	for {
		// Wait for a signal that reports are queued (or exit on
		// cancellation). The channel has capacity 1, so one signal may
		// cover many queued reports; the inner loop drains them all.
		select {
		case <-a.reportConnectionsUpdate:
		case <-ctx.Done():
			return ctx.Err()
		}

		for {
			a.reportConnectionsMu.Lock()
			if len(a.reportConnections) == 0 {
				a.reportConnectionsMu.Unlock()
				break
			}
			payload := a.reportConnections[0]
			// Release lock while we send the payload, this is safe
			// since we only append to the slice.
			a.reportConnectionsMu.Unlock()

			logger := a.logger.With(slog.F("payload", payload))
			logger.Debug(ctx, "reporting connection")
			_, err := aAPI.ReportConnection(ctx, payload)
			if err != nil {
				// The payload stays queued; it will be retried when
				// this loop is restarted with a fresh client.
				return xerrors.Errorf("failed to report connection: %w", err)
			}

			logger.Debug(ctx, "successfully reported connection")

			// Remove the payload we sent.
			a.reportConnectionsMu.Lock()
			a.reportConnections[0] = nil // Release the pointer from the underlying array.
			a.reportConnections = a.reportConnections[1:]
			a.reportConnectionsMu.Unlock()
		}
	}
}
784
+
785
const (
	// reportConnectionBufferLimit limits the number of connection reports we
	// buffer to avoid growing the buffer indefinitely. This should not happen
	// unless the agent has lost connection to coderd for a long time or if
	// the agent is being spammed with connections.
	//
	// If we assume ~150 bytes per connection report, this would be around
	// 300KB of memory, which seems acceptable. We could reduce this if
	// necessary by not using the proto struct directly.
	reportConnectionBufferLimit = 2048
)
796
+
797
+ func (a * agent ) reportConnection (id uuid.UUID , connectionType proto.Connection_Type , ip string ) (disconnected func (code int , reason string )) {
798
+ // If the experiment hasn't been enabled, we don't report connections.
799
+ if ! a .experimentalConnectionReports {
800
+ return func (int , string ) {} // Noop.
801
+ }
802
+
803
+ // Remove the port from the IP because ports are not supported in coderd.
804
+ if host , _ , err := net .SplitHostPort (ip ); err != nil {
805
+ a .logger .Error (a .hardCtx , "split host and port for connection report failed" , slog .F ("ip" , ip ), slog .Error (err ))
806
+ } else {
807
+ // Best effort.
808
+ ip = host
809
+ }
810
+
811
+ a .reportConnectionsMu .Lock ()
812
+ defer a .reportConnectionsMu .Unlock ()
813
+
814
+ if len (a .reportConnections ) >= reportConnectionBufferLimit {
815
+ a .logger .Warn (a .hardCtx , "connection report buffer limit reached, dropping connect" ,
816
+ slog .F ("limit" , reportConnectionBufferLimit ),
817
+ slog .F ("connection_id" , id ),
818
+ slog .F ("connection_type" , connectionType ),
819
+ slog .F ("ip" , ip ),
820
+ )
821
+ } else {
822
+ a .reportConnections = append (a .reportConnections , & proto.ReportConnectionRequest {
823
+ Connection : & proto.Connection {
824
+ Id : id [:],
825
+ Action : proto .Connection_CONNECT ,
826
+ Type : connectionType ,
827
+ Timestamp : timestamppb .New (time .Now ()),
828
+ Ip : ip ,
829
+ StatusCode : 0 ,
830
+ Reason : nil ,
831
+ },
832
+ })
833
+ select {
834
+ case a .reportConnectionsUpdate <- struct {}{}:
835
+ default :
836
+ }
837
+ }
838
+
839
+ return func (code int , reason string ) {
840
+ a .reportConnectionsMu .Lock ()
841
+ defer a .reportConnectionsMu .Unlock ()
842
+ if len (a .reportConnections ) >= reportConnectionBufferLimit {
843
+ a .logger .Warn (a .hardCtx , "connection report buffer limit reached, dropping disconnect" ,
844
+ slog .F ("limit" , reportConnectionBufferLimit ),
845
+ slog .F ("connection_id" , id ),
846
+ slog .F ("connection_type" , connectionType ),
847
+ slog .F ("ip" , ip ),
848
+ )
849
+ return
850
+ }
851
+
852
+ a .reportConnections = append (a .reportConnections , & proto.ReportConnectionRequest {
853
+ Connection : & proto.Connection {
854
+ Id : id [:],
855
+ Action : proto .Connection_DISCONNECT ,
856
+ Type : connectionType ,
857
+ Timestamp : timestamppb .New (time .Now ()),
858
+ Ip : ip ,
859
+ StatusCode : int32 (code ), //nolint:gosec
860
+ Reason : & reason ,
861
+ },
862
+ })
863
+ select {
864
+ case a .reportConnectionsUpdate <- struct {}{}:
865
+ default :
866
+ }
867
+ }
868
+ }
869
+
716
870
// fetchServiceBannerLoop fetches the service banner on an interval. It will
717
871
// not be fetched immediately; the expectation is that it is primed elsewhere
718
872
// (and must be done before the session actually starts).
@@ -823,6 +977,10 @@ func (a *agent) run() (retErr error) {
823
977
return resourcesmonitor .Start (ctx )
824
978
})
825
979
980
+ // Connection reports are part of auditing, we should keep sending them via
981
+ // gracefulShutdownBehaviorRemain.
982
+ connMan .startAgentAPI ("report connections" , gracefulShutdownBehaviorRemain , a .reportConnectionsLoop )
983
+
826
984
// channels to sync goroutines below
827
985
// handle manifest
828
986
// |
0 commit comments