Chapter 7: Event-Driven Automation with Argo Events

An in-depth look at the core concepts of Argo Events (event sources, sensors, and triggers) and at building event-driven automation workflows with them.

Chapter 7: Building Event-Driven Architecture with Argo Events

Argo Events is an event-driven workflow automation framework for Kubernetes. It connects event sources such as webhooks, message queues, and timers to the actions they should trigger, for example creating Kubernetes resources or submitting Argo Workflows.

7.1 Argo Events Overview

7.1.1 Event-Driven Architecture

[Figure: event-driven architecture diagram (Mermaid source not rendered in this copy)]

7.1.2 Core Components

[Figure: core components diagram (Mermaid source not rendered in this copy)]

7.1.3 Installing Argo Events

# Create namespace
kubectl create namespace argo-events

# Install Argo Events
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install.yaml

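# Install the validating admission webhook (optional)
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/manifests/install-validating-webhook.yaml

# Install the EventBus (native NATS example from the Argo Events repository)
kubectl apply -n argo-events -f https://raw.githubusercontent.com/argoproj/argo-events/stable/examples/eventbus/native.yaml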

# Verify installation
kubectl get pods -n argo-events

7.2 EventBus

7.2.1 EventBus Concept

EventBus is the transport layer for events, responsible for passing events between EventSource and Sensor.
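
As a rough mental model (this is not Argo Events code), the flow can be sketched in Python: an event source publishes onto a bus, and a sensor subscribed to that bus runs its trigger. The class and function names below are hypothetical and exist only for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class ToyEventBus:
    """In-memory stand-in for the EventBus: routes events by (source, event name)."""

    def __init__(self) -> None:
        self._subscribers: Dict[Tuple[str, str], List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, source: str, event_name: str, handler: Callable[[dict], None]) -> None:
        # A sensor dependency names the event source and event it waits for.
        self._subscribers[(source, event_name)].append(handler)

    def publish(self, source: str, event_name: str, data: dict) -> None:
        # An event source publishes; every matching subscriber is invoked.
        for handler in self._subscribers[(source, event_name)]:
            handler(data)


fired: List[str] = []


def log_trigger(data: dict) -> None:
    """Hypothetical trigger that only records what it received."""
    fired.append(f"triggered with {data}")


bus = ToyEventBus()
bus.subscribe("webhook", "generic", log_trigger)
bus.publish("webhook", "generic", {"action": "deploy"})   # delivered
bus.publish("webhook", "other", {"action": "ignored"})    # no subscriber
print(fired)
```

The point of the indirection is that event sources and sensors never talk to each other directly, so either side can be scaled or replaced independently.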

[Figure: EventBus diagram (Mermaid source not rendered in this copy)]

7.2.2 NATS EventBus

# eventbus-nats.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: argo-events
spec:
  nats:
    native:
      # Number of replicas
      replicas: 3
      # Authentication strategy
      auth: token
      # Persistence configuration
      persistence:
        storageClassName: standard
        accessMode: ReadWriteOnce
        volumeSize: 10Gi
      # Container resource configuration
      containerTemplate:
        resources:
          requests:
            cpu: 100m
            memory: 128Mi
          limits:
            cpu: 500m
            memory: 512Mi
      # Metrics configuration
      metricsContainerTemplate:
        resources:
          requests:
            cpu: 50m
            memory: 64Mi
      # Anti-affinity
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchLabels:
                  controller: eventbus-controller
              topologyKey: kubernetes.io/hostname

7.2.3 JetStream EventBus

# eventbus-jetstream.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: argo-events
spec:
  jetstream:
    version: "2.9.15"
    replicas: 3
    persistence:
      storageClassName: fast-ssd
      accessMode: ReadWriteOnce
      volumeSize: 20Gi
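    # Stream configuration (camelCase keys, as in the controller's default streamConfig)
    streamConfig: |
      maxMsgs: 100000
      maxBytes: 1073741824
      maxAge: 72h
      # Check that your Argo Events version supports the remaining keys
      max_msg_size: 1048576
      retention: limits
      storage: file
      replicas: 3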
    containerTemplate:
      resources:
        requests:
          cpu: 200m
          memory: 256Mi
        limits:
          cpu: 1
          memory: 1Gi

7.2.4 Kafka EventBus

# eventbus-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: argo-events
spec:
  kafka:
    url: kafka-cluster.kafka:9092
    topic: argo-events
    version: "3.4.0"
    # TLS configuration
    tls:
      caCertSecret:
        name: kafka-ca
        key: ca.crt
      clientCertSecret:
        name: kafka-client-cert
        key: tls.crt
      clientKeySecret:
        name: kafka-client-key
        key: tls.key
    # SASL authentication
    sasl:
      mechanism: SCRAM-SHA-512
      userSecret:
        name: kafka-credentials
        key: username
      passwordSecret:
        name: kafka-credentials
        key: password
    # Consumer group configuration
    consumerGroup:
      groupName: argo-events-sensors
      rebalanceStrategy: sticky

7.3 EventSource

7.3.1 Webhook EventSource

# eventsource-webhook.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  webhook:
    # Generic webhook
    generic:
      port: "12000"
      endpoint: /generic
      method: POST
    # CI/CD webhook
    cicd:
      port: "12000"
      endpoint: /cicd
      method: POST
      # Filter
      filter:
        expression: "body.action == 'deploy'"
    # Secured webhook
    secure:
      port: "12000"
      endpoint: /secure
      method: POST
      # Bearer token authentication (the token is read from the Secret below)
      authSecret:
        name: webhook-secret
        key: token
---
# Expose webhook service
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: webhook-ingress
  namespace: argo-events
spec:
  rules:
  - host: events.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: webhook-eventsource-svc
            port:
              number: 12000

7.3.2 GitHub EventSource

# eventsource-github.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: github
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  github:
    # Monitor specific repositories
    app-repo:
      # Repository information
      repositories:
      - owner: myorg
        names:
        - myapp
        - mylib
      # Event types to listen for
      events:
      - push
      - pull_request
      - release
      # Webhook configuration
      webhook:
        endpoint: /github/app
        port: "12000"
        method: POST
      # GitHub API configuration
      apiToken:
        name: github-access
        key: token
      # Webhook secret
      webhookSecret:
        name: github-webhook-secret
        key: secret
      # SSL verification
      insecure: false
      # Active status
      active: true
      # Content type
      contentType: json
    # Listen to organization-level events
    org-events:
      organizations:
      - myorg
      events:
      - repository
      - member
      - team
      webhook:
        endpoint: /github/org
        port: "12000"
        method: POST
      apiToken:
        name: github-access
        key: token

7.3.3 GitLab EventSource

# eventsource-gitlab.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: gitlab
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  gitlab:
    app-project:
      # GitLab project ID
      projectID: "12345"
      # Events to listen for
      events:
      - PushEvents
      - MergeRequestEvents
      - TagPushEvents
      - PipelineEvents
      # Webhook configuration
      webhook:
        endpoint: /gitlab/app
        port: "12000"
        method: POST
      # GitLab URL
      gitlabBaseURL: https://gitlab.example.com
      # Access token
      accessToken:
        name: gitlab-access
        key: token
      # Webhook secret token
      secretToken:
        name: gitlab-secret
        key: token
      # SSL configuration
      enableSSLVerification: true

7.3.4 Kafka EventSource

# eventsource-kafka.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: kafka
  namespace: argo-events
spec:
  kafka:
    # Order events
    orders:
      url: kafka-cluster.kafka:9092
      topic: orders
      partition: "0"
      consumerGroup:
        groupName: argo-events-orders
        rebalanceStrategy: sticky
      # JSON data
      jsonBody: true
      # Rate limiting (integer value)
      limitEventsPerSecond: 100
    # User events
    users:
      url: kafka-cluster.kafka:9092
      topic: users
      partition: "0"
      consumerGroup:
        groupName: argo-events-users
      jsonBody: true
      # TLS configuration
      tls:
        caCertSecret:
          name: kafka-ca
          key: ca.crt
      # SASL configuration
      sasl:
        mechanism: SCRAM-SHA-256
        userSecret:
          name: kafka-creds
          key: username
        passwordSecret:
          name: kafka-creds
          key: password

7.3.5 S3/MinIO EventSource

# eventsource-s3.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: minio
  namespace: argo-events
spec:
  minio:
    # Monitor file uploads
    file-upload:
      bucket:
        name: uploads
      endpoint: minio.minio:9000
      events:
      - s3:ObjectCreated:*
      - s3:ObjectRemoved:*
      # Filter specific prefix
      filter:
        prefix: "data/"
        suffix: ".csv"
      # Connect without TLS (set to false to use SSL)
      insecure: true
      # Access credentials
      accessKey:
        name: minio-credentials
        key: accesskey
      secretKey:
        name: minio-credentials
        key: secretkey
    # Monitor reports directory
    reports:
      bucket:
        name: reports
      endpoint: minio.minio:9000
      events:
      - s3:ObjectCreated:Put
      filter:
        prefix: "daily/"
        suffix: ".pdf"
      insecure: true
      accessKey:
        name: minio-credentials
        key: accesskey
      secretKey:
        name: minio-credentials
        key: secretkey

7.3.6 Calendar/Cron EventSource

# eventsource-calendar.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: calendar
  namespace: argo-events
spec:
  calendar:
    # Daily report
    daily-report:
      # Cron expression
      schedule: "0 8 * * *"  # Every day at 8 AM
      # Timezone
      timezone: "Asia/Shanghai"
      # Metadata
      metadata:
        task: daily-report
        priority: high
    # Hourly task
    hourly-cleanup:
      schedule: "0 * * * *"  # Every hour
      timezone: "UTC"
      metadata:
        task: cleanup
    # Weekday task
    workday-sync:
      schedule: "30 9 * * 1-5"  # Monday to Friday at 9:30
      timezone: "Asia/Shanghai"
      metadata:
        task: data-sync
    # Using interval
    interval-task:
      interval: 30m  # Every 30 minutes
      metadata:
        task: health-check

7.3.7 Redis EventSource

# eventsource-redis.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: redis
  namespace: argo-events
spec:
  redis:
    # Pub/Sub pattern
    notifications:
      hostAddress: redis.redis:6379
      # Subscribed channels
      channels:
      - notifications
      - alerts
      # Password
      password:
        name: redis-secret
        key: password
      # JSON format
      jsonBody: true
      # TLS configuration
      tls:
        caCertSecret:
          name: redis-ca
          key: ca.crt
    # Keyspace notifications
    keyspace:
      hostAddress: redis.redis:6379
      channels:
      - "__keyevent@0__:set"
      - "__keyevent@0__:del"
      password:
        name: redis-secret
        key: password

7.3.8 NATS EventSource

# eventsource-nats.yaml
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: nats
  namespace: argo-events
spec:
  nats:
    # Order subject
    orders:
      url: nats://nats.nats:4222
      subject: orders.>
      jsonBody: true
      # Authentication
      auth:
        credential:
          name: nats-creds
          key: creds
    # Event subject
    events:
      url: nats://nats.nats:4222
      subject: events.*
      jsonBody: true
      # Queue group (load balancing)
      queue: argo-events-group

7.4 Sensor

7.4.1 Basic Sensor Configuration

# sensor-basic.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: webhook-sensor
  namespace: argo-events
spec:
  # Event dependencies
  dependencies:
  - name: webhook-dep
    eventSourceName: webhook
    eventName: generic
  # Triggers
  triggers:
  - template:
      name: log-trigger
      log:
        intervalSeconds: 1

7.4.2 Triggering an Argo Workflow

# sensor-workflow.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: workflow-trigger
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: github-push
    eventSourceName: github
    eventName: app-repo
    # Event filtering
    filters:
      data:
      - path: body.ref
        type: string
        value:
        - refs/heads/main
        - refs/heads/develop
      - path: headers.X-Github-Event  # header names use Go's canonical casing
        type: string
        value:
        - push
  triggers:
  - template:
      name: trigger-build-workflow
      argoWorkflow:
        # Workflow source
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: build-
            spec:
              entrypoint: build
              arguments:
                parameters:
                - name: repo
                - name: branch
                - name: commit
              templates:
              - name: build
                inputs:
                  parameters:
                  - name: repo
                  - name: branch
                  - name: commit
                container:
                  image: docker:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - |
                    echo "Building repo: {{inputs.parameters.repo}}"
                    echo "Branch: {{inputs.parameters.branch}}"
                    echo "Commit: {{inputs.parameters.commit}}"
        # Parameter mapping
        parameters:
        - src:
            dependencyName: github-push
            dataKey: body.repository.full_name
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: github-push
            # dataTemplate takes precedence over dataKey; .Input is the whole event payload
            dataKey: body.ref
            dataTemplate: "{{ .Input.body.ref | replace \"refs/heads/\" \"\" }}"
          dest: spec.arguments.parameters.1.value
        - src:
            dependencyName: github-push
            dataKey: body.after
          dest: spec.arguments.parameters.2.value

7.4.3 Triggering Kubernetes Resources

# sensor-k8s.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: k8s-trigger
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: deploy-event
    eventSourceName: webhook
    eventName: cicd
  triggers:
  # Trigger Deployment update
  - template:
      name: update-deployment
      k8s:
        source:
          resource:
            apiVersion: apps/v1
            kind: Deployment
            metadata:
              name: myapp
              namespace: production
            spec:
              template:
                spec:
                  containers:
                  - name: app
                    image: myapp:latest
        operation: patch
        parameters:
        - src:
            dependencyName: deploy-event
            dataKey: body.image
          dest: spec.template.spec.containers.0.image
  # Trigger ConfigMap creation
  - template:
      name: create-configmap
      k8s:
        source:
          resource:
            apiVersion: v1
            kind: ConfigMap
            metadata:
              generateName: config-
              namespace: production
            data:
              version: ""
              timestamp: ""
        operation: create
        parameters:
        - src:
            dependencyName: deploy-event
            dataKey: body.version
          dest: data.version
        - src:
            dependencyName: deploy-event
            dataTemplate: "{{ now | date \"2006-01-02T15:04:05Z\" }}"
          dest: data.timestamp

7.4.4 Triggering HTTP Requests

# sensor-http.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: http-trigger
  namespace: argo-events
spec:
  dependencies:
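  # Assumes the `kafka` EventSource defines an event named `alerts`
  # (the example in 7.3.4 defines only `orders` and `users`)
  - name: alert-event
    eventSourceName: kafka
    eventName: alerts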
  triggers:
  # Send Slack notification
  - template:
      name: slack-notification
      http:
        url: https://hooks.slack.com/services/xxx/yyy/zzz
        method: POST
        headers:
          Content-Type: application/json
        payload:
        - src:
            dependencyName: alert-event
            dataTemplate: |
              {
                "text": "Alert: {{ .Input.body.message }}",
                "attachments": [{
                  "color": "danger",
                  "fields": [{
                    "title": "Service",
                    "value": "{{ .Input.body.service }}",
                    "short": true
                  }, {
                    "title": "Severity",
                    "value": "{{ .Input.body.severity }}",
                    "short": true
                  }]
                }]
              }
          dest: body
  # Call external API
  - template:
      name: external-api
      http:
        url: https://api.example.com/webhook
        method: POST
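        headers:
          Content-Type: application/json
        # Authorization is read from a Secret; the value stored under `token`
        # must be the complete header value (for example "Bearer <token>")
        secureHeaders:
        - name: Authorization
          valueFrom:
            secretKeyRef:
              name: api-credentials
              key: token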
        payload:
        - src:
            dependencyName: alert-event
            dataKey: body
          dest: body

7.4.5 Conditional Triggering

# sensor-conditions.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: conditional-sensor
  namespace: argo-events
spec:
  dependencies:
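  - name: github-push
    eventSourceName: github
    eventName: app-repo
    # Match only push events on the main branch
    filters:
      data:
      - path: headers.X-Github-Event
        type: string
        value:
        - push
      - path: body.ref
        type: string
        value:
        - refs/heads/main
  - name: github-pr
    eventSourceName: github
    eventName: app-repo
    filters:
      data:
      - path: headers.X-Github-Event
        type: string
        value:
        - pull_request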
  triggers:
  # Deploy only when pushing to main
  - template:
      name: deploy-production
      conditions: github-push
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: deploy-prod-
            spec:
              entrypoint: deploy
              templates:
              - name: deploy
                container:
                  image: deployer:latest
                  args: ["--env", "production"]
    # Retry a failed trigger: up to 3 steps with a 1m base interval
    retryStrategy:
      steps: 3
      duration: 1m
  # PR event triggers tests
  - template:
      name: run-tests
      conditions: github-pr
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: test-pr-
            spec:
              entrypoint: test
              templates:
              - name: test
                container:
                  image: tester:latest

7.4.6 Multiple Event Dependencies

# sensor-multi-deps.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: multi-dependency-sensor
  namespace: argo-events
spec:
  dependencies:
  # Code push event
  - name: code-push
    eventSourceName: github
    eventName: app-repo
    filters:
      data:
      - path: headers.X-Github-Event  # header names use Go's canonical casing
        type: string
        value:
        - push
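  # Configuration update event
  # (assumes the `webhook` EventSource defines a `config` event)
  - name: config-update
    eventSourceName: webhook
    eventName: config
  # Scheduled event
  # (assumes the `calendar` EventSource defines a `daily-deploy` event)
  - name: scheduled
    eventSourceName: calendar
    eventName: daily-deploy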
  triggers:
  # Triggered by any event
  - template:
      name: any-event-trigger
      conditions: "code-push || config-update"
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: build-
            spec:
              entrypoint: main
              templates:
              - name: main
                container:
                  image: builder:latest
  # Triggered when all events are satisfied
  - template:
      name: all-events-trigger
      # Requires both code push and config update
      conditions: "code-push && config-update"
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: full-deploy-
            spec:
              entrypoint: deploy
              templates:
              - name: deploy
                container:
                  image: deployer:latest

7.5 Event Filtering and Transformation

7.5.1 Data Filtering

# sensor-data-filter.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: filtered-sensor
  namespace: argo-events
spec:
  dependencies:
  - name: webhook-event
    eventSourceName: webhook
    eventName: generic
    filters:
      # Data filtering
      data:
      # String matching
      - path: body.action
        type: string
        value:
        - deploy
        - rollback
      # Numeric comparison
      - path: body.priority
        type: number
        comparator: ">="
        value:
        - "5"
      # Boolean value
      - path: body.enabled
        type: bool
        value:
        - "true"
      # Regular expression
      - path: body.version
        type: string
        value:
        - "v[0-9]+\\.[0-9]+\\.[0-9]+"
      # Time filtering
      time:
        start: "09:00:00"
        stop: "18:00:00"
      # Expression filtering (field names are plain identifiers used in the expression)
      exprs:
      - expr: environment == "production" && approved == true
        fields:
        - name: environment
          path: body.environment
        - name: approved
          path: body.approved
  triggers:
  - template:
      name: filtered-trigger
      log:
        intervalSeconds: 1

7.5.2 Context Filtering

# sensor-context-filter.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: context-filtered-sensor
  namespace: argo-events
spec:
  dependencies:
  - name: github-event
    eventSourceName: github
    eventName: app-repo
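    filters:
      # Context filtering matches the event's context fields
      context:
        # Name of the EventSource that produced the event
        source: github
        # Event source type
        type: github
        # Event name as defined in the EventSource
        subject: app-repo
      # The branch is part of the payload, so it is matched with a data filter
      data:
      - path: body.ref
        type: string
        value:
        - refs/heads/main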
  triggers:
  - template:
      name: main-branch-only
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: main-deploy-
            spec:
              entrypoint: deploy
              templates:
              - name: deploy
                container:
                  image: deployer:latest

7.5.3 Data Transformation

# sensor-transform.yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: transform-sensor
  namespace: argo-events
spec:
  dependencies:
  - name: event
    eventSourceName: webhook
    eventName: generic
    # Event transformation
    transform:
      jq: ".body | {app: .application, env: .environment, version: .version, timestamp: now}"
    # Or use a Lua script (the incoming event is exposed as `event`)
    # transform:
    #   script: |
    #     local body = event.body
    #     return {
    #       app = body.application,
    #       env = body.environment,
    #       version = body.version,
    #       timestamp = os.time()
    #     }
  triggers:
  - template:
      name: transformed-trigger
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: deploy-
            spec:
              entrypoint: main
              arguments:
                parameters:
                - name: app
                - name: env
                - name: version
              templates:
              - name: main
                inputs:
                  parameters:
                  - name: app
                  - name: env
                  - name: version
                container:
                  image: deployer:latest
                  args:
                  - --app={{inputs.parameters.app}}
                  - --env={{inputs.parameters.env}}
                  - --version={{inputs.parameters.version}}
        parameters:
        - src:
            dependencyName: event
            dataKey: app
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: event
            dataKey: env
          dest: spec.arguments.parameters.1.value
        - src:
            dependencyName: event
            dataKey: version
          dest: spec.arguments.parameters.2.value
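
The jq expression above replaces the event payload with a smaller object, which is why the parameters then use the short keys `app`, `env`, and `version` instead of `body.application` and so on. An equivalent reshaping in Python might look like the sketch below; the sample event is made up.

```python
import time
from typing import Any, Dict


def transform(event: Dict[str, Any]) -> Dict[str, Any]:
    """Mirror of: .body | {app: .application, env: .environment, version: .version, timestamp: now}"""
    body = event["body"]
    return {
        "app": body.get("application"),   # missing keys become None, like jq's null
        "env": body.get("environment"),
        "version": body.get("version"),
        "timestamp": time.time(),         # seconds since the epoch, as jq's `now`
    }


# Hypothetical incoming event; fields not named in the transform are dropped.
event = {"body": {"application": "shop", "environment": "staging",
                  "version": "v1.2.3", "extra": "dropped"}}
result = transform(event)
print(sorted(result))
```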

7.6 Practical Examples

7.6.1 GitOps CI/CD Pipeline

# gitops-pipeline.yaml
# EventSource - GitHub
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: github-events
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  github:
    app:
      repositories:
      - owner: myorg
        names:
        - frontend
        - backend
        - api
      events:
      - push
      - pull_request
      webhook:
        endpoint: /github
        port: "12000"
        method: POST
      apiToken:
        name: github-token
        key: token
      webhookSecret:
        name: github-webhook
        key: secret
---
# Sensor - CI/CD Pipeline
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: cicd-pipeline
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  # Push to main branch
  - name: main-push
    eventSourceName: github-events
    eventName: app
    filters:
      data:
      - path: body.ref
        type: string
        value:
        - refs/heads/main
      - path: headers.X-Github-Event  # header names use Go's canonical casing
        type: string
        value:
        - push
  # Push to develop branch
  - name: develop-push
    eventSourceName: github-events
    eventName: app
    filters:
      data:
      - path: body.ref
        type: string
        value:
        - refs/heads/develop
      - path: headers.X-Github-Event
        type: string
        value:
        - push
  # Pull Request
  - name: pull-request
    eventSourceName: github-events
    eventName: app
    filters:
      data:
      - path: headers.X-Github-Event
        type: string
        value:
        - pull_request
      - path: body.action
        type: string
        value:
        - opened
        - synchronize
  triggers:
  # Production deployment (main branch)
  - template:
      name: production-deploy
      conditions: main-push
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: prod-deploy-
            spec:
              entrypoint: deploy-pipeline
              serviceAccountName: argo-workflow-sa
              arguments:
                parameters:
                - name: repo
                - name: commit
                - name: environment
                  value: production
              templates:
              - name: deploy-pipeline
                dag:
                  tasks:
                  - name: build
                    template: build
                  - name: test
                    template: test
                    dependencies: [build]
                  - name: security-scan
                    template: scan
                    dependencies: [build]
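                  - name: deploy
                    template: deploy
                    dependencies: [test, security-scan]
                    # Pass the workflow-level parameter to the template input
                    arguments:
                      parameters:
                      - name: environment
                        value: "{{workflow.parameters.environment}}"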
              - name: build
                container:
                  image: docker:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - echo "Building..."
              - name: test
                container:
                  image: tester:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - echo "Testing..."
              - name: scan
                container:
                  image: trivy:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - echo "Scanning..."
              - name: deploy
                inputs:
                  parameters:
                  - name: environment
                container:
                  image: deployer:latest
                  args:
                  - --env={{inputs.parameters.environment}}
        parameters:
        - src:
            dependencyName: main-push
            dataKey: body.repository.full_name
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: main-push
            dataKey: body.after
          dest: spec.arguments.parameters.1.value
  # Staging deployment (develop branch)
  - template:
      name: staging-deploy
      conditions: develop-push
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: staging-deploy-
            spec:
              entrypoint: deploy
              arguments:
                parameters:
                - name: environment
                  value: staging
              templates:
              - name: deploy
                container:
                  image: deployer:latest
                  args:
                  - --env={{workflow.parameters.environment}}
  # PR tests
  - template:
      name: pr-tests
      conditions: pull-request
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: pr-test-
            spec:
              entrypoint: test
              arguments:
                parameters:
                - name: pr-number
                - name: head-sha
              templates:
              - name: test
                dag:
                  tasks:
                  - name: lint
                    template: run-lint
                  - name: unit-test
                    template: run-unit-tests
                  - name: integration-test
                    template: run-integration-tests
                    dependencies: [unit-test]
              - name: run-lint
                container:
                  image: linter:latest
              - name: run-unit-tests
                container:
                  image: tester:latest
              - name: run-integration-tests
                container:
                  image: integration-tester:latest
        parameters:
        - src:
            dependencyName: pull-request
            dataKey: body.number
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: pull-request
            dataKey: body.pull_request.head.sha
          dest: spec.arguments.parameters.1.value

7.6.2 Data Processing Pipeline

# data-pipeline.yaml
# EventSource - Monitor S3/MinIO file uploads
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: data-upload
  namespace: argo-events
spec:
  minio:
    raw-data:
      bucket:
        name: data-lake
      endpoint: minio.minio:9000
      events:
      - s3:ObjectCreated:*
      filter:
        prefix: "raw/"
        suffix: ".csv"
      insecure: true
      accessKey:
        name: minio-creds
        key: accesskey
      secretKey:
        name: minio-creds
        key: secretkey
---
# Sensor - Data processing
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: data-processor
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: new-file
    eventSourceName: data-upload
    eventName: raw-data
  triggers:
  - template:
      name: process-data
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: data-process-
            spec:
              entrypoint: etl-pipeline
              arguments:
                parameters:
                - name: bucket
                - name: key
              templates:
              - name: etl-pipeline
                dag:
                  tasks:
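                  - name: validate
                    template: validate-data
                    # Pass the workflow-level parameters to the template inputs
                    arguments:
                      parameters:
                      - name: bucket
                        value: "{{workflow.parameters.bucket}}"
                      - name: key
                        value: "{{workflow.parameters.key}}"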
                  - name: transform
                    template: transform-data
                    dependencies: [validate]
                  - name: load
                    template: load-to-warehouse
                    dependencies: [transform]
                  - name: notify
                    template: send-notification
                    dependencies: [load]
              - name: validate-data
                inputs:
                  parameters:
                  - name: bucket
                  - name: key
                container:
                  image: data-validator:latest
                  args:
                  - --bucket={{inputs.parameters.bucket}}
                  - --key={{inputs.parameters.key}}
              - name: transform-data
                container:
                  image: spark:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - spark-submit transform.py
              - name: load-to-warehouse
                container:
                  image: data-loader:latest
              - name: send-notification
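                container:
                  image: curlimages/curl:latest
                  # SLACK_TOKEN is not defined in this manifest; inject it into the
                  # container (for example via env with a secretKeyRef) before use.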
                  command: ["/bin/sh", "-c"]
                  args:
                  - |
                    curl -X POST https://slack.com/api/chat.postMessage \
                      -H "Authorization: Bearer $SLACK_TOKEN" \
                      -d "channel=data-team" \
                      -d "text=Data processing completed"
        parameters:
        - src:
            dependencyName: new-file
            # MinIO delivers records as a list under "notification"; index the first record
            dataKey: notification.0.s3.bucket.name
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: new-file
            dataKey: notification.0.s3.object.key
          dest: spec.arguments.parameters.1.value
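
The `dataKey` values are dotted paths into the event data. The sketch below mimics that lookup against a hand-written sample so the selected values can be checked locally. It is a simplified stand-in, not the Argo Events implementation: the `resolve` helper and the sample values are hypothetical, and the sample assumes the notification records arrive as a list under `notification`, in which case a path needs a numeric segment to pick a record.

```python
import json
from typing import Any

# Hypothetical, trimmed-down event data for illustration only.
SAMPLE_EVENT = json.loads("""
{
  "notification": [
    {
      "s3": {
        "bucket": {"name": "data-lake"},
        "object": {"key": "raw/sales.csv"}
      }
    }
  ]
}
""")


def resolve(data: Any, path: str) -> Any:
    """Walk a dotted path; numeric segments index into lists."""
    current = data
    for segment in path.split("."):
        if isinstance(current, list):
            # Raises ValueError if a list is addressed without a numeric index.
            current = current[int(segment)]
        else:
            current = current[segment]
    return current


if __name__ == "__main__":
    for path in ("notification.0.s3.bucket.name", "notification.0.s3.object.key"):
        print(path, "->", resolve(SAMPLE_EVENT, path))
```

Under that assumption, a path that skips the index segment fails in this sketch because a list cannot be addressed by a key name.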

7.6.3 Alert Response Automation

# alert-response.yaml
# EventSource - Prometheus Alertmanager Webhook
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: alertmanager
  namespace: argo-events
spec:
  service:
    ports:
    - port: 12000
      targetPort: 12000
  webhook:
    alerts:
      port: "12000"
      endpoint: /alertmanager
      method: POST
---
# Sensor - Automated alert response
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: alert-responder
  namespace: argo-events
spec:
  template:
    serviceAccountName: argo-events-sa
  dependencies:
  - name: firing-alert
    eventSourceName: alertmanager
    eventName: alerts
    filters:
      data:
      - path: body.status
        type: string
        value:
        - firing
  triggers:
  # High CPU alert - Auto-scale
  - template:
      name: scale-on-high-cpu
      conditions: firing-alert
      k8s:
        source:
          resource:
            apiVersion: autoscaling/v2
            kind: HorizontalPodAutoscaler
            metadata:
              name: app-hpa
              namespace: production
            spec:
              minReplicas: 5
              maxReplicas: 20
        operation: patch
        parameters:
        - src:
            dependencyName: firing-alert
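            # Render 10 if any alert in the payload is HighCPU, otherwise keep 5.
            # The trim markers stop template whitespace from leaking into the value.
            dataTemplate: |
              {{- $min := 5 -}}
              {{- range .Input.body.alerts -}}
              {{- if eq .labels.alertname "HighCPU" -}}{{- $min = 10 -}}{{- end -}}
              {{- end -}}
              {{- $min -}}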
          dest: spec.minReplicas
    # Retry the trigger up to 3 times on failure (sibling of template, not nested in it)
    retryStrategy:
      steps: 3
  # Pod OOM alert - Auto-restart
  - template:
      name: restart-on-oom
      conditions: firing-alert
      argoWorkflow:
        source:
          resource:
            apiVersion: argoproj.io/v1alpha1
            kind: Workflow
            metadata:
              generateName: oom-response-
            spec:
              entrypoint: restart-pod
              arguments:
                parameters:
                - name: namespace
                - name: pod
              templates:
              - name: restart-pod
                inputs:
                  parameters:
                  - name: namespace
                  - name: pod
                container:
                  image: bitnami/kubectl:latest
                  command: ["/bin/sh", "-c"]
                  args:
                  - |
                    kubectl delete pod {{inputs.parameters.pod}} \
                      -n {{inputs.parameters.namespace}} \
                      --grace-period=30
        parameters:
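        - src:
            dependencyName: firing-alert
            # Take the namespace of the last PodOOMKilled alert in the payload.
            # The value is empty when the payload contains no such alert.
            dataTemplate: |
              {{- $ns := "" -}}
              {{- range .Input.body.alerts -}}
              {{- if eq .labels.alertname "PodOOMKilled" -}}{{- $ns = .labels.namespace -}}{{- end -}}
              {{- end -}}
              {{- $ns -}}
          dest: spec.arguments.parameters.0.value
        - src:
            dependencyName: firing-alert
            # Take the pod name from the same (last) PodOOMKilled alert.
            dataTemplate: |
              {{- $pod := "" -}}
              {{- range .Input.body.alerts -}}
              {{- if eq .labels.alertname "PodOOMKilled" -}}{{- $pod = .labels.pod -}}{{- end -}}
              {{- end -}}
              {{- $pod -}}
          dest: spec.arguments.parameters.1.value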
  # All alerts - Send notification
  - template:
      name: notify-on-any-alert
      conditions: firing-alert
      http:
        url: https://hooks.slack.com/services/xxx/yyy/zzz
        method: POST
        payload:
        # Each payload entry sets one key of the JSON request body, so the
        # message goes under the top-level "text" key that Slack webhooks read.
        - src:
            dependencyName: firing-alert
            dataTemplate: |
              🚨 Alert Firing
              {{- range .Input.body.alerts }}
              • {{ .labels.alertname }} (severity: {{ .labels.severity }}, instance: {{ .labels.instance }}): {{ .annotations.summary }}
              {{- end }}
          dest: text
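
One way to exercise the webhook EventSource and the Sensor above without waiting for a real alert is to post a hand-built payload. The sketch below builds a minimal payload in the shape Alertmanager sends to webhook receivers and wraps it in a POST request. The helper names, label values, and the `localhost:12000` URL are assumptions for illustration; the URL presumes the EventSource port has been forwarded to the local machine.

```python
import json
import urllib.request

# Assumption: the EventSource port 12000 is forwarded to localhost.
WEBHOOK_URL = "http://localhost:12000/alertmanager"


def build_alert_payload(alertname, namespace, pod, status="firing"):
    """Return a minimal payload shaped like an Alertmanager webhook message."""
    return {
        "version": "4",
        "status": status,
        "alerts": [
            {
                "status": status,
                "labels": {
                    "alertname": alertname,
                    "namespace": namespace,
                    "pod": pod,
                    "severity": "critical",
                },
                "annotations": {"summary": f"{alertname} on {namespace}/{pod}"},
            }
        ],
    }


def build_request(payload, url=WEBHOOK_URL):
    """Wrap the payload in a JSON POST request without sending it."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request, timeout=5.0):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


if __name__ == "__main__":
    sample = build_alert_payload("PodOOMKilled", "production", "api-0")
    # Print the body for inspection; call send(build_request(sample)) to post it.
    print(json.dumps(sample, indent=2))
```

A payload built with `status="resolved"` should be dropped by the `body.status` data filter, which makes it a convenient negative test.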

7.7 Chapter Summary

This chapter covered the event-driven capabilities of Argo Events.

Key Points:

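  1. EventBus is the transport layer that carries events from EventSources to Sensors
  2. EventSource ingests events from many producers, such as webhooks, message queues, timers, and MinIO/S3 bucket notifications
  3. Sensor declares event dependencies, data filters, and trigger conditions
  4. Trigger supports several target types, including Argo Workflows, Kubernetes resources, and HTTP requests
  5. Combined with Argo Workflows, these components let you build end-to-end automation pipelines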

In the next chapter, we will learn how to integrate various Argo components to build a complete cloud-native CI/CD platform.