Skip to content

Conversation

@weilicious
Copy link
Collaborator

@weilicious weilicious commented Dec 2, 2025

This pull request introduces a comprehensive terminology update, replacing the concept of "active/inactive" tasks with "enabled/disabled" across the codebase. Additionally, it improves concurrency safety, resource management, and operator notification handling in the task engine. The changes ensure more precise task lifecycle management and enhance the robustness of operator stream handling and graceful shutdown.

Task Status Terminology Update:

  • Replaces all references to "active/inactive" task status with "enabled/disabled" throughout the codebase, including status enums, logging, database queries, and API responses. This affects task creation, querying, simulation, and status toggling logic (core/taskengine/engine.go, aggregator/rpc_server.go, cmd/status.go, core/apqueue/cleanup.go). [1] [2] [3] [4] [5] [6] [7] [8] [9] [10] [11] [12]

Operator Stream and Graceful Shutdown Improvements:

  • Adds a sync.WaitGroup to track active operator streams, ensuring all goroutines are properly awaited during engine shutdown, and cancels operator tickers for a clean shutdown process (core/taskengine/engine.go). [1] [2] [3]
  • Improves concurrency safety by acquiring locks when accessing or modifying shared state, such as trackSyncedTasks and shutdown flags, to prevent race conditions (core/taskengine/engine.go). [1] [2] [3] [4]

Notification and Task Tracking Enhancements:

  • Refactors operator notification logic to snapshot operator-task associations under lock, and queues notifications outside of the main lock to avoid contention and deadlocks (core/taskengine/engine.go).
  • Updates notification batching and task tracking to handle the new "disable" operation and ensures thread-safe modifications (core/taskengine/engine.go).

API and Response Structure Updates:

  • Updates API response fields and internal structures to use EnabledTaskCount/DisabledTaskCount instead of ActiveTaskCount/InactiveTaskCount, and status strings to "enabled"/"disabled" where appropriate (core/taskengine/engine.go, aggregator/rpc_server.go). [1] [2] [3] [4] [5] [6] [7]

These changes collectively modernize task lifecycle management, improve reliability, and clarify operator interactions.


Note

Replaces Active/Inactive with Enabled/Disabled across protocol, APIs, storage, and operator messages, and hardens engine shutdown, locking, and notification handling.

  • Protocol/API (breaking):
    • Rename TaskStatus values Active/InactiveEnabled/Disabled in protobuf (avs.proto, node.proto), generated code, and status strings.
    • Replace RPC SetTaskActiveSetTaskEnabled with SetTaskEnabledReq/Resp.
    • Operator MessageOp_DeactivateTaskMessageOp_DisableTask.
    • Response fields: ActiveTaskCount/InactiveTaskCountEnabledTaskCount/DisabledTaskCount.
    • Execution responses: task_still_activetask_still_enabled.
  • Engine:
    • Load/query/store using Enabled/Disabled prefixes; default new tasks to Enabled.
    • Implement SetTaskEnabledByUser; internal DisableTask replaces DeactivateTask.
    • Add streamsWG and cancel-ticker logic for graceful shutdown; guard shutdown checks with locks.
    • Concurrency: lock-protect access to trackSyncedTasks, snapshot under lock; queue notifications outside main lock.
    • Notification batching updated to handle DisableTask and safe state mutations.
  • Operator:
    • Handle DisableTask for removal; logs and trigger notifications use task_still_enabled.
  • Storage/Executor/CLI:
    • Storage mapping: a = Enabled, i = Disabled; executor reads enabled key.
    • CLI/status and cleanup utilities now target Enabled/Disabled.
  • Docs/Tests:
    • Replace Active/Inactive docs with ENABLED_DISABLED_TASK_STATUS_MIGRATION.md; add RE-ENABLE_DISABLED_TASKS.md.
    • Update unit/integration tests and logs to new terminology and ops.

Written by Cursor Bugbot for commit b1bf33d. Configure here.

@chrisli30 chrisli30 changed the title fix: migrate to enabled/disabled; harden engine concurrency; update t… fix: migrate to enabled/disabled of task status Dec 2, 2025
@chrisli30 chrisli30 requested a review from Copilot December 2, 2025 08:54
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This pull request implements a comprehensive terminology migration from "active/inactive" to "enabled/disabled" for task status management, alongside significant concurrency and resource management improvements in the task engine. The changes modernize the codebase with clearer task lifecycle semantics and enhance system reliability through better shutdown handling and thread-safe state management.

