Skip to content

Commit 9b020bf

Browse files
authored
Merge pull request #92 from v3io/development
development --> Master 0.0.8
2 parents 99e45a4 + d69642f commit 9b020bf

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

Diff for: pkg/partmgr/partmgr.go

+36-13
Original file line numberDiff line numberDiff line change
@@ -41,19 +41,7 @@ func NewPartitionMngr(cfg *config.Schema, partPath string, cont *v3io.Container)
4141
return nil, err
4242
}
4343
newMngr := &PartitionManager{cfg: cfg, path: partPath, cyclic: false, container: cont, currentPartitionInterval: currentPartitionInterval}
44-
for _, part := range cfg.Partitions {
45-
partPath := path.Join(newMngr.path, strconv.FormatInt(part.StartTime/1000, 10)) + "/"
46-
newPart, err := NewDBPartition(newMngr, part.StartTime, partPath)
47-
if err != nil {
48-
return nil, err
49-
}
50-
newMngr.partitions = append(newMngr.partitions, newPart)
51-
if newMngr.headPartition == nil {
52-
newMngr.headPartition = newPart
53-
} else if newMngr.headPartition.startTime < newPart.startTime {
54-
newMngr.headPartition = newPart
55-
}
56-
}
44+
newMngr.updatePartitions(cfg)
5745
return newMngr, nil
5846
}
5947

@@ -193,6 +181,41 @@ func (p *PartitionManager) updatePartitionInSchema(partition *DBPartition) error
193181
return err
194182
}
195183

184+
func (p *PartitionManager) ReadAndUpdateSchema() error {
185+
fullPath := path.Join(p.path, config.SCHEMA_CONFIG)
186+
resp, err := p.container.Sync.GetObject(&v3io.GetObjectInput{Path: fullPath})
187+
if err != nil {
188+
return errors.Wrap(err, "Failed to read schema at path: "+fullPath)
189+
}
190+
191+
schema := config.Schema{}
192+
err = json.Unmarshal(resp.Body(), &schema)
193+
if err != nil {
194+
return errors.Wrap(err, "Failed to Unmarshal schema at path: "+fullPath)
195+
}
196+
p.cfg = &schema
197+
p.updatePartitions(&schema)
198+
return nil
199+
}
200+
201+
func (p *PartitionManager) updatePartitions(schema *config.Schema) error {
202+
p.partitions = []*DBPartition{}
203+
for _, part := range schema.Partitions {
204+
partPath := path.Join(p.path, strconv.FormatInt(part.StartTime/1000, 10)) + "/"
205+
newPart, err := NewDBPartition(p, part.StartTime, partPath)
206+
if err != nil {
207+
return err
208+
}
209+
p.partitions = append(p.partitions, newPart)
210+
if p.headPartition == nil {
211+
p.headPartition = newPart
212+
} else if p.headPartition.startTime < newPart.startTime {
213+
p.headPartition = newPart
214+
}
215+
}
216+
return nil
217+
}
218+
196219
func (p *PartitionManager) PartsForRange(mint, maxt int64) []*DBPartition {
197220
var parts []*DBPartition
198221
for _, part := range p.partitions {

Diff for: pkg/querier/querier.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ func (q *V3ioQuerier) selectQry(name, functions string, step int64, windows []in
6565

6666
filter = strings.Replace(filter, "__name__", "_name", -1)
6767
q.logger.DebugWith("Select query", "func", functions, "step", step, "filter", filter, "window", windows)
68-
68+
err := q.partitionMngr.ReadAndUpdateSchema()
69+
if err != nil {
70+
return nullSeriesSet{}, err
71+
}
6972
parts := q.partitionMngr.PartsForRange(q.mint, q.maxt)
7073
if len(parts) == 0 {
7174
return nullSeriesSet{}, nil

0 commit comments

Comments
 (0)