11package cj .logs ;
22
33import cj .OS ;
4+ import cj .TimeUtils ;
45import cj .aws .AWSTask ;
56import cj .fs .TaskFiles ;
6- import cj .hello .HelloConfiguration ;
7- import software .amazon .awssdk .services .cloudwatch .CloudWatchClient ;
87import software .amazon .awssdk .services .cloudwatchlogs .CloudWatchLogsClient ;
98import software .amazon .awssdk .services .cloudwatchlogs .model .*;
109
1110import javax .enterprise .context .Dependent ;
1211import javax .inject .Inject ;
1312import javax .inject .Named ;
1413import java .nio .file .Path ;
14+ import java .time .LocalDate ;
15+ import java .time .LocalDateTime ;
16+ import java .util .Comparator ;
17+ import java .util .Deque ;
18+ import java .util .LinkedList ;
19+ import java .util .List ;
1520
1621@ Dependent
1722@ Named ("logs-put" )
@@ -25,10 +30,11 @@ public void apply() {
2530 info ("Putting logs" );
2631 var username = OS .username ();
2732 var dataDir = files .dataDir ();
28- debug ("Collecting logs from [{}]" , dataDir );
2933 var dataDirName = dataDir .getFileName ().toString ();
3034 var logGroup = "/%s/%s" .formatted (username , dataDirName );
3135 var logs = files .findLogFiles ();
36+ debug ("Collected [{}] log files from [{}]" , logs .size (), dataDirName );
37+
3238 try (var cw = aws ().cloudwatchlogs ()) {
3339 checkLogGroup (cw , logGroup );
3440 logs .forEach (log -> putLog (cw , logGroup , dataDir , log ));
@@ -62,10 +68,13 @@ private boolean logGroupExists(CloudWatchLogsClient cw, String logGroup) {
6268
6369 private void putLog (CloudWatchLogsClient cw , String logGroup , Path dataDir , Path logFile ) {
6470 try {
71+ var baseDay = TimeUtils .parseLocalDateTime (logFile .toAbsolutePath ().toString ())
72+ .orElse (LocalDateTime .now ())
73+ .toLocalDate ();
6574 var logStream = dataDir .relativize (logFile ).toString ();
6675 checkLogStream (cw , logGroup , logStream );
67- debug ("Putting log [{}] to group [{}]" , logStream , logGroup );
68- cloudwatchPutLog (cw , logGroup , logStream , logFile );
76+ debug ("Putting log [{}] to group [{}] with baseDay [{}] " , logStream , logGroup , baseDay );
77+ cloudwatchPutLog (cw , logGroup , logStream , logFile , baseDay );
6978 }catch (Exception e ){
7079 error ("Failed to put log [{}] to group [{}]" , logFile , logGroup );
7180 error (e .getMessage ());
@@ -102,8 +111,9 @@ private boolean logStreamExists(CloudWatchLogsClient cw, String logGroup, String
102111 return result ;
103112 }
104113
105- private void cloudwatchPutLog (CloudWatchLogsClient cw , String logGroup , String logStream , Path logFile ) {
106- var events = eventsFromFile (logFile );
114+ private void cloudwatchPutLog (CloudWatchLogsClient cw , String logGroup , String logStream , Path logFile , LocalDate baseDay ) {
115+ var lines = linesOf (logFile );
116+ var events = eventsFromFile (logFile , baseDay );
107117 if (events .length > 0 ){
108118 var request = PutLogEventsRequest .builder ()
109119 .logGroupName (logGroup )
@@ -118,35 +128,69 @@ private void cloudwatchPutLog(CloudWatchLogsClient cw, String logGroup, String l
118128
119129 }
120130
121- //TODO: set correct timestamp for untimed log messages instead of filtering out
122- private InputLogEvent [] eventsFromFile (Path logFile ) {
131+ private List <String > linesOf (Path logFile ) {
123132 var lines = files .readLines (logFile );
124- var events = lines .stream ()
125- .map (line -> eventOf (line , logFile ))
126- .filter (event -> event != null )
127- .toArray (InputLogEvent []::new );
128- return events ;
133+ debug ("Read [{}] lines from [{}]" , lines .size (), logFile );
134+ return lines ;
129135 }
130136
131- private InputLogEvent eventOf (String line , Path logFile ) {
132- var time = timestampOf (line , logFile .getFileName ());
133- if (line .length () > CWLOGS_MAX_LINE_LENGTH ){
134- warn ("Log line truncated: [{}]" , line );
135- line = line .substring (0 , CWLOGS_MAX_LINE_LENGTH );
137+ //TODO: set correct timestamp for untimed log messages instead of filtering out
138+ private InputLogEvent [] eventsFromFile (Path logFile , LocalDate baseDay ) {
139+ var lines = files .readLines (logFile );
140+ var events = new LinkedList <InputLogEvent >();
141+ var time = TimeUtils .toTimestamp (TimeUtils .atStartOfDay (baseDay ));
142+ for (String line : lines ) {
143+ if (line == null || line .isBlank ()) continue ;
144+ if (line .length () >= CWLOGS_MAX_LINE_LENGTH )
145+ line = line .substring (0 , CWLOGS_MAX_LINE_LENGTH - 1 );
146+ var timestamp = timestampOf (line , logFile , baseDay );
147+ if (timestamp != null ){
148+ time = timestamp ;
149+ } else {
150+ timestamp = time ;
151+ }
152+ var event = createEvent (timestamp , line );
153+ events .add (event );
136154 }
137- if (time != null ) {
138- return InputLogEvent .builder ()
139- //TODO: Set correct timestamp
140- .timestamp (time )
141- .message (line )
142- .build ();
155+ var ascendingOrder = checkAscendingTimeOrder (events );
156+ if (! ascendingOrder ){
157+ warn ("Events are not in ascending order. Resorting..." );
158+ warn ("File: [{}]" , logFile );
159+ warn ("Events: [{}]" , events .size ());
160+ events .sort (Comparator .comparing (InputLogEvent ::timestamp ));
161+ ascendingOrder = checkAscendingTimeOrder (events );
162+ if (! ascendingOrder ){
163+ error ("Events are still not in ascending order. Skipping..." );
164+ return new InputLogEvent [0 ];
165+ }
143166 }
144- return null ;
167+ var result = events .toArray (InputLogEvent []::new );
168+ return result ;
169+ }
170+
171+ private boolean checkAscendingTimeOrder (LinkedList <InputLogEvent > events ) {
172+ var last = Long .MIN_VALUE ;
173+ for (InputLogEvent event : events ) {
174+ if (event .timestamp () < last ){
175+ return false ;
176+ }
177+ last = event .timestamp ();
178+ }
179+ return true ;
180+ }
181+
182+
183+ private InputLogEvent createEvent (Long time , String line ) {
184+ return InputLogEvent .builder ()
185+ .timestamp (time )
186+ .message (line )
187+ .build ();
145188 }
146189
147- private Long timestampOf (String line , Path fileName ) {
148- //TODO: Map file names to timestamps
149- return System .currentTimeMillis ();
190+ private Long timestampOf (String line , Path fileName , LocalDate baseDay ) {
191+ var ldt = TimeUtils .parseLocalDateTime (line , baseDay );
192+ var timestamp = ldt .map (TimeUtils ::toTimestamp ).orElse (null );
193+ return timestamp ;
150194 }
151195
152196}
0 commit comments