refactor: add postgres tailnet coordinator #8044
Conversation
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
enterprise/tailnet/pgcoord.go
Outdated
func (q *querier) newConn(c *connIO) {
	q.mu.Lock()
	defer q.mu.Unlock()
	mk := mKey{
		agent: c.agent,
		// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
		clientsOfAgent: c.client == uuid.Nil,
	}
	cm, ok := q.mappers[mk]
	if !ok {
		ctx, cancel := context.WithCancel(q.ctx)
		mpr := newMapper(ctx, q.logger, mk, q.heartbeats)
		cm = &countedMapper{
			mapper: mpr,
			count:  0,
			cancel: cancel,
		}
		q.mappers[mk] = cm
		// we don't have any mapping state for this key yet
		q.workQ.enqueue(mk)
	}
	if err := sendCtx(cm.ctx, cm.add, c); err != nil {
		return
	}
	cm.count++
	go q.waitForConn(c)
}

func (q *querier) waitForConn(c *connIO) {
	<-c.ctx.Done()
	q.mu.Lock()
	defer q.mu.Unlock()
	mk := mKey{
		agent: c.agent,
		// if client is Nil, this is an agent connection, and it wants the mappings for all the clients of itself
		clientsOfAgent: c.client == uuid.Nil,
	}
	cm := q.mappers[mk]
	if err := sendCtx(cm.ctx, cm.del, c); err != nil {
		return
	}
	cm.count--
	if cm.count == 0 {
		cm.cancel()
		delete(q.mappers, mk)
	}
}
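For context, sendCtx is not shown in this hunk; presumably it is a small helper that sends a value on a channel unless the supplied context is canceled first, roughly along these lines (a sketch, not necessarily the PR's exact code):

// sendCtx sends v on ch unless ctx is done first, so a stalled receiver
// can never block the caller indefinitely.
func sendCtx[A any](ctx context.Context, ch chan<- A, v A) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case ch <- v:
		return nil
	}
}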
The locking logic here between newConn and waitForConn is tricky to follow. Is there a particular reason we lock, spawn a new goroutine, and unlock, only to immediately lock and unlock again?
Maybe a rename would make this clearer: waitForConn doesn't do anything immediately; it waits for the connIO's context to expire, and then cleans it up. It's on a separate goroutine so that we don't block newConn from returning.
How do you feel about cleanupConn()?
Ah, the key here is <-c.ctx.Done() -- that's easy to skip over. cleanupConn does make more sense in this context.
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
I haven't had an opportunity to look through pgcoord thoroughly, but this looks good from a brief glance. My only comment is related to duplicated heartbeats!
CREATE TABLE tailnet_coordinators (
    id uuid NOT NULL PRIMARY KEY,
    heartbeat_at timestamp with time zone NOT NULL
);
Could this be integrated into the replicas table somehow? Feels unnecessary to have both essentially doing the same thing (heartbeating).
Maybe! I'll take a deeper look at the replicas stuff and see.
One positive to keeping this out of the replicas table: in Postgres, each update copies the entire row, so the smaller the row, the less garbage there is to clean up.
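To make that concrete, here is a rough sketch of the kind of heartbeat loop that would write to this two-column table; the interval, upsert signature, and logging are assumptions for illustration, not the PR's actual code:

// sendBeats periodically refreshes this coordinator's heartbeat_at row.
// Sketch only: the interval and upsert signature are assumed.
func sendBeats(ctx context.Context, id uuid.UUID, upsert func(context.Context, uuid.UUID) error) {
	tkr := time.NewTicker(3 * time.Second)
	defer tkr.Stop()
	for {
		// each UPDATE rewrites the whole row under Postgres MVCC, so keeping
		// the row to just (id, heartbeat_at) keeps dead-tuple garbage small
		if err := upsert(ctx, id); err != nil {
			log.Printf("failed to update heartbeat: %v", err)
		}
		select {
		case <-ctx.Done():
			return
		case <-tkr.C:
		}
	}
}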
Actually, it is a new feature/implementation of the coordinator rather than refactoring. Keep in mind that it is hard to give sensible feedback due to the size and complexity of the coordinator, but I did my best.
I went through the RFC first to understand implemented ideas like withdrawals or mappings. I have a few high-level questions:
- Which database issues will be harmful to the PG coordinator? Are there any it won't be able to recover from without a restart?
- Considering the number of bugs you're fixing, I have a feeling that we should open-source it instead of putting it in enterprise. I assume that it is better than the in-memory implementation, right?
- Did you think about a load testing plan to make sure that it works as intended and can handle the expected traffic?
- Security related: is it possible to sabotage the coordinator by sending fake/invalid queries? I know that this implementation is not purely distributed, so it can't be susceptible to typical Sybil attacks, but could it be possible to disconnect/isolate an agent or force it to connect to a node managed/taken over by an attacker?
}

func (q *querier) listenClient(_ context.Context, msg []byte, err error) {
	if xerrors.Is(err, pubsub.ErrDroppedMessages) {
If listenClient expects only pubsub.ErrDroppedMessages, shouldn't it be extracted to another wrapping function?
I'm afraid I don't understand what you're suggesting. Can you elaborate?
Something like this to ensure type-safety on the method declaration level:
func (q *querier) listenClientRaw(_ context.Context, msg []byte, err error) {
	if err != nil && !xerrors.Is(err, pubsub.ErrDroppedMessages) {
		return
	}
	droppedMessagesErr := err.(pubsub.ErrDroppedMessages)
	q.listenClient(msg, droppedMessagesErr)
}

func (q *querier) listenClient(msg []byte, err pubsub.ErrDroppedMessages) {
	...
}
listenClient() is a pubsub listener with error. The pubsub either sends a message or an error. If err is non-nil, then msg is zero length and not an actual pubsub message.
Right now, the only error that the pubsub can send, and the only one this code handles, is ErrDroppedMessages.
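In other words, the listener has two disjoint paths: handle the dropped-messages error by resyncing, or handle a real message. A rough sketch with hypothetical helper names (not the PR's exact code):

func (q *querier) listenClient(_ context.Context, msg []byte, err error) {
	if xerrors.Is(err, pubsub.ErrDroppedMessages) {
		// the pubsub dropped messages; msg is empty, so rather than parsing
		// anything we schedule a full resync of the client mappings
		q.resyncClientMappings() // hypothetical helper
		return
	}
	if err != nil {
		// no other errors are produced by the pubsub today
		return
	}
	// normal path: msg carries a client update to parse and enqueue
	q.handleClientUpdate(msg) // hypothetical helper
}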
The usual suspects: data corruption, incompatible schema changes, etc. In particular, it is designed to recover gracefully from temporary loss of connectivity to the database. When the pubsub reconnects, it triggers a resync of mappings. Binding errors are retried with a backoff.
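A sketch of that retry-with-backoff shape; the write callback and timing parameters here are assumptions for illustration, not the PR's actual values:

// writeWithRetry retries a failed binding write with exponential backoff
// until it succeeds or ctx is canceled.
func writeWithRetry(ctx context.Context, write func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	for {
		if err := write(ctx); err == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
		}
		if backoff < 5*time.Second {
			backoff *= 2
		}
	}
}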
I think our current position is that features supporting multiple Coderd replicas are Enterprise licensed. If you have only one Coderd, then the in-memory coordinator is totally sufficient, and will in fact have less latency to set up a new connection.
After merging, I plan to enable it behind an experiment flag and then run some load tests comparing it to the old coordinator.
The coordinator facilitates connectivity between clients and agents. The Coderd API authenticates and authorizes connections into the coordinator, so by design we trust the information being provided by clients and agents. It's of course true that an employee or contractor could attempt to act maliciously, as an insider attack. What could such a malicious actor do?
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
I see. It would be awesome to "port" any extra fixes you identified in the existing coordinator to the in-memory one (if you fixed any).
Hopefully not decreased :)
Do you think that we should ensure any integrity by signing exchanged data? I agree that this is a topic for an open discussion, but it can be asked during any security review on the customer side.
I was also thinking about a different vector. Assuming that the attacker has access to the database, are they able to reconfigure agents or coordinators by spoofing mapping entries directly in the database? I know that the attacker can do other bad things, but I was curious whether it is possible to perform a truly stealthy attack against the customer infra. I apologize if this is a silly question, but it is better to be safe than sorry.
These data are exchanged over a websocket, which, for any serious deployment, will be TLS-protected and therefore integrity-protected.
It's not a silly question, but the answer is that we consider database access to be a complete compromise of the system. It would be possible to stealthily attack the system in numerous ways that I won't get into here.
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
Signed-off-by: Spike Curtis <spike@coder.com>
cf. #7295 and the RFC
This PR adds a postgres-backed, enterprise, distributed tailnet coordinator.
Apologies for the large-ish PR, but this felt like the smallest amount of code that could be coherently understood and reviewed.
This PR adds the new coordinator, but leaves the old, pubsub-only coordinator, and does not wire up the new coordinator to anything. Once this is merged, in a follow-on PR, I plan to introduce the postgres coordinator behind an experiment flag. That will allow me to test the performance impact of the new coordinator in a scale test. Then, I suggest we enable the experiment on our dogfood instance and test for a while.
Other items missing from this PR that will be added later: