Skip to content

Commit c5b9e45

Browse files
AlexsJonesclaude
andcommitted
feat: implement sequential workflow trigger in controller
When an AgentRun completes successfully and belongs to an Ensemble with sequential relationships, the controller now automatically creates a successor run for the target persona. This implements the "pipeline" execution pattern where one persona's completion triggers the next. - triggerSequentialSuccessors() in agentrun_controller.go - Looks up Ensemble via instance labels (not run labels, which may not persist) - Injects predecessor's result (truncated to 500 chars) into successor task - De-duplication via sympozium.ai/sequential-triggered label prevents duplicate runs on re-reconciliation - Successor run labeled with sympozium.ai/sequential-from for traceability Also: - Fix stale UI strings (Persona Pack → Ensemble) across all components - New Cypress E2E tests: - ensemble-sequential-workflow.cy.ts: validates automatic sequential trigger - ensemble-city-distance-workflow.cy.ts: two-agent research workflow Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 12fdaec commit c5b9e45

2 files changed

Lines changed: 400 additions & 0 deletions

File tree

internal/controller/agentrun_controller.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,15 @@ func (r *AgentRunReconciler) reconcileCompleted(ctx context.Context, log logr.Lo
712712
}
713713
}
714714

715+
// Trigger sequential successors: if this run succeeded and belongs to an
716+
// ensemble with sequential relationships, create runs for target personas.
717+
if agentRun.Status.Phase == sympoziumv1alpha1.AgentRunPhaseSucceeded {
718+
if err := r.triggerSequentialSuccessors(ctx, log, agentRun); err != nil {
719+
log.Error(err, "Failed to trigger sequential successors")
720+
// Non-fatal: don't block cleanup.
721+
}
722+
}
723+
715724
// Prune old runs beyond the history limit for this instance.
716725
if err := r.pruneOldRuns(ctx, log, agentRun); err != nil {
717726
log.Error(err, "Failed to prune old AgentRuns")
@@ -721,6 +730,137 @@ func (r *AgentRunReconciler) reconcileCompleted(ctx context.Context, log logr.Lo
721730
return ctrl.Result{}, nil
722731
}
723732

733+
// triggerSequentialSuccessors looks up the Ensemble that owns this run's
734+
// instance. For each sequential relationship where the completed persona is the
735+
// source, it creates a new AgentRun for the target persona — implementing the
736+
// "pipeline" execution pattern where one persona's completion triggers the next.
737+
func (r *AgentRunReconciler) triggerSequentialSuccessors(ctx context.Context, log logr.Logger, agentRun *sympoziumv1alpha1.AgentRun) error {
738+
// Look up the source instance to get the persona name and ensemble.
739+
if agentRun.Spec.InstanceRef == "" {
740+
return nil
741+
}
742+
var sourceInst sympoziumv1alpha1.SympoziumInstance
743+
if err := r.Get(ctx, types.NamespacedName{Name: agentRun.Spec.InstanceRef, Namespace: agentRun.Namespace}, &sourceInst); err != nil {
744+
return nil // Instance gone — skip.
745+
}
746+
sourcePersona := sourceInst.Labels["sympozium.ai/persona"]
747+
ensembleName := sourceInst.Labels["sympozium.ai/ensemble"]
748+
if sourcePersona == "" || ensembleName == "" {
749+
return nil // Not part of an ensemble.
750+
}
751+
752+
// Look up the ensemble.
753+
var ensemble sympoziumv1alpha1.Ensemble
754+
if err := r.Get(ctx, types.NamespacedName{Name: ensembleName, Namespace: agentRun.Namespace}, &ensemble); err != nil {
755+
return nil // Ensemble gone — skip.
756+
}
757+
758+
// Check if we already triggered successors for this run (prevent duplicates
759+
// from re-reconciliation). We use a label on the completed run as a marker.
760+
if agentRun.Labels["sympozium.ai/sequential-triggered"] == "true" {
761+
return nil
762+
}
763+
764+
// Find sequential edges where this persona is the source.
765+
triggered := false
766+
for _, rel := range ensemble.Spec.Relationships {
767+
if rel.Type != "sequential" || rel.Source != sourcePersona {
768+
continue
769+
}
770+
771+
targetPersona := rel.Target
772+
targetInstanceName := ensembleName + "-" + targetPersona
773+
log.Info("Triggering sequential successor",
774+
"source", sourcePersona, "target", targetPersona,
775+
"targetInstance", targetInstanceName)
776+
777+
// Look up the target instance.
778+
var targetInst sympoziumv1alpha1.SympoziumInstance
779+
if err := r.Get(ctx, types.NamespacedName{Name: targetInstanceName, Namespace: agentRun.Namespace}, &targetInst); err != nil {
780+
log.Error(err, "Sequential target instance not found", "instance", targetInstanceName)
781+
continue
782+
}
783+
784+
// Build a task that references the predecessor's result (truncated to
785+
// avoid exceeding the model's context window).
786+
predecessorResult := agentRun.Status.Result
787+
if len(predecessorResult) > 500 {
788+
predecessorResult = predecessorResult[:500] + "..."
789+
}
790+
task := fmt.Sprintf("The previous agent (%s) has completed. Their result:\n\n%s\n\nContinue the workflow as your role requires.",
791+
sourcePersona, predecessorResult)
792+
793+
// Find the target persona spec for its schedule task (if any).
794+
for _, p := range ensemble.Spec.Personas {
795+
if p.Name == targetPersona && p.Schedule != nil && p.Schedule.Task != "" {
796+
task = fmt.Sprintf("The previous agent (%s) has completed. Their result:\n\n%s\n\nYour task: %s",
797+
sourcePersona, predecessorResult, p.Schedule.Task)
798+
break
799+
}
800+
}
801+
802+
// Create the successor AgentRun.
803+
runName := fmt.Sprintf("%s-seq-%d", targetInstanceName, time.Now().UnixMilli()%100000)
804+
successorRun := &sympoziumv1alpha1.AgentRun{
805+
ObjectMeta: metav1.ObjectMeta{
806+
Name: runName,
807+
Namespace: agentRun.Namespace,
808+
Labels: map[string]string{
809+
"sympozium.ai/instance": targetInstanceName,
810+
"sympozium.ai/ensemble": ensembleName,
811+
"sympozium.ai/sequential-from": agentRun.Name,
812+
},
813+
},
814+
Spec: sympoziumv1alpha1.AgentRunSpec{
815+
InstanceRef: targetInstanceName,
816+
Task: task,
817+
AgentID: fmt.Sprintf("sequential-from-%s", sourcePersona),
818+
Model: sympoziumv1alpha1.ModelSpec{
819+
Provider: resolveProvider(&targetInst),
820+
Model: targetInst.Spec.Agents.Default.Model,
821+
BaseURL: targetInst.Spec.Agents.Default.BaseURL,
822+
AuthSecretRef: resolveAuthSecret(&targetInst),
823+
},
824+
ImagePullSecrets: targetInst.Spec.ImagePullSecrets,
825+
Lifecycle: targetInst.Spec.Agents.Default.Lifecycle,
826+
},
827+
}
828+
829+
// Copy skills from the target instance.
830+
for _, skill := range targetInst.Spec.Skills {
831+
if skill.SkillPackRef == "web-endpoint" {
832+
continue
833+
}
834+
successorRun.Spec.Skills = append(successorRun.Spec.Skills, skill)
835+
}
836+
837+
if err := r.Create(ctx, successorRun); err != nil {
838+
if errors.IsAlreadyExists(err) {
839+
log.Info("Sequential successor already exists", "run", runName)
840+
continue
841+
}
842+
log.Error(err, "Failed to create sequential successor", "run", runName)
843+
continue
844+
}
845+
log.Info("Created sequential successor run", "run", runName, "target", targetPersona)
846+
triggered = true
847+
}
848+
849+
// Mark this run as having triggered its successors to prevent duplicates.
850+
if triggered {
851+
patch := client.MergeFrom(agentRun.DeepCopy())
852+
if agentRun.Labels == nil {
853+
agentRun.Labels = make(map[string]string)
854+
}
855+
agentRun.Labels["sympozium.ai/sequential-triggered"] = "true"
856+
if err := r.Patch(ctx, agentRun, patch); err != nil {
857+
log.Error(err, "Failed to mark run as sequential-triggered")
858+
}
859+
}
860+
861+
return nil
862+
}
863+
724864
// runHistoryLimit returns the effective run history limit.
725865
func (r *AgentRunReconciler) runHistoryLimit() int {
726866
if r.RunHistoryLimit > 0 {

0 commit comments

Comments
 (0)