Machine Learning con MLlib

Algoritmos básicos

Apache Spark MLlib es una biblioteca de aprendizaje automático escalable que se integra perfectamente con Spark. Proporciona una variedad de algoritmos de machine learning y herramientas, incluyendo clasificación, regresión, clustering, filtrado colaborativo y reducción de dimensionalidad. MLlib facilita la implementación de modelos de machine learning sobre grandes volúmenes de datos de manera eficiente.

A continuación, se muestra cómo implementar algunos algoritmos básicos de machine learning usando MLlib: regresión lineal, clasificación y clustering.

Código de Ejemplo

				
					import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

object MLLibExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("MLLib Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Cargar datos de ejemplo para regresión lineal
    val regressionData = Seq(
      (1.0, 2.0, 3.0, 4.0),
      (2.0, 3.0, 4.0, 5.0),
      (3.0, 4.0, 5.0, 6.0)
    ).toDF("label", "feature1", "feature2", "feature3")

    // Transformar datos a formato adecuado para MLlib
    val assembler = new VectorAssembler()
      .setInputCols(Array("feature1", "feature2", "feature3"))
      .setOutputCol("features")
    val regressionDataTransformed = assembler.transform(regressionData)

    // Regresión Lineal
    val lr = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
    val lrModel = lr.fit(regressionDataTransformed)
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Cargar datos de ejemplo para clasificación logística
    val classificationData = Seq(
      (1.0, 1.0, 2.0, 3.0),
      (0.0, 2.0, 3.0, 4.0),
      (1.0, 3.0, 4.0, 5.0)
    ).toDF("label", "feature1", "feature2", "feature3")

    // Transformar datos a formato adecuado para MLlib
    val classificationDataTransformed = assembler.transform(classificationData)

    // Clasificación Logística
    val lrClassifier = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
    val lrClassifierModel = lrClassifier.fit(classificationDataTransformed)
    println(s"Coefficients: ${lrClassifierModel.coefficients} Intercept: ${lrClassifierModel.intercept}")

    // Cargar datos de ejemplo para clustering KMeans
    val clusteringData = Seq(
      (1.0, 2.0, 3.0),
      (4.0, 5.0, 6.0),
      (7.0, 8.0, 9.0)
    ).toDF("feature1", "feature2", "feature3")

    // Transformar datos a formato adecuado para MLlib
    val clusteringDataTransformed = assembler.transform(clusteringData)

    // Clustering KMeans
    val kmeans = new KMeans()
      .setK(2)
      .setFeaturesCol("features")
    val kmeansModel = kmeans.fit(clusteringDataTransformed)
    println(s"Cluster Centers: ${kmeansModel.clusterCenters.mkString(", ")}")

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se configura y crea una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Regresión Lineal:

    • Se cargan datos de ejemplo y se convierten a un DataFrame con columnas label, feature1, feature2, y feature3.
    • Se utiliza VectorAssembler para transformar las columnas de características en una única columna de características (features).
    • Se crea un modelo de regresión lineal (LinearRegression) y se ajusta (entrena) el modelo con los datos transformados.
    • Se imprimen los coeficientes e intercepto del modelo ajustado.
  3. Clasificación Logística:

    • Se cargan datos de ejemplo para clasificación y se convierten a un DataFrame similar al anterior.
    • Se transforman los datos utilizando VectorAssembler.
    • Se crea un modelo de clasificación logística (LogisticRegression) y se ajusta el modelo con los datos transformados.
    • Se imprimen los coeficientes e intercepto del modelo ajustado.
  4. Clustering KMeans:

    • Se cargan datos de ejemplo para clustering y se convierten a un DataFrame con columnas feature1, feature2, y feature3.
    • Se transforman los datos utilizando VectorAssembler.
    • Se crea un modelo de clustering KMeans (KMeans), se establece el número de clusters (setK(2)) y se ajusta el modelo con los datos transformados.
    • Se imprimen los centros de los clusters del modelo ajustado.
  5. Finalizar SparkSession:

    • Se detiene la sesión de Spark para liberar los recursos.

Pipelines de ML

En Apache Spark MLlib, un pipeline de machine learning es una secuencia de etapas que se ejecutan en orden para procesar y transformar los datos antes de entrenar un modelo y realizar predicciones. Un pipeline típico incluye varias etapas de transformación de datos y una etapa final de estimación (entrenamiento del modelo).

Las etapas del pipeline pueden incluir:

  • Transformadores: Etapas que transforman los datos de entrada en una forma que el modelo puede utilizar, como VectorAssembler, StandardScaler, etc.
  • Estimadores: Etapas que entrenan un modelo, como LinearRegression, LogisticRegression, KMeans, etc.

Una vez definido y ajustado (fit) el pipeline, se puede utilizar para transformar nuevos datos de manera consistente con el entrenamiento.

Código de Ejemplo

				
					import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

object MLPipelineExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("ML Pipeline Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Cargar datos de ejemplo
    val data = Seq(
      (1.0, 1.0, 0.1, 0.5),
      (0.0, 2.0, 0.2, 0.6),
      (1.0, 3.0, 0.3, 0.7),
      (0.0, 4.0, 0.4, 0.8)
    ).toDF("label", "feature1", "feature2", "feature3")

    // Definir las etapas del pipeline
    val assembler = new VectorAssembler()
      .setInputCols(Array("feature1", "feature2", "feature3"))
      .setOutputCol("features")

    val scaler = new StandardScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")

    val logisticRegression = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("scaledFeatures")

    // Crear el pipeline
    val pipeline = new Pipeline()
      .setStages(Array(assembler, scaler, logisticRegression))

    // Dividir los datos en conjuntos de entrenamiento y prueba
    val Array(trainingData, testData) = data.randomSplit(Array(0.8, 0.2))

    // Ajustar el pipeline con los datos de entrenamiento
    val model = pipeline.fit(trainingData)

    // Realizar predicciones en los datos de prueba
    val predictions = model.transform(testData)

    // Evaluar el modelo
    val evaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")

    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${1.0 - accuracy}")

    // Ejemplo de validación cruzada con TrainValidationSplit
    val paramGrid = new ParamGridBuilder()
      .addGrid(logisticRegression.regParam, Array(0.1, 0.01))
      .build()

    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.8)

    // Ajustar TrainValidationSplit con los datos de entrenamiento
    val tvModel = trainValidationSplit.fit(data)

    // Realizar predicciones y evaluar el modelo
    val tvPredictions = tvModel.transform(testData)
    val tvAccuracy = evaluator.evaluate(tvPredictions)
    println(s"TrainValidationSplit Test Error = ${1.0 - tvAccuracy}")

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se configura y crea una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Cargar datos de ejemplo:

    • Se crea un DataFrame con datos de ejemplo, con columnas label, feature1, feature2, y feature3.
  3. Definir las etapas del pipeline:

    • VectorAssembler: Se utiliza para combinar múltiples columnas de características en una única columna de características.
    • StandardScaler: Se utiliza para escalar las características a una escala estándar.
    • LogisticRegression: Se define un modelo de regresión logística que usará las características escaladas para la clasificación.
  4. Crear el pipeline:

    • Se crea un objeto Pipeline y se configuran las etapas del pipeline en el orden deseado.
  5. Dividir los datos en conjuntos de entrenamiento y prueba:

    • Se utiliza randomSplit para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).
  6. Ajustar el pipeline con los datos de entrenamiento:

    • Se ajusta (entrena) el pipeline utilizando los datos de entrenamiento.
  7. Realizar predicciones en los datos de prueba:

    • Se utiliza el modelo ajustado para transformar los datos de prueba y generar predicciones.
  8. Evaluar el modelo:

    • Se define un evaluador de clasificación binaria (BinaryClassificationEvaluator) para evaluar la precisión del modelo.
    • Se calcula y muestra el error de prueba (1.0 – precisión).
  9. Validación cruzada con TrainValidationSplit:

    • Se define una cuadrícula de parámetros para la regularización del modelo de regresión logística (regParam).
    • Se configura y ajusta un TrainValidationSplit para realizar la validación cruzada y seleccionar el mejor modelo basado en la cuadrícula de parámetros.
  10. Evaluar el modelo de TrainValidationSplit:

  • Se utilizan los datos de prueba para realizar predicciones y evaluar el modelo ajustado con TrainValidationSplit.
  • Se calcula y muestra el error de prueba para el modelo de validación cruzada.
  1. Finalizar SparkSession:
  • Se detiene la sesión de Spark para liberar los recursos.

Este ejemplo muestra cómo crear y utilizar pipelines de machine learning en Spark MLlib, incluyendo la definición de transformadores y estimadores, el ajuste del pipeline, la evaluación del modelo y el uso de TrainValidationSplit para la validación cruzada.

Evaluación de modelos

Evaluar modelos de machine learning es crucial para entender su rendimiento y generalización a datos no vistos. Apache Spark MLlib proporciona varios evaluadores que facilitan la evaluación de modelos, como BinaryClassificationEvaluator, MulticlassClassificationEvaluator, y RegressionEvaluator. Estos evaluadores calculan métricas estándar como precisión, recall, F1 score, RMSE (Root Mean Squared Error), entre otras.

Código de Ejemplo

				
					import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator}
import org.apache.spark.ml.regression.LinearRegression

object ModelEvaluationExample {
  def main(args: Array[String]): Unit = {
    // Crear una SparkSession
    val spark = SparkSession.builder
      .appName("Model Evaluation Example")
      .master("local[*]")
      .getOrCreate()

    import spark.implicits._

    // Cargar datos de ejemplo para clasificación
    val classificationData = Seq(
      (1.0, 1.0, 0.1, 0.5),
      (0.0, 2.0, 0.2, 0.6),
      (1.0, 3.0, 0.3, 0.7),
      (0.0, 4.0, 0.4, 0.8)
    ).toDF("label", "feature1", "feature2", "feature3")

    // Transformar datos a formato adecuado para MLlib
    val assembler = new VectorAssembler()
      .setInputCols(Array("feature1", "feature2", "feature3"))
      .setOutputCol("features")
    val classificationDataTransformed = assembler.transform(classificationData)

    // Dividir los datos en conjuntos de entrenamiento y prueba
    val Array(trainingData, testData) = classificationDataTransformed.randomSplit(Array(0.8, 0.2))

    // Clasificación Logística
    val logisticRegression = new LogisticRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
    val lrModel = logisticRegression.fit(trainingData)

    // Realizar predicciones en los datos de prueba
    val predictions = lrModel.transform(testData)

    // Evaluar el modelo de clasificación
    val binaryEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")

    val auc = binaryEvaluator.evaluate(predictions)
    println(s"Área bajo la curva ROC: $auc")

    // Cargar datos de ejemplo para regresión
    val regressionData = Seq(
      (4.0, 1.0, 2.0, 3.0),
      (2.0, 2.0, 3.0, 4.0),
      (3.0, 3.0, 4.0, 5.0),
      (5.0, 4.0, 5.0, 6.0)
    ).toDF("label", "feature1", "feature2", "feature3")

    val regressionDataTransformed = assembler.transform(regressionData)

    // Dividir los datos en conjuntos de entrenamiento y prueba
    val Array(trainingDataReg, testDataReg) = regressionDataTransformed.randomSplit(Array(0.8, 0.2))

    // Regresión Lineal
    val linearRegression = new LinearRegression()
      .setLabelCol("label")
      .setFeaturesCol("features")
    val lrRegModel = linearRegression.fit(trainingDataReg)

    // Realizar predicciones en los datos de prueba
    val regPredictions = lrRegModel.transform(testDataReg)

    // Evaluar el modelo de regresión
    val regEvaluator = new RegressionEvaluator()
      .setLabelCol("label")
      .setPredictionCol("prediction")
      .setMetricName("rmse")

    val rmse = regEvaluator.evaluate(regPredictions)
    println(s"Error cuadrático medio raíz (RMSE): $rmse")

    // Finalizar SparkSession
    spark.stop()
  }
}

				
			

Explicación del Código

  1. Crear una SparkSession:

    • Se configura y crea una nueva sesión de Spark con un nombre de aplicación y el modo de ejecución en local (local[*]).
  2. Cargar y transformar datos para clasificación:

    • Se crea un DataFrame con datos de ejemplo para clasificación, con columnas label, feature1, feature2, y feature3.
    • Se utiliza VectorAssembler para combinar múltiples columnas de características en una única columna de características (features).
  3. Dividir los datos en conjuntos de entrenamiento y prueba:

    • Se utiliza randomSplit para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).
  4. Entrenar y evaluar un modelo de clasificación logística:

    • Se define un modelo de regresión logística (LogisticRegression) y se ajusta (entrena) el modelo con los datos de entrenamiento.
    • Se realizan predicciones en los datos de prueba utilizando el modelo ajustado.
    • Se utiliza BinaryClassificationEvaluator para evaluar el modelo de clasificación, calculando el Área Bajo la Curva ROC (AUC).
    • Se imprime el valor del AUC.
  5. Cargar y transformar datos para regresión:

    • Se crea un DataFrame con datos de ejemplo para regresión, con columnas label, feature1, feature2, y feature3.
    • Se transforma el DataFrame utilizando VectorAssembler.
  6. Dividir los datos en conjuntos de entrenamiento y prueba:

    • Se utiliza randomSplit para dividir los datos en dos conjuntos: entrenamiento (80%) y prueba (20%).
  7. Entrenar y evaluar un modelo de regresión lineal:

    • Se define un modelo de regresión lineal (LinearRegression) y se ajusta (entrena) el modelo con los datos de entrenamiento.
    • Se realizan predicciones en los datos de prueba utilizando el modelo ajustado.
    • Se utiliza RegressionEvaluator para evaluar el modelo de regresión, calculando el Error Cuadrático Medio Raíz (RMSE).
    • Se imprime el valor del RMSE.
  8. Finalizar SparkSession:

    • Se detiene la sesión de Spark para liberar los recursos.