Skip to content

Commit b38eb16

Browse files
authored
Merge pull request lxc#2142 from nanjj/qmp03
incusd/instance/qmp: Associate request/reply with command ID
2 parents 7ecb7c1 + 9b53c40 commit b38eb16

File tree

6 files changed

+168
-82
lines changed

6 files changed

+168
-82
lines changed

internal/server/instance/drivers/driver_qemu.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,8 +1221,17 @@ func (d *qemu) startupHook(monitor *qmp.Monitor, stage string) error {
12211221
}
12221222

12231223
for _, command := range commandList {
1224-
jsonCommand, _ := json.Marshal(command)
1225-
err = monitor.RunJSON(jsonCommand, nil, true)
1224+
id := monitor.IncreaseID()
1225+
command["id"] = id
1226+
1227+
var jsonCommand []byte
1228+
jsonCommand, err = json.Marshal(command)
1229+
if err != nil {
1230+
err = fmt.Errorf("Failed to marshal command at %s stage: %w", stage, err)
1231+
return err
1232+
}
1233+
1234+
err = monitor.RunJSON(jsonCommand, nil, true, id)
12261235
if err != nil {
12271236
err = fmt.Errorf("Failed to run QMP command %s at %s stage: %w", jsonCommand, stage, err)
12281237
return err

internal/server/instance/drivers/qmp/commands.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -206,23 +206,20 @@ func (m *Monitor) SendFile(name string, file *os.File) error {
206206
return ErrMonitorDisconnect
207207
}
208208

209-
var req struct {
210-
Execute string `json:"execute"`
211-
Arguments struct {
212-
FDName string `json:"fdname"`
213-
} `json:"arguments"`
209+
id := m.qmp.qmpIncreaseID()
210+
req := &qmpCommand{
211+
ID: id,
212+
Execute: "getfd",
213+
Arguments: map[string]any{"fdname": name},
214214
}
215215

216-
req.Execute = "getfd"
217-
req.Arguments.FDName = name
218-
219216
reqJSON, err := json.Marshal(req)
220217
if err != nil {
221218
return err
222219
}
223220

224221
// Query the status.
225-
_, err = m.qmp.runWithFile(reqJSON, file)
222+
_, err = m.qmp.runWithFile(reqJSON, file, id)
226223
if err != nil {
227224
// Confirm the daemon didn't die.
228225
errPing := m.ping()
@@ -259,27 +256,26 @@ func (m *Monitor) SendFileWithFDSet(name string, file *os.File, readonly bool) (
259256
return nil, ErrMonitorDisconnect
260257
}
261258

262-
var req struct {
263-
Execute string `json:"execute"`
264-
Arguments struct {
265-
Opaque string `json:"opaque"`
266-
} `json:"arguments"`
267-
}
268-
269259
permissions := "rdwr"
270260
if readonly {
271261
permissions = "rdonly"
272262
}
273263

274-
req.Execute = "add-fd"
275-
req.Arguments.Opaque = fmt.Sprintf("%s:%s", permissions, name)
264+
id := m.qmp.qmpIncreaseID()
265+
req := &qmpCommand{
266+
ID: id,
267+
Execute: "add-fd",
268+
Arguments: map[string]any{
269+
"opaque": fmt.Sprintf("%s:%s", permissions, name),
270+
},
271+
}
276272

277273
reqJSON, err := json.Marshal(req)
278274
if err != nil {
279275
return nil, err
280276
}
281277

282-
ret, err := m.qmp.runWithFile(reqJSON, file)
278+
ret, err := m.qmp.runWithFile(reqJSON, file, id)
283279
if err != nil {
284280
// Confirm the daemon didn't die.
285281
errPing := m.ping()

internal/server/instance/drivers/qmp/monitor.go

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,11 @@ func (m *Monitor) ping() error {
179179
return ErrMonitorDisconnect
180180
}
181181

182+
id := m.qmp.qmpIncreaseID()
183+
182184
// Query the capabilities to validate the monitor.
183-
_, err := m.qmp.run([]byte("{'execute': 'query-version'}"))
185+
_, err := m.qmp.run(fmt.Appendf([]byte{},
186+
`{"execute": "query-version", "id": %d}`, id), id)
184187
if err != nil {
185188
m.Disconnect()
186189
return ErrMonitorDisconnect
@@ -190,7 +193,7 @@ func (m *Monitor) ping() error {
190193
}
191194

192195
// RunJSON executes a JSON-formatted command.
193-
func (m *Monitor) RunJSON(request []byte, resp any, logCommand bool) error {
196+
func (m *Monitor) RunJSON(request []byte, resp any, logCommand bool, id uint32) error {
194197
// Check if disconnected
195198
if m.disconnected {
196199
return ErrMonitorDisconnect
@@ -212,7 +215,7 @@ func (m *Monitor) RunJSON(request []byte, resp any, logCommand bool) error {
212215
}
213216
}
214217

215-
out, err := m.qmp.run(request)
218+
out, err := m.qmp.run(request, id)
216219
if err != nil {
217220
// Confirm the daemon didn't die.
218221
errPing := m.ping()
@@ -247,13 +250,18 @@ func (m *Monitor) RunJSON(request []byte, resp any, logCommand bool) error {
247250
return nil
248251
}
249252

253+
// IncreaseID returns on auto increment uint32 id.
254+
func (m *Monitor) IncreaseID() uint32 {
255+
return m.qmp.qmpIncreaseID()
256+
}
257+
250258
// run executes a command.
251259
func (m *Monitor) Run(cmd string, args any, resp any) error {
260+
id := m.IncreaseID()
261+
252262
// Construct the command.
253-
requestArgs := struct {
254-
Execute string `json:"execute"`
255-
Arguments any `json:"arguments,omitempty"`
256-
}{
263+
requestArgs := qmpCommand{
264+
ID: id,
257265
Execute: cmd,
258266
Arguments: args,
259267
}
@@ -264,7 +272,7 @@ func (m *Monitor) Run(cmd string, args any, resp any) error {
264272
}
265273

266274
logCommand := !slices.Contains(ExcludedCommands, cmd)
267-
return m.RunJSON(request, resp, logCommand)
275+
return m.RunJSON(request, resp, logCommand, id)
268276
}
269277

270278
// Connect creates or retrieves an existing QMP monitor for the path.

internal/server/instance/drivers/qmp/qmp.go

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,19 @@ import (
1414
"sync/atomic"
1515

1616
"golang.org/x/sys/unix"
17+
18+
"github.com/lxc/incus/v6/shared/logger"
1719
)
1820

1921
type qemuMachineProtocal struct {
20-
oobSupported bool // Out of band support or not
21-
c net.Conn // Underlying connection
22-
uc *net.UnixConn // Underlying unix socket connection
23-
mu sync.Mutex // Serialize running command
24-
stream <-chan rawResponse // Send command responses and errors
25-
events <-chan qmpEvent // Events channel
26-
listeners atomic.Uint32 // Listeners number
27-
cid atomic.Uint32 // Auto increase command id
22+
oobSupported bool // Out of band support or not
23+
c net.Conn // Underlying connection
24+
uc *net.UnixConn // Underlying unix socket connection
25+
mu sync.Mutex // Serialize running command
26+
replies sync.Map // Replies channels
27+
events <-chan qmpEvent // Events channel
28+
listeners atomic.Uint32 // Listeners number
29+
cid atomic.Uint32 // Auto increase command id
2830
}
2931

3032
// qmpEvent represents a QEMU QMP event.
@@ -104,9 +106,8 @@ func (qmp *qemuMachineProtocal) disconnect() error {
104106

105107
// qmpIncreaseID increase ID and skip zero.
106108
func (qmp *qemuMachineProtocal) qmpIncreaseID() uint32 {
107-
const ZeroKey = uint32(0)
108109
id := qmp.cid.Add(1)
109-
if id == ZeroKey {
110+
if id == 0 {
110111
id = qmp.cid.Add(1)
111112
}
112113

@@ -155,14 +156,10 @@ func (qmp *qemuMachineProtocal) connect() error {
155156
return fmt.Errorf("reply id %d and command id %d mismatch", r.ID, id)
156157
}
157158

158-
// Initialize listener for command responses and asynchronous events
159-
events := make(chan qmpEvent)
160-
stream := make(chan rawResponse)
161-
go qmp.listen(qmp.c, events, stream)
162-
159+
// Initialize listener for command responses and asynchronous events.
160+
events := make(chan qmpEvent, 128)
161+
go qmp.listen(qmp.c, events, &qmp.replies)
163162
qmp.events = events
164-
qmp.stream = stream
165-
166163
return nil
167164
}
168165

@@ -172,9 +169,8 @@ func (qmp *qemuMachineProtocal) getEvents(context.Context) (<-chan qmpEvent, err
172169
return qmp.events, nil
173170
}
174171

175-
func (qmp *qemuMachineProtocal) listen(r io.Reader, events chan<- qmpEvent, stream chan<- rawResponse) {
172+
func (qmp *qemuMachineProtocal) listen(r io.Reader, events chan<- qmpEvent, replies *sync.Map) {
176173
defer close(events)
177-
defer close(stream)
178174

179175
scanner := bufio.NewScanner(r)
180176
for scanner.Scan() {
@@ -194,9 +190,29 @@ func (qmp *qemuMachineProtocal) listen(r io.Reader, events chan<- qmpEvent, stre
194190
continue
195191
}
196192

193+
key := r.ID
194+
if key == 0 {
195+
// Discard response without a request ID.
196+
continue
197+
}
198+
199+
val, ok := replies.LoadAndDelete(key)
200+
if !ok {
201+
// Discard unexpected response.
202+
continue
203+
}
204+
205+
reply, ok := val.(chan rawResponse)
206+
if !ok {
207+
// Skip bad messages.
208+
logger.Error("Failed to cast QMP reply to chan rawResponse")
209+
continue
210+
}
211+
197212
r.raw = make([]byte, len(b))
198213
copy(r.raw, b)
199-
stream <- r
214+
reply <- r
215+
200216
continue
201217
}
202218

@@ -205,19 +221,28 @@ func (qmp *qemuMachineProtocal) listen(r io.Reader, events chan<- qmpEvent, stre
205221
continue
206222
}
207223

208-
events <- e
224+
select {
225+
case events <- e:
226+
logger.Debugf("Event dispatched: %s", b)
227+
default:
228+
logger.Debugf("Event discarded: %s", b)
229+
}
209230
}
210231

211232
err := scanner.Err()
212233
if err != nil {
213-
stream <- rawResponse{err: err}
234+
errReply := make(chan rawResponse, 1)
235+
replies.Store(0, errReply)
236+
237+
r := rawResponse{err: err}
238+
errReply <- r
214239
}
215240
}
216241

217242
// run executes the given QAPI command against a domain's QEMU instance.
218-
func (qmp *qemuMachineProtocal) run(command []byte) ([]byte, error) {
243+
func (qmp *qemuMachineProtocal) run(command []byte, id uint32) ([]byte, error) {
219244
// Just call RunWithFile with no file
220-
return qmp.runWithFile(command, nil)
245+
return qmp.runWithFile(command, nil, id)
221246
}
222247

223248
func (qmp *qemuMachineProtocal) qmpWriteMsg(b []byte, file *os.File) error {
@@ -242,19 +267,33 @@ func (qmp *qemuMachineProtocal) qmpWriteMsg(b []byte, file *os.File) error {
242267
}
243268

244269
// runWithFile executes for passing a file through out-of-band data.
245-
func (qmp *qemuMachineProtocal) runWithFile(command []byte, file *os.File) ([]byte, error) {
270+
func (qmp *qemuMachineProtocal) runWithFile(command []byte, file *os.File, id uint32) ([]byte, error) {
246271
// Only allow a single command to be run at a time to ensure that responses
247272
// to a command cannot be mixed with responses from another command
248273
qmp.mu.Lock()
249274
defer qmp.mu.Unlock()
250275

276+
if id == 0 {
277+
id = qmp.qmpIncreaseID()
278+
b, err := qmp.qmpInjectID(command, id)
279+
if err != nil {
280+
return nil, err
281+
}
282+
283+
command = b
284+
}
285+
286+
repCh := make(chan rawResponse, 1)
287+
qmp.replies.Store(id, repCh)
288+
251289
err := qmp.qmpWriteMsg(command, file)
252290
if err != nil {
291+
qmp.replies.Delete(id)
253292
return nil, err
254293
}
255294

256295
// Wait for a response or error to our command
257-
res := <-qmp.stream
296+
res := <-repCh
258297
if res.err != nil {
259298
return nil, res.err
260299
}
@@ -265,3 +304,19 @@ func (qmp *qemuMachineProtocal) runWithFile(command []byte, file *os.File) ([]by
265304

266305
return res.raw, nil
267306
}
307+
308+
func (qmp *qemuMachineProtocal) qmpInjectID(command []byte, id uint32) ([]byte, error) {
309+
req := &qmpCommand{}
310+
err := json.Unmarshal(command, req)
311+
if err != nil {
312+
return nil, err
313+
}
314+
315+
req.ID = id
316+
b, err := json.Marshal(req)
317+
if err != nil {
318+
return nil, err
319+
}
320+
321+
return b, nil
322+
}

0 commit comments

Comments
 (0)