本文共 2682 字,大约阅读时间需要 8 分钟。
1、MLlib简介
MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作,并方便扩展到更大规模。MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的管道API。具体来说,其主要包括以下几方面的内容:
2、MLlib支持的主要机器学习算法
Spark在机器学习方面的发展非常快,目前已经支持了主流的统计和机器学习算法。纵观所有基于分布式架构的开源机器学习库,MLlib可以算是计算效率最高的。MLlib目前支持4种常见的机器学习问题: 分类、回归、聚类和协同过滤。下表列出了目前MLlib支持的主要的机器学习算法:
3、机器学习工作流(PipeLine)
3.1、工作流的组成
Estimator:翻译成估计器或评估器,它是学习算法或在训练数据上的训练方法的概念抽象。在 Pipeline 里通常是被用来操作 DataFrame 数据并生产一个 Transformer。从技术上讲,Estimator实现了一个方法fit(),它接受一个DataFrame并产生一个转换器。如一个随机森林算法就是一个 Estimator,它可以调用fit(),通过训练特征数据而得到一个随机森林模型。
Parameter:Parameter 被用来设置 Transformer 或者 Estimator 的参数。现在,所有转换器和估计器可共享用于指定参数的公共API。ParamMap是一组(参数,值)对。
PipeLine:翻译为工作流或者管道。工作流将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。
3.2、如何构建一个工作流
定义 Pipeline 中的各个工作流阶段PipelineStage,(包括转换器和评估器),比如指标提取和转换模型训练等。有了这些处理特定问题的转换器和 评估器,就可以按照具体的处理逻辑有序的组织PipelineStages 并创建一个Pipeline。
pipeline =Pipeline(stages=[stage1,stage2,stage3])
在训练阶段,管道如下,以DataFrame存储的行形式的文本(Raw text)经过Tokenizer转化变成了词(Words),词经HashingTF转化变成了特征(Feature vectors),特征经LR得到了回归模型。
测试过程的管道如下:
3.3、构建管道实例
设置sparkSession
from pyspark.sql import SparkSessionspark = SparkSession.builder.master("local").appName("spark ML").getOrCreate()
设置机器学习相关包
from pyspark.ml import Pipelinefrom pyspark.ml.classification import LogisticRegressionfrom pyspark.ml.feature import HashingTF,Tokenizer
创建训练集
#创建DataFrame训练集#训练集包括字段id,text,labeldf_train = spark.createDataFrame([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
定义工作流,构建训练管道,得到训练模型
#构建转化器和评估器#定义分词器,spark自带的Tokenizer以空格分词;inputCol为输入的列名,outputCol为转化输出的列名tokenizer=Tokenizer(inputCol="text", outputCol="words")hashTf=HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")lr = LogisticRegression(maxIter=10, regParam=0.001)#创建训练管道pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])#训练模型model=pipeline.fit(df_train)
构建测试集及预测
#测试DataFrame构建df_test=spark.createDataFrame([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop")], ["id", "text"])#测试predict=model.transform(df_test)#显示预测结果id|text|words|features|rawPrediction|probability|predictionpredict.show()
参考
转载地址:http://oubpz.baihongyu.com/