Build the PySpark environment

First, make sure you have Python 2.7 installed. It is strongly recommended that you use virtualenv to manage your Python environment. PySpark can then be installed through pip:

pip install pyspark

The package is fairly large, about 180 MB, so be patient.

Download Spark 2.2.0, unzip it to a directory of your choice, and point SPARK_HOME at that directory.

In fact, if you submit jobs with spark-submit, you don't need to pip-install PySpark at all; the main benefit of installing PySpark through pip is getting code completion in your IDE.
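As a quick check that the pip-installed PySpark is usable from the IDE, you can start a local session; this is a minimal sketch and the app name is arbitrary:

# Minimal sanity check for a pip-installed PySpark: start a local session and print the version.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("pyspark-sanity-check") \
    .getOrCreate()

print(spark.version)
spark.stop()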

PySpark Worker startup mechanism

PySpark works by having Spark's PythonRDD start one or more Python daemon processes (keyed by pythonExec and envVars). When a task arrives, a new Python worker is forked from the daemon. Python workers are reusable and are not destroyed immediately after use: when a task comes in, Spark first checks whether an idle worker is available and returns it directly if so; otherwise the daemon forks a new worker.
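Worker reuse is governed by the spark.python.worker.reuse setting, which defaults to true. The sketch below only shows how you could turn it off, for example when debugging state that leaks between tasks; it is not something you normally need to change:

# spark.python.worker.reuse defaults to true; setting it to false forks a
# fresh Python worker for every task (useful mainly for debugging).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("worker-reuse-demo") \
    .config("spark.python.worker.reuse", "false") \
    .getOrCreate()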

How does PySpark implement a singleton variable in a worker

From the worker startup mechanism above, we know that a Python worker can execute tasks repeatedly. In NLP tasks we often load large dictionaries, and we want them to be loaded only once per worker. This requires some extra handling, which looks like this:

import glob
import os

import jieba
from pyspark import SparkFiles


class DictLoader(object):
    # Class-level flag: set once per Python worker after the dictionaries are loaded.
    clf = None

    def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():
            DictLoader.load_dic(baseDir)

    @staticmethod
    def load_dic(baseDir):
        globPath = baseDir + "/dic/*.dic"
        dicts = glob.glob(globPath)
        for dictFile in dicts:
            # Fall back to SparkFiles.get when the file was distributed via --files.
            temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
            jieba.load_userdict(temp)
        # Warm up jieba so the first real task does not pay the initialization cost.
        jieba.cut("nice to meet you")
        DictLoader.clf = "SUCCESS"

    @staticmethod
    def is_loaded():
        return DictLoader.clf is not None

Define a class-level attribute (clf) and use the staticmethod decorator to simulate Java-style static methods. After that you can do whatever one-time initialization you need.
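For example, DictLoader can be constructed inside a mapPartitions function so that each Python worker loads the dictionaries at most once. This is only an illustrative sketch: df is assumed to be a DataFrame with a "text" column, and base_dir, archive_auto_extract and zip_resources are values computed on the driver as described in the next section:

# Illustrative sketch: tokenize a "text" column, loading jieba dictionaries
# once per Python worker via the DictLoader singleton defined above.
import jieba

def tokenize_partition(rows):
    # Cheap after the first call in a worker: load_dic only runs when the
    # dictionaries have not been loaded yet.
    DictLoader(base_dir, archive_auto_extract, zip_resources)
    for row in rows:
        yield list(jieba.cut(row["text"]))

tokenized_rdd = df.rdd.mapPartitions(tokenize_partition)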

How do I load a resource file

Dictionaries are indispensable in NLP processing. We have already avoided loading them multiple times per worker; the next problem is how the program gets access to them in the first place. Usually we want to put the dictionaries into one zip package and the code into another, then submit like this:

./bin/spark-submit \
--py-files dist/jobs.zip \
--files dist/dics.zip \
--master "local[*]"  python/src/batch.py

Here the self-developed modules are packaged as jobs.zip, the Spark entry script is a separate batch.py file, and the dictionaries go into dics.zip.

So how does the program read the files inside dics.zip? In Spark standalone and local modes, dics.zip is not unpacked into each worker's working directory, so some extra processing is needed:

    # Revised __init__ for DictLoader; zipfile must be imported at module level.
    def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():
            for zr in zipResources:
                # Extract manually only when Spark has not already done it (non-YARN modes).
                if not archive_auto_extract:
                    with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:
                        f.extractall(".")
            DictLoader.load_dic(baseDir)

archive_auto_extract indicates whether the archive is decompressed automatically (in YARN mode it is). It can be determined like this:

archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")

zipResources is the list of names of all zip packages that need to be decompressed. It can be obtained like this:

zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]

You can then build the full path of each zip file like this:

SparkFiles.getRootDirectory() + '/' + zfilename

So if you are not running in YARN mode, you need to unzip the archives first and then load the dictionaries. It is recommended to resolve the dictionary path like this:

temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)

This works when running from the IDE as well as in local, standalone, and YARN modes.

The jobs.zip mentioned earlier contains only Python files, which can be imported directly without decompressing the archive.
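Putting the pieces of this section together, the driver side of batch.py could look roughly like the sketch below. It assumes dics.zip is distributed with --files as in the spark-submit command above, and that the dic/ folder with the *.dic files ends up in the working directory after extraction:

# Driver-side sketch: detect YARN, collect the zip resources passed via
# --files, and hand everything to the workers through DictLoader.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nlp-batch").getOrCreate()

archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")
zip_resources = [f.split("/")[-1]
                 for f in spark.conf.get("spark.files").split(",")
                 if f.endswith(".zip")]
base_dir = "."  # placeholder: the directory that contains dic/ after extraction

# These three values are captured by the closures that construct DictLoader
# on the workers (see the mapPartitions sketch in the previous section).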

Define the schema explicitly and avoid Spark's automatic schema inference

I wrote this code earlier:

oldr = df.rdd.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

Then I needed to turn oldr back into a DataFrame, which I did like this:

resultDf = spark.createDataFrame(oldr)
resultDf.write.mode("overwrite").format(...).save(...)

This causes oldr to be evaluated twice: once for schema inference and once for the actual computation. Instead, we can write:

from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType

fields = [StructField("ids", ArrayType(IntegerType())), StructField("mainId", IntegerType()),
          StructField("tags", MapType(StringType(), IntegerType()))]
resultDf = spark.createDataFrame(oldr, StructType(fields=fields))

This explicitly defines the schema for the RDD and avoids the extra inference pass.

Choosing between lambdas and named functions

A lambda can define an anonymous function, but its expressiveness is limited:

.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))

We can also define a named function:

def create_new_row(row):
    return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])

Then use it directly:

.map(create_new_row)...

How to define UDFs / how to avoid Python UDFs

Start by defining a regular Python function:

def split_sentence(s):
    return s.split(" ")

Then wrap it as a UDF and use it:

from pyspark.sql.functions import udf
from pyspark.sql.types import *

ss = udf(split_sentence, ArrayType(StringType()))
documentDF.select(ss("text").alias("text_array")).show()

The only inconvenience is that when you define a UDF, you have to specify the return type.

Python UDFs are noticeably inefficient, so whenever possible we recommend using the built-in functions instead:

from pyspark.sql import functions as f
documentDF.select(f.split("text", "\\s+").alias("text_array")).show()

pyspark.sql.functions delegates to Spark's native implementations, so it is more efficient.