Skip to content

Commit 3af1af1

Browse files
committedSep 15, 2023
fixup! add subscriber subsystem
1 parent 751b22f commit 3af1af1

File tree

3 files changed

+17
-23
lines changed

3 files changed

+17
-23
lines changed
 

‎enterprise/tailnet/connio.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func newConnIO(pCtx context.Context,
3434
id uuid.UUID,
3535
name string,
3636
kind agpl.QueueKind,
37-
) (*connIO, error) {
37+
) *connIO {
3838
ctx, cancel := context.WithCancel(pCtx)
3939
c := &connIO{
4040
pCtx: pCtx,
@@ -48,7 +48,7 @@ func newConnIO(pCtx context.Context,
4848
go c.recvLoop()
4949
go c.updates.SendUpdates()
5050
logger.Info(ctx, "serving connection")
51-
return c, nil
51+
return c
5252
}
5353

5454
func (c *connIO) recvLoop() {

‎enterprise/tailnet/pgcoord.go

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,16 @@ import (
2626
)
2727

2828
const (
29-
EventHeartbeats = "tailnet_coordinator_heartbeat"
30-
eventClientUpdate = "tailnet_client_update"
31-
eventClientSubscription = "tailnet_client_subscription_update"
32-
eventAgentUpdate = "tailnet_agent_update"
33-
HeartbeatPeriod = time.Second * 2
34-
MissedHeartbeats = 3
35-
numQuerierWorkers = 10
36-
numBinderWorkers = 10
37-
dbMaxBackoff = 10 * time.Second
38-
cleanupPeriod = time.Hour
29+
EventHeartbeats = "tailnet_coordinator_heartbeat"
30+
eventClientUpdate = "tailnet_client_update"
31+
eventAgentUpdate = "tailnet_agent_update"
32+
HeartbeatPeriod = time.Second * 2
33+
MissedHeartbeats = 3
34+
numQuerierWorkers = 10
35+
numBinderWorkers = 10
36+
numSubscriberWorkers = 10
37+
dbMaxBackoff = 10 * time.Second
38+
cleanupPeriod = time.Hour
3939
)
4040

4141
// TODO: add subscriber to this graphic
@@ -240,10 +240,8 @@ func (c *pgCoord) ServeClient(conn net.Conn, id uuid.UUID, agent uuid.UUID) erro
240240
slog.Error(err))
241241
}
242242
}()
243-
cIO, err := newConnIO(c.ctx, c.logger, c.bindings, conn, id, id.String(), agpl.QueueKindClient)
244-
if err != nil {
245-
return err
246-
}
243+
244+
cIO := newConnIO(c.ctx, c.logger, c.bindings, conn, id, id.String(), agpl.QueueKindClient)
247245
if err := sendCtx(c.ctx, c.newConnections, agpl.Queue(cIO)); err != nil {
248246
// can only be a context error, no need to log here.
249247
return err
@@ -277,10 +275,7 @@ func (c *pgCoord) ServeAgent(conn net.Conn, id uuid.UUID, name string) error {
277275
}
278276
}()
279277
logger := c.logger.With(slog.F("name", name))
280-
cIO, err := newConnIO(c.ctx, logger, c.bindings, conn, id, name, agpl.QueueKindAgent)
281-
if err != nil {
282-
return err
283-
}
278+
cIO := newConnIO(c.ctx, logger, c.bindings, conn, id, name, agpl.QueueKindAgent)
284279
if err := sendCtx(c.ctx, c.newConnections, agpl.Queue(cIO)); err != nil {
285280
// can only be a context error, no need to log here.
286281
return err
@@ -349,7 +344,7 @@ func newSubscriber(ctx context.Context,
349344
go s.handleSubscriptions()
350345
go func() {
351346
<-startWorkers
352-
for i := 0; i < numBinderWorkers; i++ {
347+
for i := 0; i < numSubscriberWorkers; i++ {
353348
go s.worker()
354349
}
355350
}()

‎tailnet/coordinator.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,7 @@ type core struct {
194194
type QueueKind int
195195

196196
const (
197-
_ QueueKind = iota
198-
QueueKindClient
197+
QueueKindClient QueueKind = 1 + iota
199198
QueueKindAgent
200199
)
201200

0 commit comments

Comments
 (0)