Introduction

This is the first in a series of articles on how to leverage Hadoop (a distributed computing framework) with Python.

The purpose of this series is to focus on specific tools and recipes that address the recurring challenges many data professionals face, for example:

  • Move HDFS (Hadoop Distributed File System) files using Python (a quick sketch follows this list).

  • Load data from HDFS into a data structure, such as a Spark or pandas DataFrame, for computation.

  • Write the analysis results back to HDFS.
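As a taste of the first item, here is a minimal sketch that copies a local file into HDFS by shelling out to the hdfs dfs command-line client. Both paths are hypothetical, and it assumes a configured Hadoop client on the machine; later articles in the series will cover dedicated Python libraries for this.

import subprocess

# Minimal sketch: copy a local file into HDFS via the hdfs dfs CLI.
# Assumes the Hadoop client is installed and configured on this machine;
# both paths below are placeholders.
local_path = "data/example.csv"
hdfs_path = "/user/cloudera/example.csv"

subprocess.run(["hdfs", "dfs", "-put", "-f", local_path, hdfs_path], check=True)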

The first tool in this series is Spark, a framework that describes itself as a unified analytics engine for large-scale data processing.

Apache Spark

PySpark and findspark installation

I encourage you to use a conda virtual environment. If you don’t know how to set up conda, read this article.

First, install findspark, a library that helps you integrate Spark into your Python workflow, and also install PySpark in case you are working on a local computer rather than a proper Hadoop cluster.

If you are working on this tutorial in a Hadoop cluster, skip the PySpark installation.

conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y

Set up Spark using findspark

Once you have findspark installed, you can set up Spark in your Python code.

The code for both local and cluster mode is provided below; uncomment the lines you need and adjust the paths for your particular infrastructure and library versions (the paths for Cloudera Spark should be similar to those shown here).

import findspark

# Local Spark
findspark.init('/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/')

# Cloudera cluster Spark
findspark.init(spark_home='/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/')

This tutorial uses the Cloudera Quickstart VM (CentOS Linux distribution, username _cloudera_); remember to adjust the paths for your infrastructure!

Create a Spark application

Once Spark is initialized, we must create a Spark application. Execute the following code, and make sure you specify the master you need: 'yarn' in the case of a proper Hadoop cluster, or 'local[*]' in the case of a fully local setup.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('example_app').master('yarn').getOrCreate()
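If you are running everything locally instead of on a cluster, the same builder works with the local master. A minimal variant of the line above:

# Local-mode variant of the session above (for a laptop setup)
spark = SparkSession.builder.appName('example_app').master('local[*]').getOrCreate()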

PySpark recipe and use case

Once we have Spark up and running, let’s start interacting with Hadoop and work through some of its common use cases.

Listing Hive Databases

Let’s get the existing database. I assume that you are already familiar with the Spark DataFrame API and its methods.

spark.sql("show databases").show()

You should get something like this:

+------------+ 
|databaseName| 
+------------+ 
|         db1| 
|     default| 
|     fhadoop| 
+------------+

Convert a pandas DataFrame to a Spark DataFrame

The first integration concerns how to move data from pandas, the de facto standard Python library for in-memory data manipulation, to Spark.

First, let’s load a pandas DataFrame. This one is about Madrid’s air quality (just to satisfy your curiosity, the topic is not important for moving data from one place to another). You can download it here. Make sure you have the PyTables library installed to read data in HDF5 format.

import pandas as pd

air_quality_df = pd.read_hdf('data/air_quality/air-quality-madrid/madrid.h5', key='28079008')
air_quality_df.head()

This data is a time series for many well-known pollutants, such as nitrogen oxides, ozone, and so on.

Let’s make some changes to the DataFrame, such as resetting the datetime index, to avoid losing information when loading the data into Spark. The date column will also be converted to a string, because Spark has some problems handling dates (related to system locale, time zones, and so on) unless it is further configured for your locale.

air_quality_df.reset_index(inplace=True)
air_quality_df['date'] = air_quality_df['date'].dt.strftime('%Y-%m-%d %H:%M:%S')

We can simply load from pandas to Spark with createDataFrame:

air_quality_sdf = spark.createDataFrame(air_quality_df)

Once the DataFrame is loaded into Spark (here as air_quality_sdf), it can be easily manipulated using the PySpark DataFrame API.

air_quality_sdf.select('date', 'NOx').show(5)

The output should look something like this.

+-------------------+------------------+
|               date|               NOx|
+-------------------+------------------+
|2001-07-01 01:00:00|            1017.0|
|2001-07-01 02:00:00|409.20001220703125|
|2001-07-01 03:00:00|143.39999389648438|
|2001-07-01 04:00:00| 149.3000030517578|
|2001-07-01 05:00:00|124.80000305175781|
+-------------------+------------------+
only showing top 5 rows
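To show a little more of the DataFrame API, here is a small, hypothetical aggregation on the same data: the average NOx concentration per day. The 'day' column is derived here purely for the example, from the string date column ('YYYY-MM-DD HH:MM:SS') created earlier.

from pyspark.sql import functions as F

# Hypothetical example: average NOx per day.
# 'day' is the first 10 characters of the string date column.
daily_nox = (air_quality_sdf
             .withColumn('day', F.substring('date', 1, 10))
             .groupBy('day')
             .agg(F.avg('NOx').alias('avg_NOx'))
             .orderBy('day'))
daily_nox.show(5)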

Create a Hive table from a Spark DataFrame

To persist the Spark DataFrame to HDFS, where it can be queried using the default Hadoop SQL engine (Hive), a straightforward strategy (not the only one) is to create a temporary view from the DataFrame:

air_quality_sdf.createOrReplaceTempView("air_quality_sdf")

Once the temporary view is created, a real table can be created from the Spark SQL engine using CREATE TABLE AS SELECT. Before creating that table, I will create a new database called analytics to store it:

sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)

We can then create a new table there.

sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet
as select to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)
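As an optional sanity check, you can list the tables in the new database to confirm the table was created:

spark.sql("show tables in analytics").show()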

Use PySpark to read data from Hive tables

Once we have created our Hive table, we can use the Spark SQL engine to check the results and load them back, for example by selecting the ozone pollutant concentration over time.

spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)

Output:

+-------------------+------------------+
|        date_parsed|               O_3|
+-------------------+------------------+
|2001-07-01 01:00:00| 9.010000228881836|
|2001-07-01 02:00:00| 23.81999969482422|
|2001-07-01 03:00:00|31.059999465942383|
|2001-07-01 04:00:00|23.780000686645508|
|2001-07-01 05:00:00|29.530000686645508|
+-------------------+------------------+
only showing top 5 rows
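If you need the result back in pandas for local analysis, a minimal sketch (assuming the table created above, and limiting the rows so they fit comfortably in memory) looks like this:

# Pull a small slice of the Hive table back into pandas
ozone_pdf = (spark.table("analytics.pandas_spark_hive")
             .select("date_parsed", "O_3")
             .limit(1000)
             .toPandas())
ozone_pdf.head()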

Hope you enjoyed this post. Over the next few weeks, we’ll be publishing a series of articles on other tools you can use to master Hadoop in Python.