Skip to content

Commit 0795839

Browse files

Merge pull request #47 from lbarcziova/agents-logging (authored)
Add logging about agent processing
2 parents (5330538 + 13f5c15) — commit 0795839

File tree

3 files changed: +92 / −6 lines

beeai/agents/backport_agent.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import os
34
import sys
45
import traceback
@@ -25,6 +26,8 @@
2526
from triage_agent import BackportData, ErrorData
2627
from utils import redis_client, get_git_finalization_steps
2728

29+
logger = logging.getLogger(__name__)
30+
2831

2932
class InputSchema(BaseModel):
3033
package: str = Field(description="Package to update")
@@ -140,6 +143,8 @@ def prompt(self) -> str:
140143

141144

142145
async def main() -> None:
146+
logging.basicConfig(level=logging.INFO)
147+
143148
setup_observability(os.getenv("COLLECTOR_ENDPOINT"))
144149
agent = BackportAgent()
145150

@@ -149,29 +154,41 @@ async def main() -> None:
149154
and (jira_issue := os.getenv("JIRA_ISSUE", None))
150155
and (branch := os.getenv("BRANCH", None))
151156
):
157+
logger.info("Running in direct mode with environment variables")
152158
input = InputSchema(
153159
package=package,
154160
upstream_fix=upstream_fix,
155161
jira_issue=jira_issue,
156162
dist_git_branch=branch,
157163
)
158164
output = await agent.run_with_schema(input)
159-
print(output.model_dump_json(indent=4))
165+
logger.info(f"Direct run completed: {output.model_dump_json(indent=4)}")
160166
return
161167

162168
class Task(BaseModel):
163169
metadata: dict = Field(description="Task metadata")
164170
attempts: int = Field(default=0, description="Number of processing attempts")
165171

172+
logger.info("Starting backport agent in queue mode")
166173
async with redis_client(os.getenv("REDIS_URL")) as redis:
167174
max_retries = int(os.getenv("MAX_RETRIES", 3))
175+
logger.info(f"Connected to Redis, max retries set to {max_retries}")
176+
168177
while True:
178+
logger.info("Waiting for tasks from backport_queue (timeout: 30s)...")
169179
element = await redis.brpop("backport_queue", timeout=30)
170180
if element is None:
181+
logger.info("No tasks received, continuing to wait...")
171182
continue
183+
172184
_, payload = element
185+
logger.info(f"Received task from queue.")
186+
173187
task = Task.model_validate_json(payload)
174188
backport_data = BackportData.model_validate(task.metadata)
189+
logger.info(f"Processing backport for package: {backport_data.package}, "
190+
f"JIRA: {backport_data.jira_issue}, attempt: {task.attempts + 1}")
191+
175192
input = InputSchema(
176193
package=backport_data.package,
177194
upstream_fix=backport_data.patch_url,
@@ -182,22 +199,32 @@ class Task(BaseModel):
182199
async def retry(task, error):
183200
task.attempts += 1
184201
if task.attempts < max_retries:
202+
logger.warning(f"Task failed (attempt {task.attempts}/{max_retries}), "
203+
f"re-queuing for retry: {backport_data.jira_issue}")
185204
await redis.lpush("backport_queue", task.model_dump_json())
186205
else:
206+
logger.error(f"Task failed after {max_retries} attempts, "
207+
f"moving to error list: {backport_data.jira_issue}")
187208
await redis.lpush("error_list", error)
188209

189210
try:
211+
logger.info(f"Starting backport processing for {backport_data.jira_issue}")
190212
output = await agent.run_with_schema(input)
213+
logger.info(f"Backport processing completed for {backport_data.jira_issue}, "
214+
f"success: {output.success}")
191215
except Exception as e:
192216
error = "".join(traceback.format_exception(e))
193-
print(error, file=sys.stderr)
217+
logger.error(f"Exception during backport processing for {backport_data.jira_issue}: {error}")
194218
await retry(
195219
task, ErrorData(details=error, jira_issue=input.jira_issue).model_dump_json()
196220
)
197221
else:
198222
if output.success:
223+
logger.info(f"Backport successful for {backport_data.jira_issue}, "
224+
f"adding to completed list")
199225
await redis.lpush("completed_backport_list", output.model_dump_json())
200226
else:
227+
logger.warning(f"Backport failed for {backport_data.jira_issue}: {output.error}")
201228
await retry(task, output.error)
202229

203230

beeai/agents/rebase_agent.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import os
34
import sys
45
import traceback
@@ -25,6 +26,7 @@
2526
from triage_agent import RebaseData, ErrorData
2627
from utils import redis_client, get_git_finalization_steps
2728

29+
logger = logging.getLogger(__name__)
2830

2931
class InputSchema(BaseModel):
3032
package: str = Field(description="Package to update")
@@ -171,6 +173,8 @@ def prompt(self) -> str:
171173

172174

173175
async def main() -> None:
176+
logging.basicConfig(level=logging.INFO)
177+
174178
setup_observability(os.getenv("COLLECTOR_ENDPOINT"))
175179
agent = RebaseAgent()
176180

@@ -180,29 +184,42 @@ async def main() -> None:
180184
and (jira_issue := os.getenv("JIRA_ISSUE", None))
181185
and (branch := os.getenv("BRANCH", None))
182186
):
187+
logger.info("Running in direct mode with environment variables")
183188
input = InputSchema(
184189
package=package,
185190
version=version,
186191
jira_issue=jira_issue,
187192
dist_git_branch=branch,
188193
)
189194
output = await agent.run_with_schema(input)
190-
print(output.model_dump_json(indent=4))
195+
logger.info(f"Direct run completed: {output.model_dump_json(indent=4)}")
191196
return
192197