Key Changes:

  • Replaced all task status references from "Active/Inactive" to "Enabled/Disabled" across protobuf definitions, storage schemas, API responses, logging, and documentation
  • Enhanced graceful shutdown by tracking active operator streams with sync.WaitGroup and canceling operator tickers properly
  • Improved concurrency safety by properly locking access to shared state (trackSyncedTasks, shutdown flags) and using snapshot-based notification queuing to avoid deadlocks

Reviewed changes

Copilot reviewed 33 out of 33 changed files in this pull request and generated 6 comments.

Show a summary per file
File Description
storage/schema/task.go Updated storage key mappings and comments from Active/Inactive to Enabled/Disabled
protobuf/node.proto Renamed DeactivateTask to DisableTask and updated field/comment terminology
protobuf/node.pb.go Generated Go code reflecting protobuf changes for MessageOp and NotifyTriggersResp
protobuf/avs_grpc.pb.go Updated gRPC method names from SetTaskActive to SetTaskEnabled
protobuf/avs.proto Updated TaskStatus enum, SetTaskEnabled RPC, and GetWalletResp field names
protobuf/avs.pb.go Generated Go code with Enabled/Disabled enums and updated response structures
operator/worker_loop_test.go Updated test cases and operation references to use DisableTask
operator/worker_loop.go Changed field references from TaskStillActive to TaskStillEnabled
operator/process_message.go Updated message operation handling to use DisableTask
model/task_test.go Updated test assertions to use TaskStatus_Enabled throughout
model/task.go Renamed methods from SetActive/SetInactive to SetEnabled/SetDisabled
integration_test/ticker_context_test.go Added mutex protection to SimpleMockServer for thread-safe mock operations
integration_test/orphaned_task_reclamation_test.go Updated test task status from Active to Enabled
integration_test/operator_reconnection_test.go Added mutex protection to MockSyncMessagesServer for concurrent access
integration_test/activation_deactivation_sync_test.go Updated test to use DisableTask operation and terminology
docs/RE-ENABLE_DISABLED_TASKS.md Added new implementation plan for re-enabling disabled tasks feature
docs/ENABLED_DISABLED_TASK_STATUS_MIGRATION.md Added comprehensive migration documentation for the terminology change
docs/ACTIVE_INACTIVE_TASK_STATUS_MIGRATION.md Removed obsolete documentation using old terminology
core/taskengine/trigger_helper.go Updated ExecutionState fields and comments to use enabled/disabled terminology
core/taskengine/task_field_control_test.go Updated test assertions to expect Enabled status
core/taskengine/stats_test.go Updated test data to use Disabled and Enabled statuses
core/taskengine/stats.go Updated switch cases to use TaskStatus_Enabled and TaskStatus_Disabled
core/taskengine/partial_success_test.go Updated test task initialization to use Enabled status
core/taskengine/operator_notification_test.go Updated test for DisableTask notifications and response field expectations
core/taskengine/non_blocking_full_lifecycle_test.go Updated test task status initialization to Enabled
core/taskengine/input_variables_test.go Updated test task status to Enabled
core/taskengine/executor.go Changed storage key lookup to use TaskStatus_Enabled
core/taskengine/execution_index_test.go Updated test task statuses to Enabled
core/taskengine/execution_index_mixed_test.go Updated test task status to Enabled
core/taskengine/engine.go Major update including terminology changes, graceful shutdown with WaitGroup, thread-safe notification handling, and renamed SetTaskEnabledByUser/DisableTask methods
core/apqueue/cleanup.go Updated comments and status references to use enabled/disabled terminology
cmd/status.go Updated to query TaskStatus_Enabled instead of Active
aggregator/rpc_server.go Renamed SetTaskActive to SetTaskEnabled handler and updated DisableTask logging
Comments suppressed due to low confidence (1)

integration_test/activation_deactivation_sync_test.go:122

  • [nitpick] The test file name activation_deactivation_sync_test.go should be renamed to enable_disable_sync_test.go to match the new "enabled/disabled" terminology.
//go:build integration
// +build integration

package integration_test

import (
	"testing"
	"time"

	"github.com/AvaProtocol/EigenLayer-AVS/core/config"
	"github.com/AvaProtocol/EigenLayer-AVS/core/taskengine"
	"github.com/AvaProtocol/EigenLayer-AVS/core/testutil"
	"github.com/AvaProtocol/EigenLayer-AVS/operator"
	avsproto "github.com/AvaProtocol/EigenLayer-AVS/protobuf"
	"github.com/AvaProtocol/EigenLayer-AVS/storage"
	"github.com/stretchr/testify/require"
)

