Apache Spark MLlib es una biblioteca de aprendizaje automático escalable que se integra perfectamente con Spark. Proporciona una variedad de algoritmos de machine learning y herramientas, incluyendo clasificación, regresión, clustering, filtrado colaborativo y reducción de dimensionalidad. MLlib facilita la implementación de modelos de machine learning sobre grandes volúmenes de datos de manera eficiente.
A continuación, se muestra cómo implementar algunos algoritmos básicos de machine learning usando MLlib: regresión lineal, clasificación y clustering.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
object MLLibExample {
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("MLLib Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Cargar datos de ejemplo para regresión lineal
val regressionData = Seq(
(1.0, 2.0, 3.0, 4.0),
(2.0, 3.0, 4.0, 5.0),
(3.0, 4.0, 5.0, 6.0)
).toDF("label", "feature1", "feature2", "feature3")
// Transformar datos a formato adecuado para MLlib
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val regressionDataTransformed = assembler.transform(regressionData)
// Regresión Lineal
val lr = new LinearRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val lrModel = lr.fit(regressionDataTransformed)
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// Cargar datos de ejemplo para clasificación logística
val classificationData = Seq(
(1.0, 1.0, 2.0, 3.0),
(0.0, 2.0, 3.0, 4.0),
(1.0, 3.0, 4.0, 5.0)
).toDF("label", "feature1", "feature2", "feature3")
// Transformar datos a formato adecuado para MLlib
val classificationDataTransformed = assembler.transform(classificationData)
// Clasificación Logística
val lrClassifier = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val lrClassifierModel = lrClassifier.fit(classificationDataTransformed)
println(s"Coefficients: ${lrClassifierModel.coefficients} Intercept: ${lrClassifierModel.intercept}")
// Cargar datos de ejemplo para clustering KMeans
val clusteringData = Seq(
(1.0, 2.0, 3.0),
(4.0, 5.0, 6.0),
(7.0, 8.0, 9.0)
).toDF("feature1", "feature2", "feature3")
// Transformar datos a formato adecuado para MLlib
val clusteringDataTransformed = assembler.transform(clusteringData)
// Clustering KMeans
val kmeans = new KMeans()
.setK(2)
.setFeaturesCol("features")
val kmeansModel = kmeans.fit(clusteringDataTransformed)
println(s"Cluster Centers: ${kmeansModel.clusterCenters.mkString(", ")}")
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
local[*]
).Regresión Lineal:
label
, feature1
, feature2
, y feature3
.VectorAssembler
para transformar las columnas de características en una única columna de características (features
).LinearRegression
) y se ajusta (entrena) el modelo con los datos transformados.Clasificación Logística:
VectorAssembler
.LogisticRegression
) y se ajusta el modelo con los datos transformados.Clustering KMeans:
feature1
, feature2
, y feature3
.VectorAssembler
.KMeans
), se establece el número de clusters (setK(2)
) y se ajusta el modelo con los datos transformados.Finalizar SparkSession:
En Apache Spark MLlib, un pipeline de machine learning es una secuencia de etapas que se ejecutan en orden para procesar y transformar los datos antes de entrenar un modelo y realizar predicciones. Un pipeline típico incluye varias etapas de transformación de datos y una etapa final de estimación (entrenamiento del modelo).
Las etapas del pipeline pueden incluir:
VectorAssembler
, StandardScaler
, etc.LinearRegression
, LogisticRegression
, KMeans
, etc.Una vez definido y ajustado (fit) el pipeline, se puede utilizar para transformar nuevos datos de manera consistente con el entrenamiento.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
object MLPipelineExample {
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("ML Pipeline Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Cargar datos de ejemplo
val data = Seq(
(1.0, 1.0, 0.1, 0.5),
(0.0, 2.0, 0.2, 0.6),
(1.0, 3.0, 0.3, 0.7),
(0.0, 4.0, 0.4, 0.8)
).toDF("label", "feature1", "feature2", "feature3")
// Definir las etapas del pipeline
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
val logisticRegression = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("scaledFeatures")
// Crear el pipeline
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler, logisticRegression))
// Dividir los datos en conjuntos de entrenamiento y prueba
val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2))
// Ajustar el pipeline con los datos de entrenamiento
val model = pipeline.fit(trainingData)
// Realizar predicciones en los datos de prueba
val predictions = model.transform(testData)
// Evaluar el modelo
val evaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")
// Ejemplo de validación cruzada con TrainValidationSplit
val paramGrid = new ParamGridBuilder()
.addGrid(logisticRegression.regParam, Array(0.1, 0.01))
.build()
val trainValidationSplit = new TrainValidationSplit()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setTrainRatio(0.8)
// Ajustar TrainValidationSplit con los datos de entrenamiento
val tvModel = trainValidationSplit.fit(data)
// Realizar predicciones y evaluar el modelo
val tvPredictions = tvModel.transform(testData)
val tvAccuracy = evaluator.evaluate(tvPredictions)
println(s"TrainValidationSplit Test Error = ${1.0 - tvAccuracy}")
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
local[*]
).Cargar datos de ejemplo:
label
, feature1
, feature2
, y feature3
.Definir las etapas del pipeline:
VectorAssembler
: Se utiliza para combinar múltiples columnas de características en una única columna de características.StandardScaler
: Se utiliza para escalar las características a una escala estándar.LogisticRegression
: Se define un modelo de regresión logística que usará las características escaladas para la clasificación.Crear el pipeline:
Pipeline
y se configuran las etapas del pipeline en el orden deseado.Dividir los datos en conjuntos de entrenamiento y prueba:
randomSplit
para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).Ajustar el pipeline con los datos de entrenamiento:
Realizar predicciones en los datos de prueba:
Evaluar el modelo:
BinaryClassificationEvaluator
) para evaluar la precisión del modelo.Validación cruzada con TrainValidationSplit:
regParam
).TrainValidationSplit
para realizar la validación cruzada y seleccionar el mejor modelo basado en la cuadrícula de parámetros.Evaluar el modelo de TrainValidationSplit:
TrainValidationSplit
.Este ejemplo muestra cómo crear y utilizar pipelines de machine learning en Spark MLlib, incluyendo la definición de transformadores y estimadores, el ajuste del pipeline, la evaluación del modelo y el uso de TrainValidationSplit
para la validación cruzada.
Evaluar modelos de machine learning es crucial para entender su rendimiento y generalización a datos no vistos. Apache Spark MLlib proporciona varios evaluadores que facilitan la evaluación de modelos, como BinaryClassificationEvaluator
, MulticlassClassificationEvaluator
, y RegressionEvaluator
. Estos evaluadores calculan métricas estándar como precisión, recall, F1 score, RMSE (Root Mean Squared Error), entre otras.
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.regression.LinearRegression
object ModelEvaluationExample {
def main(args: Array[String]): Unit = {
// Crear una SparkSession
val spark = SparkSession.builder
.appName("Model Evaluation Example")
.master("local[*]")
.getOrCreate()
import spark.implicits._
// Cargar datos de ejemplo para clasificación
val classificationData = Seq(
(1.0, 1.0, 0.1, 0.5),
(0.0, 2.0, 0.2, 0.6),
(1.0, 3.0, 0.3, 0.7),
(0.0, 4.0, 0.4, 0.8)
).toDF("label", "feature1", "feature2", "feature3")
// Transformar datos a formato adecuado para MLlib
val assembler = new VectorAssembler()
.setInputCols(Array("feature1", "feature2", "feature3"))
.setOutputCol("features")
val classificationDataTransformed = assembler.transform(classificationData)
// Dividir los datos en conjuntos de entrenamiento y prueba
val Array(trainingData, testData) = classificationDataTransformed.randomSplit(Array(0.8, 0.2))
// Clasificación Logística
val logisticRegression = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val lrModel = logisticRegression.fit(trainingData)
// Realizar predicciones en los datos de prueba
val predictions = lrModel.transform(testData)
// Evaluar el modelo de clasificación
val binaryEvaluator = new BinaryClassificationEvaluator()
.setLabelCol("label")
.setRawPredictionCol("rawPrediction")
val auc = binaryEvaluator.evaluate(predictions)
println(s"Área bajo la curva ROC: $auc")
// Cargar datos de ejemplo para regresión
val regressionData = Seq(
(4.0, 1.0, 2.0, 3.0),
(2.0, 2.0, 3.0, 4.0),
(3.0, 3.0, 4.0, 5.0),
(5.0, 4.0, 5.0, 6.0)
).toDF("label", "feature1", "feature2", "feature3")
val regressionDataTransformed = assembler.transform(regressionData)
// Dividir los datos en conjuntos de entrenamiento y prueba
val Array(trainingDataReg, testDataReg) = regressionDataTransformed.randomSplit(Array(0.8, 0.2))
// Regresión Lineal
val linearRegression = new LinearRegression()
.setLabelCol("label")
.setFeaturesCol("features")
val lrRegModel = linearRegression.fit(trainingDataReg)
// Realizar predicciones en los datos de prueba
val regPredictions = lrRegModel.transform(testDataReg)
// Evaluar el modelo de regresión
val regEvaluator = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
val rmse = regEvaluator.evaluate(regPredictions)
println(s"Error cuadrático medio raíz (RMSE): $rmse")
// Finalizar SparkSession
spark.stop()
}
}
Crear una SparkSession:
local[*]
).Cargar y transformar datos para clasificación:
label
, feature1
, feature2
, y feature3
.VectorAssembler
para combinar múltiples columnas de características en una única columna de características (features
).Dividir los datos en conjuntos de entrenamiento y prueba:
randomSplit
para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).Entrenar y evaluar un modelo de clasificación logística:
LogisticRegression
) y se ajusta (entrena) el modelo con los datos de entrenamiento.BinaryClassificationEvaluator
para evaluar el modelo de clasificación, calculando el Área Bajo la Curva ROC (AUC).Cargar y transformar datos para regresión:
label
, feature1
, feature2
, y feature3
.VectorAssembler
.Dividir los datos en conjuntos de entrenamiento y prueba:
randomSplit
para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).Entrenar y evaluar un modelo de regresión lineal:
LinearRegression
) y se ajusta (entrena) el modelo con los datos de entrenamiento.RegressionEvaluator
para evaluar el modelo de regresión, calculando el Error Cuadrático Medio Raíz (RMSE).Finalizar SparkSession: