Commit 4e7814e

blog: enhance article on Sync Load and Async Load with detailed implementation of the sync load handler and its components
Signed-off-by: Rustin170506 <[email protected]>
1 parent 9421f27 commit 4e7814e

content/posts/2025-02-10-sync-load-and-async-load.md

Lines changed: 170 additions & 17 deletions

Although it might seem straightforward to implement Sync Load by loading the statistics before executing the query, the actual implementation is more complex. TiDB uses a sync load handler to manage this process. The sync load handler is a singleton in the TiDB server, responsible for loading statistics synchronously. When a query is executed, it submits a load request to the sync load handler. The handler then loads the statistics, and the optimizer checks if the statistics are available before proceeding with the query execution.

In TiDB, the query execution process is divided into two stages: optimization and execution. During optimization, the optimizer checks if the necessary statistics are available. If not, it submits a load request to the sync load handler. This process occurs during the logical optimization phase, specifically in the `CollectPredicateColumnsPoint` and `SyncWaitStatsLoadPoint` steps.

`CollectPredicateColumnsPoint` collects columns used in the query that require statistics and sends a request to the sync load handler. `SyncWaitStatsLoadPoint` waits for the statistics to be loaded, ensuring that all necessary statistics are available before the query execution proceeds.

```go
type StatsLoad struct {
    NeededItemsCh  chan *NeededItemTask
    TimeoutItemsCh chan *NeededItemTask
    sync.Mutex
}
```

The core of the sync load handler is the `StatsLoad` structure, which contains two channels: `NeededItemsCh` and `TimeoutItemsCh`. The `NeededItemsCh` channel is used to submit load requests, while the `TimeoutItemsCh` channel is used to handle timeout events.

The handler implementation can be divided into three parts:

1. Send requests to the handler
2. Load statistics concurrently
3. Wait for the statistics to be loaded
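
Before walking through each part, here is a rough orientation sketch that groups the three responsibilities into a single interface. TiDB does not define such an interface; the signatures below are simply copied from the `statsSyncLoad` snippets shown later in this post.

```go
// Sketch only: a grouping of the statsSyncLoad methods covered below.
type syncLoadHandler interface {
    // Part 1: the optimizer submits load requests for the columns/indexes it needs.
    SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error
    // Part 2: background workers drain the task channels and load the statistics.
    SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper)
    // Part 3: the optimizer waits until the requested statistics arrive or the wait times out.
    SyncWaitStatsLoad(sc *stmtctx.StatementContext) error
}
```
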
`statsSyncLoad` provides the `SendLoadRequests` method to allow the optimizer to send load requests to the handler.

```go
func (s *statsSyncLoad) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems []model.StatsLoadItem, timeout time.Duration) error {
    remainedItems := s.removeHistLoadedColumns(neededHistItems)
    ...
    sc.StatsLoad.Timeout = timeout
    sc.StatsLoad.NeededItems = remainedItems
    sc.StatsLoad.ResultCh = make([]<-chan singleflight.Result, 0, len(remainedItems))
    for _, item := range remainedItems {
        localItem := item
        resultCh := globalStatsSyncLoadSingleFlight.DoChan(localItem.Key(), func() (any, error) {
            timer := time.NewTimer(timeout)
            defer timer.Stop()
            task := &statstypes.NeededItemTask{
                Item:      localItem,
                ToTimeout: time.Now().Local().Add(timeout),
                ResultCh:  make(chan stmtctx.StatsLoadResult, 1),
            }
            select {
            case s.StatsLoad.NeededItemsCh <- task:
                metrics.SyncLoadDedupCounter.Inc()
                select {
                case <-timer.C:
                    return nil, errors.New("sync load took too long to return")
                case result, ok := <-task.ResultCh:
                    intest.Assert(ok, "task.ResultCh cannot be closed")
                    return result, nil
                }
            case <-timer.C:
                return nil, errors.New("sync load stats channel is full and timeout sending task to channel")
            }
        })
        sc.StatsLoad.ResultCh = append(sc.StatsLoad.ResultCh, resultCh)
    }
    sc.StatsLoad.LoadStartTime = time.Now()
    return nil
}
```

The `SendLoadRequests` method first filters out columns that have already been loaded or are unnecessary. For the remaining columns, it:

1. Creates a `NeededItemTask` for each column/index requiring statistics
2. Sends these tasks to the `NeededItemsCh` channel
3. Includes in each task:
   - Column/index information
   - A `ResultCh` channel for receiving loading results
   - Timeout settings

This design ensures efficient handling of statistics loading requests while preventing duplicate loads when different queries from different sessions ask for the same statistics.
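
The deduplication relies on `golang.org/x/sync/singleflight`: `DoChan` ensures that, for a given key, the loading function runs once while every concurrent caller receives the shared result. Here is a minimal, self-contained sketch of that behavior, independent of TiDB and using a made-up key and a fake loader:

```go
package main

import (
    "fmt"
    "sync"
    "time"

    "golang.org/x/sync/singleflight"
)

func main() {
    var g singleflight.Group
    var wg sync.WaitGroup

    // Pretend three sessions ask for the statistics of the same column at once.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            // All callers use the key "t1#col_a", so the loader below should run only once
            // and its result is shared among the callers.
            ch := g.DoChan("t1#col_a", func() (any, error) {
                time.Sleep(100 * time.Millisecond) // simulate loading stats from storage
                return "histogram for t1.a", nil
            })
            res := <-ch
            fmt.Printf("caller %d: val=%v shared=%v err=%v\n", id, res.Val, res.Shared, res.Err)
        }(i)
    }
    wg.Wait()
}
```
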

One thing to note: we keep `ResultCh` and `NeededItems` in the `stmtctx.StatementContext`, which is how the loading status of each statement (query) is tracked per session.

After sending the load tasks, the optimizer waits for the statistics to be loaded. This process is implemented in the `SyncWaitStatsLoad` method.

```go
func (*statsSyncLoad) SyncWaitStatsLoad(sc *stmtctx.StatementContext) error {
    ...
    var errorMsgs []string
    defer func() {
        if len(errorMsgs) > 0 {
            logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
                zap.Strings("errors", errorMsgs))
        }
        sc.StatsLoad.NeededItems = nil
    }()
    resultCheckMap := map[model.TableItemID]struct{}{}
    for _, col := range sc.StatsLoad.NeededItems {
        resultCheckMap[col.TableItemID] = struct{}{}
    }
    timer := time.NewTimer(sc.StatsLoad.Timeout)
    defer timer.Stop()
    for _, resultCh := range sc.StatsLoad.ResultCh {
        select {
        case result, ok := <-resultCh:
            ...
            if !ok {
                return errors.New("sync load stats channel closed unexpectedly")
            }
            // This error comes from statsSyncLoad.SendLoadRequests, which creates the task
            // and sends it to the worker; it is not a statistics loading error.
            if result.Err != nil {
                errorMsgs = append(errorMsgs, result.Err.Error())
            } else {
                val := result.Val.(stmtctx.StatsLoadResult)
                // This error comes from the statistics loading itself.
                if val.HasError() {
                    errorMsgs = append(errorMsgs, val.ErrorMsg())
                }
                delete(resultCheckMap, val.Item)
            }
        case <-timer.C:
            metrics.SyncLoadCounter.Inc()
            metrics.SyncLoadTimeoutCounter.Inc()
            return errors.New("sync load stats timeout")
        }
    }
    if len(resultCheckMap) == 0 {
        metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
        return nil
    }
    return nil
}
```

The `SyncWaitStatsLoad` method monitors the loading progress of statistics. It processes results from the `ResultCh` channels and handles any potential errors that occur during loading. The method operates under the fundamental premise that each result channel will receive exactly one result, and it will continue waiting until either:

1. All results are successfully received
2. A timeout occurs
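
Outside of TiDB, the same 'one result per channel, all under a single shared timer' pattern looks roughly like the following self-contained sketch (the `loadResult` type and the delays are made up, not the real code above):

```go
package main

import (
    "errors"
    "fmt"
    "math/rand"
    "time"
)

type loadResult struct {
    item string
    err  error
}

// waitAll waits for exactly one result from each channel, or gives up when the
// shared timer fires. This mirrors the premise above: every channel is
// guaranteed to deliver exactly one value.
func waitAll(results []<-chan loadResult, timeout time.Duration) error {
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    for _, ch := range results {
        select {
        case r := <-ch:
            if r.err != nil {
                fmt.Printf("item %s failed: %v\n", r.item, r.err)
            }
        case <-timer.C:
            return errors.New("sync load stats timeout")
        }
    }
    return nil
}

func main() {
    items := []string{"t1.a", "t1.b", "t1.idx_a"}
    chans := make([]<-chan loadResult, 0, len(items))
    for _, it := range items {
        ch := make(chan loadResult, 1)
        go func(item string, out chan<- loadResult) {
            time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond) // simulate loading
            out <- loadResult{item: item}
        }(it, ch)
        chans = append(chans, ch)
    }
    fmt.Println("wait result:", waitAll(chans, time.Second))
}
```
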
To handle the tasks, `statsSyncLoad` utilizes multiple sub-workers to load statistics concurrently. This design ensures efficient processing without blocking query execution.

```go
func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{}, exitWg *util.WaitGroupEnhancedWrapper) {
    defer func() {
        exitWg.Done()
        logutil.BgLogger().Info("SubLoadWorker exited.")
    }()
    var lastTask *statstypes.NeededItemTask
    for {
        task, err := s.HandleOneTask(sctx, lastTask, exit)
        lastTask = task
        if err != nil {
            switch err {
            case errExit:
                return
            default:
                ...
                r := rand.Intn(500)
                time.Sleep(s.statsHandle.Lease()/10 + time.Duration(r)*time.Microsecond)
                continue
            }
        }
    }
}
```

From the code snippet above, we can see that the `SubLoadWorker` function runs the loading loop: it repeatedly calls `HandleOneTask` to drain a task from the request channels and load the statistics it asks for, and it keeps running until the worker is told to exit. It also incorporates a retry mechanism with a randomized backoff to handle errors during the loading process.
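
TiDB starts several of these workers so that statistics can be loaded in parallel (the start-up wiring is not shown in this post). The general shape is a small worker pool draining a shared task channel; the following is a self-contained sketch of that pattern with made-up types, not TiDB's actual code:

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

type neededItemTask struct{ key string }

// subLoadWorker drains tasks from the shared channel until it is asked to exit.
// Each worker handles one task at a time, so the pool size bounds concurrency.
func subLoadWorker(id int, tasks <-chan *neededItemTask, exit <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case task := <-tasks:
            time.Sleep(10 * time.Millisecond) // simulate loading statistics
            fmt.Printf("worker %d loaded stats for %s\n", id, task.key)
        case <-exit:
            return
        }
    }
}

func main() {
    tasks := make(chan *neededItemTask, 8)
    exit := make(chan struct{})
    var wg sync.WaitGroup

    // Start a small pool of workers, mirroring the multiple SubLoadWorker goroutines.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go subLoadWorker(i, tasks, exit, &wg)
    }
    for _, key := range []string{"t1.a", "t1.b", "t2.idx_a", "t2.c"} {
        tasks <- &neededItemTask{key: key}
    }
    time.Sleep(100 * time.Millisecond) // let the workers finish the queued tasks
    close(exit)
    wg.Wait()
}
```
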
```go
func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
    ...
    if lastTask == nil {
        task, err = s.drainColTask(sctx, exit)
        if err != nil {
            if err != errExit {
                logutil.BgLogger().Error("Fail to drain task for stats loading.", zap.Error(err))
            }
            return task, err
        }
    } else {
        task = lastTask
    }
    result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
    err = s.handleOneItemTask(task)
    if err == nil {
        task.ResultCh <- result
        return nil, nil
    }
    if !isVaildForRetry(task) {
        result.Error = err
        task.ResultCh <- result
        return nil, nil
    }
    return task, err
}
```

The `HandleOneTask` function processes a single task from either the `NeededItemsCh` or `TimeoutItemsCh` channel. It loads the statistics for the task and sends the result back through the task's `ResultCh` channel. If an error occurs during loading, the function checks whether the task is still eligible for a retry: if so, the task is returned so the worker can retry it; otherwise, the error is sent back through the `ResultCh` channel. The default retry limit is 2 attempts.
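
The retry bookkeeping behind `isVaildForRetry` is not shown above; conceptually it is just a counter on the task compared against a small cap before the worker gives up. The sketch below illustrates the idea with made-up names and bookkeeping (it is not TiDB's implementation):

```go
package main

import (
    "errors"
    "fmt"
)

const maxRetry = 2 // cap each task at two load attempts in total

type task struct {
    key   string
    retry int
}

// validForRetry bumps the task's attempt counter and reports whether the task
// may be handed back to the worker loop for another attempt.
func validForRetry(t *task) bool {
    t.retry++
    return t.retry < maxRetry
}

// loadOnce is a stand-in for the real statistics loading step; here it always
// fails so the retry path is exercised.
func loadOnce(t *task) error {
    return errors.New("simulated load failure for " + t.key)
}

func main() {
    t := &task{key: "t1.a"}
    for {
        err := loadOnce(t)
        if err == nil {
            fmt.Println("loaded", t.key)
            return
        }
        if !validForRetry(t) {
            fmt.Println("giving up on", t.key, "after", t.retry, "attempts:", err)
            return
        }
        fmt.Println("retrying", t.key, "attempt", t.retry)
    }
}
```
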