@@ -88,6 +88,10 @@ func (s *AnthropicMessagesStreamingSession) ProcessRequest(w http.ResponseWriter
88
88
messages := s .req .BetaMessageNewParams
89
89
logger := s .logger .With (slog .F ("model" , s .req .Model ))
90
90
91
+ // Accumulate usage across the entire streaming interaction (including tool reinvocations).
92
+ var cumulativeInputTokens int64
93
+ var cumulativeOutputTokens int64
94
+
91
95
isFirst := true
92
96
for {
93
97
newStream:
@@ -134,19 +138,9 @@ func (s *AnthropicMessagesStreamingSession) ProcessRequest(w http.ResponseWriter
134
138
continue
135
139
}
136
140
case string (ant_constant .ValueOf [ant_constant.MessageStart ]()):
137
- // Track token usage
138
141
start := event .AsMessageStart ()
139
- metadata := Metadata {
140
- "web_search_requests" : start .Message .Usage .ServerToolUse .WebSearchRequests ,
141
- "cache_creation_input" : start .Message .Usage .CacheCreationInputTokens ,
142
- "cache_read_input" : start .Message .Usage .CacheReadInputTokens ,
143
- "cache_ephemeral_1h_input" : start .Message .Usage .CacheCreation .Ephemeral1hInputTokens ,
144
- "cache_ephemeral_5m_input" : start .Message .Usage .CacheCreation .Ephemeral5mInputTokens ,
145
- }
146
- if err := s .tracker .TrackTokensUsage (streamCtx , s .id , message .ID , start .Message .Usage .InputTokens , start .Message .Usage .OutputTokens , metadata ); err != nil {
147
- logger .Warn (ctx , "failed to track token usage" , slog .Error (err ))
148
- }
149
-
142
+ cumulativeInputTokens += start .Message .Usage .InputTokens
143
+ cumulativeOutputTokens += start .Message .Usage .OutputTokens
150
144
if ! isFirst {
151
145
// Don't send message_start unless first message!
152
146
// We're sending multiple messages back and forth with the API, but from the client's perspective
@@ -155,19 +149,8 @@ func (s *AnthropicMessagesStreamingSession) ProcessRequest(w http.ResponseWriter
155
149
}
156
150
case string (ant_constant .ValueOf [ant_constant.MessageDelta ]()):
157
151
delta := event .AsMessageDelta ()
158
- // Track token usage
159
- metadata := Metadata {
160
- "web_search_requests" : delta .Usage .ServerToolUse .WebSearchRequests ,
161
- "cache_creation_input" : delta .Usage .CacheCreationInputTokens ,
162
- "cache_read_input" : delta .Usage .CacheReadInputTokens ,
163
- // Note: CacheCreation fields are not available in MessageDeltaUsage
164
- "cache_ephemeral_1h_input" : 0 ,
165
- "cache_ephemeral_5m_input" : 0 ,
166
- }
167
- if err := s .tracker .TrackTokensUsage (streamCtx , s .id , message .ID , delta .Usage .InputTokens , delta .Usage .OutputTokens , metadata ); err != nil {
168
- logger .Warn (ctx , "failed to track token usage" , slog .Error (err ))
169
- }
170
-
152
+ cumulativeInputTokens += delta .Usage .InputTokens
153
+ cumulativeOutputTokens += delta .Usage .OutputTokens
171
154
// Don't relay message_delta events which indicate injected tool use.
172
155
if len (pendingToolCalls ) > 0 && s .toolMgr .GetTool (lastToolName ) != nil {
173
156
continue
@@ -183,7 +166,6 @@ func (s *AnthropicMessagesStreamingSession) ProcessRequest(w http.ResponseWriter
183
166
184
167
// Don't send message_stop until all tools have been called.
185
168
case string (ant_constant .ValueOf [ant_constant.MessageStop ]()):
186
-
187
169
if len (pendingToolCalls ) > 0 {
188
170
// Append the whole message from this stream as context since we'll be sending a new request with the tool results.
189
171
messages .Messages = append (messages .Messages , message .ToParam ())
@@ -347,6 +329,18 @@ func (s *AnthropicMessagesStreamingSession) ProcessRequest(w http.ResponseWriter
347
329
}
348
330
}
349
331
332
+ // Emit a single, final token usage total for this stream.
333
+ metadata := Metadata {
334
+ "web_search_requests" : message .Usage .ServerToolUse .WebSearchRequests ,
335
+ "cache_creation_input" : message .Usage .CacheCreationInputTokens ,
336
+ "cache_read_input" : message .Usage .CacheReadInputTokens ,
337
+ "cache_ephemeral_1h_input" : message .Usage .CacheCreation .Ephemeral1hInputTokens ,
338
+ "cache_ephemeral_5m_input" : message .Usage .CacheCreation .Ephemeral5mInputTokens ,
339
+ }
340
+ if err := s .tracker .TrackTokensUsage (streamCtx , s .id , message .ID , cumulativeInputTokens , cumulativeOutputTokens , metadata ); err != nil {
341
+ logger .Warn (ctx , "failed to track token usage" , slog .Error (err ))
342
+ }
343
+
350
344
var streamErr error
351
345
if streamErr = stream .Err (); streamErr != nil {
352
346
if isConnectionError (streamErr ) {
0 commit comments