@@ -167,6 +167,8 @@ type clientConn struct {
167167 txStatus byte // current transaction status ('I', 'T', or 'E')
168168 ignoreTillSync bool // discard extended-query messages until Sync after an error; see runExtendedQueryMessage
169169 errorResponsesSent uint64 // ErrorResponses sent via sendError; observed by runExtendedQueryMessage
170+ lastErrorCode string // most recent SQLSTATE sent via sendError; observed by query metrics
171+ activeQueryMetrics * queryMetricsScope // active query attempt metrics scope for non-ErrorResponse failures
170172 passthrough bool // true for passthrough users (skip transpiler + pg_catalog)
171173 cursors map [string ]* cursorState // server-side cursor emulation
172174 catalogUseRewrite bool // true when bare `USE ducklake`/`USE iceberg` should expand to the reliable two-part target
@@ -873,10 +875,10 @@ func (c *clientConn) serve() error {
873875 c .sendInitialParams ()
874876
875877 // Send ready for query
876- if err := wire . WriteReadyForQuery ( c . writer , c .txStatus ); err != nil {
878+ if err := c . writeReadyForQuery ( c .txStatus ); err != nil {
877879 return err
878880 }
879- if err := c .writer . Flush (); err != nil {
881+ if err := c .flushWriter (); err != nil {
880882 return fmt .Errorf ("failed to flush writer: %w" , err )
881883 }
882884
@@ -978,7 +980,7 @@ func (c *clientConn) handleStartup() error {
978980 if err := wire .WriteAuthCleartextPassword (c .writer ); err != nil {
979981 return err
980982 }
981- if err := c .writer . Flush (); err != nil {
983+ if err := c .flushWriter (); err != nil {
982984 return fmt .Errorf ("failed to flush writer: %w" , err )
983985 }
984986
@@ -1096,10 +1098,10 @@ func (c *clientConn) messageLoop() error {
10961098 // Extended query protocol - Sync: ends any skip-until-Sync error
10971099 // recovery, then reports readiness.
10981100 c .ignoreTillSync = false
1099- if err := wire . WriteReadyForQuery ( c . writer , c .txStatus ); err != nil {
1101+ if err := c . writeReadyForQuery ( c .txStatus ); err != nil {
11001102 return err
11011103 }
1102- _ = c .writer . Flush ()
1104+ _ = c .flushWriter ()
11031105
11041106 case wire .MsgClose :
11051107 // Extended query protocol - Close
@@ -1109,7 +1111,7 @@ func (c *clientConn) messageLoop() error {
11091111 // Discarded during skip-until-Sync error recovery, like real
11101112 // PostgreSQL (the ErrorResponse was already flushed by sendError).
11111113 if ! c .ignoreTillSync {
1112- _ = c .writer . Flush ()
1114+ _ = c .flushWriter ()
11131115 }
11141116
11151117 case wire .MsgTerminate :
@@ -1121,16 +1123,16 @@ func (c *clientConn) messageLoop() error {
11211123 }
11221124}
11231125
1124- func (c * clientConn ) handleQuery (body []byte ) error {
1126+ func (c * clientConn ) handleQuery (body []byte ) ( retErr error ) {
11251127 query := string (bytes .TrimRight (body , "\x00 " ))
11261128 query = strings .TrimSpace (query )
11271129
11281130 // Treat empty queries or queries with just semicolons as empty
11291131 // PostgreSQL returns EmptyQueryResponse for queries like "" or ";" or ";;;"
11301132 if query == "" || isEmptyQuery (query ) {
11311133 _ = wire .WriteEmptyQueryResponse (c .writer )
1132- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1133- _ = c .writer . Flush ()
1134+ _ = c . writeReadyForQuery ( c .txStatus )
1135+ _ = c .flushWriter ()
11341136 return nil
11351137 }
11361138
@@ -1146,7 +1148,13 @@ func (c *clientConn) handleQuery(body []byte) error {
11461148 }()
11471149
11481150 start := time .Now ()
1149- defer func () { queryDurationHistogram .WithLabelValues (c .orgID ).Observe (time .Since (start ).Seconds ()) }()
1151+ queryMetrics := c .beginQueryMetrics (start )
1152+ defer func () {
1153+ if retErr != nil {
1154+ queryMetrics .markError (retErr )
1155+ }
1156+ c .finishQueryMetrics (queryMetrics )
1157+ }()
11501158
11511159 ctx , span := observe .Tracer ().Start (c .ctx , "duckgres.query" ,
11521160 trace .WithAttributes (
@@ -1244,8 +1252,8 @@ func (c *clientConn) handleQuery(body []byte) error {
12441252 if err != nil {
12451253 // Transform error - send error to client
12461254 c .sendError ("ERROR" , "42601" , fmt .Sprintf ("syntax error: %v" , err ))
1247- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1248- _ = c .writer . Flush ()
1255+ _ = c . writeReadyForQuery ( c .txStatus )
1256+ _ = c .flushWriter ()
12491257 return nil
12501258 }
12511259
@@ -1254,8 +1262,8 @@ func (c *clientConn) handleQuery(body []byte) error {
12541262 if err := c .validateWithDuckDB (query ); err != nil {
12551263 // Neither PostgreSQL nor DuckDB can parse this query
12561264 c .sendError ("ERROR" , "42601" , fmt .Sprintf ("syntax error: %v" , err ))
1257- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1258- _ = c .writer . Flush ()
1265+ _ = c . writeReadyForQuery ( c .txStatus )
1266+ _ = c .flushWriter ()
12591267 return nil
12601268 }
12611269 c .logger ().Debug ("Fallback to native DuckDB: query not valid PostgreSQL but valid DuckDB." , "query" , loggableQuery )
@@ -1264,8 +1272,8 @@ func (c *clientConn) handleQuery(body []byte) error {
12641272 // Handle transform-detected errors (e.g., unrecognized config parameter)
12651273 if result .Error != nil {
12661274 c .sendError ("ERROR" , transformErrorSQLState (result .Error ), result .Error .Error ())
1267- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1268- _ = c .writer . Flush ()
1275+ _ = c . writeReadyForQuery ( c .txStatus )
1276+ _ = c .flushWriter ()
12691277 return nil
12701278 }
12711279
@@ -1278,18 +1286,18 @@ func (c *clientConn) handleQuery(body []byte) error {
12781286 // Handle ignored SET parameters
12791287 if result .IsIgnoredSet {
12801288 c .logger ().Debug ("Ignoring PostgreSQL-specific SET." , "query" , query )
1281- _ = wire . WriteCommandComplete ( c . writer , "SET" )
1282- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1283- _ = c .writer . Flush ()
1289+ _ = c . writeCommandComplete ( "SET" )
1290+ _ = c . writeReadyForQuery ( c .txStatus )
1291+ _ = c .flushWriter ()
12841292 return nil
12851293 }
12861294
12871295 // Handle no-op commands (CREATE INDEX, VACUUM, etc.)
12881296 if result .IsNoOp {
12891297 c .logger ().Debug ("No-op command (DuckLake limitation)." , "query" , query )
1290- _ = wire . WriteCommandComplete ( c . writer , result .NoOpTag )
1291- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1292- _ = c .writer . Flush ()
1298+ _ = c . writeCommandComplete ( result .NoOpTag )
1299+ _ = c . writeReadyForQuery ( c .txStatus )
1300+ _ = c .flushWriter ()
12931301 return nil
12941302 }
12951303
@@ -1323,9 +1331,9 @@ func (c *clientConn) handleQuery(body []byte) error {
13231331 // while DuckDB throws an error. Match PostgreSQL behavior.
13241332 if cmdType == "BEGIN" && c .txStatus == txStatusTransaction {
13251333 c .sendNotice ("WARNING" , "25001" , "there is already a transaction in progress" )
1326- _ = wire . WriteCommandComplete ( c . writer , "BEGIN" )
1327- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1328- _ = c .writer . Flush ()
1334+ _ = c . writeCommandComplete ( "BEGIN" )
1335+ _ = c . writeReadyForQuery ( c .txStatus )
1336+ _ = c .flushWriter ()
13291337 return nil
13301338 }
13311339
@@ -1377,8 +1385,8 @@ func (c *clientConn) handleQuery(body []byte) error {
13771385 c .sendError ("ERROR" , errCode , errMsg )
13781386 c .setTxError ()
13791387 c .logQuery (start , originalQuery , query , cmdType , 0 , 0 , errCode , errMsg , "simple" )
1380- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1381- _ = c .writer . Flush ()
1388+ _ = c . writeReadyForQuery ( c .txStatus )
1389+ _ = c .flushWriter ()
13821390 return nil
13831391 }
13841392 }
@@ -1389,10 +1397,10 @@ func (c *clientConn) handleQuery(body []byte) error {
13891397 }
13901398 c .updateTxStatus (cmdType )
13911399 tag := c .buildCommandTag (cmdType , execResult )
1392- _ = wire . WriteCommandComplete ( c . writer , tag )
1400+ _ = c . writeCommandComplete ( tag )
13931401 c .logQuery (start , originalQuery , query , cmdType , 0 , writtenRows , "" , "" , "simple" )
1394- _ = wire . WriteReadyForQuery ( c . writer , c .txStatus )
1395- _ = c .writer . Flush ()
1402+ _ = c . writeReadyForQuery ( c .txStatus )
1403+ _ = c .flushWriter ()
13961404 return nil
13971405 }
13981406
0 commit comments