Todo o código utilizado no artigo está no repositório pedrohff/pocairflowkubernetesspark

Tive uma missão recentemente de analisar a possibilidade de implantar o Airflow no Kubernetes, principalmente pelo fato da minha equipe já ter passado por um ambiente hosteado numa única máquina no EC2, como também utilizando o MWAA. Ambas as alternativas trouxeram alguns tradeoffs que dificultam o uso ou melhorias no ambiente do Airflow.

Essa configuração teve a premissa de ser totalmente local, sem nenhum acesso à serviço externo (como Git ou bancos) facilitando o máximo sua replicação. Para isso executei vários testes e com ajuda de colegas consegui vencer os três grandes problemas enfrentados: falta de logs, configuração de dags e permissões nos ambientes.

DAGs e Logs

Uma vez que parti tentando entender melhor o funcionamento do Airflow, ficou claro que ele deve ter acesso a um diretório de DAGs, e que deve ser compartilhado entre todos seus componentes. O mesmo não aconteceu para os logs, já que buscava sempre entender os conceitos da plataforma de workflow, mas não as melhores práticas para sua implantação em ambientes self-managed.

A configuração de ambos os pontos foi bem semelhante e partiu da configuração do cluster local de Kubernetes com o minikube. Para iniciar o minikube, foram passados alguns parâmetros que diziam respeito a o diretório mount que consta no repositório desse artigo, como também ao gid e uid: o grupo e o usuário do SO, respectivamente, que devem ter acesso aos diretórios montados.

minikube start --mount-string="$(PWD)/mount:/mnt/airflow" --mount --mount-gid 0 --mount-uid 50000

Uma vez montado esse diretório no host da VM em que o Kubernetes vai operar, temos que realizar algumas instruções no Airflow; para instalá-lo usei o Helm Chart da ferramenta, e no seu arquivo de configuração, foram declarados o gid e uid já citados, o diretório em que as DAGs e os logs devem residir, como também foi feita uma customização no modelo em que os pods dos workers do Airflow devem seguir: o pod_template. Sem essa alteração, o diretório de log não fica vinculado às tasks.


scheduler:
  replicas: 1
  securityContext: # SecurityContext aplicado para que o(s) pods possam ter acesso ao disco montado para logs
    runAsUser: 50000
    fsGroup: 0
    runAsGroup: 0
  extraVolumes:
    - name: dags
      hostPath:
        path: "/mnt/airflow/dags"
  extraVolumeMounts:
    - name: dags
      mountPath: "/opt/airflow/dags"

triggerer:
  securityContext: # SecurityContext aplicado para que o(s) pods possam ter acesso ao disco montado para logs
    runAsUser: 50000
    fsGroup: 0
    runAsGroup: 0
  extraVolumes:
    - name: dags
      hostPath:
        path: "/mnt/airflow/dags"
  extraVolumeMounts:
    - name: dags
      mountPath: "/opt/airflow/dags"

workers:
  securityContext:
    runAsUser: 50000
    fsGroup: 0
    runAsGroup: 0
  replicas: 1
  extraVolumes:
    - name: dags
      hostPath:
        path: "/mnt/airflow/dags"
  extraVolumeMounts:
    - name: dags
      mountPath: "/opt/airflow/dags"

dags:
  persistence:
    enabled: false # Persistência de dags só é necessário quando utilizado o GitSync

config:
  core:
    dags_folder: "/opt/airflow/dags" # Configura diretório que as dags irão residir


# SecurityContext aplicado para acesso ao mount de logs
securityContext:
  runAsUser: 50000
  fsGroup: 0
  runAsGroup: 0

# Configuração de logs
# Depende que o arquivo "logs-pv.yaml" seja aplicado anteriormente
logs:
  persistence:
    # Enable persistent volume for storing logs
    enabled: true
    # Volume size for logs
    size: 2Gi
    # If using a custom storageClass, pass name here
    storageClassName: localsc

