The AgentWorkflow CRD lets you describe a multi-step AI pipeline entirely in YAML. When you apply an AgentWorkflow, the Flokoa operator compiles it into an Argo WorkflowTemplate — a reusable, versioned pipeline definition managed by Argo Workflows. Your pipeline can call deployed Agent CRs via the A2A protocol, run Marvin-powered tasks in ephemeral containers, execute arbitrary container workloads, make HTTP requests, and branch conditionally — all wired together as a directed acyclic graph (DAG) through dependsOn declarations.
AgentWorkflow requires Argo Workflows to be installed in your cluster. Individual pipeline runs are triggered by creating Argo Workflow resources that reference the generated WorkflowTemplate.

API reference

apiVersion: agent.flokoa.ai/v1alpha1
kind: AgentWorkflow

Spec fields

| Field | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| description | string | | | Human-readable description of the workflow’s purpose |
| params | object[] | | | Workflow-level input parameters (see below) |
| tasks | object[] | Yes | | The pipeline tasks to execute; at least one is required |
| timeout | duration | | | Maximum wall-clock duration for the entire workflow (e.g. "1h") |
| retryStrategy | object | | | Default retry policy applied to all tasks unless a task overrides it |
| serviceAccountName | string | | flokoa-workflow | Kubernetes ServiceAccount used by workflow pods |
| automountServiceAccountToken | bool | | true | Whether to mount the service account token into workflow pods (required by Argo executor plugins) |

params

Each parameter in spec.params has:

| Field | Required | Description |
| --- | --- | --- |
| name | Yes | Parameter name, referenceable in task expressions as {{params.<name>}} |
| description | | Human-readable explanation of the parameter |
| value | | Default value; if omitted, the parameter must be supplied at submission time |
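
As a minimal sketch of the two cases above, the fragment below declares one parameter with a default and one without, then references both from a task. The parameter names (language, document), the task name, and the agent name translation-agent are hypothetical; the field names and the {{params.<name>}} syntax are the ones described on this page.

```yaml
spec:
  params:
    # Has a default, so it may be omitted when a run is submitted
    - name: language
      description: "Target language for the translation"
      value: "English"
    # No default: must be supplied at submission time
    - name: document
      description: "Text to translate"

  tasks:
    - name: translate
      agent:
        name: translation-agent   # hypothetical Agent CR
        text: "Translate into {{params.language}}: {{params.document}}"
```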

tasks

Each task in spec.tasks must have a name and exactly one task type:

| Task type | Description |
| --- | --- |
| agent | Call a deployed Agent CR via the A2A protocol |
| agentTask | Run a Marvin-powered task in an ephemeral container |
| container | Run an arbitrary container workload |
| http | Make an HTTP request to an external service |
| switch | Route to different tasks based on a previous task’s output |

Each task also supports:

| Field | Description |
| --- | --- |
| dependsOn | List of task names that must complete before this task starts (DAG edges) |
| timeout | Maximum duration for this individual task |
| retryStrategy | Override the workflow-level retry policy for this task |
| condition | Expression that must evaluate to true for the task to run; skipped otherwise |
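
To illustrate how these per-task fields combine, here is a sketch of a fan-out/fan-in DAG: two agent calls without dependsOn have no ordering between them, and a third task waits for both. The task names and the agents news-agent and papers-agent are hypothetical, a topic parameter is assumed to be declared in spec.params, and the task-level timeout values assume the same duration format as the workflow-level timeout.

```yaml
spec:
  tasks:
    # No dependsOn: nothing has to finish before these two tasks start
    - name: news
      timeout: "5m"
      agent:
        name: news-agent          # hypothetical Agent CR
        text: "Collect recent news about {{params.topic}}"

    - name: papers
      timeout: "10m"
      retryStrategy:
        limit: 1                  # overrides any workflow-level default
      agent:
        name: papers-agent        # hypothetical Agent CR
        text: "Collect recent papers about {{params.topic}}"

    # Fan-in: starts only after both upstream tasks have completed
    - name: merge
      dependsOn:
        - news
        - papers
      agent:
        name: summarisation-agent
        text: |
          Combine these findings into one report:
          {{tasks.news.output}}
          {{tasks.papers.output}}
```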

Basic example

The following workflow takes a topic as input, calls a research agent to gather information, and then passes that output to a summarisation agent:
apiVersion: agent.flokoa.ai/v1alpha1
kind: AgentWorkflow
metadata:
  name: research-and-summarise
  namespace: production
spec:
  description: "Research a topic and produce a concise summary"

  params:
    - name: topic
      description: "The topic to research"
      value: "quantum computing"
    - name: max_length
      description: "Maximum word count for the summary"
      value: "200"

  timeout: "30m"

  retryStrategy:
    limit: 2
    backoff:
      duration: "30s"
      factor: 2

  serviceAccountName: flokoa-workflow

  tasks:
    - name: research
      agent:
        name: research-agent
        namespace: production
        text: "Research the following topic thoroughly: {{params.topic}}"

    - name: summarise
      dependsOn:
        - research
      agent:
        name: summarisation-agent
        namespace: production
        text: |
          Summarise the following research in {{params.max_length}} words or fewer:
          {{tasks.research.output}}
Apply it with:
kubectl apply -f research-and-summarise.yaml

Multi-task pipeline with conditional branching

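The following workflow classifies a support ticket with an agentTask, then uses a switch task to route it to one of three handler agents:
apiVersion: agent.flokoa.ai/v1alpha1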
kind: AgentWorkflow
metadata:
  name: support-triage-pipeline
  namespace: production
spec:
  description: "Triage a support request and route it to the right team"

  params:
    - name: ticket_id
      description: "Support ticket identifier"
    - name: ticket_content
      description: "Full ticket content to classify"

  timeout: "15m"

  tasks:
    # Step 1 — classify the ticket
    - name: classify
      agentTask:
        type: classify
        labels:
          - "billing"
          - "technical"
          - "general"
        input: "{{params.ticket_content}}"

    # Step 2 — route based on classification
    - name: route
      dependsOn:
        - classify
      switch:
        - condition: "{{tasks.classify.output}} == 'billing'"
          then: handle-billing
        - condition: "{{tasks.classify.output}} == 'technical'"
          then: handle-technical
        - default: handle-general

    # Step 3a — billing handler
    - name: handle-billing
      condition: "{{tasks.classify.output}} == 'billing'"
      dependsOn:
        - route
      agent:
        name: billing-agent
        text: "Handle this billing issue (ticket {{params.ticket_id}}): {{params.ticket_content}}"

    # Step 3b — technical handler
    - name: handle-technical
      condition: "{{tasks.classify.output}} == 'technical'"
      dependsOn:
        - route
      agent:
        name: technical-agent
        text: "Resolve this technical issue (ticket {{params.ticket_id}}): {{params.ticket_content}}"

    # Step 3c — general handler
    - name: handle-general
      condition: "{{tasks.classify.output}} == 'general'"
      dependsOn:
        - route
      agent:
        name: general-agent
        text: "Handle this general enquiry (ticket {{params.ticket_id}}): {{params.ticket_content}}"
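
Because neither ticket_id nor ticket_content declares a default value, both have to be supplied when a run is submitted. The manifest below is a sketch of such a submission. It assumes that the generated WorkflowTemplate shares the AgentWorkflow's name and that workflow params are exposed as Argo parameters under the same names; the ticket values are made up for illustration.

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: support-triage-pipeline-
  namespace: production
spec:
  workflowTemplateRef:
    name: support-triage-pipeline   # assumed to match the AgentWorkflow name
  arguments:
    parameters:
      - name: ticket_id
        value: "TICKET-1234"        # illustrative value
      - name: ticket_content
        value: "I was charged twice for my subscription this month."
```

Submit a manifest like this with kubectl create -f rather than kubectl apply, because generateName is only honoured on create.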

Retry strategy

You can define a default retry strategy at the workflow level and override it per task:
spec:
  # Default: retry up to 3 times with exponential backoff
  retryStrategy:
    limit: 3
    backoff:
      duration: "30s"    # Wait 30s before the first retry
      factor: 2          # Double the wait after each retry: 30s, 60s, 120s

  tasks:
    - name: critical-task
      # This task overrides the workflow default — fail fast, no retries
      retryStrategy:
        limit: 0
      agent:
        name: my-agent
        text: "Do something critical"

Status fields

After the operator compiles the AgentWorkflow into an Argo WorkflowTemplate, it updates the following status fields:
status:
  ready: true
  workflowTemplateName: "research-and-summarise"
  specHash: "d4f1a3b2..."
  observedGeneration: 2
  conditions:
    - type: Ready
      status: "True"
      lastTransitionTime: "2026-01-15T10:30:00Z"
      reason: WorkflowTemplateApplied
      message: "Argo WorkflowTemplate compiled and applied successfully"

| Field | Description |
| --- | --- |
| ready | true when the Argo WorkflowTemplate has been successfully compiled and applied |
| workflowTemplateName | Name of the generated Argo WorkflowTemplate CR |
| specHash | Hash of the current spec; the operator skips recompilation if this has not changed |
| observedGeneration | Last spec generation reconciled by the operator |
| conditions | Standard condition array; the Ready condition carries error details |

kubectl operations

# List all AgentWorkflows
kubectl get agentworkflows
# or using the short name:
kubectl get awf

# Inspect a workflow and its compiled status
kubectl describe agentworkflow research-and-summarise -n production

# Check whether the WorkflowTemplate was created
kubectl get workflowtemplate research-and-summarise -n production

# Trigger a workflow run by creating an Argo Workflow that references the template
kubectl create -f - <<EOF
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: research-and-summarise-
  namespace: production
spec:
  workflowTemplateRef:
    name: research-and-summarise
  arguments:
    parameters:
      - name: topic
        value: "large language models"
EOF

# Watch active workflow runs
kubectl get workflows -n production -w

# Delete an AgentWorkflow (also removes the generated WorkflowTemplate)
kubectl delete agentworkflow research-and-summarise -n production

Best practices

  1. Install Argo Workflows before creating AgentWorkflows — the operator depends on Argo CRDs being present. Verify with kubectl get crd workflowtemplates.argoproj.io.
  2. Name tasks clearly — task names appear in Argo’s UI and logs; descriptive names like classify-intent make pipelines much easier to debug than task-1.
  3. Use params for runtime inputs — avoid hard-coding values that change between runs directly in task text fields; parameterise them instead.
  4. Set workflow and task timeouts — unbounded workflows can consume cluster resources indefinitely; always define a maximum duration.
  5. Define retryStrategy at the workflow level and override only for tasks with special requirements — this keeps retry logic consistent and reduces verbosity.
  6. Use dependsOn to express ordering explicitly — do not rely on list order; the DAG is the source of truth for execution order.
  7. Point serviceAccountName at a dedicated ServiceAccount that has only the minimum RBAC permissions needed by workflow pods.
  8. Version-control your AgentWorkflow manifests alongside the Agent CRs they call — changing an agent’s API without updating the workflow that calls it will cause runtime failures.
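
For the dedicated ServiceAccount in item 7, a starting point might look like the following. The Role grants only the permission that the Argo Workflows executor (v3.4 and later) uses to report task results. Whether Flokoa's task types need anything beyond this is not covered on this page, so treat the rule set as an assumption to verify against your installation. The names triage-workflow and triage-workflow-executor are hypothetical.

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: triage-workflow            # hypothetical name
  namespace: production
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: triage-workflow-executor   # hypothetical name
  namespace: production
rules:
  # Lets the Argo executor in each workflow pod write its task results
  - apiGroups: ["argoproj.io"]
    resources: ["workflowtaskresults"]
    verbs: ["create", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: triage-workflow-executor
  namespace: production
subjects:
  - kind: ServiceAccount
    name: triage-workflow
    namespace: production
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: triage-workflow-executor
```

An AgentWorkflow would then select this account by setting serviceAccountName: triage-workflow in its spec.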