A set of wrappers for parallel functions that facilitate the implementation of parallel routines in functions via pblapply.

cl_lapply(x, fun, ..., cl = NULL, varlist = NULL, use_chunks = FALSE)

cl_check(cl = NULL, varlist = NULL)

cl_cores(cl = NULL)

cl_chunks(cl = NULL, length)

cl_export(cl = NULL, varlist = NULL)

cl_stop(cl = NULL)

Arguments

x

A list over which to iterate.

fun, ...

A function that is applied to elements of x alongside any optional arguments to fun.

cl

(optional) A cluster from makeCluster or an integer that defines the number of child processes (see pblapply).

varlist

(optional) A character vector of objects for export (see clusterExport). If cl is a cluster, this may be required. Exported objects must be located in the global environment.

use_chunks

A logical vector that defines whether to parallelise over `chunks' (TRUE) or over the elements of x (FALSE). If use_chunks = TRUE, x is split into n chunks (one per core) that are processed in parallel; within each chunk x is updated iteratively.

length

An integer that defines the number of elements in the iteration.

Value

  • cl_lapply returns a list.

  • cl_cores returns an integer.

  • cl_chunks returns a list of integers.

  • cl_check, cl_export and cl_stop return invisible().

Details

cl_lapply is a wrapper for pblapply that handles cluster checking, set up and closure, using the following functions:

  • cl_check checks cl and varlist arguments, as inputted to a parent function. For example, if cl = NULL, varlist should also be NULL.

  • cl_cores identifies the number of cores specified.

  • cl_chunks defines a list, with one element for core specified, that contains an integer vector of the positions of an object over which to iterate serially in each chunk.

  • cl_export implements clusterExport if both cl and varlist are specified.

  • cl_stop implements stopCluster if cl is a cluster object from makeCluster.

Author

Edward Lavender

Examples

#### Examples of cl_lapply()
# Implement cl_lapply() without cluster
z <- cl_lapply(1:10, function(x) x + 1)
# Implement cl_lapply() with forking (not on Windows)
z <- cl_lapply(1:10, function(x) x + 1, cl = 2L)
# Implement cl_lapply() with socket cluster
z <- cl_lapply(1:10, function(x) x + 1, cl = parallel::makeCluster(2L))

#### Catch mistakes
z <- cl_lapply(1:10, function(x) x + 1, cl = 2L, varlist = list())
#> Warning: 'cl' is an integer: input to 'varlist' ignored.
z <- cl_lapply(1:10, function(x) x + 1, varlist = list())
#> Warning: 'cl' is NULL: input to 'varlist' ignored.

#### Compare time trials for chunk-wise versus element-wise parallelisation

if (flapper_run_parallel) {
  ## Background
  # In this simple example, we will sample 'size' cells n times from a raster
  # The output is a list of cell samples. We compare the time taken to complete
  # sampling using different approaches.

  ## Define a dataframe of time trial scenarios
  require(dplyr)
  dat <- expand.grid(
    n = 1e4,
    method = c("socket", "fork"),
    cores = 2L,
    use_chunks = c(FALSE, TRUE),
    time = NA
  )

  ## Estimate the duration of each scenario
  dat_by_trial <-
    lapply(split(dat, seq_len(nrow(dat))), function(d) {
      if (d$method == "socket") {
        t1 <- Sys.time()
        z <- cl_lapply(
          x = 1:d$n,
          fun = function(i) {
            raster::sampleRandom(flapper::dat_gebco, size = 5)
          },
          cl = parallel::makeCluster(d$cores),
          use_chunks = d$use_chunks
        )
        t2 <- Sys.time()
      } else if (d$method == "fork") {
        t1 <- Sys.time()
        z <- cl_lapply(
          x = 1:d$n,
          fun = function(i) {
            raster::sampleRandom(flapper::dat_gebco, size = 5)
          },
          cl = d$cores,
          use_chunks = d$use_chunks
        )
        t2 <- Sys.time()
      }
      d$time <- as.numeric(difftime(t2, t1, "secs"))
      return(d)
    })

  ## Examine the results
  dat_for_trials <-
    dat_by_trial %>%
    dplyr::bind_rows() %>%
    dplyr::arrange(.data$n, .data$time) %>%
    print()
}
#> Loading required package: dplyr
#> 
#> Attaching package: ‘dplyr’
#> The following objects are masked from ‘package:stats’:
#> 
#>     filter, lag
#> The following objects are masked from ‘package:base’:
#> 
#>     intersect, setdiff, setequal, union
#>       n method cores use_chunks     time
#> 1 10000   fork     2       TRUE 23.39939
#> 2 10000 socket     2       TRUE 24.95896
#> 3 10000 socket     2      FALSE 26.45997
#> 4 10000   fork     2      FALSE 26.77094