@@ -9,6 +9,7 @@ import dayjs from "dayjs";
99import logger from "../shared/utils/logger" ;
1010import PartitionedFeedArticleFieldInsert from "./types/pending-feed-article-field-insert.types" ;
1111import { AsyncLocalStorage } from "node:async_hooks" ;
12+ import { PostgreSqlDriver } from "@mikro-orm/postgresql" ;
1213
1314interface AsyncStore {
1415 toInsert : PartitionedFeedArticleFieldInsert [ ] ;
@@ -21,7 +22,7 @@ export class PartitionedFeedArticleFieldStoreService {
2122 connection : Connection ;
2223 TABLE_NAME = "feed_article_field_partitioned" ;
2324
24- constructor ( private readonly orm : MikroORM ) {
25+ constructor ( private readonly orm : MikroORM < PostgreSqlDriver > ) {
2526 this . connection = this . orm . em . getConnection ( ) ;
2627 }
2728
@@ -127,25 +128,30 @@ export class PartitionedFeedArticleFieldStoreService {
127128 [ oneMonthAgo , feedId , ...ids ]
128129 ) ;
129130 } else {
130- const temporaryTableName = `current_article_ids_${ feedId } ` ;
131- const sql =
132- `CREATE TEMP TABLE ${ temporaryTableName } AS` +
133- ` SELECT * FROM (VALUES ${ ids . map ( ( ) => "(?)" ) . join ( ", " ) } ) AS t(id)` ;
134-
135- await this . connection . execute ( sql , ids ) ;
136-
137- const result = await this . connection . execute (
138- `SELECT field_hashed_value` +
139- ` FROM ${ this . TABLE_NAME } ` +
140- ` INNER JOIN ${ temporaryTableName } t ON (field_hashed_value = t.id)` +
141- ` WHERE ${
142- olderThanOneMonth ? `created_at <= ?` : `created_at > ?`
143- } AND feed_id = ? AND field_name = 'id'` +
144- `` ,
145- [ oneMonthAgo , feedId ]
146- ) ;
147-
148- await this . connection . execute ( `DROP TABLE ${ temporaryTableName } ` ) ;
131+ const result = await this . orm . em . transactional ( async ( em ) => {
132+ const temporaryTableName = `current_article_ids_${ feedId } ` ;
133+ const sql =
134+ `CREATE TEMP TABLE ${ temporaryTableName } AS` +
135+ ` SELECT * FROM (VALUES ${ ids . map ( ( ) => "(?)" ) . join ( ", " ) } ) AS t(id)` ;
136+
137+ await em . execute ( sql , ids ) ;
138+
139+ const result = await em . execute (
140+ `SELECT field_hashed_value` +
141+ ` FROM ${ this . TABLE_NAME } ` +
142+ ` INNER JOIN ${ temporaryTableName } t ON (field_hashed_value = t.id)` +
143+ ` WHERE ${
144+ olderThanOneMonth ? `created_at <= ?` : `created_at > ?`
145+ } AND feed_id = ? AND field_name = 'id'` +
146+ `` ,
147+ [ oneMonthAgo , feedId ]
148+ ) ;
149+
150+ await em . execute ( `DROP TABLE ${ temporaryTableName } ` ) ;
151+
152+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
153+ return result as any ;
154+ } ) ;
149155
150156 return result ;
151157 }
0 commit comments