File tree Expand file tree Collapse file tree 1 file changed +5
-16
lines changed
Expand file tree Collapse file tree 1 file changed +5
-16
lines changed Original file line number Diff line number Diff line change @@ -406,18 +406,13 @@ func (c *PagedWriter) writeInOrder() error {
406406 if ! ok {
407407 return nil
408408 }
409- if r .err != nil {
410- return r .err
411- }
412409 if _ , err := c .parent .Write (r .data ); err != nil {
413410 return err
414411 }
415412 delete (c .pendingResults , c .seqOut )
416413 putPageResult (r )
417414 c .seqOut ++
418- if c .numWorkers > 1 {
419- c .pagesCompressed ++
420- }
415+ c .pagesCompressed ++
421416 }
422417}
423418
@@ -474,17 +469,11 @@ func (c *PagedWriter) writePage() error {
474469
475470 // Async path with parallel workers
476471 item := getPageWorkItem ()
477- sent := false
478- defer func () {
479- if ! sent {
480- putPageWorkItem (item )
481- }
482- }()
483-
484472 var ok bool
485473 item .uncompressedData , ok = c .bytesUncompressedTo (item .uncompressedData )
486474 c .resetPage ()
487475 if ! ok {
476+ putPageWorkItem (item )
488477 return nil
489478 }
490479
@@ -493,11 +482,11 @@ func (c *PagedWriter) writePage() error {
493482
494483 select {
495484 case c .workCh <- item :
496- sent = true
485+ return nil
497486 case <- c .egCtx .Done ():
487+ putPageWorkItem (item )
498488 return c .egCtx .Err ()
499489 }
500- return nil
501490}
502491
503492func (c * PagedWriter ) Add (k , v []byte ) (err error ) {
@@ -522,7 +511,7 @@ func (c *PagedWriter) resetPage() {
522511 c .kLengths , c .vLengths = c .kLengths [:0 ], c .vLengths [:0 ]
523512 c .keys , c .vals = c .keys [:0 ], c .vals [:0 ]
524513}
525- func (c * PagedWriter ) Flush () ( err error ) {
514+ func (c * PagedWriter ) Flush () error {
526515 if c .pageSize <= 1 {
527516 return nil
528517 }
You can’t perform that action at this time.
0 commit comments