Todo o código utilizado no artigo está no repositório pedrohff/pocairflowkubernetesspark
Tive uma missão recentemente de analisar a possibilidade de implantar o Airflow no Kubernetes, principalmente pelo fato da minha equipe já ter passado por um ambiente hosteado numa única máquina no EC2, como também utilizando o MWAA. Ambas as alternativas trouxeram alguns tradeoffs que dificultam o uso ou melhorias no ambiente do Airflow.
Essa configuração teve a premissa de ser totalmente local, sem nenhum acesso à serviço externo (como Git ou bancos) facilitando o máximo sua replicação. Para isso executei vários testes e com ajuda de colegas consegui vencer os três grandes problemas enfrentados: falta de logs, configuração de dags e permissões nos ambientes.
DAGs e Logs
Uma vez que parti tentando entender melhor o funcionamento do Airflow, ficou claro que ele deve ter acesso a um diretório de DAGs, e que deve ser compartilhado entre todos seus componentes. O mesmo não aconteceu para os logs, já que buscava sempre entender os conceitos da plataforma de workflow, mas não as melhores práticas para sua implantação em ambientes self-managed.
A configuração de ambos os pontos foi bem semelhante e partiu da configuração do cluster local de Kubernetes com o minikube. Para iniciar o minikube, foram passados alguns parâmetros que diziam respeito a o diretório mount
que consta no repositório desse artigo, como também ao gid
e uid
: o grupo e o usuário do SO, respectivamente, que devem ter acesso aos diretórios montados.
minikube start --mount-string="$(PWD)/mount:/mnt/airflow" --mount --mount-gid 0 --mount-uid 50000
Uma vez montado esse diretório no host da VM em que o Kubernetes vai operar, temos que realizar algumas instruções no Airflow; para instalá-lo usei o Helm Chart da ferramenta, e no seu arquivo de configuração, foram declarados o gid
e uid
já citados, o diretório em que as DAGs e os logs devem residir, como também foi feita uma customização no modelo em que os pods dos workers do Airflow devem seguir: o pod_template
. Sem essa alteração, o diretório de log não fica vinculado às tasks.
scheduler:
replicas: 1
securityContext: # SecurityContext aplicado para que o(s) pods possam ter acesso ao disco montado para logs
runAsUser: 50000
fsGroup: 0
runAsGroup: 0
extraVolumes:
- name: dags
hostPath:
path: "/mnt/airflow/dags"
extraVolumeMounts:
- name: dags
mountPath: "/opt/airflow/dags"
triggerer:
securityContext: # SecurityContext aplicado para que o(s) pods possam ter acesso ao disco montado para logs
runAsUser: 50000
fsGroup: 0
runAsGroup: 0
extraVolumes:
- name: dags
hostPath:
path: "/mnt/airflow/dags"
extraVolumeMounts:
- name: dags
mountPath: "/opt/airflow/dags"
workers:
securityContext:
runAsUser: 50000
fsGroup: 0
runAsGroup: 0
replicas: 1
extraVolumes:
- name: dags
hostPath:
path: "/mnt/airflow/dags"
extraVolumeMounts:
- name: dags
mountPath: "/opt/airflow/dags"
dags:
persistence:
enabled: false # Persistência de dags só é necessário quando utilizado o GitSync
config:
core:
dags_folder: "/opt/airflow/dags" # Configura diretório que as dags irão residir
# SecurityContext aplicado para acesso ao mount de logs
securityContext:
runAsUser: 50000
fsGroup: 0
runAsGroup: 0
# Configuração de logs
# Depende que o arquivo "logs-pv.yaml" seja aplicado anteriormente
logs:
persistence:
# Enable persistent volume for storing logs
enabled: true
# Volume size for logs
size: 2Gi
# If using a custom storageClass, pass name here
storageClassName: localsc
podTemplate: |
---
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
labels:
tier: airflow
component: worker
release: airflow
spec:
containers:
- envFrom:
[]
env:
- name: AIRFLOW__CORE__EXECUTOR
value: KubernetesExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: airflow-fernet-key
key: fernet-key
- name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: airflow-airflow-metadata
key: connection
- name: AIRFLOW__WEBSERVER__SECRET_KEY
valueFrom:
secretKeyRef:
name: airflow-webserver-secret-key
key: webserver-secret-key
# Dynamically created environment variables
# Dynamically created secret envs
- name: AIRFLOW_CONN_KUBERNETES_DEFAULT
valueFrom:
secretKeyRef:
name: airflow-airflow-connections
key: AIRFLOW_CONN_KUBERNETES_DEFAULT
- name: AIRFLOW__KUBERNETES_SECRETS__AIRFLOW_CONN_KUBERNETES_DEFAULT
value: airflow-airflow-connections=AIRFLOW_CONN_KUBERNETES_DEFAULT
# Extra env
image: apache/airflow:2.2.4
imagePullPolicy: IfNotPresent
name: base
resources:
{}
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: logs
- name: config
mountPath: "/opt/airflow/airflow.cfg"
subPath: airflow.cfg
readOnly: true
- name: config
mountPath: "/opt/airflow/config/airflow_local_settings.py"
subPath: airflow_local_settings.py
readOnly: true
- mountPath: /opt/airflow/dags
name: dags
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 0
nodeSelector:
{}
affinity:
{}
tolerations:
[]
serviceAccountName: airflow-worker
volumes:
- name: logs
persistentVolumeClaim:
claimName: airflow-logs
- configMap:
name: airflow-airflow-config
name: config
- hostPath:
path: /mnt/airflow/dags
name: dags
Com o arquivo acima salvo, execute o comando helm repo add apache-airflow https://airflow.apache.org && helm upgrade --install airflow apache-airflow/airflow -n airflow -f airflow/config.yaml --create-namespace --version 1.5.0 --timeout 10m0s
para que o Airflow seja instalado no cluster.
Utilize do
kubectl port-forward svc/airflow-webserver 8080 -n airflow
para ter acesso à interface visual.
Por fim, criei de forma externa ao Helm dois recursos do Kubernetes: um StorageAccount e um PersistentVolume. Ambos serviram para dar acesso ao diretório de logs, uma vez que o mesmo é conectado no Helm via StorageAccount. Para aplicar a alteração, executamos o kubectl apply -f {sa_pv_file}
, que deve ter a seguinte configuração:
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: localsc
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: Immediate
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: localpv
annotations:
pv.beta.kubernetes.io/gid: "0"
spec:
capacity:
storage: 5Gi
accessModes:
- ReadWriteMany
persistentVolumeReclaimPolicy: Retain
storageClassName: localsc
hostPath:
path: /mnt/airflow/logs
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- minikube
Permissões do Airflow
Integrar o Airflow com o Kubernetes não é um desafio muito grande quando se quer apenas executar tarefas em pods distintos.
Porém, a integração com o SparkKubernetesOperator
requer uma Connection. Tradicionalmente as Connections são criadas pela interface do Airflow, mas por questões de comodidade, optei por fazê-lo no arquivo de configuração do Helm, adicionando o seguinte trecho:
# As configurações abaixo de secret e extraSecrets são utilizadas para configurar o Airflow Connection com o Kubernetes
# O KubernetesExecutor e o KubernetesOperator não requerem tal configuração, já o SparkKubernetesOperator sim, então abaixo
# fazemos a configuração de forma automatizada, sem a necessidade de fazer via interface.
# OBS: a Connection não é exibida na interface.
secret:
- envName: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
secretName: 'airflow-airflow-connections'
secretKey: "AIRFLOW_CONN_KUBERNETES_DEFAULT"
extraSecrets:
'airflow-airflow-connections':
type: 'Opaque' # O valor da chave abaixo é o seguinte, encriptado em Base64: kubernetes://?extra__kubernetes__in_cluster=True&conn_id=kubernetes_default
data: |
AIRFLOW_CONN_KUBERNETES_DEFAULT: 'a3ViZXJuZXRlczovLz9leHRyYV9fa3ViZXJuZXRlc19faW5fY2x1c3Rlcj1UcnVlJmNvbm5faWQ9a3ViZXJuZXRlc19kZWZhdWx0'
SparkOperator e suas permissões
Para instalar o Spark Operator, utilizei as configurações padrão.
Apesar do Operator instalar um pod no namespace selecionado, ele não é a parte principal: temos uma nova API disponibilizada no
kubectl
chamadasparkapp
, que é o componente responsável por executar tarefas clusterizadas.
E o instalei via helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator && helm upgrade --install sparkop spark-operator/spark-operator -n spark-operator --create-namespace --version 1.1.20 --set webhook.enable=true --timeout 10m0s
. Feito isso e tentando executar uma DAG, foram encontrados alguns problemas de permissão, e para contorná-los, foi necessário aplicar o seguinte script no cluster:
# https://stackoverflow.com/questions/68371840/unable-to-create-sparkapplications-on-kubernetes-cluster-using-sparkkubernetesop
# Role for spark-on-k8s-operator to create resources on cluster
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: spark-cluster-cr
namespace: airflow
labels:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
- apiGroups:
- sparkoperator.k8s.io
resources:
- sparkapplications
verbs:
- '*'
---
# Allow airflow-worker service account access for spark-on-k8s
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: airflow-spark-crb
namespace: airflow
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: spark-cluster-cr
subjects:
- kind: ServiceAccount
name: airflow-cluster
namespace: airflow
- kind: ServiceAccount
name: airflow-scheduler
namespace: airflow
- kind: ServiceAccount
name: airflow-worker
namespace: airflow
- kind: ServiceAccount
name: airflow-triggerer
namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
creationTimestamp: null
name: spark-operator-role-new
namespace: default
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: edit
subjects:
- kind: ServiceAccount
name: sparkoperator
namespace: default
---
apiVersion: v1
kind: ServiceAccount
metadata:
creationTimestamp: null
name: sparkoperator
namespace: default
Nele criamos um ServiceAccount, que pode ser entendido como um usuário que tem permissão de fazer requisições na API do K8s; para esse sa criamos uma ClusterRole, que podemos tratá-lo como uma política e um ClusterRoleBinding que liga o usuário à política.
Aplicando o script com o kubectl apply -f {sa_cr_crb_file}
, podemos criar nossos SparkApp e o executar sempre no namespace configurado (nesse caso o default).
Conclusões e lições aprendidas
Com esse material espero ajudar engenheiros de dados, de software ou SRE’s que passem por um desafio semelhante, visto que a comunidade tem muito pouco conteúdo acerca do assunto.
Quando precisar novamente explorar uma ferramenta desconhecida, fica a principal lição aprendida: além de estudar a fundo seus componentes, devo sempre verificar as boas práticas de como eles devem estar num ambiente produtivo, assim como diz neste tutorial do Airflow. O resultado dessa pesquisa não para por aqui, e ainda há um longo caminho pra levar as ferramentas a um ambiente, mesmo que de teste. Quem sabe novos problemas surjam e esse post não ganha uma continuação!?