@@ -5,14 +5,13 @@ mod error;
5
5
use crate :: {
6
6
ast:: { Query , Value } ,
7
7
connector:: { bind:: Bind , metrics, queryable:: * , timeout:: timeout, ResultSet , Transaction } ,
8
- error:: Error ,
9
8
visitor:: { self , Visitor } ,
10
9
} ;
11
10
use async_trait:: async_trait;
12
11
pub use config:: * ;
13
12
use either:: Either ;
14
- use futures:: { lock:: Mutex , TryStreamExt } ;
15
- use sqlx:: { Column as _, Connect , Executor , PgConnection , Row as _ } ;
13
+ use futures:: lock:: Mutex ;
14
+ use sqlx:: { Column as _, Connect , Executor , PgConnection } ;
16
15
use std:: time:: Duration ;
17
16
18
17
/// A connector interface for the PostgreSQL database.
@@ -71,6 +70,7 @@ impl Queryable for PostgreSql {
71
70
metrics:: query_new ( "postgres.query_raw" , sql, params, |params| async move {
72
71
let mut conn = self . connection . lock ( ) . await ;
73
72
let describe = timeout ( self . socket_timeout , conn. describe ( sql) ) . await ?;
73
+ let columns = describe. columns ( ) . into_iter ( ) . map ( |c| c. name ( ) . to_string ( ) ) . collect ( ) ;
74
74
75
75
let mut query = sqlx:: query ( sql) ;
76
76
@@ -90,22 +90,10 @@ impl Queryable for PostgreSql {
90
90
}
91
91
} ;
92
92
93
- let mut columns = Vec :: new ( ) ;
94
- let mut rows = Vec :: new ( ) ;
95
-
96
- timeout ( self . socket_timeout , async {
97
- let mut stream = query. fetch ( & mut * conn) ;
98
-
99
- while let Some ( row) = stream. try_next ( ) . await ? {
100
- if columns. is_empty ( ) {
101
- columns = row. columns ( ) . iter ( ) . map ( |c| c. name ( ) . to_string ( ) ) . collect ( ) ;
102
- }
103
-
104
- rows. push ( conversion:: map_row ( row) ?) ;
105
- }
106
-
107
- Ok :: < ( ) , Error > ( ( ) )
108
- } )
93
+ let rows = timeout (
94
+ self . socket_timeout ,
95
+ query. try_map ( conversion:: map_row) . fetch_all ( & mut * conn) ,
96
+ )
109
97
. await ?;
110
98
111
99
Ok ( ResultSet :: new ( columns, rows) )
0 commit comments