|
| 1 | +## Remote Write: Restart from checkpoint |
| 2 | + |
| 3 | +* **Owners:** |
| 4 | + * [@kgeckhart](https://github.com/kgeckhart) |
| 5 | + |
| 6 | +* **Implementation Status:** `Not implemented` |
| 7 | + |
| 8 | +* **Related Issues and PRs:** |
| 9 | + |
| 10 | +First issue on the matter https://github.com/prometheus/prometheus/issues/8809 spawned from https://github.com/prometheus/prometheus/pull/7710. |
| 11 | + |
| 12 | +Since then there have been a lot of discussion / attempts but nothing has been merged. See |
| 13 | +* https://github.com/prometheus/prometheus/pull/8918 |
| 14 | +* https://github.com/prometheus/prometheus/pull/9862 |
| 15 | +* https://github.com/ptodev/prometheus/pull/1 |
| 16 | + |
| 17 | +I believe the issue for [tsdb/agent: Prevent unread segments from being truncated](https://github.com/prometheus/prometheus/issues/17616) would need to be completed but can stand on its own as it's purely for notifying when segments have been read. |
| 18 | + |
| 19 | +* **Other docs or links:** |
| 20 | + |
| 21 | +> This effort aims to have an agreed upon design with requirements for completing the work to allow remote write to restart data delivery from a checkpoint and not from `time.Now()` |
| 22 | +
|
| 23 | +## Why |
| 24 | + |
| 25 | +Remote write is backed by a write-ahead-log (WAL) where all data is persisted before it is sent. |
| 26 | +If a config is reloaded or prometheus/agent is restarted before flushing pending samples we will skip those samples. |
| 27 | +Given we have a persistent WAL this behavior is unexpected by users and can cause a lot of confusion. |
| 28 | + |
| 29 | +### Pitfalls of the current solution |
| 30 | + |
| 31 | +As mentioned in the why, this behavior is often confusing to users who know a WAL is in use but still finds they have missing data on restart. |
| 32 | + |
| 33 | +## Goals |
| 34 | + |
| 35 | +1. Support resuming from a checkpoint for each configured `remote_write` destination. |
| 36 | +2. Taking a checkpoint for a remote_write destination should not incur significant overhead. |
| 37 | +3. Changing the `queue_configuration` for a `remote_write` destination should not result in a new checkpoint entry. |
| 38 | + * The `queue_configuration` includes fields like min/max shards and other performance tuning parameter.s |
| 39 | + * These can be expected to change under normal circumstances and should not trigger a data loss scenario. |
| 40 | +4. Guards need to be in place to protect against infinite WAL growth. |
| 41 | +5. Stretch: Remote write supports at-least-once delivery of samples in the WAL. |
| 42 | + * Note: This has appeared to be the largest challenge with any existing implementation as it can cause significant overhead. |
| 43 | + |
| 44 | +### Audience |
| 45 | + |
| 46 | +`remote_write` users. |
| 47 | + |
| 48 | +## Non-Goals |
| 49 | + |
| 50 | +* Creating a watcher that is capable of tracking offsets. |
| 51 | +* Remote write supports exactly-once delivery |
| 52 | + |
| 53 | +## How |
| 54 | + |
| 55 | +Enabling replay will require changes across `remote.WriteStorage`, `remote.QueueManager`, and `wlog.Watcher`. Implementing https://github.com/prometheus/prometheus/issues/17616 will help as it will provide a hook to signal when segments have been fully read from the watcher through `remote.WriteStorage`. Anytime the `wlog.Watcher` [changes the current segment](https://github.com/prometheus/prometheus/blob/18efd9d629c467877ebe674bbc1edbba8abe54be/tsdb/wlog/watcher.go#L314) the following would happen, |
| 56 | + |
| 57 | +```mermaid |
| 58 | +sequenceDiagram |
| 59 | + wlog.Watcher->>wlog.Watcher: Run() |
| 60 | + loop Until closed |
| 61 | + wlog.Watcher->>wlog.Watcher: currentSegmentMetric.Set |
| 62 | + wlog.Watcher->>remote.QueueManager: OnSegmentChange() |
| 63 | + remote.QueueManager->>remote.WriteStorage: OnSegmentChange() |
| 64 | + remote.WriteStorage->>remote.WriteStorage: Update currentSegment for Queue |
| 65 | + remote.WriteStorage->>remote.WriteStorage: Are all queues at or ahead of currentSegment? |
| 66 | + alt yes |
| 67 | + remote.WriteStorage->>Subscriber: SegmentChange() |
| 68 | + end |
| 69 | + end |
| 70 | +``` |
| 71 | + |
| 72 | +`remote.WriteStorage` would be tracking the currentSegment for each queue supporting. Since this isn't completed it's open to discussion but it seems like a reasonable chunk to start with that has values on its own. |
| 73 | + |
| 74 | +A basic replay to accomplishing all non-stretch goals would be as follows |
| 75 | + |
| 76 | +### Code flow |
| 77 | + |
| 78 | +Enabling replay will require changes across `remote.WriteStorage`, `remote.QueueManager`, and `wlog.Watcher`. Implementing https://github.com/prometheus/prometheus/issues/17616 will help as it will provide a hook to signal when segments have been fully read from the watcher through `remote.WriteStorage`. Anytime the `wlog.Watcher` [changes the current segment](https://github.com/prometheus/prometheus/blob/18efd9d629c467877ebe674bbc1edbba8abe54be/tsdb/wlog/watcher.go#L314) the following would happen, |
| 79 | + |
| 80 | +1. Adding another configurable timer to [`remote.WriteStorage.run()`](https://github.com/prometheus/prometheus/blob/f50ff0a40ad4ef24d9bb8e81a6546c8c994a924a/storage/remote/write.go#L114-L125) periodically persisting the current segments for each queue. |
| 81 | +2. Ensure `remote.WriteStorage.Close()` will also attempt to write current segments |
| 82 | +3. Read persisted queue segment positions in `remote.NewWriteStorage()` |
| 83 | +4. Update `remote.WriteStorage.ApplyConfig` to provide the persisted current segment to `remote.NewQueueManager` |
| 84 | +5. Update `remote.NewQueueManager` to provide a starting segment to `wlog.NewWatcher` |
| 85 | +6. Update `wlog.NewWatcher.Run()` to start sending samples if a starting segment is configured |
| 86 | +7. Walk through the `remote.QueueManager` send code to ensure duplicate data errors will not cause slow downs in data delivery (we have a high probability of sending duplicate data). |
| 87 | + |
| 88 | +This flow should be enough to to accomplish Goal 1: Support resuming from a checkpoint for each configured `remote_write` destination. |
| 89 | + |
| 90 | +The act of taking a checkpoint will require a lock to be held but given we do it on a schedule this will be infrequent enough that the implementation should safely accomplish Goal 2: Taking a checkpoint for a remote_write destination should not incur significant overhead (see testing for further info). |
| 91 | + |
| 92 | +### Checkpoint file format/location |
| 93 | + |
| 94 | +The segment checkpoint would be stored in the `remote.WriteStorage.dir` which would be next to the `/wal` directory. |
| 95 | + |
| 96 | +We only care about the queue hash and the current segment so a json encoded file seems reasonable for this. A key value format should make it easier to evolve over time vs a more basic delimited file. |
| 97 | + |
| 98 | +Solving for, Goal 3: Changing the `queue_configuration` for a `remote_write` destination should not result in a new checkpoint entry. |
| 99 | + |
| 100 | +This will be done via adding a specific toHash function for RemoteWriteConfig which zeros the QueueConfig before taking the hash. RemoteWriteConfig is managed as a pointer so we'll need to keep the value before, set to empty, and put the original value back but all is reasonably managed. We could look at identifying other "operational" fields which could be excluded from hashing for the same reasons. |
| 101 | + |
| 102 | +This will change existing queue hashes but I don't believe that to be a big problem and if it is we can do this hashing specifically for segment tracking only. It is proposed as the first task so we can reduce the amount of use cases which can trigger data loss. |
| 103 | + |
| 104 | +### Testing / Safety |
| 105 | + |
| 106 | +Goal 4: Guards need to be in place to protect against infinite WAL growth is capable of being accomplished through adjusting config defaults when replaying is enabled. We would require `remote_write.queue_config.sample_age_limit` be non-zero and would have a default of `2h`. |
| 107 | + |
| 108 | +I believe prombench is sufficient to prove Goal 2: Taking a checkpoint for a remote_write destination should not incur significant overhead. Open to further benchmarking ideas but given the components + time necessary for a proper test ensuring prombench is capable of covering this would be the most ideal. |
| 109 | + |
| 110 | +### Further reducing duplicated data sent |
| 111 | + |
| 112 | +Replaying a whole segment can still result in a fair amount of duplicated data on startup. If we added tracking the lowest timestamp delivered via remote write to in the checkpoint it could reduce this number (lowest timestamp is required because the WAL supports out of order writes). At startup the tracked lowest timestamp would be used as marker for where to start writing data from within the checkpointed segment ideally reducing the amount of duplicated data replayed. At worst it would start from the beginning of the segment. |
| 113 | + |
| 114 | +### Goal 5: Stretch: Remote write supports at-least-once delivery of samples in the WAL. |
| 115 | + |
| 116 | +The amount of complexity in this goal is large, it is my opinion that our current state where all samples are lost is worse than implementing a replay which does not give us at-least-once delivery. I believe the proposed replay implementation would provide a good basis for an at-least-once solution. |
| 117 | + |
| 118 | +A solution would need to involve internals of `remote.QueueManager` as part of an `OnSegmentChange` pipeline. One option could be to implement the same pattern where `remote.WriteStorage` tracks the segment for each `remote.QueueManager`, in this case `remote.QueueManager` would track the segment of each shard and take responsibility for propagating the notification when all shards are at or beyond the segment. |
| 119 | + |
| 120 | +```mermaid |
| 121 | +sequenceDiagram |
| 122 | + wlog.Watcher->>wlog.Watcher: Run() |
| 123 | + loop Until closed |
| 124 | + wlog.Watcher->>wlog.Watcher: currentSegmentMetric.Set |
| 125 | + wlog.Watcher->>remote.QueueManager: OnSegmentChange() |
| 126 | + remote.QueueManager->>remote.QueueManager.shards: OnSegmentChange() |
| 127 | + remote.QueueManager.shards->>remote.QueueManager.shards: Store new segment and current batchQueue depth + 1 <br> (number of batches to send to clear the segment) |
| 128 | + remote.QueueManager.shards->>remote.QueueManager.shards: Decrement depths on send <br> (we could have more than 1 segment enqueued) |
| 129 | + remote.QueueManager.shards->>remote.QueueManager.shards: Depth is zero for a segment? |
| 130 | + alt yes |
| 131 | + remote.QueueManager.shards->>remote.QueueManager: Update currentSegment for shard id |
| 132 | + end |
| 133 | + remote.QueueManager->>remote.QueueManager: All segments at or ahead of currentSegment? |
| 134 | + alt yes |
| 135 | + remote.QueueManager->>remote.WriteStorage: OnSegmentChange() |
| 136 | + end |
| 137 | + remote.WriteStorage->>remote.WriteStorage: Update currentSegment for Queue |
| 138 | + remote.WriteStorage->>remote.WriteStorage: Are all queues at or ahead of currentSegment? |
| 139 | + alt yes |
| 140 | + remote.WriteStorage->>Subscriber: SegmentChange() |
| 141 | + end |
| 142 | + end |
| 143 | +``` |
| 144 | + |
| 145 | +I believe this bypasses resharding complexity, as a reshard triggers a purging of all queues clearing which would also clear any pending segment changes. The complexity will come from ensuring the overhead from added locking is low enough to keep remote write delivery rates relatively unchanged. |
| 146 | + |
| 147 | +## Alternatives |
| 148 | + |
| 149 | +1. `remote.QueueManager` should own syncing its own checkpoint (most early implementations took this approach). |
| 150 | + * `remote.QueueManager` already has a lot of responsibilities and will take on more for at-least-once. |
| 151 | + * `remote.WriteStorage` has reasonable hook points to run this logic without adding a lot more complexity. |
| 152 | +2. The checkpoint should be synchronously updated when segments change. |
| 153 | + * Introducing a bit of time between knowing that a segment changed to persisting it gives us more time to fully deliver the batch before we persist the change. |
| 154 | + * Synchronously committing it makes the potential gap larger. |
| 155 | + * If we assume a 15 second queue delay then syncing the checkpoint every 30 seconds gives a lot of room for the segment to be fully processed before being committed. |
| 156 | + * The trade-off being more unnecessary data being replayed on startup. |
| 157 | + * After implementing a solution for at-least-once we can reassess how often we commit/if we should make it synchronous. |
| 158 | + |
| 159 | +## Action Plan |
| 160 | + |
| 161 | +The tasks to do in order to migrate to the new idea. |
| 162 | + |
| 163 | +* [ ] Adjust the queue hash function to exclude parameters often adjusted during normal operations (reduces the surface area where data can be lost). |
| 164 | +* [ ] Implement the segment change notification pattern proposed in https://github.com/prometheus/prometheus/issues/17616. |
| 165 | +* [ ] Add the functionality proposed in the How section (I think it can be accomplished in a single PR without being massive). |
0 commit comments