podTemplate: |
  ---
  apiVersion: v1
  kind: Pod
  metadata:
    name: dummy-name
    labels:
      tier: airflow
      component: worker
      release: airflow
  spec:
    containers:
      - envFrom:
          []
        env:
          - name: AIRFLOW__CORE__EXECUTOR
            value: KubernetesExecutor
          # Hard Coded Airflow Envs
          - name: AIRFLOW__CORE__FERNET_KEY
            valueFrom:
              secretKeyRef:
                name: airflow-fernet-key
                key: fernet-key
          - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
            valueFrom:
              secretKeyRef:
                name: airflow-airflow-metadata
                key: connection
          - name: AIRFLOW_CONN_AIRFLOW_DB
            valueFrom:
              secretKeyRef:
                name: airflow-airflow-metadata
                key: connection
          - name: AIRFLOW__WEBSERVER__SECRET_KEY
            valueFrom:
              secretKeyRef:
                name: airflow-webserver-secret-key
                key: webserver-secret-key
          # Dynamically created environment variables
          # Dynamically created secret envs
          - name: AIRFLOW_CONN_KUBERNETES_DEFAULT
            valueFrom:
              secretKeyRef:
                name: airflow-airflow-connections
                key: AIRFLOW_CONN_KUBERNETES_DEFAULT
          - name: AIRFLOW__KUBERNETES_SECRETS__AIRFLOW_CONN_KUBERNETES_DEFAULT
            value: airflow-airflow-connections=AIRFLOW_CONN_KUBERNETES_DEFAULT

          # Extra env
        image: apache/airflow:2.2.4
        imagePullPolicy: IfNotPresent
        name: base
        resources:
          {}
        volumeMounts:
          - mountPath: "/opt/airflow/logs"
            name: logs
          - name: config
            mountPath: "/opt/airflow/airflow.cfg"
            subPath: airflow.cfg
            readOnly: true
          - name: config
            mountPath: "/opt/airflow/config/airflow_local_settings.py"
            subPath: airflow_local_settings.py
            readOnly: true
          - mountPath: /opt/airflow/dags
            name: dags
    restartPolicy: Never
    securityContext:
      runAsUser: 50000
      fsGroup: 0
    nodeSelector:
      {}
    affinity:
      {}
    tolerations:
      []
    serviceAccountName: airflow-worker
    volumes:
      - name: logs
        persistentVolumeClaim:
          claimName: airflow-logs
      - configMap:
          name: airflow-airflow-config
        name: config
      - hostPath:
          path: /mnt/airflow/dags
        name: dags  

Com o arquivo acima salvo, execute o comando helm repo add apache-airflow https://airflow.apache.org && helm upgrade --install airflow apache-airflow/airflow -n airflow -f airflow/config.yaml --create-namespace --version 1.5.0 --timeout 10m0s para que o Airflow seja instalado no cluster.

Utilize do kubectl port-forward svc/airflow-webserver 8080 -n airflow para ter acesso à interface visual.

Por fim, criei de forma externa ao Helm dois recursos do Kubernetes: um StorageAccount e um PersistentVolume. Ambos serviram para dar acesso ao diretório de logs, uma vez que o mesmo é conectado no Helm via StorageAccount. Para aplicar a alteração, executamos o kubectl apply -f {sa_pv_file}, que deve ter a seguinte configuração:

kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: localsc
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: Immediate
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: localpv
  annotations:
    pv.beta.kubernetes.io/gid: "0"
spec:
  capacity:
    storage: 5Gi
  accessModes:
    - ReadWriteMany 
  persistentVolumeReclaimPolicy: Retain
  storageClassName: localsc
  hostPath:
    path: /mnt/airflow/logs
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - minikube

Permissões do Airflow

