Skip to content

Commit 188548a

Browse files
committed
feat: implement chunked file transfer protocol
This change replaces the simple full-file transfer mechanism with a robust chunked protocol that supports: - Concurrent uploads/downloads with unique transfer IDs - 1MB chunk streaming to support large files - Transfer state management with acknowledgments - Round-robin scheduling for fair resource usage during concurrent transfers Signed-off-by: llogen <christoph.lange@blindspot.software>
1 parent f795b4c commit 188548a

17 files changed

Lines changed: 2169 additions & 764 deletions

File tree

cmds/dutagent/states.go

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type runCmdArgs struct {
2929
cmdMsg *pb.Command
3030
dev dut.Device
3131
cmd dut.Command
32+
broker *dutagent.Broker
3233
session module.Session
3334
moduleErrCh chan error
3435
brokerErrCh <-chan error
@@ -98,8 +99,10 @@ func findDUTCmd(_ context.Context, args runCmdArgs) (runCmdArgs, fsm.State[runCm
9899
// in a separate goroutine, this state will not wait for the execution to finish.
99100
// Further, worker goroutines will be started to serve the module-to-client communication
100101
// during the module execution.
102+
//
103+
//nolint:funlen
101104
func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State[runCmdArgs], error) {
102-
broker := &dutagent.Broker{}
105+
args.broker = &dutagent.Broker{}
103106

104107
// Deferred initialization of the moduleErr channel: only create if not already provided
105108
// (tests may still pass a custom channel).
@@ -110,19 +113,26 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State
110113
rpcCtx := ctx
111114
modCtx, modCtxCancel := context.WithCancel(rpcCtx)
112115

113-
moduleSession, brokerErrCh := broker.Start(modCtx, args.stream)
116+
moduleSession, brokerErrCh := args.broker.Start(modCtx, args.stream)
114117
args.brokerErrCh = brokerErrCh
115118
args.session = moduleSession
116119

117120
// Run the modules in a goroutine.
118121
// Termination of the module execution is signaled by closing the moduleErrCh channel.
119122
go func() {
123+
defer modCtxCancel() // Ensure workers exit even if stream doesn't close
124+
120125
cnt := len(args.cmd.Modules)
121126

122127
for idx, module := range args.cmd.Modules {
123128
if ctx.Err() != nil {
124129
log.Printf("Execution aborted, %d of %d modules done: %v", idx, cnt, ctx.Err())
125-
modCtxCancel()
130+
args.broker.Shutdown()
131+
132+
// Wait for file transfers to complete (workers will exit gracefully)
133+
log.Print("Waiting for file transfers to complete...")
134+
<-args.broker.WaitForTransfersToComplete()
135+
log.Print("All file transfers completed")
126136

127137
return
128138
}
@@ -141,14 +151,25 @@ func executeModules(ctx context.Context, args runCmdArgs) (runCmdArgs, fsm.State
141151
args.moduleErrCh <- err
142152

143153
log.Printf("Module %q failed: %v", module.Config.Name, err)
144-
modCtxCancel()
154+
args.broker.Shutdown()
155+
156+
// Wait for file transfers to complete (workers will exit gracefully)
157+
log.Print("Waiting for file transfers to complete...")
158+
<-args.broker.WaitForTransfersToComplete()
159+
log.Print("All file transfers completed")
145160

146161
return
147162
}
148163
}
149164

150165
log.Print("All modules finished successfully")
151-
modCtxCancel()
166+
args.broker.Shutdown()
167+
168+
// Wait for file transfers to complete (workers will exit gracefully)
169+
log.Print("Waiting for file transfers to complete...")
170+
<-args.broker.WaitForTransfersToComplete()
171+
log.Print("All file transfers completed")
172+
152173
close(args.moduleErrCh)
153174
}()
154175

0 commit comments

Comments
 (0)