一、机器学习的整体框架(类比烹饪)

假设你要做一道菜,机器学习的过程可以类比为:

步骤 --> 烹饪类比  -->机器学习对应

1. 确定目标 | 想做什么菜(红烧肉/沙拉) | 明确任务 (分类/回归/聚类)
2. 准备食材 | 买菜、洗菜、切菜 | 数据收集与预处理
3. 设计食谱 | 决定烹饪步骤和调料 | 选择算法和模型设计
4. 试做并尝味道 | 调整火候和调味 | 模型训练与调参
5. 最终成品 | 端上桌的菜 | 模型部署与应用


二、机器学习的核心流程

1. 数据预处理(准备食材)

  • 目标:把原始数据变成算法能“吃”的格式
  • 常见操作
    • 清洗数据:去除重复、错误的数据(如年龄填了200岁)
    • 特征工程:将数据转换为数值特征(比如把“男/女”转为0/1)
    • 标准化:统一数据范围(比如把工资从[0, 100000]缩放到[0, 1])

2. 选择算法(选菜谱)

根据任务类型选择算法:

 任务类型 | 问题示例 | 常用算法 | 生活类比

分类  判断邮件是垃圾邮件吗? | 逻辑回归、决策树 | 垃圾分类(干/湿/有害)
回归  预测房价 | 线性回归、随机森林回归 | 根据经验估算装修费用
聚类  用户分组 | K-Means、DBSCAN | 超市商品自动分区摆放

3. 模型训练(试做菜品)

  • 训练过程:算法通过数据自动学习规律
    • 监督学习(有标签):老师教学生做题(提供正确答案)
    • 无监督学习(无标签):学生自己总结规律(如聚类)

4. 模型评估(尝味道调整)

  • 评估指标
    • 分类:准确率(答对题的比例)
    • 回归:RMSE(预测值与真实值的平均误差)
    • 聚类:轮廓系数(分组是否紧密)

5. 模型部署(上菜)

  • 将训练好的模型用于实际预测

三、算法解释

1. 聚类算法(比如K-Means)

  • 目标:自动把相似的数据分到同一组
  • 生活场景:整理衣柜
    • 把衣服按季节(夏装/冬装)或类型(上衣/裤子)自动分类
  • 步骤
    1. 决定分几组(比如分3组)
    2. 随机选3个衣服作为初始中心点
    3. 计算每件衣服到中心的距离,分到最近的一组
    4. 重新计算每组的中心点
    5. 重复直到中心点不再变化

2. 回归算法(比如线性回归)

  • 目标:预测数值型结果(如房价)
  • 生活场景:估算打车费用
    • 已知:距离(公里) → 费用(元)
    • 规律:费用 = 起步价 + 每公里单价 × 距离
    • 算法任务:从历史数据中学习 起步价每公里单价

3. 分类算法(比如决策树)

  • 目标:预测类别(如是否中奖)
  • 生活场景:判断水果类型
    • 通过颜色、形状、重量等特征判断是苹果还是橙子

一、SparkML 的核心架构与流程

原始数据 → 数据清洗 → 特征工程 → 模型训练 → 评估调优 → 部署

核心组件

 DataFrame:结构化数据容器(列式存储)
Transformer:数据转换器(如 VectorAssembler)
Estimator:模型训练器(如 LogisticRegression)
Pipeline:将多个步骤封装为工作流

二、数据预处理实战 

1. 数据加载与探索

// 加载 CSV 数据
val data = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/iris.csv")

// 查看数据结构
data.printSchema()
// 统计摘要
data.describe().show()

2. 特征工程

1. 数据加载与探索

  import org.apache.spark.ml.feature.VectorAssembler

  val assembler = new VectorAssembler()
    .setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
    .setOutputCol("features")

  val df = assembler.transform(data)


2.类别特征编码:

  import org.apache.spark.ml.feature.StringIndexer

  val indexer = new StringIndexer()
    .setInputCol("species")
    .setOutputCol("label")

  val indexedDF = indexer.fit(df).transform(df)



3. 数据标准化:


import org.apache.spark.ml.feature.StandardScaler

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(true)

val scaledDF = scaler.fit(df).transform(df)

三、模型训练与评估
1. 分类模型(以决策树为例)

import org.apache.spark.ml.classification.DecisionTreeClassifier

// 划分训练集/测试集
val Array(train, test) = indexedDF.randomSplit(Array(0.8, 0.2), seed=42)

// 定义模型
val dt = new DecisionTreeClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")

// 训练
val model = dt.fit(train)

// 预测
val predictions = model.transform(test)

// 评估
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")

val accuracy = evaluator.evaluate(predictions)
println(s"Accuracy = ${accuracy}")

2. 回归模型(以线性回归为例)

import org.apache.spark.ml.regression.LinearRegression

// 加载房价数据
val housingDF = spark.read.parquet("data/housing.parquet")

// 定义模型
val lr = new LinearRegression()
  .setLabelCol("price")
  .setFeaturesCol("features")
  .setMaxIter(100)
  .setRegParam(0.3)

// 训练与评估
val lrModel = lr.fit(train)
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")

3. 聚类模型(以K-Means为例)

import org.apache.spark.ml.clustering.KMeans

val kmeans = new KMeans()
  .setK(3)
  .setFeaturesCol("scaledFeatures")
  .setSeed(1L)

val model = kmeans.fit(scaledDF)
val predictions = model.transform(scaledDF)

// 显示聚类中心
model.clusterCenters.foreach(println)

四、Pipeline 与超参数调优
1. 构建 Pipeline

import org.apache.spark.ml.Pipeline

val pipeline = new Pipeline()
  .setStages(Array(
    assembler,
    scaler,
    kmeans
  ))

val pipelineModel = pipeline.fit(data)

2. 交叉验证调参

import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}

// 参数网格
val paramGrid = new ParamGridBuilder()
  .addGrid(kmeans.k, Array(2, 3, 4))
  .build()

// 交叉验证
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(new ClusteringEvaluator())
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

val cvModel = cv.fit(data)

 五、模型部署

//TODO保存模型
pipelineModel.write.overwrite().save("models/iris_clustering")


//TODO加载预测模型

import org.apache.spark.ml.PipelineModel

val loadedModel = PipelineModel.load("models/iris_clustering")
val newPredictions = loadedModel.transform(newData)

实战案例:用户分群

字段:user_id, age, income, purchase_freq
目标:将用户分为高价值、中价值、低价值群体

代码实现:

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.functions._


object UserClusteringWithMetrics {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UserClusteringWithMetrics")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val userData = Seq(
      (1, 25, 50000, 2),
      (2, 45, 120000, 8),
      (3, 32, 80000, 5),
      (4, 28, 60000, 3),
      (5, 50, 150000, 10),
      (6, 22, 40000, 1)
    ).toDF("user_id", "age", "income", "purchase_freq")

    ---------- 数据预处理 -------------

    val meanIncome = userData.select(mean("income")).first().getDouble(0)
    val processedData = userData.na.fill(meanIncome, Seq("income"))

    ------------- 特征工程 -----------
    val featureCols = Array("age", "income", "purchase_freq")
    
   ------------- 特征向量化-----------
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("rawFeatures")
    
        --------- 标准化----------
    val scaler = new StandardScaler()
      .setInputCol("rawFeatures")
      .setOutputCol("scaledFeatures")
      .setWithStd(true)
      .setWithMean(true)

    -------模型定义 -----
    val kmeans = new KMeans()
      .setK(3)                          // 初始聚类数
      .setFeaturesCol("scaledFeatures") // 使用标准化后的特征
      .setSeed(42L)                     // 固定随机种子
      .setPredictionCol("cluster")

  ----------- 构建 Pipeline ----------
    val pipeline = new Pipeline()
      .setStages(Array(assembler, scaler, kmeans))

    -------------性能测速:训练阶段 ----------
    val startTime = System.currentTimeMillis()
    val model = pipeline.fit(processedData)
    val trainingTime = (System.currentTimeMillis() - startTime) / 1000.0
    println(f"训练耗时:$trainingTime%.2f 秒")

    ------------- 预测与评估--------------
    val startPredictTime = System.currentTimeMillis()
    val predictions = model.transform(processedData)
    val predictTime = (System.currentTimeMillis() - startPredictTime) / 1000.0
    println(f"预测耗时:$predictTime%.2f 秒")

-------------- 聚类结果统计------------
    println("\n各簇样本分布:")
    predictions.groupBy("cluster").count().show()

 -------------评估指标(轮廓系数)--------
    val evaluator = new ClusteringEvaluator()
      .setFeaturesCol("scaledFeatures")
      .setPredictionCol("cluster")
    val silhouette = evaluator.evaluate(predictions)
    println(f"轮廓系数:$silhouette%.4f")

  ------------------ 可视化分析------------------

    val clusterCenters = model.stages.last.asInstanceOf[KMeansModel].clusterCenters
    println("\n聚类中心特征:")
    clusterCenters.foreach(println)

  -------------- 各簇特征分布统计----------------
    println("\n各簇特征均值:")
    predictions.select("cluster", "age", "income", "purchase_freq")
      .groupBy("cluster")
      .agg(
        avg("age").alias("avg_age"),
        avg("income").alias("avg_income"),
        avg("purchase_freq").alias("avg_purchase")
      )
      .show()

    --------------- 参数调优建议 ---------------
    println("\n参数调优建议:")
    (2 to 5).foreach { k =>
      val kmeansTmp = new KMeans().setK(k).setFeaturesCol("scaledFeatures")
      val modelTmp = kmeansTmp.fit(scaler.transform(assembler.transform(processedData)))
      val score = evaluator.evaluate(modelTmp.transform(processedData))
      println(f"K=$k 时轮廓系数:$score%.4f")
    }

    spark.stop()
  }
}

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