Skip to content
This repository was archived by the owner on Nov 1, 2023. It is now read-only.

Commit 0f895d1

Browse files
authored
add context to logging of supervisor work queue interaction (#601)
1 parent c1a2c9f commit 0f895d1

File tree

3 files changed

+57
-27
lines changed

3 files changed

+57
-27
lines changed

src/agent/onefuzz-supervisor/src/agent.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl Agent {
123123
let msg = self.work_queue.poll().await?;
124124

125125
let next = if let Some(msg) = msg {
126-
debug!("received work set message: {:?}", msg);
126+
info!("received work set message: {:?}", msg);
127127

128128
let can_schedule = self.coordinator.can_schedule(&msg.work_set).await?;
129129

@@ -179,6 +179,7 @@ impl Agent {
179179
state.into()
180180
}
181181
} else {
182+
info!("no work available");
182183
self.sleep().await;
183184
state.into()
184185
};

src/agent/onefuzz-supervisor/src/work.rs

+24-16
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,10 @@ impl WorkQueue {
146146
}
147147

148148
async fn renew(&mut self) -> Result<()> {
149-
self.registration.renew().await?;
149+
self.registration
150+
.renew()
151+
.await
152+
.context("unable to renew registration in workqueue")?;
150153
let url = self.registration.dynamic_config.work_queue.clone();
151154
self.queue = QueueClient::new(url);
152155
Ok(())
@@ -159,25 +162,27 @@ impl WorkQueue {
159162
// it was just due to a stale SAS URL.
160163
if let Err(err) = &msg {
161164
if is_auth_error(err) {
162-
self.renew().await?;
165+
self.renew()
166+
.await
167+
.context("unable to renew registration in poll")?;
163168
msg = self.queue.pop().await;
164169
}
165170
}
166171

167172
// Now we've had a chance to ensure our SAS URL is fresh. For any other
168173
// error, including another auth error, bail.
169-
let msg = msg?;
170-
171-
if msg.is_none() {
172-
return Ok(None);
173-
}
174-
175-
let msg = msg.unwrap();
176-
let work_set = serde_json::from_slice(msg.data())?;
177-
let receipt = Receipt(msg.receipt);
178-
let msg = Message { receipt, work_set };
179-
180-
Ok(Some(msg))
174+
let msg = msg.context("unable to check work queue")?;
175+
176+
let result = match msg {
177+
Some(msg) => {
178+
let work_set =
179+
serde_json::from_slice(msg.data()).context("unable to parse WorkSet")?;
180+
let receipt = Receipt(msg.receipt);
181+
Some(Message { receipt, work_set })
182+
}
183+
None => None,
184+
};
185+
Ok(result)
181186
}
182187

183188
pub async fn claim(&mut self, receipt: Receipt) -> Result<()> {
@@ -189,8 +194,11 @@ impl WorkQueue {
189194
// it was just due to a stale SAS URL.
190195
if let Err(err) = &result {
191196
if is_auth_error(err) {
192-
self.renew().await?;
193-
self.queue.delete(receipt).await?;
197+
self.renew().await.context("unable to renew registration")?;
198+
self.queue
199+
.delete(receipt)
200+
.await
201+
.context("unable to claim work from queue")?;
194202
}
195203
}
196204

src/agent/storage-queue/src/lib.rs

+31-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT License.
33

4-
use anyhow::Result;
4+
use anyhow::{bail, Context, Result};
55
use reqwest::{Client, Url};
66
use reqwest_retry::SendRetry;
77
use serde::{Deserialize, Serialize};
@@ -47,8 +47,11 @@ impl QueueClient {
4747
.post(self.messages_url())
4848
.body(body)
4949
.send_retry_default()
50-
.await?;
51-
let _ = r.error_for_status()?;
50+
.await
51+
.context("storage queue enqueue failed")?;
52+
let _ = r
53+
.error_for_status()
54+
.context("storage queue enqueue failed with error")?;
5255
Ok(())
5356
}
5457

@@ -57,15 +60,23 @@ impl QueueClient {
5760
.http
5861
.get(self.messages_url())
5962
.send_retry_default()
60-
.await?
61-
.error_for_status()?;
62-
let text = response.text().await?;
63+
.await
64+
.context("storage queue pop failed")?
65+
.error_for_status()
66+
.context("storage queue pop failed with error")?;
67+
let text = response
68+
.text()
69+
.await
70+
.context("unable to parse response text")?;
6371
let msg = Message::parse(&text);
6472

6573
let msg = if let Some(msg) = msg {
6674
msg
6775
} else {
68-
return Ok(None);
76+
if is_empty_message(&text) {
77+
return Ok(None);
78+
}
79+
bail!("unable to parse response text body: {}", text);
6980
};
7081

7182
let msg = if msg.data.is_empty() { None } else { Some(msg) };
@@ -79,8 +90,10 @@ impl QueueClient {
7990
self.http
8091
.delete(url)
8192
.send_retry_default()
82-
.await?
83-
.error_for_status()?;
93+
.await
94+
.context("storage queue delete failed")?
95+
.error_for_status()
96+
.context("storage queue delete failed")?;
8497
Ok(())
8598
}
8699

@@ -145,11 +158,19 @@ impl Message {
145158
}
146159

147160
pub fn get<'a, T: serde::de::Deserialize<'a>>(&'a self) -> Result<T> {
148-
let data = serde_json::from_slice(&self.data)?;
161+
let data =
162+
serde_json::from_slice(&self.data).context("get storage queue message failed")?;
149163
Ok(data)
150164
}
151165
}
152166

167+
fn is_empty_message(text: &str) -> bool {
168+
regex::Regex::new(r".*<QueueMessagesList>[\s\n\r]*</QueueMessagesList>")
169+
.unwrap()
170+
.is_match(&text)
171+
|| text.contains(r"<QueueMessagesList />")
172+
}
173+
153174
fn parse_message_id(text: &str) -> Option<Uuid> {
154175
let pat = r"<MessageId>(.*)</MessageId>";
155176
let re = regex::Regex::new(pat).unwrap();

0 commit comments

Comments
 (0)