Spark Streaming es un componente de Apache Spark que facilita el procesamiento de datos en tiempo real. Permite la ingestión y el procesamiento de flujos de datos en tiempo real de diversas fuentes, como Kafka, Flume, Twitter, sockets TCP, entre otros. Los datos de entrada se dividen en micro-batches y se procesan usando el motor Spark, proporcionando capacidades de procesamiento en tiempo real junto con las ventajas de escalabilidad y tolerancia a fallos de Spark.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreamingExample {
def main(args: Array[String]): Unit = {
// Crear una configuración de Spark
val conf = new SparkConf().setAppName("Spark Streaming Example").setMaster("local[*]")
// Crear un contexto de streaming con un intervalo de batch de 10 segundos
val ssc = new StreamingContext(conf, Seconds(10))
// Crear un DStream a partir de un socket TCP (por ejemplo, localhost:9999)
val lines = ssc.socketTextStream("localhost", 9999)
// Realizar una transformación en el DStream: contar palabras
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Imprimir los resultados
wordCounts.print()
// Iniciar la computación de streaming
ssc.start()
// Esperar a que la computación de streaming finalice
ssc.awaitTermination()
}
}
Crear una configuración de Spark:
SparkConf
para crear una configuración de Spark con un nombre de aplicación y establecer el modo de ejecución en local (local[*]
).Crear un contexto de streaming:
StreamingContext
con la configuración de Spark y un intervalo de batch de 10 segundos (Seconds(10)
). Esto significa que los datos entrantes se agruparán en intervalos de 10 segundos para su procesamiento.Crear un DStream a partir de un socket TCP:
ssc.socketTextStream("localhost", 9999)
para crear un DStream que escuche en un socket TCP en el host localhost
y el puerto 9999
. Este DStream recibirá líneas de texto desde el socket.Realizar una transformación en el DStream:
flatMap
para dividir cada línea en palabras.(word, 1)
.reduceByKey(_ + _)
para contar las ocurrencias de cada palabra, sumando los valores asociados a cada clave (palabra).Imprimir los resultados:
wordCounts.print()
para imprimir los conteos de palabras en la consola.Iniciar la computación de streaming:
ssc.start()
para iniciar el procesamiento de streaming. Esto comienza a escuchar en el socket y a procesar los datos entrantes en intervalos de 10 segundos.Esperar a que la computación de streaming finalice:
ssc.awaitTermination()
para mantener el programa en ejecución hasta que la computación de streaming sea explícitamente detenida (por ejemplo, mediante una interrupción del usuario).En Apache Spark Streaming, los DStreams (Discretized Streams) son la abstracción fundamental para el procesamiento de datos en tiempo real. Un DStream representa una secuencia continua de datos, que se divide en pequeños lotes discretos llamados micro-batches. Estos micro-batches se procesan usando las operaciones de Spark, permitiendo realizar transformaciones y acciones sobre los datos en tiempo real.
Un DStream puede ser creado a partir de diversas fuentes, como sockets TCP, sistemas de mensajería como Kafka, archivos HDFS, y otros. Las transformaciones en DStreams se aplican a cada micro-batch, lo que permite un procesamiento eficiente y escalable.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DStreamExample {
def main(args: Array[String]): Unit = {
// Crear una configuración de Spark
val conf = new SparkConf().setAppName("DStream Example").setMaster("local[*]")
// Crear un contexto de streaming con un intervalo de batch de 5 segundos
val ssc = new StreamingContext(conf, Seconds(5))
// Crear un DStream a partir de un socket TCP (por ejemplo, localhost:9999)
val lines = ssc.socketTextStream("localhost", 9999)
// Transformación: Contar palabras
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Imprimir los resultados de cada batch
wordCounts.print()
// Iniciar la computación de streaming
ssc.start()
// Esperar a que la computación de streaming finalice
ssc.awaitTermination()
}
}
Crear una configuración de Spark:
SparkConf
para configurar Spark con un nombre de aplicación y establecer el modo de ejecución en local (local[*]
).Crear un contexto de streaming:
StreamingContext
con la configuración de Spark y un intervalo de batch de 5 segundos (Seconds(5)
). Esto significa que los datos se agruparán en intervalos de 5 segundos para su procesamiento.Crear un DStream a partir de un socket TCP:
ssc.socketTextStream("localhost", 9999)
para crear un DStream que escuche en un socket TCP en el host localhost
y el puerto 9999
. Este DStream recibirá líneas de texto desde el socket.Transformación: Contar palabras:
flatMap
: Se aplica flatMap
para dividir cada línea en palabras.map
: Se mapea cada palabra a un par clave-valor (word, 1)
.reduceByKey
: Se utiliza reduceByKey(_ + _)
para contar las ocurrencias de cada palabra, sumando los valores asociados a cada clave (palabra).Imprimir los resultados de cada batch:
wordCounts.print()
para imprimir los conteos de palabras en la consola para cada micro-batch.Iniciar la computación de streaming:
ssc.start()
para iniciar el procesamiento de streaming, comenzando a escuchar en el socket y a procesar los datos entrantes en intervalos de 5 segundos.Esperar a que la computación de streaming finalice:
ssc.awaitTermination()
para mantener el programa en ejecución hasta que la computación de streaming sea explícitamente detenida (por ejemplo, mediante una interrupción del usuario).Apache Kafka es una plataforma de mensajería distribuida que permite la publicación y suscripción de flujos de registros en tiempo real. Integrar Spark Streaming con Kafka permite procesar estos flujos de datos en tiempo real utilizando las capacidades de procesamiento distribuido de Spark.
La integración de Spark Streaming con Kafka se realiza mediante el uso de un conector de Kafka, que permite a Spark consumir mensajes de temas de Kafka y realizar operaciones de transformación y análisis en esos mensajes.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
object KafkaIntegrationExample {
def main(args: Array[String]): Unit = {
// Crear una configuración de Spark
val conf = new SparkConf().setAppName("Kafka Integration Example").setMaster("local[*]")
// Crear un contexto de streaming con un intervalo de batch de 10 segundos
val ssc = new StreamingContext(conf, Seconds(10))
// Configuraciones del consumidor de Kafka
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// Suscribirse a un tema de Kafka
val topics = Array("test_topic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
// Procesar los mensajes recibidos
val messages = stream.map(record => (record.key, record.value))
// Contar las palabras en los mensajes
val words = messages.flatMap(_._2.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Imprimir los resultados de cada batch
wordCounts.print()
// Iniciar la computación de streaming
ssc.start()
// Esperar a que la computación de streaming finalice
ssc.awaitTermination()
}
}
Crear una configuración de Spark:
SparkConf
para crear una configuración de Spark con un nombre de aplicación y establecer el modo de ejecución en local (local[*]
).Crear un contexto de streaming:
StreamingContext
con la configuración de Spark y un intervalo de batch de 10 segundos (Seconds(10)
). Esto significa que los datos se agruparán en intervalos de 10 segundos para su procesamiento.Configuraciones del consumidor de Kafka:
kafkaParams
con las configuraciones necesarias para conectarse a Kafka, incluyendo el servidor de Kafka (bootstrap.servers
), los deserializadores para las claves y los valores (key.deserializer
y value.deserializer
), el grupo de consumidores (group.id
), la política de reinicio de offset (auto.offset.reset
), y la opción de no cometer automáticamente los offsets (enable.auto.commit
).Suscribirse a un tema de Kafka:
topics
con el nombre del tema de Kafka al que se desea suscribir (test_topic
).KafkaUtils.createDirectStream
para crear un DStream que consume mensajes de Kafka utilizando las configuraciones y temas especificados.Procesar los mensajes recibidos:
stream.map(record => (record.key, record.value))
.Contar las palabras en los mensajes:
flatMap
: Se aplica flatMap
para dividir cada mensaje en palabras.map
: Se mapea cada palabra a un par clave-valor (word, 1)
.reduceByKey
: Se utiliza reduceByKey(_ + _)
para contar las ocurrencias de cada palabra, sumando los valores asociados a cada clave (palabra).Imprimir los resultados de cada batch:
wordCounts.print()
para imprimir los conteos de palabras en la consola para cada micro-batch.Iniciar la computación de streaming:
ssc.start()
para iniciar el procesamiento de streaming, comenzando a consumir mensajes de Kafka y a procesar los datos entrantes en intervalos de 10 segundos.Esperar a que la computación de streaming finalice:
ssc.awaitTermination()
para mantener el programa en ejecución hasta que la computación de streaming sea explícitamente detenida (por ejemplo, mediante una interrupción del usuario).