Chapter 7: Event-Driven Automation with Argo Events
An in-depth study of Argo Events core concepts, including event sources, sensors, and triggers, and how to build event-driven automation workflows
Chapter 7: Building Event-Driven Architecture with Argo Events
Argo Events is an event-driven workflow automation framework that connects event sources such as webhooks, message queues, and timers to Kubernetes resources or Argo Workflows.
7.1 Argo Events Overview
7.1.1 Event-Driven Architecture
(Mermaid diagram: event-driven architecture)
7.1.2 Core Components
(Mermaid diagram: Argo Events core components)
7.1.3 Installing Argo Events
# Create namespace
kubectl create namespace argo-events
# Install Argo Events
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml
# Install the validating admission webhook (optional but recommended)
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install-validating-webhook.yaml
# Install a default EventBus (NATS)
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/examples/eventbus/native.yaml
# Verify installation
kubectl get pods -n argo-events
7.2 EventBus
7.2.1 EventBus Concept
EventBus is the transport layer for events, responsible for passing events between EventSource and Sensor.
(Mermaid diagram: event flow from EventSource through EventBus to Sensor)
7.2.2 NATS EventBus
# eventbus-nats.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
namespace: argo-events
spec:
nats:
native:
# Number of replicas
replicas: 3
# Authentication strategy
auth: token
# Persistence configuration
persistence:
storageClassName: standard
accessMode: ReadWriteOnce
volumeSize: 10Gi
# Container resource configuration
containerTemplate:
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
# Metrics configuration
metricsContainerTemplate:
resources:
requests:
cpu: 50m
memory: 64Mi
# Anti-affinity
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
controller: eventbus-controller
topologyKey: kubernetes.io/hostname
7.2.3 Jetstream EventBus
# eventbus-jetstream.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
namespace: argo-events
spec:
jetstream:
version: "2.9.15"
replicas: 3
persistence:
storageClassName: fast-ssd
accessMode: ReadWriteOnce
volumeSize: 20Gi
# Stream configuration
streamConfig: |
max_msgs: 100000
max_bytes: 1073741824
max_age: 72h
max_msg_size: 1048576
retention: limits
storage: file
replicas: 3
containerTemplate:
resources:
requests:
cpu: 200m
memory: 256Mi
limits:
cpu: 1
memory: 1Gi
7.2.4 Kafka EventBus
# eventbus-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
namespace: argo-events
spec:
kafka:
url: kafka-cluster.kafka:9092
topic: argo-events
version: "3.4.0"
# TLS configuration
tls:
caCertSecret:
name: kafka-ca
key: ca.crt
clientCertSecret:
name: kafka-client-cert
key: tls.crt
clientKeySecret:
name: kafka-client-key
key: tls.key
# SASL authentication
sasl:
mechanism: SCRAM-SHA-512
userSecret:
name: kafka-credentials
key: username
passwordSecret:
name: kafka-credentials
key: password
# Consumer group configuration
consumerGroup:
groupName: argo-events-sensors
rebalanceStrategy: sticky
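The EventBus above reads its SASL credentials from a Secret named kafka-credentials holding username and password keys. One way to create it (placeholder values; `--dry-run=client -o yaml` renders the manifest locally without contacting a cluster):

```shell
# Render the SASL credentials Secret referenced by the Kafka EventBus above.
# Placeholder values; --dry-run=client never talks to the API server.
kubectl create secret generic kafka-credentials \
  --namespace argo-events \
  --from-literal=username=argo \
  --from-literal=password='change-me' \
  --dry-run=client -o yaml
```

Drop `--dry-run=client -o yaml` to actually create the Secret in the cluster.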
7.3 EventSources
7.3.1 Webhook EventSource
# eventsource-webhook.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
namespace: argo-events
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
# Generic webhook
generic:
port: "12000"
endpoint: /generic
method: POST
# CI/CD webhook
cicd:
port: "12000"
endpoint: /cicd
method: POST
# Filter
filter:
expression: "body.action == 'deploy'"
# Secured webhook
secure:
port: "12000"
endpoint: /secure
method: POST
# Token authentication (the request's bearer token must match this Secret value)
authSecret:
name: webhook-secret
key: token
---
# Expose webhook service
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: webhook-ingress
namespace: argo-events
spec:
rules:
- host: events.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: webhook-eventsource-svc
port:
number: 12000
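With the Ingress in place, the endpoints can be smoke-tested from outside the cluster. The host comes from the Ingress above; the payload and token values are illustrative, and the /secure endpoint expects the token stored in webhook-secret as a bearer token:

```shell
# Illustrative payload; note the /cicd endpoint's filter only admits action == 'deploy'
PAYLOAD='{"action":"deploy","app":"demo"}'

# Generic endpoint (host from the Ingress above; adjust for your cluster)
curl -sS --max-time 5 -X POST "http://events.example.com/generic" \
  -H "Content-Type: application/json" \
  -d "$PAYLOAD" || echo "events.example.com not reachable from this machine"

# Secured endpoint: present the token from the webhook-secret Secret
TOKEN='replace-with-webhook-secret-token'
curl -sS --max-time 5 -X POST "http://events.example.com/secure" \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d "$PAYLOAD" || echo "events.example.com not reachable from this machine"
```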
7.3.2 GitHub EventSource
# eventsource-github.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: github
namespace: argo-events
spec:
service:
ports:
- port: 12000
targetPort: 12000
github:
# Monitor specific repositories
app-repo:
# Repository information
repositories:
- owner: myorg
names:
- myapp
- mylib
# Event types to listen for
events:
- push
- pull_request
- release
# Webhook configuration
webhook:
endpoint: /github/app
port: "12000"
method: POST
# GitHub API configuration
apiToken:
name: github-access
key: token
# Webhook secret
webhookSecret:
name: github-webhook-secret
key: secret
# SSL verification
insecure: false
# Active status
active: true
# Content type
contentType: json
# Listen to organization-level events
org-events:
organizations:
- myorg
events:
- repository
- member
- team
webhook:
endpoint: /github/org
port: "12000"
method: POST
apiToken:
name: github-access
key: token
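When webhookSecret is set, GitHub signs each delivery and Argo Events validates the X-Hub-Signature-256 header, which carries sha256= followed by an HMAC-SHA256 hex digest of the raw body keyed with the secret. The check can be reproduced with openssl (secret and body are made up here):

```shell
SECRET='my-webhook-secret'            # value stored in the github-webhook-secret Secret
BODY='{"ref":"refs/heads/main"}'      # raw request body exactly as delivered

# Hex-encoded HMAC-SHA256 of the body, keyed with the secret
SIG=$(printf '%s' "$BODY" | openssl dgst -sha256 -hmac "$SECRET" | awk '{print $NF}')
echo "X-Hub-Signature-256: sha256=$SIG"
```

A delivery is rejected when the recomputed digest does not match the header.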
7.3.3 GitLab EventSource
# eventsource-gitlab.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: gitlab
namespace: argo-events
spec:
service:
ports:
- port: 12000
targetPort: 12000
gitlab:
app-project:
# GitLab project ID
projectID: "12345"
# Events to listen for
events:
- PushEvents
- MergeRequestEvents
- TagPushEvents
- PipelineEvents
# Webhook configuration
webhook:
endpoint: /gitlab/app
port: "12000"
method: POST
# GitLab URL
gitlabBaseURL: https://gitlab.example.com
# Access token
accessToken:
name: gitlab-access
key: token
# Webhook secret token
secretToken:
name: gitlab-secret
key: token
# SSL configuration
enableSSLVerification: true
7.3.4 Kafka EventSource
# eventsource-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: kafka
namespace: argo-events
spec:
kafka:
# Order events
orders:
url: kafka-cluster.kafka:9092
topic: orders
partition: "0"
consumerGroup:
groupName: argo-events-orders
rebalanceStrategy: sticky
# JSON data
jsonBody: true
# Rate limiting
limitEventsPerSecond: "100"
# User events
users:
url: kafka-cluster.kafka:9092
topic: users
partition: "0"
consumerGroup:
groupName: argo-events-users
jsonBody: true
# TLS configuration
tls:
caCertSecret:
name: kafka-ca
key: ca.crt
# SASL configuration
sasl:
mechanism: SCRAM-SHA-256
userSecret:
name: kafka-creds
key: username
passwordSecret:
name: kafka-creds
key: password
7.3.5 S3/MinIO EventSource
# eventsource-s3.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: minio
namespace: argo-events
spec:
minio:
# Monitor file uploads
file-upload:
bucket:
name: uploads
endpoint: minio.minio:9000
events:
- s3:ObjectCreated:*
- s3:ObjectRemoved:*
# Filter specific prefix
filter:
prefix: "data/"
suffix: ".csv"
# Plain HTTP endpoint (TLS disabled)
insecure: true
# Access credentials
accessKey:
name: minio-credentials
key: accesskey
secretKey:
name: minio-credentials
key: secretkey
# Monitor reports directory
reports:
bucket:
name: reports
endpoint: minio.minio:9000
events:
- s3:ObjectCreated:Put
filter:
prefix: "daily/"
suffix: ".pdf"
insecure: true
accessKey:
name: minio-credentials
key: accesskey
secretKey:
name: minio-credentials
key: secretkey
7.3.6 Calendar/Cron EventSource
# eventsource-calendar.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: calendar
namespace: argo-events
spec:
calendar:
# Daily report
daily-report:
# Cron expression
schedule: "0 8 * * *" # Every day at 8 AM
# Timezone
timezone: "Asia/Shanghai"
# Metadata
metadata:
task: daily-report
priority: high
# Hourly task
hourly-cleanup:
schedule: "0 * * * *" # Every hour
timezone: "UTC"
metadata:
task: cleanup
# Workday task
workday-sync:
schedule: "30 9 * * 1-5" # Workdays at 9:30
timezone: "Asia/Shanghai"
metadata:
task: data-sync
# Using interval
interval-task:
interval: 30m # Every 30 minutes
metadata:
task: health-check
7.3.7 Redis EventSource
# eventsource-redis.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: redis
namespace: argo-events
spec:
redis:
# Pub/Sub pattern
notifications:
hostAddress: redis.redis:6379
# Subscribed channels
channels:
- notifications
- alerts
# Password
password:
name: redis-secret
key: password
# JSON format
jsonBody: true
# TLS configuration
tls:
caCertSecret:
name: redis-ca
key: ca.crt
# Keyspace notifications
keyspace:
hostAddress: redis.redis:6379
channels:
- "__keyevent@0__:set"
- "__keyevent@0__:del"
password:
name: redis-secret
key: password
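Keyspace notifications are off by default in Redis, so the __keyevent@… channels above stay silent unless the server is configured to publish them. A minimal redis.conf setting (flag string per the Redis keyspace-notifications documentation):

```conf
# redis.conf
# E = publish on the __keyevent@<db>__ channels used above
# A = all command classes (covers set and del)
notify-keyspace-events "EA"
```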
7.3.8 NATS EventSource
# eventsource-nats.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: nats
namespace: argo-events
spec:
nats:
# Order subject
orders:
url: nats://nats.nats:4222
subject: orders.>
jsonBody: true
# Authentication
auth:
credential:
name: nats-creds
key: creds
# Event subject
events:
url: nats://nats.nats:4222
subject: events.*
jsonBody: true
# Queue group (load balancing)
queue: argo-events-group
7.4 Sensor
7.4.1 Basic Sensor Configuration
# sensor-basic.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: webhook-sensor
namespace: argo-events
spec:
# Event dependencies
dependencies:
- name: webhook-dep
eventSourceName: webhook
eventName: generic
# Triggers
triggers:
- template:
name: log-trigger
log:
intervalSeconds: 1
7.4.2 Triggering Argo Workflow
# sensor-workflow.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: workflow-trigger
namespace: argo-events
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: github-push
eventSourceName: github
eventName: app-repo
# Event filtering
filters:
data:
- path: body.ref
type: string
value:
- refs/heads/main
- refs/heads/develop
- path: headers.X-Github-Event
type: string
value:
- push
triggers:
- template:
name: trigger-build-workflow
argoWorkflow:
# Workflow source
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: build-
spec:
entrypoint: build
arguments:
parameters:
- name: repo
- name: branch
- name: commit
templates:
- name: build
inputs:
parameters:
- name: repo
- name: branch
- name: commit
container:
image: docker:latest
command: ["/bin/sh", "-c"]
args:
- |
echo "Building repo: {{inputs.parameters.repo}}"
echo "Branch: {{inputs.parameters.branch}}"
echo "Commit: {{inputs.parameters.commit}}"
# Parameter mapping
parameters:
- src:
dependencyName: github-push
dataKey: body.repository.full_name
dest: spec.arguments.parameters.0.value
- src:
dependencyName: github-push
dataKey: body.ref
dataTemplate: "{{ .Input | replace \"refs/heads/\" \"\" }}"
dest: spec.arguments.parameters.1.value
- src:
dependencyName: github-push
dataKey: body.after
dest: spec.arguments.parameters.2.value
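The sensor runs under serviceAccountName: argo-events-sa, which this chapter never defines. A minimal RBAC setup letting the sensor submit Workflows might look like the following (names and verbs are assumptions; tighten them for production):

```yaml
# rbac-argo-events-sa.yaml (illustrative)
apiVersion: v1
kind: ServiceAccount
metadata:
  name: argo-events-sa
  namespace: argo-events
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: sensor-workflow-submitter
  namespace: argo-events
rules:
  # Allow the sensor to create and track Argo Workflows
  - apiGroups: ["argoproj.io"]
    resources: ["workflows"]
    verbs: ["create", "get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: sensor-workflow-submitter
  namespace: argo-events
subjects:
  - kind: ServiceAccount
    name: argo-events-sa
    namespace: argo-events
roleRef:
  kind: Role
  name: sensor-workflow-submitter
  apiGroup: rbac.authorization.k8s.io
```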
7.4.3 Triggering Kubernetes Resources
# sensor-k8s.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: k8s-trigger
namespace: argo-events
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: deploy-event
eventSourceName: webhook
eventName: cicd
triggers:
# Trigger Deployment update
- template:
name: update-deployment
k8s:
source:
resource:
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp
namespace: production
spec:
template:
spec:
containers:
- name: app
image: myapp:latest
operation: patch
parameters:
- src:
dependencyName: deploy-event
dataKey: body.image
dest: spec.template.spec.containers.0.image
# Trigger ConfigMap creation
- template:
name: create-configmap
k8s:
source:
resource:
apiVersion: v1
kind: ConfigMap
metadata:
generateName: config-
namespace: production
data:
version: ""
timestamp: ""
operation: create
parameters:
- src:
dependencyName: deploy-event
dataKey: body.version
dest: data.version
- src:
dependencyName: deploy-event
dataTemplate: "{{ now | date \"2006-01-02T15:04:05Z\" }}"
dest: data.timestamp
7.4.4 Triggering HTTP Requests
# sensor-http.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: http-trigger
namespace: argo-events
spec:
dependencies:
- name: alert-event
eventSourceName: kafka
eventName: alerts
triggers:
# Send Slack notification
- template:
name: slack-notification
http:
url: https://hooks.slack.com/services/xxx/yyy/zzz
method: POST
headers:
Content-Type: application/json
payload:
- src:
dependencyName: alert-event
dataTemplate: |
{
"text": "Alert: {{ .Input.body.message }}",
"attachments": [{
"color": "danger",
"fields": [{
"title": "Service",
"value": "{{ .Input.body.service }}",
"short": true
}, {
"title": "Severity",
"value": "{{ .Input.body.severity }}",
"short": true
}]
}]
}
dest: body
# Call external API
- template:
name: external-api
http:
url: https://api.example.com/webhook
method: POST
headers:
Content-Type: application/json
# Authorization is supplied from a Secret via secureHeaders below
secureHeaders:
- name: Authorization
valueFrom:
secretKeyRef:
name: api-credentials
key: token
payload:
- src:
dependencyName: alert-event
dataKey: body
dest: body
7.4.5 Conditional Triggering
# sensor-conditions.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: conditional-sensor
namespace: argo-events
spec:
dependencies:
- name: github-push
eventSourceName: github
eventName: app-repo
- name: github-pr
eventSourceName: github
eventName: app-repo
filters:
data:
- path: headers.X-Github-Event
type: string
value:
- pull_request
triggers:
# Deploy only when pushing to main
- template:
name: deploy-production
conditions: github-push
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: deploy-prod-
spec:
entrypoint: deploy
templates:
- name: deploy
container:
image: deployer:latest
args: ["--env", "production"]
# Retry the trigger on failure
retryStrategy:
steps: 3
duration: 1m
# PR event triggers tests
- template:
name: run-tests
conditions: github-pr
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: test-pr-
spec:
entrypoint: test
templates:
- name: test
container:
image: tester:latest
7.4.6 Multiple Event Dependencies
# sensor-multi-deps.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: multi-dependency-sensor
namespace: argo-events
spec:
dependencies:
# Code push event
- name: code-push
eventSourceName: github
eventName: app-repo
filters:
data:
- path: headers.X-Github-Event
type: string
value:
- push
# Configuration update event
- name: config-update
eventSourceName: webhook
eventName: config
# Scheduled event
- name: scheduled
eventSourceName: calendar
eventName: daily-deploy
triggers:
# Triggered by either a code push or a config update
- template:
name: any-event-trigger
conditions: "code-push || config-update"
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: build-
spec:
entrypoint: main
templates:
- name: main
container:
image: builder:latest
# Triggered when all events are satisfied
- template:
name: all-events-trigger
# Requires both code push and config update
conditions: "code-push && config-update"
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: full-deploy-
spec:
entrypoint: deploy
templates:
- name: deploy
container:
image: deployer:latest
7.5 Event Filtering and Transformation
7.5.1 Data Filtering
# sensor-data-filter.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: filtered-sensor
namespace: argo-events
spec:
dependencies:
- name: webhook-event
eventSourceName: webhook
eventName: generic
filters:
# Data filtering
data:
# String matching
- path: body.action
type: string
value:
- deploy
- rollback
# Numeric comparison
- path: body.priority
type: number
comparator: ">="
value:
- "5"
# Boolean value
- path: body.enabled
type: bool
value:
- "true"
# Regular expression
- path: body.version
type: string
value:
- "v[0-9]+\\.[0-9]+\\.[0-9]+"
# Time filtering
time:
start: "09:00:00"
stop: "18:00:00"
# Expression filtering
exprs:
- expr: body.environment == "production" && body.approved == true
fields:
- name: body.environment
path: body.environment
- name: body.approved
path: body.approved
triggers:
- template:
name: filtered-trigger
log:
intervalSeconds: 1
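String values in a data filter are treated as regular expressions, so the version entry above matches any body.version containing a semver-like v-prefixed string. The same pattern can be exercised locally with grep -E:

```shell
# The pattern from the data filter above
PATTERN='v[0-9]+\.[0-9]+\.[0-9]+'

matches() { printf '%s' "$1" | grep -Eq "$PATTERN"; }

matches "v1.2.3"  && echo "v1.2.3  accepted"
matches "v10.0.1" && echo "v10.0.1 accepted"
matches "1.2.3"   || echo "1.2.3   rejected (no v prefix)"
matches "v1.2"    || echo "v1.2    rejected (no patch component)"
```

Note the pattern is unanchored; add ^ and $ if an exact match is required.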
7.5.2 Context Filtering
# sensor-context-filter.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: context-filtered-sensor
namespace: argo-events
spec:
dependencies:
- name: github-event
eventSourceName: github
eventName: app-repo
filters:
# Context filtering
context:
# Event source filtering
source: github
# Event type filtering
type: push
# Subject filtering
subject: refs/heads/main
triggers:
- template:
name: main-branch-only
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: main-deploy-
spec:
entrypoint: deploy
templates:
- name: deploy
container:
image: deployer:latest
7.5.3 Data Transformation
# sensor-transform.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: transform-sensor
namespace: argo-events
spec:
dependencies:
- name: event
eventSourceName: webhook
eventName: generic
# Event transformation
transform:
jq: ".body | {app: .application, env: .environment, version: .version, timestamp: now}"
# Or use Lua script
# transform:
# script: |
# local event = obj.body
# return {
# app = event.application,
# env = event.environment,
# version = event.version,
# timestamp = os.time()
# }
triggers:
- template:
name: transformed-trigger
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: deploy-
spec:
entrypoint: main
arguments:
parameters:
- name: app
- name: env
- name: version
templates:
- name: main
inputs:
parameters:
- name: app
- name: env
- name: version
container:
image: deployer:latest
args:
- --app={{inputs.parameters.app}}
- --env={{inputs.parameters.env}}
- --version={{inputs.parameters.version}}
parameters:
- src:
dependencyName: event
dataKey: app
dest: spec.arguments.parameters.0.value
- src:
dependencyName: event
dataKey: env
dest: spec.arguments.parameters.1.value
- src:
dependencyName: event
dataKey: version
dest: spec.arguments.parameters.2.value
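The jq expression in the transform can be tried offline against a sample event body (the field names here are invented to match the expression; requires the jq CLI):

```shell
# Sample event body with the fields the transform expects
EVENT='{"body":{"application":"shop","environment":"staging","version":"v1.4.0"}}'

# Same expression as the sensor's transform.jq above
printf '%s' "$EVENT" | \
  jq '.body | {app: .application, env: .environment, version: .version, timestamp: now}'
```

The output object is what the downstream parameters see, which is why the dataKeys there are app, env, and version rather than the original paths.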
7.6 Practical Examples
7.6.1 GitOps CI/CD Pipeline
# gitops-pipeline.yaml
# EventSource - GitHub
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: github-events
namespace: argo-events
spec:
service:
ports:
- port: 12000
targetPort: 12000
github:
app:
repositories:
- owner: myorg
names:
- frontend
- backend
- api
events:
- push
- pull_request
webhook:
endpoint: /github
port: "12000"
method: POST
apiToken:
name: github-token
key: token
webhookSecret:
name: github-webhook
key: secret
---
# Sensor - CI/CD Pipeline
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: cicd-pipeline
namespace: argo-events
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
# Push to main branch
- name: main-push
eventSourceName: github-events
eventName: app
filters:
data:
- path: body.ref
type: string
value:
- refs/heads/main
- path: headers.X-Github-Event
type: string
value:
- push
# Push to develop branch
- name: develop-push
eventSourceName: github-events
eventName: app
filters:
data:
- path: body.ref
type: string
value:
- refs/heads/develop
- path: headers.X-Github-Event
type: string
value:
- push
# Pull Request
- name: pull-request
eventSourceName: github-events
eventName: app
filters:
data:
- path: headers.X-Github-Event
type: string
value:
- pull_request
- path: body.action
type: string
value:
- opened
- synchronize
triggers:
# Production deployment (main branch)
- template:
name: production-deploy
conditions: main-push
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: prod-deploy-
spec:
entrypoint: deploy-pipeline
serviceAccountName: argo-workflow-sa
arguments:
parameters:
- name: repo
- name: commit
- name: environment
value: production
templates:
- name: deploy-pipeline
dag:
tasks:
- name: build
template: build
- name: test
template: test
dependencies: [build]
- name: security-scan
template: scan
dependencies: [build]
- name: deploy
template: deploy
dependencies: [test, security-scan]
# DAG tasks must pass parameters explicitly to the template's inputs
arguments:
parameters:
- name: environment
value: "{{workflow.parameters.environment}}"
- name: build
container:
image: docker:latest
command: ["/bin/sh", "-c"]
args:
- echo "Building..."
- name: test
container:
image: tester:latest
command: ["/bin/sh", "-c"]
args:
- echo "Testing..."
- name: scan
container:
image: trivy:latest
command: ["/bin/sh", "-c"]
args:
- echo "Scanning..."
- name: deploy
inputs:
parameters:
- name: environment
container:
image: deployer:latest
args:
- --env={{inputs.parameters.environment}}
parameters:
- src:
dependencyName: main-push
dataKey: body.repository.full_name
dest: spec.arguments.parameters.0.value
- src:
dependencyName: main-push
dataKey: body.after
dest: spec.arguments.parameters.1.value
# Staging deployment (develop branch)
- template:
name: staging-deploy
conditions: develop-push
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: staging-deploy-
spec:
entrypoint: deploy
arguments:
parameters:
- name: environment
value: staging
templates:
- name: deploy
container:
image: deployer:latest
args:
- --env={{workflow.parameters.environment}}
# PR tests
- template:
name: pr-tests
conditions: pull-request
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: pr-test-
spec:
entrypoint: test
arguments:
parameters:
- name: pr-number
- name: head-sha
templates:
- name: test
dag:
tasks:
- name: lint
template: run-lint
- name: unit-test
template: run-unit-tests
- name: integration-test
template: run-integration-tests
dependencies: [unit-test]
- name: run-lint
container:
image: linter:latest
- name: run-unit-tests
container:
image: tester:latest
- name: run-integration-tests
container:
image: integration-tester:latest
parameters:
- src:
dependencyName: pull-request
dataKey: body.number
dest: spec.arguments.parameters.0.value
- src:
dependencyName: pull-request
dataKey: body.pull_request.head.sha
dest: spec.arguments.parameters.1.value
7.6.2 Data Processing Pipeline
# data-pipeline.yaml
# EventSource - Monitor S3/MinIO file uploads
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: data-upload
namespace: argo-events
spec:
minio:
raw-data:
bucket:
name: data-lake
endpoint: minio.minio:9000
events:
- s3:ObjectCreated:*
filter:
prefix: "raw/"
suffix: ".csv"
insecure: true
accessKey:
name: minio-creds
key: accesskey
secretKey:
name: minio-creds
key: secretkey
---
# Sensor - Data processing
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: data-processor
namespace: argo-events
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: new-file
eventSourceName: data-upload
eventName: raw-data
triggers:
- template:
name: process-data
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: data-process-
spec:
entrypoint: etl-pipeline
arguments:
parameters:
- name: bucket
- name: key
templates:
- name: etl-pipeline
dag:
tasks:
- name: validate
template: validate-data
# DAG tasks must pass parameters explicitly to the template's inputs
arguments:
parameters:
- name: bucket
value: "{{workflow.parameters.bucket}}"
- name: key
value: "{{workflow.parameters.key}}"
- name: transform
template: transform-data
dependencies: [validate]
- name: load
template: load-to-warehouse
dependencies: [transform]
- name: notify
template: send-notification
dependencies: [load]
- name: validate-data
inputs:
parameters:
- name: bucket
- name: key
container:
image: data-validator:latest
args:
- --bucket={{inputs.parameters.bucket}}
- --key={{inputs.parameters.key}}
- name: transform-data
container:
image: spark:latest
command: ["/bin/sh", "-c"]
args:
- spark-submit transform.py
- name: load-to-warehouse
container:
image: data-loader:latest
- name: send-notification
container:
image: curlimages/curl:latest
command: ["/bin/sh", "-c"]
args:
- |
curl -X POST https://slack.com/api/chat.postMessage \
-H "Authorization: Bearer $SLACK_TOKEN" \
-d "channel=data-team" \
-d "text=Data processing completed"
parameters:
- src:
dependencyName: new-file
dataKey: notification.0.s3.bucket.name
dest: spec.arguments.parameters.0.value
- src:
dependencyName: new-file
dataKey: notification.0.s3.object.key
dest: spec.arguments.parameters.1.value
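The dataKey paths in those parameters index into the bucket-notification payload MinIO delivers. An abridged, illustrative sketch of its shape (field names follow the S3 event format; this is not a complete schema):

```json
{
  "notification": [
    {
      "eventName": "s3:ObjectCreated:Put",
      "s3": {
        "bucket": { "name": "data-lake" },
        "object": { "key": "raw/2024-06-01.csv", "size": 1048576 }
      }
    }
  ]
}
```

Because notification is an array, individual elements are addressed by index in a dataKey (for example notification.0.s3.object.key).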
7.6.3 Alert Response Automation
# alert-response.yaml
# EventSource - Prometheus Alertmanager Webhook
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: alertmanager
namespace: argo-events
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
alerts:
port: "12000"
endpoint: /alertmanager
method: POST
---
# Sensor - Automated alert response
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: alert-responder
namespace: argo-events
spec:
template:
serviceAccountName: argo-events-sa
dependencies:
- name: firing-alert
eventSourceName: alertmanager
eventName: alerts
filters:
data:
- path: body.status
type: string
value:
- firing
triggers:
# High CPU alert - Auto-scale
- template:
name: scale-on-high-cpu
conditions: firing-alert
k8s:
source:
resource:
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: app-hpa
namespace: production
spec:
minReplicas: 5
maxReplicas: 20
operation: patch
parameters:
- src:
dependencyName: firing-alert
# Trim markers keep whitespace out of the rendered value
dataTemplate: '{{- range .Input.body.alerts }}{{- if eq .labels.alertname "HighCPU" }}10{{- end }}{{- end }}'
dest: spec.minReplicas
# Retry on failure
retryStrategy:
steps: 3
# Pod OOM alert - Auto-restart
- template:
name: restart-on-oom
conditions: firing-alert
argoWorkflow:
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: oom-response-
spec:
entrypoint: restart-pod
arguments:
parameters:
- name: namespace
- name: pod
templates:
- name: restart-pod
inputs:
parameters:
- name: namespace
- name: pod
container:
image: bitnami/kubectl:latest
command: ["/bin/sh", "-c"]
args:
- |
kubectl delete pod {{inputs.parameters.pod}} \
-n {{inputs.parameters.namespace}} \
--grace-period=30
parameters:
- src:
dependencyName: firing-alert
dataTemplate: '{{- range .Input.body.alerts }}{{- if eq .labels.alertname "PodOOMKilled" }}{{ .labels.namespace }}{{- end }}{{- end }}'
dest: spec.arguments.parameters.0.value
- src:
dependencyName: firing-alert
dataTemplate: '{{- range .Input.body.alerts }}{{- if eq .labels.alertname "PodOOMKilled" }}{{ .labels.pod }}{{- end }}{{- end }}'
dest: spec.arguments.parameters.1.value
# All alerts - Send notification
- template:
name: notify-on-any-alert
conditions: firing-alert
http:
url: https://hooks.slack.com/services/xxx/yyy/zzz
method: POST
payload:
- src:
dependencyName: firing-alert
dataTemplate: |
{
"text": "🚨 Alert Firing",
"attachments": [
{{ range $i, $alert := .Input.body.alerts }}
{{ if $i }},{{ end }}
{
"color": "danger",
"title": "{{ $alert.labels.alertname }}",
"text": "{{ $alert.annotations.summary }}",
"fields": [
{"title": "Severity", "value": "{{ $alert.labels.severity }}", "short": true},
{"title": "Instance", "value": "{{ $alert.labels.instance }}", "short": true}
]
}
{{ end }}
]
}
dest: body
7.7 Chapter Summary
This chapter detailed the event-driven capabilities of Argo Events:
(Mermaid diagram: chapter summary)
Key Points:
- EventBus is the core component for event transport
- EventSource supports multiple event sources
- Sensor can define complex event dependencies and conditions
- Trigger supports various target types
- Combined with Argo Workflows, you can build powerful automation pipelines
In the next chapter, we will learn how to integrate various Argo components to build a complete cloud-native CI/CD platform.