@@ -7,8 +7,10 @@ import (
77 "fmt"
88 "net/http"
99 "strings"
10+ "time"
1011
1112 "github.com/julienschmidt/httprouter"
13+ "github.com/openai/openai-go/v2/responses"
1214 "github.com/opendatahub-io/gen-ai/internal/constants"
1315 "github.com/opendatahub-io/gen-ai/internal/integrations"
1416 k8s "github.com/opendatahub-io/gen-ai/internal/integrations/kubernetes"
@@ -56,12 +58,13 @@ type StreamingEvent struct {
5658
5759// ResponseData represents the response structure for both streaming and non-streaming
5860type ResponseData struct {
59- ID string `json:"id"`
60- Model string `json:"model"`
61- Status string `json:"status"`
62- CreatedAt int64 `json:"created_at"`
63- Output []OutputItem `json:"output,omitempty"`
64- PreviousResponseID string `json:"previous_response_id,omitempty"` // Reference to previous response in conversation thread
61+ ID string `json:"id"`
62+ Model string `json:"model"`
63+ Status string `json:"status"`
64+ CreatedAt int64 `json:"created_at"`
65+ Output []OutputItem `json:"output,omitempty"`
66+ PreviousResponseID string `json:"previous_response_id,omitempty"` // Reference to previous response in conversation thread
67+ Metrics * ResponseMetrics `json:"metrics,omitempty"` // Response metrics (latency, usage)
6568}
6669
6770// OutputItem represents an output item with essential fields
@@ -97,6 +100,26 @@ type SearchResult struct {
97100 Filename string `json:"filename,omitempty"`
98101}
99102
// ResponseMetrics contains timing and usage metrics for the response.
// Attached to ResponseData for non-streaming responses and emitted as a
// trailing "response.metrics" SSE event for streaming responses.
type ResponseMetrics struct {
	LatencyMs          int64      `json:"latency_ms"`                       // Total response time in milliseconds
	TimeToFirstTokenMs *int64     `json:"time_to_first_token_ms,omitempty"` // TTFT for streaming (nil for non-streaming)
	Usage              *UsageData `json:"usage,omitempty"`                  // Token usage data (nil when unavailable)
}
109+
// UsageData contains token usage information from LlamaStack.
// Field names mirror the LlamaStack wire format (input_tokens etc.).
type UsageData struct {
	InputTokens  int `json:"input_tokens"`  // Tokens consumed by the prompt/input
	OutputTokens int `json:"output_tokens"` // Tokens produced in the completion
	TotalTokens  int `json:"total_tokens"`  // Input + output as reported upstream
}
116+
// MetricsEvent represents the response.metrics streaming event, sent as the
// final SSE data frame after the upstream stream completes.
type MetricsEvent struct {
	Type    string          `json:"type"`    // Always "response.metrics"
	Metrics ResponseMetrics `json:"metrics"` // Metrics data for the finished stream
}
122+
100123// MCPServer represents MCP server configuration for responses
101124type MCPServer struct {
102125 ServerLabel string `json:"server_label"` // Label identifier for the MCP server
@@ -164,6 +187,57 @@ func convertToResponseData(llamaResponse interface{}) ResponseData {
164187 return responseData
165188}
166189
190+ // extractUsage extracts usage data from a LlamaStack response
191+ func extractUsage (llamaResponse interface {}) * UsageData {
192+ // Use type assertion for efficiency (avoids marshal/unmarshal overhead)
193+ if resp , ok := llamaResponse .(* responses.Response ); ok {
194+ return & UsageData {
195+ InputTokens : int (resp .Usage .InputTokens ),
196+ OutputTokens : int (resp .Usage .OutputTokens ),
197+ TotalTokens : int (resp .Usage .TotalTokens ),
198+ }
199+ }
200+ return nil
201+ }
202+
203+ // extractUsageFromEvent extracts usage data from a streaming event (response.completed)
204+ func extractUsageFromEvent (event interface {}) * UsageData {
205+ // The response.completed event contains the full response with usage
206+ eventJSON , err := json .Marshal (event )
207+ if err != nil {
208+ return nil
209+ }
210+
211+ var raw struct {
212+ Response * struct {
213+ Usage * struct {
214+ InputTokens int `json:"input_tokens"`
215+ OutputTokens int `json:"output_tokens"`
216+ TotalTokens int `json:"total_tokens"`
217+ } `json:"usage"`
218+ } `json:"response"`
219+ }
220+
221+ if err := json .Unmarshal (eventJSON , & raw ); err != nil || raw .Response == nil || raw .Response .Usage == nil {
222+ return nil
223+ }
224+
225+ return & UsageData {
226+ InputTokens : raw .Response .Usage .InputTokens ,
227+ OutputTokens : raw .Response .Usage .OutputTokens ,
228+ TotalTokens : raw .Response .Usage .TotalTokens ,
229+ }
230+ }
231+
232+ // calculateTTFT calculates Time to First Token in milliseconds
233+ func calculateTTFT (startTime time.Time , firstTokenTime * time.Time ) * int64 {
234+ if firstTokenTime == nil {
235+ return nil
236+ }
237+ ttft := firstTokenTime .Sub (startTime ).Milliseconds ()
238+ return & ttft
239+ }
240+
167241// LlamaStackCreateResponseHandler handles POST /gen-ai/api/v1/responses
168242func (app * App ) LlamaStackCreateResponseHandler (w http.ResponseWriter , r * http.Request , _ httprouter.Params ) {
169243 ctx := r .Context ()
@@ -322,6 +396,11 @@ func (app *App) LlamaStackCreateResponseHandler(w http.ResponseWriter, r *http.R
322396
323397// handleStreamingResponse handles streaming response creation
324398func (app * App ) handleStreamingResponse (w http.ResponseWriter , r * http.Request , ctx context.Context , params llamastack.CreateResponseParams ) {
399+ // Track start time for latency and TTFT calculation
400+ startTime := time .Now ()
401+ var firstTokenTime * time.Time
402+ var usage * UsageData
403+
325404 // Check if ResponseWriter supports streaming - fail fast if not
326405 flusher , ok := w .(http.Flusher )
327406 if ! ok {
@@ -373,6 +452,17 @@ func (app *App) handleStreamingResponse(w http.ResponseWriter, r *http.Request,
373452 continue
374453 }
375454
455+ // Track TTFT on first text delta event
456+ if streamingEvent .Type == "response.output_text.delta" && firstTokenTime == nil {
457+ now := time .Now ()
458+ firstTokenTime = & now
459+ }
460+
461+ // Extract usage from completed event
462+ if streamingEvent .Type == "response.completed" {
463+ usage = extractUsageFromEvent (event )
464+ }
465+
376466 // Convert clean streaming event to JSON
377467 eventData , err := json .Marshal (streamingEvent )
378468 if err != nil {
@@ -406,16 +496,40 @@ func (app *App) handleStreamingResponse(w http.ResponseWriter, r *http.Request,
406496 errorJSON , _ := json .Marshal (errorData )
407497 fmt .Fprintf (w , "data: %s\n \n " , errorJSON )
408498 }
499+
500+ // Send metrics event after stream completes
501+ latencyMs := time .Since (startTime ).Milliseconds ()
502+ metricsEvent := MetricsEvent {
503+ Type : "response.metrics" ,
504+ Metrics : ResponseMetrics {
505+ LatencyMs : latencyMs ,
506+ TimeToFirstTokenMs : calculateTTFT (startTime , firstTokenTime ),
507+ Usage : usage ,
508+ },
509+ }
510+ eventData , err := json .Marshal (metricsEvent )
511+ if err != nil {
512+ app .logger .Error ("failed to marshal metrics event" , "error" , err )
513+ return
514+ }
515+ fmt .Fprintf (w , "data: %s\n \n " , eventData )
516+ flusher .Flush ()
409517}
410518
411519// handleNonStreamingResponse handles regular (non-streaming) response creation
412520func (app * App ) handleNonStreamingResponse (w http.ResponseWriter , r * http.Request , ctx context.Context , params llamastack.CreateResponseParams ) {
521+ // Track start time for latency calculation
522+ startTime := time .Now ()
523+
413524 llamaResponse , err := app .repositories .Responses .CreateResponse (ctx , params )
414525 if err != nil {
415526 app .handleLlamaStackClientError (w , r , err )
416527 return
417528 }
418529
530+ // Calculate latency
531+ latencyMs := time .Since (startTime ).Milliseconds ()
532+
419533 // Convert to clean response data
420534 responseData := convertToResponseData (llamaResponse )
421535
@@ -444,6 +558,12 @@ func (app *App) handleNonStreamingResponse(w http.ResponseWriter, r *http.Reques
444558 responseData .PreviousResponseID = params .PreviousResponseID
445559 }
446560
561+ // Add metrics to response
562+ responseData .Metrics = & ResponseMetrics {
563+ LatencyMs : latencyMs ,
564+ Usage : extractUsage (llamaResponse ),
565+ }
566+
447567 apiResponse := llamastack.APIResponse {
448568 Data : responseData ,
449569 }
0 commit comments