From a498e74cd87b74f652f25ad11387fc6f8e1c6aa1 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 11:18:42 +0800 Subject: [PATCH 1/7] init Signed-off-by: crazycs520 --- executor/slow_query.go | 78 +++++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 0d6bf9df4ed86..193ce235a17d3 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -68,7 +68,7 @@ type slowQueryRetriever struct { func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if !e.initialized { - err := e.initialize(sctx) + err := e.initialize(ctx, sctx) if err != nil { return nil, err } @@ -95,7 +95,7 @@ func (e *slowQueryRetriever) retrieve(ctx context.Context, sctx sessionctx.Conte return retRows, nil } -func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error { +func (e *slowQueryRetriever) initialize(ctx context.Context, sctx sessionctx.Context) error { var err error var hasProcessPriv bool if pm := privilege.GetPrivilegeManager(sctx); pm != nil { @@ -112,7 +112,7 @@ func (e *slowQueryRetriever) initialize(sctx sessionctx.Context) error { e.checker.endTime = types.NewTime(types.FromGoTime(e.extractor.EndTime), mysql.TypeDatetime, types.MaxFsp) } e.initialized = true - e.files, err = e.getAllFiles(sctx, sctx.GetSessionVars().SlowQueryFile) + e.files, err = e.getAllFiles(ctx, sctx, sctx.GetSessionVars().SlowQueryFile) return err } @@ -243,12 +243,15 @@ type offset struct { length int } -func (e *slowQueryRetriever) getBatchLog(reader *bufio.Reader, offset *offset, num int) ([]string, error) { +func (e *slowQueryRetriever) getBatchLog(ctx context.Context, reader *bufio.Reader, offset *offset, num int) ([]string, error) { var line string log := make([]string, 0, num) var err error for i := 0; i < num; i++ { for { + if isCtxDone(ctx) { + return nil, ctx.Err() + } e.fileLine++ lineByte, err := getOneLine(reader) if err != nil { @@ -290,9 +293,9 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C defer close(ch) for { startTime := time.Now() - log, err := e.getBatchLog(reader, &offset, logNum) + log, err := e.getBatchLog(ctx, reader, &offset, logNum) if err != nil { - e.parsedSlowLogCh <- parsedSlowLog{nil, err} + e.sendParsedSlowLogCh(ctx, parsedSlowLog{nil, err}) break } if len(log) == 0 { @@ -306,8 +309,8 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C ch <- 1 go func() { defer wg.Done() - result, err := e.parseLog(sctx, log, start) - e.parsedSlowLogCh <- parsedSlowLog{result, err} + result, err := e.parseLog(ctx, sctx, log, start) + e.sendParsedSlowLogCh(ctx, parsedSlowLog{result, err}) <-ch }() offset.offset = e.fileLine @@ -321,6 +324,14 @@ func (e *slowQueryRetriever) parseSlowLog(ctx context.Context, sctx sessionctx.C wg.Wait() } +func (e *slowQueryRetriever) sendParsedSlowLogCh(ctx context.Context, re parsedSlowLog) { + select { + case e.parsedSlowLogCh <- re: + case <-ctx.Done(): + return + } +} + func getLineIndex(offset offset, index int) int { var fileLine int if offset.length <= index { @@ -331,7 +342,7 @@ func getLineIndex(offset offset, index int) int { return fileLine } -func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) { +func (e *slowQueryRetriever) parseLog(ctx context.Context, sctx sessionctx.Context, log []string, offset offset) (data [][]types.Datum, err error) { start := time.Now() defer func() { if r := recover(); r != nil { @@ -347,15 +358,18 @@ func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offs } }) var st *slowQueryTuple - tz := ctx.GetSessionVars().Location() + tz := sctx.GetSessionVars().Location() startFlag := false for index, line := range log { + if isCtxDone(ctx) { + return nil, ctx.Err() + } fileLine := getLineIndex(offset, index) if !startFlag && strings.HasPrefix(line, variable.SlowLogStartPrefixStr) { st = &slowQueryTuple{} valid, err := st.setFieldValue(tz, variable.SlowLogTimeStr, line[len(variable.SlowLogStartPrefixStr):], fileLine, e.checker) if err != nil { - ctx.GetSessionVars().StmtCtx.AppendWarning(err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if valid { @@ -372,7 +386,7 @@ func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offs value := line[len(variable.SlowLogUserAndHostStr+variable.SlowLogSpaceMarkStr):] valid, err := st.setFieldValue(tz, variable.SlowLogUserAndHostStr, value, fileLine, e.checker) if err != nil { - ctx.GetSessionVars().StmtCtx.AppendWarning(err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if !valid { @@ -381,7 +395,7 @@ func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offs } else if strings.HasPrefix(line, variable.SlowLogCopBackoffPrefix) { valid, err := st.setFieldValue(tz, variable.SlowLogBackoffDetail, line, fileLine, e.checker) if err != nil { - ctx.GetSessionVars().StmtCtx.AppendWarning(err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if !valid { @@ -396,7 +410,7 @@ func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offs } valid, err := st.setFieldValue(tz, field, fieldValues[i+1], fileLine, e.checker) if err != nil { - ctx.GetSessionVars().StmtCtx.AppendWarning(err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if !valid { @@ -414,7 +428,7 @@ func (e *slowQueryRetriever) parseLog(ctx sessionctx.Context, log []string, offs // Get the sql string, and mark the start flag to false. _, err := st.setFieldValue(tz, variable.SlowLogQuerySQLStr, string(hack.Slice(line)), fileLine, e.checker) if err != nil { - ctx.GetSessionVars().StmtCtx.AppendWarning(err) + sctx.GetSessionVars().StmtCtx.AppendWarning(err) continue } if e.checker.hasPrivilege(st.user) { @@ -804,7 +818,7 @@ type logFile struct { } // getAllFiles is used to get all slow-log needed to parse, it is exported for test. -func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath string) ([]logFile, error) { +func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Context, logFilePath string) ([]logFile, error) { totalFileNum := 0 if e.stats != nil { startTime := time.Now() @@ -847,6 +861,9 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st if !strings.HasPrefix(path, prefix) { return nil } + if isCtxDone(ctx) { + return ctx.Err() + } totalFileNum++ file, err := os.OpenFile(path, os.O_RDONLY, os.ModePerm) if err != nil { @@ -859,7 +876,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st } }() // Get the file start time. - fileStartTime, err := e.getFileStartTime(file) + fileStartTime, err := e.getFileStartTime(ctx, file) if err != nil { return handleErr(err) } @@ -869,7 +886,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st } // Get the file end time. - fileEndTime, err := e.getFileEndTime(file) + fileEndTime, err := e.getFileEndTime(ctx, file) if err != nil { return handleErr(err) } @@ -902,7 +919,7 @@ func (e *slowQueryRetriever) getAllFiles(sctx sessionctx.Context, logFilePath st return logFiles, err } -func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) { +func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File) (time.Time, error) { var t time.Time _, err := file.Seek(0, io.SeekStart) if err != nil { @@ -923,6 +940,9 @@ func (e *slowQueryRetriever) getFileStartTime(file *os.File) (time.Time, error) if maxNum <= 0 { break } + if isCtxDone(ctx) { + return t, ctx.Err() + } } return t, errors.Errorf("malform slow query file %v", file.Name()) } @@ -974,7 +994,7 @@ func (s *slowQueryRuntimeStats) Tp() int { return execdetails.TpSlowQueryRuntimeStat } -func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { +func (e *slowQueryRetriever) getFileEndTime(ctx context.Context, file *os.File) (time.Time, error) { var t time.Time var tried int stat, err := file.Stat() @@ -984,7 +1004,7 @@ func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { endCursor := stat.Size() maxLineNum := 128 for { - lines, readBytes, err := readLastLines(file, endCursor) + lines, readBytes, err := readLastLines(ctx, file, endCursor) if err != nil { return t, err } @@ -1002,24 +1022,31 @@ func (e *slowQueryRetriever) getFileEndTime(file *os.File) (time.Time, error) { if tried >= maxLineNum { break } + if isCtxDone(ctx) { + return t, ctx.Err() + } } return t, errors.Errorf("invalid slow query file %v", file.Name()) } +const maxReadCacheSize = 1024 * 1024 * 64 + // Read lines from the end of a file // endCursor initial value should be the filesize -func readLastLines(file *os.File, endCursor int64) ([]string, int, error) { +func readLastLines(ctx context.Context, file *os.File, endCursor int64) ([]string, int, error) { var lines []byte var firstNonNewlinePos int var cursor = endCursor + var size int64 = 2048 for { // stop if we are at the beginning // check it in the start to avoid read beyond the size if cursor <= 0 { break } - - var size int64 = 4096 + if size < maxReadCacheSize { + size = size * 2 + } if cursor < size { size = cursor } @@ -1051,6 +1078,9 @@ func readLastLines(file *os.File, endCursor int64) ([]string, int, error) { if firstNonNewlinePos > 0 { break } + if isCtxDone(ctx) { + return nil, 0, ctx.Err() + } } finalStr := string(lines[firstNonNewlinePos:]) return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr), nil From 10e16221b8ca6f9cded42408d7135d003c4b53df Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 11:40:32 +0800 Subject: [PATCH 2/7] add bench test Signed-off-by: crazycs520 --- executor/benchmark_test.go | 35 +++++++++++++++++++++++++++++++++++ executor/slow_query_test.go | 4 ++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go index e95ac82b0c0cc..fcabecc32e01b 100644 --- a/executor/benchmark_test.go +++ b/executor/benchmark_test.go @@ -17,7 +17,9 @@ import ( "context" "encoding/base64" "fmt" + "io/ioutil" "math/rand" + "os" "sort" "strings" "sync" @@ -1981,3 +1983,36 @@ func BenchmarkLimitExec(b *testing.B) { }) } } + +func BenchmarkReadLastLinesOfHugeLine(b *testing.B) { + // step 1. initial a huge line log file + hugeLine := make([]byte, 1024*1024*10) + for i := range hugeLine { + hugeLine[i] = 'a' + byte(i%26) + } + fileName := "tidb.log" + err := ioutil.WriteFile(fileName, hugeLine, 0644) + if err != nil { + b.Fatal(err) + } + file, err := os.OpenFile(fileName, os.O_RDONLY, os.ModePerm) + if err != nil { + b.Fatal(err) + } + defer func() { + file.Close() + os.Remove(fileName) + }() + stat, _ := file.Stat() + filesize := stat.Size() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, n, err := readLastLines(context.Background(), file, filesize) + if err != nil { + b.Fatal(err) + } + if n != len(hugeLine) { + b.Fatalf("len %v, expected: %v", n, len(hugeLine)) + } + } +} diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index a22142774cec4..88b36b967a6fb 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -48,7 +48,7 @@ func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bu func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader, logNum int) ([][]types.Datum, error) { retriever := &slowQueryRetriever{} // Ignore the error is ok for test. - terror.Log(retriever.initialize(sctx)) + terror.Log(retriever.initialize(context.Background(), sctx)) rows, err := parseLog(retriever, sctx, reader, logNum) return rows, err } @@ -450,7 +450,7 @@ select 7;` } retriever := &slowQueryRetriever{extractor: extractor} - err := retriever.initialize(sctx) + err := retriever.initialize(context.Background(), sctx) c.Assert(err, IsNil) comment := Commentf("case id: %v", i) c.Assert(retriever.files, HasLen, len(cas.files), comment) From 64d24f65331207728d68b6529d5433639febd392 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 16:18:53 +0800 Subject: [PATCH 3/7] add cache init Signed-off-by: crazycs520 --- domain/domain.go | 6 ++ executor/slow_query.go | 32 ++++++++- util/logutil/cache.go | 133 +++++++++++++++++++++++++++++++++++++ util/logutil/cache_test.go | 12 ++++ 4 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 util/logutil/cache.go create mode 100644 util/logutil/cache_test.go diff --git a/domain/domain.go b/domain/domain.go index d944d26021366..dee5d276376f7 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -85,6 +85,7 @@ type Domain struct { statsUpdating sync2.AtomicInt32 cancel context.CancelFunc indexUsageSyncLease time.Duration + logFileMetaCache *logutil.LogFileMetaCache serverID uint64 serverIDSession *concurrency.Session @@ -679,6 +680,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio infoHandle: infoschema.NewHandle(store), slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500), indexUsageSyncLease: idxUsageSyncLease, + logFileMetaCache: logutil.NewLogFileMetaCache(), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) @@ -1310,6 +1312,10 @@ func (do *Domain) IsLostConnectionToPD() bool { return do.isLostConnectionToPD.Get() != 0 } +func (do *Domain) GetLogFileMetaCache() *logutil.LogFileMetaCache { + return do.logFileMetaCache +} + const ( serverIDEtcdPath = "/tidb/server_id" refreshServerIDRetryCnt = 3 diff --git a/executor/slow_query.go b/executor/slow_query.go index 193ce235a17d3..1bc3d5dc5f22c 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege" @@ -853,6 +854,11 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if err != nil { return nil, err } + dom := domain.GetDomain(sctx) + var cache *logutil.LogFileMetaCache + if dom != nil { + cache = dom.GetLogFileMetaCache() + } walkFn := func(path string, info os.FileInfo) error { if info.IsDir() { return nil @@ -875,18 +881,40 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co terror.Log(file.Close()) } }() + stat, err := file.Stat() + if err != nil { + return handleErr(err) + } + var meta *logutil.LogFileMeta + if cache != nil { + meta = cache.GetFileMata(stat) + } + if meta == nil { + meta = logutil.NewLogFileMeta(logutil.FileTypeSlowLog, stat) + if cache != nil { + defer cache.AddFileMataToCache(stat, meta) + } + } + if meta.Type != logutil.FileTypeSlowLog { + return nil + } // Get the file start time. - fileStartTime, err := e.getFileStartTime(ctx, file) + fileStartTime, err := meta.GetStartTime(stat, func() (time.Time, error) { + return e.getFileStartTime(ctx, file) + }) if err != nil { return handleErr(err) } + start := types.NewTime(types.FromGoTime(fileStartTime), mysql.TypeDatetime, types.MaxFsp) if start.Compare(e.checker.endTime) > 0 { return nil } // Get the file end time. - fileEndTime, err := e.getFileEndTime(ctx, file) + fileEndTime, err := meta.GetEndTime(stat, func() (time.Time, error) { + return e.getFileEndTime(ctx, file) + }) if err != nil { return handleErr(err) } diff --git a/util/logutil/cache.go b/util/logutil/cache.go new file mode 100644 index 0000000000000..5a6f9ef3bfb24 --- /dev/null +++ b/util/logutil/cache.go @@ -0,0 +1,133 @@ +package logutil + +import ( + "fmt" + "os" + "sync" + "time" +) + +var ( + defaultLoLogFileMetaCacheCapacity = 10000 + zeroTime = time.Time{} +) + +type LogFileMetaCache struct { + mu sync.RWMutex + cache map[string]*LogFileMeta + Capacity int +} + +func NewLogFileMetaCache() *LogFileMetaCache { + return &LogFileMetaCache{ + cache: make(map[string]*LogFileMeta), + Capacity: defaultLoLogFileMetaCacheCapacity, + } +} + +func (c *LogFileMetaCache) GetFileMata(stat os.FileInfo) *LogFileMeta { + if stat == nil { + return nil + } + c.mu.RLock() + m, ok := c.cache[stat.Name()] + c.mu.RUnlock() + if !ok { + return nil + } + if m.checkFileNotModified(stat) { + return m + } + return nil +} + +func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMeta) { + if stat == nil || meta == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + // TODO: Use LRU ? + if len(c.cache) < c.Capacity { + c.cache[stat.Name()] = meta + } +} + +type FileType = int + +const ( + FileTypeUnknown FileType = 0 + FileTypeLog FileType = 1 + FileTypeSlowLog FileType = 2 +) + +type LogFileMeta struct { + mu sync.Mutex + Type FileType + ModTime time.Time + startTime time.Time + endTime time.Time +} + +func NewLogFileMeta(fileType FileType, info os.FileInfo) *LogFileMeta { + return &LogFileMeta{ + Type: fileType, + ModTime: info.ModTime(), + } +} + +func (l *LogFileMeta) SetEndTime(end time.Time) { + l.endTime = end +} + +func (l *LogFileMeta) GetStartTime(stat os.FileInfo, getStartTime func() (time.Time, error)) (time.Time, error) { + if stat == nil { + return zeroTime, fmt.Errorf("file stat can't be nil") + } + l.mu.Lock() + defer l.mu.Unlock() + t := l.startTime + if l.checkLogTimeValid(t) && l.checkFileNotModified(stat) { + return t, nil + } + if getStartTime == nil { + return t, fmt.Errorf("can't get file '%v' start time", stat.Name()) + } + t, err := getStartTime() + if err != nil { + return t, err + } + l.startTime = t + return t, nil +} + +func (l *LogFileMeta) GetEndTime(stat os.FileInfo, getEndTime func() (time.Time, error)) (time.Time, error) { + if stat == nil { + return zeroTime, fmt.Errorf("file stat can't be nil") + } + l.mu.Lock() + defer l.mu.Unlock() + t := l.endTime + if l.checkLogTimeValid(t) && l.checkFileNotModified(stat) { + return t, nil + } + if getEndTime == nil { + return t, fmt.Errorf("can't get file '%v' end time", stat.Name()) + } + t, err := getEndTime() + if err != nil { + return t, err + } + l.endTime = t + return t, nil +} + +// checkLogTimeValid returns true if t != zeroTime. +func (l *LogFileMeta) checkLogTimeValid(t time.Time) bool { + return !t.Equal(zeroTime) +} + +// checkFileNotModified returns true if the file hasn't been modified. +func (l *LogFileMeta) checkFileNotModified(info os.FileInfo) bool { + return l.ModTime.Equal(info.ModTime()) +} diff --git a/util/logutil/cache_test.go b/util/logutil/cache_test.go new file mode 100644 index 0000000000000..cbec6cedc3308 --- /dev/null +++ b/util/logutil/cache_test.go @@ -0,0 +1,12 @@ +package logutil_test + +import ( + . "github.com/pingcap/check" +) + +var _ = Suite(&testCacheSuite{}) + +type testCacheSuite struct{} + +func (s *testCacheSuite) TestStringToLogLevel(c *C) { +} From 2be741735b86cc5bb6ace4b212cddfbd0c58d50c Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 19:46:32 +0800 Subject: [PATCH 4/7] add invalid flag Signed-off-by: crazycs520 --- executor/slow_query.go | 25 +++++++++---------- util/logutil/cache.go | 54 ++++++++++++++++++++++++++---------------- 2 files changed, 46 insertions(+), 33 deletions(-) diff --git a/executor/slow_query.go b/executor/slow_query.go index 1bc3d5dc5f22c..7154435a3741a 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -858,6 +858,8 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co var cache *logutil.LogFileMetaCache if dom != nil { cache = dom.GetLogFileMetaCache() + } else { + cache = logutil.NewLogFileMetaCache() } walkFn := func(path string, info os.FileInfo) error { if info.IsDir() { @@ -885,19 +887,15 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if err != nil { return handleErr(err) } - var meta *logutil.LogFileMeta - if cache != nil { - meta = cache.GetFileMata(stat) - } + meta := cache.GetFileMata(stat) if meta == nil { - meta = logutil.NewLogFileMeta(logutil.FileTypeSlowLog, stat) - if cache != nil { - defer cache.AddFileMataToCache(stat, meta) + meta = logutil.NewLogFileMeta(stat) + defer cache.AddFileMataToCache(stat, meta) + } else { + if meta.CheckFileNotModified(stat) && meta.IsInValid() { + return nil } } - if meta.Type != logutil.FileTypeSlowLog { - return nil - } // Get the file start time. fileStartTime, err := meta.GetStartTime(stat, func() (time.Time, error) { return e.getFileStartTime(ctx, file) @@ -958,6 +956,9 @@ func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File for { lineByte, err := getOneLine(reader) if err != nil { + if err == io.EOF { + return t, logutil.InvalidLogFile + } return t, err } line := string(lineByte) @@ -972,7 +973,7 @@ func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File return t, ctx.Err() } } - return t, errors.Errorf("malform slow query file %v", file.Name()) + return t, logutil.InvalidLogFile } func (e *slowQueryRetriever) getRuntimeStats() execdetails.RuntimeStats { @@ -1054,7 +1055,7 @@ func (e *slowQueryRetriever) getFileEndTime(ctx context.Context, file *os.File) return t, ctx.Err() } } - return t, errors.Errorf("invalid slow query file %v", file.Name()) + return t, logutil.InvalidLogFile } const maxReadCacheSize = 1024 * 1024 * 64 diff --git a/util/logutil/cache.go b/util/logutil/cache.go index 5a6f9ef3bfb24..b8428dda9dfbf 100644 --- a/util/logutil/cache.go +++ b/util/logutil/cache.go @@ -1,6 +1,7 @@ package logutil import ( + "errors" "fmt" "os" "sync" @@ -12,16 +13,19 @@ var ( zeroTime = time.Time{} ) +// InvalidLogFile indicates the log file format is invalid. +var InvalidLogFile = errors.New("invalid format of log file") + type LogFileMetaCache struct { mu sync.RWMutex cache map[string]*LogFileMeta - Capacity int + capacity int } func NewLogFileMetaCache() *LogFileMetaCache { return &LogFileMetaCache{ cache: make(map[string]*LogFileMeta), - Capacity: defaultLoLogFileMetaCacheCapacity, + capacity: defaultLoLogFileMetaCacheCapacity, } } @@ -35,7 +39,7 @@ func (c *LogFileMetaCache) GetFileMata(stat os.FileInfo) *LogFileMeta { if !ok { return nil } - if m.checkFileNotModified(stat) { + if m.CheckFileNotModified(stat) { return m } return nil @@ -48,30 +52,21 @@ func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMet c.mu.Lock() defer c.mu.Unlock() // TODO: Use LRU ? - if len(c.cache) < c.Capacity { + if len(c.cache) < c.capacity { c.cache[stat.Name()] = meta } } -type FileType = int - -const ( - FileTypeUnknown FileType = 0 - FileTypeLog FileType = 1 - FileTypeSlowLog FileType = 2 -) - type LogFileMeta struct { + inValid bool mu sync.Mutex - Type FileType ModTime time.Time startTime time.Time endTime time.Time } -func NewLogFileMeta(fileType FileType, info os.FileInfo) *LogFileMeta { +func NewLogFileMeta(info os.FileInfo) *LogFileMeta { return &LogFileMeta{ - Type: fileType, ModTime: info.ModTime(), } } @@ -87,7 +82,7 @@ func (l *LogFileMeta) GetStartTime(stat os.FileInfo, getStartTime func() (time.T l.mu.Lock() defer l.mu.Unlock() t := l.startTime - if l.checkLogTimeValid(t) && l.checkFileNotModified(stat) { + if l.CheckLogTimeValid(t) && l.CheckFileNotModified(stat) { return t, nil } if getStartTime == nil { @@ -95,8 +90,13 @@ func (l *LogFileMeta) GetStartTime(stat os.FileInfo, getStartTime func() (time.T } t, err := getStartTime() if err != nil { + if err == InvalidLogFile { + l.inValid = true + } return t, err } + l.inValid = false + l.ModTime = stat.ModTime() l.startTime = t return t, nil } @@ -108,7 +108,7 @@ func (l *LogFileMeta) GetEndTime(stat os.FileInfo, getEndTime func() (time.Time, l.mu.Lock() defer l.mu.Unlock() t := l.endTime - if l.checkLogTimeValid(t) && l.checkFileNotModified(stat) { + if l.CheckLogTimeValid(t) && l.CheckFileNotModified(stat) { return t, nil } if getEndTime == nil { @@ -116,18 +116,30 @@ func (l *LogFileMeta) GetEndTime(stat os.FileInfo, getEndTime func() (time.Time, } t, err := getEndTime() if err != nil { + if err == InvalidLogFile { + l.inValid = true + } return t, err } + l.inValid = false + l.ModTime = stat.ModTime() l.endTime = t return t, nil } -// checkLogTimeValid returns true if t != zeroTime. -func (l *LogFileMeta) checkLogTimeValid(t time.Time) bool { +func (l *LogFileMeta) IsInValid() bool { + l.mu.Lock() + invalid := l.inValid + l.mu.Unlock() + return invalid +} + +// CheckLogTimeValid returns true if t != zeroTime. +func (l *LogFileMeta) CheckLogTimeValid(t time.Time) bool { return !t.Equal(zeroTime) } -// checkFileNotModified returns true if the file hasn't been modified. -func (l *LogFileMeta) checkFileNotModified(info os.FileInfo) bool { +// CheckFileNotModified returns true if the file hasn't been modified. +func (l *LogFileMeta) CheckFileNotModified(info os.FileInfo) bool { return l.ModTime.Equal(info.ModTime()) } From 7bcbb477b87812267a61e71fadb827c666a80e45 Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 20:23:39 +0800 Subject: [PATCH 5/7] update Signed-off-by: crazycs520 --- util/logutil/cache.go | 30 +++++++++++++++++++----------- util/logutil/cache_test.go | 12 ------------ 2 files changed, 19 insertions(+), 23 deletions(-) delete mode 100644 util/logutil/cache_test.go diff --git a/util/logutil/cache.go b/util/logutil/cache.go index b8428dda9dfbf..bfd1749950410 100644 --- a/util/logutil/cache.go +++ b/util/logutil/cache.go @@ -34,15 +34,9 @@ func (c *LogFileMetaCache) GetFileMata(stat os.FileInfo) *LogFileMeta { return nil } c.mu.RLock() - m, ok := c.cache[stat.Name()] + m := c.cache[stat.Name()] c.mu.RUnlock() - if !ok { - return nil - } - if m.CheckFileNotModified(stat) { - return m - } - return nil + return m } func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMeta) { @@ -51,12 +45,26 @@ func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMet } c.mu.Lock() defer c.mu.Unlock() - // TODO: Use LRU ? - if len(c.cache) < c.capacity { - c.cache[stat.Name()] = meta + + name := stat.Name() + _, ok := c.cache[name] + if ok { + c.cache[name] = meta + } else { + // TODO: Use LRU ? + if len(c.cache) < c.capacity { + c.cache[name] = meta + } } } +func (c *LogFileMetaCache) Len() int { + c.mu.RLock() + l := len(c.cache) + c.mu.RUnlock() + return l +} + type LogFileMeta struct { inValid bool mu sync.Mutex diff --git a/util/logutil/cache_test.go b/util/logutil/cache_test.go deleted file mode 100644 index cbec6cedc3308..0000000000000 --- a/util/logutil/cache_test.go +++ /dev/null @@ -1,12 +0,0 @@ -package logutil_test - -import ( - . "github.com/pingcap/check" -) - -var _ = Suite(&testCacheSuite{}) - -type testCacheSuite struct{} - -func (s *testCacheSuite) TestStringToLogLevel(c *C) { -} From 07c74dcb3073bdcf7ed530178bef5c6d0c315c5a Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Thu, 3 Dec 2020 21:27:39 +0800 Subject: [PATCH 6/7] update pkg Signed-off-by: crazycs520 --- domain/domain.go | 7 +- executor/slow_query.go | 19 ++--- go.mod | 2 + go.sum | 4 ++ util/logutil/cache.go | 153 ----------------------------------------- 5 files changed, 20 insertions(+), 165 deletions(-) delete mode 100644 util/logutil/cache.go diff --git a/domain/domain.go b/domain/domain.go index dee5d276376f7..a986d0679168b 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/sysutil/cache" "github.com/pingcap/tidb/bindinfo" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/ddl" @@ -85,7 +86,7 @@ type Domain struct { statsUpdating sync2.AtomicInt32 cancel context.CancelFunc indexUsageSyncLease time.Duration - logFileMetaCache *logutil.LogFileMetaCache + logFileMetaCache *cache.LogFileMetaCache serverID uint64 serverIDSession *concurrency.Session @@ -680,7 +681,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio infoHandle: infoschema.NewHandle(store), slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500), indexUsageSyncLease: idxUsageSyncLease, - logFileMetaCache: logutil.NewLogFileMetaCache(), + logFileMetaCache: cache.NewLogFileMetaCache(), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) @@ -1312,7 +1313,7 @@ func (do *Domain) IsLostConnectionToPD() bool { return do.isLostConnectionToPD.Get() != 0 } -func (do *Domain) GetLogFileMetaCache() *logutil.LogFileMetaCache { +func (do *Domain) GetLogFileMetaCache() *cache.LogFileMetaCache { return do.logFileMetaCache } diff --git a/executor/slow_query.go b/executor/slow_query.go index 7154435a3741a..a5245e3756387 100755 --- a/executor/slow_query.go +++ b/executor/slow_query.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" + "github.com/pingcap/sysutil/cache" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/infoschema" plannercore "github.com/pingcap/tidb/planner/core" @@ -855,11 +856,11 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co return nil, err } dom := domain.GetDomain(sctx) - var cache *logutil.LogFileMetaCache + var ca *cache.LogFileMetaCache if dom != nil { - cache = dom.GetLogFileMetaCache() + ca = dom.GetLogFileMetaCache() } else { - cache = logutil.NewLogFileMetaCache() + ca = cache.NewLogFileMetaCache() } walkFn := func(path string, info os.FileInfo) error { if info.IsDir() { @@ -887,10 +888,10 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if err != nil { return handleErr(err) } - meta := cache.GetFileMata(stat) + meta := ca.GetFileMata(stat) if meta == nil { - meta = logutil.NewLogFileMeta(stat) - defer cache.AddFileMataToCache(stat, meta) + meta = cache.NewLogFileMeta(stat) + defer ca.AddFileMataToCache(stat, meta) } else { if meta.CheckFileNotModified(stat) && meta.IsInValid() { return nil @@ -957,7 +958,7 @@ func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File lineByte, err := getOneLine(reader) if err != nil { if err == io.EOF { - return t, logutil.InvalidLogFile + return t, cache.InvalidLogFile } return t, err } @@ -973,7 +974,7 @@ func (e *slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File return t, ctx.Err() } } - return t, logutil.InvalidLogFile + return t, cache.InvalidLogFile } func (e *slowQueryRetriever) getRuntimeStats() execdetails.RuntimeStats { @@ -1055,7 +1056,7 @@ func (e *slowQueryRetriever) getFileEndTime(ctx context.Context, file *os.File) return t, ctx.Err() } } - return t, logutil.InvalidLogFile + return t, cache.InvalidLogFile } const maxReadCacheSize = 1024 * 1024 * 64 diff --git a/go.mod b/go.mod index 22b182f8b1d66..bd59cfa896060 100644 --- a/go.mod +++ b/go.mod @@ -89,3 +89,5 @@ require ( ) go 1.13 + +replace github.com/pingcap/sysutil => github.com/crazycs520/sysutil v0.0.0-20201207085517-9dd18ed31826 diff --git a/go.sum b/go.sum index e91086cb9da87..43028c7d8126c 100644 --- a/go.sum +++ b/go.sum @@ -196,6 +196,10 @@ github.com/couchbase/vellum v1.0.1/go.mod h1:FcwrEivFpNi24R3jLOs3n+fs5RnuQnQqCLB github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/crazycs520/sysutil v0.0.0-20201203131959-c22c2028beb7 h1:6uU9EvYWskrDjxuS+x/qyiBvOz7E3If9gH/vqtUki7I= +github.com/crazycs520/sysutil v0.0.0-20201203131959-c22c2028beb7/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= +github.com/crazycs520/sysutil v0.0.0-20201207085517-9dd18ed31826 h1:t3zIDqYADdl/S4dtKEMCZMZ2GN131/qWjMEiXwhHQ7g= +github.com/crazycs520/sysutil v0.0.0-20201207085517-9dd18ed31826/go.mod h1:EB/852NMQ+aRKioCpToQ94Wl7fktV+FNnxf3CX/TTXI= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creasty/defaults v1.3.0/go.mod h1:CIEEvs7oIVZm30R8VxtFJs+4k201gReYyuYHJxZc68I= diff --git a/util/logutil/cache.go b/util/logutil/cache.go deleted file mode 100644 index bfd1749950410..0000000000000 --- a/util/logutil/cache.go +++ /dev/null @@ -1,153 +0,0 @@ -package logutil - -import ( - "errors" - "fmt" - "os" - "sync" - "time" -) - -var ( - defaultLoLogFileMetaCacheCapacity = 10000 - zeroTime = time.Time{} -) - -// InvalidLogFile indicates the log file format is invalid. -var InvalidLogFile = errors.New("invalid format of log file") - -type LogFileMetaCache struct { - mu sync.RWMutex - cache map[string]*LogFileMeta - capacity int -} - -func NewLogFileMetaCache() *LogFileMetaCache { - return &LogFileMetaCache{ - cache: make(map[string]*LogFileMeta), - capacity: defaultLoLogFileMetaCacheCapacity, - } -} - -func (c *LogFileMetaCache) GetFileMata(stat os.FileInfo) *LogFileMeta { - if stat == nil { - return nil - } - c.mu.RLock() - m := c.cache[stat.Name()] - c.mu.RUnlock() - return m -} - -func (c *LogFileMetaCache) AddFileMataToCache(stat os.FileInfo, meta *LogFileMeta) { - if stat == nil || meta == nil { - return - } - c.mu.Lock() - defer c.mu.Unlock() - - name := stat.Name() - _, ok := c.cache[name] - if ok { - c.cache[name] = meta - } else { - // TODO: Use LRU ? - if len(c.cache) < c.capacity { - c.cache[name] = meta - } - } -} - -func (c *LogFileMetaCache) Len() int { - c.mu.RLock() - l := len(c.cache) - c.mu.RUnlock() - return l -} - -type LogFileMeta struct { - inValid bool - mu sync.Mutex - ModTime time.Time - startTime time.Time - endTime time.Time -} - -func NewLogFileMeta(info os.FileInfo) *LogFileMeta { - return &LogFileMeta{ - ModTime: info.ModTime(), - } -} - -func (l *LogFileMeta) SetEndTime(end time.Time) { - l.endTime = end -} - -func (l *LogFileMeta) GetStartTime(stat os.FileInfo, getStartTime func() (time.Time, error)) (time.Time, error) { - if stat == nil { - return zeroTime, fmt.Errorf("file stat can't be nil") - } - l.mu.Lock() - defer l.mu.Unlock() - t := l.startTime - if l.CheckLogTimeValid(t) && l.CheckFileNotModified(stat) { - return t, nil - } - if getStartTime == nil { - return t, fmt.Errorf("can't get file '%v' start time", stat.Name()) - } - t, err := getStartTime() - if err != nil { - if err == InvalidLogFile { - l.inValid = true - } - return t, err - } - l.inValid = false - l.ModTime = stat.ModTime() - l.startTime = t - return t, nil -} - -func (l *LogFileMeta) GetEndTime(stat os.FileInfo, getEndTime func() (time.Time, error)) (time.Time, error) { - if stat == nil { - return zeroTime, fmt.Errorf("file stat can't be nil") - } - l.mu.Lock() - defer l.mu.Unlock() - t := l.endTime - if l.CheckLogTimeValid(t) && l.CheckFileNotModified(stat) { - return t, nil - } - if getEndTime == nil { - return t, fmt.Errorf("can't get file '%v' end time", stat.Name()) - } - t, err := getEndTime() - if err != nil { - if err == InvalidLogFile { - l.inValid = true - } - return t, err - } - l.inValid = false - l.ModTime = stat.ModTime() - l.endTime = t - return t, nil -} - -func (l *LogFileMeta) IsInValid() bool { - l.mu.Lock() - invalid := l.inValid - l.mu.Unlock() - return invalid -} - -// CheckLogTimeValid returns true if t != zeroTime. -func (l *LogFileMeta) CheckLogTimeValid(t time.Time) bool { - return !t.Equal(zeroTime) -} - -// CheckFileNotModified returns true if the file hasn't been modified. -func (l *LogFileMeta) CheckFileNotModified(info os.FileInfo) bool { - return l.ModTime.Equal(info.ModTime()) -} From 858da4d61565812caa0932b1f505e6f3b3ceb9be Mon Sep 17 00:00:00 2001 From: crazycs520 Date: Mon, 7 Dec 2020 19:29:28 +0800 Subject: [PATCH 7/7] make ci happy Signed-off-by: crazycs520 --- domain/domain.go | 1 + 1 file changed, 1 insertion(+) diff --git a/domain/domain.go b/domain/domain.go index 754959ed06af1..d11c33c9d3253 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -1314,6 +1314,7 @@ func (do *Domain) IsLostConnectionToPD() bool { return do.isLostConnectionToPD.Get() != 0 } +// GetLogFileMetaCache gets slow-log file meta cache. func (do *Domain) GetLogFileMetaCache() *cache.LogFileMetaCache { return do.logFileMetaCache }