Build the PySpark environment
First, make sure you have Python 2.7 installed. It is strongly recommended that you use virtualenv to manage your Python environment. PySpark is then installed through pip:
pip install pyspark
The package is relatively large (about 180 MB), so be patient.
Download Spark 2.2.0, unpack it to a directory of your choice, and set SPARK_HOME to that directory.
In fact, if you only submit jobs with spark-submit, you don't need to install PySpark via pip at all; the main benefit of the pip install is getting code completion in your IDE.
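As a quick sanity check (a minimal sketch, relying only on the pip-installed pyspark above), you can start a local SparkSession directly from your IDE:

from pyspark.sql import SparkSession

# Runs entirely locally; no cluster required
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("env-check") \
    .getOrCreate()

print(spark.version)
spark.stop()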
PySpark Worker startup mechanism
PySpark works by having Spark's PythonRDD start one or more Python daemon processes (keyed by pythonExec and envVars). When a task arrives, the daemon forks a new Python worker. Python workers are reusable and are not destroyed immediately after use: when a task comes in, Spark first checks whether an idle worker is available and hands the task to it directly; only if none is available does the daemon fork a new worker.
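A quick way to observe this reuse (a rough experiment, assuming local mode and the default spark.python.worker.reuse=true) is to print the worker PIDs for two separate jobs; the same PIDs tend to appear again because the forked workers are kept alive:

import os

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("worker-pids").getOrCreate()
sc = spark.sparkContext

def report_pid(partition):
    # Executed inside a Python worker process
    return [os.getpid()]

print(sorted(set(sc.parallelize(range(8), 4).mapPartitions(report_pid).collect())))
print(sorted(set(sc.parallelize(range(8), 4).mapPartitions(report_pid).collect())))

spark.stop()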
How to implement a singleton variable in a PySpark worker
From the worker startup mechanism above, we can see that a Python worker executes tasks repeatedly. In NLP tasks we often need to load large dictionaries, and we want them to be loaded only once per worker. This requires some extra handling, which looks like this:
import glob
import os

import jieba
from pyspark import SparkFiles


class DictLoader(object):
    # Class-level flag: set once per Python worker process
    clf = None

    def __init__(self, baseDir, archive_auto_extract, zipResources):
        if not DictLoader.is_loaded():
            DictLoader.load_dic(baseDir)

    @staticmethod
    def load_dic(baseDir):
        globPath = baseDir + "/dic/*.dic"
        dicts = glob.glob(globPath)
        for dictFile in dicts:
            # Use the local path if it exists, otherwise resolve it via SparkFiles
            temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
            jieba.load_userdict(temp)
        # Trigger jieba's lazy initialization so the first real task is not penalized
        jieba.cut("nice to meet you")
        DictLoader.clf = "SUCCESS"

    @staticmethod
    def is_loaded():
        return DictLoader.clf is not None
Define a class attribute (clf) and use the staticmethod decorator to simulate Java-style static methods. With this in place, the dictionaries are loaded only once per worker, and you can then do whatever you want with them.
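For illustration, here is a hypothetical usage sketch (the tokenize_partition function and the text column are assumptions, not from the original code); constructing DictLoader inside mapPartitions ensures the dictionaries are loaded at most once per worker process:

def tokenize_partition(rows):
    # Cheap after the first call within a given worker process
    DictLoader(baseDir, archive_auto_extract, zipResources)
    for row in rows:
        yield list(jieba.cut(row["text"]))

token_rdd = df.rdd.mapPartitions(tokenize_partition)

How the baseDir, archive_auto_extract, and zipResources values are obtained is covered in the next section.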
How do I load a resource file
In NLP processing, dictionaries are indispensable. We have already avoided loading the dictionaries multiple times in a worker; the next question is how the program loads them in the first place. Usually we want to put the dictionaries into one zip package and the code into another, and then submit them with a command like this:
./bin/spark-submit \
--py-files dist/jobs.zip \
--files dist/dics.zip \
--master "local[*]" python/src/batch.py
Self-developed modules are packaged as jobs.zip, the Spark job entry point is a separate batch.py file, and the dictionaries are packaged as dics.zip.
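For completeness, one way to build the two packages (an illustrative sketch; the source directories python/src and dics are assumptions about the project layout):

import os
import shutil

if not os.path.isdir("dist"):
    os.makedirs("dist")

# Produces dist/jobs.zip (your own modules) and dist/dics.zip (the dictionaries)
shutil.make_archive("dist/jobs", "zip", "python/src")
shutil.make_archive("dist/dics", "zip", "dics")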
So how does the program read the files inside dics.zip? In Spark standalone and local modes, dics.zip is not automatically unpacked into the working directory of each worker, so additional processing is required:
# Revised DictLoader.__init__ (also requires `import zipfile` alongside the imports above)
def __init__(self, baseDir, archive_auto_extract, zipResources):
    if not DictLoader.is_loaded():
        for zr in zipResources:
            # Spark only auto-extracts archives in YARN mode; elsewhere unzip manually
            if not archive_auto_extract:
                with zipfile.ZipFile(SparkFiles.getRootDirectory() + '/' + zr, 'r') as f:
                    f.extractall(".")
        DictLoader.load_dic(baseDir)
archive_auto_extract indicates whether the files are decompressed automatically (in YARN mode, Spark extracts them automatically):
archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")
zipResources is the list of names of all zip packages that need to be decompressed. It can be obtained like this:
zipfiles = [f.split("/")[-1] for f in spark.conf.get("spark.files").split(",") if f.endswith(".zip")]
The full path of each zip file can then be concatenated like this:
SparkFiles.getRootDirectory() + '/' + zfilename
So if you are not running in YARN mode, you need to unzip the archives yourself before loading. To obtain the path of a dictionary file, the following pattern is recommended:
temp = dictFile if os.path.exists(dictFile) else SparkFiles.get(dictFile)
This works in the IDE as well as in local, standalone, and YARN modes.
The jobs.zip package mentioned earlier contains only Python modules, so it can be used directly without being decompressed.
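Putting the driver-side pieces together (a sketch that reuses the expressions shown above; the baseDir value is an assumption about where the extracted dic directory ends up):

archive_auto_extract = spark.conf.get("spark.master").lower().startswith("yarn")
zipResources = [f.split("/")[-1]
                for f in spark.conf.get("spark.files").split(",")
                if f.endswith(".zip")]
# Assumption: dics.zip unpacks a dic/ directory into the worker's working directory
baseDir = "."

These plain values are captured by the task closures and handed to DictLoader on the workers.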
Define the schema explicitly and avoid Spark's automatic schema inference
I wrote this code earlier:
oldr = df.rdd.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))
Then I needed to turn oldr back into a DataFrame, which I did like this:
resultDf = spark.createDataFrame(oldr)
resultDf.write.mode("overwrite").format(...).save(...)
This causes oldr to be evaluated twice: once for schema inference and once for the actual computation. Instead, we can write:
from pyspark.sql.types import StructType, IntegerType, ArrayType, StructField, StringType, MapType

fields = [StructField("ids", ArrayType(IntegerType())),
          StructField("mainId", IntegerType()),
          StructField("tags", MapType(StringType(), IntegerType()))]
resultDf = spark.createDataFrame(resultRdd, StructType(fields=fields))
This explicitly defines the schema for the RDD and avoids the extra inference pass.
Choosing between lambdas and named functions
A lambda defines an anonymous function, but its expressiveness is limited:
.map(
    lambda row: Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"]))
We can also define functions:
def create_new_row(row):
    return Row(ids=row['ids'], mainId=row["mainId"].item(), tags=row["tags"])
Then use it directly:
.map(create_new_row).....
How to define UDFs / how to avoid Python UDFs
Start by defining a regular Python function:
def split_sentence(s):
    return s.split(" ")
Then convert it to a UDF and use it:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
ss = udf(split_sentence, ArrayType(StringType()))
documentDF.select(ss("text").alias("text_array")).show()
The only inconvenience is that when you define a UDF, you need to specify the return type.
Python UDFs are noticeably inefficient, since every row has to be serialized between the JVM and the Python worker, so Spark's built-in functions are recommended whenever possible:
from pyspark.sql import functions as f
documentDF.select(f.split("text", "\\s+").alias("text_array")).show()
The functions in pyspark.sql.functions are backed by Spark's own JVM implementations, so they are much more efficient.
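If you want to see the difference on your own data, a rough comparison can look like this (a sketch, not a rigorous benchmark; ss, f, and documentDF refer to the snippets above):

import time

def timed(label, dataframe):
    start = time.time()
    dataframe.collect()  # materialize the projected column so the expression really runs
    print("%s: %.3f s" % (label, time.time() - start))

timed("python udf", documentDF.select(ss("text").alias("text_array")))
timed("built-in split", documentDF.select(f.split("text", "\\s+").alias("text_array")))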