Integrar o Spark com o Kafka é um desafio que pode envolver todos os usuários de um cluster Kafka, ainda assim algumas boas práticas podem nos auxiliar a acelerar o desenvolvimento desta ligação e otimizar nossos resultados.
Toda a parte de produção de mensagens no Kafka utilizando Spark pode ser considerada experimental por escrever utilizando o modo continuous, desta forma nós engenheiros de dados acabando nos preocupando mais com o consumo dos dados do serviço. Para quem já é habituado com o Kafka, o comportamento do Consumer Group no Spark se mostra incomum: o commit das mensagens é integrado diretamente com os checkpoints, e não são repassados ao broker; isso por um lado traz um baixo overhead na integração dos sistemas, mas para quem é habituado a acompanhar o consumo dos eventos via alguma UI (como KafkaUI ou o Kafka Prometheus Exporter) pode se chocar com a ausência desses metadados.
Outro desafio importante, dessa vez para usuários do Spark, é que por se tratar de um cenário de streaming em que não temos previsibilidade da volumetria, como por ser um cenário em que a fonte não são arquivos já escritos, nós temos que nos preocupar com a formação final dos arquivos durante cada microbatch. Temos então a preocupação constante de conter a criação de small files, que podem afetar negativamente os consumidores dos dados produzidos pelo Spark.
Destacando a complexidade da integração, uma vez que encontramos algum problema que afete a performance do Spark, as potenciais correções podem abranger desde mudanças nos produtores - como melhorar a distribuição das chaves das mensagens entre as partições - ou até mesmo em mudanças nas configurações dos consumidores, o que focaremos em seguida.
Configurações de volume
A primeira abordagem recomendada para melhorar uma aplicação streaming que lê tópicos Kafka, é atribuir ou melhorar três configurações: maxTriggerDelay
, minOffsetsPerTrigger
e maxOffsetsPerTrigger
. As configurações que controlam a quantidade de offsets nos ajudam a ter o controle da quantidade de registros a serem tratados por cada microbatch, enquanto o maxTriggerDelay
determina quando um microbatch será computado caso não cheguem dados suficientes relacionados ao minOffsetsPerTrigger
desde a última execução de um microbatch. Essas configurações devem ser analisadas em conjunto ao volume de dados no tópico, e para isso acabo usando a fórmula abaixo para me auxiliar a decidir os valores:
# Input - obtenho a média de mensagens por segundo de um tópico a partir de um sistema de métricas
topic_average_messages_per_second = 100
# Configurações - intervalo desejado para a execução de microbatchs
desired_microbatch_update_interval_seconds = 5 * 60
max_offsets_threshold = 2.5
# Outputs
maxTriggerDelay = 10m
minOffsetsPerTrigger = topic_average_messages_per_second * desired_microbatch_update_interval_seconds
maxOffsetsPerTrigger = minOffsetsPerTrigger * max_offsets_threshold
Configurações de particionamento
Essa segunda abordagem acaba sendo tão relevante quanto a primeira por também influenciar bastante na contenção de small files.
Para introduzir esse assunto, reforço um ponto importantíssimo no funcionamento de tópicos Kafka: a quantidade máxima de consumidores é sempre o número de suas partições. Minha experiência com outros contextos me levou a crer que devo relacionar esse limite máximo à quantidade executores, mas isso foi um engano. Esse é um ponto não tão explícito na documentação: cada partição fica ligada a cada core dos executores, então num tópico com 5 partições, posso ligar apenas um executor com 5 cores e todas seriam consumidas paralelamente.
Como medida para não ocupar todos os cores apenas com a leitura do Kafka, recomenda-se deixar a quantidade de cores em ao mínimo o dobro da quantidade de partições.
Por fim, a configuração do spark spark.sql.shuffle.partitions
é diretamente ligada controle de paralelismo, e infelizmente também acaba sendo pouco documentando/evidenciado pela comunidade. Nessa discussão vemos que o Adaptive Query Engine tenta otimizar esta configuração, mesmo assim foi bastante notável o resultado ao colocar seu valor de forma 1:1 com a quantidade de partições. Assim teremos a certeza de que num cenário sem particionamento por partitionBy
, a quantidade de arquivos escritos em cada microbatch será sempre igual à quantidade de partições do tópico!
Conclusões
Com os pontos trazidos podemos ter uma maior confiança e previsibilidade no uso do Spark Streaming, tendo como destaque:
- podemos controlar a quantidade de arquivos por microbatch;
- temos uma média de quantos microbatches devem executar em uma hora, em períodos de pico;
- controlamos o limite superior da quantidade de eventos por microbatch;
- obtendo uma média do tamanho de cada evento no tópico, podemos mensurar o tamanho máximo em bytes de cada microbatch executado;
- com todas essas métricas podemos mensurar o crescimento de uma tabela ao longo do tempo!