Chapter 5: Argo Workflows Advanced Features


Learning Objectives
  • Master Artifact management and data transfer
  • Learn to configure workflow templates and reuse strategies
  • Understand retry mechanisms and error handling
  • Become proficient in using CronWorkflow for scheduled tasks

Knowledge Points

Artifact Overview

Artifacts are files or directories passed between workflow steps. Argo Workflows supports multiple artifact storage backends, including S3, GCS, MinIO, and Alibaba Cloud OSS.

Artifact Storage Configuration

# Configure default Artifact repository
apiVersion: v1
kind: ConfigMap
metadata:
  name: artifact-repositories
  namespace: argo
data:
  default-v1: |
    archiveLogs: true
    s3:
      endpoint: minio.argo:9000
      bucket: argo-artifacts
      insecure: true
      accessKeySecret:
        name: argo-artifacts-secret
        key: accesskey
      secretKeySecret:
        name: argo-artifacts-secret
        key: secretkey
---
# Create storage credentials
apiVersion: v1
kind: Secret
metadata:
  name: argo-artifacts-secret
  namespace: argo
type: Opaque
stringData:
  accesskey: minioadmin
  secretkey: minioadmin
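
A workflow can select an entry from this ConfigMap explicitly through spec.artifactRepositoryRef. A minimal sketch pointing at the default-v1 entry defined above:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: use-artifact-repo-
spec:
  # Use the repository definition stored in the artifact-repositories ConfigMap
  artifactRepositoryRef:
    configMap: artifact-repositories
    key: default-v1
  entrypoint: main
  templates:
  - name: main
    container:
      image: busybox
      command: [sh, -c, "echo hello > /tmp/out.txt"]
    outputs:
      artifacts:
      - name: out
        path: /tmp/out.txt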

Artifact Management

Output Artifacts

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-output-
spec:
  entrypoint: main
  templates:
  - name: main
    container:
      image: python:3.9
      command: [python, -c]
      args:
      - |
        import json
        data = {"result": "success", "value": 42}
        with open("/tmp/output.json", "w") as f:
            json.dump(data, f)
        print("Output generated")
    outputs:
      artifacts:
      - name: output-file
        path: /tmp/output.json
        # Optional: custom storage location
        s3:
          key: "{{workflow.name}}/output.json"

Input Artifacts

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-input-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: generate
        template: generate-data
    - - name: process
        template: process-data
        arguments:
          artifacts:
          - name: input-data
            from: "{{steps.generate.outputs.artifacts.output-file}}"

  - name: generate-data
    container:
      image: python:3.9
      command: [python, -c]
      args:
      - |
        import json
        data = [1, 2, 3, 4, 5]
        with open("/tmp/data.json", "w") as f:
            json.dump(data, f)
    outputs:
      artifacts:
      - name: output-file
        path: /tmp/data.json

  - name: process-data
    inputs:
      artifacts:
      - name: input-data
        path: /tmp/input.json
    container:
      image: python:3.9
      command: [python, -c]
      args:
      - |
        import json
        with open("/tmp/input.json") as f:
            data = json.load(f)
        print(f"Sum: {sum(data)}")

Fetching External Artifacts

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: external-artifact-
spec:
  entrypoint: main
  templates:
  - name: main
    inputs:
      artifacts:
      # Fetch from HTTP URL
      - name: config-file
        path: /tmp/config.yaml
        http:
          url: https://raw.githubusercontent.com/myorg/configs/main/app.yaml

      # Fetch from Git
      - name: source-code
        path: /workspace/src
        git:
          repo: https://github.com/myorg/myapp.git
          revision: main

      # Fetch from S3
      - name: model
        path: /models/model.pkl
        s3:
          endpoint: s3.amazonaws.com
          bucket: my-models
          key: models/v1/model.pkl
          accessKeySecret:
            name: aws-credentials
            key: accessKey
          secretKeySecret:
            name: aws-credentials
            key: secretKey

    container:
      image: python:3.9
      command: [sh, -c]
      args:
      - |
        echo "Config file:"
        cat /tmp/config.yaml
        echo "Source code:"
        ls /workspace/src
        echo "Model file:"
        ls -la /models/

Artifact Compression and Archiving

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: artifact-archive-
spec:
  entrypoint: main
  templates:
  - name: main
    container:
      image: busybox
      command: [sh, -c]
      args:
      - |
        mkdir -p /tmp/output
        echo "file1" > /tmp/output/file1.txt
        echo "file2" > /tmp/output/file2.txt
        echo "file3" > /tmp/output/file3.txt
    outputs:
      artifacts:
      - name: output-dir
        path: /tmp/output
        archive:
          tar:
            compressionLevel: 9
        # Or use none to disable compression
        # archive:
        #   none: {}

WorkflowTemplate

Creating Reusable Templates

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: ci-template
  namespace: argo
spec:
  arguments:
    parameters:
    - name: repo
    - name: branch
      value: main
    - name: image

  templates:
  - name: ci-pipeline
    dag:
      tasks:
      - name: checkout
        template: git-checkout
      - name: build
        dependencies: [checkout]
        template: docker-build
      - name: push
        dependencies: [build]
        template: docker-push

  - name: git-checkout
    container:
      image: alpine/git
      command: [sh, -c]
      args:
      - |
        git clone --branch {{workflow.parameters.branch}} {{workflow.parameters.repo}} /workspace
      volumeMounts:
      - name: workspace
        mountPath: /workspace

  - name: docker-build
    container:
      image: docker:dind
      command: [sh, -c]
      args:
      - |
        cd /workspace
        docker build -t {{workflow.parameters.image}}:{{workflow.name}} .
      volumeMounts:
      - name: workspace
        mountPath: /workspace
      securityContext:
        privileged: true

  - name: docker-push
    container:
      # Assumes the image built in the previous step is reachable from this step's Docker daemon
      # (each Argo step runs in its own pod, so a shared daemon or registry is needed in practice)
      image: docker:dind
      command: [sh, -c]
      args:
      - |
        docker push {{workflow.parameters.image}}:{{workflow.name}}
      securityContext:
        privileged: true

  volumeClaimTemplates:
  - metadata:
      name: workspace
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 1Gi

Using WorkflowTemplate

# Method 1: Reference entire template
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: ci-run-
spec:
  workflowTemplateRef:
    name: ci-template
  arguments:
    parameters:
    - name: repo
      value: https://github.com/myorg/myapp.git
    - name: image
      value: myregistry.com/myapp

---
# Method 2: Reference specific steps from template in workflow
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: custom-ci-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: checkout
        templateRef:
          name: ci-template
          template: git-checkout
    - - name: custom-step
        template: my-custom-step
    - - name: build
        templateRef:
          name: ci-template
          template: docker-build

  - name: my-custom-step
    container:
      image: busybox
      command: [echo, "Custom step"]
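
A template installed this way can also be launched straight from the CLI; the parameter values below are examples:

# Submit a run of the WorkflowTemplate with parameter overrides
argo submit -n argo --from workflowtemplate/ci-template \
  -p repo=https://github.com/myorg/myapp.git \
  -p image=myregistry.com/myapp \
  --watch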

ClusterWorkflowTemplate

# Cluster-level template, available in all namespaces
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
  name: common-tasks
spec:
  templates:
  - name: slack-notify
    inputs:
      parameters:
      - name: message
      - name: channel
        value: "#general"
    container:
      image: curlimages/curl
      command: [sh, -c]
      args:
      - |
        curl -X POST -H 'Content-type: application/json' \
          --data '{"channel":"{{inputs.parameters.channel}}","text":"{{inputs.parameters.message}}"}' \
          $SLACK_WEBHOOK_URL
      env:
      - name: SLACK_WEBHOOK_URL
        valueFrom:
          secretKeyRef:
            name: slack-webhook
            key: url

  - name: send-email
    inputs:
      parameters:
      - name: to
      - name: subject
      - name: body
    container:
      image: mailhog/mhsendmail
      command: [sh, -c]
      args:
      - |
        # mhsendmail reads a full RFC 822 message from stdin; --smtp-addr points at the MailHog
        # SMTP service (the address here is a placeholder)
        printf 'To: %s\nSubject: %s\n\n%s\n' \
          "{{inputs.parameters.to}}" "{{inputs.parameters.subject}}" "{{inputs.parameters.body}}" \
          | mhsendmail --smtp-addr mailhog:1025 "{{inputs.parameters.to}}"
---
# Reference ClusterWorkflowTemplate
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: use-cluster-template-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: do-work
        template: work
    - - name: notify
        templateRef:
          name: common-tasks
          template: slack-notify
          clusterScope: true
        arguments:
          parameters:
          - name: message
            value: "Work completed!"

  - name: work
    container:
      image: busybox
      command: [echo, "Working..."]

Retry and Error Handling

Step Retry

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: retry-example-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: flaky-step
        template: flaky-task

  - name: flaky-task
    retryStrategy:
      limit: 3
      retryPolicy: "Always"  # Always, OnFailure, OnError, OnTransientError
      backoff:
        duration: "5s"
        factor: 2
        maxDuration: "1m"
    container:
      image: python:3.9
      command: [python, -c]
      args:
      - |
        import random
        import sys
        if random.random() < 0.7:
            print("Failed!")
            sys.exit(1)
        print("Success!")

DAG Task Retry

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: dag-retry-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: task-a
        template: flaky-task

      - name: task-b
        dependencies: [task-a]
        template: stable-task

  - name: flaky-task
    retryStrategy:
      limit: 3
      backoff:
        duration: "10s"
        factor: 2
    container:
      image: busybox
      command: [sh, -c, "exit $((RANDOM % 2))"]

  - name: stable-task
    container:
      image: busybox
      command: [echo, "Stable task"]

Error Handling Strategy

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: error-handling-
spec:
  entrypoint: main
  onExit: exit-handler

  templates:
  - name: main
    dag:
      failFast: false  # Don't fail fast, let other tasks continue
      tasks:
      - name: task-a
        template: may-fail
        continueOn:
          failed: true  # Continue even on failure

      - name: task-b
        template: stable

      - name: task-c
        dependencies: [task-a, task-b]
        template: final-task

  - name: may-fail
    container:
      image: busybox
      command: [sh, -c, "exit 1"]

  - name: stable
    container:
      image: busybox
      command: [echo, "Stable"]

  - name: final-task
    container:
      image: busybox
      command: [echo, "Final"]

  # Exit handler
  - name: exit-handler
    steps:
    - - name: check-status
        template: check-and-notify

  - name: check-and-notify
    container:
      image: busybox
      command: [sh, -c]
      args:
      - |
        if [ "{{workflow.status}}" = "Succeeded" ]; then
          echo "Workflow succeeded!"
        else
          echo "Workflow failed with status: {{workflow.status}}"
        fi
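
The continueOn pattern above can also be written with the DAG depends field, which lets a task name the exact upstream outcomes it accepts. A minimal sketch reusing the may-fail and final-task templates from the workflow above:

  - name: main
    dag:
      tasks:
      - name: task-a
        template: may-fail
      - name: task-c
        # Run once task-a has finished, whether it succeeded or failed
        depends: "task-a.Succeeded || task-a.Failed"
        template: final-task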

Timeout Configuration

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: timeout-example-
spec:
  entrypoint: main
  # Workflow-level timeout
  activeDeadlineSeconds: 3600  # 1 hour

  templates:
  - name: main
    steps:
    - - name: long-task
        template: long-running

  - name: long-running
    # Template-level timeout
    activeDeadlineSeconds: 300  # 5 minutes
    container:
      image: busybox
      command: [sh, -c]
      args: ["sleep 600"]  # Will be terminated by timeout

CronWorkflow

Basic Scheduled Workflow

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: daily-backup
  namespace: argo
spec:
  # Cron expression
  schedule: "0 2 * * *"  # Daily at 2 AM
  timezone: "Asia/Shanghai"

  # Concurrency policy
  concurrencyPolicy: Replace  # Allow, Forbid, Replace

  # History retention
  successfulJobsHistoryLimit: 3
  failedJobsHistoryLimit: 3

  # Deadline (in seconds) after the scheduled time during which a missed run may still be started
  startingDeadlineSeconds: 0

  workflowSpec:
    entrypoint: backup
    templates:
    - name: backup
      container:
        image: backup-tool
        command: [backup, --full]

Advanced CronWorkflow

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: scheduled-pipeline
  namespace: argo
spec:
  schedule: "*/30 * * * *"  # Every 30 minutes
  timezone: "UTC"
  concurrencyPolicy: Forbid  # Forbid concurrent execution
  successfulJobsHistoryLimit: 5
  failedJobsHistoryLimit: 3

  # Suspend scheduling
  suspend: false

  workflowSpec:
    entrypoint: pipeline
    arguments:
      parameters:
      - name: run-type
        value: "scheduled"

    # TTL strategy
    ttlStrategy:
      secondsAfterCompletion: 86400  # Delete after 1 day
      secondsAfterSuccess: 43200     # Delete 12 hours after success
      secondsAfterFailure: 172800    # Delete 2 days after failure

    # Pod GC strategy
    podGC:
      strategy: OnPodCompletion

    templates:
    - name: pipeline
      dag:
        tasks:
        - name: fetch-data
          template: fetch
        - name: process
          dependencies: [fetch-data]
          template: process
          arguments:
            artifacts:
            - name: data
              from: "{{tasks.fetch-data.outputs.artifacts.data}}"
        - name: report
          dependencies: [process]
          template: report

    - name: fetch
      container:
        image: curlimages/curl
        command: [sh, -c]
        args: ["curl -o /tmp/data.json https://api.example.com/data"]
      outputs:
        artifacts:
        - name: data
          path: /tmp/data.json

    - name: process
      inputs:
        artifacts:
        - name: data
          from: "{{tasks.fetch-data.outputs.artifacts.data}}"
          path: /tmp/data.json
      container:
        image: python:3.9
        command: [python, -c]
        args:
        - |
          import json
          with open('/tmp/data.json') as f:
              data = json.load(f)
          print(f"Processed {len(data)} records")

    - name: report
      container:
        image: busybox
        command: [echo, "Report generated at {{workflow.creationTimestamp}}"]

# Manage CronWorkflow
argo cron list -n argo
argo cron get daily-backup -n argo
argo cron suspend daily-backup -n argo
argo cron resume daily-backup -n argo

# Manual trigger
argo cron trigger daily-backup -n argo

Advanced Features

Workflow-Level Configuration

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: advanced-workflow-
spec:
  entrypoint: main

  # Service account
  serviceAccountName: argo-workflow

  # Node selection
  nodeSelector:
    kubernetes.io/os: linux

  # Tolerations
  tolerations:
  - key: "dedicated"
    operator: "Equal"
    value: "argo"
    effect: "NoSchedule"

  # Affinity
  affinity:
    podAntiAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - weight: 100
        podAffinityTerm:
          labelSelector:
            matchLabels:
              workflows.argoproj.io/workflow: "{{workflow.name}}"
          topologyKey: kubernetes.io/hostname

  # Priority
  priority: 100
  priorityClassName: high-priority

  # Security context
  securityContext:
    runAsNonRoot: true
    runAsUser: 1000

  # Parallelism limit
  parallelism: 10

  templates:
  - name: main
    container:
      image: busybox
      command: [echo, "Hello"]

Synchronization and Mutual Exclusion

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: sync-example-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: critical-section
        template: critical

  - name: critical
    # Mutex lock
    synchronization:
      mutex:
        name: my-mutex
    container:
      image: busybox
      command: [sh, -c, "echo 'Critical section'; sleep 10"]

---
# Semaphore to limit concurrency
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: semaphore-example-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: concurrent-tasks
        template: limited-task
        withItems: ["1", "2", "3", "4", "5"]

  - name: limited-task
    synchronization:
      semaphore:
        configMapKeyRef:
          name: my-semaphore
          key: workflow
    container:
      image: busybox
      command: [sh, -c, "echo 'Task {{item}}'; sleep 30"]
---
# Semaphore ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-semaphore
  namespace: argo
data:
  workflow: "3"  # Maximum 3 concurrent
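
The same mechanism also works at the workflow level, limiting how many workflows holding the lock run at once. A minimal sketch reusing the ConfigMap above:

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: workflow-level-sync-
spec:
  entrypoint: main
  # Acquired before the workflow starts; at most 3 such workflows run concurrently
  synchronization:
    semaphore:
      configMapKeyRef:
        name: my-semaphore
        key: workflow
  templates:
  - name: main
    container:
      image: busybox
      command: [echo, "Running under the workflow-level semaphore"]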

Memoization Caching

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: memoize-example-
spec:
  entrypoint: main
  templates:
  - name: main
    steps:
    - - name: expensive-computation
        template: compute
        arguments:
          parameters:
          - name: input
            value: "same-input"

  - name: compute
    inputs:
      parameters:
      - name: input
    memoize:
      key: "compute-{{inputs.parameters.input}}"
      maxAge: "1h"
      cache:
        configMap:
          name: compute-cache
    container:
      image: python:3.9
      command: [python, -c]
      args:
      - |
        import time
        print("Starting expensive computation...")
        time.sleep(60)  # Simulate time-consuming computation
        print("Done!")

Practical Exercise

Complete ML Training Pipeline

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: ml-pipeline
  namespace: argo
spec:
  entrypoint: ml-training
  arguments:
    parameters:
    - name: dataset-url
    - name: model-name
    - name: epochs
      value: "10"

  templates:
  - name: ml-training
    dag:
      tasks:
      - name: download-data
        template: download-dataset

      - name: preprocess
        dependencies: [download-data]
        template: preprocess-data
        arguments:
          artifacts:
          - name: raw-data
            from: "{{tasks.download-data.outputs.artifacts.dataset}}"

      - name: train
        dependencies: [preprocess]
        template: train-model
        arguments:
          artifacts:
          - name: training-data
            from: "{{tasks.preprocess.outputs.artifacts.processed-data}}"
          parameters:
          - name: epochs
            value: "{{workflow.parameters.epochs}}"

      - name: evaluate
        dependencies: [train]
        template: evaluate-model
        arguments:
          artifacts:
          - name: model
            from: "{{tasks.train.outputs.artifacts.model}}"
          - name: test-data
            from: "{{tasks.preprocess.outputs.artifacts.test-data}}"

      - name: deploy
        dependencies: [evaluate]
        when: "{{tasks.evaluate.outputs.parameters.accuracy}} > 0.9"
        template: deploy-model
        arguments:
          artifacts:
          - name: model
            from: "{{tasks.train.outputs.artifacts.model}}"

  - name: download-dataset
    container:
      # busybox provides both wget and unzip, which this step needs
      image: busybox
      command: [sh, -c]
      args:
      - |
        wget -O /tmp/dataset.zip {{workflow.parameters.dataset-url}}
        mkdir -p /tmp/data
        unzip /tmp/dataset.zip -d /tmp/data
    outputs:
      artifacts:
      - name: dataset
        path: /tmp/data

  - name: preprocess-data
    inputs:
      artifacts:
      - name: raw-data
        path: /data/raw
    container:
      image: python:3.9
      command: [python, /scripts/preprocess.py]
      volumeMounts:
      # Assumes a 'scripts' volume (e.g. a ConfigMap holding preprocess.py) is defined in spec.volumes
      - name: scripts
        mountPath: /scripts
    outputs:
      artifacts:
      - name: processed-data
        path: /data/processed
      - name: test-data
        path: /data/test

  - name: train-model
    inputs:
      artifacts:
      - name: training-data
        path: /data/train
      parameters:
      - name: epochs
    container:
      image: tensorflow/tensorflow:latest-gpu
      command: [python, /scripts/train.py]
      args:
      - --epochs={{inputs.parameters.epochs}}
      - --data=/data/train
      - --output=/models
      resources:
        limits:
          nvidia.com/gpu: 1
    outputs:
      artifacts:
      - name: model
        path: /models

  - name: evaluate-model
    inputs:
      artifacts:
      - name: model
        path: /models
      - name: test-data
        path: /data/test
    container:
      image: tensorflow/tensorflow:latest
      command: [python, /scripts/evaluate.py]
    outputs:
      parameters:
      - name: accuracy
        valueFrom:
          path: /tmp/accuracy.txt

  - name: deploy-model
    inputs:
      artifacts:
      - name: model
        path: /models
    container:
      image: google/cloud-sdk
      command: [sh, -c]
      args:
      - |
        gcloud ai-platform models upload \
          --name={{workflow.parameters.model-name}} \
          --path=/models
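
To run the exercise, submit the WorkflowTemplate with its required parameters (the URL and names below are placeholders):

argo submit -n argo --from workflowtemplate/ml-pipeline \
  -p dataset-url=https://example.com/dataset.zip \
  -p model-name=my-model \
  -p epochs=20
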
Best Practices
  1. Use WorkflowTemplate: Reuse common workflow steps
  2. Configure Artifact Storage: Properly configure persistent storage
  3. Error Handling: Configure retry strategies and timeouts
  4. Resource Cleanup: Use TTL and PodGC strategies
  5. Monitoring: Integrate Prometheus to monitor workflow metrics (see the sketch below)
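
For the monitoring point, the workflow controller exposes a Prometheus endpoint that is enabled through the metricsConfig block of workflow-controller-configmap. A minimal sketch (the path and port shown are the documented defaults):

apiVersion: v1
kind: ConfigMap
metadata:
  name: workflow-controller-configmap
  namespace: argo
data:
  metricsConfig: |
    enabled: true
    path: /metrics
    port: 9090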

Summary

Through this chapter, you should have mastered:

  • Artifact Management: Input/output artifacts, external storage
  • Workflow Templates: WorkflowTemplate and ClusterWorkflowTemplate
  • Error Handling: Retry strategies, timeouts, exit handlers
  • Scheduled Tasks: CronWorkflow configuration and management
  • Advanced Features: Synchronization, caching, parallelism control

In the next chapter, we will learn about Argo Rollouts to master progressive delivery and advanced deployment strategies.