Chapter 5: Argo Workflows Advanced Features
Haiyue
Learning Objectives
- Master Artifact management and data transfer
- Learn to configure workflow templates and reuse strategies
- Understand retry mechanisms and error handling
- Become proficient in using CronWorkflow for scheduled tasks
Knowledge Points
Artifact Overview
Artifacts are files or data passed between workflow steps. Argo Workflows supports multiple Artifact storage backends, including S3, GCS, MinIO, OSS, etc.
Artifact Storage Configuration
# Configure default Artifact repository
apiVersion: v1
kind: ConfigMap
metadata:
name: artifact-repositories
namespace: argo
data:
default-v1: |
archiveLogs: true
s3:
endpoint: minio.argo:9000
bucket: argo-artifacts
insecure: true
accessKeySecret:
name: argo-artifacts-secret
key: accesskey
secretKeySecret:
name: argo-artifacts-secret
key: secretkey
# Create storage credentials
apiVersion: v1
kind: Secret
metadata:
name: argo-artifacts-secret
namespace: argo
type: Opaque
stringData:
accesskey: minioadmin
secretkey: minioadmin
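A Workflow can also point at a specific entry in this ConfigMap instead of relying on the controller-wide default. A minimal sketch, assuming the artifact-repositories ConfigMap and default-v1 key shown above:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: custom-repo-
spec:
  entrypoint: main
  # Select the repository entry by ConfigMap name and key
  artifactRepositoryRef:
    configMap: artifact-repositories
    key: default-v1
  templates:
  - name: main
    container:
      image: busybox
      command: [echo, "artifacts go to the referenced repository"]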
Artifact Management
Output Artifacts
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-output-
spec:
entrypoint: main
templates:
- name: main
container:
image: python:3.9
command: [python, -c]
args:
- |
import json
data = {"result": "success", "value": 42}
with open("/tmp/output.json", "w") as f:
json.dump(data, f)
print("Output generated")
outputs:
artifacts:
- name: output-file
path: /tmp/output.json
# Optional: custom storage location
s3:
key: "{{workflow.name}}/output.json"
Input Artifacts
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-input-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: generate
template: generate-data
- - name: process
template: process-data
arguments:
artifacts:
- name: input-data
from: "{{steps.generate.outputs.artifacts.output-file}}"
- name: generate-data
container:
image: python:3.9
command: [python, -c]
args:
- |
import json
data = [1, 2, 3, 4, 5]
with open("/tmp/data.json", "w") as f:
json.dump(data, f)
outputs:
artifacts:
- name: output-file
path: /tmp/data.json
- name: process-data
inputs:
artifacts:
- name: input-data
path: /tmp/input.json
container:
image: python:3.9
command: [python, -c]
args:
- |
import json
with open("/tmp/input.json") as f:
data = json.load(f)
print(f"Sum: {sum(data)}")
Fetching External Artifacts
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: external-artifact-
spec:
entrypoint: main
templates:
- name: main
inputs:
artifacts:
# Fetch from HTTP URL
- name: config-file
path: /tmp/config.yaml
http:
url: https://raw.githubusercontent.com/myorg/configs/main/app.yaml
# Fetch from Git
- name: source-code
path: /workspace/src
git:
repo: https://github.com/myorg/myapp.git
revision: main
# Fetch from S3
- name: model
path: /models/model.pkl
s3:
endpoint: s3.amazonaws.com
bucket: my-models
key: models/v1/model.pkl
accessKeySecret:
name: aws-credentials
key: accessKey
secretKeySecret:
name: aws-credentials
key: secretKey
container:
image: python:3.9
command: [sh, -c]
args:
- |
echo "Config file:"
cat /tmp/config.yaml
echo "Source code:"
ls /workspace/src
echo "Model file:"
ls -la /models/
Artifact Compression and Archiving
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: artifact-archive-
spec:
entrypoint: main
templates:
- name: main
container:
image: busybox
command: [sh, -c]
args:
- |
mkdir -p /tmp/output
echo "file1" > /tmp/output/file1.txt
echo "file2" > /tmp/output/file2.txt
echo "file3" > /tmp/output/file3.txt
outputs:
artifacts:
- name: output-dir
path: /tmp/output
archive:
tar:
compressionLevel: 9
# Or use none to disable compression
# archive:
# none: {}
WorkflowTemplate
Creating Reusable Templates
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: ci-template
namespace: argo
spec:
arguments:
parameters:
- name: repo
- name: branch
value: main
- name: image
templates:
- name: ci-pipeline
dag:
tasks:
- name: checkout
template: git-checkout
- name: build
dependencies: [checkout]
template: docker-build
- name: push
dependencies: [build]
template: docker-push
- name: git-checkout
container:
image: alpine/git
command: [sh, -c]
args:
- |
git clone --branch {{workflow.parameters.branch}} {{workflow.parameters.repo}} /workspace
volumeMounts:
- name: workspace
mountPath: /workspace
- name: docker-build
container:
image: docker:dind
command: [sh, -c]
args:
- |
cd /workspace
docker build -t {{workflow.parameters.image}}:{{workflow.name}} .
volumeMounts:
- name: workspace
mountPath: /workspace
securityContext:
privileged: true
- name: docker-push
container:
image: docker:dind
command: [sh, -c]
args:
- |
docker push {{workflow.parameters.image}}:{{workflow.name}}
securityContext:
privileged: true
volumeClaimTemplates:
- metadata:
name: workspace
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 1Gi
Using WorkflowTemplate
# Method 1: Reference entire template
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: ci-run-
spec:
workflowTemplateRef:
name: ci-template
arguments:
parameters:
- name: repo
value: https://github.com/myorg/myapp.git
- name: image
value: myregistry.com/myapp
---
# Method 2: Reference specific steps from template in workflow
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: custom-ci-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: checkout
templateRef:
name: ci-template
template: git-checkout
- - name: custom-step
template: my-custom-step
- - name: build
templateRef:
name: ci-template
template: docker-build
- name: my-custom-step
container:
image: busybox
command: [echo, "Custom step"]
ClusterWorkflowTemplate
# Cluster-level template, available in all namespaces
apiVersion: argoproj.io/v1alpha1
kind: ClusterWorkflowTemplate
metadata:
name: common-tasks
spec:
templates:
- name: slack-notify
inputs:
parameters:
- name: message
- name: channel
value: "#general"
container:
image: curlimages/curl
command: [sh, -c]
args:
- |
curl -X POST -H 'Content-type: application/json' \
--data '{"channel":"{{inputs.parameters.channel}}","text":"{{inputs.parameters.message}}"}' \
$SLACK_WEBHOOK_URL
env:
- name: SLACK_WEBHOOK_URL
valueFrom:
secretKeyRef:
name: slack-webhook
key: url
- name: send-email
inputs:
parameters:
- name: to
- name: subject
- name: body
container:
image: mailhog/mhsendmail
command: [sh, -c]
args:
- |
echo "{{inputs.parameters.body}}" | sendmail -to {{inputs.parameters.to}} -subject "{{inputs.parameters.subject}}"
# Reference ClusterWorkflowTemplate
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: use-cluster-template-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: do-work
template: work
- - name: notify
templateRef:
name: common-tasks
template: slack-notify
clusterScope: true
arguments:
parameters:
- name: message
value: "Work completed!"
- name: work
container:
image: busybox
command: [echo, "Working..."]
Retry and Error Handling
Step Retry
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: retry-example-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: flaky-step
template: flaky-task
- name: flaky-task
retryStrategy:
limit: 3
retryPolicy: "Always" # Always, OnFailure, OnError, OnTransientError
backoff:
duration: "5s"
factor: 2
maxDuration: "1m"
container:
image: python:3.9
command: [python, -c]
args:
- |
import random
import sys
if random.random() < 0.7:
print("Failed!")
sys.exit(1)
print("Success!")
DAG Task Retry
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-retry-
spec:
entrypoint: main
templates:
- name: main
dag:
tasks:
- name: task-a
template: flaky-task
- name: task-b
dependencies: [task-a]
template: stable-task
- name: flaky-task
retryStrategy:
limit: 3
backoff:
duration: "10s"
factor: 2
container:
image: busybox
command: [sh, -c, "exit $((RANDOM % 2))"]
- name: stable-task
container:
image: busybox
command: [echo, "Stable task"]
Error Handling Strategy
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: error-handling-
spec:
entrypoint: main
onExit: exit-handler
templates:
- name: main
dag:
failFast: false # Don't fail fast, let other tasks continue
tasks:
- name: task-a
template: may-fail
continueOn:
failed: true # Continue even on failure
- name: task-b
template: stable
- name: task-c
dependencies: [task-a, task-b]
template: final-task
- name: may-fail
container:
image: busybox
command: [sh, -c, "exit 1"]
- name: stable
container:
image: busybox
command: [echo, "Stable"]
- name: final-task
container:
image: busybox
command: [echo, "Final"]
# Exit handler
- name: exit-handler
steps:
- - name: check-status
template: check-and-notify
- name: check-and-notify
container:
image: busybox
command: [sh, -c]
args:
- |
if [ "{{workflow.status}}" = "Succeeded" ]; then
echo "Workflow succeeded!"
else
echo "Workflow failed with status: {{workflow.status}}"
fi
Timeout Configuration
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: timeout-example-
spec:
entrypoint: main
# Workflow-level timeout
activeDeadlineSeconds: 3600 # 1 hour
templates:
- name: main
steps:
- - name: long-task
template: long-running
- name: long-running
# Template-level timeout
activeDeadlineSeconds: 300 # 5 minutes
container:
image: busybox
command: [sh, -c]
args: ["sleep 600"] # Will be terminated by timeout
CronWorkflow
Basic Scheduled Workflow
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: daily-backup
namespace: argo
spec:
# Cron expression
schedule: "0 2 * * *" # Daily at 2 AM
timezone: "Asia/Shanghai"
# Concurrency policy
concurrencyPolicy: Replace # Allow, Forbid, Replace
# History retention
successfulJobsHistoryLimit: 3
failedJobsHistoryLimit: 3
  # Seconds after the scheduled time during which a missed run may still be started
  startingDeadlineSeconds: 0
workflowSpec:
entrypoint: backup
templates:
- name: backup
container:
image: backup-tool
command: [backup, --full]
Advanced CronWorkflow
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: scheduled-pipeline
namespace: argo
spec:
schedule: "*/30 * * * *" # Every 30 minutes
timezone: "UTC"
concurrencyPolicy: Forbid # Forbid concurrent execution
successfulJobsHistoryLimit: 5
failedJobsHistoryLimit: 3
# Suspend scheduling
suspend: false
workflowSpec:
entrypoint: pipeline
arguments:
parameters:
- name: run-type
value: "scheduled"
# TTL strategy
ttlStrategy:
secondsAfterCompletion: 86400 # Delete after 1 day
secondsAfterSuccess: 43200 # Delete 12 hours after success
secondsAfterFailure: 172800 # Delete 2 days after failure
# Pod GC strategy
podGC:
strategy: OnPodCompletion
templates:
- name: pipeline
dag:
tasks:
- name: fetch-data
template: fetch
      - name: process
        dependencies: [fetch-data]
        template: process
        arguments:
          artifacts:
          - name: data
            from: "{{tasks.fetch-data.outputs.artifacts.data}}"
- name: report
dependencies: [process]
template: report
- name: fetch
container:
image: curlimages/curl
command: [sh, -c]
args: ["curl -o /tmp/data.json https://api.example.com/data"]
outputs:
artifacts:
- name: data
path: /tmp/data.json
- name: process
inputs:
artifacts:
- name: data
from: "{{tasks.fetch-data.outputs.artifacts.data}}"
path: /tmp/data.json
container:
image: python:3.9
command: [python, -c]
args:
- |
import json
with open('/tmp/data.json') as f:
data = json.load(f)
print(f"Processed {len(data)} records")
- name: report
container:
image: busybox
command: [echo, "Report generated at {{workflow.creationTimestamp}}"]
# Manage CronWorkflow
argo cron list -n argo
argo cron get daily-backup -n argo
argo cron suspend daily-backup -n argo
argo cron resume daily-backup -n argo
# Manually trigger a run from the CronWorkflow spec
argo submit --from cronwf/daily-backup -n argo
Advanced Features
Workflow-Level Configuration
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: advanced-workflow-
spec:
entrypoint: main
# Service account
serviceAccountName: argo-workflow
# Node selection
nodeSelector:
kubernetes.io/os: linux
# Tolerations
tolerations:
- key: "dedicated"
operator: "Equal"
value: "argo"
effect: "NoSchedule"
# Affinity
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
workflows.argoproj.io/workflow: "{{workflow.name}}"
topologyKey: kubernetes.io/hostname
# Priority
priority: 100
  podPriorityClassName: high-priority
# Security context
securityContext:
runAsNonRoot: true
runAsUser: 1000
# Parallelism limit
parallelism: 10
templates:
- name: main
container:
image: busybox
command: [echo, "Hello"]
Synchronization and Mutual Exclusion
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: sync-example-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: critical-section
template: critical
- name: critical
# Mutex lock
synchronization:
mutex:
name: my-mutex
container:
image: busybox
command: [sh, -c, "echo 'Critical section'; sleep 10"]
---
# Semaphore to limit concurrency
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: semaphore-example-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: concurrent-tasks
template: limited-task
withItems: ["1", "2", "3", "4", "5"]
- name: limited-task
synchronization:
semaphore:
configMapKeyRef:
name: my-semaphore
key: workflow
container:
image: busybox
command: [sh, -c, "echo 'Task {{item}}'; sleep 30"]
# Semaphore ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: my-semaphore
namespace: argo
data:
workflow: "3" # Maximum 3 concurrent
Memoization Caching
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: memoize-example-
spec:
entrypoint: main
templates:
- name: main
steps:
- - name: expensive-computation
template: compute
arguments:
parameters:
- name: input
value: "same-input"
- name: compute
inputs:
parameters:
- name: input
memoize:
key: "compute-{{inputs.parameters.input}}"
maxAge: "1h"
cache:
configMap:
name: compute-cache
container:
image: python:3.9
command: [python, -c]
args:
- |
import time
print("Starting expensive computation...")
time.sleep(60) # Simulate time-consuming computation
print("Done!")
Practical Exercise
Complete ML Training Pipeline
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: ml-pipeline
namespace: argo
spec:
entrypoint: ml-training
arguments:
parameters:
- name: dataset-url
- name: model-name
- name: epochs
value: "10"
templates:
- name: ml-training
dag:
tasks:
- name: download-data
template: download-dataset
- name: preprocess
dependencies: [download-data]
template: preprocess-data
arguments:
artifacts:
- name: raw-data
from: "{{tasks.download-data.outputs.artifacts.dataset}}"
- name: train
dependencies: [preprocess]
template: train-model
arguments:
artifacts:
- name: training-data
from: "{{tasks.preprocess.outputs.artifacts.processed-data}}"
parameters:
- name: epochs
value: "{{workflow.parameters.epochs}}"
- name: evaluate
dependencies: [train]
template: evaluate-model
arguments:
artifacts:
- name: model
from: "{{tasks.train.outputs.artifacts.model}}"
- name: test-data
from: "{{tasks.preprocess.outputs.artifacts.test-data}}"
- name: deploy
dependencies: [evaluate]
when: "{{tasks.evaluate.outputs.parameters.accuracy}} > 0.9"
template: deploy-model
arguments:
artifacts:
- name: model
from: "{{tasks.train.outputs.artifacts.model}}"
- name: download-dataset
container:
      image: busybox  # provides wget and unzip; curlimages/curl has no unzip
command: [sh, -c]
args:
- |
          # busybox includes wget and unzip applets
          wget -O /tmp/dataset.zip {{workflow.parameters.dataset-url}}
          mkdir -p /tmp/data
          unzip /tmp/dataset.zip -d /tmp/data
outputs:
artifacts:
- name: dataset
path: /tmp/data
- name: preprocess-data
inputs:
artifacts:
- name: raw-data
path: /data/raw
container:
image: python:3.9
command: [python, /scripts/preprocess.py]
volumeMounts:
- name: scripts
mountPath: /scripts
outputs:
artifacts:
- name: processed-data
path: /data/processed
- name: test-data
path: /data/test
- name: train-model
inputs:
artifacts:
- name: training-data
path: /data/train
parameters:
- name: epochs
container:
image: tensorflow/tensorflow:latest-gpu
command: [python, /scripts/train.py]
args:
- --epochs={{inputs.parameters.epochs}}
- --data=/data/train
- --output=/models
resources:
limits:
nvidia.com/gpu: 1
outputs:
artifacts:
- name: model
path: /models
- name: evaluate-model
inputs:
artifacts:
- name: model
path: /models
- name: test-data
path: /data/test
container:
image: tensorflow/tensorflow:latest
command: [python, /scripts/evaluate.py]
outputs:
parameters:
- name: accuracy
valueFrom:
path: /tmp/accuracy.txt
- name: deploy-model
inputs:
artifacts:
- name: model
path: /models
container:
image: google/cloud-sdk
command: [sh, -c]
args:
- |
          # Sketch of a deploy step: copy the trained model to GCS and register it
          # with AI Platform (bucket name and region are placeholders)
          gsutil -m cp -r /models gs://my-models/{{workflow.parameters.model-name}}
          gcloud ai-platform models create {{workflow.parameters.model-name}} \
            --regions=us-central1
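A training run could then be launched from this template via the CLI; the parameter values below are placeholders:
argo submit -n argo --from workflowtemplate/ml-pipeline \
  -p dataset-url=https://example.com/datasets/images.zip \
  -p model-name=image-classifier \
  -p epochs=20 \
  --watch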
Best Practices
- Use WorkflowTemplate: Reuse common workflow steps
- Configure Artifact Storage: Properly configure persistent storage
- Error Handling: Configure retry strategies and timeouts
- Resource Cleanup: Use TTL and PodGC strategies
- Monitoring: Integrate Prometheus to monitor workflow metrics
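For the monitoring point above, the workflow-controller exposes Prometheus metrics on port 9090 at /metrics by default; a minimal scrape-config sketch, assuming the default workflow-controller-metrics Service in the argo namespace:
scrape_configs:
  - job_name: argo-workflows
    metrics_path: /metrics
    static_configs:
      - targets: ["workflow-controller-metrics.argo.svc.cluster.local:9090"]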
Summary
Through this chapter, you should have mastered:
- Artifact Management: Input/output artifacts, external storage
- Workflow Templates: WorkflowTemplate and ClusterWorkflowTemplate
- Error Handling: Retry strategies, timeouts, exit handlers
- Scheduled Tasks: CronWorkflow configuration and management
- Advanced Features: Synchronization, caching, parallelism control
In the next chapter, we will learn about Argo Rollouts to master progressive delivery and advanced deployment strategies.