26 Nov 2016

Spark and R

Spark

Apache Spark is “a fast and general engine for large-scale data processing”, as stated on the official project’s page. Many consider it the successor to the popular Hadoop MapReduce engine.

Spark has grown considerably in popularity in recent years because:

  • It integrates easily with existing technologies. It can run on:
    • Hadoop
    • Mesos
    • Standalone
    • In the cloud
  • It can also access diverse data sources including:
    • HDFS
    • Cassandra
    • HBase
    • S3
  • It runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
  • It can be programmed in several languages:
    • Java
    • Scala
    • Python
    • R

R

R is a statistical programming language (increasingly regarded as a general-purpose language as well) and one of the languages used to run statistical / data analysis on Spark.

In this course we will explore the two main R packages used to run data analysis on Spark:

  • SparkR - bundled with Spark since version 1.4
  • sparklyr - developed by RStudio

Installation of Spark and Hadoop and versions

Versions and Spark Mode

I spent a considerable amount of time trying to find out exactly which combination of Hadoop and Spark works with both SparkR and sparklyr (these are very new packages and there are still some issues, especially with the recent update of Spark to version 2.0.1). According to my research, Spark version 2.0.1 (the most recent at the time of writing) and Hadoop version 2.4 seem to work perfectly with both SparkR and sparklyr. For the rest of the course I will be using this combination of Spark and Hadoop.

I used the standalone mode of Spark and I am also working on Windows 10.

#details of my session
sessionInfo()
## R version 3.3.0 (2016-05-03)
## Platform: x86_64-w64-mingw32/x64 (64-bit)
## Running under: Windows 10 x64 (build 14393)
## 
## locale:
## [1] LC_COLLATE=English_United Kingdom.1252 
## [2] LC_CTYPE=English_United Kingdom.1252   
## [3] LC_MONETARY=English_United Kingdom.1252
## [4] LC_NUMERIC=C                           
## [5] LC_TIME=English_United Kingdom.1252    
## 
## attached base packages:
## [1] stats     graphics  grDevices utils     datasets  methods   base     
## 
## loaded via a namespace (and not attached):
##  [1] magrittr_1.5       assertthat_0.1     formatR_1.4       
##  [4] tools_3.3.0        htmltools_0.3.5    yaml_2.1.13       
##  [7] tibble_1.1         Rcpp_0.12.5        stringi_1.1.1     
## [10] rmarkdown_1.0.9016 knitr_1.14         stringr_1.0.0     
## [13] digest_0.6.10      evaluate_0.9

Installation of Spark and Hadoop

The easiest way to install Spark Standalone and Hadoop is through sparklyr::spark_install:

#install Spark 2.0.1 and Hadoop 2.4
#sparklyr version 0.4.22 
library(sparklyr)
spark_install(version = '2.0.1', hadoop_version = '2.4')

After the download and installation of Spark and Hadoop (this might take a while) we are ready to start using SparkR and sparklyr.

For clarity, by default Spark and Hadoop will be installed in:

C:\Users\<your user>\AppData\Local\rstudio\spark\Cache\spark-2.0.1-bin-hadoop2.4
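To avoid hard-coding the user name, the default location can be assembled from the Windows LOCALAPPDATA environment variable. This is only a minimal sketch in base R: default_spark_home is a hypothetical helper (it is not part of sparklyr), and it assumes the directory layout shown above.

```r
# Hypothetical helper: builds the default install path shown above.
# Assumes the <LOCALAPPDATA>/rstudio/spark/Cache layout on Windows.
default_spark_home <- function(local_app_data = Sys.getenv("LOCALAPPDATA"),
                               spark_version = "2.0.1",
                               hadoop_version = "2.4") {
  folder <- paste0("spark-", spark_version, "-bin-hadoop", hadoop_version)
  file.path(local_app_data, "rstudio", "spark", "Cache", folder)
}

spark_home <- default_spark_home()

# TRUE only if Spark was actually installed in the default location
dir.exists(spark_home)
```

The resulting string can be passed to Sys.setenv(SPARK_HOME = ...) instead of a path typed by hand.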

RStudio

SparkR has shipped with Spark since version 1.4 and can be used immediately through the command line. sparklyr can be used after loading the sparklyr package. However, most users (including myself) like working in an IDE (this is why we have IDEs, right?) like RStudio. In fact, the RStudio Preview Release includes a toolset for creating / managing Spark connections and browsing Spark DataFrames, and has been specifically designed to work with Spark.

For the rest of the course we will be using the RStudio Preview Release, which can be downloaded for free from the link above.

Installation of sparklyr and SparkR

sparklyr is readily available on CRAN, so running install.packages('sparklyr') is enough. Things are slightly more complicated for SparkR, since it has to be downloaded and installed from GitHub. Nothing too difficult though:

#you will need the devtools package to install packages from GitHub
#so install.packages('devtools') if you don't have it
library(devtools)
install_github('apache/spark@v2.0.1', subdir='R/pkg')

Remember that SparkR can also be used from the command line, simply by visiting C:\Users\<your user>\AppData\Local\rstudio\spark\Cache\spark-2.0.1-bin-hadoop2.4\bin\ and double clicking sparkR.cmd, or by running sparkR in the Windows cmd if the above path is part of the %PATH% environment variable.

And that would be enough to have both sparklyr and SparkR.
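Whether the Spark bin folder is already on the PATH can be checked from R itself. The snippet below is a small sketch using only base R; spark_r_path is just an illustrative variable name.

```r
# Look for the sparkR launcher on the PATH (base R, no Spark packages needed).
# Sys.which returns an empty string when nothing is found.
spark_r_path <- Sys.which("sparkR")

if (nzchar(spark_r_path)) {
  message("sparkR found at: ", spark_r_path)
} else {
  message("sparkR is not on the PATH; add the Spark bin folder or start it from there")
}
```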

SparkR

Our first task is to connect RStudio to Spark using SparkR.

To do this we first need to set the SPARK_HOME environment variable (don’t worry, we will do it through R and it will be super easy) and then connect to Spark through sparkR.session:

#lots of base R functions will be masked so please use SparkR only for working with Spark
#remember to use your own username instead of teoboot which is mine
#this sets the SPARK_HOME environment variable
Sys.setenv(SPARK_HOME = "C:/Users/teoboot/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.4")

#SparkR version 2.0.1
library(SparkR)
## 
## Attaching package: 'SparkR'
## The following objects are masked from 'package:stats':
## 
##     cov, filter, lag, na.omit, predict, sd, var, window
## The following objects are masked from 'package:base':
## 
##     as.data.frame, colnames, colnames<-, drop, endsWith,
##     intersect, rank, rbind, sample, startsWith, subset, summary,
##     transform, union
#start a Spark session in SparkR.
#if the SPARK_HOME environment variable is not set, sparkR.session will attempt to download Spark
sc1 <- sparkR.session()
## Spark package found in SPARK_HOME: C:/Users/teoboot/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.4
## Launching java with spark-submit command C:/Users/teoboot/AppData/Local/rstudio/spark/Cache/spark-2.0.1-bin-hadoop2.4/bin/spark-submit2.cmd   sparkr-shell C:\Users\teoboot\AppData\Local\Temp\RtmpmC7JaR\backend_port88588c3803
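The masking messages above do not mean the original functions are gone: a masked function can still be called through its namespace with the :: operator. The sketch below imitates the situation in base R, without Spark; the sd defined here is a hypothetical stand-in for a masking function, not the real SparkR::sd.

```r
# Stand-in for what attaching SparkR does: a new sd() hides stats::sd().
# (Hypothetical placeholder; the real SparkR::sd works on Spark columns.)
sd <- function(x) "masked version"

x <- c(1, 2, 3)
sd(x)         # calls the masking function defined above
stats::sd(x)  # explicitly calls the original from the stats package

rm(sd)        # remove the stand-in so sd() resolves to stats::sd again
```

In the same way, stats::filter or base::summary remain available while SparkR is attached.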

For the following analysis we will be using the popular flights data set from the nycflights13 package.

Typical data frame operations in local R involve subsetting columns and rows, grouping and aggregating. Let’s see how we can do this in SparkR:

#load the package in order to use flights
library(nycflights13)
## Warning: package 'nycflights13' was built under R version 3.3.1

In order to create a spark dataframe we use createDataFrame:

df <- createDataFrame(flights)

In order to subset columns from flights we use select:

head(select(df, df$air_time, df$distance)) 
##   air_time distance
## 1      227     1400
## 2      227     1416
## 3      160     1089
## 4      183     1576
## 5      116      762
## 6      150      719

Another way would be to use the [ operator as we would for a local dataframe:

head(df[, c('air_time', 'distance')])
##   air_time distance
## 1      227     1400
## 2      227     1416
## 3      160     1089
## 4      183     1576
## 5      116      762
## 6      150      719

In order to subset rows we use filter:

head(filter(df, df$distance > 3000))
##   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
## 1 2013     1   1      857            900        -3     1516           1530
## 2 2013     1   1     1344           1344         0     2005           1944
## 3 2013     1   2      909            900         9     1525           1530
## 4 2013     1   2     1344           1344         0     1940           1944
## 5 2013     1   3      914            900        14     1504           1530
## 6 2013     1   3     1418           1341        37     2006           1935
##   arr_delay carrier flight tailnum origin dest air_time distance hour
## 1       -14      HA     51  N380HA    JFK  HNL      659     4983    9
## 2        21      UA     15  N76065    EWR  HNL      656     4963   13
## 3        -5      HA     51  N380HA    JFK  HNL      638     4983    9
## 4        -4      UA     15  N77066    EWR  HNL      634     4963   13
## 5       -26      HA     51  N380HA    JFK  HNL      616     4983    9
## 6        31      UA     15  N76064    EWR  HNL      628     4963   13
##   minute           time_hour
## 1      0 2013-01-01 09:00:00
## 2     44 2013-01-01 13:00:00
## 3      0 2013-01-02 09:00:00
## 4     44 2013-01-02 13:00:00
## 5      0 2013-01-03 09:00:00
## 6     41 2013-01-03 13:00:00

Another way would be to use the [ operator as we would in a local dataframe, but please note that subsetting the rows of a Spark DataFrame with indices does not work, i.e. df[1:100, ] would not work.

head(df[df$distance > 3000, ])
##   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
## 1 2013     1   1      857            900        -3     1516           1530
## 2 2013     1   1     1344           1344         0     2005           1944
## 3 2013     1   2      909            900         9     1525           1530
## 4 2013     1   2     1344           1344         0     1940           1944
## 5 2013     1   3      914            900        14     1504           1530
## 6 2013     1   3     1418           1341        37     2006           1935
##   arr_delay carrier flight tailnum origin dest air_time distance hour
## 1       -14      HA     51  N380HA    JFK  HNL      659     4983    9
## 2        21      UA     15  N76065    EWR  HNL      656     4963   13
## 3        -5      HA     51  N380HA    JFK  HNL      638     4983    9
## 4        -4      UA     15  N77066    EWR  HNL      634     4963   13
## 5       -26      HA     51  N380HA    JFK  HNL      616     4983    9
## 6        31      UA     15  N76064    EWR  HNL      628     4963   13
##   minute           time_hour
## 1      0 2013-01-01 09:00:00
## 2     44 2013-01-01 13:00:00
## 3      0 2013-01-02 09:00:00
## 4     44 2013-01-02 13:00:00
## 5      0 2013-01-03 09:00:00
## 6     41 2013-01-03 13:00:00

In order to group and aggregate we use groupBy and summarize:

grouped_df <- groupBy(df, df$origin)
df2 <- summarize(grouped_df, 
                 mean = mean(df$distance), 
                 count = n(df$origin),
                 sum = sum(df$distance))
head(df2)
##   origin      mean  count       sum
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515
## 3    JFK 1266.2491 111279 140906931

In order to sort the data frame we use arrange:

#head(arrange(df2, desc(df2$mean))) for sorting in descending order or
head(arrange(df2, df2$mean)) 
##   origin      mean  count       sum
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515
## 3    JFK 1266.2491 111279 140906931

However, the above steps can easily be combined with the pipe operator %>% from the magrittr package. This lets us work in a similar way to dplyr, ggvis, tableHTML and other packages. The above process would be written like this:

library(magrittr)
df3 <- df %>%
        group_by(df$origin)%>%
        summarize(mean = mean(df$distance), 
                  count = n(df$origin),
                  sum = sum(df$distance)) 

arrange(df3, df3$mean) %>% 
 head
##   origin      mean  count       sum
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515
## 3    JFK 1266.2491 111279 140906931

And as you can see we get the same results.

SparkR also supports SQL queries, once a DataFrame has been registered as a temporary view:

#create a sql table
createOrReplaceTempView(df, "sql_df")

#query sql table using sql syntax
df4 <- sql("SELECT * FROM sql_df WHERE distance > 4800")
head(df4)
##   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
## 1 2013     1   1      857            900        -3     1516           1530
## 2 2013     1   1     1344           1344         0     2005           1944
## 3 2013     1   2      909            900         9     1525           1530
## 4 2013     1   2     1344           1344         0     1940           1944
## 5 2013     1   3      914            900        14     1504           1530
## 6 2013     1   3     1418           1341        37     2006           1935
##   arr_delay carrier flight tailnum origin dest air_time distance hour
## 1       -14      HA     51  N380HA    JFK  HNL      659     4983    9
## 2        21      UA     15  N76065    EWR  HNL      656     4963   13
## 3        -5      HA     51  N380HA    JFK  HNL      638     4983    9
## 4        -4      UA     15  N77066    EWR  HNL      634     4963   13
## 5       -26      HA     51  N380HA    JFK  HNL      616     4983    9
## 6        31      UA     15  N76064    EWR  HNL      628     4963   13
##   minute           time_hour
## 1      0 2013-01-01 09:00:00
## 2     44 2013-01-01 13:00:00
## 3      0 2013-01-02 09:00:00
## 4     44 2013-01-02 13:00:00
## 5      0 2013-01-03 09:00:00
## 6     41 2013-01-03 13:00:00

There will be cases where we would want to collect our data sets from spark and work on them locally. This can be done using collect.

#local_df will be a data frame on our local R instance
local_df <- collect(df3)
local_df[1:2, ]
##   origin      mean  count       sum
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515

Machine Learning - SparkR

It is essential for our work to be able to do machine learning on a data set. SparkR offers this capability through MLlib.

Continuing with the flights data set, we will try to predict distance.

First of all we need to split our data set into a train and a test set with the randomSplit function.

#split into train and test - 20% for test and 80% for train
df_list <- randomSplit(df, c(20, 80))

#test
test <- df_list[[1]]
nrow(test)
## [1] 67637
#train
train <- df_list[[2]]
nrow(train)
## [1] 269139
#validation - nrow(test) + nrow(train) == nrow(df)
nrow(df)
## [1] 336776
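The row counts above can be checked with a little arithmetic. This sketch uses base R only and copies the counts from the output; it assumes, as the Spark documentation describes, that randomSplit normalises its weights, so c(20, 80) stands for 20% and 80%.

```r
# randomSplit normalises its weights, so c(20, 80) means 20% / 80%
weights <- c(test = 20, train = 80)
expected_share <- weights / sum(weights)

# Row counts reported in the output above
observed_rows <- c(test = 67637, train = 269139)
total_rows <- 336776

# The two parts must add up to the full data set
stopifnot(sum(observed_rows) == total_rows)

# Observed shares are close to, but not exactly, the requested ones
observed_share <- observed_rows / total_rows
round(rbind(expected_share, observed_share), 4)
```

Because the split is random, the observed shares only approximate the requested ones, and the exact counts will differ from run to run unless a seed is set.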

Now that we have our train and test sets we are ready to run our linear model. We will train our model on the train set and then predict on our test set. We will try to predict distance based on air_time for this simple example.

Notice that glm cannot handle NAs in the DataFrame; they make the function crash with an uninformative error. Make sure you remove them beforehand with na.omit.

#run model
train <- na.omit(train)
my_mod <- glm(distance ~ air_time, data = train, family = 'gaussian')

Let’s check the model coefficients:

#summary
summary(my_mod)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##      Min        1Q    Median        3Q       Max  
## -1103.05    -50.84      3.67     52.39    713.62  
## 
## Coefficients:
##              Estimate  Std. Error  t value  Pr(>|t|)
## (Intercept)  -124.31   0.37142     -334.69  0       
## air_time     7.7824    0.002093    3718.2   0       
## 
## (Dispersion parameter for gaussian family taken to be 10067.37)
## 
##     Null deviance: 1.4181e+11  on 261570  degrees of freedom
## Residual deviance: 2.6333e+09  on 261569  degrees of freedom
## AIC: 3153224
## 
## Number of Fisher Scoring iterations: 1
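To make the summary more concrete, the coefficients can be plugged into the model equation by hand, and an R-squared can be derived from the two deviances. This is a worked sketch in base R using the rounded figures printed above; the 150-minute flight is a made-up example, and the R-squared formula (1 - residual deviance / null deviance) applies to a gaussian model with an intercept like this one.

```r
# Coefficients copied (rounded) from the summary output above
intercept <- -124.31
slope <- 7.7824

# Predicted distance for a hypothetical flight of 150 minutes
air_time <- 150
predicted_distance <- intercept + slope * air_time
predicted_distance

# R-squared on the training data from the reported deviances
null_deviance <- 1.4181e11
residual_deviance <- 2.6333e9
r_squared <- 1 - residual_deviance / null_deviance
round(r_squared, 4)
```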

Now we need to make predictions on our test set in order to assess the goodness of fit. Neither glm nor SparkR provides metrics or functions for assessing the fit on new data. These need to be created by the user, like the calculation of the MSE below:

#predict
#omit the NAs manually here otherwise spark will crash without
#an informative error
test <- na.omit(test)
preds <- predict(my_mod, newData = test)

#predictions and actual distance
head(select(preds, 'distance', 'prediction'))
##   distance prediction
## 1      719  1043.0447
## 2      229   288.1558
## 3     1598  1292.0802
## 4      187   194.7675
## 5      301   365.9794
## 6     2434  2832.9874
#add squared residuals using transform
sq_resid <- transform(preds, sq_residuals = (preds$distance - preds$prediction)^2)

#calculate MSE and collect locally - it is only a number
MSE <- collect(summarize(sq_resid, mean = mean(sq_resid$sq_residuals)))$mean

#RMSE
sqrt(MSE) 
## [1] 100.6558
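The same calculation can be written as a plain R function for data that has already been collected locally. The sketch below is base R only; rmse is a hypothetical helper name, and the six rows are the ones printed by head above, far too few for a real assessment and used here only to illustrate the formula.

```r
# Hypothetical helper: RMSE for two numeric vectors of equal length
rmse <- function(actual, predicted) {
  stopifnot(length(actual) == length(predicted))
  sqrt(mean((actual - predicted)^2))
}

# The six rows printed by head() above (distance and prediction)
actual <- c(719, 229, 1598, 187, 301, 2434)
predicted <- c(1043.0447, 288.1558, 1292.0802, 194.7675, 365.9794, 2832.9874)

rmse(actual, predicted)
```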

Lastly, SparkR seems to offer just four machine learning algorithms at the time of writing:

  • Generalised Linear Models
  • kmeans
  • Naive Bayes
  • Survival Regression Model

Sparklyr

library(sparklyr)
library(dplyr)
## Warning: package 'dplyr' was built under R version 3.3.1
## 
## Attaching package: 'dplyr'
## The following objects are masked from 'package:SparkR':
## 
##     arrange, between, collect, contains, count, cume_dist,
##     dense_rank, desc, distinct, explain, filter, first, group_by,
##     intersect, lag, last, lead, mutate, n, n_distinct, ntile,
##     percent_rank, rename, row_number, sample_frac, select, sql,
##     summarize, union
## The following objects are masked from 'package:stats':
## 
##     filter, lag
## The following objects are masked from 'package:base':
## 
##     intersect, setdiff, setequal, union
#make a connection
sc2 <- spark_connect('local', version = '2.0.1', hadoop_version = '2.4', config = list())

Notice that loading dplyr alongside sparklyr masks many base functions (and SparkR functions, if you have SparkR loaded).

Now we will try to do the same process using the sparklyr package.

In sparklyr, copy_to will transfer the flights data set to our cluster in the same way that createDataFrame did in SparkR.

#load nycflights13 for flights data set
library(nycflights13)

#pass to spark
df <- copy_to(sc2, flights)

sparklyr uses dplyr for all data manipulation, through the usual dplyr verbs.

In order to select specific columns we use dplyr::select, exactly as we would on a local data frame:

head(select(df, distance, origin))
## Source:   query [?? x 2]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##   distance origin
##      <dbl>  <chr>
## 1     1400    EWR
## 2     1416    LGA
## 3     1089    JFK
## 4     1576    JFK
## 5      762    LGA
## 6      719    EWR

Notice that unlike SparkR, df[, 'origin'] would not work in sparklyr.

In a similar way filter is used to subset rows from df:

head(filter(df, arr_delay > 30))
## Source:   query [?? x 19]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##    year month   day dep_time sched_dep_time dep_delay arr_time
##   <int> <int> <int>    <int>          <int>     <dbl>    <int>
## 1  2013     1     1      542            540         2      923
## 2  2013     1     1      559            600        -1      941
## 3  2013     1     1      608            600         8      807
## 4  2013     1     1      635            635         0     1028
## 5  2013     1     1      702            700         2     1058
## 6  2013     1     1      724            730        -6     1111
## # ... with 12 more variables: sched_arr_time <int>, arr_delay <dbl>,
## #   carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
## #   air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
## #   time_hour <dbl>

Notice again that unlike SparkR, df[df$distance > 3000, ] would not work in sparklyr.

Also, for sparklyr (as with dplyr) only the unquoted column name is required for subsetting df, whereas for SparkR we use df$column_name. To see the difference clearly, I add the two below:

SparkR::filter(df, df$distance > 3000)
dplyr::filter(df, distance > 3000)

Grouping and aggregation in sparklyr are done in exactly the same way as R users would do them locally in dplyr. dplyr promotes (and re-exports) the pipe operator from magrittr (%>%). Therefore, even though we could group and aggregate step by step (as we first did with SparkR), we will go straight to chaining with the pipe, which is what dplyr users are familiar with anyway.

#group and summarise
df_aggr <- 
  df %>%
   group_by(origin) %>%
   summarise(mean = mean(distance),
             count = n(),
             sum = sum(distance))
head(df_aggr)
## Source:   query [?? x 4]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##   origin      mean  count       sum
##    <chr>     <dbl>  <dbl>     <dbl>
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515
## 3    JFK 1266.2491 111279 140906931

You can see that in terms of selecting columns, subsetting rows, grouping and aggregating, SparkR and sparklyr have many similarities. Notice again that for sparklyr we only specify column names instead of the full df$column_name syntax. Also, function n does not take any arguments. The results are consistent between the two processes.

In a similar way, in order to sort df we will still use dplyr::arrange:

arrange(df_aggr, desc(mean)) %>% 
  head
## Source:   query [?? x 4]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##   origin      mean  count       sum
##    <chr>     <dbl>  <dbl>     <dbl>
## 1    JFK 1266.2491 111279 140906931
## 2    EWR 1056.7428 120835 127691515
## 3    LGA  779.8357 104662  81619161

sparklyr also supports SQL, in a similar way to SparkR, through the function dbGetQuery, which comes from the DBI package.

library(DBI)
## Warning: package 'DBI' was built under R version 3.3.2
df_sql <- dbGetQuery(sc2, "SELECT * FROM flights WHERE distance > 4800 LIMIT 5")
df_sql
##   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
## 1 2013     1   1      857            900        -3     1516           1530
## 2 2013     1   1     1344           1344         0     2005           1944
## 3 2013     1   2      909            900         9     1525           1530
## 4 2013     1   2     1344           1344         0     1940           1944
## 5 2013     1   3      914            900        14     1504           1530
##   arr_delay carrier flight tailnum origin dest air_time distance hour
## 1       -14      HA     51  N380HA    JFK  HNL      659     4983    9
## 2        21      UA     15  N76065    EWR  HNL      656     4963   13
## 3        -5      HA     51  N380HA    JFK  HNL      638     4983    9
## 4        -4      UA     15  N77066    EWR  HNL      634     4963   13
## 5       -26      HA     51  N380HA    JFK  HNL      616     4983    9
##   minute time_hour
## 1      0      2013
## 2     44      2013
## 3      0      2013
## 4     44      2013
## 5      0      2013

Notice that the query refers to the name of the object copy_to was applied to, i.e. flights and not df. In RStudio, under the Environment tab, you should be able to see the flights data set. The data sources you see there are the ones you can actually access with SQL.

Again, as in SparkR, dplyr::collect brings a Spark DataFrame back into a local data frame.

local_df <- collect(df_aggr)
local_df[1:2, ]
## # A tibble: 2 x 4
##   origin      mean  count       sum
##    <chr>     <dbl>  <dbl>     <dbl>
## 1    LGA  779.8357 104662  81619161
## 2    EWR 1056.7428 120835 127691515

Machine Learning - sparklyr

Now it is sparklyr’s turn: let’s see how we can use its functions to do machine learning. We didn’t see many differences between the two packages in terms of data manipulation, but we will see considerable differences in machine learning.

Let’s start again by splitting our data set into a train and a test set, as we did with SparkR. To do this we use sdf_partition:

partitions <- sdf_partition(df, training = 0.8, test = 0.2, seed = 1099)
count(df)
## Source:   query [?? x 1]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##        n
##    <dbl>
## 1 336776
train <- partitions$training
count(train)
## Source:   query [?? x 1]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##        n
##    <dbl>
## 1 269145
test <- partitions$test
count(test)
## Source:   query [?? x 1]
## Database: spark connection master=local app=sparklyr local=TRUE
## 
##       n
##   <dbl>
## 1 67631

dplyr does not have an nrow function but we can easily use count to make sure that our data set was split properly.

Unlike SparkR, sparklyr offers a plethora of machine learning functions (models as well as helper utilities) to choose from, including:

  • ml_als_factorization
  • ml_binary_classification_eval
  • ml_classification_eval
  • ml_create_dummy_variables
  • ml_decision_tree
  • ml_generalized_linear_regression
  • ml_gradient_boosted_trees
  • ml_kmeans
  • ml_lda
  • ml_linear_regression
  • ml_load
  • ml_logistic_regression
  • ml_model
  • ml_multilayer_perceptron
  • ml_naive_bayes
  • ml_one_vs_rest
  • ml_options
  • ml_pca
  • ml_prepare_dataframe
  • ml_prepare_features
  • ml_prepare_response_features_intercept
  • ml_random_forest
  • ml_save
  • ml_survival_regression
  • ml_tree_feature_importance

Now that we have our training and test sets we are ready to run our regression model.

fit <- ml_linear_regression(train, response = "distance", features = "air_time")
## * Dropped 7516 rows with 'na.omit' (269145 => 261629)
fit
## Call: ml_linear_regression(train, response = "distance", features = "air_time")
## 
## Coefficients:
## (Intercept)    air_time 
##  -123.97955     7.77944
#summary 
summary(fit)
## Call: ml_linear_regression(train, response = "distance", features = "air_time")
## 
## Deviance Residuals: (approximate):
##      Min       1Q   Median       3Q      Max 
## -696.632  -51.992    4.581   53.404  699.376 
## 
## Coefficients:
##                Estimate  Std. Error t value  Pr(>|t|)    
## (Intercept) -1.2398e+02  3.7156e-01 -333.67 < 2.2e-16 ***
## air_time     7.7794e+00  2.0937e-03 3715.64 < 2.2e-16 ***
## ---
## Signif. codes:  0 '***' 0.001 '**' 0.01 '*' 0.05 '.' 0.1 ' ' 1
## 
## R-Squared: 0.9814
## Root Mean Squared Error: 100.4
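Although the two packages were trained on different random splits, the fitted lines are almost identical. The sketch below compares them in base R using the coefficients printed in this post; predict_distance is a hypothetical helper and the air times are illustrative values, not taken from the data.

```r
# Coefficients copied from the two model outputs in this post
sparkr_coef   <- c(intercept = -124.31,    slope = 7.7824)   # SparkR::glm
sparklyr_coef <- c(intercept = -123.97955, slope = 7.77944)  # ml_linear_regression

# Hypothetical helper: predicted distance for a given air time in minutes
predict_distance <- function(coef, air_time) {
  coef[["intercept"]] + coef[["slope"]] * air_time
}

air_time <- c(60, 150, 300)  # illustrative values
data.frame(
  air_time = air_time,
  sparkr   = predict_distance(sparkr_coef, air_time),
  sparklyr = predict_distance(sparklyr_coef, air_time)
)
```

For these air times the two predictions differ by well under one unit of distance, which is tiny next to an RMSE of about 100.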

ml_linear_regression seems to be a better alternative to SparkR::glm since it can handle NAs without having to run na.omit before training the model (SparkR does not return a meaningful error either in case you forget to remove NAs). It also provides an R squared and RMSE by default which gives a first idea about the goodness of fit.

Of course, although ml_linear_regression provides an R squared and an RMSE we need to check those on the test set in order to assess the goodness of fit properly. We will do this in a similar way as we did in SparkR.

We will use na.omit in this case because we know some rows contain missing values and we want to make sure the predictions are correct. Also, make sure to use sdf_predict instead of predict, because the latter will also collect the results and return an atomic vector.

mutate is used instead of transform in sparklyr to add a column.

#remove rows with missing values before predicting
test <- na.omit(test)
## * Dropped 1914 rows with 'na.omit' (67631 => 65717)
#calculate predictions - sdf_predict will add predictions as a column
preds <- sdf_predict(fit, data = test)

#add squared residuals in preds
sq_resid <- mutate(preds, sq_residuals = (distance - prediction)^2)

#calculate MSE and collect locally
MSE <- collect(summarise(sq_resid, mean = mean(sq_residuals)))$mean

#RMSE
sqrt(MSE) 
## [1] 100.3887

Notes

Having used both SparkR and sparklyr in the above course, I can say that I found sparklyr easier to work with (this is only a preference) and actually considerably faster than SparkR (although this was in standalone mode). Also, I used sparklyr to install Hadoop and Spark, which might be another reason for the speed difference.

In terms of data manipulation, sparklyr uses the exact same verbs as dplyr (which is a very popular package), so the learning curve might be gentler. In terms of machine learning, sparklyr is unquestionably more advanced, since it has implemented a lot more algorithms than SparkR, and its developers have done a great job making the functions very easy to use.

Also, sparklyr’s functions are documented in R’s help (i.e. using ?function_name will work), as opposed to SparkR, whose functions were not documented in R at the time of writing (remember it was installed from GitHub and not CRAN). Documentation can be found online though.

H2O

sparklyr is actually compatible with H2O. Unfortunately, at the time of writing it is only compatible with Spark 1.6, because Spark 2.0 has just come out; compatibility with Spark 2.0 is in the pipeline. If you are using Spark 1.6 and want to find out more about sparklyr and H2O you can have a look at this link.

Github

The code for this course is hosted on GitHub.

Thanks for reading!

