log/logstorage: asynchronous task for deleting arbitrary logs #4
When can I expect this to be merged?
@abhinavgarg89 It's basically done and just needs review cycles, so I expect it will take 2–3 weeks.
Thanks @func25
Pull Request Overview
This PR implements asynchronous task support for deleting arbitrary logs in VictoriaLogs. The changes introduce a comprehensive framework for background deletion operations that allows marking rows for deletion without blocking the caller. The implementation includes delete markers, RLE encoding for efficient storage, background task workers, and support for tracking task progress across the entire system.
Key changes:
- Background async task framework with persistence and reconciliation
- Delete marker system using RLE encoding for efficient row-level deletion
- Specialized search functionality to process only specific parts during deletion
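As a rough illustration of how run-length encoding can keep delete markers compact, here is a minimal sketch. The `run` type and the `encodeRLE` and `isDeleted` helpers are hypothetical names chosen for this example; they are not taken from the PR, whose on-disk marker format may differ.

```go
package main

import "fmt"

// run is a hypothetical RLE unit: Count consecutive rows that share one Deleted flag.
type run struct {
	Deleted bool
	Count   uint32
}

// encodeRLE compresses a per-row deletion bitmap into a list of runs.
func encodeRLE(deleted []bool) []run {
	var runs []run
	for _, d := range deleted {
		if n := len(runs); n > 0 && runs[n-1].Deleted == d {
			// Same flag as the previous row: extend the current run.
			runs[n-1].Count++
			continue
		}
		runs = append(runs, run{Deleted: d, Count: 1})
	}
	return runs
}

// isDeleted reports whether the row at idx is marked as deleted.
// Rows beyond the encoded range are treated as not deleted.
func isDeleted(runs []run, idx uint32) bool {
	for _, r := range runs {
		if idx < r.Count {
			return r.Deleted
		}
		idx -= r.Count
	}
	return false
}

func main() {
	rows := []bool{false, false, true, true, true, false}
	runs := encodeRLE(rows)
	fmt.Println(len(runs), isDeleted(runs, 3), isDeleted(runs, 5))
}
```

Long stretches of deleted or untouched rows collapse into a single run each, which is why RLE suits markers produced by filter-based deletion.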
Reviewed Changes
Copilot reviewed 25 out of 30 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| lib/logstorage/storage_search.go | Adds part-specific search capabilities and async task infrastructure |
| lib/logstorage/storage.go | Implements core async task management and delete row validation |
| lib/logstorage/partition.go | Adds partition-level async task persistence |
| lib/logstorage/part.go | Integrates delete markers into part structure |
| lib/logstorage/marker_*.go | Complete delete marker implementation with RLE encoding |
| lib/logstorage/datadb.go | Prevents merging parts with pending tasks |
| lib/logstorage/block_*.go | Integrates delete markers into block processing |
| lib/logstorage/async_task*.go | Background worker and task management system |
| app/vlstorage/main.go | HTTP endpoints for delete operations and task monitoring |
| app/vlselect/main.go | Select-side delete endpoint integration |
Comments suppressed due to low confidence (3)
lib/logstorage/storage_search.go:835
- Debug logging statements should be removed or changed to debug level before production. This logger.Infof call with DEBUG prefix indicates temporary debugging code.
}
lib/logstorage/storage_search.go:1120
- The comment refers to 'searchOnParts' but the function is named 'searchOnPartitions'. The comment should be updated to match the function name.
// searchOnParts is similar to storage.search but only processes the specified allowed parts.
lib/logstorage/marker_test.go:129
- The integration test for delete during merge is skipped. This is an important scenario that should have test coverage to ensure the delete markers work correctly during concurrent merge operations.
t.Skip("TODO: Implement integration test for delete during merge")
Hi, for example VictoriaMetrics is using /api/xxx/delete/xxx and don't what do you think?
@yuriydzobak Yes, makes sense to me. Though the reason I did this initially is because we already have Possibly,
Does this feature work per tenant or globally?
When can I expect this feature to be merged?
603b717 to bf64876
a7ab08e to c7c83ef
be2b7ab to 4c7abb9
	return
}

data, err := os.ReadFile(tasksPath)
Check failure
Code scanning / CodeQL: Uncontrolled data used in path expression (High), user-provided value
Copilot Autofix
General approach:
The code must strictly validate that user-supplied partition names are valid and safe before using them for file system operations. Typically, for partition names, the expected format is YYYYMMDD, as commented. Either:
a) Use a regular expression or string check to ensure the name is exactly that format, contains only digits, and is of the right length/structure, disallow all others, or
b) Ensure that the function getPartitionDayFromName is the sole place this parsing/validation occurs, and that it rejects any bad input.
Best implementation:
- In getPartitionDayFromName (in lib/logstorage/partition.go or lib/logstorage/storage.go, function not shown but invoked in PartitionAttach), add strict validation:
  - Check the input is exactly 8 characters long
  - All are digits
  - Optionally, a valid date (month: 1..12, day: valid for month)
  - Return an error for any mismatch
- Make sure all code paths go through this validation and do not use partition names unchecked when forming file system paths.
Lines/regions to change:
- The implementation of getPartitionDayFromName (which must be in one of the supplied Go files), or if absent, insert a safe helper function and ensure it is used.
- Optionally, add a helper isValidPartitionName function (with a clear regex or explicit check) and use that before creating/joining partition paths, defensively.
Needed:
- If using regex, import "regexp".
- Or just use string checks (length, unicode digit).
- Ensure error on any deviation.
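For the regex-based option, a minimal sketch of the optional isValidPartitionName helper might look like the following. The helper body and the `partitionNameRe` variable are assumptions made for illustration; only the helper's name and the YYYYMMDD format come from the suggestion above.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

// partitionNameRe matches exactly eight ASCII digits (YYYYMMDD).
// Hypothetical variable name, not taken from the PR.
var partitionNameRe = regexp.MustCompile(`^[0-9]{8}$`)

// isValidPartitionName reports whether name is eight digits forming a real calendar date.
// Anything else, including path separators or "..", is rejected.
func isValidPartitionName(name string) bool {
	if !partitionNameRe.MatchString(name) {
		return false
	}
	// time.Parse rejects impossible dates such as month 13 or Feb 29 in a non-leap year.
	_, err := time.Parse("20060102", name)
	return err == nil
}

func main() {
	for _, name := range []string{"20240229", "20230229", "../etc", "2024022"} {
		fmt.Printf("%q valid=%v\n", name, isValidPartitionName(name))
	}
}
```

Calling such a check before joining the name into a file system path keeps user-supplied input from escaping the partitions directory.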
@@ -279,6 +279,29 @@
 	return name
 }

+// getPartitionDayFromName parses and validates a partition name in YYYYMMDD format.
+// Returns (day int64, error) if valid; else error.
+func getPartitionDayFromName(name string) (int64, error) {
+	if len(name) != 8 {
+		return 0, fmt.Errorf("invalid partition name: wrong length")
+	}
+	for i := 0; i < 8; i++ {
+		if name[i] < '0' || name[i] > '9' {
+			return 0, fmt.Errorf("invalid partition name: must be digits")
+		}
+	}
+	year := name[0:4]
+	month := name[4:6]
+	day := name[6:8]
+	dateStr := fmt.Sprintf("%s-%s-%s", year, month, day)
+	t, err := time.Parse("2006-01-02", dateStr)
+	if err != nil {
+		return 0, fmt.Errorf("invalid partition name: not a valid date")
+	}
+	// Day expressed as days since Unix epoch (UTC).
+	return t.Unix() / 86400, nil
+}
+
 const partitionNameFormat = "20060102"

 // mustSaveDeleteTasks persists the current delete tasks to disk.
@@ -196,7 +196,7 @@
 func (s *Storage) PartitionAttach(name string) error {
 	day, err := getPartitionDayFromName(name)
 	if err != nil {
-		return err
+		return fmt.Errorf("invalid partition name %q: %w", name, err)
 	}

 	s.partitionsLock.Lock()
4c7abb9 to 93a9357
…ilter
Expose the following HTTP endpoints:
- /delete/run_task?filter=<filter> - starts deletion of the logs, which match the given LogsQL <filter>. This endpoint returns an ID of the started deletion task. This id is useful for tracking the task status or canceling it.
- /delete/stop_task?task_id=<id> - stops the deletion task with the given <id>. It doesn't restore already deleted logs by this task.
- /delete/active_tasks - returns a list of active deletion tasks.
The logs are guaranteed to be deleted when the corresponding deletion task is finished.
See docs on how to use the delete API at https://docs.victoriametrics.com/victorialogs/#how-to-delete-logs
The logs' deletion is implemented in the following way:
1. Scan every per-day partition for logs matching the given filter.
2. Merge parts per every per-day partition, which have the matching logs. The merge process skips logs matching the filter from the delete task.
A deletion task may take significant amounts of time (days) when it is executed on large amounts of logs. Deletion tasks are executed sequentially in order to minimize additional load on VictoriaLogs. The deletion task needs up to a full CPU core plus disk read and write bandwidth.
Deletion tasks survive VictoriaLogs restarts - if VictoriaLogs is restarted in the middle of the deletion task, it is continued after the restart. All the pending deletion tasks are also persisted between VictoriaLogs restarts in the `<-storageDataPath>/delete_tasks.json` file.
Updates #43
The pull request is based on the original idea and the initial implementation proposed by @func25 at #4
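To make the endpoint shapes concrete, here is a minimal sketch that only builds the request URLs for starting and stopping a deletion task. The base address `http://localhost:9428`, the assumption that the paths are served directly under it, and the helper names `runTaskURL` and `stopTaskURL` are all illustrative; check the linked docs for the actual paths and parameters.

```go
package main

import (
	"fmt"
	"net/url"
)

// runTaskURL builds a /delete/run_task URL for the given LogsQL filter.
// The filter is query-escaped so that characters such as ':' and '"' survive transport.
func runTaskURL(base, filter string) string {
	q := url.Values{}
	q.Set("filter", filter)
	return base + "/delete/run_task?" + q.Encode()
}

// stopTaskURL builds a /delete/stop_task URL for the given task ID.
func stopTaskURL(base, taskID string) string {
	q := url.Values{}
	q.Set("task_id", taskID)
	return base + "/delete/stop_task?" + q.Encode()
}

func main() {
	base := "http://localhost:9428" // hypothetical address of a VictoriaLogs instance
	fmt.Println(runTaskURL(base, "error"))
	fmt.Println(stopTaskURL(base, "42"))
	fmt.Println(base + "/delete/active_tasks")
}
```

The ID returned by the first request is the value that would be passed as `task_id` to the second one.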
Closing this pull request because it has been overridden by the commit ac99e78. This commit is based on this pull request.
Related: #43
This PR introduces:
- select/delete with a query parameter to delete matching log entries.
- /select/async_tasks to monitor the progress of delete operations.
Technically, this implementation introduces an async task system. All async tasks are persisted to disk as JSON. Each task receives a unique sequence number for tracking and idempotency.