Introduction
This is the first in a series of articles on how to leverage Hadoop (a distributed computing framework) with Python.
The purpose of this series is to focus on specific tools and recipes that address the recurring challenges many data professionals face, for example:
- Moving HDFS (Hadoop Distributed File System) files using Python.
- Loading data from HDFS into a data structure, such as a Spark or pandas DataFrame, for computation.
- Writing the results of an analysis back to HDFS.
The first tool in this series is Spark, a framework that defines itself as a unified analytics engine for large-scale data processing.
Apache Spark
PySpark and FindSpark installation
I encourage you to use a conda virtual environment. If you don’t know how to set up conda, read this article.
First, install FindSpark, a library that will help you integrate Spark into your Python workflow, and also install PySpark in case you’re working on a local computer instead of a proper Hadoop cluster.
If you are working on this tutorial in a Hadoop cluster, skip the PySpark installation.
conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y
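If you went for the local setup, a quick sanity check (my own addition, not part of the installation steps themselves) is to confirm that both libraries import correctly from the environment you just created:
# optional sanity check for the local setup: both imports should succeed
import findspark
import pyspark

print(pyspark.__version__)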
Set up Spark using findspark
Once findspark is installed, you can set up Spark in your Python code.
The code for both local and cluster mode is provided below; uncomment the lines you need and adjust the paths for your particular infrastructure and library versions (the paths for Cloudera Spark should be similar to those shown here).
import findspark

# Local Spark
findspark.init('/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/')

# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')
This tutorial uses the Cloudera Quickstart VM (CentOS Linux distribution, username cloudera); remember to adjust the paths for your infrastructure!
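If you are unsure which path to pass to findspark.init() for a local install, one way to find it (just a suggestion, assuming PySpark was installed with conda as shown above) is to ask the package itself:
# print the location of the pyspark package installed in the active environment
import os
import pyspark

print(os.path.dirname(pyspark.__file__))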
Create a Spark application
Once Spark is initialized, we have to create a Spark application by executing the following code. Make sure you specify the master you need: 'yarn' in the case of a proper Hadoop cluster, or 'local[*]' in the case of a fully local setup.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example_app').master('yarn').getOrCreate()
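A quick way to confirm that the session came up as expected (an optional check, not part of the original recipe) is to print its version and master:
# optional: confirm the Spark version and where the application is running
print(spark.version)
print(spark.sparkContext.master)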
PySpark recipes and use cases
Once we have Spark up and running, let’s start interacting with Hadoop and work through some of its common use cases.
Listing Hive Databases
Let’s list the existing databases. I assume that you are already familiar with the Spark DataFrame API and its methods.
spark.sql("show databases").show()
You should get something like this:
+------------+
|databaseName|
+------------+
| db1|
| default|
| fhadoop|
+------------+
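If you prefer to work with the result programmatically instead of a printed table, the same information is available through the catalog API (a small alternative sketch, not strictly needed for the rest of the tutorial):
# equivalent lookup through the Spark catalog API
for db in spark.catalog.listDatabases():
    print(db.name)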
Convert a pandas DataFrame to a Spark DataFrame
The first integration concerns how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark.
First, let’s load a pandas DataFrame. It contains data about Madrid’s air quality (just to satisfy your curiosity, but not important for moving data from one place to another). You can download it here. Make sure you have the PyTables library installed to read data in HDF5 format.
import pandas as pd

air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()
This data is a time series for many well-known pollutants, such as nitrogen oxides, ozone, and so on.
Let’s make some changes to the DataFrame, such as resetting the datetime index, to avoid losing information when loading into Spark. The date column will also be converted to a string, since Spark has some problems handling dates (related to system locale, time zones, and so on) unless it is further configured for your locale.
air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')
We can simply load from pandas to Spark with createDataFrame.
air_quality_sdf = spark.createDataFrame(air_quality_df)
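To double-check that nothing was lost in the conversion, for example that the date column really came through as a string, you can print the schema Spark inferred (an optional check on my part):
# inspect the schema Spark inferred from the pandas DataFrame
air_quality_sdf.printSchema()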
Once the DataFrame is loaded into Spark (as air_quality_sdf here), it can be easily manipulated using the PySpark DataFrame API.
air_quality_sdf.select('date', 'NOx').show(5)
The output should look something like this.
+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
Create a Hive table from Spark DataFrame
To persist the Spark DataFrame to HDFS, where it can be queried using the default Hadoop SQL engine (Hive), a straightforward strategy (not the only one) is to create a temporary view from the DataFrame.
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
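The view only lives inside the current Spark session, but it can already be queried with Spark SQL. A quick optional check, for example, is to count its rows:
# the temporary view is queryable from Spark SQL within this session
spark.sql("select count(*) from air_quality_sdf").show()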
Once the temporary view is created, a real table can be created from the Spark SQL engine using CREATE TABLE AS SELECT. Before creating this table, I will create a new database called analytics to store it.
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)
We can then create a new table there.
sql_create_table = """ create table if not exists analytics.pandas_spark_hive using parquet as select to_timestamp(date) as date_parsed, * from air_quality_sdf """ result_create_table = spark.sql(sql_create_table)Copy the code
Use PySpark to read data from Hive tables
Once we have created our Hive table, we can use the Spark SQL engine to read the data back and check the results, for example by selecting the ozone pollutant concentration over time.
spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)
Output:
+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
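To close the loop described at the beginning of the article, loading data from HDFS back into pandas for in-memory work, a minimal sketch is to collect the query result with toPandas(). The name ozone_pdf is just illustrative, and keep in mind that this pulls the whole result to the driver, so only do it with data that fits in memory:
# bring a (small) query result back into a pandas DataFrame on the driver
ozone_pdf = spark.sql(
    "select date_parsed, O_3 from analytics.pandas_spark_hive"
).toPandas()

print(ozone_pdf.head())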
Hope you enjoyed this post. Over the next few weeks, we’ll be publishing a series of articles on other tools you can use to master Hadoop in Python.