2
2
Inject ,
3
3
Injectable ,
4
4
Logger ,
5
+ OnModuleDestroy ,
5
6
OnModuleInit ,
6
7
Optional ,
7
8
} from '@nestjs/common' ;
@@ -59,9 +60,12 @@ import IEventsAndMetadatasStacker, {
59
60
import EventBatch from '../reliability/interface/event-batch' ;
60
61
import { EventStoreHealthIndicator } from '../health' ;
61
62
import MetadatasContextDatas from '../reliability/interface/metadatas-context-datas' ;
63
+ import Timeout = NodeJS . Timeout ;
62
64
63
65
@Injectable ( )
64
- export class EventStoreService implements OnModuleInit , IEventStoreService {
66
+ export class EventStoreService
67
+ implements OnModuleInit , OnModuleDestroy , IEventStoreService
68
+ {
65
69
private logger : Logger = new Logger ( this . constructor . name ) ;
66
70
private persistentSubscriptions : PersistentSubscription [ ] ;
67
71
@@ -70,6 +74,7 @@ export class EventStoreService implements OnModuleInit, IEventStoreService {
70
74
private isTryingToWriteEvents = false ;
71
75
private isTryingToWriteMetadatas = false ;
72
76
77
+ private connectionRetryFallback : Timeout ;
73
78
constructor (
74
79
@Inject ( EVENT_STORE_CONNECTOR )
75
80
private readonly eventStore : Client ,
@@ -85,6 +90,10 @@ export class EventStoreService implements OnModuleInit, IEventStoreService {
85
90
return await this . connect ( ) ;
86
91
}
87
92
93
+ public onModuleDestroy ( ) : void {
94
+ clearTimeout ( this . connectionRetryFallback ) ;
95
+ }
96
+
88
97
private async connect ( ) : Promise < void > {
89
98
try {
90
99
if ( this . subsystems . subscriptions )
@@ -93,7 +102,9 @@ export class EventStoreService implements OnModuleInit, IEventStoreService {
93
102
this . subsystems . subscriptions . persistent ,
94
103
) ;
95
104
if ( this . subsystems . projections )
96
- this . upsertProjections ( this . subsystems . projections ) . then ( ( ) => { } ) ;
105
+ await this . upsertProjections ( this . subsystems . projections ) . catch ( ( e ) =>
106
+ this . logger . error ( e ) ,
107
+ ) ;
97
108
98
109
this . isOnError = false ;
99
110
this . isTryingToConnect = false ;
@@ -116,7 +127,10 @@ export class EventStoreService implements OnModuleInit, IEventStoreService {
116
127
117
128
private async retryToConnect ( ) : Promise < void > {
118
129
this . logger . log ( `EventStore connection failed : trying to reconnect` ) ;
119
- setTimeout ( async ( ) => await this . connect ( ) , RECONNECTION_TRY_DELAY_IN_MS ) ;
130
+ this . connectionRetryFallback = setTimeout (
131
+ async ( ) => await this . connect ( ) ,
132
+ RECONNECTION_TRY_DELAY_IN_MS ,
133
+ ) ;
120
134
}
121
135
122
136
public async createProjection (
@@ -206,7 +220,6 @@ export class EventStoreService implements OnModuleInit, IEventStoreService {
206
220
} ,
207
221
) . catch ( async ( e ) => {
208
222
if ( EventStoreService . isNotAProjectionAlreadyExistsError ( e ) ) {
209
- this . logger . error ( e ) ;
210
223
throw Error ( e ) ;
211
224
}
212
225
await this . updateProjection ( projection , content ) ;
0 commit comments