Skip to content

Commit 146da83

Browse files
committed
more simple timestamp to days from 1970-01-01 conversion
1 parent 433201c commit 146da83

File tree

12 files changed

+107
-122
lines changed

12 files changed

+107
-122
lines changed

carbon/collector.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"go.uber.org/zap"
1111

1212
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
13-
"github.com/lomik/carbon-clickhouse/helper/days1970"
1413
"github.com/lomik/stop"
1514
"github.com/lomik/zapwriter"
1615
)
@@ -178,14 +177,13 @@ func (c *Collector) readData(exit chan struct{}) []*Point {
178177
}
179178

180179
func (c *Collector) local(exit chan struct{}) {
181-
days := &days1970.Days{}
182-
183180
for {
184181
points := c.readData(exit)
185182
if points == nil || len(points) == 0 {
186183
// exit closed
187184
return
188185
}
186+
now := uint32(time.Now().Unix())
189187

190188
b := RowBinary.GetWriteBuffer()
191189

@@ -206,8 +204,7 @@ func (c *Collector) local(exit chan struct{}) {
206204
[]byte(p.Metric),
207205
p.Value,
208206
p.Timestamp,
209-
days.Timestamp(p.Timestamp),
210-
uint32(time.Now().Unix()),
207+
now,
211208
)
212209
}
213210

helper/RowBinary/date.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package RowBinary
2+
3+
import "time"
4+
5+
var daysTimestampStart []int64
6+
7+
func init() {
8+
daysTimestampStart = make([]int64, 0)
9+
end := time.Now().UTC().Add(10 * 365 * 24 * time.Hour)
10+
11+
t := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
12+
for t.Before(end) {
13+
ts := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local).Unix()
14+
daysTimestampStart = append(daysTimestampStart, ts)
15+
t = t.Add(24 * time.Hour)
16+
}
17+
}
18+
19+
func TimestampToDays(timestamp uint32) uint16 {
20+
if int64(timestamp) < daysTimestampStart[0] {
21+
return 0
22+
}
23+
24+
i := int(timestamp / 86400)
25+
ts := int64(timestamp)
26+
27+
if i < 10 || i > len(daysTimestampStart)-10 {
28+
// fallback to slow method
29+
return SlowTimestampToDays(timestamp)
30+
}
31+
32+
FindLoop:
33+
for {
34+
if ts < daysTimestampStart[i] {
35+
i--
36+
continue FindLoop
37+
}
38+
if ts >= daysTimestampStart[i+1] {
39+
i++
40+
continue FindLoop
41+
}
42+
return uint16(i)
43+
}
44+
}
45+
46+
func SlowTimestampToDays(timestamp uint32) uint16 {
47+
t := time.Unix(int64(timestamp), 0)
48+
return uint16(time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC).Unix() / 86400)
49+
}

helper/RowBinary/date_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package RowBinary
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestTimestampToDays(t *testing.T) {
9+
ts := uint32(0)
10+
end := uint32(time.Now().Unix()) + 473040000 // +15 years
11+
for ts < end {
12+
d1 := SlowTimestampToDays(ts)
13+
d2 := TimestampToDays(ts)
14+
if d1 != d2 {
15+
t.FailNow()
16+
}
17+
18+
ts += 780 // step 13 minutes
19+
}
20+
}
21+
func BenchmarkTimestampToDays(b *testing.B) {
22+
timestamp := uint32(time.Now().Unix())
23+
x := SlowTimestampToDays(timestamp)
24+
25+
for i := 0; i < b.N; i++ {
26+
if TimestampToDays(timestamp) != x {
27+
b.FailNow()
28+
}
29+
}
30+
}
31+
32+
func BenchmarkSlowTimestampToDays(b *testing.B) {
33+
timestamp := uint32(time.Now().Unix())
34+
x := SlowTimestampToDays(timestamp)
35+
36+
for i := 0; i < b.N; i++ {
37+
if SlowTimestampToDays(timestamp) != x {
38+
b.FailNow()
39+
}
40+
}
41+
}

