Python Data Preprocessing Using Pandas DataFrame, Spark DataFrame, and Koalas DataFrame

Preparing data for machine learning in Python

Oct 15 · 10 min read

With its widespread use in data preprocessing, data analytics, and machine learning, Pandas, in conjunction with Numpy, Scikit-Learn, and Matplotlib, has become the de facto data science stack in Python.

However, one of the major limitations of Pandas is that it was designed for small datasets that can be handled on a single machine, so it does not scale well to big data. In contrast, Apache Spark was designed for big data, but it has a very different API and lacks much of the easy-to-use functionality that Pandas offers for data wrangling and visualization. Recently a new open source project, Koalas, was announced. It bridges the gap between the Pandas DataFrame and the Spark DataFrame by augmenting PySpark’s DataFrame API to make it compatible with the Pandas DataFrame API.

In this post, similar to the comparison between Pandas DataFrame and Spark DataFrame, I use a public dataset, sample_stocks.csv, to evaluate and compare the basic functionality of Pandas, Spark, and Koalas DataFrames in typical data preprocessing steps for machine learning, including:

  1. loading data (e.g., load csv file from internet)
  2. exploring data (e.g., summary statistics, data visualization, etc.)
  3. cleaning data (e.g., handle missing data)
  4. transforming data (e.g., feature engineering, scaling, reformatting as a Numpy array or a Spark RDD (Resilient Distributed Dataset))

For convenience, it is assumed that the following software has been installed on a local machine such as a Mac:

  • Anaconda (conda 4.7.10) with Numpy, Pandas, Matplotlib, and Scikit-Learn
  • Spark 2.4.4
  • Koalas (imported in Section 1.3 as databricks.koalas)

1. Loading Data

A dataset (e.g., the public sample_stocks.csv file) needs to be loaded into memory before any data preprocessing can begin. To this end, let’s import the related Python libraries:

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

from pyspark import SparkContext
try:
    sc = SparkContext('local', 'Pyspark demo')
except ValueError:
    print('SparkContext already exists!')

from pyspark.sql import SparkSession
try:
    spark = SparkSession.builder.appName('Recommendation_system').getOrCreate()
except ValueError:
    print('SparkSession already exists!')

1.1 Loading csv File in Pandas

Pandas provides a read_csv() function that can read either a local csv file or a csv file from the internet as a Pandas DataFrame:

pd_df = pd.read_csv("https://raw.githubusercontent.com/databricks/koalas/master/data/sample_stocks.csv")
pd_df.head(1)
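
As a minimal offline sketch of the same call, the snippet below feeds read_csv() a file-like object instead of a URL. The rows and the name demo_df are made up for illustration and are not taken from sample_stocks.csv.

```python
import io
import pandas as pd

# Made-up rows standing in for the remote file (illustrative values only)
csv_text = """Symbol,Open,Close,Volume
AAA,10.0,10.5,1000
AAA,10.5,10.2,1200
BBB,20.0,21.0,800
"""

# read_csv accepts a local path, a URL, or any file-like object
demo_df = pd.read_csv(io.StringIO(csv_text))

print(demo_df.head(1))   # first row only
print(demo_df.dtypes)    # column types inferred by Pandas
```

Pandas infers the numeric column types while parsing, which is why no explicit schema is needed here.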

1.2 Loading csv File in Spark

In Spark, the SparkSession only provides a method to read a csv file from a file path or from an RDD in memory as a Spark DataFrame. Spark needs to be combined with other Python libraries to read a csv file remotely from the internet.

One way of doing that is to use the urllib.request module to read the contents of the csv file from the internet first, and then convert the contents into a Spark RDD for the SparkSession to load as a Spark DataFrame.

import urllib.request

url = "https://raw.githubusercontent.com/databricks/koalas/master/data/sample_stocks.csv"
response = urllib.request.urlopen(url)
data = response.read()
text = data.decode('utf-8')
spark_df1 = spark.read.csv(sc.parallelize(text.splitlines()), header=True)
spark_df1.show(1)  # show() prints the rows itself and returns None

The other method is to use Pandas to read the csv file as a Pandas DataFrame first and then use SparkSession to create a Spark DataFrame from Pandas DataFrame.

spark_df2 = spark.createDataFrame(pd.read_csv(url))

1.3 Loading csv File in Koalas

Like Spark, Koalas only provides a method to read from a local csv file. It needs to be combined with other Python libraries to read a csv file from the internet. One simple method is to use Pandas to read the csv file as a Pandas DataFrame first and then convert it into a Koalas DataFrame.

import databricks.koalas as ks
ks_df = ks.from_pandas(pd.read_csv(url))
ks_df.head(1)

2. Exploring Data

Once a dataset is loaded into memory as a DataFrame, we can explore it from different aspects using various functions of the DataFrame.

2.1 Understanding DataFrame Schema

Typically, the first step to explore a DataFrame is to understand its schema: column names and corresponding data types.

The way of obtaining both DataFrame column names and data types is similar for Pandas, Spark, and Koalas DataFrames. All of those DataFrames provide an attribute columns for column names and an attribute dtypes for column data types.

One thing to note is that the data types of the Spark DataFrame depend on how the sample csv file is loaded. The columns are of string type by default if the csv file is loaded with Spark via urllib.request (spark_df1), while the numeric columns are of double type if the csv file is loaded via Pandas (spark_df2).

# DataFrame column names
pandas_column_names = pd_df.columns
spark_column_names  = spark_df1.columns
ks_column_names     = ks_df.columns

# DataFrame column data types
pandas_column_data_types = pd_df.dtypes
spark_column_data_types  = spark_df1.dtypes
ks_column_data_types     = ks_df.dtypes

2.2 Obtaining Summary Statistics

Once we understand the schema of a DataFrame, the next step in exploring the data is to look at summary statistics such as the five-number summary. Pandas, Spark, and Koalas DataFrames all provide a describe() function for obtaining basic summary statistics, including the total number of rows and the min, mean, and max of each column. Pandas and Koalas also report percentiles. Spark's describe() omits them, although its summary() method includes them.

pd_df.describe()

spark_df1.describe().show()
ks_df.describe()
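
To make the shape of the describe() output concrete, here is a small Pandas-only sketch on made-up prices. The frame demo_df and its values are assumptions for illustration, not rows from the stock dataset.

```python
import pandas as pd

# Made-up prices, for illustration only
demo_df = pd.DataFrame({
    "Open":  [10.0, 12.0, 14.0, 16.0, 18.0],
    "Close": [11.0, 11.5, 15.0, 15.5, 19.0],
})

# describe() returns a DataFrame indexed by statistic name
stats = demo_df.describe()
print(stats)

# The five-number summary is the min, the three quartiles, and the max
five_number = stats.loc[["min", "25%", "50%", "75%", "max"]]
print(five_number)
```

Because the result is itself a DataFrame, individual statistics can be looked up with loc, for example stats.loc["mean", "Open"].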

In addition to the basic summary statistics, another element of summary statistics is the correlation among different columns in a DataFrame. Pandas, Spark, and Koalas DataFrames all provide a corr() function to calculate correlation coefficients. The corr() function of Pandas and Koalas DataFrames can handle any number of columns, but the corr() function of Spark accepts only two columns.

pd_df[['Open','Close', 'Volume']].corr()
ks_df[['Open','Close', 'Volume']].corr()

from pyspark.sql.functions import corr
spark_df2.select(corr("Open","Close")).show()
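
As a sketch of what corr() computes, the snippet below builds the Pandas correlation matrix for three made-up columns and checks one entry against the Pearson formula written out with Numpy. The data and the names demo_df and manual_r are illustrative assumptions.

```python
import numpy as np
import pandas as pd

# Made-up values, for illustration only
demo_df = pd.DataFrame({
    "Open":   [10.0, 12.0, 14.0, 16.0, 18.0],
    "Close":  [11.0, 11.5, 15.0, 15.5, 19.0],
    "Volume": [900.0, 700.0, 800.0, 500.0, 400.0],
})

# Pairwise Pearson correlation of every column with every other column
corr_matrix = demo_df.corr()
print(corr_matrix)

# The same coefficient for one pair, computed from the definition
x = demo_df["Open"].to_numpy()
y = demo_df["Close"].to_numpy()
dx, dy = x - x.mean(), y - y.mean()
manual_r = (dx * dy).sum() / np.sqrt((dx ** 2).sum() * (dy ** 2).sum())
print(manual_r)
```

The matrix is symmetric with ones on the diagonal, so only the entries above (or below) the diagonal carry information.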

2.3 Grouping By and Aggregation

Grouping by and aggregation (e.g., min, max, mean, etc.) are another element of summary statistics. Pandas and Koalas DataFrames provide the same function for group by and aggregation:

pd_df.groupby('Symbol').max()['Open']
ks_df.groupby('Symbol').max()['Open']

However, the Spark DataFrame function for group by and aggregation has a different format:

from pyspark.sql import functions as F  # avoids shadowing Python's built-in max
spark_df2.groupBy("Symbol").agg(F.max("Open")).show()
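
For reference, here is a small Pandas sketch of grouping with several aggregations at once, on made-up rows. The output column names (max_open, mean_open, total_volume) are hypothetical, and the named-aggregation syntax assumes Pandas 0.25 or later.

```python
import pandas as pd

# Made-up rows, for illustration only
demo_df = pd.DataFrame({
    "Symbol": ["AAA", "AAA", "BBB", "BBB"],
    "Open":   [10.0, 10.5, 20.0, 19.0],
    "Volume": [1000, 1200, 800, 900],
})

# Selecting the column before aggregating avoids computing max over every column
max_open = demo_df.groupby("Symbol")["Open"].max()
print(max_open)

# Named aggregation: one output column per (input column, function) pair
summary = demo_df.groupby("Symbol").agg(
    max_open=("Open", "max"),
    mean_open=("Open", "mean"),
    total_volume=("Volume", "sum"),
)
print(summary)
```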

2.4 Visualizing Data

Data visualization is an important and efficient method for understanding data. Both Pandas and Koalas DataFrames provide similar plot functions for data visualization, but the quality of the plots can differ significantly. For example, the Koalas DataFrame scatter plot below misses many data points compared with the scatter plot of the Pandas DataFrame.

However, Spark DataFrame does not directly provide any data visualization functions. One easy workaround is to convert Spark DataFrame to Pandas or Koalas DataFrame for data visualization.

pd_df_sub = pd_df[['Open', 'Close']]
pd_df_sub.plot.scatter(x='Open', y='Close')

ks_df_sub = ks_df[['Open', 'Close']]
ks_df_sub.plot.scatter(x='Open', y='Close')

The scatter plot function in Pandas and Koalas DataFrames can only handle two columns. The Pandas plotting module provides a scatter_matrix() function that can draw a scatter plot for each pair of any number of columns.

from pandas.plotting import scatter_matrix
pd_df_sub = pd_df[['Open', 'Close', 'Volume']]
scatter_matrix(pd_df_sub, alpha=0.2)

3. Cleaning Data

Two of the major goals of data cleaning are to handle missing data and filter out outliers.

3.1 Handling Missing Data

To demonstrate how to handle missing data, first let’s assign a missing data item (e.g., np.nan) into the Pandas DataFrame:

pd_df_missing = pd_df.copy()
pd_df_missing.loc[0, 'Open'] = np.nan
pd_df_missing[pd_df_missing['Open'].isnull()]

Pandas DataFrame provides a fillna() function that can fill missing data items with any string or number. Spark and Koalas DataFrames provide a similar function, but they only allow a value that matches the data type of the corresponding column.

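# Pandas accepts any string or number as the fill value
pd_df_missing.fillna('N/A')
pd_df_missing.fillna(0)

# Spark and Koalas allow only a number here; building the two frames from pd_df_missing is an assumption
ks_df_missing = ks.from_pandas(pd_df_missing)
spark_df_missing = spark.createDataFrame(pd_df_missing)
ks_df_missing.fillna(0, inplace = True)
spark_df_missing = spark_df_missing.na.fill(0)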
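
A slightly fuller Pandas sketch of the same idea is shown below on made-up rows: count the missing values first, then either fill each column with a type-appropriate value or drop the incomplete rows. Filling with the column mean is only one possible choice and is an assumption of this example.

```python
import numpy as np
import pandas as pd

# Made-up rows with one missing value per column, for illustration only
demo_df = pd.DataFrame({
    "Symbol": ["AAA", None, "BBB"],
    "Open":   [10.0, np.nan, 20.0],
})

# Number of missing values in each column
missing_per_column = demo_df.isnull().sum()
print(missing_per_column)

# Fill each column with a value that matches its type
filled = demo_df.fillna({"Symbol": "N/A", "Open": demo_df["Open"].mean()})
print(filled)

# Alternatively, drop every row that has at least one missing value
dropped = demo_df.dropna()
print(dropped)
```

Passing a dictionary to fillna() keeps string and numeric columns separate, which also matches the per-type restriction described for Spark and Koalas.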

3.2 Filtering Data

Data filtering can be used for removing outliers and many other purposes.

As shown in the sample code below, Pandas and Koalas DataFrames have the same API for conditionally selecting data rows and columns. However, the Spark DataFrame has a different API.

pd_df.loc[pd_df['Open'] >= 168, ['Open','Close']].head()
ks_df.loc[ks_df['Open'] >= 168, ['Open','Close']].head()
spark_df2.filter("Open >= 168").select("Open","Close").show(5)
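
As one possible way to use filtering for outlier removal, the sketch below applies the common 1.5 × IQR rule in Pandas. The rule, the data, and the variable names are assumptions chosen for illustration and are not part of the original comparison.

```python
import pandas as pd

# Made-up prices with one obvious outlier (100.0), for illustration only
demo_df = pd.DataFrame({"Open": [10.0, 11.0, 12.0, 13.0, 14.0, 100.0]})

# Interquartile range of the column
q1 = demo_df["Open"].quantile(0.25)
q3 = demo_df["Open"].quantile(0.75)
iqr = q3 - q1

# Keep only rows within 1.5 * IQR of the quartiles
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
inliers = demo_df.loc[demo_df["Open"].between(lower, upper)]
print(inliers)
```

The same boolean-mask pattern works for any condition, so the threshold rule can be swapped for a domain-specific one.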

4. Transforming Data for Feature Engineering

Feature engineering can be fundamental to the application of machine learning, and it is achieved through various types of data transformation. A feature is a data column in a DataFrame. The scope of feature engineering varies, but it typically includes the following:

  • Select a subset of existing data columns that are correlated with the prediction target in machine learning (i.e., labels in supervised machine learning)
  • Rename an existing column with a more meaningful name
  • Create new columns based on existing columns (i.e., create derived features)
  • Scale column values into a certain range (e.g., scaling column values into the range of [0,1] or [-1,1] in deep learning)

4.1 Selecting Columns

As described before, Pandas and Koalas DataFrames provide the same method for selecting columns, but Spark DataFrame provides a different API.

pd_df[['Open', 'Close']]
ks_df[['Open', 'Close']]
spark_df2.select('Open', 'Close')
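
One simple way to decide which columns to select is to keep those whose correlation with the prediction target exceeds a threshold. The sketch below does this in Pandas on made-up data. The Noise column, the target choice, and the 0.5 threshold are all hypothetical.

```python
import pandas as pd

# Made-up data: "Open" tracks "Close" closely, "Noise" does not
demo_df = pd.DataFrame({
    "Open":  [10.0, 12.0, 14.0, 16.0, 18.0],
    "Noise": [3.0, 1.0, 2.0, 1.0, 3.0],
    "Close": [11.0, 11.5, 15.0, 15.5, 19.0],
})

target = "Close"    # assumed prediction target
threshold = 0.5     # arbitrary cut-off for this illustration

# Correlation of every other column with the target
corr_with_target = demo_df.corr()[target].drop(target)

# Keep the columns whose absolute correlation reaches the threshold
selected = corr_with_target[corr_with_target.abs() >= threshold].index.tolist()
features = demo_df[selected]
print(selected)
```

Correlation only captures linear relationships, so this is a first-pass filter rather than a complete feature selection method.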

4.2 Renaming Columns

Pandas and Spark DataFrames use different function names for the similar task of renaming columns.

pd_df.rename(columns = {"Symbol": "SYMBOL"}, inplace = True)
spark_df_renamed = spark_df2.withColumnRenamed("Symbol", "SYMBOL")

However, the current Koalas DataFrame does not support the functionality of renaming columns.

4.3 Creating New Columns

It’s normally necessary to create new data columns from existing columns for machine learning. For example, a column of categorical data type needs to be converted into new columns of numerical data types using one-hot encoding:

pd_df_dummies = pd.get_dummies(pd_df)
ks_df_dummies = ks.get_dummies(ks_df)

However, Spark DataFrame does not provide such a function. This limitation is avoided by the Koalas DataFrame.
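
To show what one-hot encoding produces, here is a small Pandas sketch on made-up rows. Only the string column is expanded; numeric columns pass through unchanged.

```python
import pandas as pd

# Made-up rows, for illustration only
demo_df = pd.DataFrame({
    "Symbol": ["AAA", "BBB", "AAA"],
    "Open":   [10.0, 20.0, 10.5],
})

# Each distinct Symbol value becomes its own indicator column
dummies = pd.get_dummies(demo_df)
print(dummies)
```

The indicator columns are named by joining the original column name and the category value with an underscore, for example Symbol_AAA.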

4.4 Scaling Columns

As described before, in machine learning it is necessary to scale the values of columns into a certain range (e.g., [0,1] or [-1,1]) if different columns have values in very different ranges. For Pandas DataFrames, the scikit-learn library provides two frequently used classes, MinMaxScaler and StandardScaler, for this purpose.

However, these scalers cannot be applied directly to a Koalas DataFrame. A Koalas DataFrame needs to be converted into a Pandas DataFrame to take advantage of them.
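
A minimal sketch of both scikit-learn scalers on a Pandas DataFrame follows. The data is made up, and wrapping the result back into a DataFrame is a convenience chosen for this example.

```python
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Made-up columns on very different scales, for illustration only
demo_df = pd.DataFrame({
    "Open":   [10.0, 12.0, 14.0, 16.0, 18.0],
    "Volume": [900.0, 700.0, 800.0, 500.0, 400.0],
})

# fit_transform returns a Numpy array, so wrap it back into a DataFrame
minmax = pd.DataFrame(
    MinMaxScaler().fit_transform(demo_df),
    columns=demo_df.columns, index=demo_df.index,
)
standard = pd.DataFrame(
    StandardScaler().fit_transform(demo_df),
    columns=demo_df.columns, index=demo_df.index,
)

print(minmax)     # each column rescaled to [0, 1]
print(standard)   # each column rescaled to zero mean and unit variance
```

In a real workflow the scaler would be fitted on the training split only and then reused to transform the test split.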

Scaling columns can be done for a Spark DataFrame, but the implementation can be much more involved than using the scikit-learn scalers with a Pandas DataFrame. As an example, similar to the Spark data scaling example, the following code uses the Spark MinMaxScaler, VectorAssembler, and Pipeline objects to scale Spark DataFrame columns:

from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Assumed helper (not defined in the original): unpack a one-element vector into a double
unlist = udf(lambda x: float(list(x)[0]), DoubleType())

# VectorAssembler transformation: convert the column to vector type
assembler = VectorAssembler(inputCols=['Open'], outputCol="Open_Vect")
scaler = MinMaxScaler(inputCol="Open_Vect", outputCol="Open_Scaled")

# Pipeline of VectorAssembler and MinMaxScaler
pipeline = Pipeline(stages=[assembler, scaler])

# Fit the pipeline on the DataFrame, then unpack the scaled vector and drop the helper column
spark_df3 = pipeline.fit(spark_df2).transform(spark_df2).withColumn("Open_Scaled", unlist("Open_Scaled")).drop("Open_Vect")
spark_df3.show(1)

4.5 Reformatting DataFrame for Machine Learning

The final step of data preprocessing is to convert a DataFrame into an appropriate format for the consumption of machine learning modeling, depending on the machine learning library in use.

If a Numpy-based machine learning or deep learning library (e.g., scikit-learn, Keras, etc.) is used, then a DataFrame needs to be converted into a Numpy array for modeling. The Pandas DataFrame provides a values attribute to get a Numpy array from a Pandas DataFrame, but the current Koalas DataFrame does not support such a method. If the dataset can be handled on a single machine, a Spark or Koalas DataFrame can be converted into a Pandas DataFrame as follows to obtain the corresponding Numpy array easily.

pd_df_from_koalas = ks_df.to_pandas()
pd_df_from_spark = spark_df2.toPandas()
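
Once the data is in a Pandas DataFrame, the last step toward a Numpy-based library can be sketched as below. The split into a feature matrix X and a target vector y, and the choice of Close as the target, are assumptions for illustration.

```python
import pandas as pd

# Made-up rows, for illustration only
demo_df = pd.DataFrame({
    "Open":   [10.0, 12.0, 14.0, 16.0, 18.0],
    "Volume": [900.0, 700.0, 800.0, 500.0, 400.0],
    "Close":  [11.0, 11.5, 15.0, 15.5, 19.0],
})

# Feature matrix (2-D) and target vector (1-D) as Numpy arrays
X = demo_df[["Open", "Volume"]].to_numpy()
y = demo_df["Close"].to_numpy()

print(X.shape, y.shape)
```

The to_numpy() method (Pandas 0.24 or later) returns the same array as the values attribute mentioned above for a frame like this one.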

If a Spark-based machine learning library like MLlib is used, then a DataFrame needs to be converted into either RDD or Spark DataFrame. A Spark DataFrame can be directly fed into an appropriately designed MLlib pipeline (see MLlib for example) or be converted into RDD if the RDD-based MLlib API is used for modeling. The Spark DataFrame provides an rdd attribute to return an RDD. In this case, a Pandas or Koalas DataFrame needs to be converted into a Spark DataFrame first for modeling purposes. This can be achieved as follows:

spark_df_from_pandas = spark.createDataFrame(pd_df)
spark_df_from_koalas = ks_df.to_spark()

Summary

As described in the Koalas announcement, data scientists tend to use Pandas DataFrames to explore data. They are reluctant to use Spark DataFrames because of the steep learning curve. Koalas seems to fill the gap between them by providing an easy-to-use API similar to the Pandas DataFrame API that can run on Spark.

In this post, as shown in the summary table below, I use a public dataset sample_stocks.csv to evaluate and compare the basic functionality of Pandas, Spark, and Koalas DataFrames in typical data preprocessing tasks for machine learning.

Koalas is still at an early stage of development. As shown in the table above, it does not support some of the basic functions of data preprocessing, and certain supported functions are not yet mature. As development advances, many potential advantages of Koalas’ easy-to-use API for data transformation and visualization on Spark will start to shine for large-scale datasets (e.g., hundreds of millions of data records).
