1
1
import { Pool } from "../deps/postgres.ts" ;
2
2
import type { PoolClient , ClientOptions } from "../deps/postgres.ts" ;
3
- import { CompiledQuery } from "../deps/kysely.ts" ;
3
+ import { CompiledQuery , PostgresCursorConstructor } from "../deps/kysely.ts" ;
4
4
import type {
5
5
DatabaseConnection ,
6
6
QueryResult ,
7
7
Driver ,
8
8
TransactionSettings ,
9
9
} from "../deps/kysely.ts" ;
10
+ import { extendStackTrace } from "../deps/kysely.ts" ;
10
11
11
12
const PRIVATE_RELEASE_METHOD = Symbol ( ) ;
12
13
@@ -30,7 +31,7 @@ export class PostgresDriver implements Driver {
30
31
let connection = this . #connections. get ( client ) ;
31
32
32
33
if ( ! connection ) {
33
- connection = new PostgresConnection ( client ) ;
34
+ connection = new PostgresConnection ( client , { cursor : null } ) ;
34
35
this . #connections. set ( client , connection ) ;
35
36
36
37
// The driver must take care of calling `onCreateConnection` when a new
@@ -82,29 +83,51 @@ export class PostgresDriver implements Driver {
82
83
}
83
84
}
84
85
86
+ interface PostgresConnectionOptions {
87
+ cursor : PostgresCursorConstructor | null ;
88
+ }
89
+
85
90
class PostgresConnection implements DatabaseConnection {
86
91
#client: PoolClient ;
92
+ #options: PostgresConnectionOptions ;
87
93
88
- constructor ( client : PoolClient ) {
94
+ constructor ( client : PoolClient , options : PostgresConnectionOptions ) {
89
95
this . #client = client ;
96
+ this . #options = options ;
90
97
}
91
98
92
99
async executeQuery < O > ( compiledQuery : CompiledQuery ) : Promise < QueryResult < O > > {
93
- const result = await this . #client. queryObject < O > (
94
- compiledQuery . sql ,
95
- ...compiledQuery . parameters
96
- ) ;
100
+ try {
101
+ const result = await this . #client. queryObject < O > ( compiledQuery . sql , [
102
+ ...compiledQuery . parameters ,
103
+ ] ) ;
104
+
105
+ if ( result . command === "UPDATE" || result . command === "DELETE" ) {
106
+ return {
107
+ numUpdatedOrDeletedRows : BigInt ( result . rowCount ! ) ,
108
+ rows : result . rows ?? [ ] ,
109
+ } ;
110
+ }
97
111
98
- if ( result . command === "UPDATE" || result . command === "DELETE" ) {
99
112
return {
100
- numUpdatedOrDeletedRows : BigInt ( result . rowCount ! ) ,
101
113
rows : result . rows ?? [ ] ,
102
114
} ;
115
+ } catch ( err ) {
116
+ throw extendStackTrace ( err , new Error ( ) ) ;
117
+ }
118
+ }
119
+
120
+ async * streamQuery < O > (
121
+ _compiledQuery : CompiledQuery ,
122
+ _chunkSize : number
123
+ ) : AsyncIterableIterator < QueryResult < O > > {
124
+ if ( ! this . #options. cursor ) {
125
+ throw new Error (
126
+ "'cursor' is not present in your postgres dialect config. It's required to make streaming work in postgres."
127
+ ) ;
103
128
}
104
129
105
- return {
106
- rows : result . rows ?? [ ] ,
107
- } ;
130
+ // Deno postgress does not support streaming
108
131
}
109
132
110
133
[ PRIVATE_RELEASE_METHOD ] ( ) : void {
0 commit comments