// SimpleMockServer replicates the lightweight mock stream used in other integration tests
// Reuse SimpleMockServer defined in ticker_context_test.go for consistency

// TestActivationDeactivationSyncWithConfigs loads aggregator and operator YAML configs
// and verifies the end-to-end sync sequence: MonitorTaskTrigger → DeactivateTask → MonitorTaskTrigger.
func TestActivationDeactivationSyncWithConfigs(t *testing.T) {
	logger := testutil.GetLogger()
	taskengine.SetLogger(logger)

	// Load aggregator config from YAML
	aggCfgPath := testutil.GetConfigPath(testutil.DefaultConfigPath) // config/aggregator-sepolia.yaml
	aggCfg, err := config.NewConfig(aggCfgPath)
	if err != nil {
		t.Skipf("Failed to load aggregator config at %s: %v", aggCfgPath, err)
	}

	// Load operator config from YAML
	var opCfg operator.OperatorConfig
	if err := config.ReadYamlConfig(testutil.GetConfigPath("operator-sepolia.yaml"), &opCfg); err != nil {
		t.Skipf("Failed to load operator config: %v", err)
	}

	db := testutil.TestMustDB()
	defer storage.Destroy(db.(*storage.BadgerStorage))

	engine := taskengine.New(db, aggCfg, nil, logger)
	require.NoError(t, engine.MustStart())
	defer engine.Stop()

	// Open operator stream using operator-sepolia.yaml values
	mockServer := NewSimpleMockServer()
	syncReq := &avsproto.SyncMessagesReq{
		Address:        opCfg.OperatorAddress,
		MonotonicClock: time.Now().UnixNano(),
		Capabilities: &avsproto.SyncMessagesReq_Capabilities{
			EventMonitoring: true,
			BlockMonitoring: true,
			TimeMonitoring:  true,
		},
	}

	errChan := make(chan error, 1)
	go func() {
		err := engine.StreamCheckToOperator(syncReq, mockServer)
		errChan <- err
	}()

	// Wait briefly for stream stabilization (shorter than the full 10s to keep test reasonably fast)
	stabilization := 2 * time.Second
	time.Sleep(stabilization)

	// Create a task (initially Active)
	user := testutil.TestUser1()
	taskReq := testutil.RestTask()
	taskReq.SmartWalletAddress = user.SmartAccountAddress.Hex()
	task, err := engine.CreateTask(user, taskReq)
	require.NoError(t, err)

	// Wait for MonitorTaskTrigger to be sent
	waitFor := func(cond func() bool, timeout time.Duration) bool {
		deadline := time.Now().Add(timeout)
		for time.Now().Before(deadline) {
			if cond() {
				return true
			}
			time.Sleep(100 * time.Millisecond)
		}
		return cond()
	}

	gotMonitor := waitFor(func() bool {
		for _, m := range mockServer.GetSentTasks() {
			if m.Op == avsproto.MessageOp_MonitorTaskTrigger && m.Id == task.Id {
				return true
			}
		}
		return false
	}, 12*time.Second) // allow enough time for stabilization loop to emit
	require.True(t, gotMonitor, "expected initial MonitorTaskTrigger for created task")

	// Disable the task and expect a DisableTask control message
	deactResp, err := engine.SetTaskEnabledByUser(user, task.Id, false)
	require.NoError(t, err)
	require.True(t, deactResp.Success)
	// ensure batch flush if needed
	time.Sleep(200 * time.Millisecond)

	gotDeactivate := waitFor(func() bool {
		for _, m := range mockServer.GetSentTasks() {
			if m.Op == avsproto.MessageOp_DisableTask && m.Id == task.Id {
				return true
			}
		}
		return false
	}, 3*time.Second)
	require.True(t, gotDeactivate, "expected DeactivateTask after deactivation")

	// Cleanup
	mockServer.Disconnect()
	select {
	case <-errChan:
	case <-time.After(3 * time.Second):
	}
}

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

You can also share your feedback on Copilot code review for a chance to win a $100 gift card. Take the survey.

chrisli30 pushed a commit that referenced this pull request Dec 20, 2025
* fix: migrate to enabled/disabled; harden engine concurrency; update tests/docs

* fix: complete enabled/disabled terminology migration per copilot feedback
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants