Spark SQL

DataFrames y Datasets

En Apache Spark, DataFrames y Datasets son dos abstracciones de datos que permiten manipular datos estructurados de manera eficiente.

  • DataFrames: Son similares a las tablas en una base de datos o a los data frames en R y Python. Proporcionan una API de alto nivel para manipular datos estructurados y semiestructurados. Los DataFrames están optimizados internamente por el motor Catalyst de Spark y pueden ser creados a partir de diversas fuentes de datos, como archivos CSV, JSON, Parquet, bases de datos, etc.

  • Datasets: Introducidos en Spark 1.6, los Datasets combinan las ventajas de RDDs (tipo de datos fuertemente tipados y seguros) y DataFrames (API de alto nivel y optimizaciones). Los Datasets son objetos distribuidos que permiten realizar operaciones de tipo seguro. En Scala y Java, se puede utilizar el tipo seguro mientras que en Python, los Datasets no están disponibles.

Código de Ejemplo

				
					import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
import org.apache.spark.sql.Encoders

object DataFrameDatasetExample {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("DataFrame and Dataset Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Crear un DataFrame a partir de una secuencia de datos
    val df: DataFrame = Seq(
      ("Alice", 29),
      ("Bob", 25),
      ("Carol", 32)
    ).toDF("name", "age")

    // Mostrar el esquema del DataFrame
    df.printSchema()

    // Mostrar el contenido del DataFrame
    df.show()

    // Crear un Dataset a partir del DataFrame
    val ds: Dataset[Person] = df.as[Person]

    // Mostrar el esquema del Dataset
    ds.printSchema()

    // Mostrar el contenido del Dataset
    ds.show()

    // Aplicar transformaciones y acciones en el Dataset
    val adults: Dataset[Person] = ds.filter(_.age > 18)
    adults.show()

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se utiliza SparkSession.builder para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Crear un DataFrame:

    • Se importa spark.implicits._ para habilitar las conversiones implícitas.
    • Se crea una secuencia de datos (Seq) que contiene tuplas con nombre y edad.
    • Se convierte la secuencia en un DataFrame utilizando el método toDF, especificando los nombres de las columnas.
  3. Mostrar el esquema y el contenido del DataFrame:

    • Se utiliza df.printSchema() para imprimir el esquema del DataFrame, mostrando las columnas y sus tipos de datos.
    • Se utiliza df.show() para mostrar el contenido del DataFrame.
  4. Crear un Dataset a partir del DataFrame:

    • Se define una clase case Person con los campos name y age.
    • Se convierte el DataFrame en un Dataset de tipo Person utilizando el método as[Person].
  5. Mostrar el esquema y el contenido del Dataset:

    • Se utiliza ds.printSchema() para imprimir el esquema del Dataset.
    • Se utiliza ds.show() para mostrar el contenido del Dataset.
  6. Aplicar transformaciones y acciones en el Dataset:

    • Se aplica la transformación filter para filtrar los elementos del Dataset donde la edad es mayor que 18.
    • Se muestra el resultado del Dataset filtrado utilizando adults.show().
  7. Finalizar SparkSession:

    • Se utiliza spark.stop() para detener la sesión de Spark y liberar los recursos.

Consultas SQL

Apache Spark SQL es un módulo de Spark que permite trabajar con datos estructurados utilizando el lenguaje SQL. Puedes ejecutar consultas SQL directamente en DataFrames y Datasets, así como combinar SQL con la API de DataFrame/Dataset para aprovechar las ventajas de ambos enfoques. Spark SQL permite crear vistas temporales a partir de DataFrames y ejecutar consultas SQL sobre ellas, facilitando el análisis y manipulación de datos.

Código de Ejemplo

				
					import org.apache.spark.sql.{SparkSession, DataFrame}

object SparkSQLExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("Spark SQL Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Crear un DataFrame a partir de una secuencia de datos
    val df: DataFrame = Seq(
      ("Alice", 29),
      ("Bob", 25),
      ("Carol", 32)
    ).toDF("name", "age")

    // Mostrar el contenido del DataFrame
    df.show()

    // Crear una vista temporal para ejecutar consultas SQL
    df.createOrReplaceTempView("people")

    // Ejecutar una consulta SQL para seleccionar todas las filas
    val resultDF: DataFrame = spark.sql("SELECT * FROM people")

    // Mostrar el resultado de la consulta SQL
    resultDF.show()

    // Ejecutar una consulta SQL con una condición
    val adultsDF: DataFrame = spark.sql("SELECT name, age FROM people WHERE age > 18")

    // Mostrar el resultado de la consulta con condición
    adultsDF.show()

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se utiliza SparkSession.builder para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Crear un DataFrame:

    • Se importa spark.implicits._ para habilitar las conversiones implícitas.
    • Se crea una secuencia de datos (Seq) que contiene tuplas con nombre y edad.
    • Se convierte la secuencia en un DataFrame utilizando el método toDF, especificando los nombres de las columnas.
  3. Mostrar el contenido del DataFrame:

    • Se utiliza df.show() para mostrar el contenido del DataFrame.
  4. Crear una vista temporal:

    • Se utiliza df.createOrReplaceTempView("people") para crear una vista temporal con el nombre «people», lo que permite ejecutar consultas SQL sobre este DataFrame.
  5. Ejecutar una consulta SQL:

    • Se utiliza spark.sql("SELECT * FROM people") para ejecutar una consulta SQL que selecciona todas las filas de la vista temporal «people».
    • El resultado de la consulta se guarda en un DataFrame resultDF.
    • Se utiliza resultDF.show() para mostrar el resultado de la consulta.
  6. Ejecutar una consulta SQL con una condición:

    • Se utiliza spark.sql("SELECT name, age FROM people WHERE age > 18") para ejecutar una consulta SQL que selecciona los nombres y edades de las personas mayores de 18 años de la vista temporal «people».
    • El resultado de la consulta se guarda en un DataFrame adultsDF.
    • Se utiliza adultsDF.show() para mostrar el resultado de la consulta.
  7. Finalizar SparkSession:

    • Se utiliza spark.stop() para detener la sesión de Spark y liberar los recursos.

Conexión a fuentes de datos

Apache Spark proporciona una API robusta para leer y escribir datos desde y hacia diversas fuentes de datos como archivos CSV, JSON, Parquet, bases de datos SQL, y sistemas de almacenamiento como HDFS, S3, entre otros. Utilizando la API de Spark SQL, se pueden cargar datos en DataFrames y realizar operaciones de procesamiento sobre ellos.

Código de Ejemplo

				
					import org.apache.spark.sql.{SparkSession, DataFrame}

object DataSourceExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("Data Source Example")
      .master("local[*]")
      .getOrCreate()

    // Leer datos desde un archivo CSV
    val csvDF: DataFrame = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("path/to/your/file.csv")

    // Mostrar los datos leídos desde el archivo CSV
    csvDF.show()

    // Leer datos desde un archivo JSON
    val jsonDF: DataFrame = spark.read
      .option("inferSchema", "true")
      .json("path/to/your/file.json")

    // Mostrar los datos leídos desde el archivo JSON
    jsonDF.show()

    // Leer datos desde una base de datos SQL
    val jdbcDF: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/database_name")
      .option("dbtable", "table_name")
      .option("user", "username")
      .option("password", "password")
      .load()

    // Mostrar los datos leídos desde la base de datos SQL
    jdbcDF.show()

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se utiliza SparkSession.builder para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Leer datos desde un archivo CSV:

    • Se utiliza spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/your/file.csv") para leer datos desde un archivo CSV. header se establece en true para indicar que el archivo CSV tiene una fila de encabezado. inferSchema se establece en true para inferir automáticamente el esquema de los datos.
    • Los datos leídos se almacenan en un DataFrame csvDF.
    • Se utiliza csvDF.show() para mostrar los datos leídos desde el archivo CSV.
  3. Leer datos desde un archivo JSON:

    • Se utiliza spark.read.option("inferSchema", "true").json("path/to/your/file.json") para leer datos desde un archivo JSON. inferSchema se establece en true para inferir automáticamente el esquema de los datos.
    • Los datos leídos se almacenan en un DataFrame jsonDF.
    • Se utiliza jsonDF.show() para mostrar los datos leídos desde el archivo JSON.
  4. Leer datos desde una base de datos SQL:

    • Se utiliza spark.read.format("jdbc") para configurar la lectura de datos desde una base de datos SQL.
    • Se especifican las opciones de conexión, incluyendo la URL de la base de datos (url), el nombre de la tabla (dbtable), el nombre de usuario (user) y la contraseña (password).
    • Los datos leídos se almacenan en un DataFrame jdbcDF.
    • Se utiliza jdbcDF.show() para mostrar los datos leídos desde la base de datos SQL.
  5. Finalizar SparkSession:

    • Se utiliza spark.stop() para detener la sesión de Spark y liberar los recursos.