
sparkML入门,通俗解释机器学习的框架和算法
1. 确定目标 | 想做什么菜(红烧肉/沙拉) | 明确任务 (分类/回归/聚类)分类判断邮件是垃圾邮件吗?| 逻辑回归、决策树 | 垃圾分类(干/湿/有害)回归预测房价 | 线性回归、随机森林回归 | 根据经验估算装修费用。3. 设计食谱 | 决定烹饪步骤和调料 | 选择算法和模型设计。2. 准备食材 | 买菜、洗菜、切菜 | 数据收集与预处理。4. 试做并尝味道 | 调整火候和调味 | 模型训
一、机器学习的整体框架(类比烹饪)
假设你要做一道菜,机器学习的过程可以类比为:
步骤 --> 烹饪类比 -->机器学习对应
1. 确定目标 | 想做什么菜(红烧肉/沙拉) | 明确任务 (分类/回归/聚类)
2. 准备食材 | 买菜、洗菜、切菜 | 数据收集与预处理
3. 设计食谱 | 决定烹饪步骤和调料 | 选择算法和模型设计
4. 试做并尝味道 | 调整火候和调味 | 模型训练与调参
5. 最终成品 | 端上桌的菜 | 模型部署与应用
二、机器学习的核心流程
1. 数据预处理(准备食材)
- 目标:把原始数据变成算法能“吃”的格式
- 常见操作:
- 清洗数据:去除重复、错误的数据(如年龄填了200岁)
- 特征工程:将数据转换为数值特征(比如把“男/女”转为0/1)
- 标准化:统一数据范围(比如把工资从[0, 100000]缩放到[0, 1])
2. 选择算法(选菜谱)
根据任务类型选择算法:
任务类型 | 问题示例 | 常用算法 | 生活类比
分类 判断邮件是垃圾邮件吗? | 逻辑回归、决策树 | 垃圾分类(干/湿/有害)
回归 预测房价 | 线性回归、随机森林回归 | 根据经验估算装修费用
聚类 用户分组 | K-Means、DBSCAN | 超市商品自动分区摆放
3. 模型训练(试做菜品)
- 训练过程:算法通过数据自动学习规律
- 监督学习(有标签):老师教学生做题(提供正确答案)
- 无监督学习(无标签):学生自己总结规律(如聚类)
4. 模型评估(尝味道调整)
- 评估指标:
- 分类:准确率(答对题的比例)
- 回归:RMSE(预测值与真实值的平均误差)
- 聚类:轮廓系数(分组是否紧密)
5. 模型部署(上菜)
- 将训练好的模型用于实际预测
三、算法解释
1. 聚类算法(比如K-Means)
- 目标:自动把相似的数据分到同一组
- 生活场景:整理衣柜
- 把衣服按季节(夏装/冬装)或类型(上衣/裤子)自动分类
- 步骤:
- 决定分几组(比如分3组)
- 随机选3个衣服作为初始中心点
- 计算每件衣服到中心的距离,分到最近的一组
- 重新计算每组的中心点
- 重复直到中心点不再变化
2. 回归算法(比如线性回归)
- 目标:预测数值型结果(如房价)
- 生活场景:估算打车费用
- 已知:距离(公里) → 费用(元)
- 规律:费用 = 起步价 + 每公里单价 × 距离
- 算法任务:从历史数据中学习
起步价
和每公里单价
3. 分类算法(比如决策树)
- 目标:预测类别(如是否中奖)
- 生活场景:判断水果类型
- 通过颜色、形状、重量等特征判断是苹果还是橙子
一、SparkML 的核心架构与流程
原始数据 → 数据清洗 → 特征工程 → 模型训练 → 评估调优 → 部署
核心组件
DataFrame:结构化数据容器(列式存储)
Transformer:数据转换器(如 VectorAssembler)
Estimator:模型训练器(如 LogisticRegression)
Pipeline:将多个步骤封装为工作流
二、数据预处理实战
1. 数据加载与探索
// 加载 CSV 数据
val data = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("data/iris.csv")
// 查看数据结构
data.printSchema()
// 统计摘要
data.describe().show()
2. 特征工程
1. 数据加载与探索
import org.apache.spark.ml.feature.VectorAssembler
val assembler = new VectorAssembler()
.setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
.setOutputCol("features")
val df = assembler.transform(data)
2.类别特征编码:
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("species")
.setOutputCol("label")
val indexedDF = indexer.fit(df).transform(df)
3. 数据标准化:
import org.apache.spark.ml.feature.StandardScaler
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(true)
val scaledDF = scaler.fit(df).transform(df)
三、模型训练与评估
1. 分类模型(以决策树为例)
import org.apache.spark.ml.classification.DecisionTreeClassifier
// 划分训练集/测试集
val Array(train, test) = indexedDF.randomSplit(Array(0.8, 0.2), seed=42)
// 定义模型
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("features")
// 训练
val model = dt.fit(train)
// 预测
val predictions = model.transform(test)
// 评估
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Accuracy = ${accuracy}")
2. 回归模型(以线性回归为例)
import org.apache.spark.ml.regression.LinearRegression
// 加载房价数据
val housingDF = spark.read.parquet("data/housing.parquet")
// 定义模型
val lr = new LinearRegression()
.setLabelCol("price")
.setFeaturesCol("features")
.setMaxIter(100)
.setRegParam(0.3)
// 训练与评估
val lrModel = lr.fit(train)
val trainingSummary = lrModel.summary
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
3. 聚类模型(以K-Means为例)
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setK(3)
.setFeaturesCol("scaledFeatures")
.setSeed(1L)
val model = kmeans.fit(scaledDF)
val predictions = model.transform(scaledDF)
// 显示聚类中心
model.clusterCenters.foreach(println)
四、Pipeline 与超参数调优
1. 构建 Pipeline
import org.apache.spark.ml.Pipeline
val pipeline = new Pipeline()
.setStages(Array(
assembler,
scaler,
kmeans
))
val pipelineModel = pipeline.fit(data)
2. 交叉验证调参
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
// 参数网格
val paramGrid = new ParamGridBuilder()
.addGrid(kmeans.k, Array(2, 3, 4))
.build()
// 交叉验证
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new ClusteringEvaluator())
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
val cvModel = cv.fit(data)
五、模型部署
//TODO保存模型
pipelineModel.write.overwrite().save("models/iris_clustering")
//TODO加载预测模型
import org.apache.spark.ml.PipelineModel
val loadedModel = PipelineModel.load("models/iris_clustering")
val newPredictions = loadedModel.transform(newData)
实战案例:用户分群
字段:user_id, age, income, purchase_freq
目标:将用户分为高价值、中价值、低价值群体
代码实现:
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{VectorAssembler, StandardScaler}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.functions._
object UserClusteringWithMetrics {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("UserClusteringWithMetrics")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val userData = Seq(
(1, 25, 50000, 2),
(2, 45, 120000, 8),
(3, 32, 80000, 5),
(4, 28, 60000, 3),
(5, 50, 150000, 10),
(6, 22, 40000, 1)
).toDF("user_id", "age", "income", "purchase_freq")
---------- 数据预处理 -------------
val meanIncome = userData.select(mean("income")).first().getDouble(0)
val processedData = userData.na.fill(meanIncome, Seq("income"))
------------- 特征工程 -----------
val featureCols = Array("age", "income", "purchase_freq")
------------- 特征向量化-----------
val assembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("rawFeatures")
--------- 标准化----------
val scaler = new StandardScaler()
.setInputCol("rawFeatures")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(true)
-------模型定义 -----
val kmeans = new KMeans()
.setK(3) // 初始聚类数
.setFeaturesCol("scaledFeatures") // 使用标准化后的特征
.setSeed(42L) // 固定随机种子
.setPredictionCol("cluster")
----------- 构建 Pipeline ----------
val pipeline = new Pipeline()
.setStages(Array(assembler, scaler, kmeans))
-------------性能测速:训练阶段 ----------
val startTime = System.currentTimeMillis()
val model = pipeline.fit(processedData)
val trainingTime = (System.currentTimeMillis() - startTime) / 1000.0
println(f"训练耗时:$trainingTime%.2f 秒")
------------- 预测与评估--------------
val startPredictTime = System.currentTimeMillis()
val predictions = model.transform(processedData)
val predictTime = (System.currentTimeMillis() - startPredictTime) / 1000.0
println(f"预测耗时:$predictTime%.2f 秒")
-------------- 聚类结果统计------------
println("\n各簇样本分布:")
predictions.groupBy("cluster").count().show()
-------------评估指标(轮廓系数)--------
val evaluator = new ClusteringEvaluator()
.setFeaturesCol("scaledFeatures")
.setPredictionCol("cluster")
val silhouette = evaluator.evaluate(predictions)
println(f"轮廓系数:$silhouette%.4f")
------------------ 可视化分析------------------
val clusterCenters = model.stages.last.asInstanceOf[KMeansModel].clusterCenters
println("\n聚类中心特征:")
clusterCenters.foreach(println)
-------------- 各簇特征分布统计----------------
println("\n各簇特征均值:")
predictions.select("cluster", "age", "income", "purchase_freq")
.groupBy("cluster")
.agg(
avg("age").alias("avg_age"),
avg("income").alias("avg_income"),
avg("purchase_freq").alias("avg_purchase")
)
.show()
--------------- 参数调优建议 ---------------
println("\n参数调优建议:")
(2 to 5).foreach { k =>
val kmeansTmp = new KMeans().setK(k).setFeaturesCol("scaledFeatures")
val modelTmp = kmeansTmp.fit(scaler.transform(assembler.transform(processedData)))
val score = evaluator.evaluate(modelTmp.transform(processedData))
println(f"K=$k 时轮廓系数:$score%.4f")
}
spark.stop()
}
}
更多推荐
所有评论(0)