Integrar o Airflow com o Kubernetes não é um desafio muito grande quando se quer apenas executar tarefas em pods distintos. Porém, a integração com o SparkKubernetesOperator requer uma Connection. Tradicionalmente as Connections são criadas pela interface do Airflow, mas por questões de comodidade, optei por fazê-lo no arquivo de configuração do Helm, adicionando o seguinte trecho:

# As configurações abaixo de secret e extraSecrets são utilizadas para configurar o Airflow Connection com o Kubernetes
# O KubernetesExecutor e o KubernetesOperator não requerem tal configuração, já o SparkKubernetesOperator sim, então abaixo
# fazemos a configuração de forma automatizada, sem a necessidade de fazer via interface.
# OBS: a Connection não é exibida na interface.
secret:
  - envName: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
    secretName: 'airflow-airflow-connections'
    secretKey: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
extraSecrets:
  'airflow-airflow-connections':
    type: 'Opaque' # O valor da chave abaixo é o seguinte, encriptado em Base64: kubernetes://?extra__kubernetes__in_cluster=True&conn_id=kubernetes_default
    data: |
      AIRFLOW_CONN_KUBERNETES_DEFAULT: 'a3ViZXJuZXRlczovLz9leHRyYV9fa3ViZXJuZXRlc19faW5fY2x1c3Rlcj1UcnVlJmNvbm5faWQ9a3ViZXJuZXRlc19kZWZhdWx0'      

SparkOperator e suas permissões

Para instalar o Spark Operator, utilizei as configurações padrão.

Apesar do Operator instalar um pod no namespace selecionado, ele não é a parte principal: temos uma nova API disponibilizada no kubectl chamada sparkapp, que é o componente responsável por executar tarefas clusterizadas.

E o instalei via helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator && helm upgrade --install sparkop spark-operator/spark-operator -n spark-operator --create-namespace --version 1.1.20 --set webhook.enable=true --timeout 10m0s. Feito isso e tentando executar uma DAG, foram encontrados alguns problemas de permissão, e para contorná-los, foi necessário aplicar o seguinte script no cluster:

# https://stackoverflow.com/questions/68371840/unable-to-create-sparkapplications-on-kubernetes-cluster-using-sparkkubernetesop
# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  namespace: airflow
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: airflow-spark-crb
  namespace: airflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: airflow-cluster
    namespace: airflow
  - kind: ServiceAccount
    name: airflow-scheduler
    namespace: airflow
  - kind: ServiceAccount
    name: airflow-worker
    namespace: airflow
  - kind: ServiceAccount
    name: airflow-triggerer
    namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  creationTimestamp: null
  name: spark-operator-role-new
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit
subjects:
- kind: ServiceAccount
  name: sparkoperator
  namespace: default
---
apiVersion: v1
kind: ServiceAccount
metadata:
  creationTimestamp: null
  name: sparkoperator
  namespace: default

Nele criamos um ServiceAccount, que pode ser entendido como um usuário que tem permissão de fazer requisições na API do K8s; para esse sa criamos uma ClusterRole, que podemos tratá-lo como uma política e um ClusterRoleBinding que liga o usuário à política.

Aplicando o script com o kubectl apply -f {sa_cr_crb_file}, podemos criar nossos SparkApp e o executar sempre no namespace configurado (nesse caso o default).

Conclusões e lições aprendidas

Com esse material espero ajudar engenheiros de dados, de software ou SRE’s que passem por um desafio semelhante, visto que a comunidade tem muito pouco conteúdo acerca do assunto.

Quando precisar novamente explorar uma ferramenta desconhecida, fica a principal lição aprendida: além de estudar a fundo seus componentes, devo sempre verificar as boas práticas de como eles devem estar num ambiente produtivo, assim como diz neste tutorial do Airflow. O resultado dessa pesquisa não para por aqui, e ainda há um longo caminho pra levar as ferramentas a um ambiente, mesmo que de teste. Quem sabe novos problemas surjam e esse post não ganha uma continuação!?