Aventuras y desventuras de una partición en Spark

Si habéis llegado aquí doy por hecho que habéis trabajado con Spark, pero para que el artículo sea completo, me vais a dejar que empiece por lo básico.

Spark es un motor para realizar analíticas sobre conjuntos enormes de datos, vamos, lo que solemos llamar Big Data.

Como bien sabéis, aprender Spark no es sólo aprender un nuevo framework o un nuevo lenguaje (porque no mintáis, no habíais tocado Scala antes, ¿verdad?), es aprender una nueva metodología. Si habíais trabajado con Map/Reduce antes en Hadoop, teníais casi todo el camino andado, al fin y al cabo, Spark es más flexible, pero parte de una misma idea, ser capaz de hacer un tratamiento aislado de los datos.

Y es que aquí viene la base de todo, ¿qué mas da la cantidad de los datos que tengas si eres capaz de tratarlos de forma parcial?

La idea es simple, si quiero contar el total de veces que aparece una palabra en un texto, puedo contar las veces que aparece en cada línea, y luego hacer la suma de todos estos números ¿cierto?

 

 

Bien, pues estupendo, ahora entendemos por qué Spark “trocea” los datos recibidos en los famosos “RDDs”. Sencillamente crea una colección de elementos, que se pueden tratar de forma independiente para algunas operaciones. Es decir, un “RDD” no es más que una colección de elementos, que se ha troceado y enviado, de forma parcial, a distintos nodos.

 

 

 

De tal forma que las operaciones se realizan en cada grupo de datos de forma independiente, siempre que se pueda. Es el caso de los map(), filter() o flatMap(). Operaciones para las que los datos son independientes de sus vecinos. Existen también otras transformaciones que requieren hacer cálculos con datos de los vecinos, como por ejemplo sacar una media.

El caso es que, si os habéis fijado, buscamos (siempre que sea posible) aplicar operaciones elemento a elemento, aunque en nuestro nodo siempre va a haber un grupo de datos juntos, y esto es lo que se denomina partición.

En la imagen mostrada antes, nuestros datos se “troceaban” en dos particiones, que se distribuían a dos nodos distintos (aunque varias particiones pueden acabar en el mismo).

Pues bien, hay ocasiones en las que necesitamos aprovecharnos de esto, y para ello usamos operaciones como “mapPartitions()” o “foreachPartition()”, y esto es mucho más útil de lo que parece, y en mi caso, me ha permitido encontrar una solución a problemas que estoy seguro, también os vais a encontrar.

Bien, ¿Cómo funciona un mapPartitions() o un foreachPartition()? Pues es muy simple es análogo al map(), o el foreach(), pero en lugar de procesar elemento a elemento, recibe un Iterador, es decir, una colección de elementos.

Dicho esto, imaginemos que tenemos que aplicar una función “f” a los elementos de un RDD, o un Dataframe que vamos a llamar “datos”

 

    
        val result = datos.map( elem => f(elem) )
    

 

Pues bien, esto se puede hacer con un “mapPartitions()”

 

    
val result = datos.mapPartitions( p => {
     val tmp = for (elem <- p)
        yield{
          f(elem)
        }
       tmp
      }
    )

    

 

Oye, pues muy bien, ahora sabemos cómo hacer lo mismo, pero más difícil.

 

Pues sí, pero es que esto tiene ventajas, si no ¿para qué íbamos a perder el tiempo aprendiéndolo?

Hay ocasiones en las que, para llevar a cabo nuestras tareas, necesitamos instanciar objetos muy pesados, como puede ser por ejemplo, una conexión a la base de datos.

Por lo general tenemos conectores en los que delegar este trabajo, pero en ocasiones, necesitamos hacer un trabajo muy específico, y el conector no nos vale. ¿qué pasaría si usáramos un “map()”, por ejemplo? Pues que estaríamos abriendo y cerrando conexiones para cada elemento, gastando mucho más tiempo en esto, que en el procesamiento de cada elemento.

Para cosas como esta, un “foreachPartition()” es perfecto. Podemos crear una conexión por cada partición, y cerrarla al acabar de procesarla.

De la misma forma, imaginad que para cada elemento que procesamos, necesitamos obtener un dato de un repositorio (e.g, MongoDB). Podemos lanzar querys y devolver el dato enriquecido mediante “mapPartitions()”, sin tener que traernos la colección entera y luego cruzarla con un pesado “join”.

 

    
        val result = datos.mapPartitions( p => {
//Abro una conexión
     val tmp = for (elem <- p)
        yield{
//obtengo los datos que vaya a necesitar
          f(elem)
        }
//cierro la conexión
       tmp
      }
    )

    

 

En conclusión, Spark nos simplifica en gran medida la analítica de grandes volúmenes de datos, y unos conocimientos básicos nos permiten llegar muy lejos, pero hay ocasiones en las que necesitamos bajar a un nivel inferior, para poder tener un mayor control sobre lo que Spark está haciendo. Conocer cómo trabaja nos ha permitido resolver problemas que un conector no resolvía “out of the box”, o ampliar una funcionalidad que éramos incapaces de abarcar con un simple “map()”.

Espero que os haya resultado interesante, y sobre todo, útil.

¡Hasta la próxima!