Background
This section covers the basic usage of reading Excel files with Pandas and writing the results back out to Excel:
- Specify the sheet, select the desired columns, rename columns, and set column types while reading
- Use Spark to analyze and manipulate the data (plain pandas is fine for simple manipulation)
- Write out single-sheet and multi-sheet Excel files
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('JupyterPySpark').enableHiveSupport().getOrCreate()
Reading Excel
When reading an Excel file with Pandas:
- The usecols parameter selects columns. In pandas 0.21 and later you may need to pass column indexes, e.g. usecols=[2,6,7], instead of column names; this was reportedly fixed in 0.23.0
- The names parameter renames the selected columns; it takes a list of the new field names
- Column types can be specified with the converters parameter (or dtype), which takes a dictionary mapping columns to types; a dtype sketch follows the code below
# All fields in the sheet:
# cols = ['Type', 'Householder code', 'Householder name', 'Account type', 'Account opening institution', 'Institution full name', 'Account name', 'Account number']
# names = ['type', 'code', 'name', 'acct_type', 'acct_org', 'org_name', 'acct_name', 'acct_code']
# Select only some of the fields
cols = ['Head name', "Beneficiary's Account Name", 'Payee account number']  # pandas <= 0.20: select columns by name
# cols = [2, 6, 7]  # pandas 0.21 and later: select columns by index
names = ['name', 'acct_name', 'acct_code']
pddf_meta = pd.read_excel('./sample.xlsx', sheet_name='Bank Account relationship', header=0,
                          usecols=cols, names=names,
                          converters={'Payee account number': str}).dropna()
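The dtype parameter mentioned above is an alternative to converters; a minimal sketch, assuming pandas 0.20 or later (the variable name pddf_meta_alt is ours):
# dtype maps column names to types; here the account number is kept as a string to preserve leading zeros
pddf_meta_alt = pd.read_excel('./sample.xlsx', sheet_name='Bank Account relationship',
                              dtype={'Payee account number': str}).dropna()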
Use Spark to process data
- Convert the Pandas DF to a Spark DF
- When constructing Spark DF, explicitly specify a schema to avoid type errors
Pandas DF to Spark DF
from pyspark.sql.types import *
# Explicitly specify the schema (structure) of the DF
schema = StructType().add('name', StringType()).add('acct_name', StringType()).add('acct_code', StringType())
df_meta = spark.createDataFrame(pddf_meta, schema)
df_meta.show(truncate=False)
+-------------------------------------+---------+-------------------+
|name                                 |acct_name|acct_code          |
+-------------------------------------+---------+-------------------+
|Universal center city shunxing snacks|Xiao Ming|6228450123084500000|
|Chengdu Dan Lou little turrets       |Xiao Hong|6222629123002790000|
|Universal center city shunxing snacks|Xiao Ming|6228450123084500000|
+-------------------------------------+---------+-------------------+
df_meta.registerTempTable('df_meta')
# Use Spark SQL to aggregate the data
df_stats = spark.sql('select acct_code, acct_name, count(1) as cnt from df_meta group by acct_code, acct_name order by 1,2')
df_stats.show(5)
+-------------------+---------+---+
|          acct_code|acct_name|cnt|
+-------------------+---------+---+
|6222629123002790000|Xiao Hong|  1|
|6228450123084500000|Xiao Ming|  2|
+-------------------+---------+---+
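For reference, the same aggregation can also be expressed with the DataFrame API instead of SQL; a minimal sketch (the variable name df_stats_api is ours):
import pyspark.sql.functions as F
# Group by account, count the rows in each group, and order like the SQL version above
df_stats_api = df_meta.groupBy('acct_code', 'acct_name') \
                      .agg(F.count(F.lit(1)).alias('cnt')) \
                      .orderBy('acct_code', 'acct_name')
df_stats_api.show(5)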
Union All operation
Spark’s unionAll() and union() are the same method and do not de-duplicate rows (note the difference from SQL's UNION); union() is the officially recommended name. Below we deliberately construct duplicate data; a de-duplication sketch follows the output.
df_tmp_1 = df_stats
df_tmp_2 = df_stats.filter("acct_name in ('Xiao Ming')")
df_result = df_tmp_1.union(df_tmp_2)
df_result.show()
+-------------------+---------+---+
|          acct_code|acct_name|cnt|
+-------------------+---------+---+
|6222629123002790000|Xiao Hong|  1|
|6228450123084500000|Xiao Ming|  2|
|6228450123084500000|Xiao Ming|  2|
+-------------------+---------+---+
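If SQL-style UNION semantics (duplicates removed) are needed, chaining distinct() after union() gives that behaviour; a minimal sketch (the variable name df_result_dedup is ours):
# distinct() drops the duplicate row that union() kept
df_result_dedup = df_tmp_1.union(df_tmp_2).distinct()
df_result_dedup.show()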
Add the serial number
Depending on business requirements, you may need to add a serial number to each row of the output. Below we use the row_number() window function both through Spark SQL and through the native Spark API; there is no essential difference between the two approaches.
Spark SQL approach
df_result.registerTempTable('df_result')
df_result_1 = spark.sql("select row_number() over(order by r.cnt desc) as rn, r.* from df_result r")
df_result_1.show()
+---+-------------------+---------+---+
| rn|          acct_code|acct_name|cnt|
+---+-------------------+---------+---+
|  1|6228450123084500000|Xiao Ming|  2|
|  2|6228450123084500000|Xiao Ming|  2|
|  3|6222629123002790000|Xiao Hong|  1|
+---+-------------------+---------+---+
Spark function approach
import pyspark.sql.functions as F
from pyspark.sql.window import Window
df_result_2 = df_result.withColumn('rn', F.row_number().over(Window.orderBy( F.col('cnt').desc() ) ) )\
                      .select(['rn', 'acct_code', 'acct_name', 'cnt'])
df_result_2.show()
+---+-------------------+---------+---+
| rn|          acct_code|acct_name|cnt|
+---+-------------------+---------+---+
|  1|6228450123084500000|Xiao Ming|  2|
|  2|6228450123084500000|Xiao Ming|  2|
|  3|6222629123002790000|Xiao Hong|  1|
+---+-------------------+---------+---+
Use Pandas to export Excel
- Convert to Pandas DF
- Export Excel
# Convert to a Pandas DF and rename the fields
pddf_meta_out = df_result_2.toPandas()
pddf_meta_out.columns = ['Serial number', 'Payee account number', "Beneficiary's Account Name", 'Count']
Single Sheet
pddf_meta_out.to_excel('./sample_out_single.xlsx',sheet_name='output stats', index=False)
Multiple Sheets
- ExcelWriter can write multiple sheets into a single file
- Use startrow to specify the row where writing starts in the sheet
writer = pd.ExcelWriter('./sample_out_multi.xlsx')
pddf_meta_out.to_excel(writer,sheet_name='output stats1', index=False)
pddf_meta_out.to_excel(writer,sheet_name='output stats2', index=False, startrow=1)
writer.save()
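Note that writer.save() works in older pandas but is deprecated in newer releases; using ExcelWriter as a context manager is the usual replacement. A minimal sketch, assuming a recent pandas version:
# The context manager closes the writer and flushes the file automatically
with pd.ExcelWriter('./sample_out_multi.xlsx') as writer:
    pddf_meta_out.to_excel(writer, sheet_name='output stats1', index=False)
    pddf_meta_out.to_excel(writer, sheet_name='output stats2', index=False, startrow=1)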
Check whether the output files were generated
! pwd && ls -lh
/home
total 60K
-rw-rw-r-- 1 etl etl  14K May  2 20:44 pandas_excel_in_out.ipynb
-rw-rw-r-- 1 etl etl 6.0K May  2 20:45 sample_out_multi.xlsx
-rw-rw-r-- 1 etl etl 5.5K May  2 20:45 sample_out_single.xlsx
-rw-rw-r-- 1 etl etl  12K Apr 27 15:24 sample.xlsx