File tree Expand file tree Collapse file tree 2 files changed +11
-9
lines changed
Expand file tree Collapse file tree 2 files changed +11
-9
lines changed Original file line number Diff line number Diff line change @@ -363,7 +363,6 @@ func (in *BQInserter) Flush() error {
363363// Commit implements row.Sink.
364364// NOTE: the label is ignored, and the TableBase is used instead.
365365func (in * BQInserter ) Commit (rows []interface {}, label string ) error {
366- // TODO - this causes large memory and large number of goroutines.
367366 in .acquire ()
368367 defer in .release ()
369368 return in .flushSlice (rows )
Original file line number Diff line number Diff line change @@ -271,6 +271,7 @@ func (pb *Base) commit(rows []interface{}) error {
271271 // TODO - care about error?
272272 _ = pb .ann .Annotate (rows , pb .label )
273273 // TODO do we need these to be done in order.
274+ // This is synchronous, blocking, and thread safe.
274275 err := pb .sink .Commit (rows , pb .label )
275276
276277 pb .statsLock .Lock ()
@@ -300,16 +301,18 @@ func (pb *Base) Flush() error {
300301func (pb * Base ) Put (row Annotatable ) {
301302 rows := pb .buf .Append (row )
302303 pb .statsLock .Lock ()
303- defer pb .statsLock .Unlock ()
304304 pb .stats .Total ++
305305 pb .stats .Pending ++
306+ pb .statsLock .Unlock ()
307+
306308 if rows != nil {
307- go func (rows []interface {}) {
308- // This allows pipelined parsing annotating, and writing.
309- err := pb .commit (rows )
310- if err != nil {
311- log .Println (err )
312- }
313- }(rows )
309+ // This allows pipelined parsing annotating, and writing.
310+ // Disabling for now, as it leads to large memory/goroutine footprint.
311+ // go func(rows []interface{}) {
312+ err := pb .commit (rows )
313+ if err != nil {
314+ log .Println (err )
315+ }
316+ // }(rows)
314317 }
315318}
You can’t perform that action at this time.
0 commit comments