22
33import com .igot .cb .cassandra .CassandraOperation ;
44import com .igot .cb .model .ApiResponse ;
5+ import com .igot .cb .util .CbExtServerProperties ;
56import com .igot .cb .util .Constants ;
67import lombok .extern .slf4j .Slf4j ;
78import org .apache .commons .collections4 .MapUtils ;
9+ import org .springframework .beans .factory .annotation .Autowired ;
810import org .springframework .stereotype .Service ;
911import org .springframework .util .CollectionUtils ;
12+ import org .springframework .util .StringUtils ;
1013
1114import java .time .Instant ;
1215import java .time .LocalDate ;
16+ import java .time .ZoneId ;
1317import java .util .*;
1418
1519@ Slf4j
@@ -18,10 +22,16 @@ public class ContentRetirementService {
1822
1923 private final CassandraOperation cassandraOperation ;
2024 private final ContentInfoServiceImpl contentService ;
25+ private final NotificationService notificationService ;
26+ private final OutboundRequestHandlerServiceImpl outboundRequestHandlerService ;
27+ private final CbExtServerProperties props ;
2128
22- public ContentRetirementService (CassandraOperation cassandraOperation , ContentInfoServiceImpl contentService ) {
29+ public ContentRetirementService (CassandraOperation cassandraOperation , ContentInfoServiceImpl contentService , NotificationService notificationService , OutboundRequestHandlerServiceImpl outboundRequestHandlerService , CbExtServerProperties props ) {
2330 this .cassandraOperation = cassandraOperation ;
2431 this .contentService = contentService ;
32+ this .notificationService = notificationService ;
33+ this .outboundRequestHandlerService = outboundRequestHandlerService ;
34+ this .props = props ;
2535 }
2636
2737 public ApiResponse processDueRetirements () {
@@ -119,5 +129,259 @@ private Map<String, Object> retireContent(Map<String, Object> retirementRecord)
119129
120130 return result ;
121131 }
132+
133+
134+ public void sendContentRetirementNotifications () {
135+
136+ LocalDate today = LocalDate .now ();
137+
138+ log .info ("Running content retirement notification job for {}" , today );
139+
140+ Map <String , Object > properties = Map .of (
141+ Constants .STATUS , Constants .APPROVED
142+ );
143+
144+ List <Map <String , Object >> retirementRequests =
145+ cassandraOperation .getRecordsByProperties (
146+ Constants .KEYSPACE_SUNBIRD_COURSE ,
147+ Constants .CONTENT_RETIREMENT_REQUEST_TABLE ,
148+ properties ,
149+ Arrays .asList (
150+ Constants .CONTENT_ID_KEY ,
151+ Constants .APPROVED_AT ,
152+ Constants .RETIREMENT_DATE_NOTIFICATION
153+ ),
154+ null
155+ );
156+
157+ if (CollectionUtils .isEmpty (retirementRequests )) {
158+ log .info ("No approved retirement requests found" );
159+ }
160+
161+ for (Map <String , Object > record : retirementRequests ) {
162+
163+ String contentId = (String ) record .get (Constants .CONTENT_ID );
164+ Instant approvedInstant =
165+ (Instant ) record .get (Constants .APPROVED_AT );
166+
167+ LocalDate approvedDate = approvedInstant != null
168+ ? approvedInstant .atZone (ZoneId .systemDefault ()).toLocalDate ()
169+ : null ;
170+ LocalDate retirementDate = (LocalDate ) record .get (Constants .RETIREMENT_DATE );
171+
172+ String notificationType = null ;
173+
174+ if (approvedDate != null && approvedDate .equals (today )) {
175+ notificationType = Constants .CONTENT_RETIREMENT_APPROVED_NOTIFICATION ;
176+ } else if (retirementDate != null && retirementDate .equals (today .plusDays (1 ))) {
177+ notificationType = Constants .REMINDER_NOTIFICATION_ONE_DAY ;
178+ } else if (retirementDate != null && retirementDate .equals (today .plusDays (7 ))) {
179+ notificationType = Constants .REMINDER_NOTIFICATION_SEVEN_DAY ;
180+ }
181+
182+ if (!StringUtils .hasText (notificationType )) {
183+ continue ;
184+ }
185+
186+ log .info ("Triggering {} notification for content {}" , notificationType , contentId );
187+
188+ Map <String , Object > content =
189+ contentService .readContent (contentId , Arrays .asList ("name" , "batches" ));
190+
191+ List <Map <String , Object >> batches =
192+ (List <Map <String , Object >>) content .get ("batches" );
193+
194+ if (CollectionUtils .isEmpty (batches )) {
195+ log .info ("No batches found for content {}" , contentId );
196+ continue ;
197+ }
198+
199+ for (Map <String , Object > batch : batches ) {
200+
201+ String batchId = (String ) batch .get (Constants .BATCH_ID );
202+
203+ List <Map <String , Object >> batchUsers =
204+ cassandraOperation .getRecordsByProperties (
205+ Constants .KEYSPACE_SUNBIRD_COURSE ,
206+ Constants .ENROLLMENT_BATCH_LOOKUP ,
207+ Map .of (Constants .BATCH_ID , batchId ),
208+ Arrays .asList (Constants .USER_ID ),
209+ null
210+ );
211+
212+ if (CollectionUtils .isEmpty (batchUsers )) {
213+ continue ;
214+ }
215+
216+ for (Map <String , Object > batchUser : batchUsers ) {
217+
218+ String userId = (String ) batchUser .get (Constants .USER_ID );
219+ Map <String , Object > enrolmentProperties = Map .of (
220+ Constants .USER_ID , userId ,
221+ Constants .COURSE_ID , contentId ,
222+ Constants .BATCH_ID , batchId
223+ );
224+
225+ List <Map <String , Object >> enrolment =
226+ cassandraOperation .getRecordsByProperties (
227+ Constants .KEYSPACE_SUNBIRD_COURSE ,
228+ Constants .USER_ENROLMENTS_V2_TABLE ,
229+ enrolmentProperties ,
230+ null ,
231+ null
232+ );
233+
234+ if (CollectionUtils .isEmpty (enrolment )) {
235+ continue ;
236+ }
237+
238+ List <Map <String , Object >> eligibleEnrolments =
239+ enrolment .stream ()
240+ .filter (Objects ::nonNull )
241+ .filter (e -> {
242+ Object statusObj = e .get (Constants .STATUS );
243+ Object activeObj = e .get (Constants .ACTIVE );
244+ Object certificates = e .get (Constants .ISSUED_CERTIFICATES );
245+
246+ return statusObj instanceof Integer
247+ && activeObj instanceof Boolean
248+ && !Objects .equals (statusObj , 2 )
249+ && (certificates == null || ((List <?>) certificates ).isEmpty ())
250+ && Boolean .TRUE .equals (activeObj );
251+ })
252+ .toList ();
253+
254+
255+ if (CollectionUtils .isEmpty (eligibleEnrolments )) {
256+ continue ;
257+ }
258+ String courseName = (String ) content .get (Constants .NAME );
259+ notificationService .sendNotificationForContentRetirement (
260+ contentId ,
261+ courseName ,
262+ retirementDate ,
263+ List .of (userId ),
264+ notificationType
265+ );
266+
267+ }
268+ }
269+ }
270+ }
271+
272+ public void sendContentRetirementNotificationsToSpv () {
273+ LocalDate today = LocalDate .now ();
274+ log .info ("Running content retirement notification job for spv admins {}" , today );
275+ List <Map <String , Object >> retirementRequests =
276+ cassandraOperation .getRecordsByProperties (
277+ Constants .KEYSPACE_SUNBIRD_COURSE ,
278+ Constants .CONTENT_RETIREMENT_REQUEST_TABLE ,
279+ null ,
280+ Arrays .asList (
281+ Constants .CONTENT_ID_KEY ,
282+ Constants .CREATED_AT_FIELD ,
283+ Constants .USER_ID_RAISED_FIELD ,
284+ Constants .RETIREMENT_DATE_KEY
285+ ),
286+ 1000
287+ );
288+ if (CollectionUtils .isEmpty (retirementRequests )) {
289+ log .info ("No approved retirement requests found" );
290+ return ;
291+ }
292+ List <String > spvPublishers = fetchSpvPublishers ();
293+ for (Map <String , Object > record : retirementRequests ) {
294+ String contentId = (String ) record .get (Constants .CONTENT_ID );
295+ Object createdObj = record .get (Constants .CREATED_AT_FIELD );
296+ LocalDate createdDate = null ;
297+ if (createdObj instanceof Instant instant ) {
298+ createdDate = instant .atZone (ZoneId .systemDefault ()).toLocalDate ();
299+ } else if (createdObj instanceof LocalDate localDate ) {
300+ createdDate = localDate ;
301+ }
302+ String requestedBy = (String ) record .get (Constants .USER_ID_RAISED_FIELD );
303+ if (createdDate == null || !createdDate .equals (today )) continue ;
304+ Set <String > finalRecipients = new HashSet <>(spvPublishers );
305+ if (StringUtils .hasText (requestedBy )) {
306+ finalRecipients .add (requestedBy );
307+ }
308+ if (finalRecipients .isEmpty ()) continue ;
309+ log .info ("Triggering retirement approved notification for content {}" , contentId );
310+ Map <String , Object > content =
311+ contentService .readContent (contentId , List .of ("name" ));
312+ String contentName =
313+ (String ) content .get ("name" );
314+ Object retirementDateObj = record .get (Constants .RETIREMENT_DATE );
315+ LocalDate retirementDate = null ;
316+ if (retirementDateObj instanceof Instant instant ) {
317+ retirementDate = instant .atZone (ZoneId .systemDefault ()).toLocalDate ();
318+ } else if (retirementDateObj instanceof LocalDate localDate ) {
319+ retirementDate = localDate ;
320+ }
321+ notificationService .sendNotificationForContentRetirementSpv (
322+ contentId , contentName ,
323+ new ArrayList <>(finalRecipients ),
324+ Constants .CONTENT_RETIREMENT_SCHEDULED_NOTIFICATION , retirementDate
325+ );
326+ }
327+ }
328+
329+ private List <String > fetchSpvPublishers () {
330+ List <String > userIds = new ArrayList <>();
331+ Map <String , Object > filters = Map .of (
332+ "organisations.roles" , List .of ("SPV_PUBLISHER" ),
333+ "status" , 1
334+ );
335+ List <String > userFields = List .of (Constants .USER_ID );
336+ Map <String , Object > requestObject = Map .of (
337+ Constants .REQUEST , Map .of (
338+ Constants .QUERY , "" ,
339+ Constants .FILTERS , filters ,
340+ Constants .FIELDS , userFields ,
341+ Constants .LIMIT , 1000
342+ )
343+ );
344+ Map <String , String > headers = Map .of (
345+ Constants .CONTENT_TYPE , Constants .APPLICATION_JSON
346+ );
347+ String url = props .getSbUrl () + props .getUserSearchEndPoint ();
348+ Map <String , Object > resp =
349+ outboundRequestHandlerService .fetchResultUsingPost (url , requestObject , headers );
350+
351+ if (MapUtils .isEmpty (resp ) ||
352+ !"OK" .equalsIgnoreCase (String .valueOf (resp .get (Constants .RESPONSE_CODE )))) {
353+ log .error ("[FETCH-SPV][FAILED] Invalid response {}" , resp );
354+ return userIds ;
355+ }
356+
357+ Object contentsObj = Optional .ofNullable (resp .get (Constants .RESULT ))
358+ .filter (Map .class ::isInstance )
359+ .map (Map .class ::cast )
360+ .map (result -> result .get (Constants .RESPONSE ))
361+ .filter (Map .class ::isInstance )
362+ .map (Map .class ::cast )
363+ .map (response -> response .get (Constants .CONTENT ))
364+ .orElse (null );
365+
366+ if (!(contentsObj instanceof List <?> contents )) {
367+ log .warn ("[FETCH-SPV][EMPTY] No content in response" );
368+ return userIds ;
369+ }
370+
371+ for (Object item : contents ) {
372+ if (!(item instanceof Map <?, ?> content )) continue ;
373+
374+ Object userIdObj = content .get (Constants .USER_ID );
375+ if (userIdObj instanceof String userId && StringUtils .hasText (userId )) {
376+ userIds .add (userId );
377+ }
378+ }
379+
380+ log .info ("[FETCH-SPV][SUCCESS] totalPublishers={}" , userIds .size ());
381+ return userIds .stream ().distinct ().toList ();
382+ }
383+
384+
385+
122386}
123387
0 commit comments