Author: Aaron Richter | Compiled by: VK | Source: Towards Data Science

Random forest is a machine learning algorithm that is trusted by many data scientists for its robustness, accuracy, and scalability.

The algorithm trains multiple decision trees on bootstrap samples of the data (bootstrap aggregation, or bagging) and then combines their outputs to produce a prediction. Because of this ensemble structure, random forest lends itself well to distributed computing: trees can be trained in parallel across processes and machines in a cluster, resulting in much faster training times than using a single process.
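
To make the idea concrete, here is a minimal single-machine sketch of bootstrap aggregation using scikit-learn's DecisionTreeRegressor. It is purely illustrative and is not part of this article's benchmark; the toy data, number of trees, and depth are all assumptions.

import numpy as np
from sklearn.tree import DecisionTreeRegressor

# Toy data standing in for the taxi features and fares (purely illustrative)
rng = np.random.default_rng(42)
X = rng.normal(size=(1000, 5))
y = X @ rng.normal(size=5) + rng.normal(size=1000)

# Train each tree on a bootstrap sample (rows drawn with replacement);
# because the trees are independent, this loop is what gets parallelized
trees = []
for _ in range(10):
    idx = rng.integers(0, len(X), size=len(X))
    trees.append(DecisionTreeRegressor(max_depth=10).fit(X[idx], y[idx]))

# The forest's prediction is the average of the individual trees' predictions
preds = np.mean([tree.predict(X) for tree in trees], axis=0)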

In this article, we explore the implementation of distributed random forest training on a cluster of CPU machines using Apache Spark and compare its training performance with that on a cluster of GPU machines using NVIDIA RAPIDS and Dask.

While GPU computing has traditionally been reserved for deep learning applications, RAPIDS is a library that performs data processing and non-deep learning ML work on the GPU, which can significantly improve performance compared to execution on the CPU.

We trained a random forest model on 300 million instances: Spark took 37 minutes on a 20-node CPU cluster, while RAPIDS took 1 second on a 20-node GPU cluster. That is a speedup of more than 2000x on GPUs!

Overview

We used the publicly available New York City taxi data set and trained a random forest regressor to predict taxi fare amounts using attributes of the passenger pickup. Taxi trips from 2017, 2018, and 2019 were used as the training set, for a total of 300,700,143 instances.

Data set link: www1.nyc.gov/site/tlc/ab…

The Spark and RAPIDS code is available in Jupyter notebooks.

Hardware

The Spark cluster is managed using Amazon EMR, while the Dask/RAPIDS cluster is managed using Saturn Cloud.

Both clusters have 20 worker nodes with the following AWS instance types:

Spark: r5.2xlarge

  • 8 CPUs, 64 GB RAM

  • On-demand price: $0.504/hour

RAPIDS: g4dn.xlarge

  • 4 CPUs, 16 GB RAM

  • 1 GPU, 16 GB GPU RAM (NVIDIA T4)

  • On-demand price: $0.526/hour

Saturn Cloud could also launch a Dask cluster with NVIDIA Tesla V100 GPUs, but we chose g4dn.xlarge for this exercise to keep an hourly cost profile similar to the Spark cluster's.

Spark

Apache Spark is an open source big data processing engine built in Scala that has a Python interface to call Scala/JVM code.

It is an important part of the Hadoop processing ecosystem, built around the MapReduce paradigm, and has interfaces for data frames and machine learning.

Setting up the Spark cluster is beyond the scope of this article, but once you have the cluster ready, you can initialize Spark by running the following command in Jupyter Notebook:

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .config('spark.executor.memory', '36g')
        .getOrCreate())

The findspark package detects the location of the Spark installation on the system. This may not be necessary if you know where Spark is installed.

To get Spark code to run well, you need to set several configuration settings, depending on your cluster setup and workflow. In this case, we set spark.executor.memory to ensure that we don't run into out-of-memory or Java heap errors.
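
Other settings can be passed to the same builder in the same way. The snippet below is only a sketch: spark.executor.cores and spark.sql.shuffle.partitions are standard Spark configuration properties, but the values shown are placeholders that would need to be tuned for your own cluster and workload.

from pyspark.sql import SparkSession

spark = (SparkSession
        .builder
        .config('spark.executor.memory', '36g')
        .config('spark.executor.cores', '4')            # placeholder: cores per executor
        .config('spark.sql.shuffle.partitions', '200')  # placeholder: shuffle parallelism
        .getOrCreate())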

RAPIDS

NVIDIA RAPIDS is an open source Python framework that executes data science code on GPUs rather than CPUs. Similar to what is seen when training deep learning models, this can lead to a huge performance boost for data science work.

RAPIDS has interfaces for data frames, ML, graph analytics, and more. RAPIDS uses Dask to handle parallelization across machines with multiple GPUs, as well as clusters of machines with one or more GPUs each.

Setting up GPU machines can be a bit tricky, but Saturn Cloud has pre-built images for starting up GPU clusters, so you’ll be up and running in just a few minutes! To initialize the Dask client pointing to the cluster, run the following command:

from dask.distributed import Client
from dask_saturn import SaturnCluster

cluster = SaturnCluster()
client = Client(cluster)

To set up a Dask cluster yourself, refer to the docs page: docs.dask.org/en/latest/s…
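
For example, if you are running on a single machine with one or more GPUs rather than on Saturn Cloud, Dask-CUDA provides a local cluster. This minimal sketch assumes the dask-cuda package is installed and is not the setup used for this article's benchmark:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Starts one Dask worker per GPU visible on the local machine
cluster = LocalCUDACluster()
client = Client(cluster)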

Data loading

The data files are hosted in a public S3 bucket, so we can read the CSVs directly from there. All of the files are in the same directory of the bucket, so we use s3fs to select the files we want:

import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = [f"s3://{x}" for x in fs.ls('s3://nyc-tlc/trip data/')
         if 'yellow' in x and ('2019' in x or '2018' in x or '2017' in x)]
         
cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count',
        'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID',
        'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
        'improvement_surcharge', 'total_amount']

With Spark, we need to read each CSV file individually and then combine them:

import functools
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

# Specify the schema manually because inferSchema in read.csv is very slow
schema = StructType([
    StructField('VendorID', DoubleType()),
    StructField('tpep_pickup_datetime', TimestampType()),
    ...
    # See the notebook for the full schema
]) 

def read_csv(path):
    df = spark.read.csv(path,
                        header=True,
                        schema=schema,
                        timestampFormat='yyyy-MM-dd HH:mm:ss',
                       )
    df = df.select(cols)
    return df

dfs = []
for tf in files:
    df = read_csv(tf)
    dfs.append(df)

taxi = functools.reduce(DataFrame.unionAll, dfs)
taxi.count()

Using Dask+RAPIDS, we can read all CSV files at once:

import dask_cudf

taxi = dask_cudf.read_csv(files, 
                          assume_missing=True,
                          parse_dates=[1, 2],
                          usecols=cols, 
                          storage_options={'anon': True})
len(taxi)

Feature engineering

We will generate some features based on the pickup time and then cache/persist the data frame. In both frameworks, this triggers all of the CSV loading and preprocessing and stores the results in RAM (in RAPIDS' case, GPU RAM). The features we will use for training are:

features = ['pickup_weekday', 'pickup_hour', 'pickup_minute', 'pickup_week_hour',
            'passenger_count', 'VendorID', 'RatecodeID', 'store_and_fwd_flag',
            'PULocationID', 'DOLocationID']

For Spark, we need to collect the features into a vector column:

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.pipeline import Pipeline

taxi = taxi.withColumn('pickup_weekday', F.dayofweek(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_hour', F.hour(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_minute', F.minute(taxi.tpep_pickup_datetime).cast(DoubleType()))

taxi = taxi.withColumn('pickup_week_hour', ((taxi.pickup_weekday * 24) + taxi.pickup_hour).cast(DoubleType()))

taxi = taxi.withColumn('store_and_fwd_flag', F.when(taxi.store_and_fwd_flag == 'Y', 1).otherwise(0))

taxi = taxi.withColumn('label', taxi.total_amount)  
taxi = taxi.fillna(-1)

assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

pipeline = Pipeline(stages=[assembler])
assembler_fitted = pipeline.fit(taxi)
X = assembler_fitted.transform(taxi)
X.cache()
X.count()

For RAPIDS, we convert all floating-point values to float32 for GPU computation:

from dask import persist
from dask.distributed import wait

taxi['pickup_weekday'] = taxi.tpep_pickup_datetime.dt.weekday
taxi['pickup_hour'] = taxi.tpep_pickup_datetime.dt.hour
taxi['pickup_minute'] = taxi.tpep_pickup_datetime.dt.minute
taxi['pickup_week_hour'] = (taxi.pickup_weekday * 24) + taxi.pickup_hour
taxi['store_and_fwd_flag'] = (taxi.store_and_fwd_flag == 'Y').astype(float)
taxi = taxi.fillna(-1)

X = taxi[features].astype('float32')
y = taxi['total_amount']
X, y = persist(X, y)
_ = wait([X, y])
len(X)

Training the random forest

We can train a random forest with just a few lines of code.

Spark:

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(numTrees=100, maxDepth=10, seed=42)
fitted = rf.fit(X)

RAPIDS:

from cuml.dask.ensemble import RandomForestRegressor
rf = RandomForestRegressor(n_estimators=100, max_depth=10, seed=42)
_ = rf.fit(X, y)
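
Once fitted, either model can produce predictions. The following is an illustrative sketch rather than part of the benchmark: it assumes the fitted and rf objects from the code above and simply scores the training data as an example.

# Spark: transform() adds a 'prediction' column to the DataFrame
preds_spark = fitted.transform(X).select('prediction')
preds_spark.show(5)

# RAPIDS: predict() returns a distributed (Dask) collection of predictions
preds_rapids = rf.predict(X)
print(preds_rapids.compute())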

Results

We trained a random forest model on 300,700,143 New York taxi data instances on the Spark (CPU) and RAPIDS (GPU) clusters. Both clusters have 20 worker nodes at roughly the same hourly price. Here are the results for each part of the workflow:

Task                  Spark         RAPIDS
Load / row count      20.6 seconds  25.5 seconds
Feature engineering   54.3 seconds  23.1 seconds
Random forest         36.9 minutes  1.02 seconds

37 minutes with Spark versus 1 second with RAPIDS!

GPUs for the win! Think about it: you won't have to wait 37 minutes for a single fit, which speeds up every subsequent iteration and improvement. On CPUs, iterations can easily add up to hours or days once you add hyperparameter tuning or test different models.

Do you need to see it to believe it? You can find the notebooks here and run the test yourself: github.com/saturncloud…

Need a faster random forest?

Yes! You can get started with Dask/RAPIDS on Saturn Cloud in seconds. Saturn handles all of the tooling, infrastructure, security, and deployment challenges and gets RAPIDS up and running immediately. Click here for a free trial of Saturn in your AWS account: manager.aws.saturnenterprise.io/register

Original article: towardsdatascience.com/random-fore…
