1
1
package aibridged
2
2
3
3
import (
4
- "context"
5
- "fmt"
6
- "io"
7
4
"net/http"
8
5
"time"
9
6
10
7
"golang.org/x/xerrors"
11
8
12
9
"cdr.dev/slog"
13
10
14
- "github.com/google/uuid"
15
-
16
11
"github.com/coder/coder/v2/aibridged/proto"
17
12
"github.com/coder/coder/v2/codersdk"
18
13
)
@@ -39,20 +34,18 @@ type Bridge struct {
39
34
tools map [string ]* MCPTool
40
35
}
41
36
42
- func NewBridge (cfg codersdk.AIBridgeConfig , logger slog.Logger , clientFn func () (proto.DRPCAIBridgeDaemonClient , error ), tools map [string ][]* MCPTool ) (* Bridge , error ) {
43
- var bridge Bridge
44
-
45
- openAIChatProvider := NewOpenAIChatProvider (cfg .OpenAI .BaseURL .String (), cfg .OpenAI .Key .String ())
46
- anthropicMessagesProvider := NewAnthropicMessagesProvider (cfg .Anthropic .BaseURL .String (), cfg .Anthropic .Key .String ())
47
-
37
+ func NewBridge (cfg codersdk.AIBridgeConfig , logger slog.Logger , clientFn func () (proto.DRPCAIBridgeDaemonClient , error ), tools ToolRegistry ) (* Bridge , error ) {
48
38
drpcClient , err := clientFn ()
49
39
if err != nil {
50
40
return nil , xerrors .Errorf ("could not acquire coderd client for tracking: %w" , err )
51
41
}
52
42
43
+ openAIProvider := NewOpenAIProvider (cfg .OpenAI .BaseURL .String (), cfg .OpenAI .Key .String ())
44
+ anthropicMessagesProvider := NewAnthropicMessagesProvider (cfg .Anthropic .BaseURL .String (), cfg .Anthropic .Key .String ())
45
+
53
46
mux := & http.ServeMux {}
54
- mux .HandleFunc ("/v1/chat/completions" , handleOpenAIChat ( openAIChatProvider , drpcClient , tools , logger . Named ( "openai" ) ))
55
- mux .HandleFunc ("/v1/messages" , handleAnthropicMessages (anthropicMessagesProvider , drpcClient , tools , logger . Named ( "anthropic" ) ))
47
+ mux .HandleFunc ("/v1/chat/completions" , NewSessionProcessor ( openAIProvider , logger , drpcClient , tools ))
48
+ mux .HandleFunc ("/v1/messages" , NewSessionProcessor (anthropicMessagesProvider , logger , drpcClient , tools ))
56
49
57
50
srv := & http.Server {
58
51
Handler : mux ,
@@ -64,6 +57,7 @@ func NewBridge(cfg codersdk.AIBridgeConfig, logger slog.Logger, clientFn func()
64
57
ReadHeaderTimeout : 10 * time .Second ,
65
58
}
66
59
60
+ var bridge Bridge
67
61
bridge .cfg = cfg
68
62
bridge .httpSrv = srv
69
63
bridge .clientFn = clientFn
@@ -82,131 +76,3 @@ func NewBridge(cfg codersdk.AIBridgeConfig, logger slog.Logger, clientFn func()
82
76
func (b * Bridge ) Handler () http.Handler {
83
77
return b .httpSrv .Handler
84
78
}
85
-
86
- func handleOpenAIChat (provider * OpenAIChatProvider , drpcClient proto.DRPCAIBridgeDaemonClient , tools map [string ][]* MCPTool , logger slog.Logger ) func (http.ResponseWriter , * http.Request ) {
87
- return func (w http.ResponseWriter , r * http.Request ) {
88
- // Read and parse request.
89
- body , err := io .ReadAll (r .Body )
90
- if err != nil {
91
- if isConnectionError (err ) {
92
- logger .Debug (r .Context (), "client disconnected during request body read" , slog .Error (err ))
93
- return // Don't send error response if client already disconnected
94
- }
95
- logger .Error (r .Context (), "failed to read body" , slog .Error (err ))
96
- http .Error (w , "failed to read body" , http .StatusInternalServerError )
97
- return
98
- }
99
- req , err := provider .ParseRequest (body )
100
- if err != nil {
101
- logger .Error (r .Context (), "failed to parse request" , slog .Error (err ))
102
- http .Error (w , "failed to parse request" , http .StatusBadRequest )
103
- return
104
- }
105
-
106
- // Create a new session.
107
- var sess Session
108
- if req .Stream {
109
- sess = provider .NewStreamingSession (req )
110
- } else {
111
- sess = provider .NewBlockingSession (req )
112
- }
113
-
114
- userID , ok := r .Context ().Value (ContextKeyBridgeUserID {}).(uuid.UUID )
115
- if ! ok {
116
- logger .Error (r .Context (), "missing initiator ID in context" )
117
- http .Error (w , "unable to retrieve initiator" , http .StatusInternalServerError )
118
- return
119
- }
120
-
121
- resp , err := drpcClient .StartSession (r .Context (), & proto.StartSessionRequest {
122
- InitiatorId : userID .String (),
123
- Provider : "openai" ,
124
- Model : req .Model ,
125
- })
126
- if err != nil {
127
- logger .Error (r .Context (), "failed to start session" , slog .Error (err ))
128
- http .Error (w , "failed to start session" , http .StatusInternalServerError )
129
- return
130
- }
131
-
132
- sessID := resp .GetSessionId ()
133
-
134
- sess .Init (sessID , logger , provider .baseURL , provider .key , NewDRPCTracker (drpcClient ), NewInjectedToolManager (tools ))
135
- logger .Debug (context .Background (), "starting openai session" , slog .F ("session_id" , sessID ))
136
-
137
- defer func () {
138
- if err := sess .Close (); err != nil {
139
- logger .Warn (context .Background (), "failed to close session" , slog .Error (err ), slog .F ("session_id" , sessID ), slog .F ("kind" , fmt .Sprintf ("%T" , sess )))
140
- }
141
- }()
142
-
143
- // Process the request.
144
- if err := sess .ProcessRequest (w , r ); err != nil {
145
- logger .Error (r .Context (), "session execution failed" , slog .Error (err ))
146
- }
147
- }
148
- }
149
-
150
- func handleAnthropicMessages (provider * AnthropicMessagesProvider , drpcClient proto.DRPCAIBridgeDaemonClient , tools map [string ][]* MCPTool , logger slog.Logger ) func (http.ResponseWriter , * http.Request ) {
151
- return func (w http.ResponseWriter , r * http.Request ) {
152
- // Read and parse request.
153
- body , err := io .ReadAll (r .Body )
154
- if err != nil {
155
- if isConnectionError (err ) {
156
- logger .Debug (r .Context (), "client disconnected during request body read" , slog .Error (err ))
157
- return // Don't send error response if client already disconnected
158
- }
159
- logger .Error (r .Context (), "failed to read body" , slog .Error (err ))
160
- http .Error (w , "failed to read body" , http .StatusInternalServerError )
161
- return
162
- }
163
- req , err := provider .ParseRequest (body )
164
- if err != nil {
165
- logger .Error (r .Context (), "failed to parse request" , slog .Error (err ))
166
- http .Error (w , "failed to parse request" , http .StatusBadRequest )
167
- return
168
- }
169
-
170
- // Create a new session.
171
- var sess Session
172
- if req .UseStreaming () {
173
- sess = provider .NewStreamingSession (req )
174
- } else {
175
- sess = provider .NewBlockingSession (req )
176
- }
177
-
178
- userID , ok := r .Context ().Value (ContextKeyBridgeUserID {}).(uuid.UUID )
179
- if ! ok {
180
- logger .Error (r .Context (), "missing initiator ID in context" )
181
- http .Error (w , "unable to retrieve initiator" , http .StatusInternalServerError )
182
- return
183
- }
184
-
185
- resp , err := drpcClient .StartSession (r .Context (), & proto.StartSessionRequest {
186
- InitiatorId : userID .String (),
187
- Provider : "anthropic" ,
188
- Model : string (req .Model ),
189
- })
190
- if err != nil {
191
- logger .Error (r .Context (), "failed to start session" , slog .Error (err ))
192
- http .Error (w , "failed to start session" , http .StatusInternalServerError )
193
- return
194
- }
195
-
196
- sessID := resp .GetSessionId ()
197
-
198
- sess .Init (sessID , logger , provider .baseURL , provider .key , NewDRPCTracker (drpcClient ), NewInjectedToolManager (tools ))
199
- logger .Debug (context .Background (), "starting anthropic messages session" , slog .F ("session_id" , sessID ))
200
-
201
- defer func () {
202
- if err := sess .Close (); err != nil {
203
- logger .Warn (context .Background (), "failed to close session" , slog .Error (err ), slog .F ("session_id" , sessID ), slog .F ("kind" , fmt .Sprintf ("%T" , sess )))
204
- }
205
- }()
206
-
207
- // Process the request.
208
- if err := sess .ProcessRequest (w , r ); err != nil {
209
- logger .Error (r .Context (), "session execution failed" , slog .Error (err ))
210
- }
211
- }
212
- }
0 commit comments