Git address: https://github.com/hxjcarrie/pyspark_study

This article uses LogisticRegression as the example. The input data is laid out as follows: the first column is the uid (ID), the second column is the label, and the remaining columns are the features.
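Before training, it helps to confirm that layout by printing a few raw lines. The sketch below is only illustrative; it assumes the training file is a TAB-separated text file on HDFS, and the path is a placeholder.

# A minimal sketch for inspecting the input format: uid \t label \t feature1 ... feature9
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

for line in sc.textFile("hdfs:///path/to/sample_train").take(3):  # placeholder path
    print(line)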

lrDemo.py (RDD-based MLlib)

import sys
reload(sys)
sys.setdefaultencoding('utf8')
from pyspark.sql import SparkSession,Row
from pyspark.sql.types import *
from time import *
import numpy
import os
 
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
#os.environ['PYSPARK_PYTHON'] = './python_env/py27/bin/python2' 

# Convert x to float; fall back to 0.0 if conversion fails
def parseFloat(x):
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx
 
def parse(line, ifUid=False):
    # Split each row on the TAB character
    l = line.split('\t')

    # The first column is the uid
    uid = l[0]

    # The second column is the label, converted to float by parseFloat()
    label = parseFloat(l[1])

    # The remaining columns are the features, also converted to float by parseFloat()
    features = map(lambda x: parseFloat(x), l[2:])

    if ifUid:
        return (uid, LabeledPoint(label, features))
    else:
        return LabeledPoint(label, features)

# main function
def main() :
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print( "Session created!" ) 
    
    sc = spark.sparkContext
    # Print the URL for tracking the job
    print("The url to track the job: http://namenode-01:8088/proxy/" + sc.applicationId)

    sampleHDFS_train = sys.argv[1]  # The first argument is the training data
    sampleHDFS_test = sys.argv[2]   # The second argument is the test data
    outputHDFS = sys.argv[3]        # The third argument is the output directory
   
    # Build the resilient distributed datasets (RDDs) from the input files
    sampleRDD = sc.textFile(sampleHDFS_train).map( parse )
    predictRDD = sc.textFile(sampleHDFS_test).map(lambda x: parse(x, True))
 
    # training
    model = LogisticRegressionWithLBFGS.train(sampleRDD)
    model.clearThreshold()  # clear the default threshold so predict() returns probabilities instead of 0/1 labels
 
    # Predict, save the results
    labelsAndPreds = predictRDD.map(lambda p: (p[0], p[1].label, model.predict(p[1].features)))
    labelsAndPreds.map(lambda p: '\t'.join(map(str, p))).saveAsTextFile(outputHDFS + "/target/output")
 
    # Evaluate the accuracy rate and recall rate under different thresholds
    labelsAndPreds_label_1 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 1)
    labelsAndPreds_label_0 = labelsAndPreds.filter(lambda lp: int(lp[1]) == 0)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\ttn\tfp\tfn\taccuracy\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.filter(lambda lp: lp[2] > thre).count()
        tn = t_cnt - tp
        fp = labelsAndPreds_label_0.filter(lambda lp: lp[2] > thre).count()
        fn = f_cnt - fp
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))
 
    # Save the model, then load it back
    model.save(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")
    sameModel = LogisticRegressionModel.load(sc, outputHDFS + "/target/tmp/pythonLogisticRegressionWithLBFGSModel")
 
    print "output:", outputHDFS
 
if __name__ == '__main__':
    main()
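The threshold loop above computes precision and recall by hand. As an optional complement, MLlib's built-in BinaryClassificationMetrics can report AUC from the same (uid, label, score) triples; the sketch below is not part of the original script and simply reuses labelsAndPreds from lrDemo.py.

# Optional sketch (not in the original script): AUC via MLlib's evaluator,
# reusing the (uid, label, score) triples in labelsAndPreds from lrDemo.py.
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# BinaryClassificationMetrics expects an RDD of (score, label) pairs
scoreAndLabels = labelsAndPreds.map(lambda p: (float(p[2]), float(p[1])))
metrics = BinaryClassificationMetrics(scoreAndLabels)
print("Area under ROC = %.4f" % metrics.areaUnderROC)
print("Area under PR  = %.4f" % metrics.areaUnderPR)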

lrDemo_df.py (DataFrame-based ML)

import sys
reload(sys)
sys.setdefaultencoding('utf8')
from pyspark.sql import SparkSession,Row
from pyspark.sql.types import *
from time import *
import numpy
import os
 
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, col
 
def getFeatureName():
    featureLst = ['feature1', 'feature2', 'feature3', 'feature4', 'feature5', 'feature6', 'feature7', 'feature8', 'feature9']
    colLst = ['uid', 'label'] + featureLst
    return featureLst, colLst
 
def parseFloat(x) :
    try:
        rx = float(x)
    except:
        rx = 0.0
    return rx
 
 
def getDict(dictDataLst, colLst) :
    dictData = {}
    for i in range(len(colLst)):
        dictData[colLst[i]] = parseFloat(dictDataLst[i])
        if colLst[i] == "label":
            dictData["weight"] = 1.0 if dictDataLst[i] == '1' else 1.0
    return dictData
 
def to_array(col) :
    def to_array_(v) :
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
 
 
def main() :
    #spark = SparkSession.builder.master("yarn").appName("spark_demo").getOrCreate()
    spark = SparkSession.builder.getOrCreate()
    print "Session created!"
    sc = spark.sparkContext
    print "The url to track the job: http://bx-namenode-02:8088/proxy/" + sc.applicationId
 
    sampleHDFS_train = sys.argv[1]
    sampleHDFS_test = sys.argv[2]
    outputHDFS = sys.argv[3]
 
    featureLst, colLst = getFeatureName()
 
    # Convert RDDs to DataFrames
    # Training data
    rdd_train = sc.textFile(sampleHDFS_train)
    rowRDD_train = rdd_train.map(lambda x: getDict(x.split('\t'), colLst))
    trainDF = spark.createDataFrame(rowRDD_train)
    # Test data
    rdd_test = sc.textFile(sampleHDFS_test)
    rowRDD_test = rdd_test.map(lambda x: getDict(x.split('\t'), colLst))
    testDF = spark.createDataFrame(rowRDD_test)
 
    # featureLst for training
    vectorAssembler = VectorAssembler().setInputCols(featureLst).setOutputCol("features")
 
    #### training ####
    print "step 1"
    lr = LogisticRegression(regParam=0.01, maxIter=100, weightCol="weight")  # regParam: regularization strength; weightCol: per-sample weight column
 
    pipeline = Pipeline(stages=[vectorAssembler, lr])
    model = pipeline.fit(trainDF)
    # print parameters
    print "\n-------------------------------------------------------------------------"
    print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
    print "-------------------------------------------------------------------------\n"
 
    #### prediction, save results ####
    print "step 2"
    labelsAndPreds = model.transform(testDF).withColumn("probability_xj", to_array(col("probability"))[1])\
                                            .select("uid", "label", "prediction", "probability_xj")
    labelsAndPreds.show()
    labelsAndPreds.write.mode("overwrite").options(header="true").csv(outputHDFS + "/target/output")
 
 
    #### Evaluate accuracy and recall rates at different thresholds
    print "step 3"
    labelsAndPreds_label_1 = labelsAndPreds.where(labelsAndPreds.label == 1)
    labelsAndPreds_label_0 = labelsAndPreds.where(labelsAndPreds.label == 0)
    labelsAndPreds_label_1.show(3)
    labelsAndPreds_label_0.show(3)
    t_cnt = labelsAndPreds_label_1.count()
    f_cnt = labelsAndPreds_label_0.count()
    print "thre\ttp\ttn\tfp\tfn\taccuracy\trecall"
    for thre in [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]:
        tp = labelsAndPreds_label_1.where(labelsAndPreds_label_1.probability_xj > thre).count()
        tn = t_cnt - tp
        fp = labelsAndPreds_label_0.where(labelsAndPreds_label_0.probability_xj > thre).count()
        fn = f_cnt - fp
        print("%.1f\t%d\t%d\t%d\t%d\t%.4f\t%.4f"%(thre, tp, tn, fp, fn, float(tp)/(tp+fp), float(tp)/(t_cnt)))
 
    # Save the model
    model.write().overwrite().save(outputHDFS + "/target/model/lrModel")
    # Load model
    #model.load(outputHDFS + "/target/model/lrModel")
 
    print "output:", outputHDFS
 
 
if __name__ == '__main__':
    main()
The model metrics printed in the driver log follow the threshold table produced by the loop above (thre, tp, tn, fp, fn, accuracy, recall).
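Besides the manual threshold sweep, the DataFrame API has a built-in evaluator for AUC. The sketch below is an optional addition, assuming the model and testDF objects built in lrDemo_df.py; it re-runs transform() so the rawPrediction column (dropped by the select above) is available.

# Optional sketch (not in the original script): AUC for the DataFrame pipeline,
# assuming `model` and `testDF` are the objects built in lrDemo_df.py.
from pyspark.ml.evaluation import BinaryClassificationEvaluator

predDF = model.transform(testDF)  # keeps the rawPrediction/probability columns
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",
                                          labelCol="label",
                                          metricName="areaUnderROC")
print("Area under ROC = %.4f" % evaluator.evaluate(predDF))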

The logistic regression code above is written in Python. To submit it to the Spark cluster, log in to the Spark client machine and edit a spark-submit-lr.sh script like the following:
#ModelType=lrDemo
ModelType=lrDemo_df
#ModelType=xgbDemo

CUR_PATH=$(cd "$(dirname "$0")";pwd)
echo $CUR_PATH

SPARK_PATH=/user/spark/spark
YARN_QUEUE=

#DEPLOY_MODE=cluster
DEPLOY_MODE=client

input_path_train=hdfs://
input_path_test=hdfs://
output_path=hdfs://user/huangxiaojuan/program/sparkDemo/${ModelType}

hadoop fs -rmr $output_path

${SPARK_PATH}/bin/spark-submit \
--master yarn \
--name "spark_demo_lr" \
--queue ${YARN_QUEUE} \
--deploy-mode ${DEPLOY_MODE} \
--driver-memory 6g \
--driver-cores 4 \
--executor-memory 12g \
--executor-cores 15 \
--num-executors 10 \
--archives ./source/py27.zip#python_env \
--conf spark.default.parallelism=150 \
--conf spark.executor.memoryOverhead=4g \
--conf spark.driver.memoryOverhead=2g \
--conf spark.yarn.maxAppAttempts=3 \
--conf spark.yarn.submit.waitAppCompletion=true \
--conf spark.pyspark.driver.python=./source/py27/bin/python2 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2 \
--conf spark.pyspark.python=./python_env/py27/bin/python2 \
./${ModelType}.py $input_path_train $input_path_test $output_path

Submit the job in the background with: nohup bash spark-submit-lr.sh > spark-submit-lr.log 2>&1 &
To kill a running job: yarn application -kill application_xxxxxxxxxx_xxxxx
Make sure the Python version on the driver and on the executors is the same. If the Python on the executors does not meet the requirements, you can ship a packaged Python environment to the executors with the following parameters:
# Upload the Python environment package to the executors
--archives ./source/py27.zip  

# Specify the Python path on executor
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./python_env/py27/bin/python2 \
--conf spark.pyspark.python=./python_env/py27/bin/python2 \

# specify the Python path on the driver
--conf spark.pyspark.driver.python=./source/py27/bin/python2 \
 
# Or upload the archive to HDFS first
--conf spark.yarn.dist.archives=hdfs://user/huangxiaojuan/py27.zip#python_env\
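As noted above, the driver and executor Python versions must match. One way to verify this at runtime is to compare sys.version on the driver with the versions reported by a small job on the executors; the sketch below is only illustrative and is not part of the original scripts.

# A minimal sketch for checking that driver and executor Python versions match.
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

def python_version(_):
    import sys
    return sys.version

executor_versions = sc.parallelize(range(10), 10).map(python_version).distinct().collect()
print("driver python  : %s" % sys.version)
print("executor python: %s" % executor_versions)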