A set of wrappers for parallel functions that facilitate the implementation of parallel routines in functions via pblapply.
cl_lapply(x, fun, ..., cl = NULL, varlist = NULL, use_chunks = FALSE)
cl_check(cl = NULL, varlist = NULL)
cl_cores(cl = NULL)
cl_chunks(cl = NULL, length)
cl_export(cl = NULL, varlist = NULL)
cl_stop(cl = NULL)
x: A list over which to iterate.
fun, ...: A function that is applied to elements of x, alongside any optional arguments to fun.
cl: (optional) A cluster from makeCluster or an integer that defines the number of child processes (see pblapply).
varlist: (optional) A character vector of objects for export (see clusterExport). If cl is a cluster, this may be required. Exported objects must be located in the global environment.
use_chunks: A logical value that defines whether to parallelise over `chunks' (TRUE) or over the elements of x (FALSE). If use_chunks = TRUE, x is split into n chunks (one per core) that are processed in parallel; within each chunk, the elements of x are processed serially.
length: An integer that defines the number of elements in the iteration.
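The chunk-wise option can be pictured with a minimal sketch in base R. This is not the package's implementation: it assumes that positions are split with parallel::splitIndices(), and the helper name apply_by_chunk is hypothetical. The outer loop (serial here, parallel in practice) runs once per chunk, and the inner loop processes the elements of each chunk serially.

```r
# Hypothetical helper (not part of flapper): apply fun to x chunk by chunk.
apply_by_chunk <- function(x, fun, n_chunks = 2L) {
  # Split the positions of x into n_chunks contiguous groups.
  index_by_chunk <- parallel::splitIndices(length(x), n_chunks)
  # Outer loop: one iteration per chunk (the step a cluster would parallelise).
  out_by_chunk <- lapply(index_by_chunk, function(index) {
    # Inner loop: elements within a chunk are processed serially.
    lapply(x[index], fun)
  })
  # Flatten one level so the output matches element-wise lapply().
  unlist(out_by_chunk, recursive = FALSE)
}

x <- as.list(1:10)
out_chunked <- apply_by_chunk(x, function(i) i + 1)
out_elementwise <- lapply(x, function(i) i + 1)
identical(out_chunked, out_elementwise)
```

Chunk-wise processing sends fewer, larger tasks to each core, which can reduce communication overhead when individual calls to fun are quick.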
cl_lapply returns a list.
cl_cores returns an integer.
cl_chunks returns a list of integer vectors.
cl_check, cl_export and cl_stop return invisible().
cl_lapply is a wrapper for pblapply that handles cluster checking, set-up and closure, using the following functions:
cl_check checks the cl and varlist arguments, as supplied to a parent function. For example, if cl = NULL, varlist should also be NULL.
cl_cores identifies the number of cores specified.
cl_chunks defines a list, with one element for each core specified; each element is an integer vector of the positions of an object over which to iterate serially in that chunk.
cl_export implements clusterExport if both cl and varlist are specified.
cl_stop implements stopCluster if cl is a cluster object from makeCluster.
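The check, export, apply and stop sequence described above can be sketched as follows. This is an illustration under stated assumptions, not the package's code: the function name apply_with_cluster is hypothetical, and base parallel functions (parLapply, mclapply) stand in for pblapply, so there is no progress bar.

```r
# Hypothetical wrapper (not flapper's code): apply fun over x with optional parallelisation.
apply_with_cluster <- function(x, fun, ..., cl = NULL, varlist = NULL) {
  if (is.null(cl)) {
    # No cluster: warn about unused exports and run serially.
    if (!is.null(varlist)) warning("'cl' is NULL: input to 'varlist' ignored.")
    return(lapply(x, fun, ...))
  }
  if (inherits(cl, "cluster")) {
    # Socket cluster: stop it on exit, export any named objects, then apply.
    on.exit(parallel::stopCluster(cl), add = TRUE)
    if (!is.null(varlist)) {
      parallel::clusterExport(cl, varlist = varlist, envir = globalenv())
    }
    return(parallel::parLapply(cl, x, fun, ...))
  }
  # Integer: fork child processes (not available on Windows).
  if (!is.null(varlist)) warning("'cl' is an integer: input to 'varlist' ignored.")
  parallel::mclapply(x, fun, ..., mc.cores = cl)
}

# Serial use (cl = NULL) gives the same result as lapply().
z <- apply_with_cluster(1:10, function(x) x + 1)
```

Registering stopCluster with on.exit() means that the cluster is closed even if fun raises an error part-way through.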
#### Examples of cl_lapply()
# Implement cl_lapply() without cluster
z <- cl_lapply(1:10, function(x) x + 1)
# Implement cl_lapply() with forking (not on Windows)
z <- cl_lapply(1:10, function(x) x + 1, cl = 2L)
# Implement cl_lapply() with socket cluster
z <- cl_lapply(1:10, function(x) x + 1, cl = parallel::makeCluster(2L))
#### Catch mistakes
z <- cl_lapply(1:10, function(x) x + 1, cl = 2L, varlist = list())
#> Warning: 'cl' is an integer: input to 'varlist' ignored.
z <- cl_lapply(1:10, function(x) x + 1, varlist = list())
#> Warning: 'cl' is NULL: input to 'varlist' ignored.
#### Compare time trials for chunk-wise versus element-wise parallelisation
if (flapper_run_parallel) {
  ## Background
  # In this simple example, we will sample 'size' cells n times from a raster
  # The output is a list of cell samples. We compare the time taken to complete
  # sampling using different approaches.
  ## Define a dataframe of time trial scenarios
  require(dplyr)
  dat <- expand.grid(
    n = 1e4,
    method = c("socket", "fork"),
    cores = 2L,
    use_chunks = c(FALSE, TRUE),
    time = NA
  )
  ## Estimate the duration of each scenario
  dat_by_trial <-
    lapply(split(dat, seq_len(nrow(dat))), function(d) {
      if (d$method == "socket") {
        t1 <- Sys.time()
        z <- cl_lapply(
          x = 1:d$n,
          fun = function(i) {
            raster::sampleRandom(flapper::dat_gebco, size = 5)
          },
          cl = parallel::makeCluster(d$cores),
          use_chunks = d$use_chunks
        )
        t2 <- Sys.time()
      } else if (d$method == "fork") {
        t1 <- Sys.time()
        z <- cl_lapply(
          x = 1:d$n,
          fun = function(i) {
            raster::sampleRandom(flapper::dat_gebco, size = 5)
          },
          cl = d$cores,
          use_chunks = d$use_chunks
        )
        t2 <- Sys.time()
      }
      # 'units' must be named: the third positional argument of difftime() is 'tz'
      d$time <- as.numeric(difftime(t2, t1, units = "secs"))
      return(d)
    })
  ## Examine the results
  dat_for_trials <-
    dat_by_trial %>%
    dplyr::bind_rows() %>%
    dplyr::arrange(.data$n, .data$time) %>%
    print()
}
#> Loading required package: dplyr
#>
#> Attaching package: ‘dplyr’
#> The following objects are masked from ‘package:stats’:
#>
#> filter, lag
#> The following objects are masked from ‘package:base’:
#>
#> intersect, setdiff, setequal, union
#>       n method cores use_chunks     time
#> 1 10000   fork     2       TRUE 23.39939
#> 2 10000 socket     2       TRUE 24.95896
#> 3 10000 socket     2      FALSE 26.45997
#> 4 10000   fork     2      FALSE 26.77094