From ab118aa6169df2af0e6fa6dbb4bbf54429e1267e Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Sun, 28 Jul 2024 14:27:36 -0500
Subject: [PATCH 1/7] fix!: simplify the tracer by removing read functionality
 and guaranteeing only full writes occur

---
 config/config.go               |   7 +-
 config/toml.go                 |   7 +-
 pkg/trace/buffered_file.go     | 101 ----------
 pkg/trace/cached_file.go       | 114 ++++++++++++
 pkg/trace/cahced_file_test.go  |  86 +++++++++
 pkg/trace/decoder.go           |   4 +-
 pkg/trace/doc.go               |   2 -
 pkg/trace/fileserver.go        | 331 ---------------------------------
 pkg/trace/local_tracer.go      | 126 ++-----------
 pkg/trace/local_tracer_test.go | 183 ------------------
 10 files changed, 224 insertions(+), 737 deletions(-)
 delete mode 100644 pkg/trace/buffered_file.go
 create mode 100644 pkg/trace/cached_file.go
 create mode 100644 pkg/trace/cahced_file_test.go
 delete mode 100644 pkg/trace/doc.go
 delete mode 100644 pkg/trace/fileserver.go
 delete mode 100644 pkg/trace/local_tracer_test.go

diff --git a/config/config.go b/config/config.go
index d5e0312551..e1399f89c3 100644
--- a/config/config.go
+++ b/config/config.go
@@ -1194,18 +1194,19 @@ type InstrumentationConfig struct {
 	// Instrumentation namespace.
 	Namespace string `mapstructure:"namespace"`
 
-	// TracePushConfig is the relative path of the push config. This second
+	// Deprecated: TracePushConfig is the relative path of the push config. This second
 	// config contains credentials for where and how often to push trace data to.
 	TracePushConfig string `mapstructure:"trace_push_config"`
 
-	// TracePullAddress is the address that the trace server will listen on for
+	// Deprecated: TracePullAddress is the address that the trace server will listen on for
 	// pulling data.
 	TracePullAddress string `mapstructure:"trace_pull_address"`
 
 	// TraceType is the type of tracer used. Options are "local" and "noop".
 	TraceType string `mapstructure:"trace_type"`
 
-	// TraceBufferSize is the number of traces to write in a single batch.
+	// TraceBufferSize is the size of the buffer, in number of events, that
+	// will be kept before dropping events of a single type.
 	TraceBufferSize int `mapstructure:"trace_push_batch_size"`
 
 	// TracingTables is the list of tables that will be traced. See the
diff --git a/config/toml.go b/config/toml.go
index 11d69130d1..96cc7201b2 100644
--- a/config/toml.go
+++ b/config/toml.go
@@ -556,16 +556,21 @@ namespace = "{{ .Instrumentation.Namespace }}"
 # This second config contains credentials for where and how often to
 # push trace data to. For example, if the config is next to this config,
 # it would be "push_config.json".
+#
+# WARNING: deprecated
 trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
 
 # The tracer pull address specifies which address will be used for pull based
 # event collection. If empty, the pull based server will not be started.
+#
+# WARNING: deprecated
 trace_pull_address = "{{ .Instrumentation.TracePullAddress }}"
 
 # The tracer to use for collecting trace data.
 trace_type = "{{ .Instrumentation.TraceType }}"
 
-# The size of the batches that are sent to the database.
+# The number of events of each type that will be buffered before writing.
+# If this buffer fills up, events will be dropped to avoid blocking.
 trace_push_batch_size = {{ .Instrumentation.TraceBufferSize }}
 
 # The list of tables that are updated when tracing. All available tables and
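For anyone wiring these settings up from Go rather than through config.toml, a minimal sketch using the fields above (the table names and buffer size here are illustrative placeholders, not defaults taken from this patch):

```go
package main

import (
	"fmt"

	"github.com/tendermint/tendermint/config"
)

func main() {
	cfg := config.DefaultConfig()
	// "local" writes events to JSONL files under <root>/data/traces;
	// "noop" disables tracing.
	cfg.Instrumentation.TraceType = "local"
	// Per-table cache size in events: once the cache is full, further
	// events of that type are dropped rather than blocking the caller.
	cfg.Instrumentation.TraceBufferSize = 1000
	// Comma-separated list of tables to collect; these names are
	// illustrative placeholders.
	cfg.Instrumentation.TracingTables = "mempool_tx,consensus_round_state"
	fmt.Println(cfg.Instrumentation.TraceType)
}
```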
diff --git a/pkg/trace/buffered_file.go b/pkg/trace/buffered_file.go
deleted file mode 100644
index 9b228e3f9e..0000000000
--- a/pkg/trace/buffered_file.go
+++ /dev/null
@@ -1,101 +0,0 @@
-package trace
-
-import (
-	"bufio"
-	"errors"
-	"io"
-	"os"
-	"sync"
-	"sync/atomic"
-)
-
-// bufferedFile is a file that is being written to and read from. It is thread
-// safe, however, when reading from the file, writes will be ignored.
-type bufferedFile struct {
-	// reading protects the file from being written to while it is being read
-	// from. This is needed beyond in addition to the mutex so that writes can
-	// be ignored while reading.
-	reading atomic.Bool
-
-	// mut protects the buffered writer.
-	mut *sync.Mutex
-
-	// file is the file that is being written to.
-	file *os.File
-
-	// writer is the buffered writer that is writing to the file.
-	wr *bufio.Writer
-}
-
-// newbufferedFile creates a new buffered file that writes to the given file.
-func newbufferedFile(file *os.File) *bufferedFile {
-	return &bufferedFile{
-		file:    file,
-		wr:      bufio.NewWriter(file),
-		reading: atomic.Bool{},
-		mut:     &sync.Mutex{},
-	}
-}
-
-// Write writes the given bytes to the file. If the file is currently being read
-// from, the write will be lost.
-func (f *bufferedFile) Write(b []byte) (int, error) {
-	if f.reading.Load() {
-		return 0, nil
-	}
-	f.mut.Lock()
-	defer f.mut.Unlock()
-	return f.wr.Write(b)
-}
-
-func (f *bufferedFile) startReading() error {
-	f.reading.Store(true)
-	f.mut.Lock()
-	defer f.mut.Unlock()
-
-	err := f.wr.Flush()
-	if err != nil {
-		f.reading.Store(false)
-		return err
-	}
-
-	_, err = f.file.Seek(0, io.SeekStart)
-	if err != nil {
-		f.reading.Store(false)
-		return err
-	}
-
-	return nil
-}
-
-func (f *bufferedFile) stopReading() error {
-	f.mut.Lock()
-	defer f.mut.Unlock()
-	_, err := f.file.Seek(0, io.SeekEnd)
-	f.reading.Store(false)
-	return err
-}
-
-// File returns the underlying file with the seek point reset. The caller should
-// not close the file. The caller must call the returned function when they are
-// done reading from the file. This function resets the seek point to where it
-// was being written to.
-func (f *bufferedFile) File() (*os.File, func() error, error) {
-	if f.reading.Load() {
-		return nil, func() error { return nil }, errors.New("file is currently being read from")
-	}
-	err := f.startReading()
-	if err != nil {
-		return nil, func() error { return nil }, err
-	}
-	return f.file, f.stopReading, nil
-}
-
-// Close closes the file.
-func (f *bufferedFile) Close() error {
-	// set reading to true to prevent writes while closing the file.
-	f.mut.Lock()
-	defer f.mut.Unlock()
-	f.reading.Store(true)
-	return f.file.Close()
-}
diff --git a/pkg/trace/cached_file.go b/pkg/trace/cached_file.go
new file mode 100644
index 0000000000..ad5664bd0f
--- /dev/null
+++ b/pkg/trace/cached_file.go
@@ -0,0 +1,114 @@
+package trace
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"sync"
+
+	"github.com/tendermint/tendermint/libs/log"
+)
+
+// cachedFile wraps an os.File with a channel-based cache that ensures only
+// complete data is written to the file. Data is serialized to JSON before
+// being written. The cache is flushed when the chunk size is reached.
+// WARNING: errors are only logged, and if the cache is full, writes are
+// dropped!
+type cachedFile struct {
+	wg        *sync.WaitGroup
+	cache     chan Event[Entry]
+	file      *os.File
+	chunkSize int
+	logger    log.Logger
+}
+
+// newCachedFile creates a cachedFile which wraps a normal file to ensure that
+// only complete data is ever written. cacheSize is the number of events that
+// will be cached and chunkSize is the number of events that will trigger a
+// write. cacheSize needs to be sufficiently larger (10x to be safe) than chunkSize in order to
+// avoid blocking.
+func newCachedFile(file *os.File, logger log.Logger, cacheSize int, chunkSize int) *cachedFile {
+	cf := &cachedFile{
+		file:      file,
+		cache:     make(chan Event[Entry], cacheSize),
+		chunkSize: chunkSize,
+		logger:    logger,
+		wg:        &sync.WaitGroup{},
+	}
+	cf.wg.Add(1)
+	go cf.startFlushing()
+	return cf
+}
+
+// Cache caches the given event to be written to the file.
+func (f *cachedFile) Cache(b Event[Entry]) {
+	select {
+	case f.cache <- b:
+	default:
+		f.logger.Error(fmt.Sprintf("tracing cache full, dropping event: %T", b))
+	}
+}
+
+// startFlushing reads from the cache, serializes the event, and writes to the
+// file.
+func (f *cachedFile) startFlushing() {
+	buffer := make([][]byte, 0, f.chunkSize)
+	defer f.wg.Done()
+
+	for {
+		b, ok := <-f.cache
+		if !ok {
+			// Channel closed, flush remaining data and exit
+			if len(buffer) > 0 {
+				_, err := f.flush(buffer)
+				if err != nil {
+					f.logger.Error("failure to flush remaining events", "error", err)
+				}
+			}
+			return
+		}
+
+		bz, err := json.Marshal(b)
+		if err != nil {
+			f.logger.Error("failed to marshal event", "err", err)
+			close(f.cache)
+			return
+		}
+
+		// format the file to jsonl
+		bz = append(bz, '\n')
+
+		buffer = append(buffer, bz)
+		if len(buffer) >= f.chunkSize {
+			_, err := f.flush(buffer)
+			if err != nil {
+				f.logger.Error("tracer failed to write buffered files to file", "error", err)
+			}
+			buffer = buffer[:0] // reset buffer
+		}
+	}
+}
+
+// flush writes the given bytes to the file.
+func (f *cachedFile) flush(buffer [][]byte) (int, error) {
+	total := 0
+	for _, b := range buffer {
+		i, err := f.file.Write(b)
+		if err != nil {
+			return total, err
+		}
+		total += i
+	}
+	return total, f.file.Sync()
+}
+
+// Close closes the file.
+func (f *cachedFile) Close() error {
+	// closing the cache signals the flushing goroutine to write any
+	// remaining events and exit.
+	close(f.cache)
+	f.wg.Wait()
+	err := f.file.Sync()
+	if err != nil {
+		return err
+	}
+	return f.file.Close()
+}
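To orient readers, here is a rough usage sketch for the new type, written as if it lived inside package trace, since cachedFile is unexported. The entry type and file name are illustrative, and Entry is assumed to be the single-method interface (Table() string) exercised by the test file below:

```go
package trace

import (
	"os"

	"github.com/tendermint/tendermint/libs/log"
)

// exampleEntry is an illustrative Entry implementation.
type exampleEntry struct {
	Height int64 `json:"height"`
}

func (exampleEntry) Table() string { return "example" }

func exampleCachedFileUsage(logger log.Logger) error {
	f, err := os.OpenFile("example.jsonl", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
	if err != nil {
		return err
	}

	// Keep cacheSize roughly 10x chunkSize, per the newCachedFile doc
	// comment, so bursts of events are not dropped.
	cf := newCachedFile(f, logger, 100, 10)

	// Cache never blocks: when the channel is full, the event is dropped
	// and an error is logged instead.
	cf.Cache(NewEvent("chain-id", "node-id", "example", Entry(exampleEntry{Height: 1})))

	// Close flushes any buffered events, syncs, and closes the file.
	return cf.Close()
}
```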
diff --git a/pkg/trace/cahced_file_test.go b/pkg/trace/cahced_file_test.go
new file mode 100644
index 0000000000..2102a97cb3
--- /dev/null
+++ b/pkg/trace/cahced_file_test.go
@@ -0,0 +1,86 @@
+package trace
+
+import (
+	"io"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	cmtlog "github.com/tendermint/tendermint/libs/log"
+)
+
+func Test_cachedFile(t *testing.T) {
+	type test struct {
+		name           string
+		cacheSize      int
+		events         int
+		close          bool
+		readEventsFunc func([]Event[TestEntry]) bool
+	}
+	tests := []test{
+		{"don't exceed write threshold", 10, 5, false, func(e []Event[TestEntry]) bool { return len(e) == 0 }},
+		{"close writes all cached events", 10, 5, true, func(e []Event[TestEntry]) bool { return len(e) == 5 }},
+		{"exceed write threshold", 10, 10, true, func(e []Event[TestEntry]) bool { return len(e) == 10 }},
+		{"doesn't block when buffer is full", 10, 100, true, func(e []Event[TestEntry]) bool {
+			return len(e) < 1000 && len(e) > 0
+		},
+		},
+	}
+
+	logger := cmtlog.TestingLogger()
+	for _, tt := range tests {
+		t.Run(
+			tt.name, func(t *testing.T) {
+				tmp := t.TempDir()
+				fdir := filepath.Join(tmp, "test.jsonl")
+				f, err := os.OpenFile(fdir, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0777)
+				require.NoError(t, err)
+
+				cf := newCachedFile(f, logger, tt.cacheSize, 10)
+
+				events := generateEvents(tt.events)
+
+				for _, event := range events {
+					cf.Cache(event)
+				}
+
+				if tt.close {
+					err = cf.Close()
+					require.NoError(t, err)
+				}
+
+				file, err := os.OpenFile(fdir, os.O_RDONLY, 0777)
+				require.NoError(t, err)
+
+				_, err = file.Seek(0, io.SeekStart)
+				require.NoError(t, err)
+
+				entries, err := DecodeFile[TestEntry](file)
+				require.NoError(t, err)
+
+				require.True(t, tt.readEventsFunc(entries))
+			},
+		)
+	}
+}
+
+var _ Entry = &TestEntry{}
+
+type TestEntry struct {
+	table string
+}
+
+func (te *TestEntry) Table() string {
+	return te.table
+}
+
+func generateEvents(count int) []Event[Entry] {
+	events := make([]Event[Entry], 0, count)
+	for i := 0; i < count; i++ {
+		var entry Entry
+		entry = &TestEntry{"test"}
+		events = append(events, NewEvent("test", "test", "test", entry))
+	}
+	return events
+}
diff --git a/pkg/trace/decoder.go b/pkg/trace/decoder.go
index abf24f4006..98ac21879e 100644
--- a/pkg/trace/decoder.go
+++ b/pkg/trace/decoder.go
@@ -15,7 +15,7 @@ func DecodeFile[T any](f *os.File) ([]Event[T], error) {
 	var out []Event[T]
 	r := bufio.NewReader(f)
 	for {
-		line, err := r.ReadString('\n')
+		line, err := r.ReadBytes('\n')
 		if err == io.EOF {
 			break
 		} else if err != nil {
@@ -23,7 +23,7 @@ func DecodeFile[T any](f *os.File) ([]Event[T], error) {
 		}
 
 		var e Event[T]
-		if err := json.Unmarshal([]byte(line), &e); err != nil {
+		if err := json.Unmarshal(line, &e); err != nil {
 			return nil, err
 		}
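With the HTTP pull server gone (below), DecodeFile becomes the remaining programmatic way to read traces back. A short sketch, again written as if inside package trace (TestEntry stands in for a real table schema; Event is assumed to expose the Table and Msg fields used by the tests in this series):

```go
package trace

import (
	"fmt"
	"os"
)

// exampleDecode reads a JSONL trace file back into typed events.
func exampleDecode(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	events, err := DecodeFile[TestEntry](f)
	if err != nil {
		return err
	}

	for _, ev := range events {
		// Each Event carries table metadata plus the typed payload in Msg.
		fmt.Println(ev.Table, ev.Msg)
	}
	return nil
}
```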
"github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" -) - -func (lt *LocalTracer) getTableHandler() http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - // Parse the request to get the data - if err := r.ParseForm(); err != nil { - http.Error(w, "Failed to parse form", http.StatusBadRequest) - return - } - - inputString := r.FormValue("table") - if inputString == "" { - http.Error(w, "No data provided", http.StatusBadRequest) - return - } - - f, done, err := lt.readTable(inputString) - if err != nil { - http.Error(w, fmt.Sprintf("failed to read table: %v", err), http.StatusInternalServerError) - return - } - defer done() //nolint:errcheck - - // Use the pump function to continuously read from the file and write to - // the response writer - reader, writer := pump(inputString, bufio.NewReader(f)) - defer reader.Close() - - // Set the content type to the writer's form data content type - w.Header().Set("Content-Type", writer.FormDataContentType()) - - // Copy the data from the reader to the response writer - if _, err := io.Copy(w, reader); err != nil { - http.Error(w, "Failed to send data", http.StatusInternalServerError) - return - } - } -} - -// pump continuously reads from a bufio.Reader and writes to a multipart.Writer. -// It returns the reader end of the pipe and the writer for consumption by the -// server. -func pump(table string, br *bufio.Reader) (*io.PipeReader, *multipart.Writer) { - r, w := io.Pipe() - m := multipart.NewWriter(w) - - go func( - table string, - m *multipart.Writer, - w *io.PipeWriter, - br *bufio.Reader, - ) { - defer w.Close() - defer m.Close() - - part, err := m.CreateFormFile("filename", table+".jsonl") - if err != nil { - return - } - - if _, err = io.Copy(part, br); err != nil { - return - } - - }(table, m, w, br) - - return r, m -} - -func (lt *LocalTracer) servePullData() { - mux := http.NewServeMux() - mux.HandleFunc("/get_table", lt.getTableHandler()) - err := http.ListenAndServe(lt.cfg.Instrumentation.TracePullAddress, mux) //nolint:gosec - if err != nil { - lt.logger.Error("trace pull server failure", "err", err) - } - lt.logger.Info("trace pull server started", "address", lt.cfg.Instrumentation.TracePullAddress) -} - -// GetTable downloads a table from the server and saves it to the given directory. It uses a multipart -// response to download the file. 
-func GetTable(serverURL, table, dirPath string) error {
-	data := url.Values{}
-	data.Set("table", table)
-
-	serverURL = serverURL + "/get_table"
-
-	resp, err := http.PostForm(serverURL, data) //nolint:gosec
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-
-	if resp.StatusCode != http.StatusOK {
-		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
-	}
-
-	_, params, err := mime.ParseMediaType(resp.Header.Get("Content-Type"))
-	if err != nil {
-		return err
-	}
-
-	boundary, ok := params["boundary"]
-	if !ok {
-		panic("Not a multipart response")
-	}
-
-	err = os.MkdirAll(dirPath, 0755)
-	if err != nil {
-		return err
-	}
-
-	outputFile, err := os.Create(path.Join(dirPath, table+".jsonl"))
-	if err != nil {
-		return err
-	}
-	defer outputFile.Close()
-
-	reader := multipart.NewReader(resp.Body, boundary)
-
-	for {
-		part, err := reader.NextPart()
-		if err == io.EOF {
-			break // End of multipart
-		}
-		if err != nil {
-			return err
-		}
-
-		contentDisposition, params, err := mime.ParseMediaType(part.Header.Get("Content-Disposition"))
-		if err != nil {
-			return err
-		}
-
-		if contentDisposition == "form-data" && params["filename"] != "" {
-			_, err = io.Copy(outputFile, part)
-			if err != nil {
-				return err
-			}
-		}
-
-		part.Close()
-	}
-
-	return nil
-}
-
-// S3Config is a struct that holds the configuration for an S3 bucket.
-type S3Config struct {
-	BucketName string `json:"bucket_name"`
-	Region     string `json:"region"`
-	AccessKey  string `json:"access_key"`
-	SecretKey  string `json:"secret_key"`
-	// PushDelay is the time in seconds to wait before pushing the file to S3.
-	// If this is 0, it defaults is used.
-	PushDelay int64 `json:"push_delay"`
-}
-
-// readS3Config reads an S3Config from a file in the given directory.
-func readS3Config(dir string) (S3Config, error) {
-	cfg := S3Config{}
-	f, err := os.Open(filepath.Join(dir, "s3.json"))
-	if errors.Is(err, os.ErrNotExist) {
-		return cfg, nil
-	}
-	if err != nil {
-		return cfg, err
-	}
-	defer f.Close()
-	err = json.NewDecoder(f).Decode(&cfg)
-	if cfg.PushDelay == 0 {
-		cfg.PushDelay = 60
-	}
-	return cfg, err
-}
-
-// PushS3 pushes a file to an S3 bucket using the given S3Config. It uses the
-// chainID and the nodeID to organize the files in the bucket. The directory
-// structure is chainID/nodeID/table.jsonl .
-func PushS3(chainID, nodeID string, s3cfg S3Config, f *os.File) error {
-	sess, err := session.NewSession(&aws.Config{
-		Region: aws.String(s3cfg.Region),
-		Credentials: credentials.NewStaticCredentials(
-			s3cfg.AccessKey,
-			s3cfg.SecretKey,
-			"",
-		),
-		HTTPClient: &http.Client{
-			Timeout: time.Duration(15) * time.Second,
-		},
-	},
-	)
-	if err != nil {
-		return err
-	}
-
-	s3Svc := s3.New(sess)
-
-	key := fmt.Sprintf("%s/%s/%s", chainID, nodeID, filepath.Base(f.Name()))
-
-	_, err = s3Svc.PutObject(&s3.PutObjectInput{
-		Bucket: aws.String(s3cfg.BucketName),
-		Key:    aws.String(key),
-		Body:   f,
-	})
-
-	return err
-}
-
-func (lt *LocalTracer) pushLoop() {
-	for {
-		time.Sleep(time.Second * time.Duration(lt.s3Config.PushDelay))
-		err := lt.PushAll()
-		if err != nil {
-			lt.logger.Error("failed to push tables", "error", err)
-		}
-	}
-}
-
-func (lt *LocalTracer) PushAll() error {
-	for table := range lt.fileMap {
-		f, done, err := lt.readTable(table)
-		if err != nil {
-			return err
-		}
-		for i := 0; i < 3; i++ {
-			err = PushS3(lt.chainID, lt.nodeID, lt.s3Config, f)
-			if err == nil {
-				break
-			}
-			lt.logger.Error("failed to push table", "table", table, "error", err)
-			time.Sleep(time.Second * time.Duration(rand.Intn(3))) //nolint:gosec
-		}
-		err = done()
-		if err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// S3Download downloads files that match some prefix from an S3 bucket to a
-// local directory dst.
-func S3Download(dst, prefix string, cfg S3Config) error {
-	// Ensure local directory structure exists
-	err := os.MkdirAll(dst, os.ModePerm)
-	if err != nil {
-		return err
-	}
-
-	sess, err := session.NewSession(&aws.Config{
-		Region: aws.String(cfg.Region),
-		Credentials: credentials.NewStaticCredentials(
-			cfg.AccessKey,
-			cfg.SecretKey,
-			"",
-		),
-	},
-	)
-	if err != nil {
-		return err
-	}
-
-	s3Svc := s3.New(sess)
-	input := &s3.ListObjectsV2Input{
-		Bucket:    aws.String(cfg.BucketName),
-		Prefix:    aws.String(prefix),
-		Delimiter: aws.String(""),
-	}
-
-	err = s3Svc.ListObjectsV2Pages(input, func(page *s3.ListObjectsV2Output, lastPage bool) bool {
-		for _, content := range page.Contents {
-			localFilePath := filepath.Join(dst, prefix, strings.TrimPrefix(*content.Key, prefix))
-			fmt.Printf("Downloading %s to %s\n", *content.Key, localFilePath)
-
-			// Create the directories in the path
-			if err := os.MkdirAll(filepath.Dir(localFilePath), os.ModePerm); err != nil {
-				return false
-			}
-
-			// Create a file to write the S3 Object contents to.
-			f, err := os.Create(localFilePath)
-			if err != nil {
-				return false
-			}
-
-			resp, err := s3Svc.GetObject(&s3.GetObjectInput{
-				Bucket: aws.String(cfg.BucketName),
-				Key:    aws.String(*content.Key),
-			})
-			if err != nil {
-				f.Close()
-				continue
-			}
-			defer resp.Body.Close()
-
-			// Copy the contents of the S3 object to the local file
-			if _, err := io.Copy(f, resp.Body); err != nil {
-				return false
-			}
-
-			fmt.Printf("Successfully downloaded %s to %s\n", *content.Key, localFilePath)
-			f.Close()
-		}
-		return !lastPage // continue paging
-	})
-	return err
-}
diff --git a/pkg/trace/local_tracer.go b/pkg/trace/local_tracer.go
index 0d48515eda..fd48c534b0 100644
--- a/pkg/trace/local_tracer.go
+++ b/pkg/trace/local_tracer.go
@@ -1,11 +1,9 @@
 package trace
 
 import (
-	"encoding/json"
 	"fmt"
 	"os"
 	"path"
-	"strconv"
 	"strings"
 	"time"
@@ -13,14 +11,6 @@ import (
 	"github.com/tendermint/tendermint/libs/log"
 )
 
-const (
-	PushBucketName = "TRACE_PUSH_BUCKET_NAME"
-	PushRegion     = "TRACE_PUSH_REGION"
-	PushAccessKey  = "TRACE_PUSH_ACCESS_KEY"
-	PushKey        = "TRACE_PUSH_SECRET_KEY"
-	PushDelay      = "TRACE_PUSH_DELAY"
-)
-
 // Event wraps some trace data with metadata that dictates the table and things
 // like the chainID and nodeID.
 type Event[T any] struct {
@@ -52,15 +42,11 @@ type LocalTracer struct {
 	chainID, nodeID string
 	logger          log.Logger
 	cfg             *config.Config
-	s3Config        S3Config
 
 	// fileMap maps tables to their open files. Files are threadsafe, but the
 	// map is not. Therefore don't create new files after initialization to
 	// remain threadsafe.
-	fileMap map[string]*bufferedFile
-	// canal is a channel for all events that are being written. It acts as an
-	// extra buffer to avoid blocking the caller when writing to files.
-	canal chan Event[Entry]
+	fileMap map[string]*cachedFile
 }
 
-// NewLocalTracer creates a struct that will save all of the events passed to
-// the returned channel. Call CloseAll to close all open files. Goroutine to
-// save events is started in this function.
+// NewLocalTracer creates a struct that will save all of the events passed to
+// its Write method to files, one file per table. Call Stop to close all open
+// files. The goroutines that flush events are started in this function.
 func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID string) (*LocalTracer, error) {
-	fm := make(map[string]*bufferedFile)
+	fm := make(map[string]*cachedFile)
 	p := path.Join(cfg.RootDir, "data", "traces")
 	for _, table := range splitAndTrimEmpty(cfg.Instrumentation.TracingTables, ",", " ") {
 		fileName := fmt.Sprintf("%s/%s.jsonl", p, table)
@@ -82,132 +68,44 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin
 		if err != nil {
 			return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err)
 		}
-		fm[table] = newbufferedFile(file)
+
+		fm[table] = newCachedFile(file, logger, cfg.Instrumentation.TraceBufferSize, 10)
 	}
 
 	lt := &LocalTracer{
 		fileMap: fm,
 		cfg:     cfg,
-		canal:   make(chan Event[Entry], cfg.Instrumentation.TraceBufferSize),
 		chainID: chainID,
 		nodeID:  nodeID,
 		logger:  logger,
 	}
 
-	go lt.drainCanal()
-	if cfg.Instrumentation.TracePullAddress != "" {
-		logger.Info("starting pull server", "address", cfg.Instrumentation.TracePullAddress)
-		go lt.servePullData()
-	}
-
-	if cfg.Instrumentation.TracePushConfig != "" {
-		s3Config, err := readS3Config(path.Join(cfg.RootDir, "config", cfg.Instrumentation.TracePushConfig))
-		if err != nil {
-			return nil, fmt.Errorf("failed to read s3 config: %w", err)
-		}
-		lt.s3Config = s3Config
-		go lt.pushLoop()
-	} else if s3Config, err := GetPushConfigFromEnv(); err == nil {
-		lt.s3Config = s3Config
-		go lt.pushLoop()
-	}
-
 	return lt, nil
 }
 
-// GetPushConfigFromEnv reads the required environment variables to push trace
-func GetPushConfigFromEnv() (S3Config, error) {
-	bucketName := os.Getenv(PushBucketName)
-	region := os.Getenv(PushRegion)
-	accessKey := os.Getenv(PushAccessKey)
-	secretKey := os.Getenv(PushKey)
-	pushDelay, err := strconv.ParseInt(os.Getenv(PushDelay), 10, 64)
-	if err != nil {
-		return S3Config{}, err
-	}
-	if bucketName == "" || region == "" || accessKey == "" || secretKey == "" {
-		return S3Config{}, fmt.Errorf("missing required environment variables")
-	}
-	var s3Config = S3Config{
-		BucketName: bucketName,
-		Region:     region,
-		AccessKey:  accessKey,
-		SecretKey:  secretKey,
-		PushDelay:  pushDelay,
-	}
-	return s3Config, nil
-}
-
 func (lt *LocalTracer) Write(e Entry) {
-	if !lt.IsCollecting(e.Table()) {
-		return
-	}
-	lt.canal <- NewEvent(lt.chainID, lt.nodeID, e.Table(), e)
-}
-
-// ReadTable returns a file for the given table. If the table is not being
-// collected, an error is returned. The caller should not close the file.
-func (lt *LocalTracer) readTable(table string) (*os.File, func() error, error) {
-	bf, has := lt.getFile(table)
+	cf, has := lt.getFile(e.Table())
 	if !has {
-		return nil, func() error { return nil }, fmt.Errorf("table %s not found", table)
+		return
 	}
-
-	return bf.File()
-}
-
-func (lt *LocalTracer) IsCollecting(table string) bool {
-	_, has := lt.getFile(table)
-	return has
+	cf.Cache(NewEvent(lt.chainID, lt.nodeID, e.Table(), e))
 }
 
 // getFile gets a file for the given type. This method is purposely
 // not thread-safe to avoid the overhead of locking with each event save.
-func (lt *LocalTracer) getFile(table string) (*bufferedFile, bool) {
+func (lt *LocalTracer) getFile(table string) (*cachedFile, bool) {
 	f, has := lt.fileMap[table]
 	return f, has
}
 
-// saveEventToFile marshals an Event into JSON and appends it to a file named after the event's Type.
-func (lt *LocalTracer) saveEventToFile(event Event[Entry]) error {
-	file, has := lt.getFile(event.Table)
-	if !has {
-		return fmt.Errorf("table %s not found", event.Table)
-	}
-
-	eventJSON, err := json.Marshal(event)
-	if err != nil {
-		return fmt.Errorf("failed to marshal event: %v", err)
-	}
-
-	if _, err := file.Write(append(eventJSON, '\n')); err != nil {
-		return fmt.Errorf("failed to write event to file: %v", err)
-	}
-
-	return nil
-}
-
-// draincanal takes a variadic number of channels of Event pointers and drains them into files.
-func (lt *LocalTracer) drainCanal() {
-	// purposefully do not lock, and rely on the channel to provide sync
-	// actions, to avoid overhead of locking with each event save.
-	for ev := range lt.canal {
-		if err := lt.saveEventToFile(ev); err != nil {
-			lt.logger.Error("failed to save event to file", "error", err)
-		}
-	}
+// IsCollecting returns true if the table is being collected.
+func (lt *LocalTracer) IsCollecting(table string) bool {
+	_, has := lt.getFile(table)
+	return has
 }
 
-// Stop optionally uploads and closes all open files.
+// Stop closes all open files.
 func (lt *LocalTracer) Stop() {
-	if lt.s3Config.SecretKey != "" {
-		lt.logger.Info("pushing all tables before stopping")
-		err := lt.PushAll()
-		if err != nil {
-			lt.logger.Error("failed to push tables", "error", err)
-		}
-	}
-
 	for _, file := range lt.fileMap {
 		err := file.Close()
 		if err != nil {
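After this change, the tracer's public surface is essentially NewLocalTracer, Write, IsCollecting, and Stop. A hedged end-to-end sketch from a caller's perspective (the import path, table name, and schema are illustrative assumptions; the rest follows the API above):

```go
package main

import (
	"os"

	"github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/pkg/trace"
)

// roundState is an illustrative schema; any type with Table() string works.
type roundState struct {
	Height int64 `json:"height"`
	Round  int32 `json:"round"`
}

func (roundState) Table() string { return "round_state" }

func main() {
	cfg := config.DefaultConfig()
	cfg.SetRoot(os.TempDir())
	cfg.Instrumentation.TraceType = "local"
	cfg.Instrumentation.TraceBufferSize = 1000
	cfg.Instrumentation.TracingTables = "round_state"

	tracer, err := trace.NewLocalTracer(cfg, log.NewNopLogger(), "test-chain", "node-0")
	if err != nil {
		panic(err)
	}
	defer tracer.Stop()

	// Write is fire-and-forget: tables that are not collected are skipped,
	// and a full cache drops the event rather than blocking the caller.
	tracer.Write(roundState{Height: 1, Round: 0})
}
```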
diff --git a/pkg/trace/local_tracer_test.go b/pkg/trace/local_tracer_test.go
deleted file mode 100644
index 68841a34b7..0000000000
--- a/pkg/trace/local_tracer_test.go
+++ /dev/null
@@ -1,183 +0,0 @@
-package trace
-
-import (
-	"fmt"
-	"io"
-	"net"
-	"os"
-	"path"
-	"testing"
-	"time"
-
-	"github.com/stretchr/testify/require"
-	"github.com/tendermint/tendermint/config"
-	"github.com/tendermint/tendermint/libs/log"
-)
-
-const (
-	// testEventTable is the table name for the testEvent struct.
-	testEventTable = "testEvent"
-)
-
-type testEvent struct {
-	City   string `json:"city"`
-	Length int    `json:"length"`
-}
-
-func (c testEvent) Table() string {
-	return testEventTable
-}
-
-// TestLocalTracerReadWrite tests the local client by writing some events,
-// reading them back and comparing them, writing at the same time as reading.
-func TestLocalTracerReadWrite(t *testing.T) {
-	port, err := getFreePort()
-	require.NoError(t, err)
-	client := setupLocalTracer(t, port)
-
-	annecy := testEvent{"Annecy", 420}
-	paris := testEvent{"Paris", 420}
-	client.Write(annecy)
-	client.Write(paris)
-
-	time.Sleep(100 * time.Millisecond)
-
-	f, done, err := client.readTable(testEventTable)
-	require.NoError(t, err)
-
-	// write at the same time as reading to test thread safety this test will be
-	// flakey if this is not being handled correctly. Since we're reading from
-	// the file, we expect these write to be ignored.
-	migenees := testEvent{"Migennes", 620}
-	pontivy := testEvent{"Pontivy", 720}
-	client.Write(migenees)
-	client.Write(pontivy)
-
-	// wait to ensure that the write have been processed (and ignored in this case)
-	time.Sleep(100 * time.Millisecond)
-
-	events, err := DecodeFile[testEvent](f)
-	require.NoError(t, err)
-	err = done()
-	require.NoError(t, err)
-
-	// even though we've written twice, we expect only the first two events to be
-	// be written to the file. When reading the file, all writes are ignored.
-	require.GreaterOrEqual(t, len(events), 2)
-	require.Equal(t, annecy, events[0].Msg)
-	require.Equal(t, paris, events[1].Msg)
-
-	// write again to the file and read it back this time, we expect the writes
-	// to be written since we've called the done() function.
-	client.Write(migenees)
-	client.Write(pontivy)
-
-	time.Sleep(100 * time.Millisecond)
-
-	f, done, err = client.readTable(testEventTable)
-	require.NoError(t, err)
-	events, err = DecodeFile[testEvent](f)
-	require.NoError(t, err)
-	err = done()
-	require.NoError(t, err)
-	require.Len(t, events, 4)
-	require.Equal(t, migenees, events[2].Msg)
-	require.Equal(t, pontivy, events[3].Msg)
-}
-
-// TestLocalTracerServerPull tests the pull portion of the server.
-func TestLocalTracerServerPull(t *testing.T) {
-	port, err := getFreePort()
-	require.NoError(t, err)
-	client := setupLocalTracer(t, port)
-
-	for i := 0; i < 5; i++ {
-		client.Write(testEvent{"Annecy", i})
-	}
-
-	// Wait for the server to start
-	time.Sleep(100 * time.Millisecond)
-
-	// Test the server
-	newDir := t.TempDir()
-
-	url := fmt.Sprintf("http://localhost:%d", port)
-
-	// try to read a table that is not being collected. error expected.
-	err = GetTable(url, "canal", newDir)
-	require.Error(t, err)
-
-	err = GetTable(url, testEventTable, newDir)
-	require.NoError(t, err)
-
-	originalFile, done, err := client.readTable(testEventTable)
-	require.NoError(t, err)
-	originalBz, err := io.ReadAll(originalFile)
-	require.NoError(t, err)
-	err = done()
-	require.NoError(t, err)
-
-	path := path.Join(newDir, testEventTable+".jsonl")
-	downloadedFile, err := os.Open(path)
-	require.NoError(t, err)
-	defer downloadedFile.Close()
-
-	downloadedBz, err := io.ReadAll(downloadedFile)
-	require.NoError(t, err)
-	require.Equal(t, originalBz, downloadedBz)
-
-	_, err = downloadedFile.Seek(0, 0) // reset the seek on the file to read it again
-	require.NoError(t, err)
-	events, err := DecodeFile[testEvent](downloadedFile)
-	require.NoError(t, err)
-	require.Len(t, events, 5)
-	for i := 0; i < 5; i++ {
-		require.Equal(t, i, events[i].Msg.Length)
-	}
-}
-
-// TestReadPushConfigFromConfigFile tests reading the push config from the environment variables.
-func TestReadPushConfigFromEnvVars(t *testing.T) {
-	os.Setenv(PushBucketName, "bucket")
-	os.Setenv(PushRegion, "region")
-	os.Setenv(PushAccessKey, "access")
-	os.Setenv(PushKey, "secret")
-	os.Setenv(PushDelay, "10")
-
-	lt := setupLocalTracer(t, 0)
-	require.Equal(t, "bucket", lt.s3Config.BucketName)
-	require.Equal(t, "region", lt.s3Config.Region)
-	require.Equal(t, "access", lt.s3Config.AccessKey)
-	require.Equal(t, "secret", lt.s3Config.SecretKey)
-	require.Equal(t, int64(10), lt.s3Config.PushDelay)
-}
-
-func setupLocalTracer(t *testing.T, port int) *LocalTracer {
-	logger := log.NewNopLogger()
-	cfg := config.DefaultConfig()
-	cfg.SetRoot(t.TempDir())
-	cfg.Instrumentation.TraceBufferSize = 100
-	cfg.Instrumentation.TracingTables = testEventTable
-	cfg.Instrumentation.TracePullAddress = fmt.Sprintf(":%d", port)
-
-	client, err := NewLocalTracer(cfg, logger, "test_chain", "test_node")
-	if err != nil {
-		t.Fatalf("failed to create local client: %v", err)
-	}
-
-	return client
-}
-
-// getFreePort returns a free port and optionally an error.
-func getFreePort() (int, error) {
-	a, err := net.ResolveTCPAddr("tcp", "localhost:0")
-	if err != nil {
-		return 0, err
-	}
-
-	l, err := net.ListenTCP("tcp", a)
-	if err != nil {
-		return 0, err
-	}
-	defer l.Close()
-	return l.Addr().(*net.TCPAddr).Port, nil
-}

From 3f7e302decc289a5d9a85f9aefc4cd1b181fc208 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Mon, 29 Jul 2024 06:48:46 -0500
Subject: [PATCH 2/7] chore: clean up and optimize by not syncing too many
 times

---
 pkg/trace/cached_file.go      | 2 +-
 pkg/trace/cahced_file_test.go | 4 ----
 2 files changed, 1 insertion(+), 5 deletions(-)

diff --git a/pkg/trace/cached_file.go b/pkg/trace/cached_file.go
index ad5664bd0f..230cb46d20 100644
--- a/pkg/trace/cached_file.go
+++ b/pkg/trace/cached_file.go
@@ -98,7 +98,7 @@ func (f *cachedFile) flush(buffer [][]byte) (int, error) {
 		}
 		total += i
 	}
-	return total, f.file.Sync()
+	return total, nil
 }
 
 // Close closes the file.
diff --git a/pkg/trace/cahced_file_test.go b/pkg/trace/cahced_file_test.go
index 2102a97cb3..95018373d2 100644
--- a/pkg/trace/cahced_file_test.go
+++ b/pkg/trace/cahced_file_test.go
@@ -1,7 +1,6 @@
 package trace
 
 import (
-	"io"
 	"os"
 	"path/filepath"
 	"testing"
@@ -53,9 +52,6 @@ func Test_cachedFile(t *testing.T) {
 				file, err := os.OpenFile(fdir, os.O_RDONLY, 0777)
 				require.NoError(t, err)
 
-				_, err = file.Seek(0, io.SeekStart)
-				require.NoError(t, err)
-
 				entries, err := DecodeFile[TestEntry](file)
 				require.NoError(t, err)
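Patch 2 moves the fsync out of the per-chunk flush path: the kernel now decides when buffered chunks hit disk, and Close performs a single final Sync. A standalone sketch of the resulting write pattern (illustrative file name and data; this is not the package's code):

```go
package main

import "os"

func main() {
	f, err := os.Create("batch.jsonl") // illustrative file name
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rows := [][]byte{[]byte("{\"a\":1}\n"), []byte("{\"a\":2}\n")}
	for _, row := range rows {
		// Write each complete row without a per-row fsync, so the kernel
		// can coalesce these into far fewer disk flushes.
		if _, err := f.Write(row); err != nil {
			panic(err)
		}
	}

	// One final Sync on shutdown makes the data durable, mirroring what
	// cachedFile.Close does after this patch.
	if err := f.Sync(); err != nil {
		panic(err)
	}
}
```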
From 99dd93e48eb574992a905dbe5f6a1173e0a10334 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Mon, 29 Jul 2024 09:12:23 -0500
Subject: [PATCH 3/7] chore: linter

---
 cmd/cometbft/commands/run_node.go | 4 ++--
 pkg/trace/cahced_file_test.go     | 3 +--
 test/e2e/runner/setup.go          | 4 ++--
 3 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/cmd/cometbft/commands/run_node.go b/cmd/cometbft/commands/run_node.go
index 0bef6485ba..a5016c4c68 100644
--- a/cmd/cometbft/commands/run_node.go
+++ b/cmd/cometbft/commands/run_node.go
@@ -97,13 +97,13 @@ func AddNodeFlags(cmd *cobra.Command) {
 
 	cmd.PersistentFlags().String(
 		trace.FlagTracePushConfig,
-		config.Instrumentation.TracePushConfig,
+		config.Instrumentation.TracePushConfig, //nolint:staticcheck
 		trace.FlagTracePushConfigDescription,
 	)
 
 	cmd.PersistentFlags().String(
 		trace.FlagTracePullAddress,
-		config.Instrumentation.TracePullAddress,
+		config.Instrumentation.TracePullAddress, //nolint:staticcheck
 		trace.FlagTracePullAddressDescription,
 	)
 
diff --git a/pkg/trace/cahced_file_test.go b/pkg/trace/cahced_file_test.go
index 95018373d2..42dde0dd7d 100644
--- a/pkg/trace/cahced_file_test.go
+++ b/pkg/trace/cahced_file_test.go
@@ -74,8 +74,7 @@ func (te *TestEntry) Table() string {
 func generateEvents(count int) []Event[Entry] {
 	events := make([]Event[Entry], 0, count)
 	for i := 0; i < count; i++ {
-		var entry Entry
-		entry = &TestEntry{"test"}
+		entry := Entry(&TestEntry{"test"})
 		events = append(events, NewEvent("test", "test", "test", entry))
 	}
 	return events
diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go
index f2cec6b227..6e76ef8c73 100644
--- a/test/e2e/runner/setup.go
+++ b/test/e2e/runner/setup.go
@@ -167,8 +167,8 @@ func MakeConfig(node *e2e.Node) (*config.Config, error) {
 	cfg.StateSync.DiscoveryTime = 5 * time.Second
 
 	cfg.Instrumentation.TraceType = "celestia"
-	cfg.Instrumentation.TracePushConfig = node.TracePushConfig
+	cfg.Instrumentation.TracePushConfig = node.TracePushConfig //nolint:staticcheck
-	cfg.Instrumentation.TracePullAddress = node.TracePullAddress
+	cfg.Instrumentation.TracePullAddress = node.TracePullAddress //nolint:staticcheck
 	cfg.Instrumentation.PyroscopeTrace = node.PyroscopeTrace
 	cfg.Instrumentation.PyroscopeURL = node.PyroscopeURL
 	cfg.Instrumentation.PyroscopeProfileTypes = node.PyroscopeProfileTypes

From 2b66d3f4c010b9f29049a78aefe6a5462ab70418 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Tue, 30 Jul 2024 11:16:49 -0500
Subject: [PATCH 4/7] docs: update readme

---
 pkg/trace/README.md | 71 ++++++---------------------------------------
 1 file changed, 9 insertions(+), 62 deletions(-)

diff --git a/pkg/trace/README.md b/pkg/trace/README.md
index dfccaa0544..3dcf3549a9 100644
--- a/pkg/trace/README.md
+++ b/pkg/trace/README.md
@@ -10,7 +10,8 @@ To enable the local tracer, add the following to the config.toml file:
 # The tracer to use for collecting trace data.
 trace_type = "local"
 
-# The size of the batches that are sent to the database.
+# The size of the cache for each table. Data is constantly written to disk,
+# but once this limit is hit, any further data is dropped.
 trace_push_batch_size = 1000
 
 # The list of tables that are updated when tracing. All available tables and
@@ -32,71 +33,17 @@ if err != nil {
 }
 ```
 
-### Pull Based Event Collection
+### Event Collection
 
-Pull based event collection is where external servers connect to and pull trace
-data from the consensus node.
+Collect the events after data collection has completed by simply transferring
+the files however you see fit. For example, using the `scp` command:
 
-To use this, change the config.toml to store traces in the
-.celestia-app/data/traces directory.
-
-```toml
-# The tracer pull address specifies which address will be used for pull based
-# event collection. If empty, the pull based server will not be started.
-trace_pull_address = ":26661"
-```
-
-To retrieve a table remotely using the pull based server, call the following
-function:
-
-```go
-err := GetTable("http://1.2.3.4:26661", "mempool_tx", "directory to store the file")
-if err != nil {
-	return err
-}
-```
-
-This stores the data locally in the specified directory.
-
+```bash
+scp -r user@host:/path/to/.celestia-app/data/traces /path/to/local/directory
+```
 
-### Push Based Event Collection
-
-Push based event collection is where the consensus node pushes trace data to an
-external server. At the moment, this is just an S3 bucket. To use this, two
-options are available:
-
-#### Using push config file
-
-Add the following to the config.toml file:
-
-```toml
-# TracePushConfig is the relative path of the push config.
-# This second config contains credentials for where and how often to
-# push trace data to. For example, if the config is next to this config,
-# it would be "push_config.json".
-trace_push_config = "{{ .Instrumentation.TracePushConfig }}"
-```
-
-The push config file is a JSON file that should look like this:
-
-```json
-{
-    "bucket": "bucket-name",
-    "region": "region",
-    "access_key": "",
-    "secret_key": "",
-    "push_delay": 60 // number of seconds to wait between intervals of pushing all files
-}
-```
-
-#### Using environment variables for s3 bucket
-
-Alternatively, you can set the following environment variables:
+or using the AWS CLI (after setting it up):
 
 ```bash
-export TRACE_PUSH_BUCKET_NAME=bucket-name
-export TRACE_PUSH_REGION=region
-export TRACE_PUSH_ACCESS_KEY=access-key
-export TRACE_PUSH_SECRET_KEY=secret-key
-export TRACE_PUSH_DELAY=push-delay
+aws s3 cp /path/to/.celestia-app/data/traces s3://<bucket-name>/<prefix> --recursive
 ```
-
-`bucket_name` , `region`, `access_key`, `secret_key` and `push_delay` are the
-s3 bucket name, region, access key, secret key and the delay between pushes
-respectively.

From 18ce0ea75200c9e7e553d49184b07f970f7b8ed2 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Wed, 14 Aug 2024 20:26:41 -0500
Subject: [PATCH 5/7] fix: re-add the sync to finalize flush

---
 pkg/trace/cached_file.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/trace/cached_file.go b/pkg/trace/cached_file.go
index 230cb46d20..ad5664bd0f 100644
--- a/pkg/trace/cached_file.go
+++ b/pkg/trace/cached_file.go
@@ -98,7 +98,7 @@ func (f *cachedFile) flush(buffer [][]byte) (int, error) {
 		}
 		total += i
 	}
-	return total, nil
+	return total, f.file.Sync()
 }
 
 // Close closes the file.

From 9a476ab0f4e71df7cdf053d3bdc30adaa38807d4 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Thu, 15 Aug 2024 16:37:18 -0500
Subject: [PATCH 6/7] fix: try forcing the file to be opened with os.O_SYNC

---
 pkg/trace/cached_file.go      | 20 +++++++++-----------
 pkg/trace/cahced_file_test.go |  2 +-
 pkg/trace/local_tracer.go     |  2 +-
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git a/pkg/trace/cached_file.go b/pkg/trace/cached_file.go
index ad5664bd0f..9796a97ae4 100644
--- a/pkg/trace/cached_file.go
+++ b/pkg/trace/cached_file.go
@@ -24,8 +24,9 @@ type cachedFile struct {
 // newCachedFile creates a cachedFile which wraps a normal file to ensure that
 // only complete data is ever written. cacheSize is the number of events that
 // will be cached and chunkSize is the number of events that will trigger a
-// write. cacheSize needs to be sufficiently larger (10x to be safe) than chunkSize in order to
-// avoid blocking.
+// write. cacheSize needs to be sufficiently larger (10x to be safe) than
+// chunkSize in order to avoid blocking. Files must be opened using os.O_SYNC in
+// order for rows of data to be written atomically.
 func newCachedFile(file *os.File, logger log.Logger, cacheSize int, chunkSize int) *cachedFile {
 	cf := &cachedFile{
 		file:      file,
@@ -88,17 +89,14 @@ func (f *cachedFile) startFlushing() {
 	}
 }
 
-// flush writes the given bytes to the file.
-func (f *cachedFile) flush(buffer [][]byte) (int, error) {
-	total := 0
+// flush writes the given bytes to the file. This method requires that the file
+// be opened with os.O_SYNC in order to write atomically to the file.
+func (f *cachedFile) flush(total int, buffer [][]byte) (int, error) {
+	bz := make([]byte, 0, total)
 	for _, b := range buffer {
-		i, err := f.file.Write(b)
-		if err != nil {
-			return total, err
-		}
-		total += i
+		bz = append(bz, b...)
 	}
-	return total, f.file.Sync()
+	return f.file.Write(bz)
 }
 
 // Close closes the file.
diff --git a/pkg/trace/cahced_file_test.go b/pkg/trace/cahced_file_test.go
index 42dde0dd7d..50beb85027 100644
--- a/pkg/trace/cahced_file_test.go
+++ b/pkg/trace/cahced_file_test.go
@@ -33,7 +33,7 @@ func Test_cachedFile(t *testing.T) {
 			tt.name, func(t *testing.T) {
 				tmp := t.TempDir()
 				fdir := filepath.Join(tmp, "test.jsonl")
-				f, err := os.OpenFile(fdir, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0777)
+				f, err := os.OpenFile(fdir, os.O_APPEND|os.O_CREATE|os.O_WRONLY|os.O_SYNC, 0777)
 				require.NoError(t, err)
 
 				cf := newCachedFile(f, logger, tt.cacheSize, 10)
diff --git a/pkg/trace/local_tracer.go b/pkg/trace/local_tracer.go
index fd48c534b0..956da4c7d9 100644
--- a/pkg/trace/local_tracer.go
+++ b/pkg/trace/local_tracer.go
@@ -64,7 +64,7 @@ func NewLocalTracer(cfg *config.Config, logger log.Logger, chainID, nodeID strin
 		if err != nil {
 			return nil, fmt.Errorf("failed to create directory %s: %w", p, err)
 		}
-		file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0644)
+		file, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY|os.O_SYNC, 0644)
 		if err != nil {
 			return nil, fmt.Errorf("failed to open or create file %s: %w", fileName, err)
 		}

From 1d9d9bfdcaa244566d33715852b57accce9f8cd4 Mon Sep 17 00:00:00 2001
From: evan-forbes
Date: Thu, 15 Aug 2024 16:43:55 -0500
Subject: [PATCH 7/7] fix: forgot to pass total

---
 pkg/trace/cached_file.go | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/pkg/trace/cached_file.go b/pkg/trace/cached_file.go
index 9796a97ae4..95ae341569 100644
--- a/pkg/trace/cached_file.go
+++ b/pkg/trace/cached_file.go
@@ -53,6 +53,7 @@ func (f *cachedFile) Cache(b Event[Entry]) {
 // file.
 func (f *cachedFile) startFlushing() {
 	buffer := make([][]byte, 0, f.chunkSize)
+	total := 0
 	defer f.wg.Done()
 
 	for {
@@ -60,7 +61,7 @@ func (f *cachedFile) startFlushing() {
 		if !ok {
 			// Channel closed, flush remaining data and exit
 			if len(buffer) > 0 {
-				_, err := f.flush(buffer)
+				_, err := f.flush(total, buffer)
 				if err != nil {
 					f.logger.Error("failure to flush remaining events", "error", err)
 				}
@@ -77,14 +78,16 @@ func (f *cachedFile) startFlushing() {
 		// format the file to jsonl
 		bz = append(bz, '\n')
 
+		total += len(bz)
 		buffer = append(buffer, bz)
 		if len(buffer) >= f.chunkSize {
-			_, err := f.flush(buffer)
+			_, err := f.flush(total, buffer)
 			if err != nil {
 				f.logger.Error("tracer failed to write buffered files to file", "error", err)
 			}
 			buffer = buffer[:0] // reset buffer
+			total = 0
 		}
 	}
 }
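Patches 6 and 7 together replace per-row writes plus an explicit Sync with one pre-assembled buffer written through an O_SYNC descriptor, so each flush is a single synchronous write of whole JSONL rows. A standalone sketch of that pattern (illustrative file name and data; this is not the package's code):

```go
package main

import "os"

func main() {
	// O_SYNC makes each Write durable before it returns, so a single
	// Write of the assembled chunk stands in for write-then-Sync.
	f, err := os.OpenFile("events.jsonl", os.O_APPEND|os.O_CREATE|os.O_WRONLY|os.O_SYNC, 0o644)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	rows := [][]byte{[]byte("{\"h\":1}\n"), []byte("{\"h\":2}\n")}

	// Pre-size the chunk with the running total of row lengths, as the
	// flush(total, buffer) signature from patch 7 does.
	total := 0
	for _, r := range rows {
		total += len(r)
	}
	bz := make([]byte, 0, total)
	for _, r := range rows {
		bz = append(bz, r...)
	}

	// One write call emits only complete rows; per the patch's rationale,
	// readers tailing the file should never observe a torn JSON line.
	if _, err := f.Write(bz); err != nil {
		panic(err)
	}
}
```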