Skip to content

Commit b30ab38

Browse files
committed
edge case
1 parent bf27a06 commit b30ab38

File tree

3 files changed

+45
-7
lines changed

3 files changed

+45
-7
lines changed

app/vlstorage/netselect/async_tasks.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (s *Storage) ListAsyncTasks(ctx context.Context) ([]logstorage.AsyncTaskInf
4242
for _, ts := range results {
4343
all = append(all, ts...)
4444
}
45+
4546
return all, nil
4647
}
4748

lib/logstorage/async_task_info.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,35 @@ func (s *Storage) ListAsyncTasks() []AsyncTaskInfo {
6262
}()
6363

6464
var out []AsyncTaskInfo
65+
seqToIdx := make(map[uint64]int)
66+
// helper to merge two tasks with the same Seq
67+
merge := func(dst *AsyncTaskInfo, src *AsyncTaskInfo) {
68+
// status: error > pending > success
69+
prioritize := func(s asyncTaskStatus) int {
70+
switch s {
71+
case taskError:
72+
return 3
73+
case taskPending:
74+
return 2
75+
case taskSuccess:
76+
return 1
77+
}
78+
return 0
79+
}
80+
if prioritize(src.Status) > prioritize(dst.Status) {
81+
dst.Status = src.Status
82+
dst.Error = src.Error
83+
}
84+
// earliest created time
85+
if dst.CreatedTime == 0 || (src.CreatedTime != 0 && src.CreatedTime < dst.CreatedTime) {
86+
dst.CreatedTime = src.CreatedTime
87+
}
88+
// latest done time
89+
if src.DoneTime > dst.DoneTime {
90+
dst.DoneTime = src.DoneTime
91+
}
92+
}
93+
6594
for _, p := range pws {
6695
a := p.pt.ats
6796

@@ -70,7 +99,13 @@ func (s *Storage) ListAsyncTasks() []AsyncTaskInfo {
7099
a.mu.Unlock()
71100

72101
for _, t := range tasks {
73-
out = append(out, asyncTaskToInfo(t))
102+
info := asyncTaskToInfo(t)
103+
if idx, ok := seqToIdx[info.Seq]; ok {
104+
merge(&out[idx], &info)
105+
continue
106+
}
107+
seqToIdx[info.Seq] = len(out)
108+
out = append(out, info)
74109
}
75110
}
76111

lib/logstorage/storage.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -824,12 +824,14 @@ func ValidateDeleteQuery(q *Query) error {
824824
}
825825

826826
minTS, maxTS := q.GetFilterTimeRange()
827-
now := int64(fasttime.UnixTimestamp())
828-
829-
// vlselect already adds a timestamp, but with floor(now).
830-
// To avoid adding _time twice, use seconds to compare with now().
831-
if maxTS/1e9 > now {
832-
q.AddTimeFilter(minTS, now*1e9)
827+
now := int64(fasttime.UnixTimestamp() * 1e9)
828+
829+
// vlselect already adds a timestamp floor(now).
830+
// vlstorage parses the time by adding +0.999, so we need to use now+1 to avoid
831+
// duplicating the addition of _time filters.
832+
if maxTS > now+int64(time.Second) {
833+
logger.Infof("DEBUG: maxTS=%d > now=%d, adjusting maxTS to now", maxTS, now)
834+
q.AddTimeFilter(minTS, now)
833835
}
834836

835837
return nil

0 commit comments

Comments
 (0)