193198
class Task(BaseModel):
194199
metadata: dict = Field(description="Task metadata")
195200
attempts: int = Field(default=0, description="Number of processing attempts")
196201

202+
logger.info("Starting rebase agent in queue mode")
197203
async with redis_client(os.getenv("REDIS_URL")) as redis:
198204
max_retries = int(os.getenv("MAX_RETRIES", 3))
205+
logger.info(f"Connected to Redis, max retries set to {max_retries}")
206+
199207
while True:
208+
logger.info("Waiting for tasks from rebase_queue (timeout: 30s)...")
200209
element = await redis.brpop("rebase_queue", timeout=30)
201210
if element is None:
211+
logger.info("No tasks received, continuing to wait...")
202212
continue
213+
203214
_, payload = element
215+
logger.info(f"Received task from queue.")
216+
204217
task = Task.model_validate_json(payload)
205218
rebase_data = RebaseData.model_validate(task.metadata)
219+
logger.info(f"Processing rebase for package: {rebase_data.package}, "
220+
f"version: {rebase_data.version}, JIRA: {rebase_data.jira_issue}, "
221+
f"attempt: {task.attempts + 1}")
222+
206223
input = InputSchema(
207224
package=rebase_data.package,
208225
version=rebase_data.version,
@@ -213,22 +230,32 @@ class Task(BaseModel):
213230
async def retry(task, error):
214231
task.attempts += 1
215232
if task.attempts < max_retries:
233+
logger.warning(f"Task failed (attempt {task.attempts}/{max_retries}), "
234+
f"re-queuing for retry: {rebase_data.jira_issue}")
216235
await redis.lpush("rebase_queue", task.model_dump_json())
217236
else:
237+
logger.error(f"Task failed after {max_retries} attempts, "
238+
f"moving to error list: {rebase_data.jira_issue}")
218239
await redis.lpush("error_list", error)
219240

220241
try:
242+
logger.info(f"Starting rebase processing for {rebase_data.jira_issue}")
221243
output = await agent.run_with_schema(input)
244+
logger.info(f"Rebase processing completed for {rebase_data.jira_issue}, "
245+
f"success: {output.success}")
222246
except Exception as e:
223247
error = "".join(traceback.format_exception(e))
224-
print(error, file=sys.stderr)
248+
logger.error(f"Exception during rebase processing for {rebase_data.jira_issue}: {error}")
225249
await retry(
226250
task, ErrorData(details=error, jira_issue=input.jira_issue).model_dump_json()
227251
)
228252
else:
229253
if output.success:
254+
logger.info(f"Rebase successful for {rebase_data.jira_issue}, "
255+
f"adding to completed list")
230256
await redis.lpush("completed_rebase_list", output.model_dump_json())
231257
else:
258+
logger.warning(f"Rebase failed for {rebase_data.jira_issue}: {output.error}")
232259
await retry(task, output.error)
233260

234261

beeai/agents/triage_agent.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import os
34
import sys
45
import traceback
@@ -22,6 +23,8 @@
2223
from tools import ShellCommandTool
2324
from utils import mcp_tools, redis_client
2425

26+
logger = logging.getLogger(__name__)
27+
2528

2629
class InputSchema(BaseModel):
2730
issue: str = Field(description="Jira issue identifier to analyze (e.g. RHEL-12345)")
@@ -252,57 +255,86 @@ async def run_with_schema(self, input: TInputSchema) -> TOutputSchema:
252255

253256

254257
async def main() -> None:
258+
logging.basicConfig(level=logging.INFO)
259+
255260
setup_observability(os.getenv("COLLECTOR_ENDPOINT"))
256261
agent = TriageAgent()
257262

258263
if jira_issue := os.getenv("JIRA_ISSUE", None):
264+
logger.info("Running in direct mode with environment variable")
259265
input = InputSchema(issue=jira_issue)
260266
output = await agent.run_with_schema(input)
261-
print(output.model_dump_json(indent=4))
267+
logger.info(f"Direct run completed: {output.model_dump_json(indent=4)}")
262268
return
263269

264270
class Task(BaseModel):
265271
metadata: dict = Field(description="Task metadata")
266272
attempts: int = Field(default=0, description="Number of processing attempts")
267273

274+
logger.info("Starting triage agent in queue mode")
268275
async with redis_client(os.getenv("REDIS_URL")) as redis:
269276
max_retries = int(os.getenv("MAX_RETRIES", 3))
277+
logger.info(f"Connected to Redis, max retries set to {max_retries}")
278+
270279
while True:
280+
logger.info("Waiting for tasks from triage_queue (timeout: 30s)...")
271281
element = await redis.brpop("triage_queue", timeout=30)
272282
if element is None:
283+
logger.info("No tasks received, continuing to wait...")
273284
continue
285+
274286
_, payload = element
287+
logger.info(f"Received task from queue")
288+
275289
task = Task.model_validate_json(payload)
276290
input = InputSchema.model_validate(task.metadata)
291+
logger.info(f"Processing triage for JIRA issue: {input.issue}, "
292+
f"attempt: {task.attempts + 1}")
277293

278294
async def retry(task, error):
279295
task.attempts += 1
280296
if task.attempts < max_retries:
297+
logger.warning(f"Task failed (attempt {task.attempts}/{max_retries}), "
298+
f"re-queuing for retry: {input.issue}")
281299
await redis.lpush("triage_queue", task.model_dump_json())
282300
else:
301+
logger.error(f"Task failed after {max_retries} attempts, "
302+
f"moving to error list: {input.issue}")
283303
await redis.lpush("error_list", error)
284304

285305
try:
306+
logger.info(f"Starting triage processing for {input.issue}")
286307
output = await agent.run_with_schema(input)
308+
logger.info(f"Triage processing completed for {input.issue}, "
309+
f"resolution: {output.resolution.value}")
287310
except Exception as e:
288311
error = "".join(traceback.format_exception(e))
289-
print(error, file=sys.stderr)
312+
logger.error(f"Exception during triage processing for {input.issue}: {error}")
290313
await retry(
291314
task, ErrorData(details=error, jira_issue=input.issue).model_dump_json()
292315
)
293316
else:
294317
if output.resolution == Resolution.REBASE:
318+
logger.info(f"Triage resolved as REBASE for {input.issue}, "
319+
f"adding to rebase queue")
295320
task = Task(metadata=output.data.model_dump())
296321
await redis.lpush("rebase_queue", task.model_dump_json())
297322
elif output.resolution == Resolution.BACKPORT:
323+
logger.info(f"Triage resolved as BACKPORT for {input.issue}, "
324+
f"adding to backport queue")
298325
task = Task(metadata=output.data.model_dump())
299326
await redis.lpush("backport_queue", task.model_dump_json())
300327
elif output.resolution == Resolution.CLARIFICATION_NEEDED:
328+
logger.info(f"Triage resolved as CLARIFICATION_NEEDED for {input.issue}, "
329+
f"adding to clarification needed queue")
301330
task = Task(metadata=output.data.model_dump())
302331
await redis.lpush("clarification_needed_queue", task.model_dump_json())
303332
elif output.resolution == Resolution.NO_ACTION:
333+
logger.info(f"Triage resolved as NO_ACTION for {input.issue}, "
334+
f"adding to no action list")
304335
await redis.lpush("no_action_list", output.data.model_dump_json())
305336
elif output.resolution == Resolution.ERROR:
337+
logger.warning(f"Triage resolved as ERROR for {input.issue}, retrying")
306338
await retry(task, output.data.model_dump_json())
307339

308340

0 commit comments

Comments (0)