Feature/schedule #807

Open
rainsonGain wants to merge 12 commits into main from feature/schedule

@rainsonGain
Collaborator

No description provided.

@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist [1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a foundational task scheduling framework, enabling more sophisticated management and execution of tasks within the system. It centralizes task state handling, provides flexible scheduling strategies, and refines the event-driven architecture with more precise lifecycle hooks. These changes lay the groundwork for advanced automation and workflow orchestration, ensuring tasks are processed efficiently and reliably according to defined schedules and dependencies.

Highlights

  • Comprehensive Task Scheduling System Introduced: A new, robust task scheduling system has been implemented, featuring a TaskManager for persistence, a TaskScheduler supporting various strategies (FIFO, Priority, DAG, Auto), and dedicated types for schedulable and scheduled tasks with cron, one-time, and delayed execution capabilities.
  • Refactored Task Status Management: The task status system has been standardized and improved by converting TaskStatusValue to an enum-like TaskStatus(str) and introducing a new DISABLED status, enhancing type consistency and clarity across the codebase.
  • Enhanced Hook Point System: The runner's hook point system has been significantly refactored to provide more granular lifecycle events, moving from generic 'Pre/Post' hooks to specific 'OnStart', 'OnFinished', 'OnSuccess', and 'OnError' hooks for LLM calls, tool calls, and tasks.
  • Codebase Cleanup and Simplification: Numerous unused imports and redundant code sections have been removed, particularly in aworld/core/common.py, aworld/core/task.py, and aworld/runners/handler/background_task.py, contributing to a cleaner and more maintainable codebase.
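
To picture how the four lifecycle points from the hook bullet above might be ordered around a single call, here is a minimal sketch. It is not the PR's implementation: `run_with_hooks`, the `fire` callback, and the string event names are hypothetical stand-ins for the real hook classes and HookPoint constants.

```python
import asyncio
from typing import Any, Awaitable, Callable, List

# Hypothetical event names mirroring the OnStart/OnSuccess/OnError/OnFinished split.
ON_START = "on_start"
ON_SUCCESS = "on_success"
ON_ERROR = "on_error"
ON_FINISHED = "on_finished"


async def run_with_hooks(
    call: Callable[[], Awaitable[Any]],
    fire: Callable[[str], None],
) -> Any:
    """Run `call`, firing lifecycle events in a fixed order."""
    fire(ON_START)
    try:
        result = await call()
    except Exception:
        fire(ON_ERROR)       # fired only when the call raises
        raise
    else:
        fire(ON_SUCCESS)     # fired only when the call returns normally
        return result
    finally:
        fire(ON_FINISHED)    # fired in both cases, after success or error


async def _ok() -> str:
    return "done"


async def _boom() -> str:
    raise RuntimeError("tool failed")


ok_events: List[str] = []
ok_result = asyncio.run(run_with_hooks(_ok, ok_events.append))

err_events: List[str] = []
try:
    asyncio.run(run_with_hooks(_boom, err_events.append))
except RuntimeError:
    pass  # the error still propagates to the caller after the hooks fire
```

In this sketch the "finished" event always fires last, so a cleanup hook can rely on it regardless of outcome. Whether the PR orders its hooks the same way is not stated in the summary.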

Changelog
  • aworld/core/common.py
    • Removed an empty line for cleaner formatting
    • Added a new DISABLED status to TaskStatusValue
    • Changed TaskStatusValue to inherit from str and renamed it to TaskStatus
    • Introduced a TaskTypeValue constants class for INSTANT and SCHEDULED task types
  • aworld/core/context/amni/__init__.py
    • Removed the import of TaskStatus from aworld.core.common
  • aworld/core/context/amni/contexts.py
    • Updated the import of TaskStatusValue to TaskStatus
  • aworld/core/context/amni/state/task_state.py
    • Updated the import of TaskStatus from aworld.core.task to aworld.core.common
  • aworld/core/context/base.py
    • Removed TaskStatusValue from TYPE_CHECKING imports
    • Updated get_task_status method to use TaskStatus.SUCCESS instead of TaskStatusValue.SUCCESS
  • aworld/core/event/message_future.py
    • Replaced TaskStatusValue with TaskStatus in the wait method for status checks
  • aworld/core/task.py
    • Removed unused imports: asyncio, enum, Literal, TYPE_CHECKING
    • Updated task_status field default from TaskStatusValue.INIT to TaskStatus.INIT
    • Updated status field default in TaskResponse from TaskStatusValue.SUCCESS to TaskStatus.SUCCESS
  • aworld/evaluations/scorers/output_validators.py
    • Removed model_config arguments from scorer_register decorators for OutputRelevanceScorer, OutputCompletenessScorer, and OutputQualityScorer
  • aworld/events/util.py
    • Replaced TaskStatusValue with TaskStatus in send_message, send_and_wait_message, and _send_finish_message functions
  • aworld/runners/event_runner.py
    • Updated import of TaskStatusValue to TaskStatus
    • Replaced TaskStatusValue with TaskStatus in various task status updates within the runner
  • aworld/runners/handler/background_task.py
    • Removed several unused imports including asyncio, time, EnvChannelMessage, ContextManager, Runners, and HookPoint
    • Replaced TaskStatusValue with TaskStatus in task status handling
    • Commented out code related to EnvChannelMessage processing
  • aworld/runners/handler/task.py
    • Updated import of TaskStatusValue to TaskStatus
    • Replaced TaskStatusValue with TaskStatus in task status updates for failed, cancelled, and interrupted tasks
  • aworld/runners/hook/agent_hooks.py
    • Removed abc import and __metaclass__ = abc.ABCMeta from hook classes
    • Changed base classes for PreLLMCallContextProcessHook and PostLLMCallContextProcessHook to OnStartLLMCallHook and OnFinishedLLMCallHook respectively
    • Removed empty exec method implementations from hook classes
  • aworld/runners/hook/hook_factory.py
    • Removed the import of StartHook
  • aworld/runners/hook/hooks.py
    • Removed ModelResponse import
    • Refactored HookPoint constants to introduce more granular LLM, tool, and task lifecycle events (e.g., ON_START_LLM_CALL, ON_SUCCESS_TASK)
    • Removed __metaclass__ = abc.ABCMeta from all hook classes
    • Provided a default pass implementation for the exec method in the base Hook class
    • Removed PreToolCallHook, PostToolCallHook, PreTaskCallHook, and PostTaskCallHook classes
  • aworld/runners/hook/task_hooks.py
    • Added new file defining specific task lifecycle hooks: OnRunHook, OnSuccessHook, OnErrorHook, OnStartHook, and OnFinishHook
  • aworld/runners/hook/tool_hooks.py
    • Added new file defining specific tool call lifecycle hooks: OnStartToolCallHook, OnFinishedToolCallHook, OnToolCallHook, OnSuccessToolCallHook, and OnErrorToolCallHook
  • aworld/runners/task_manager.py
    • Added new file implementing TaskManager for managing task persistence, CRUD operations, status-based retrieval, and ready task detection
  • aworld/schedule/__init__.py
    • Added new empty __init__.py file to mark the directory as a Python package
  • aworld/schedule/scheduler.py
    • Added new file implementing TaskScheduler for managing and executing scheduled tasks, supporting various scheduling strategies, resource quotas, and periodic task handling
  • aworld/schedule/strategy.py
    • Added new file defining abstract ScheduleStrategy and concrete implementations like FIFOStrategy, PriorityStrategy, DAGStrategy, and AutoStrategy
    • Introduced TaskQueue base class with FIFOTaskQueue, PriorityTaskQueue, and ResourceAwareTaskQueue implementations
  • aworld/schedule/task_graph.py
    • Added new file implementing TaskGraph for Directed Acyclic Graph (DAG) based task scheduling, including methods for adding tasks, getting ready tasks, marking completion/failure, determining execution order, and cycle detection
  • aworld/schedule/types.py
    • Added new file defining data classes for SchedulableTask and ScheduledTask
    • ScheduledTask supports cron expressions, one-time scheduling, and delayed execution, along with time range constraints and execution limits
    • Introduced TaskStatistics for tracking global scheduling state and ResourceQuota for managing machine resources
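
The TaskGraph entry above mentions ready-task detection, execution ordering, and cycle detection. A minimal sketch of how those three pieces can fit together follows. It assumes a plain dependency map and Kahn's algorithm; `MiniTaskGraph` and its method names are illustrative and do not reflect the actual API in aworld/schedule/task_graph.py.

```python
from collections import deque
from typing import Dict, Iterable, List, Set


class MiniTaskGraph:
    """Toy DAG: maps each task id to the set of task ids it depends on."""

    def __init__(self) -> None:
        self.deps: Dict[str, Set[str]] = {}
        self.done: Set[str] = set()

    def add_task(self, task_id: str, depends_on: Iterable[str] = ()) -> None:
        self.deps[task_id] = set(depends_on)
        for dep in self.deps[task_id]:
            self.deps.setdefault(dep, set())  # register unseen dependencies

    def mark_completed(self, task_id: str) -> None:
        self.done.add(task_id)

    def get_ready_tasks(self) -> List[str]:
        """Tasks not yet done whose dependencies are all done."""
        return sorted(
            t for t, d in self.deps.items() if t not in self.done and d <= self.done
        )

    def execution_order(self) -> List[str]:
        """Topological order via Kahn's algorithm; raises ValueError on a cycle."""
        indegree = {t: len(d) for t, d in self.deps.items()}
        dependents: Dict[str, List[str]] = {t: [] for t in self.deps}
        for task, task_deps in self.deps.items():
            for dep in task_deps:
                dependents[dep].append(task)
        queue = deque(sorted(t for t, n in indegree.items() if n == 0))
        order: List[str] = []
        while queue:
            current = queue.popleft()
            order.append(current)
            for nxt in sorted(dependents[current]):
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    queue.append(nxt)
        if len(order) != len(self.deps):
            raise ValueError("cycle detected in task graph")
        return order


graph = MiniTaskGraph()
graph.add_task("fetch")
graph.add_task("parse", ["fetch"])
graph.add_task("report", ["parse", "fetch"])

ready_first = graph.get_ready_tasks()
graph.mark_completed("fetch")
ready_next = graph.get_ready_tasks()
order = graph.execution_order()

cyclic = MiniTaskGraph()
cyclic.add_task("a", ["b"])
cyclic.add_task("b", ["a"])
try:
    cyclic.execution_order()
    cycle_found = False
except ValueError:
    cycle_found = True
```

Here "report" only becomes ready once both "fetch" and "parse" are complete, and a cycle is detected because no task ever reaches an in-degree of zero.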

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request primarily refactors task status handling by renaming TaskStatusValue to TaskStatus and making it inherit from str for consistency, along with updating all references across multiple files including aworld/core/common.py, aworld/core/context/amni/contexts.py, aworld/core/context/base.py, aworld/core/event/message_future.py, aworld/core/task.py, aworld/events/util.py, aworld/runners/event_runner.py, aworld/runners/handler/background_task.py, aworld/runners/handler/task.py, aworld/runners/task_manager.py, and aworld/schedule/scheduler.py.

It also introduces a new TaskTypeValue class and adds new hook points and corresponding classes (OnStartLLMCallHook, OnFinishedLLMCallHook, OnLLMCallHook, OnSuccessLLMCallHook, OnErrorLLMCallHook, OnStartToolCallHook, OnFinishedToolCallHook, OnToolCallHook, OnSuccessToolCallHook, OnErrorToolCallHook, OnRunHook, OnSuccessHook, OnErrorHook, OnStartHook, OnFinishHook) to aworld/runners/hook/hooks.py, aworld/runners/hook/agent_hooks.py, aworld/runners/hook/task_hooks.py, and aworld/runners/hook/tool_hooks.py, while removing older Pre/Post hook types. Additionally, it removes model_config arguments from @scorer_register decorators in aworld/evaluations/scorers/output_validators.py and removes several unused imports.

Review comments highlight:

  • a type hint mismatch for the executor's return value in TaskScheduler
  • an AttributeError risk in execute_schedulable_tasks due to an incorrect return type
  • a bug in create_strategy where AutoStrategy is instantiated prematurely
  • an issue with PriorityTaskQueue using a min-heap instead of a max-heap for priority
  • a potential security vulnerability in _merge_by_topic due to reliance on an untrusted agent_id from the message payload
  • a need to update the TaskTypeValue class to inherit from str for consistency
  • a need to update a docstring example in TaskManager to reflect the TaskStatus renaming

        return self._executor

    @executor.setter
    def executor(self, executor: Callable[[Task], Awaitable[bool]]):

high

The type hint for the executor's return value is Awaitable[bool], but the execute method expects it to return a TaskResponse object (line 161: result: TaskResponse = await self._executor(task)). The type hint should be updated to Awaitable[TaskResponse].

Additionally, the docstring for the setter should be updated to reflect this change.

  • Current docstring: executor: Async function that takes a Task and returns bool (success)
  • Suggested docstring: executor: Async function that takes a Task and returns a TaskResponse object

Suggested change
-     def executor(self, executor: Callable[[Task], Awaitable[bool]]):
+     def executor(self, executor: Callable[[Task], Awaitable[TaskResponse]]):
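
To make the corrected contract concrete, here is a minimal sketch of a setter typed as `Callable[[Task], Awaitable[TaskResponse]]`. The `Task`, `TaskResponse`, and `MiniScheduler` classes below are simplified stand-ins written for this example, not the real aworld types.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class Task:  # stand-in for the real Task
    id: str


@dataclass
class TaskResponse:  # stand-in for the real TaskResponse
    id: str
    status: str


Executor = Callable[[Task], Awaitable[TaskResponse]]


class MiniScheduler:
    def __init__(self) -> None:
        self._executor: Optional[Executor] = None

    @property
    def executor(self) -> Optional[Executor]:
        return self._executor

    @executor.setter
    def executor(self, executor: Executor) -> None:
        """executor: Async function that takes a Task and returns a TaskResponse object."""
        self._executor = executor

    async def execute(self, task: Task) -> TaskResponse:
        if self._executor is None:
            raise RuntimeError("no executor configured")
        # The annotation now matches what the setter accepts.
        result: TaskResponse = await self._executor(task)
        return result


async def run_task(task: Task) -> TaskResponse:
    return TaskResponse(id=task.id, status="success")


scheduler = MiniScheduler()
scheduler.executor = run_task
response = asyncio.run(scheduler.execute(Task(id="t1")))
```

With the alias in place, a static type checker can flag an executor that returns bool at the assignment site rather than at the point where `.status` is read.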

Comment on lines +476 to +482
    async def task_execute(st=scheduled_task):
        st.task_status = TaskStatus.RUNNING
        st.started_at = time.time()
        res = await exec_tasks(tasks=[st])
        st.task_status = res.get(st.id).status
        st.completed_at = time.time()
        return res

high

The task_execute function returns res, which is a dictionary of {'task_id': TaskResponse}. However, the LocalRuntime's execute method, which consumes this, expects an object with an .id attribute to build its result dictionary. This will cause an AttributeError. The task_execute function should return the TaskResponse object directly.

Suggested change
-     async def task_execute(st=scheduled_task):
-         st.task_status = TaskStatus.RUNNING
-         st.started_at = time.time()
-         res = await exec_tasks(tasks=[st])
-         st.task_status = res.get(st.id).status
-         st.completed_at = time.time()
-         return res
+     async def task_execute(st=scheduled_task):
+         st.task_status = TaskStatus.RUNNING
+         st.started_at = time.time()
+         res_dict = await exec_tasks(tasks=[st])
+         task_response = res_dict.get(st.id)
+         if task_response:
+             st.task_status = task_response.status
+         st.completed_at = time.time()
+         return task_response
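
The failure mode described above can be reproduced in isolation. The sketch below assumes a consumer that indexes each result by its `.id`, as the comment describes for LocalRuntime. `exec_tasks`, `collect`, and `TaskResponse` here are simplified, hypothetical stand-ins.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TaskResponse:  # stand-in for the real TaskResponse
    id: str
    status: str


async def exec_tasks(task_ids: List[str]) -> Dict[str, TaskResponse]:
    """Hypothetical stand-in that returns {task_id: TaskResponse}."""
    return {tid: TaskResponse(id=tid, status="success") for tid in task_ids}


async def returns_dict(task_id: str):
    return await exec_tasks([task_id])  # shape before the fix: a dict


async def returns_response(task_id: str):
    res_dict = await exec_tasks([task_id])
    return res_dict.get(task_id)  # shape after the fix: a TaskResponse


async def collect(job, task_id: str) -> Dict[str, TaskResponse]:
    """Mimics a consumer that builds its result map from `.id`."""
    result = await job(task_id)
    return {result.id: result}


collected = asyncio.run(collect(returns_response, "t1"))

try:
    asyncio.run(collect(returns_dict, "t1"))
    dict_failed = False
except AttributeError:
    dict_failed = True  # a dict has no `.id` attribute
```

Note that the suggested fix can return None when the task id is missing from the result dictionary, so a real consumer would likely need to handle that case as well.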


def create_strategy(strategy_type: Optional[str] = None, **kwargs) -> ScheduleStrategy:
    """Create strategy instance."""
    return STRATEGY_MAP.get(strategy_type, AutoStrategy())(**kwargs)

high

There is a bug in the create_strategy factory. AutoStrategy() creates an instance of the class, and then you are trying to call that instance. The intention is to get the class from the map and then instantiate it with **kwargs. You should retrieve the class AutoStrategy itself as the default value, not an instance of it.

Suggested change
-     return STRATEGY_MAP.get(strategy_type, AutoStrategy())(**kwargs)
+     return STRATEGY_MAP.get(strategy_type, AutoStrategy)(**kwargs)
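
A small reproduction shows why the default must be the class rather than an instance. The strategy classes below are empty stand-ins, and the sketch assumes that AutoStrategy does not define `__call__`. If it did, the buggy form would fail differently.

```python
from typing import Dict, Optional, Type


class ScheduleStrategy:  # stand-in base class
    def __init__(self, **kwargs) -> None:
        self.options = kwargs


class FIFOStrategy(ScheduleStrategy):
    pass


class AutoStrategy(ScheduleStrategy):
    pass


STRATEGY_MAP: Dict[str, Type[ScheduleStrategy]] = {
    "fifo": FIFOStrategy,
    "auto": AutoStrategy,
}


def create_strategy_buggy(strategy_type: Optional[str] = None, **kwargs) -> ScheduleStrategy:
    # The default is an *instance*; calling it fails when the key is missing.
    return STRATEGY_MAP.get(strategy_type, AutoStrategy())(**kwargs)


def create_strategy(strategy_type: Optional[str] = None, **kwargs) -> ScheduleStrategy:
    # The default is the *class*; it is instantiated once, with kwargs.
    return STRATEGY_MAP.get(strategy_type, AutoStrategy)(**kwargs)


fallback = create_strategy("unknown", max_parallel=2)
known = create_strategy("fifo")

try:
    create_strategy_buggy("unknown")
    buggy_failed = False
except TypeError:
    buggy_failed = True  # 'AutoStrategy' object is not callable
```

The buggy form still works for known keys, which is probably why the problem is easy to miss: the eagerly built default instance is simply discarded in that case.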

    def push(self, task: ScheduledTask):
        # (priority_value, counter, task)
        priority_value = task.priority
        heapq.heappush(self.heap, (priority_value, self.counter, task))

high

The PriorityTaskQueue uses heapq, which is a min-heap. Pushing priority_value directly will result in tasks with lower priority values being processed first. This contradicts the common expectation and the implementation in other parts of the code (like TaskManager.get_ready_tasks) where higher priority values are treated as more important. To implement a max-heap for high-priority-first scheduling, you should push the negative of the priority value.

Suggested change
-         heapq.heappush(self.heap, (priority_value, self.counter, task))
+         heapq.heappush(self.heap, (-priority_value, self.counter, task))
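
The negation trick can be checked with a self-contained queue. This is a sketch rather than the PR's PriorityTaskQueue: `MaxPriorityQueue` is a hypothetical name, and it stores plain strings instead of ScheduledTask objects. The monotonically increasing counter keeps equal-priority items in insertion order and prevents the heap from ever comparing the payloads.

```python
import heapq
import itertools
from typing import List, Tuple


class MaxPriorityQueue:
    """Highest priority value pops first; ties pop in insertion order."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._counter = itertools.count()

    def push(self, name: str, priority: int) -> None:
        # heapq is a min-heap, so store the negated priority.
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]


queue = MaxPriorityQueue()
queue.push("low", 1)
queue.push("high-first", 10)
queue.push("high-second", 10)
queue.push("mid", 5)

drained = [queue.pop() for _ in range(4)]
```

Draining the queue yields the two priority-10 items in the order they were pushed, then "mid", then "low".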

Comment on lines 168 to 169
        if not agent_id:
            agent_id = data.get('env_content', {}).get('agent_id')

security-medium medium

In _merge_by_topic, the agent_id is extracted from the message payload (data.get('env_content', {}).get('agent_id')) if it's not already present in the message. Since the payload is the result of a background task, which may be untrusted or compromised, an attacker could manipulate this value to cause the runner to send messages to arbitrary agents. This could lead to unauthorized actions or lateral movement within the agent system. Furthermore, when forwarding messages to parent tasks (lines 105-112), the agent_id is omitted, forcing the receiver to rely on the untrusted payload.
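
One possible mitigation, sketched under assumptions: prefer the agent_id carried by the trusted message envelope, and accept a payload-supplied value only if it appears in an allowlist of agents known to the current task. `resolve_agent_id` and its parameters are hypothetical and not part of the PR. The right trust boundary depends on how the runner authenticates messages.

```python
from typing import Any, Dict, Optional, Set


def resolve_agent_id(
    envelope_agent_id: Optional[str],
    payload: Dict[str, Any],
    allowed_agents: Set[str],
) -> Optional[str]:
    """Prefer the envelope value; accept a payload value only if allowlisted."""
    if envelope_agent_id:
        return envelope_agent_id
    env_content = payload.get("env_content")
    candidate = env_content.get("agent_id") if isinstance(env_content, dict) else None
    if isinstance(candidate, str) and candidate in allowed_agents:
        return candidate
    return None  # caller should drop or log the message instead of routing it


known_agents = {"planner", "worker"}

trusted = resolve_agent_id("planner", {"env_content": {"agent_id": "evil"}}, known_agents)
allowlisted = resolve_agent_id(None, {"env_content": {"agent_id": "worker"}}, known_agents)
rejected = resolve_agent_id(None, {"env_content": {"agent_id": "admin"}}, known_agents)
malformed = resolve_agent_id(None, {"env_content": "not-a-dict"}, known_agents)
```

Returning None instead of raising leaves the routing decision to the caller, which can then refuse to forward the message.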

Comment on lines +141 to +144
class TaskTypeValue:
    """Task type constants."""
    INSTANT = 'instant'
    SCHEDULED = 'scheduled'

medium

For consistency with the TaskStatus class, TaskTypeValue should also inherit from str. This makes it clearer that it's a group of string constants and allows for potential use in type hints if needed in the future.

Suggested change
- class TaskTypeValue:
-     """Task type constants."""
-     INSTANT = 'instant'
-     SCHEDULED = 'scheduled'
+ class TaskTypeValue(str):
+     """Task type constants."""
+     INSTANT = 'instant'
+     SCHEDULED = 'scheduled'
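
It may help to see what inheriting from str does and does not change here. In the sketch below, the first class mirrors the suggested change. The second, `TaskTypeEnum`, is a hypothetical alternative using the standard library's Enum with a str mixin and is not part of the PR.

```python
from enum import Enum


class TaskTypeValue(str):
    """Constants on a str subclass, as in the suggested change."""
    INSTANT = "instant"
    SCHEDULED = "scheduled"


class TaskTypeEnum(str, Enum):
    """Hypothetical alternative: a real enum whose members are also strings."""
    INSTANT = "instant"
    SCHEDULED = "scheduled"


# The class attributes stay ordinary str objects, not TaskTypeValue instances.
plain_is_str = type(TaskTypeValue.INSTANT) is str
plain_is_member = isinstance(TaskTypeValue.INSTANT, TaskTypeValue)

# Enum members compare equal to their string value and are real members.
enum_equals_str = TaskTypeEnum.INSTANT == "instant"
enum_is_member = isinstance(TaskTypeEnum.INSTANT, TaskTypeEnum)

# An enum also rejects unknown values at construction time.
try:
    TaskTypeEnum("bogus")
    enum_rejects_unknown = False
except ValueError:
    enum_rejects_unknown = True
```

So the str base class mainly documents intent and permits `TaskTypeValue` as a type hint for string values. Validation and membership checks would need something like the enum variant.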


# List tasks
all_tasks = await manager.list()
pending_tasks = await manager.list(status=TaskStatusValue.INIT)

medium

The example in the docstring uses TaskStatusValue, which has been removed in this pull request. It should be updated to use TaskStatus.

Suggested change
- pending_tasks = await manager.list(status=TaskStatusValue.INIT)
+ pending_tasks = await manager.list(status=TaskStatus.INIT)