helper/RowBinary/reader.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,19 +10,15 @@ import (
1010
"math"
1111
"os"
1212
"time"
13-
14-
"github.com/lomik/carbon-clickhouse/helper/days1970"
1513
)
1614

1715
// Read all good records from unfinished RowBinary file.
1816
type Reader struct {
19-
now uint32
2017
fd *os.File
2118
reader *bufio.Reader
2219
offset int
2320
size int
2421
eof bool
25-
days days1970.Days
2622
line [524288]byte
2723
isReverse bool
2824
}
@@ -97,7 +93,7 @@ func (r *Reader) readRecord() ([]byte, error) {
9793
}
9894
r.size += 18
9995

100-
if r.Days() != r.days.TimestampWithNow(r.Timestamp(), r.now) {
96+
if r.Days() != TimestampToDays(r.Timestamp()) {
10197
return nil, errors.New("date and timestamp mismatch")
10298
}
10399

@@ -158,7 +154,6 @@ func NewReader(filename string) (*Reader, error) {
158154
return &Reader{
159155
fd: fd,
160156
reader: bufio.NewReader(fd),
161-
now: uint32(time.Now().Unix()),
162157
}, nil
163158
}
164159

helper/RowBinary/write_buffer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,11 @@ func (wb *WriteBuffer) Write(p []byte) {
127127
wb.Used += copy(wb.Body[wb.Used:], p)
128128
}
129129

130-
func (wb *WriteBuffer) WriteGraphitePoint(name []byte, value float64, timestamp uint32, days uint16, version uint32) {
130+
func (wb *WriteBuffer) WriteGraphitePoint(name []byte, value float64, timestamp uint32, version uint32) {
131131
wb.WriteBytes(name)
132132
wb.WriteFloat64(value)
133133
wb.WriteUint32(timestamp)
134-
wb.WriteUint16(days)
134+
wb.WriteUint16(TimestampToDays(timestamp))
135135
wb.WriteUint32(version)
136136
}
137137

helper/days1970/days.go

Lines changed: 0 additions & 47 deletions
This file was deleted.

helper/days1970/days_test.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

receiver/grpc.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package receiver
22

33
import (
4-
"encoding/binary"
54
"errors"
65
"net"
76
"sync"
@@ -15,7 +14,6 @@ import (
1514
"github.com/golang/protobuf/ptypes/empty"
1615
pb "github.com/lomik/carbon-clickhouse/grpc"
1716
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
18-
"github.com/lomik/carbon-clickhouse/helper/days1970"
1917
"github.com/lomik/carbon-clickhouse/helper/tags"
2018
"github.com/lomik/stop"
2119
"go.uber.org/zap"
@@ -130,11 +128,6 @@ func (g *GRPC) doStore(ctx context.Context, in *pb.Payload, confirmRequired bool
130128
// save to buffers
131129
now := uint32(time.Now().Unix())
132130

133-
version := make([]byte, 4)
134-
binary.LittleEndian.PutUint32(version, now)
135-
136-
var days days1970.Days
137-
138131
var wg *sync.WaitGroup
139132
var errorChan chan error
140133

@@ -165,11 +158,12 @@ func (g *GRPC) doStore(ctx context.Context, in *pb.Payload, confirmRequired bool
165158
wb = RowBinary.GetWriterBufferWithConfirm(wg, errorChan)
166159
}
167160

168-
wb.WriteBytes([]byte(m.Metric))
169-
wb.WriteFloat64(m.Points[j].Value)
170-
wb.WriteUint32(m.Points[j].Timestamp)
171-
wb.WriteUint16(days.TimestampWithNow(m.Points[j].Timestamp, now))
172-
wb.Write(version)
161+
wb.WriteGraphitePoint(
162+
[]byte(m.Metric),
163+
m.Points[j].Value,
164+
m.Points[j].Timestamp,
165+
now,
166+
)
173167
}
174168
}
175169

receiver/pickle_parser.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,23 @@ import (
55
"time"
66

77
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
8-
"github.com/lomik/carbon-clickhouse/helper/days1970"
98
"github.com/lomik/carbon-clickhouse/helper/tags"
109
pickle "github.com/lomik/graphite-pickle"
1110
)
1211

1312
func PickleParser(exit chan struct{}, in chan []byte, out chan *RowBinary.WriteBuffer, metricsReceived *uint32, errors *uint32) {
14-
days := &days1970.Days{}
1513

1614
for {
1715
select {
1816
case <-exit:
1917
return
2018
case b := <-in:
21-
PickeParseBytes(exit, b, uint32(time.Now().Unix()), out, days, metricsReceived, errors)
19+
PickeParseBytes(exit, b, uint32(time.Now().Unix()), out, metricsReceived, errors)
2220
}
2321
}
2422
}
2523

26-
func PickeParseBytes(exit chan struct{}, b []byte, now uint32, out chan *RowBinary.WriteBuffer, days *days1970.Days, metricsReceived *uint32, errors *uint32) {
24+
func PickeParseBytes(exit chan struct{}, b []byte, now uint32, out chan *RowBinary.WriteBuffer, metricsReceived *uint32, errors *uint32) {
2725
metricCount := uint32(0)
2826
wb := RowBinary.GetWriteBuffer()
2927

@@ -69,7 +67,6 @@ func PickeParseBytes(exit chan struct{}, b []byte, now uint32, out chan *RowBina
6967
[]byte(name),
7068
value,
7169
uint32(timestamp),
72-
days.TimestampWithNow(uint32(timestamp), now),
7370
now,
7471
)
7572

receiver/plain.go

Lines changed: 3 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@ package receiver
22

33
import (
44
"bytes"
5-
"encoding/binary"
65
"fmt"
76
"math"
87
"strconv"
98
"sync/atomic"
109
"unsafe"
1110

1211
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
13-
"github.com/lomik/carbon-clickhouse/helper/days1970"
1412
"github.com/lomik/carbon-clickhouse/helper/tags"
1513
)
1614

@@ -90,14 +88,11 @@ func PlainParseLine(p []byte) ([]byte, float64, uint32, error) {
9088
return RemoveDoubleDot(p[:i1]), value, uint32(tsf), nil
9189
}
9290

93-
func PlainParseBuffer(exit chan struct{}, b *Buffer, out chan *RowBinary.WriteBuffer, days *days1970.Days, metricsReceived *uint32, errors *uint32) {
91+
func PlainParseBuffer(exit chan struct{}, b *Buffer, out chan *RowBinary.WriteBuffer, metricsReceived *uint32, errors *uint32) {
9492
offset := 0
9593
metricCount := uint32(0)
9694
errorCount := uint32(0)
9795

98-
version := make([]byte, 4)
99-
binary.LittleEndian.PutUint32(version, b.Time)
100-
10196
wb := RowBinary.GetWriteBuffer()
10297

10398
MainLoop:
@@ -125,11 +120,7 @@ MainLoop:
125120
}
126121

127122
// write result to buffer for clickhouse
128-
wb.WriteBytes(name)
129-
wb.WriteFloat64(value)
130-
wb.WriteUint32(timestamp)
131-
wb.WriteUint16(days.TimestampWithNow(timestamp, b.Time))
132-
wb.Write(version)
123+
wb.WriteGraphitePoint(name, value, timestamp, b.Time)
133124
metricCount++
134125
}
135126

@@ -154,14 +145,12 @@ MainLoop:
154145
}
155146

156147
func PlainParser(exit chan struct{}, in chan *Buffer, out chan *RowBinary.WriteBuffer, metricsReceived *uint32, errors *uint32) {
157-
days := &days1970.Days{}
158-
159148
for {
160149
select {
161150
case <-exit:
162151
return
163152
case b := <-in:
164-
PlainParseBuffer(exit, b, out, days, metricsReceived, errors)
153+
PlainParseBuffer(exit, b, out, metricsReceived, errors)
165154
b.Release()
166155
}
167156
}

0 commit comments

Comments
 (0)