1010package main
1111
1212import (
13+ "bufio"
1314 "fmt"
1415 "io"
1516 "os"
1617 "os/signal"
18+ "strconv"
1719 "strings"
1820 "sync"
1921 "syscall"
@@ -46,7 +48,10 @@ const (
4648 flagOutputBufferSize string = "output-buffer-size"
4749 flagOutputAppend string = "output-append"
4850 flagOutputOverwrite string = "output-overwrite"
51+ flagSplitLines string = "split-lines"
4952 flagVerbose string = "verbose"
53+
54+ NumberReplacementCharacter string = "#"
5055)
5156
5257func initFlags (flag * pflag.FlagSet ) {
@@ -65,9 +70,49 @@ func initFlags(flag *pflag.FlagSet) {
6570 flag .BoolP (flagOutputAppend , "a" , false , "append to output files" )
6671 flag .BoolP (flagOutputOverwrite , "o" , false , "overwrite output if it already exists" )
6772
73+ flag .IntP (
74+ flagSplitLines ,
75+ "l" ,
76+ - 1 ,
77+ fmt .Sprintf ("split output by a number of lines, replaces %q in output uri with file number starting with 1." , NumberReplacementCharacter ),
78+ )
79+
6880 flag .BoolP (flagVerbose , "v" , false , "verbose output" )
6981}
7082
83+ func initViper (flag * pflag.FlagSet ) (* viper.Viper , error ) {
84+ v := viper .New ()
85+ err := v .BindPFlags (flag )
86+ if err != nil {
87+ return nil , err
88+ }
89+ v .SetEnvKeyReplacer (strings .NewReplacer ("-" , "_" ))
90+ v .AutomaticEnv ()
91+ return v , nil
92+ }
93+
94+ func checkConfig (args []string , v * viper.Viper ) error {
95+
96+ if len (args ) != 2 {
97+ return fmt .Errorf ("expecting 2 arguments, found %d arguments" , len (args ))
98+ }
99+
100+ outputUri := args [1 ]
101+
102+ splitLines := v .GetInt (flagSplitLines )
103+ if splitLines > 0 {
104+
105+ if ! strings .Contains (outputUri , NumberReplacementCharacter ) {
106+ return fmt .Errorf (
107+ "when splitting by lines, you must include the number replacement character (%q) in the output uri" ,
108+ NumberReplacementCharacter ,
109+ )
110+ }
111+
112+ }
113+ return nil
114+ }
115+
71116func main () {
72117
73118 rootCommand := cobra.Command {
@@ -85,16 +130,14 @@ func main() {
85130
86131 flag := cmd .Flags ()
87132
88- v := viper .New ()
89- err = v .BindPFlags (flag )
133+ v , err := initViper (flag )
90134 if err != nil {
91- return err
135+ return errors . Wrap ( err , "error initializing viper" )
92136 }
93- v .SetEnvKeyReplacer (strings .NewReplacer ("-" , "_" ))
94- v .AutomaticEnv ()
95137
96- if len (args ) != 2 {
97- return fmt .Errorf ("expecting 2 arguments, found %d arguments" , len (args ))
138+ err = checkConfig (args , v )
139+ if err != nil {
140+ return err
98141 }
99142
100143 inputUri := args [0 ]
@@ -148,9 +191,12 @@ func main() {
148191 }
149192
150193 outputCompression := v .GetString (flagOutputCompression )
194+ outputOverwrite := v .GetBool (flagOutputOverwrite )
151195 outputAppend := v .GetBool (flagOutputAppend )
152196 outputBufferSize := v .GetInt (flagOutputBufferSize )
153197
198+ splitLines := v .GetInt (flagSplitLines )
199+
154200 var outputWriter grw.ByteWriteCloser
155201 var outputBuffer grw.Buffer
156202
@@ -165,7 +211,24 @@ func main() {
165211 return errors .Wrapf (err , "error opening bytes buffer for %q" , outputUri )
166212 }
167213 } else {
168- outputWriter , err = grw .WriteToResource (outputUri , outputCompression , outputAppend , s3Client )
214+ uri := outputUri
215+ if splitLines > 0 {
216+ uri = strings .ReplaceAll (outputUri , NumberReplacementCharacter , "1" )
217+ }
218+ if (! outputOverwrite ) && (! outputAppend ) {
219+ exists , _ , err := grw .Stat (uri )
220+ if err != nil {
221+ return errors .Wrapf (err , "error statting uri %q" , uri )
222+ }
223+ if exists {
224+ return fmt .Errorf ("file already exists at uri %q and neither append or overwrite is set" , uri )
225+ }
226+ }
227+ outputWriter , err = grw .WriteToResource (
228+ uri ,
229+ outputCompression ,
230+ outputAppend ,
231+ s3Client )
169232 if err != nil {
170233 return errors .Wrapf (err , "error opening resource at uri %q" , outputUri )
171234 }
@@ -197,38 +260,124 @@ func main() {
197260 }()
198261
199262 brokenPipe := false
200- go func () {
201- eof := false
202- for ( ! updateGracefulShutdown ( nil )) && ( ! eof ) && ( ! brokenPipe ) {
263+ if splitLines > 0 {
264+ go func () {
265+ eof := false
203266
204- b := make ([]byte , outputBufferSize )
205- n , err := inputReader .Read (b )
206- if err != nil {
207- if err == io .EOF {
208- eof = true
209- } else {
210- fmt .Fprint (os .Stderr , errors .Wrapf (err , "error reading from resource at uri %q" , inputUri ).Error ())
267+ scanner := bufio .NewScanner (inputReader )
268+ files := 1
269+ lines := 0
270+
271+ for (! updateGracefulShutdown (nil )) && (! eof ) && (! brokenPipe ) && scanner .Scan () {
272+
273+ if lines == splitLines {
274+
275+ err := outputWriter .Flush ()
276+ if err != nil {
277+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error flushing resource at uri %q" , strings .ReplaceAll (outputUri , NumberReplacementCharacter , strconv .Itoa (files ))).Error ())
278+ break
279+ }
280+
281+ err = outputWriter .Close ()
282+ if err != nil {
283+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error closing resource at uri %q" , strings .ReplaceAll (outputUri , NumberReplacementCharacter , strconv .Itoa (files ))).Error ())
284+ break
285+ }
286+
287+ // increment files number
288+ files ++
289+
290+ uri := strings .ReplaceAll (outputUri , NumberReplacementCharacter , strconv .Itoa (files ))
291+
292+ if (! outputOverwrite ) && (! outputAppend ) {
293+ exists , _ , err := grw .Stat (uri )
294+ if err != nil {
295+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error statting uri %q" , uri ).Error ())
296+ break
297+ }
298+ if exists {
299+ fmt .Fprint (os .Stderr , fmt .Errorf ("file already exists at uri %q and neither append or overwrite is set" , uri ).Error ())
300+ break
301+ }
302+ }
303+
304+ ow , err := grw .WriteToResource (
305+ uri ,
306+ outputCompression ,
307+ outputAppend ,
308+ s3Client )
309+ if err != nil {
310+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error opening resource at uri %q" , outputUri ).Error ())
311+ break
312+ }
313+
314+ outputWriter = ow
315+
316+ lines = 0
317+ }
318+
319+ line := scanner .Text ()
320+
321+ if gracefulShutdown {
322+ break
323+ }
324+
325+ _ , err = outputWriter .WriteLine (line )
326+ if err != nil {
327+ if perr , ok := err .(* os.PathError ); ok {
328+ if perr .Err == syscall .EPIPE {
329+ brokenPipe = true
330+ break
331+ }
332+ }
333+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error writing to resource at uri %q" , outputUri ).Error ())
334+ break
211335 }
336+
337+ // increment counter
338+ lines ++
212339 }
213340
214- if gracefulShutdown {
215- break
341+ if err := scanner . Err (); err != nil {
342+ fmt . Fprint ( os . Stderr , errors . Wrapf ( err , "error reading from resource at uri %q" , inputUri ). Error ())
216343 }
217344
218- _ , err = outputWriter .Write (b [:n ])
219- if err != nil {
220- if perr , ok := err .(* os.PathError ); ok {
221- if perr .Err == syscall .EPIPE {
222- brokenPipe = true
223- break
345+ wg .Done ()
346+ }()
347+ } else {
348+ go func () {
349+ eof := false
350+ for (! updateGracefulShutdown (nil )) && (! eof ) && (! brokenPipe ) {
351+
352+ b := make ([]byte , outputBufferSize )
353+ n , err := inputReader .Read (b )
354+ if err != nil {
355+ if err == io .EOF {
356+ eof = true
357+ } else {
358+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error reading from resource at uri %q" , inputUri ).Error ())
224359 }
225360 }
226- fmt .Fprint (os .Stderr , errors .Wrapf (err , "error writing to resource at uri %q" , outputUri ).Error ())
227- }
228361
229- }
230- wg .Done ()
231- }()
362+ if gracefulShutdown {
363+ break
364+ }
365+
366+ _ , err = outputWriter .Write (b [:n ])
367+ if err != nil {
368+ if perr , ok := err .(* os.PathError ); ok {
369+ if perr .Err == syscall .EPIPE {
370+ brokenPipe = true
371+ break
372+ }
373+ }
374+ fmt .Fprint (os .Stderr , errors .Wrapf (err , "error writing to resource at uri %q" , outputUri ).Error ())
375+ }
376+
377+ }
378+ wg .Done ()
379+ }()
380+ }
232381
233382 wg .Wait () // wait until done writing or received signal for graceful shutdown
234383
@@ -259,6 +408,7 @@ func main() {
259408 if verbose && ! brokenPipe {
260409 fmt .Println ("Done in " + elapsed .String ())
261410 }
411+
262412 return nil
263413 },
264414 }
0 commit comments