Skip to content

Commit eb611b0

Browse files
authored
Add row count and error meta-data to json output files (#937)
* add metadata to GCS objects
* bug fix
* remove unused function
1 parent 839cb87 commit eb611b0

File tree

1 file changed

+36
-19
lines changed

1 file changed

+36
-19
lines changed

storage/rowwriter.go

Lines changed: 36 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -5,10 +5,13 @@ import (
55
"context"
66
"encoding/json"
77
"errors"
8+
"fmt"
89
"log"
910
"net/http"
1011
"strings"
12+
"time"
1113

14+
gcs "cloud.google.com/go/storage"
1215
"github.com/googleapis/google-cloud-go-testing/storage/stiface"
1316
"google.golang.org/api/googleapi"
1417

@@ -18,41 +21,39 @@ import (
1821
"github.com/m-lab/etl/row"
1922
)
2023

21-
// ObjectWriter creates a writer to a named object.
22-
// It may overwrite an existing object.
23-
// Caller must Close() the writer, or cancel the context.
24-
func ObjectWriter(ctx context.Context, client stiface.Client, bucket string, path string) stiface.Writer {
25-
b := client.Bucket(bucket)
26-
o := b.Object(path)
27-
w := o.NewWriter(ctx)
28-
// Set smaller chunk size to conserve memory.
29-
w.SetChunkSize(4 * 1024 * 1024)
30-
return w
31-
}
32-
3324
// RowWriter implements row.Sink to a GCS file backend.
3425
type RowWriter struct {
3526
w stiface.Writer
27+
o stiface.ObjectHandle
28+
a gcs.ObjectAttrsToUpdate
29+
30+
rows int
31+
writeErr error
3632

3733
bucket string
3834
path string
3935

4036
// These act as tokens to serialize access to the writer.
4137
// This allows concurrent encoding and writing, while ensuring
4238
// that single client access is correctly ordered.
43-
encoding chan struct{} // Token required for metric updates.
44-
writing chan struct{} // Token required for metric updates.
39+
encoding chan struct{} // Token required for encoding.
40+
writing chan struct{} // Token required for writing.
4541
}
4642

4743
// NewRowWriter creates a RowWriter.
4844
func NewRowWriter(ctx context.Context, client stiface.Client, bucket string, path string) (row.Sink, error) {
49-
w := ObjectWriter(ctx, client, bucket, path)
45+
b := client.Bucket(bucket)
46+
o := b.Object(path)
47+
w := o.NewWriter(ctx)
48+
// Set smaller chunk size to conserve memory.
49+
w.SetChunkSize(4 * 1024 * 1024)
50+
5051
encoding := make(chan struct{}, 1)
5152
encoding <- struct{}{}
5253
writing := make(chan struct{}, 1)
5354
writing <- struct{}{}
5455

55-
return &RowWriter{bucket: bucket, path: path, w: w, encoding: encoding, writing: writing}, nil
56+
return &RowWriter{bucket: bucket, path: path, o: o, w: w, encoding: encoding, writing: writing}, nil
5657
}
5758

5859
// Acquire the encoding token.
@@ -110,6 +111,7 @@ func (rw *RowWriter) Commit(rows []interface{}, label string) (int, error) {
110111
defer rw.releaseWritingToken()
111112
n, err := buf.WriteTo(rw.w) // This is buffered (by 4MB chunks). Are the writes to GCS synchronous?
112113
if err != nil {
114+
rw.writeErr = err
113115
switch typedErr := err.(type) {
114116
case *googleapi.Error:
115117
metrics.BackendFailureCount.WithLabelValues(
@@ -128,10 +130,13 @@ func (rw *RowWriter) Commit(rows []interface{}, label string) (int, error) {
128130
// The caller should likely abandon the archive at this point,
129131
// as further writing will likely result in a corrupted file.
130132
// See https://github.com/m-lab/etl/issues/899
131-
return int(n) * len(rows) / numBytes, err
133+
rowEstimate := int(n) * len(rows) / numBytes
134+
rw.rows += rowEstimate
135+
return rowEstimate, err
132136
}
133137

134138
// TODO - these may not be committed, so the returned value may be wrong.
139+
rw.rows += len(rows)
135140
return len(rows), nil
136141
}
137142

@@ -148,10 +153,22 @@ func (rw *RowWriter) Close() error {
148153
err := rw.w.Close()
149154
if err != nil {
150155
log.Println(err)
151-
} else {
152-
log.Println(rw.w.Attrs())
156+
return err
153157
}
158+
159+
oa := gcs.ObjectAttrsToUpdate{}
160+
oa.Metadata = make(map[string]string, 1)
161+
oa.Metadata["rows"] = fmt.Sprint(rw.rows)
162+
if rw.writeErr != nil {
163+
oa.Metadata["writeError"] = rw.writeErr.Error()
164+
}
165+
166+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
167+
defer cancel()
168+
attr, err := rw.o.Update(ctx, oa)
169+
log.Println(attr, err)
154170
return err
171+
155172
}
156173

157174
// SinkFactory implements factory.SinkFactory.

0 commit comments

Comments
 (0)