Mecanismos de Back pressure en Spark Streaming con Kafka

Introducción:

En el presente post pretendemos explicar el mecanismo de Back pressure y sus implementaciones en Spark cuando consume de un topic de Kafka, así cómo de las consideraciones que deben tenerse en cuenta a la hora de trabajar con él. Además, incluimos unos ejemplos de código por si queréis hacer vuestras propias pruebas.

¿Por qué usar Back Pressure?

Spark posee un módulo de streaming que permite trabajar de 3 formas distintas:

Back pressure

  • Mediante Direct Streams: Es el api antiguo, genera RDD con los datos leídos durante el batch establecido. Es decir, trabaja en micro-batches.
  • Mediante Spark Structured Streaming: Api nueva, permite generar streams de alto nivel, aunque la idea es análoga a la de los Direct Stream, también trabaja con micro-batches.
  • Mediante Continuous Processing: Procesamos evento a evento, pero hay operaciones que no podemos hacer (como por ejemplo, agregaciones).

 

Tanto Direct Streams como Structured Stream está orientado al trabajo con micro-batches, es decir, ejecutar procesos Spark de forma repetitiva cada poco tiempo. Aunque se configure un micro-batch para ejecutarse cada cierto tiempo (e.g, cada 5 segundos), si el total de mensajes traídos es grande, puede tardar mucho más en terminar dicho micro-batch, y hasta que no finalice, no disponibilizará los datos.

Por poner un ejemplo más claro, si somos capaces de procesar 100 eventos por segundo como máximo, y configuramos micro-batches de 1 segundo, si encontramos 30000 eventos, tardaremos 300 segundos en terminar de procesarlos, y hasta que no acabemos, no volcaremos estos datos donde corresponda, con lo que durante 5 minutos no veremos datos actualizados.

Back Pressure es un mecanismo que establece un límite a los eventos que van a ser procesados por cada micro-batch, asegurándonos que los datos fluyen constantemente, aunque sea de forma parcial.

Es importante entender que hay que intentar ajustar lo mejor posible dicho valor. Si ponemos un valor bajo, estaremos desaprovechando los recursos. Si ponemos un valor alto, seremos incapaces de cumplir los tiempos de nuestros micro-batches.

Back Pressure con Direct Streams:

Desde Spark 1.5 se encuentra disponible la opción de activar el Back Pressure, que permite especificar la opción “spark.streaming.receiver.maxRate”, indicando el número máximo de eventos por segundo que se puede procesar por receptor.

El total de eventos que podemos procesar por cada micro-batch es fácil de calcular:

maxRate * numReceivers * batchDuration

Es decir, con un maxRate de 100, 3 receivers y un batch de 5 segundos, estaríamos estableciendo un máximo de 100*3*5 = 1500 eventos por batch (suponiendo que no haya problema en procesar tal cantidad de eventos).

La ventaja de esto es evidente, la cantidad de eventos que permitimos procesar aumenta automáticamente con el escalado. A mayor número de receivers, mayor cantidad de eventos que podemos procesar.

Back Pressure con Structured Streaming:

No existe opción de Back Pressure para Structured Streaming. Configurarla no da error, pero tampoco tiene ningún efecto.

No existe la opción de cambiar dinámicamente el tamaño de cada Bach. Esto es debido a que la finalidad de Structured Streaming es procesar los eventos tan rápido como pueda (no tiene sentido, si hay un evento, dejarlo para procesar en el próximo batch!). Esto viene bien explicado en el artículo:

http://javaagile.blogspot.com/2019/03/everything-you-needed-to-know-about.html

Sin embargo, existe una opción para limitar el número de eventos que podemos leer en Kafka (también existen otros como máximo de ficheros a leer para FileSource).

Esta opción se llama “maxOffsetsPerTrigger”, y establece un máximo de eventos a ser leídos entre todas las particiones en cada trigger, (no por segundo, como se hace en el caso anterior).

La principal diferencia es que este, no es dinámico.

En el caso de los Direct Stream, se permitía especificar el número de eventos leídos por segundo por receiver. El total de eventos leídos dependía del número de receivers, si había más receivers, se leían más eventos en total.

Sin embargo, la propiedad “maxOffsetsPerTrigger”, actúa de una forma distinta. Se divide ese número entre el total de particiones del topic y se leen esos mensajes de cada partición.

Además, aumentar las capacidades del clúster o escalar el proceso, no supone ningún cambio en el total de eventos procesados por trigger. Esto nos obligaría a tener que parar la aplicación para cambiar la configuración si queremos escalar.

Conclusiones:

El mecanismo de Back Pressure tiene utilidad, en aquellos casos en los que necesitemos recibir de forma segura cada cierto tiempo los resultados, aunque sean parciales.

Sin embargo, tener que atarnos al Direct Stream hace que debamos plantearnos dos veces el usarlo.

La alternativa que posee el Structured Streaming no es suficiente, puesto que no permitiría autoescalar, aunque podría cubrir ciertas necesidades siempre que tengamos presente que para escalar debemos cambiar el valor de “maxOffsetsPerTrigger”.

Anexo: Código ejemplo con DStreams:

val conf = new SparkConf().setAppName("otherTestPressure").setMaster("local[4]")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.streaming.backpressure.initialRate", "100")
conf.set("spark.streaming.kafka.maxRatePerPartition", "100")
conf.set("spark.streaming.receiver.maxRate", "100")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val batchDuration = Durations.seconds(1)
val ssc = new StreamingContext(conf, batchDuration)
ssc.checkpoint("C:\\tmp\\otherDir") 
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "KafkaKyber:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "grou_id_jgos",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("mvp_events_raw_metrics")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

val kafkaDStream = stream.map(record => (record.key, record.value))

ssc.sparkContext.setLogLevel("ERROR")

kafkaDStream
  .map( e => ("key",1)).reduceByKey(_+_)
  .foreachRDD { rdd =>
    rdd.foreach { record => println((new Date()) + " - " + record._2) }
  }

ssc.start()
ssc.awaitTermination()

Anexo: Código ejemplo con “maxOffsetsPerTrigger:

val spark = SparkSession
  .builder()
  .appName("testKafkaPressure")
  .config("spark.streaming.stopGracefullyOnShutdown", true)
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "KafkaKyber:9092")
  .option("subscribe", "mvp_events_raw_metrics")
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger",100)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .map( e => ("key",1)).as[(String,Int)]
  .groupBy("_1").agg(sum("_2").alias("total"))
  .withColumn("time_stamp", lit(current_timestamp()))
  .selectExpr("time_stamp", "total")
  .writeStream.format("console")
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime("1 second")) 
  .option("checkpointLocation", "C:\\tmp\\someDir")
  .start()

spark.streams.awaitAnyTermination()

 

Guía de Posibilidades Profesionales en el Ecosistema de Java