Skip to content

Commit d2e99dc

Browse files
Use GCS archive path date for row.Date (#896)
* Use GCS archive date for row.Date
* Parse PathDate in GCSSource
* Add Date to etl.TestSource interface
* Update to bionic
1 parent 1ca83a6 commit d2e99dc

File tree

11 files changed

+66
-20
lines changed

11 files changed

+66
-20
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# TODO(soltesz): Add deployment automation when fine-grained permissions are
1616
# possible.
1717

18+
dist: bionic
1819
language: go
1920
go:
2021
- "1.13.8"

etl/etl.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"cloud.google.com/go/bigquery"
10+
"cloud.google.com/go/civil"
1011
)
1112

1213
// ProcessingError extends error to provide dataType and detail for metrics,
@@ -143,8 +144,9 @@ type TestSource interface {
143144
NextTest(maxSize int64) (string, []byte, error)
144145
Close() error
145146

146-
Detail() string // Detail for logs.
147-
Type() string // Data type for logs and metrics
147+
Detail() string // Detail for logs.
148+
Type() string // Data type for logs and metrics
149+
Date() civil.Date // Date associated with test source
148150
}
149151

150152
//========================================================================

parser/annotation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func (ap *AnnotationParser) ParseAndInsert(meta map[string]bigquery.Value, testN
103103
// the given timestamp, regardless of the timestamp's timezone. Since we
104104
// run our systems in UTC, all timestamps will be relative to UTC and as
105105
// will these dates.
106-
row.Date = civil.DateOf(raw.Timestamp)
106+
row.Date = meta["date"].(civil.Date)
107107

108108
// Estimate the row size based on the input JSON size.
109109
metrics.RowSizeHistogram.WithLabelValues(ap.TableName()).Observe(float64(len(test)))

parser/annotation_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
"cloud.google.com/go/bigquery"
8+
"cloud.google.com/go/civil"
89
"github.com/m-lab/etl/parser"
910
"github.com/m-lab/go/rtx"
1011
)
@@ -39,6 +40,7 @@ func TestAnnotationParser_ParseAndInsert(t *testing.T) {
3940

4041
meta := map[string]bigquery.Value{
4142
"filename": "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/" + tt.file,
43+
"date": civil.Date{Year: 2020, Month: 3, Day: 18},
4244
}
4345

4446
if err := n.ParseAndInsert(meta, tt.file, data); (err != nil) != tt.wantErr {

parser/ndt7_result.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func (dp *NDT7ResultParser) ParseAndInsert(meta map[string]bigquery.Value, testN
9393
// the given timestamp, regardless of the timestamp's timezone. Since we
9494
// run our systems in UTC, all timestamps will be relative to UTC and as
9595
// will these dates.
96-
row.Date = civil.DateOf(row.Raw.StartTime)
96+
row.Date = meta["date"].(civil.Date)
9797
if row.Raw.Download != nil {
9898
row.A = downSummary(row.Raw.Download)
9999
} else if row.Raw.Upload != nil {

parser/ndt7_result_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"testing"
77

88
"cloud.google.com/go/bigquery"
9+
"cloud.google.com/go/civil"
910
"github.com/go-test/deep"
1011

1112
"github.com/m-lab/etl/parser"
@@ -39,6 +40,7 @@ func TestNDT7ResultParser_ParseAndInsert(t *testing.T) {
3940
}
4041
meta := map[string]bigquery.Value{
4142
"filename": "gs://mlab-test-bucket/ndt/ndt7/2020/03/18/ndt_ndt7_2020_03_18_20200318T003853.425987Z-ndt7-mlab3-syd03-ndt.tgz",
43+
"date": civil.Date{Year: 2020, Month: 3, Day: 18},
4244
}
4345

4446
if err := n.ParseAndInsert(meta, tt.testName, resultData); (err != nil) != tt.wantErr {

parser/tcpinfo_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
v2 "github.com/m-lab/annotation-service/api/v2"
1616

1717
"cloud.google.com/go/bigquery"
18+
"cloud.google.com/go/civil"
1819
"github.com/m-lab/annotation-service/api"
1920
"github.com/m-lab/etl/etl"
2021
"github.com/m-lab/etl/parser"
@@ -52,7 +53,8 @@ func fileSource(fn string) (etl.TestSource, error) {
5253
tarReader := tar.NewReader(rdr)
5354

5455
timeout := 16 * time.Millisecond
55-
return &storage.GCSSource{TarReader: tarReader, Closer: raw, RetryBaseTime: timeout, TableBase: "test"}, nil
56+
return &storage.GCSSource{TarReader: tarReader, Closer: raw,
57+
RetryBaseTime: timeout, TableBase: "test", PathDate: civil.Date{Year: 2020, Month: 6, Day: 11}}, nil
5658
}
5759

5860
type fakeAnnotator struct{}

storage/storage.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"strings"
2121
"time"
2222

23+
"cloud.google.com/go/civil"
2324
gcs "cloud.google.com/go/storage"
2425
"google.golang.org/api/option"
2526

@@ -44,6 +45,7 @@ type GCSSource struct {
4445
io.Closer // Closer interface to be provided by an embedded struct.
4546
RetryBaseTime time.Duration // The base time for backoff and retry.
4647
TableBase string // TableBase is BQ table associated with this source, or "invalid".
48+
PathDate civil.Date // Date associated with YYYY/MM/DD in FilePath.
4749
}
4850

4951
// Retrieve next file header.
@@ -125,6 +127,11 @@ func (src *GCSSource) Detail() string {
125127
return src.FilePath
126128
}
127129

130+
// Date returns a civil.Date associated with the GCSSource archive path.
131+
func (src *GCSSource) Date() civil.Date {
132+
return src.PathDate
133+
}
134+
128135
// NextTest reads the next test object from the tar file.
129136
// Skips reading contents of any file larger than maxSize, returning empty data
130137
// and storage.ErrOversizeFile.
@@ -218,25 +225,30 @@ var errNoClient = errors.New("client should be non-null")
218225
//
219226
// uri should be of form gs://bucket/filename.tar or gs://bucket/filename.tgz
220227
// FYI Using a persistent client saves about 80 msec, and 220 allocs, totalling 70kB.
221-
func NewTestSource(client *gcs.Client, uri string, label string) (etl.TestSource, error) {
228+
func NewTestSource(client *gcs.Client, dp etl.DataPath, label string) (etl.TestSource, error) {
222229
if client == nil {
223230
return nil, errNoClient
224231
}
225232
// For now only handle gcs paths.
226-
if !strings.HasPrefix(uri, "gs://") {
227-
return nil, errors.New("invalid file path: " + uri)
233+
if !strings.HasPrefix(dp.URI, "gs://") {
234+
return nil, errors.New("invalid file path: " + dp.URI)
228235
}
229-
parts := strings.SplitN(uri, "/", 4)
236+
parts := strings.SplitN(dp.URI, "/", 4)
230237
if len(parts) != 4 {
231-
return nil, errors.New("invalid file path: " + uri)
238+
return nil, errors.New("invalid file path: " + dp.URI)
232239
}
233240
bucket := parts[2]
234241
fn := parts[3]
235242

243+
archiveDate, err := time.Parse("2006/01/02", dp.DatePath)
244+
if err != nil {
245+
return nil, fmt.Errorf("failed to parse archive date path: %w", err)
246+
}
247+
236248
// TODO - consider just always testing for valid gzip file.
237249
if !(strings.HasSuffix(fn, ".tgz") || strings.HasSuffix(fn, ".tar") ||
238250
strings.HasSuffix(fn, ".tar.gz")) {
239-
return nil, errors.New("not tar or tgz: " + uri)
251+
return nil, errors.New("not tar or tgz: " + dp.URI)
240252
}
241253

242254
// TODO(prod) Evaluate whether this is long enough.
@@ -268,7 +280,15 @@ func NewTestSource(client *gcs.Client, uri string, label string) (etl.TestSource
268280
tarReader := tar.NewReader(rdr)
269281

270282
baseTimeout := 16 * time.Millisecond
271-
return &GCSSource{uri, tarReader, closer, baseTimeout, label}, nil
283+
gcs := &GCSSource{
284+
FilePath: dp.URI,
285+
TarReader: tarReader,
286+
Closer: closer,
287+
RetryBaseTime: baseTimeout,
288+
TableBase: label,
289+
PathDate: civil.DateOf(archiveDate),
290+
}
291+
return gcs, nil
272292
}
273293

274294
// GetStorageClient provides a storage reader client.
@@ -304,7 +324,7 @@ func (sf *gcsSourceFactory) Get(ctx context.Context, dp etl.DataPath) (etl.TestS
304324
http.StatusInternalServerError, etl.ErrBadDataType)
305325
}
306326

307-
tr, err := NewTestSource(sf.client, dp.URI, label)
327+
tr, err := NewTestSource(sf.client, dp, label)
308328
if err != nil {
309329
log.Printf("Error opening gcs file: %v", err)
310330
// TODO - anything better we could do here?

storage/storage_test.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import (
1818
)
1919

2020
var testBucket = "mlab-testing.appspot.com"
21-
var tarFile = "gs://" + testBucket + "/test.tar"
22-
var tgzFile = "gs://" + testBucket + "/test.tgz"
21+
var tarFile = "gs://" + testBucket + "/ndt/ndt5/2020/06/11/20200611T123456.12345Z-ndt5-mlab1-foo01-ndt.tar"
22+
var tgzFile = "gs://" + testBucket + "/ndt/ndt5/2020/06/11/20200611T123456.12345Z-ndt5-mlab1-foo01-ndt.tgz"
2323

2424
func assertGCSourceIsTestSource(in etl.TestSource) {
2525
func(in etl.TestSource) {}(&GCSSource{})
@@ -46,7 +46,11 @@ func TestNewTarReader(t *testing.T) {
4646
if testing.Short() {
4747
t.Skip("Skipping tests that access GCS")
4848
}
49-
src, err := NewTestSource(client, tarFile, "label")
49+
dpf, err := etl.ValidateTestPath(tarFile)
50+
if err != nil {
51+
t.Fatal(err)
52+
}
53+
src, err := NewTestSource(client, dpf, "label")
5054
if err != nil {
5155
t.Fatal(err)
5256
}
@@ -68,7 +72,11 @@ func TestNewTarReaderGzip(t *testing.T) {
6872
if testing.Short() {
6973
t.Skip("Skipping tests that access GCS")
7074
}
71-
src, err := NewTestSource(client, tgzFile, "label")
75+
dpf, err := etl.ValidateTestPath(tgzFile)
76+
if err != nil {
77+
t.Fatal(err)
78+
}
79+
src, err := NewTestSource(client, dpf, "label")
7280
if err != nil {
7381
t.Fatal(err)
7482
}
@@ -98,17 +106,25 @@ func init() {
98106
}
99107

100108
func BenchmarkNewTarReader(b *testing.B) {
109+
dpf, err := etl.ValidateTestPath(tarFile)
110+
if err != nil {
111+
b.Fatal(err)
112+
}
101113
for i := 0; i < b.N; i++ {
102-
src, err := NewTestSource(client, tarFile, "label")
114+
src, err := NewTestSource(client, dpf, "label")
103115
if err == nil {
104116
src.Close()
105117
}
106118
}
107119
}
108120

109121
func BenchmarkNewTarReaderGzip(b *testing.B) {
122+
dpf, err := etl.ValidateTestPath(tgzFile)
123+
if err != nil {
124+
b.Fatal(err)
125+
}
110126
for i := 0; i < b.N; i++ {
111-
src, err := NewTestSource(client, tgzFile, "label")
127+
src, err := NewTestSource(client, dpf, "label")
112128
if err == nil {
113129
src.Close()
114130
}

task/task.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func NewTask(filename string, src etl.TestSource, prsr etl.Parser) *Task {
4949
meta["filename"] = filename
5050
meta["parse_time"] = time.Now()
5151
meta["attempt"] = 1
52+
meta["date"] = src.Date()
5253
t := Task{src, prsr, meta, DefaultMaxFileSize}
5354
return &t
5455
}

0 commit comments

Comments (0)