weighted quantile summaries, power iteration clustering, spark_write_rds(), and more


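Sparklyr 1.6 is now available on CRAN!

To install sparklyr 1.6 from CRAN, run

install.packages("sparklyr")

In this blog post, we shall highlight the following features and enhancements from sparklyr 1.6: weighted quantile summaries, power iteration clustering, spark_write_rds() + collect_from_rds(), and a number of dplyr-related improvements.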

Weighted quantile summaries

Apache Spark is well-known for supporting approximate algorithms that trade off marginal amounts of accuracy for greater speed and parallelism. Such algorithms are particularly useful for performing preliminary data explorations at scale, as they enable users to quickly query certain estimated statistics within a predefined error margin, while avoiding the high cost of exact computations. One example is the Greenwald-Khanna algorithm for on-line computation of quantile summaries, as described in Greenwald and Khanna (2001). This algorithm was originally designed for efficient \(\epsilon\)-approximation of quantiles within a large dataset without the notion of data points carrying different weights, and the unweighted version of it has been implemented as approxQuantile() since Spark 2.0. However, the same algorithm can be generalized to handle weighted inputs, and as sparklyr user @Zhuk66 mentioned in this issue, a weighted version of this algorithm makes for a useful sparklyr feature.

To properly explain what weighted-quantile means, we must clarify what the weight of each data point signifies. For example, if we have a sequence of observations \((1, 1, 1, 1, 0, 2, -1, -1)\), and would like to approximate the median of all data points, then we have the following two options:

  • Either run the unweighted version of approxQuantile() in Spark to scan through all 8 data points

  • Or alternatively, “compress” the data into 4 tuples of (value, weight): \((1, 0.5), (0, 0.125), (2, 0.125), (-1, 0.25)\), where the second component of each tuple represents how often a value occurs relative to the rest of the observed values, and then find the median by scanning through the 4 tuples using the weighted version of the Greenwald-Khanna algorithm
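The equivalence of the two options can be checked with a small base-R sketch. It uses an exact weighted quantile (sort the values, accumulate the normalized weights, and return the first value whose cumulative weight reaches the target probability) rather than the Greenwald-Khanna approximation. The helper name weighted_quantile() is made up for this illustration and is not part of sparklyr.

```r
# Exact weighted quantile: smallest value whose cumulative (normalized)
# weight is at least `p`. Illustrative helper, not a sparklyr function.
weighted_quantile <- function(values, weights, p) {
  ord <- order(values)
  cum_weights <- cumsum(weights[ord]) / sum(weights)
  values[ord][which(cum_weights >= p)[1]]
}

observations <- c(1, 1, 1, 1, 0, 2, -1, -1)

# Option 1: unweighted median over all 8 observations
# (type = 1 is the inverse of the empirical CDF, matching the rule above)
unweighted_median <- unname(quantile(observations, probs = 0.5, type = 1))

# Option 2: weighted median over the 4 compressed (value, weight) tuples
compressed <- data.frame(
  value = c(1, 0, 2, -1),
  weight = c(0.5, 0.125, 0.125, 0.25)
)
weighted_median <- weighted_quantile(compressed$value, compressed$weight, 0.5)

print(c(unweighted = unweighted_median, weighted = weighted_median))
```

Both approaches return 1 for this sequence, which is the point of the compression: the weights carry the same information as the repeated observations.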

We can also run through a contrived example involving the standard normal distribution to illustrate the power of weighted quantile estimation in sparklyr 1.6. Suppose we cannot simply run qnorm() in R to evaluate the quantile function of the standard normal distribution at \(p = 0.25\) and \(p = 0.75\). How can we get some rough idea about the first and third quartiles of this distribution? One way is to sample a large number of data points from this distribution, and then apply the Greenwald-Khanna algorithm to our unweighted samples, as shown below:


library(sparklyr)

sc <- spark_connect(master = "local")

num_samples <- 1e6
samples <- data.frame(x = rnorm(num_samples))

samples_sdf <- copy_to(sc, samples, name = random_string())

# approximate the 25th and 75th percentiles of the unweighted samples
samples_sdf %>%
  sdf_quantile(
    column = "x",
    probabilities = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##        25%        75%
## -0.6629242  0.6874939

Notice that because we are working with an approximate algorithm, and have specified relative.error = 0.01, the estimated value of \(-0.6629242\) from above could be anywhere between the 24th and the 26th percentile of all samples. In fact, it falls in the \(25.36896\)-th percentile:

pnorm(-0.6629242)
## [1] 0.2536896
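To see what the relative.error = 0.01 bound permits, the following base-R sketch computes exact sample quantiles at ranks 0.24 and 0.26. No Spark is involved, and the seed and sample size are arbitrary choices made for illustration. Any value inside the resulting interval would be an acceptable answer for the 25th percentile under the stated error bound.

```r
set.seed(42)  # arbitrary seed, only to make the sketch reproducible
x <- rnorm(1e5)

target <- 0.25
relative_error <- 0.01

# Exact sample quantiles at the lowest and highest admissible ranks
admissible <- quantile(
  x,
  probs = c(target - relative_error, target + relative_error),
  names = FALSE
)

# Empirical rank of a candidate estimate within the sample
rank_of <- function(estimate) mean(x <= estimate)

exact_q25 <- quantile(x, probs = target, names = FALSE)
print(admissible)
print(rank_of(exact_q25))
```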

Now how can we make use of weighted quantile estimation from sparklyr 1.6 to obtain similar results? Simple! We can sample a large number of \(x\) values uniformly randomly from \((-\infty, \infty)\) (or alternatively, just select a large number of values evenly spaced between \((-M, M)\) where \(M\) is approximately \(\infty\)), and assign each \(x\) value a weight of \(\displaystyle \frac{1}{\sqrt{2 \pi}}e^{-\frac{x^2}{2}}\), the standard normal distribution’s probability density at \(x\). Finally, we run the weighted version of sdf_quantile() from sparklyr 1.6, as shown below:


library(sparklyr)

sc <- spark_connect(master = "local")

num_samples <- 1e6
M <- 1000
# evenly spaced x values, each weighted by the standard normal density at x
samples <- tibble::tibble(
  x = M * seq(-num_samples / 2 + 1, num_samples / 2) / num_samples,
  weight = dnorm(x)
)

samples_sdf <- copy_to(sc, samples, name = random_string())

samples_sdf %>%
  sdf_quantile(
    column = "x",
    weight.column = "weight",
    probabilities = c(0.25, 0.75),
    relative.error = 0.01
  ) %>%
  print()
##    25%    75%
## -0.696  0.662

Voilà! The estimates are not too far off from the 25th and 75th percentiles (in relation to our abovementioned maximum permissible error of \(0.01\)):

pnorm(-0.696)
## [1] 0.2432144
pnorm(0.662)
## [1] 0.7460144

Power iteration clustering

Power iteration clustering (PIC), a simple and scalable graph clustering method presented in Lin and Cohen (2010), first finds a low-dimensional embedding of a dataset, using truncated power iteration on a normalized pairwise-similarity matrix of all data points, and then uses this embedding as the “cluster indicator,” an intermediate representation of the dataset that leads to fast convergence when used as input to k-means clustering. This process is very well illustrated in figure 1 of Lin and Cohen (2010),

in which the leftmost image is the visualization of a dataset consisting of 3 circles, with points colored in red, green, and blue indicating clustering results, and the subsequent images show the power iteration process gradually transforming the original set of points into what appears to be three disjoint line segments, an intermediate representation that can be rapidly separated into 3 clusters using k-means clustering with \(k = 3\).
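A minimal base-R sketch of the procedure described above may help make it concrete. It rests on simplifying assumptions: a dense similarity matrix, a fixed number of iterations in place of a convergence test, and a 1-dimensional embedding. It illustrates the idea from Lin and Cohen (2010) and is not meant to reproduce Spark's implementation.

```r
# Evenly spaced points on a circle centered at the origin
gen_circle <- function(radius, num_pts) {
  theta <- 2 * pi * seq(0, num_pts - 1) / num_pts
  cbind(x = radius * cos(theta), y = radius * sin(theta))
}

pts <- rbind(gen_circle(1, 80), gen_circle(4, 80))

# Pairwise Gaussian similarities, with no self-similarity
affinity <- exp(-as.matrix(dist(pts))^2 / 2)
diag(affinity) <- 0

# Row-normalize the similarity matrix: W = D^-1 A
degree <- rowSums(affinity)
W <- affinity / degree

# Truncated power iteration, starting from the normalized degree vector
v <- degree / sum(degree)
for (iter in seq_len(10)) {
  v <- as.vector(W %*% v)
  v <- v / sum(abs(v))
}

# The 1-dimensional embedding `v` acts as the cluster indicator for k-means;
# initial centers are fixed at the extremes to keep the result deterministic
clusters <- kmeans(v, centers = c(min(v), max(v)))$cluster

print(table(circle = rep(c("inner", "outer"), each = 80), cluster = clusters))
```

After only 10 iterations the embedding takes essentially one value per circle, so k-means places the two circles in separate clusters.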

In sparklyr 1.6, ml_power_iteration() was implemented to make the PIC functionality in Spark accessible from R. It expects as input a 3-column Spark dataframe that represents a pairwise-similarity matrix of all data points. Two of the columns in this dataframe should contain 0-based row and column indices, and the third column should hold the corresponding similarity measure. In the example below, we will see a dataset consisting of two circles being easily separated into two clusters by ml_power_iteration(), with the Gaussian kernel being used as the similarity measure between any 2 points:

gen_similarity_matrix <- function() {
  # Gaussian similarity measure
  gaussian_similarity <- function(pt1, pt2) {
    exp(-sum((pt2 - pt1) ^ 2) / 2)
  }
  # generate evenly distributed points on a circle centered at the origin
  gen_circle <- function(radius, num_pts) {
    seq(0, num_pts - 1) %>%
      purrr::map_dfr(
        function(idx) {
          theta <- 2 * pi * idx / num_pts
          radius * c(x = cos(theta), y = sin(theta))
        }
      )
  }
  # generate points on both circles
  pts <- rbind(
    gen_circle(radius = 1, num_pts = 80),
    gen_circle(radius = 4, num_pts = 80)
  )
  # populate the pairwise similarity matrix (stored as a 3-column dataframe)
  similarity_matrix <- data.frame()
  for (i in seq(2, nrow(pts)))
    similarity_matrix <- similarity_matrix %>%
      rbind(seq(i - 1L) %>%
        purrr::map_dfr(~ list(
          src = i - 1L, dst = .x - 1L,
          similarity = gaussian_similarity(pts[i,], pts[.x,])
        ))
      )

  similarity_matrix
}

library(sparklyr)

sc <- spark_connect(master = "local")
sdf <- copy_to(sc, gen_similarity_matrix())
clusters <- ml_power_iteration(
  sdf, k = 2, max_iter = 10, init_mode = "degree",
  src_col = "src", dst_col = "dst", weight_col = "similarity"
)

clusters %>% print(n = 160)
## # A tibble: 160 x 2
##        id cluster
##     <dbl>   <int>
##   1     0       1
##   2     1       1
##   3     2       1
##   4     3       1
##   5     4       1
##   ...
##   157   156       0
##   158   157       0
##   159   158       0
##   160   159       0

The output shows points from the two circles being assigned to separate clusters, as expected, after only a small number of PIC iterations.
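The 3-column input format described above can also be built without an explicit loop. The base-R sketch below starts from a small dense similarity matrix whose values are made up for illustration. It keeps each unordered pair once and converts R's 1-based indices to the 0-based indices expected in the input dataframe.

```r
# A small symmetric similarity matrix for 4 points (values are made up)
similarity <- matrix(
  c(0.0, 0.9, 0.1, 0.2,
    0.9, 0.0, 0.3, 0.1,
    0.1, 0.3, 0.0, 0.8,
    0.2, 0.1, 0.8, 0.0),
  nrow = 4, byrow = TRUE
)

# Keep each unordered pair once (row index > column index), as in the
# example above, and convert R's 1-based indices to 0-based ones
idx <- which(lower.tri(similarity), arr.ind = TRUE)
edges <- data.frame(
  src = idx[, "row"] - 1L,
  dst = idx[, "col"] - 1L,
  similarity = similarity[idx]
)

print(edges)
```

A dataframe shaped like edges could then be copied to Spark with copy_to() and passed to ml_power_iteration() with the same src_col, dst_col, and weight_col arguments as in the example above.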

spark_write_rds() + collect_from_rds()

spark_write_rds() and collect_from_rds() are implemented as a less memory-consuming alternative to collect(). Unlike collect(), which retrieves all elements of a Spark dataframe through the Spark driver node, hence potentially causing slowness or out-of-memory failures when collecting large amounts of data, spark_write_rds(), when used in conjunction with collect_from_rds(), can retrieve all partitions of a Spark dataframe directly from Spark workers, rather than through the Spark driver node. First, spark_write_rds() will distribute the tasks of serializing Spark dataframe partitions in RDS version 2 format among Spark workers. Spark workers can then process multiple partitions in parallel, each handling one partition at a time and persisting the RDS output directly to disk, rather than sending dataframe partitions to the Spark driver node. Finally, the RDS outputs can be re-assembled to R dataframes using collect_from_rds().

Shown below is an example of spark_write_rds() + collect_from_rds() usage, where RDS outputs are first saved to HDFS, then downloaded to the local filesystem with hadoop fs -get, and finally, post-processed with collect_from_rds():


library(sparklyr)
library(nycflights13)  # provides the `flights` dataset

num_partitions <- 10L
sc <- spark_connect(master = "yarn", spark_home = "/usr/lib/spark")
flights_sdf <- copy_to(sc, flights, repartition = num_partitions)

# Spark workers serialize all partitions in RDS format in parallel and write RDS
# outputs to HDFS
spark_write_rds(
  flights_sdf,
  dest_uri = "hdfs://<namenode>:8020/flights-part-{partitionId}.rds"
)

# Run `hadoop fs -get` to download RDS files from HDFS to local file system
for (partition in seq(num_partitions) - 1)
  system2(
    "hadoop",
    c("fs", "-get", sprintf("hdfs://<namenode>:8020/flights-part-%d.rds", partition))
  )

# Post-process RDS outputs
# (parentheses are required: `%>%` binds more tightly than `-`)
partitions <- (seq(num_partitions) - 1) %>%
  lapply(function(partition) collect_from_rds(sprintf("flights-part-%d.rds", partition)))

# Optionally, call `rbind()` to combine data from all partitions into a single R dataframe
flights_df <- do.call(rbind, partitions)
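The write-then-reassemble pattern can be mimicked locally to get a feel for it. The sketch below is only an analogy in base R: it uses saveRDS() and readRDS() on chunks of the iris dataset, with made-up file names in a temporary directory, and involves neither Spark nor sparklyr. Outputs produced by spark_write_rds() should still be read with collect_from_rds(), as described above.

```r
num_partitions <- 3L
out_dir <- tempfile("rds-partitions-")
dir.create(out_dir)

# Split the data into contiguous row blocks, one per "partition" (0-based ids)
rows_per_partition <- ceiling(nrow(iris) / num_partitions)
partition_id <- (seq_len(nrow(iris)) - 1L) %/% rows_per_partition
partitions <- unname(split(iris, partition_id))

# Each partition is serialized independently in RDS version 2 format
paths <- file.path(out_dir, sprintf("iris-part-%d.rds", seq_len(num_partitions) - 1L))
for (i in seq_along(partitions)) {
  saveRDS(partitions[[i]], paths[i], version = 2)
}

# Re-assemble: read every partition back and row-bind the pieces
restored <- do.call(rbind, lapply(paths, readRDS))
rownames(restored) <- NULL

print(dim(restored))
```

Because every file holds one self-contained partition, the pieces can be written and read independently, which is what lets Spark workers do the serialization in parallel.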

Similar to other recent sparklyr releases, sparklyr 1.6 comes with a number of dplyr-related improvements, such as

  • Support for where() predicate within select() and summarize(across(...)) operations on Spark dataframes
  • Addition of if_all() and if_any() functions
  • Full compatibility with dbplyr 2.0 backend API

select(where(...)) and summarize(across(where(...)))

The dplyr where(...) construct is useful for applying a selection or aggregation function to multiple columns that satisfy some boolean predicate. For example,

library(dplyr)

iris %>% select(where(is.numeric))

returns all numeric columns from the iris dataset, and

library(dplyr)

iris %>% summarize(across(where(is.numeric), mean))

computes the average of each numeric column.
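For readers less familiar with tidy selection, the two operations described above (keeping the numeric columns of iris, and averaging each of them) can be spelled out in base R. This is only a sketch of what the predicate-based selection amounts to on a local dataframe, not of how dplyr or sparklyr implement it.

```r
# Logical mask of the columns satisfying the predicate,
# i.e. the columns that where(is.numeric) would select
is_numeric_col <- vapply(iris, is.numeric, logical(1))

# Counterpart of select(where(is.numeric))
numeric_cols <- iris[, is_numeric_col, drop = FALSE]

# Counterpart of summarize(across(where(is.numeric), mean))
numeric_means <- vapply(numeric_cols, mean, numeric(1))

print(names(numeric_cols))
print(numeric_means)
```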

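In sparklyr 1.6, both types of operations can be applied to Spark dataframes, e.g.,

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")
iris_sdf <- copy_to(sc, iris, name = random_string())

iris_sdf %>% select(where(is.numeric))

iris_sdf %>% summarize(across(where(is.numeric), mean))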

if_all() and if_any()

if_all() and if_any() are two convenience functions from dplyr 1.0.4 (see here for more details) that effectively combine the results of applying a boolean predicate to a tidy selection of columns using the logical and/or operators.
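The combining step can be sketched in base R: apply the predicate to each selected column, then fold the resulting logical vectors with AND (for if_all()) or OR (for if_any()). The column selection and thresholds below are arbitrary choices for illustration.

```r
petal_cols <- grep("^Petal", names(iris), value = TRUE)

# Apply the predicate to each selected column: one logical vector per column
gt2 <- lapply(iris[petal_cols], function(col) col > 2)
gt5 <- lapply(iris[petal_cols], function(col) col > 5)

# if_all(): combine with logical AND; if_any(): combine with logical OR
all_petal_gt2 <- Reduce(`&`, gt2)
any_petal_gt5 <- Reduce(`|`, gt5)

# With dplyr >= 1.0.4 the same row filters can be written as
#   iris %>% filter(if_all(starts_with("Petal"), ~ .x > 2))
#   iris %>% filter(if_any(starts_with("Petal"), ~ .x > 5))
print(c(all_gt2 = sum(all_petal_gt2), any_gt5 = sum(any_petal_gt5)))
```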

Starting from sparklyr 1.6, if_all() and if_any() can also be applied to Spark dataframes.

Compatibility with dbplyr 2.0 backend API

Sparklyr 1.6 is fully compatible with the newer dbplyr 2.0 backend API (by implementing all interface changes recommended for dbplyr 2.0 backends), while still maintaining backward compatibility with the previous edition of the dbplyr API, so that sparklyr users will not be forced to switch to any particular version of dbplyr.

This should be a mostly non-user-visible change as of now. In fact, the only discernible behavior change will be the following code

library(dbplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

print(dbplyr_edition(sc))

outputting

[1] 2

if sparklyr is working with dbplyr 2.0+, and

[1] 1

otherwise.


In chronological order, we would like to thank the following contributors for making sparklyr 1.6 awesome:

We would also like to give a big shout-out to the wonderful open-source community behind sparklyr, without whom we would not have benefitted from numerous sparklyr-related bug reports and feature suggestions.

Finally, the author of this blog post also very much appreciates the highly valuable editorial suggestions from @skeydan.

If you wish to learn more about sparklyr, we recommend checking out sparklyr.ai, spark.rstudio.com, and also some previous sparklyr release posts such as sparklyr 1.5 and sparklyr 1.4.

That is all. Thanks for reading!

Greenwald, Michael, and Sanjeev Khanna. 2001. “Space-Efficient Online Computation of Quantile Summaries.” SIGMOD Rec. 30 (2): 58–66. https://doi.org/10.1145/376284.375670.

Lin, Frank, and William Cohen. 2010. “Power Iteration Clustering.” In, 655–62.

