Git address: https://github.com/hxjcarrie/pyspark_study. This article uses LogisticRegression as the example. The input data is tab-separated: the first column is the uid, the second column is the label, and the remaining columns are the features.
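For illustration only (these values are made up, not taken from the repository), a couple of rows in this layout (uid, label, nine tab-separated features) might look like:

u0001	1	0.5	1.2	0.0	3.4	0.8	2.1	0.0	1.7	0.3
u0002	0	0.1	0.0	2.3	1.1	0.0	0.6	1.9	0.2	0.4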
lrDemo.py (RDD-based MLlib)
import sys
reload(sys)
sys.setdefaultencoding('utf8')

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from time import *
import numpy
import os
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
#os.environ['PYSPARK_PYTHON'] = './python_env/py27/bin/python2'

# Convert x to float, falling back to 0.0 on failure
def parseFloat(x):
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx

def parse(line, ifUid=False):
    # Each row is split on the TAB character
    l = line.split('\t')
    # The first column is the uid
    uid = l[0]
    # The second column is the label, converted to float by parseFloat()
    label = parseFloat(l[1])
    # The remaining columns are the features, also converted to float by parseFloat()
    features = map(lambda x: parseFloat(x), l[2:])
    if ifUid:
        return (uid, LabeledPoint(label, features))
    else:
        return LabeledPoint(label, features)

# main function
def main():
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print("Session created!")
    sc = spark.sparkContext
    # Print the URL for tracking the job
    print("The url to track the job: http://namenode-01:8088/proxy/" + sc.applicationId)

    sampleHDFS_train = sys.argv[1]  # first argument: training data
    sampleHDFS_test = sys.argv[2]   # second argument: test data
    outputHDFS = sys.argv[3]        # third argument: output directory

    # Build the RDDs (resilient distributed datasets)
    sampleRDD = sc.textFile(sampleHDFS_train).map(parse)
    predictRDD = sc.textFile(sampleHDFS_test).map(lambda x: parse(x, True))

    # Training
    model = LogisticRegressionWithLBFGS.train(sampleRDD)
    model.clearThreshold()  # clear the default threshold so predict() returns probabilities instead of 0/1

    # Predict and save the results
    labelsAndPreds = predictRDD.map(lambda p: (p[0], p[1].label, model.predict(p[1].features)))
    labelsAndPreds.map(lambda p: '\t'.join(map(str, p))).saveAsTextFile(outputHDFS + "/target/output")

    # Evaluate precision (the "accuracy" column, tp/(tp+fp)) and recall at different thresholds
    labelsAndPreds_label_1 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 1)
    labelsAndPreds_label_0 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 0)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\ttn\tfp\tfn\taccuracy\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.filter(lambda lp: lp[2] > thre).count()
        tn = t_cnt - tp
        fp = labelsAndPreds_label_0.filter(lambda lp: lp[2] > thre).count()
        fn = f_cnt - fp
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))

    # Save the model, then load it back
    model.save(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")
    sameModel = LogisticRegressionModel.load(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")

    print "output:", outputHDFS

if __name__ == '__main__':
    main()
lrDemo_df.py (DataFrame-based ML)
import sys
reload(sys)
sys.setdefaultencoding('utf8')

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
from time import *
import numpy
import os
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col

def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst

def parseFloat(x):
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx

# Turn one split row into a dict keyed by column name
def getDict(dictDataLst, colLst):
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
        if colLst[i] == "label":
            # both branches are 1.0 here, i.e. uniform sample weights; adjust if class weighting is needed
            dictData["weight"] = 1.0 if dictDataLst[i] == '1' else 1.0
    return dictData

# Wrap a vector column (e.g. "probability") into an array column so single elements can be selected
def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)

def main():
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print "Session created!"
    sc = spark.sparkContext
    print "The url to track the job: http://bx-namenode-02:8088/proxy/" + sc.applicationId

    sampleHDFS_train = sys.argv[1]
    sampleHDFS_test = sys.argv[2]
    outputHDFS = sys.argv[3]

    featureLst, colLst = getFeatureName()

    #### Convert the RDDs to DataFrames ####
    # Training data
    rdd_train = sc.textFile(sampleHDFS_train)
    rowRDD_train = rdd_train.map(lambda x: getDict(x.split('\t'), colLst))
    trainDF = spark.createDataFrame(rowRDD_train)
    # Test data
    rdd_test = sc.textFile(sampleHDFS_test)
    rowRDD_test = rdd_test.map(lambda x: getDict(x.split('\t'), colLst))
    testDF = spark.createDataFrame(rowRDD_test)

    # Assemble the columns in featureLst into a single "features" vector column
    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol("features")

    #### Training ####
    print "step 1"
    lr = LogisticRegression(regParam=0.01, maxIter=100, weightCol="weight")  # regParam: regularization strength
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(trainDF)
    # Print the parameters
    print "\n-------------------------------------------------------------------------"
    print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
    print "-------------------------------------------------------------------------\n"

    #### Prediction: save the results ####
    print "step 2"
    labelsAndPreds = model.transform(testDF).withColumn("probability_xj", to_array(col("probability"))[1])\
                          .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    labelsAndPreds.write.mode("overwrite").options(header="true").csv(outputHDFS + "/target/output")

    #### Evaluate precision (the "accuracy" column) and recall at different thresholds ####
    print "step 3"
    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)
    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)
    labelsAndPreds_label_1.show(3)
    labelsAndPreds_label_0.show(3)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\ttn\tfp\tfn\taccuracy\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()
        tn = t_cnt - tp
        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()
        fn = f_cnt - fp
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f" % (thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))

    # Save the model
    model.write().overwrite().save(outputHDFS + "/target/model/lrModel")
    # Load the model
    #model.load(outputHDFS + "/target/model/lrModel")

    print "output:", outputHDFS

if __name__ == '__main__':
    main()
The job log prints the model's evaluation results as the per-threshold table (thre, tp, tn, fp, fn, accuracy, recall) produced by the loop above.
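If a single summary metric is also wanted, the sketch below (not part of the original scripts) computes AUC for the DataFrame-based model with pyspark.ml's BinaryClassificationEvaluator; it assumes `model` and `testDF` from lrDemo_df.py are in scope.

# Minimal AUC sketch, assuming `model` (fitted pipeline) and `testDF` from lrDemo_df.py
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predDF = model.transform(testDF)  # keeps the rawPrediction/probability columns
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
print("AUC = %.4f" % evaluator.evaluate(predDF))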
The logistic regression code above is written in Python. To submit it to the Spark cluster, log in to a Spark client node and create a spark-submit-lr.sh script such as the following:
#ModelType=lrDemo
ModelType=lrDemo_df
#ModelType=xgbDemo
CUR_PATH=$(cd "$(dirname "$0")";pwd)
echo $CUR_PATH
SPARK_PATH=/user/spark/spark
YARN_QUEUE=
#DEPLOY_MODE=cluster
DEPLOY_MODE=client
input_path_train=hdfs://
input_path_test=hdfs://
output_path=hdfs://user/huangxiaojuan/program/sparkDemo/${ModelType}
hadoop fs -rmr $output_path
${SPARK_PATH}/bin/spark-submit \
--master yarn \
--name "spark_demo_lr" \
--queue ${YARN_QUEUE} \
--deploy-mode ${DEPLOY_MODE} \
--driver-memory 6g \
--driver-cores 4 \
--executor-memory 12g \
--executor-cores 15 \
--num-executors 10 \
--archives ./source/py27.zip#python_env \
--conf spark.default.parallelism=150 \
--conf spark.executor.memoryOverhead=4g \
--conf spark.driver.memoryOverhead=2g \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.pyspark.driver.python=./source/py27/bin/python2 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2 \
--conf spark.pyspark.python=./python_env/py27/bin/python2 \
./${ModelType}.py $input_path_train $input_path_test $output_path
Run the script in the background and redirect its log:

nohup bash spark-submit-lr.sh > spark-submit-lr.log 2>&1 &

To stop the job, kill the YARN application:

yarn application -kill application_xxxxxxxxx_xxxxx

Make sure the Python version on the driver and on the executors is the same. If the Python installed on the executors does not meet the requirements, you can ship a packaged Python environment to the executors with the following parameters:
# Upload the packaged Python to the executors
--archives ./source/py27.zip#python_env \
# Specify the Python path on the executors
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2 \
--conf spark.pyspark.python=./python_env/py27/bin/python2 \
# Specify the Python path on the driver
--conf spark.pyspark.driver.python=./source/py27/bin/python2 \
# Or upload the archive to HDFS first
--conf spark.yarn.dist.archives=hdfs://user/huangxiaojuan/py27.zip#python_env \
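A quick way to verify the driver/executor Python setup (not part of the original code) is the hypothetical check below, run from a PySpark session or inside main(); it prints the interpreter used on the driver and on one executor so you can confirm they match.

# Hypothetical sanity check: confirm driver and executor Python versions match.
# Assumes an active SparkSession `spark` (as created in the scripts above).
import sys

sc = spark.sparkContext
print("driver python : %s (%s)" % (sys.executable, sys.version.split()[0]))

# Run the same check inside one executor task and collect the result.
def executor_python(_):
    import sys
    return "%s (%s)" % (sys.executable, sys.version.split()[0])

print("executor python: %s" % sc.parallelize([0], 1).map(executor_python).first())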