From 51babd69d14b433278b10804d3efef198161afcb Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Mon, 24 Feb 2025 22:53:37 +0100 Subject: [PATCH 1/2] Add config to allow cardinality --- flatjsonl/config.go | 1 + flatjsonl/keys.go | 32 ++++++++++++++++++++++++-------- 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/flatjsonl/config.go b/flatjsonl/config.go index ab499e5..7e18dbb 100644 --- a/flatjsonl/config.go +++ b/flatjsonl/config.go @@ -16,4 +16,5 @@ type Config struct { Transpose map[string]string `json:"transpose" yaml:"transpose" description:"Map of key prefixes to transposed table names."` ExtractValuesRegex map[string]extract `json:"extractValuesRegex" yaml:"extractValuesRegex" description:"Map of key regex to extraction format, values can be 'URL', 'JSON'."` KeepJSON []string `json:"keepJSON" yaml:"keepJSON" description:"List of keys to keep as JSON literals."` + AllowCardinality []string `json:"allowCardinality" yaml:"allowCardinality" description:"List of keys to allow high cardinality of child keys."` } diff --git a/flatjsonl/keys.go b/flatjsonl/keys.go index 4e04a6d..55f2077 100644 --- a/flatjsonl/keys.go +++ b/flatjsonl/keys.go @@ -104,16 +104,28 @@ func (p *Processor) initKey(pk, parent uint64, path []string, t Type, isZero boo if parentCardinality > p.f.ChildrenLimit { pp := k.path[0 : len(k.path)-1] parentKey := KeyFromPath(pp) - grandParentKey := KeyFromPath(pp[:len(pp)-1]) - ppk, gpk := newHasher().hashParentBytes([]byte(parentKey), len(grandParentKey)) + allowCardinality := false - p.mu.Unlock() - // println("making parent key", parentKey, grandParentKey, ppk, gpk) - p.initKey(ppk, gpk, pp, TypeJSON, false) - p.mu.Lock() + for _, ac := range p.cfg.AllowCardinality { + if parentKey == ac { + allowCardinality = true + } + } + + if !allowCardinality { + grandParentKey := KeyFromPath(pp[:len(pp)-1]) + ppk, gpk := newHasher().hashParentBytes([]byte(parentKey), len(grandParentKey)) - p.cfg.KeepJSON = append(p.cfg.KeepJSON, parentKey) - p.parentHighCardinality.Store(parent, true) + p.mu.Unlock() + // println("making parent key", parentKey, grandParentKey, ppk, gpk) + p.initKey(ppk, gpk, pp, TypeJSON, false) + p.mu.Lock() + + p.cfg.KeepJSON = append(p.cfg.KeepJSON, parentKey) + p.parentHighCardinality.Store(parent, true) + } else { + p.parentCardinality[parent] = parentCardinality + } } else { p.parentCardinality[parent] = parentCardinality } @@ -382,6 +394,9 @@ func (p *Processor) prepareScannedKeys() { if k.t == TypeObject || k.t == TypeArray { deleted[k.original] = true + p.keyHierarchy.Add(k.path) + p.keyHierarchy.AddKey(k) + return true } @@ -396,6 +411,7 @@ func (p *Processor) prepareScannedKeys() { } p.keyHierarchy.Add(k.path) + p.keyHierarchy.AddKey(k) return true }) From 123d22793ac16d2764fc8c4f7b9cfdc9f60578c2 Mon Sep 17 00:00:00 2001 From: Viacheslav Poturaev Date: Tue, 25 Feb 2025 18:44:28 +0100 Subject: [PATCH 2/2] Add JSON schema, allow unexpected tail in JSON --- flatjsonl/flags.go | 11 ++-- flatjsonl/flattener.go | 104 +++++++++++++++++++++++++++++++++++- flatjsonl/keys.go | 50 +++++++++++------ flatjsonl/processor.go | 9 ++++ flatjsonl/processor_test.go | 1 + flatjsonl/reader.go | 6 +-- go.mod | 2 +- go.sum | 4 +- 8 files changed, 161 insertions(+), 26 deletions(-) diff --git a/flatjsonl/flags.go b/flatjsonl/flags.go index 24b4b51..1d80c3c 100644 --- a/flatjsonl/flags.go +++ b/flatjsonl/flags.go @@ -42,9 +42,10 @@ type Flags struct { MatchLinePrefix string CaseSensitiveKeys bool - ShowKeysFlat bool - ShowKeysHier bool - ShowKeysInfo bool + ShowKeysFlat bool + ShowKeysHier bool + ShowKeysInfo bool + ShowJSONSchema bool Concurrency int MemLimit int @@ -74,6 +75,8 @@ func (f *Flags) Register() { flag.BoolVar(&f.ShowKeysFlat, "show-keys-flat", false, "Show all available keys as flat list.") flag.BoolVar(&f.ShowKeysHier, "show-keys-hier", false, "Show all available keys as hierarchy.") flag.BoolVar(&f.ShowKeysInfo, "show-keys-info", false, "Show keys, their replaces and types.") + flag.BoolVar(&f.ShowJSONSchema, "show-json-schema", false, "Show hierarchy as JSON schema.") + flag.BoolVar(&f.SkipZeroCols, "skip-zero-cols", false, "Skip columns with zero values.") flag.BoolVar(&f.AddSequence, "add-sequence", false, "Add auto incremented sequence number.") flag.BoolVar(&f.CaseSensitiveKeys, "case-sensitive-keys", false, "Use case-sensitive keys (can fail for SQLite).") @@ -94,7 +97,7 @@ func (f *Flags) Register() { func (f *Flags) Parse() { flag.Parse() - if f.Output == "" && !f.ShowKeysHier && !f.ShowKeysFlat && !f.ShowKeysInfo { + if f.Output == "" && !f.ShowKeysHier && !f.ShowKeysFlat && !f.ShowKeysInfo && !f.ShowJSONSchema { inputs := f.Inputs() if len(inputs) > 0 && f.CSV == "" && f.SQLite == "" && f.Raw == "" && f.PGDump == "" { diff --git a/flatjsonl/flattener.go b/flatjsonl/flattener.go index e85d8d4..cc56ad5 100644 --- a/flatjsonl/flattener.go +++ b/flatjsonl/flattener.go @@ -6,7 +6,8 @@ import ( "strconv" "strings" - "github.com/valyala/fastjson" + "github.com/puzpuzpuz/xsync/v3" + "github.com/vearutop/fastjson" ) // KeyFromPath joins path elements into a dot-separated scalar key. @@ -171,6 +172,7 @@ func (fv *FastWalker) walkFastJSONString(seq int64, flatPath []byte, pl int, pat xs, name, err := x.extract(s) if err == nil { p := parserPool.Get() + p.AllowUnexpectedTail = true defer parserPool.Put(p) if v, err := p.ParseBytes(xs); err == nil { @@ -196,6 +198,7 @@ func (fv *FastWalker) walkFastJSONString(seq int64, flatPath []byte, pl int, pat // Check if string has nested JSON or URL. if s[0] == '{' || s[0] == '[' { p := parserPool.Get() + p.AllowUnexpectedTail = true defer parserPool.Put(p) v, err := p.ParseBytes(s) @@ -218,6 +221,7 @@ func (fv *FastWalker) walkFastJSONString(seq int64, flatPath []byte, pl int, pat us, _, err := (urlExtractor{}).extract(s) if err == nil { p := parserPool.Get() + p.AllowUnexpectedTail = true defer parserPool.Put(p) v, err := p.ParseBytes(us) @@ -255,8 +259,106 @@ func Format(v interface{}) string { } } +type JSONSchema struct { + Types []string `json:"type,omitempty"` + Properties map[string]*JSONSchema `json:"properties,omitempty"` + Items *JSONSchema `json:"items,omitempty"` +} + +func (j *JSONSchema) AddType(t Type) { + tt := "" + + switch t { + case TypeString: + tt = "string" + case TypeInt: + tt = "integer" + case TypeFloat: + tt = "number" + case TypeBool: + tt = "boolean" + case TypeArray: + tt = "array" + case TypeObject: + tt = "object" + case TypeJSON: + tt = "string" + } + + for _, t := range j.Types { + if t == tt { + return + } + } + + j.Types = append(j.Types, tt) +} + +func (j *JSONSchema) AddKey(k flKey, keys *xsync.MapOf[uint64, flKey]) { + if k.parent == 0 { + return + } + + parents := []flKey{k} + + parent := k.parent + for { + if parent == 0 { + break + } + + pk, ok := keys.Load(parent) + if !ok { + println("BUG: failed to load parent key:", parent) + return + } + + parents = append(parents, pk) + + parent = pk.parent + } + + parentSchema := j + parentType := TypeObject + for i := len(parents) - 1; i >= 0; i-- { + pk := parents[i] + name := pk.path[len(pk.path)-1] + + if i != 0 && pk.t == TypeString { + pk.t = TypeObject + } + + if parentType == TypeObject { + if parentSchema.Properties == nil { + parentSchema.Properties = make(map[string]*JSONSchema) + } + + property := parentSchema.Properties[name] + if property == nil { + property = &JSONSchema{} + } + parentSchema.Properties[name] = property + parentSchema = property + + parentType = pk.t + } else if parentType == TypeArray { + if parentSchema.Items == nil { + parentSchema.Items = &JSONSchema{} + } + + parentSchema = parentSchema.Items + parentType = pk.t + } + } + + parentSchema.AddType(k.t) + return +} + // KeyHierarchy collects structural relations. type KeyHierarchy struct { + Schema JSONSchema + Name string Sub map[string]KeyHierarchy } diff --git a/flatjsonl/keys.go b/flatjsonl/keys.go index 55f2077..2fbca63 100644 --- a/flatjsonl/keys.go +++ b/flatjsonl/keys.go @@ -239,21 +239,31 @@ func (h hasher) hashParentBytes(flatPath []byte, parentLen int) (pk uint64, par p1 := flatPath[:parentLen] - _, err := h.digest.Write(p1) - if err != nil { - panic("hashing failed: " + err.Error()) - } + if len(p1) == 0 { + par = 0 + } else { + _, err := h.digest.Write(p1) + if err != nil { + panic("hashing failed: " + err.Error()) + } - par = h.digest.Sum64() + par = h.digest.Sum64() + } p2 := flatPath[parentLen:] - _, err = h.digest.Write(p2) - if err != nil { - panic("hashing failed: " + err.Error()) + if len(p2) == 0 { + pk = 0 + } else { + _, err := h.digest.Write(p2) + if err != nil { + panic("hashing failed: " + err.Error()) + } + + pk = h.digest.Sum64() } - return h.digest.Sum64(), par + return pk, par } func (p *Processor) scanAvailableKeys() error { @@ -294,7 +304,7 @@ func (p *Processor) scanAvailableKeys() error { w.WantPath = true w.FnObjectStop = func(_ int64, flatPath []byte, pl int, path []string) (stop bool) { - if pl == 0 { + if len(flatPath) == 0 { return } @@ -305,7 +315,7 @@ func (p *Processor) scanAvailableKeys() error { return stop } w.FnArrayStop = func(_ int64, flatPath []byte, pl int, path []string) (stop bool) { - if pl == 0 { + if len(flatPath) == 0 { return } @@ -394,8 +404,13 @@ func (p *Processor) prepareScannedKeys() { if k.t == TypeObject || k.t == TypeArray { deleted[k.original] = true - p.keyHierarchy.Add(k.path) - p.keyHierarchy.AddKey(k) + if p.f.ShowKeysHier { + p.keyHierarchy.Add(k.path) + } + + if p.f.ShowJSONSchema { + p.jsonSchema.AddKey(k, p.flKeys) + } return true } @@ -410,8 +425,13 @@ func (p *Processor) prepareScannedKeys() { } } - p.keyHierarchy.Add(k.path) - p.keyHierarchy.AddKey(k) + if p.f.ShowKeysHier { + p.keyHierarchy.Add(k.path) + } + + if p.f.ShowJSONSchema { + p.jsonSchema.AddKey(k, p.flKeys) + } return true }) diff --git a/flatjsonl/processor.go b/flatjsonl/processor.go index 2cff773..f5f524e 100644 --- a/flatjsonl/processor.go +++ b/flatjsonl/processor.go @@ -56,6 +56,7 @@ type Processor struct { mu sync.Mutex flKeysList []string keyHierarchy KeyHierarchy + jsonSchema JSONSchema canonicalKeys map[string]flKey totalLines int @@ -343,6 +344,14 @@ func (p *Processor) maybeShowKeys() error { _, _ = fmt.Fprintln(p.Stdout, string(b)) } + if p.f.ShowJSONSchema { + b, err := assertjson.MarshalIndentCompact(p.jsonSchema, "", " ", 120) + if err != nil { + return err + } + _, _ = fmt.Fprintln(p.Stdout, string(b)) + } + return nil } diff --git a/flatjsonl/processor_test.go b/flatjsonl/processor_test.go index 8413b1e..9019246 100644 --- a/flatjsonl/processor_test.go +++ b/flatjsonl/processor_test.go @@ -30,6 +30,7 @@ func TestNewProcessor(t *testing.T) { f.ShowKeysFlat = true f.ShowKeysHier = true f.ShowKeysInfo = true + f.ShowJSONSchema = true f.Concurrency = 1 f.PrepareOutput() diff --git a/flatjsonl/reader.go b/flatjsonl/reader.go index 4a28c5b..123aa67 100644 --- a/flatjsonl/reader.go +++ b/flatjsonl/reader.go @@ -19,7 +19,7 @@ import ( "github.com/bool64/progress" "github.com/klauspost/compress/zstd" gzip "github.com/klauspost/pgzip" - "github.com/valyala/fastjson" + "github.com/vearutop/fastjson" ) const errEmptyFile = ctxd.SentinelError("empty file") @@ -194,7 +194,7 @@ func (rd *Reader) Read(sess *readSession) error { semaphore <- &syncWorker{ i: i, - p: &fastjson.Parser{}, + p: &fastjson.Parser{AllowUnexpectedTail: true}, used: 0, path: make([]string, 0, 20), flatPath: make([]byte, 0, 5000), @@ -260,7 +260,7 @@ func (rd *Reader) Read(sess *readSession) error { if worker.used >= 100 { worker.used = 0 - worker.p = &fastjson.Parser{} + worker.p = &fastjson.Parser{AllowUnexpectedTail: true} } atomic.AddInt64(&rd.Processor.inProgress, 1) diff --git a/go.mod b/go.mod index 9c17b56..eb6d473 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/puzpuzpuz/xsync/v3 v3.5.1 github.com/stretchr/testify v1.9.0 github.com/swaggest/assertjson v1.9.0 - github.com/valyala/fastjson v1.6.4 + github.com/vearutop/fastjson v1.0.0 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.35.0 ) diff --git a/go.sum b/go.sum index 371bfa4..39844e5 100644 --- a/go.sum +++ b/go.sum @@ -79,8 +79,8 @@ github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7 github.com/swaggest/assertjson v1.9.0/go.mod h1:b+ZKX2VRiUjxfUIal0HDN85W0nHPAYUbYH5WkkSsFsU= github.com/swaggest/usecase v1.2.0 h1:cHVFqxIbHfyTXp02JmWXk+ZADaSa87UZP+b3qL5Nz90= github.com/swaggest/usecase v1.2.0/go.mod h1:oc5+QoAxG3Et5Gl9lRXgEOm00l4VN9gdVQSMIa5EeLY= -github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ= -github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= +github.com/vearutop/fastjson v1.0.0 h1:4yn7BZj9R52INMqMj1q90gG206Qm9XY54aKfj3ZPC54= +github.com/vearutop/fastjson v1.0.0/go.mod h1:H1NX3WgvfAI1gJf9Pk3IKegysfqZOotwqiihA+txgMQ= github.com/yosuke-furukawa/json5 v0.1.2-0.20201207051438-cf7bb3f354ff h1:7YqG491bE4vstXRz1lD38rbSgbXnirvROz1lZiOnPO8= github.com/yosuke-furukawa/json5 v0.1.2-0.20201207051438-cf7bb3f354ff/go.mod h1:sw49aWDqNdRJ6DYUtIQiaA3xyj2IL9tjeNYmX2ixwcU= github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA=