@@ -16,39 +16,24 @@ import (
 	"github.com/google/uuid"
 )
 
-type bqCSVReader struct {
-	closers  []io.Closer
-	gcsFiles []io.Reader
-}
-
-func (r *bqCSVReader) Close() {
-	for _, c := range r.closers {
-		c.Close()
-	}
-}
-
-func MakeBQCSVReader() *bqCSVReader {
-	return &bqCSVReader{}
-}
-
-func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) error {
+func InitNsRecords(ctx context.Context, ns, filePath, commit string, from, to civil.Date) (io.ReadCloser, error) {
 	if err := validator.AnyError("input validation failed",
 		validator.NamespaceName(ns),
 		validator.AnyOk(validator.EmptyStr(filePath), validator.KernelFilePath(filePath)),
 		validator.AnyOk(validator.EmptyStr(commit), validator.CommitHash(commit)),
 	); err != nil {
-		return err
+		return nil, err
 	}
 	sessionUUID := uuid.New().String()
 	gsBucket := "syzbot-temp"
 	gsPath := fmt.Sprintf("bq-exports/%s", sessionUUID)
 	gsURI := "gs://" + gsBucket + "/" + gsPath + "/*.csv.gz"
 	client, err := bigquery.NewClient(ctx, "syzkaller")
 	if err != nil {
-		return fmt.Errorf("failed to initialize bigquery client: %w", err)
+		return nil, fmt.Errorf("failed to initialize bigquery client: %w", err)
 	}
 	if err := client.EnableStorageReadClient(ctx); err != nil {
-		return fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
+		return nil, fmt.Errorf("failed to client.EnableStorageReadClient: %w", err)
 	}
 	selectCommit := ""
 	if commit != "" {
@@ -77,48 +62,82 @@ func (r *bqCSVReader) InitNsRecords(ctx context.Context, ns, filePath, commit st
 	`, gsURI, ns, from.String(), to.String(), filePath, selectCommit))
 	job, err := q.Run(ctx)
 	if err != nil {
-		return fmt.Errorf("err during bigquery.Run: %w", err)
+		return nil, fmt.Errorf("err during bigquery.Run: %w", err)
 	}
 	status, err := job.Wait(ctx)
 	if err != nil {
-		return fmt.Errorf("err waiting for the bigquery.Job: %w", err)
+		return nil, fmt.Errorf("err waiting for the bigquery.Job: %w", err)
 	}
 	if status.Err() != nil {
-		return fmt.Errorf("bigquery job failed with status %w", status.Err())
+		return nil, fmt.Errorf("bigquery job failed with status %w", status.Err())
 	}
-	return r.initGCSFileReaders(ctx, gsBucket, gsPath)
+	return initGCSMultiReader(ctx, gsBucket, gsPath)
 }
 
-func (r *bqCSVReader) initGCSFileReaders(ctx context.Context, bucket, path string) error {
+func initGCSMultiReader(ctx context.Context, bucket, path string) (io.ReadCloser, error) {
 	var gcsClient gcs.Client
 	var err error
 	if gcsClient, err = gcs.NewClient(ctx); err != nil {
-		return fmt.Errorf("err creating gcs client: %w", err)
+		return nil, fmt.Errorf("err creating gcs client: %w", err)
 	}
 	var gcsFiles []*gcs.Object
 	if gcsFiles, err = gcsClient.ListObjects(bucket + "/" + path); err != nil {
-		return fmt.Errorf("err enumerating gcs files: %w", err)
+		return nil, fmt.Errorf("err enumerating gcs files: %w", err)
 	}
+	paths := []string{}
 	for _, obj := range gcsFiles {
-		var readCloser io.ReadCloser
-		if readCloser, err = gcsClient.FileReader(bucket + "/" + obj.Path); err != nil {
-			return fmt.Errorf("failed to get %s reader: %w", obj.Path, err)
-		}
-		r.closers = append(r.closers, readCloser)
-		r.gcsFiles = append(r.gcsFiles, readCloser)
+		paths = append(paths, bucket+"/"+obj.Path)
 	}
-	return nil
+	return &gcsGZIPMultiReader{
+		gcsClient: gcsClient,
+		gcsFiles:  paths,
+	}, nil
 }
 
-func (r *bqCSVReader) Reader() (io.Reader, error) {
-	var readers []io.Reader
-	for _, file := range r.gcsFiles {
-		gzipReaderCloser, err := gzip.NewReader(file)
-		if err != nil {
-			return nil, fmt.Errorf("err calling gzip.NewReader: %w", err)
+type gcsGZIPMultiReader struct {
+	gcsClient gcs.Client
+	gcsFiles  []string
+
+	curCloser io.Closer
+	curReader io.Reader
+}
+
+func (mr *gcsGZIPMultiReader) Read(p []byte) (n int, err error) {
+	for len(mr.gcsFiles) > 0 {
+		if mr.curReader == nil {
+			var readCloser io.ReadCloser
+			if readCloser, err = mr.gcsClient.FileReader(mr.gcsFiles[0]); err != nil {
+				return 0, fmt.Errorf("failed to get %s reader: %w", mr.gcsFiles[0], err)
+			}
+			mr.curCloser = readCloser
+			mr.curReader, err = gzip.NewReader(readCloser)
+			if err != nil {
+				return 0, fmt.Errorf("err calling gzip.NewReader: %w", err)
+			}
+		}
+		n, err = mr.curReader.Read(p)
+		if err == io.EOF {
+			mr.gcsFiles = mr.gcsFiles[1:]
+			if err = mr.curCloser.Close(); err != nil {
+				return 0, fmt.Errorf("mr.curCloser.Close: %w", err)
+			}
+			mr.curCloser = nil
+			mr.curReader = nil
 		}
-		r.closers = append(r.closers, gzipReaderCloser)
-		readers = append(readers, gzipReaderCloser)
+		if n > 0 || err != io.EOF {
+			if err == io.EOF && len(mr.gcsFiles) > 0 {
+				// Don't return EOF yet. More readers remain.
+				err = nil
+			}
+			return
+		}
+	}
+	return 0, io.EOF
+}
+
+func (mr *gcsGZIPMultiReader) Close() error {
+	if mr.curCloser == nil {
+		return nil
 	}
-	return io.MultiReader(readers...), nil
+	return mr.curCloser.Close()
 }
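For context, a minimal usage sketch of the new API (not part of the change): InitNsRecords now returns a single io.ReadCloser that streams the decompressed CSV across all exported GCS shards, so a caller needs only one Close. The helper name readNsRecords is hypothetical and assumes it lives in the same package, with encoding/csv imported.

// readNsRecords is a hypothetical caller that drains the streaming reader
// returned by InitNsRecords with encoding/csv.
func readNsRecords(ctx context.Context, ns string, from, to civil.Date) ([][]string, error) {
	// Empty filePath/commit skip the optional filters, per the validator checks above.
	rc, err := InitNsRecords(ctx, ns, "", "", from, to)
	if err != nil {
		return nil, err
	}
	// One Close covers whichever GCS shard is currently open, if any.
	defer rc.Close()
	return csv.NewReader(rc).ReadAll()
}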