Fundamentos de Spark

RDDs: Creación y operaciones

Resilient Distributed Datasets (RDDs) son la abstracción fundamental en Apache Spark. Un RDD es una colección distribuida de elementos que puede ser operada en paralelo. RDDs son inmutables, lo que significa que una vez creados, no pueden ser modificados. Sin embargo, puedes crear nuevos RDDs transformando los existentes.

Hay dos formas principales de crear RDDs:

  1. Paralelización de una colección existente en tu programa.
  2. Referenciando un dataset en un sistema de almacenamiento externo, como HDFS, S3, o HBase.

Las operaciones sobre RDDs se dividen en dos categorías:

  • Transformaciones: Estas son operaciones que crean un nuevo RDD a partir de uno existente (por ejemplo, map, filter, flatMap).
  • Acciones: Estas operaciones devuelven un valor después de realizar cálculos en el RDD (por ejemplo, count, collect, first).

Código de Ejemplo

				
					import org.apache.spark.{SparkConf, SparkContext}

object RDDExample {
  def main(args: Array[String]): Unit = {
    // Configuración de Spark
    val conf = new SparkConf().setAppName("RDD Example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Creación de un RDD a partir de una colección
    val data = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(data)

    // Transformaciones
    val rddSquared = rdd.map(x => x * x)
    val rddFiltered = rddSquared.filter(x => x > 10)

    // Acciones
    val result = rddFiltered.collect()

    // Imprimir el resultado
    println("Elementos del RDD filtrado: " + result.mkString(", "))

    // Finalizar SparkContext
    sc.stop()
  }
}

				
			

Explicación del Código

  1. Configuración de Spark:

    • Se configura SparkConf para crear una configuración de Spark con un nombre de aplicación y establecer el modo de ejecución en local con todos los núcleos disponibles (local[*]).
    • Se inicializa SparkContext con esta configuración, lo que permite interactuar con Spark.
  2. Creación de un RDD a partir de una colección:

    • Se crea una colección de datos (un array de enteros).
    • Se paraleliza esta colección utilizando sc.parallelize, creando un RDD.
  3. Transformaciones:

    • map: Se aplica una función a cada elemento del RDD original para crear un nuevo RDD donde cada elemento es el cuadrado del original.
    • filter: Se filtran los elementos del RDD resultante para crear un nuevo RDD que solo contenga elementos mayores que 10.
  4. Acciones:

    • collect: Se recopilan los elementos del RDD filtrado en un array en el programa driver.
  5. Imprimir el Resultado:

    • Se imprime el resultado de la operación de filtrado, mostrando los elementos del RDD que cumplen con la condición especificada.
  6. Finalizar SparkContext:

    • Se detiene SparkContext para liberar los recursos.

DataFrames y Spark SQL

DataFrames son una abstracción de datos en Apache Spark que permiten trabajar con datos estructurados de una manera similar a las tablas en una base de datos relacional o los data frames en R y Python. Los DataFrames proporcionan una API de alto nivel para manipular datos y son más eficientes que los RDDs debido a optimizaciones internas. Spark SQL es un módulo de Spark que permite trabajar con datos estructurados mediante consultas SQL.

Los DataFrames pueden ser creados a partir de diversas fuentes de datos, como archivos CSV, JSON, bases de datos, entre otros. Spark SQL permite realizar consultas SQL sobre DataFrames, lo que facilita la manipulación y análisis de datos.

Código de Ejemplo

				
					import org.apache.spark.sql.{SparkSession, DataFrame}

object DataFrameExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("DataFrame Example")
      .master("local[*]")
      .getOrCreate()

    // Leer un archivo CSV y crear un DataFrame
    val df: DataFrame = spark.read.option("header", "true").csv("path/to/your/csvfile.csv")

    // Mostrar el esquema del DataFrame
    df.printSchema()

    // Mostrar las primeras 5 filas del DataFrame
    df.show(5)

    // Crear una vista temporal para ejecutar consultas SQL
    df.createOrReplaceTempView("my_table")

    // Ejecutar una consulta SQL
    val sqlDF: DataFrame = spark.sql("SELECT * FROM my_table WHERE some_column > 10")

    // Mostrar el resultado de la consulta SQL
    sqlDF.show()

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se utiliza SparkSession.builder para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Leer un archivo CSV y crear un DataFrame:

    • Se utiliza spark.read.option("header", "true").csv("path/to/your/csvfile.csv") para leer un archivo CSV con encabezado y crear un DataFrame a partir de los datos del archivo.
  3. Mostrar el esquema del DataFrame:

    • Se utiliza df.printSchema() para imprimir el esquema del DataFrame, que muestra las columnas y sus tipos de datos.
  4. Mostrar las primeras 5 filas del DataFrame:

    • Se utiliza df.show(5) para mostrar las primeras 5 filas del DataFrame, lo que permite una vista rápida de los datos.
  5. Crear una vista temporal para ejecutar consultas SQL:

    • Se utiliza df.createOrReplaceTempView("my_table") para crear una vista temporal con el nombre «my_table», lo que permite ejecutar consultas SQL sobre este DataFrame.
  6. Ejecutar una consulta SQL:

    • Se utiliza spark.sql("SELECT * FROM my_table WHERE some_column > 10") para ejecutar una consulta SQL que selecciona todas las filas de «my_table» donde el valor de «some_column» es mayor que 10.
  7. Mostrar el resultado de la consulta SQL:

    • Se utiliza sqlDF.show() para mostrar los resultados de la consulta SQL.
  8. Finalizar SparkSession:

    • Se utiliza spark.stop() para detener la sesión de Spark y liberar los recursos.

Transformaciones y acciones

En Apache Spark, las operaciones sobre RDDs se dividen en dos categorías: transformaciones y acciones.

  1. Transformaciones: Son operaciones que crean un nuevo RDD a partir de uno existente. Son perezosas (lazy), lo que significa que no se ejecutan inmediatamente sino hasta que se llama una acción. Ejemplos de transformaciones son map, filter, flatMap, reduceByKey, entre otras.

  2. Acciones: Son operaciones que devuelven un valor o exportan datos desde un RDD. Las acciones son las que desencadenan la ejecución de las transformaciones pendientes. Ejemplos de acciones son collect, count, first, take, reduce, saveAsTextFile, entre otras.

Código de Ejemplo

				
					import org.apache.spark.{SparkConf, SparkContext}

object TransformationsAndActions {
  def main(args: Array[String]): Unit = {
    // Configuración de Spark
    val conf = new SparkConf().setAppName("Transformations and Actions Example").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Crear un RDD a partir de una colección
    val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val rdd = sc.parallelize(data)

    // Transformación: map (elevar al cuadrado cada elemento)
    val squaredRdd = rdd.map(x => x * x)

    // Transformación: filter (filtrar elementos mayores que 20)
    val filteredRdd = squaredRdd.filter(x => x > 20)

    // Acción: collect (recoger los elementos del RDD en un array)
    val result = filteredRdd.collect()

    // Acción: count (contar el número de elementos en el RDD)
    val count = filteredRdd.count()

    // Imprimir resultados
    println("Elementos del RDD filtrado: " + result.mkString(", "))
    println("Número de elementos en el RDD filtrado: " + count)

    // Finalizar SparkContext
    sc.stop()
  }
}

				
			

Explicación del Código

  1. Configuración de Spark:

    • Se configura SparkConf para crear una configuración de Spark con un nombre de aplicación y establecer el modo de ejecución en local (local[*]).
    • Se inicializa SparkContext con esta configuración.
  2. Crear un RDD a partir de una colección:

    • Se crea una colección de datos (un array de enteros).
    • Se paraleliza esta colección utilizando sc.parallelize, creando un RDD.
  3. Transformación: map:

    • Se aplica la transformación map al RDD original, creando un nuevo RDD (squaredRdd) donde cada elemento es el cuadrado del elemento original.
  4. Transformación: filter:

    • Se aplica la transformación filter al RDD squaredRdd, creando un nuevo RDD (filteredRdd) que solo contiene elementos mayores que 20.
  5. Acción: collect:

    • Se aplica la acción collect al RDD filteredRdd, que recoge todos los elementos del RDD en un array y los devuelve al programa driver.
  6. Acción: count:

    • Se aplica la acción count al RDD filteredRdd, que cuenta el número de elementos en el RDD y devuelve el resultado.
  7. Imprimir Resultados:

    • Se imprimen los resultados de las acciones collect y count, mostrando los elementos filtrados y el número de elementos en el RDD filtrado.
  8. Finalizar SparkContext:

    • Se detiene SparkContext para liberar los recursos.