Integrar o Spark com o Kafka é um desafio que pode envolver todos os usuários de um cluster Kafka, ainda assim algumas boas práticas podem nos auxiliar a acelerar o desenvolvimento desta ligação e otimizar nossos resultados.

Toda a parte de produção de mensagens no Kafka utilizando Spark pode ser considerada experimental por escrever utilizando o modo continuous, desta forma nós engenheiros de dados acabando nos preocupando mais com o consumo dos dados do serviço. Para quem já é habituado com o Kafka, o comportamento do Consumer Group no Spark se mostra incomum: o commit das mensagens é integrado diretamente com os checkpoints, e não são repassados ao broker; isso por um lado traz um baixo overhead na integração dos sistemas, mas para quem é habituado a acompanhar o consumo dos eventos via alguma UI (como KafkaUI ou o Kafka Prometheus Exporter) pode se chocar com a ausência desses metadados.

Outro desafio importante, dessa vez para usuários do Spark, é que por se tratar de um cenário de streaming em que não temos previsibilidade da volumetria, como por ser um cenário em que a fonte não são arquivos já escritos, nós temos que nos preocupar com a formação final dos arquivos durante cada microbatch. Temos então a preocupação constante de conter a criação de small files, que podem afetar negativamente os consumidores dos dados produzidos pelo Spark.

Destacando a complexidade da integração, uma vez que encontramos algum problema que afete a performance do Spark, as potenciais correções podem abranger desde mudanças nos produtores - como melhorar a distribuição das chaves das mensagens entre as partições - ou até mesmo em mudanças nas configurações dos consumidores, o que focaremos em seguida.

Configurações de volume

A primeira abordagem recomendada para melhorar uma aplicação streaming que lê tópicos Kafka, é atribuir ou melhorar três configurações: maxTriggerDelay, minOffsetsPerTrigger e maxOffsetsPerTrigger. As configurações que controlam a quantidade de offsets nos ajudam a ter o controle da quantidade de registros a serem tratados por cada microbatch, enquanto o maxTriggerDelay determina quando um microbatch será computado caso não cheguem dados suficientes relacionados ao minOffsetsPerTrigger desde a última execução de um microbatch. Essas configurações devem ser analisadas em conjunto ao volume de dados no tópico, e para isso acabo usando a fórmula abaixo para me auxiliar a decidir os valores:

# Input - obtenho a média de mensagens por segundo de um tópico a partir de um sistema de métricas
topic_average_messages_per_second = 100

# Configurações - intervalo desejado para a execução de microbatchs
desired_microbatch_update_interval_seconds = 5 * 60
max_offsets_threshold = 2.5

# Outputs
maxTriggerDelay = 10m
minOffsetsPerTrigger = topic_average_messages_per_second * desired_microbatch_update_interval_seconds
maxOffsetsPerTrigger = minOffsetsPerTrigger * max_offsets_threshold

Configurações de particionamento

Essa segunda abordagem acaba sendo tão relevante quanto a primeira por também influenciar bastante na contenção de small files.

Para introduzir esse assunto, reforço um ponto importantíssimo no funcionamento de tópicos Kafka: a quantidade máxima de consumidores é sempre o número de suas partições. Minha experiência com outros contextos me levou a crer que devo relacionar esse limite máximo à quantidade executores, mas isso foi um engano. Esse é um ponto não tão explícito na documentação: cada partição fica ligada a cada core dos executores, então num tópico com 5 partições, posso ligar apenas um executor com 5 cores e todas seriam consumidas paralelamente.

Como medida para não ocupar todos os cores apenas com a leitura do Kafka, recomenda-se deixar a quantidade de cores em ao mínimo o dobro da quantidade de partições.

Por fim, a configuração do spark spark.sql.shuffle.partitions é diretamente ligada controle de paralelismo, e infelizmente também acaba sendo pouco documentando/evidenciado pela comunidade. Nessa discussão vemos que o Adaptive Query Engine tenta otimizar esta configuração, mesmo assim foi bastante notável o resultado ao colocar seu valor de forma 1:1 com a quantidade de partições. Assim teremos a certeza de que num cenário sem particionamento por partitionBy, a quantidade de arquivos escritos em cada microbatch será sempre igual à quantidade de partições do tópico!

Conclusões

Com os pontos trazidos podemos ter uma maior confiança e previsibilidade no uso do Spark Streaming, tendo como destaque:

  • podemos controlar a quantidade de arquivos por microbatch;
  • temos uma média de quantos microbatches devem executar em uma hora, em períodos de pico;
  • controlamos o limite superior da quantidade de eventos por microbatch;
  • obtendo uma média do tamanho de cada evento no tópico, podemos mensurar o tamanho máximo em bytes de cada microbatch executado;
  • com todas essas métricas podemos mensurar o crescimento de uma tabela ao longo do tempo!