En Apache Spark, DataFrames y Datasets son dos abstracciones de datos que permiten manipular datos estructurados de manera eficiente.
DataFrames: Son similares a las tablas en una base de datos o a los data frames en R y Python. Proporcionan una API de alto nivel para manipular datos estructurados y semiestructurados. Los DataFrames están optimizados internamente por el motor Catalyst de Spark y pueden ser creados a partir de diversas fuentes de datos, como archivos CSV, JSON, Parquet, bases de datos, etc.
Datasets: Introducidos en Spark 1.6, los Datasets combinan las ventajas de RDDs (tipo de datos fuertemente tipados y seguros) y DataFrames (API de alto nivel y optimizaciones). Los Datasets son objetos distribuidos que permiten realizar operaciones de tipo seguro. En Scala y Java, se puede utilizar el tipo seguro mientras que en Python, los Datasets no están disponibles.
import org.apache.spark.sql.{SparkSession, Dataset, DataFrame}
import org.apache.spark.sql.Encoders
object DataFrameDatasetExample {
case class Person(name: String, age: Int)
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("DataFrame and Dataset Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Crear un DataFrame a partir de una secuencia de datos
val df: DataFrame = Seq(
("Alice", 29),
("Bob", 25),
("Carol", 32)
).toDF("name", "age")
// Mostrar el esquema del DataFrame
df.printSchema()
// Mostrar el contenido del DataFrame
df.show()
// Crear un Dataset a partir del DataFrame
val ds: Dataset[Person] = df.as[Person]
// Mostrar el esquema del Dataset
ds.printSchema()
// Mostrar el contenido del Dataset
ds.show()
// Aplicar transformaciones y acciones en el Dataset
val adults: Dataset[Person] = ds.filter(_.age > 18)
adults.show()
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
SparkSession.builder
para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]
).Crear un DataFrame:
spark.implicits._
para habilitar las conversiones implícitas.Seq
) que contiene tuplas con nombre y edad.toDF
, especificando los nombres de las columnas.Mostrar el esquema y el contenido del DataFrame:
df.printSchema()
para imprimir el esquema del DataFrame, mostrando las columnas y sus tipos de datos.df.show()
para mostrar el contenido del DataFrame.Crear un Dataset a partir del DataFrame:
Person
con los campos name
y age
.Person
utilizando el método as[Person]
.Mostrar el esquema y el contenido del Dataset:
ds.printSchema()
para imprimir el esquema del Dataset.ds.show()
para mostrar el contenido del Dataset.Aplicar transformaciones y acciones en el Dataset:
filter
para filtrar los elementos del Dataset donde la edad es mayor que 18.adults.show()
.Finalizar SparkSession:
spark.stop()
para detener la sesión de Spark y liberar los recursos.Apache Spark SQL es un módulo de Spark que permite trabajar con datos estructurados utilizando el lenguaje SQL. Puedes ejecutar consultas SQL directamente en DataFrames y Datasets, así como combinar SQL con la API de DataFrame/Dataset para aprovechar las ventajas de ambos enfoques. Spark SQL permite crear vistas temporales a partir de DataFrames y ejecutar consultas SQL sobre ellas, facilitando el análisis y manipulación de datos.
import org.apache.spark.sql.{SparkSession, DataFrame}
object SparkSQLExample {
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("Spark SQL Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Crear un DataFrame a partir de una secuencia de datos
val df: DataFrame = Seq(
("Alice", 29),
("Bob", 25),
("Carol", 32)
).toDF("name", "age")
// Mostrar el contenido del DataFrame
df.show()
// Crear una vista temporal para ejecutar consultas SQL
df.createOrReplaceTempView("people")
// Ejecutar una consulta SQL para seleccionar todas las filas
val resultDF: DataFrame = spark.sql("SELECT * FROM people")
// Mostrar el resultado de la consulta SQL
resultDF.show()
// Ejecutar una consulta SQL con una condición
val adultsDF: DataFrame = spark.sql("SELECT name, age FROM people WHERE age > 18")
// Mostrar el resultado de la consulta con condición
adultsDF.show()
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
SparkSession.builder
para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]
).Crear un DataFrame:
spark.implicits._
para habilitar las conversiones implícitas.Seq
) que contiene tuplas con nombre y edad.toDF
, especificando los nombres de las columnas.Mostrar el contenido del DataFrame:
df.show()
para mostrar el contenido del DataFrame.Crear una vista temporal:
df.createOrReplaceTempView("people")
para crear una vista temporal con el nombre «people», lo que permite ejecutar consultas SQL sobre este DataFrame.Ejecutar una consulta SQL:
spark.sql("SELECT * FROM people")
para ejecutar una consulta SQL que selecciona todas las filas de la vista temporal «people».resultDF
.resultDF.show()
para mostrar el resultado de la consulta.Ejecutar una consulta SQL con una condición:
spark.sql("SELECT name, age FROM people WHERE age > 18")
para ejecutar una consulta SQL que selecciona los nombres y edades de las personas mayores de 18 años de la vista temporal «people».adultsDF
.adultsDF.show()
para mostrar el resultado de la consulta.Finalizar SparkSession:
spark.stop()
para detener la sesión de Spark y liberar los recursos.Apache Spark proporciona una API robusta para leer y escribir datos desde y hacia diversas fuentes de datos como archivos CSV, JSON, Parquet, bases de datos SQL, y sistemas de almacenamiento como HDFS, S3, entre otros. Utilizando la API de Spark SQL, se pueden cargar datos en DataFrames y realizar operaciones de procesamiento sobre ellos.
import org.apache.spark.sql.{SparkSession, DataFrame}
object DataSourceExample {
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("Data Source Example")
.master("local[*]")
.getOrCreate()
// Leer datos desde un archivo CSV
val csvDF: DataFrame = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("path/to/your/file.csv")
// Mostrar los datos leídos desde el archivo CSV
csvDF.show()
// Leer datos desde un archivo JSON
val jsonDF: DataFrame = spark.read
.option("inferSchema", "true")
.json("path/to/your/file.json")
// Mostrar los datos leídos desde el archivo JSON
jsonDF.show()
// Leer datos desde una base de datos SQL
val jdbcDF: DataFrame = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/database_name")
.option("dbtable", "table_name")
.option("user", "username")
.option("password", "password")
.load()
// Mostrar los datos leídos desde la base de datos SQL
jdbcDF.show()
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
SparkSession.builder
para configurar y crear una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]
).Leer datos desde un archivo CSV:
spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/your/file.csv")
para leer datos desde un archivo CSV. header
se establece en true
para indicar que el archivo CSV tiene una fila de encabezado. inferSchema
se establece en true
para inferir automáticamente el esquema de los datos.csvDF
.csvDF.show()
para mostrar los datos leídos desde el archivo CSV.Leer datos desde un archivo JSON:
spark.read.option("inferSchema", "true").json("path/to/your/file.json")
para leer datos desde un archivo JSON. inferSchema
se establece en true
para inferir automáticamente el esquema de los datos.jsonDF
.jsonDF.show()
para mostrar los datos leídos desde el archivo JSON.Leer datos desde una base de datos SQL:
spark.read.format("jdbc")
para configurar la lectura de datos desde una base de datos SQL.url
), el nombre de la tabla (dbtable
), el nombre de usuario (user
) y la contraseña (password
).jdbcDF
.jdbcDF.show()
para mostrar los datos leídos desde la base de datos SQL.Finalizar SparkSession:
spark.stop()
para detener la sesión de Spark y liberar los recursos.