Title: | Tools for Computation on Batch Systems |
---|---|
Description: | As a successor of the packages 'BatchJobs' and 'BatchExperiments', this package provides a parallel implementation of the Map function for high performance computing systems managed by schedulers 'IBM Spectrum LSF' (<https://www.ibm.com/products/hpc-workload-management>), 'OpenLava' (<https://www.openlava.org/>), 'Univa Grid Engine'/'Oracle Grid Engine' (<https://www.univa.com/>), 'Slurm' (<https://slurm.schedmd.com/>), 'TORQUE/PBS' (<https://adaptivecomputing.com/cherry-services/torque-resource-manager/>), or 'Docker Swarm' (<https://docs.docker.com/engine/swarm/>). Multicore and socket modes allow parallelization on a local machine, and multiple machines can be hooked up via SSH to create a makeshift cluster. Moreover, the package provides an abstraction mechanism to define large-scale computer experiments in a well-organized and reproducible way. |
Authors: | Michel Lang [cre, aut] , Bernd Bischl [aut], Dirk Surmann [ctb] |
Maintainer: | Michel Lang <[email protected]> |
License: | LGPL-3 |
Version: | 0.9.17 |
Built: | 2025-01-06 04:03:15 UTC |
Source: | https://github.com/mllg/batchtools |
For bug reports and feature requests please use the tracker: https://github.com/mllg/batchtools.
batchtools.verbose
Verbosity. Set to FALSE
to suppress info messages and progress bars.
batchtools.progress
Progress bars. Set to FALSE
to disable them.
batchtools.timestamps
Add time stamps to log output. Set to FALSE
to disable them.
Furthermore, you may enable a debug mode using the debugme package by setting the environment variable “DEBUGME” to “batchtools” before loading batchtools.
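As a minimal sketch, the three options above can be set with base R's options(), and the debug flag with Sys.setenv(). Where these calls live (for example a startup file) is an assumption; the only requirement stated above is that DEBUGME is set before batchtools is loaded.

```r
# Silence info messages, progress bars, and time stamps in log output.
options(
  batchtools.verbose = FALSE,
  batchtools.progress = FALSE,
  batchtools.timestamps = FALSE
)

# Enable debug mode via the debugme package; set this before loading batchtools.
Sys.setenv(DEBUGME = "batchtools")

# library(batchtools)  # load the package only after the settings above
```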
Maintainer: Michel Lang [email protected] (ORCID)
Authors:
Bernd Bischl [email protected]
Other contributors:
Dirk Surmann [email protected] (ORCID) [contributor]
Algorithms are functions which get the data part as well as the problem instance (the return value of the function defined in Problem) and return an arbitrary R object.
This function serializes all components to the file system and registers the algorithm in the ExperimentRegistry.
removeAlgorithms removes all jobs from the registry which depend on the specific algorithm.
reg$algorithms holds the IDs of already defined algorithms.
addAlgorithm(name, fun = NULL, reg = getDefaultRegistry())
removeAlgorithms(name, reg = getDefaultRegistry())
name |
[ |
fun |
[ If you do not provide a function, it defaults to a function which just returns the instance. |
reg |
[ |
[Algorithm
]. Object of class “Algorithm”.
Adds experiments (parametrized combinations of problems with algorithms) to the registry and thereby defines batch jobs.
If multiple problem designs or algorithm designs are provided, they are combined via the Cartesian product.
E.g., if you have two problems p1 and p2 and three algorithms a1, a2 and a3, addExperiments creates experiments for all parameters for the combinations (p1, a1), (p1, a2), (p1, a3), (p2, a1), (p2, a2) and (p2, a3).
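The pairing rule can be sketched in base R without a registry. Here expand.grid() stands in for the way problem and algorithm names are combined; the data frame `pairs` is purely illustrative and is not a batchtools object.

```r
# Enumerate every (problem, algorithm) pair from the p1/p2 and a1/a2/a3 example.
pairs <- expand.grid(
  problem = c("p1", "p2"),
  algorithm = c("a1", "a2", "a3"),
  stringsAsFactors = FALSE
)

# Sort for readability: all algorithms of p1 first, then those of p2.
pairs <- pairs[order(pairs$problem, pairs$algorithm), ]
print(pairs)
nrow(pairs)  # 2 problems x 3 algorithms
```

The Cartesian product described above applies in the same spirit to the rows of the problem and algorithm designs, which this sketch leaves out.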
addExperiments( prob.designs = NULL, algo.designs = NULL, repls = 1L, combine = "crossprod", reg = getDefaultRegistry() )
prob.designs |
[named list of |
algo.designs |
[named list of |
repls |
[ |
combine |
[ |
reg |
[ |
[data.table
] with ids of added jobs stored in column “job.id”.
R's data.frame converts character vectors to factors by default in R versions prior to 4.0.0, which frequently resulted in problems using addExperiments.
Therefore, this function will warn about factor variables if the following conditions hold:
R version is < 4.0.0.
The design is passed as a data.frame, not a data.table or tibble.
The option “stringsAsFactors” is not set or set to TRUE.
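A small base R sketch of how to sidestep the situation the warning guards against. Passing stringsAsFactors = FALSE explicitly is one way to keep character columns as characters on any R version; the names `design` and `has_factor` are illustrative.

```r
# Build a design as a plain data.frame while keeping 'method' a character column.
design <- data.frame(
  method = c("mean", "median"),
  stringsAsFactors = FALSE  # explicit, so behavior is the same before and after R 4.0.0
)
str(design)

# Defensive check before handing the design to addExperiments().
has_factor <- vapply(design, is.factor, logical(1L))
stopifnot(!any(has_factor))
```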
Other Experiment:
removeExperiments()
,
summarizeExperiments()
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)
# add first problem
fun = function(job, data, n, mean, sd, ...) rnorm(n, mean = mean, sd = sd)
addProblem("rnorm", fun = fun, reg = tmp)
# add second problem
fun = function(job, data, n, lambda, ...) rexp(n, rate = lambda)
addProblem("rexp", fun = fun, reg = tmp)
# add first algorithm
fun = function(instance, method, ...) if (method == "mean") mean(instance) else median(instance)
addAlgorithm("average", fun = fun, reg = tmp)
# add second algorithm
fun = function(instance, ...) sd(instance)
addAlgorithm("deviation", fun = fun, reg = tmp)
# define problem and algorithm designs
library(data.table)
prob.designs = algo.designs = list()
prob.designs$rnorm = CJ(n = 100, mean = -1:1, sd = 1:5)
prob.designs$rexp = data.table(n = 100, lambda = 1:5)
algo.designs$average = data.table(method = c("mean", "median"))
algo.designs$deviation = data.table()
# add experiments and submit
addExperiments(prob.designs, algo.designs, reg = tmp)
# check what has been created
summarizeExperiments(reg = tmp)
unwrap(getJobPars(reg = tmp))
Problems may consist of up to two parts: A static, immutable part (data in addProblem) and a dynamic, stochastic part (fun in addProblem).
For example, for statistical learning problems a data frame would be the static problem part while a resampling function would be the stochastic part which creates the problem instance.
This instance is then typically passed to a learning algorithm like a wrapper around a statistical model (fun in addAlgorithm).
This function serializes all components to the file system and registers the problem in the ExperimentRegistry.
removeProblems removes all jobs from the registry which depend on the specific problem.
reg$problems holds the IDs of already defined problems.
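To make the static/dynamic split concrete, here is a hedged sketch that calls a problem function and an algorithm function by hand, outside any registry. The names `resample` and `fit_mean` and the bootstrap scheme are assumptions for illustration; only the argument names job, data and instance follow the signatures used in the examples on this page.

```r
# Static part: a data frame that never changes between jobs.
data <- iris

# Dynamic part: a resampling function that creates a problem instance.
resample <- function(job, data, ...) {
  idx <- sample(nrow(data), replace = TRUE)  # bootstrap sample of row indices
  data[idx, , drop = FALSE]
}

# Algorithm: receives the static data and the instance, returns any R object.
fit_mean <- function(job, data, instance, ...) {
  mean(instance$Sepal.Length)
}

set.seed(1)
instance <- resample(job = NULL, data = data)
result <- fit_mean(job = NULL, data = data, instance = instance)
print(result)
```

In a registry, the two functions would be passed as fun to addProblem and addAlgorithm, respectively, and the data frame as data to addProblem.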
addProblem( name, data = NULL, fun = NULL, seed = NULL, cache = FALSE, reg = getDefaultRegistry() )
removeProblems(name, reg = getDefaultRegistry())
name |
[ |
data |
[ |
fun |
[ |
seed |
[ |
cache |
[ |
reg |
[ |
[Problem
]. Object of class “Problem” (invisibly).
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)
addProblem("p1", fun = function(job, data) data, reg = tmp)
addProblem("p2", fun = function(job, data) job, reg = tmp)
addAlgorithm("a1", fun = function(job, data, instance) instance, reg = tmp)
addExperiments(repls = 2, reg = tmp)
# List problems, algorithms and job parameters:
tmp$problems
tmp$algorithms
getJobPars(reg = tmp)
# Remove one problem
removeProblems("p1", reg = tmp)
# List problems and algorithms:
tmp$problems
tmp$algorithms
getJobPars(reg = tmp)
Assert that a given object is a batchtools registry.
Additionally can sync the registry, check if it is writeable, or check if jobs are running.
If any check fails, throws an error indicating the reason for the failure.
assertRegistry( reg, class = NULL, writeable = FALSE, sync = FALSE, running.ok = TRUE )
reg |
[ |
class |
[ |
writeable |
[ |
sync |
[ |
running.ok |
[ |
TRUE
invisibly.
Objects are saved in subdirectory “exports” of the
“file.dir” of reg
.
They are automatically loaded and placed in the global environment
each time the registry is loaded or a job collection is executed.
batchExport( export = list(), unexport = character(0L), reg = getDefaultRegistry() )
export |
[ |
unexport |
[ |
reg |
[ |
[data.table
] with name and uri to the exported objects.
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
# list exports
exports = batchExport(reg = tmp)
print(exports)
# add a job and required exports
batchMap(function(x) x^2 + y + z, x = 1:3, reg = tmp)
exports = batchExport(export = list(y = 99, z = 1), reg = tmp)
print(exports)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
stopifnot(loadResult(1, reg = tmp) == 101)
# Un-export z
exports = batchExport(unexport = "z", reg = tmp)
print(exports)
A parallel and asynchronous Map
/mapply
for batch systems.
Note that this function only defines the computational jobs.
The actual computation is started with submitJobs
.
Results and partial results can be collected with reduceResultsList
, reduceResults
or
loadResult
.
For a synchronous Map
-like execution, see btmapply
.
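For orientation, the sequential base R counterpart of such a mapping looks as follows. This sketch runs everything in the current session, whereas batchMap only defines one job per element; the function f is an illustrative choice.

```r
f <- function(x, y) x^2 + y

# Map over x while holding y fixed; MoreArgs plays the role of more.args in batchMap.
res <- mapply(f, x = 1:10, MoreArgs = list(y = 100), SIMPLIFY = FALSE)

res[[6]]  # 6^2 + 100
```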
batchMap( fun, ..., args = list(), more.args = list(), reg = getDefaultRegistry() )
fun |
[ |
... |
[ANY] |
args |
[ |
more.args |
[ |
reg |
[ |
[data.table
] with ids of added jobs stored in column “job.id”.
# example using "..." and more.args
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
f = function(x, y) x^2 + y
ids = batchMap(f, x = 1:10, more.args = list(y = 100), reg = tmp)
getJobPars(reg = tmp)
testJob(6, reg = tmp) # 100 + 6^2 = 136
# vector recycling
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
f = function(...) list(...)
ids = batchMap(f, x = 1:3, y = 1:6, reg = tmp)
getJobPars(reg = tmp)
# example for an expand.grid()-like operation on parameters
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
ids = batchMap(paste, args = data.table::CJ(x = letters[1:3], y = 1:3), reg = tmp)
getJobPars(reg = tmp)
testJob(6, reg = tmp)
This function allows you to create new computational jobs (just like batchMap) based on the results of a Registry.
batchMapResults( fun, ids = NULL, ..., more.args = list(), target, source = getDefaultRegistry() )
fun |
[ |
ids |
[ |
... |
[ANY] |
more.args |
[ |
target |
[ |
source |
[ |
[data.table
] with ids of jobs added to target
.
The URI to the result files in registry source
is hard coded as parameter in the target
registry.
This means that target
is currently not portable between systems for computation.
Other Results:
loadResult()
,
reduceResultsList()
,
reduceResults()
# Source registry: calculate square of some numbers
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x) list(square = x^2), x = 1:10, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
# Target registry: calculate the square root on results of first registry
target = makeRegistry(file.dir = NA, make.default = FALSE)
batchMapResults(fun = function(x, y) list(sqrt = sqrt(x$square)), ids = 4:8, target = target, source = tmp)
submitJobs(reg = target)
waitForJobs(reg = target)
# Map old to new ids. First, get a table with results and parameters
results = unwrap(rjoin(getJobPars(reg = target), reduceResultsDataTable(reg = target)))
print(results)
# Parameter '.id' points to job.id in 'source'. Use an inner join to combine:
ijoin(results, unwrap(reduceResultsDataTable(reg = tmp)), by = c(".id" = "job.id"))
A parallel and asynchronous Reduce
for batch systems.
Note that this function only defines the computational jobs.
Each job reduces a certain number of elements on one slave.
The actual computation is started with submitJobs
.
Results and partial results can be collected with reduceResultsList
, reduceResults
or
loadResult
.
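The two-stage idea can be sketched in base R: reduce each chunk separately, then reduce the partial results once more. The name `chunk_id` and the fixed chunk size of 5 are assumptions for illustration; on a batch system each chunk would correspond to one job.

```r
xs <- 1:100
f <- function(aggr, x) aggr + x

# Assign consecutive elements to chunks of 5 (20 chunks in total).
chunk_id <- ceiling(seq_along(xs) / 5)

# Stage 1: reduce every chunk on its own, starting from the initial value 0.
partial <- lapply(split(xs, chunk_id), function(elems) Reduce(f, elems, 0))

# Stage 2: reduce the partial results one final time.
total <- Reduce(f, partial, 0)
total
```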
batchReduce( fun, xs, init = NULL, chunks = seq_along(xs), more.args = list(), reg = getDefaultRegistry() )
fun |
[ |
xs |
[ |
init |
[ANY] |
chunks |
[ |
more.args |
[ |
reg |
[ |
[data.table
] with ids of added jobs stored in column “job.id”.
# define function to reduce on slave, we want to sum a vector
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
xs = 1:100
f = function(aggr, x) aggr + x
# sum 5 numbers on each slave process, i.e. 20 jobs
chunks = chunk(xs, chunk.size = 5)
batchReduce(fun = f, 1:100, init = 0, chunks = chunks, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
# now reduce one final time on master
reduceResults(fun = function(aggr, job, res) f(aggr, res), reg = tmp)
This is a set of functions acting as counterparts to the popular sequential apply functions in base R: btlapply for lapply and btmapply for mapply.
Internally, jobs are created using batchMap
on the provided registry.
If no registry is provided, a temporary registry (see argument file.dir
of makeRegistry
) and batchMap
will be used.
After all jobs are terminated (see waitForJobs
), the results are collected and returned as a list.
Note that these functions are only suitable for short and fail-safe operations on batch systems. If some jobs fail, you have to retrieve partial results from the registry directory yourself.
btlapply( X, fun, ..., resources = list(), n.chunks = NULL, chunk.size = NULL, reg = makeRegistry(file.dir = NA) )
btmapply( fun, ..., more.args = list(), simplify = FALSE, use.names = TRUE, resources = list(), n.chunks = NULL, chunk.size = NULL, reg = makeRegistry(file.dir = NA) )
X |
[ |
fun |
[ |
... |
[ |
resources |
[ |
n.chunks |
[ |
chunk.size |
[ |
reg |
[ |
more.args |
[ |
simplify |
[ |
use.names |
[ |
[list
] List with the results of the function call.
btlapply(1:3, function(x) x^2)
btmapply(function(x, y, z) x + y + z, x = 1:3, y = 1:3, more.args = list(z = 1), simplify = TRUE)
This function is only intended for use in your own cluster functions implementation.
Calls brew silently on your template, any error will lead to an exception. The file is stored at the same place as the corresponding job file in the “jobs”-subdir of your files directory.
cfBrewTemplate(reg, text, jc)
reg |
[ |
text |
[ |
jc |
[ |
[character(1)
]. File path to brewed template file.
Other ClusterFunctionsHelper:
cfHandleUnknownSubmitError()
,
cfKillJob()
,
cfReadBrewTemplate()
,
makeClusterFunctions()
,
makeSubmitJobResult()
,
runOSCommand()
This function is only intended for use in your own cluster functions implementation.
Simply constructs a SubmitJobResult
object with status code 101, NA as batch id and
an informative error message containing the output of the OS command in output
.
cfHandleUnknownSubmitError(cmd, exit.code, output)
cmd |
[ |
exit.code |
[ |
output |
[ |
Other ClusterFunctionsHelper:
cfBrewTemplate()
,
cfKillJob()
,
cfReadBrewTemplate()
,
makeClusterFunctions()
,
makeSubmitJobResult()
,
runOSCommand()
This function is only intended for use in your own cluster functions implementation.
Calls the OS command to kill a job via system
like this: “cmd batch.job.id”. If the
command returns an exit code > 0, the command is repeated after a 1 second sleep
max.tries-1
times. If the command failed in all tries, an error is generated.
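The retry behaviour described above can be sketched as a small loop. `kill_with_retries` and its `run` argument are hypothetical names: `run` stands in for the OS command and returns an exit code, so the sketch does not touch any real scheduler and is not the package's implementation.

```r
# Retry a command up to max.tries times, sleeping between failed attempts.
kill_with_retries <- function(run, max.tries = 3L, sleep = 1) {
  for (i in seq_len(max.tries)) {
    exit.code <- run()
    if (exit.code == 0L) return(TRUE)    # success: stop retrying
    if (i < max.tries) Sys.sleep(sleep)  # wait before the next attempt
  }
  stop("Command failed in all ", max.tries, " tries")
}

# Fake command that fails twice and succeeds on the third call.
attempts <- 0L
flaky <- function() {
  attempts <<- attempts + 1L
  if (attempts < 3L) 1L else 0L
}

# sleep = 0 keeps the demonstration fast; the text above describes a 1 second sleep.
ok <- kill_with_retries(flaky, max.tries = 3L, sleep = 0)
```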
cfKillJob( reg, cmd, args = character(0L), max.tries = 3L, nodename = "localhost" )
reg |
[ |
cmd |
[ |
args |
[ |
max.tries |
[ |
nodename |
[ |
TRUE
on success. An exception is raised otherwise.
Other ClusterFunctionsHelper:
cfBrewTemplate()
,
cfHandleUnknownSubmitError()
,
cfReadBrewTemplate()
,
makeClusterFunctions()
,
makeSubmitJobResult()
,
runOSCommand()
This function is only intended for use in your own cluster functions implementation. Simply reads your template file and returns it as a character vector.
cfReadBrewTemplate(template, comment.string = NA_character_)
template |
[ |
comment.string |
[ |
[character
].
Other ClusterFunctionsHelper:
cfBrewTemplate()
,
cfHandleUnknownSubmitError()
,
cfKillJob()
,
makeClusterFunctions()
,
makeSubmitJobResult()
,
runOSCommand()
Jobs can be partitioned into “chunks” to be executed sequentially on the computational nodes.
Chunks are defined by providing a data frame with columns “job.id” and “chunk” (integer)
to submitJobs
.
All jobs with the same chunk number will be grouped together on one node to form a single
computational job.
The function chunk
simply splits x
into either a fixed number of groups, or
into a variable number of groups with a fixed number of maximum elements.
The function lpt
also groups x
into a fixed number of chunks,
but uses the actual values of x
in a greedy “Longest Processing Time” algorithm.
As a result, the maximum sum of elements is (approximately) minimized.
binpack splits x into a variable number of groups whose sum of elements does not exceed the upper limit provided by chunk.size.
See examples of estimateRuntimes
for an application of binpack
and lpt
.
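A minimal base R sketch of a greedy Longest Processing Time assignment, to illustrate the idea behind lpt. This is the textbook variant (largest element first, always into the currently lightest chunk) and is not claimed to match the package's implementation; `lpt_sketch` is a hypothetical name.

```r
lpt_sketch <- function(x, n.chunks) {
  sums <- numeric(n.chunks)    # running sum of each chunk
  chunk <- integer(length(x))  # chunk number for each element of x
  for (i in order(x, decreasing = TRUE)) {
    j <- which.min(sums)       # lightest chunk so far
    chunk[i] <- j
    sums[j] <- sums[j] + x[i]
  }
  chunk
}

x <- c(5, 4, 3, 3, 3)
ch <- lpt_sketch(x, n.chunks = 2)
sapply(split(x, ch), sum)
```

Like the functions described above, the sketch returns one chunk number per element of x, so its result can be used with split() in the same way.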
chunk(x, n.chunks = NULL, chunk.size = NULL, shuffle = TRUE)
lpt(x, n.chunks = 1L)
binpack(x, chunk.size = max(x))
x |
[ |
n.chunks |
[ |
chunk.size |
[ |
shuffle |
[ |
[integer
] giving the chunk number for each element of x
.
ch = chunk(1:10, n.chunks = 2)
table(ch)
ch = chunk(rep(1, 10), chunk.size = 2)
table(ch)
set.seed(1)
x = runif(10)
ch = lpt(x, n.chunks = 2)
sapply(split(x, ch), sum)
set.seed(1)
x = runif(10)
ch = binpack(x, 1)
sapply(split(x, ch), sum)
# Job chunking
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
ids = batchMap(identity, 1:25, reg = tmp)
### Group into chunks with 10 jobs each
library(data.table)
ids[, chunk := chunk(job.id, chunk.size = 10)]
print(ids[, .N, by = chunk])
### Group into 4 chunks
ids[, chunk := chunk(job.id, n.chunks = 4)]
print(ids[, .N, by = chunk])
### Submit to batch system
submitJobs(ids = ids, reg = tmp)
# Grouped chunking
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)
prob = addProblem(reg = tmp, "prob1", data = iris, fun = function(job, data, ...) nrow(data))
prob = addProblem(reg = tmp, "prob2", data = Titanic, fun = function(job, data, ...) nrow(data))
algo = addAlgorithm(reg = tmp, "algo", fun = function(job, data, instance, i, ...) instance)
prob.designs = list(prob1 = data.table(), prob2 = data.table(x = 1:2))
algo.designs = list(algo = data.table(i = 1:3))
addExperiments(prob.designs, algo.designs, repls = 3, reg = tmp)
### Group into chunks of 5 jobs, but do not put multiple problems into the same chunk
# -> only one problem has to be loaded per chunk, and only once because it is cached
ids = getJobTable(reg = tmp)[, .(job.id, problem, algorithm)]
ids[, chunk := chunk(job.id, chunk.size = 5), by = "problem"]
ids[, chunk := .GRP, by = c("problem", "chunk")]
dcast(ids, chunk ~ problem)
Removes all jobs from a registry and calls sweepRegistry
.
clearRegistry(reg = getDefaultRegistry())
reg |
[ |
Other Registry:
getDefaultRegistry()
,
loadRegistry()
,
makeRegistry()
,
removeRegistry()
,
saveRegistry()
,
sweepRegistry()
,
syncRegistry()
Executes every job in a JobCollection
.
This function is intended to be called on the slave.
doJobCollection(jc, output = NULL)
jc |
[ |
output |
[ |
[character(1)
]: Hash of the JobCollection
executed.
Other JobCollection:
makeJobCollection()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(identity, 1:2, reg = tmp)
jc = makeJobCollection(1:2, reg = tmp)
doJobCollection(jc)
Estimates the runtimes of jobs using the random forest implemented in ranger.
Observed runtimes are retrieved from the Registry
and runtimes are
predicted for unfinished jobs.
The estimated remaining time is calculated in the print
method.
You may also pass n
here to determine the number of parallel jobs which is then used
in a simple Longest Processing Time (LPT) algorithm to give an estimate for the parallel runtime.
estimateRuntimes(tab, ..., reg = getDefaultRegistry())
## S3 method for class 'RuntimeEstimate'
print(x, n = 1L, ...)
tab |
[ |
... |
[ANY] |
reg |
[ |
x |
[ |
n |
[ |
[RuntimeEstimate] which is a list with two named elements: “runtimes” is a data.table with columns “job.id”, “runtime” (in seconds) and “type” (“estimated” if runtime is estimated, “observed” if runtime was observed).
The other element of the list, named “model”, contains the fitted random forest object.
See also binpack and lpt to chunk jobs according to their estimated runtimes.
# Create a simple toy registry
set.seed(1)
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE, seed = 1)
addProblem(name = "iris", data = iris, fun = function(data, ...) nrow(data), reg = tmp)
addAlgorithm(name = "nrow", function(instance, ...) nrow(instance), reg = tmp)
addAlgorithm(name = "ncol", function(instance, ...) ncol(instance), reg = tmp)
addExperiments(algo.designs = list(nrow = data.table::CJ(x = 1:50, y = letters[1:5])), reg = tmp)
addExperiments(algo.designs = list(ncol = data.table::CJ(x = 1:50, y = letters[1:5])), reg = tmp)
# We use the job parameters to predict runtimes
tab = unwrap(getJobPars(reg = tmp))
# First we need to submit some jobs so that the forest can train on some data.
# Thus, we just sample some jobs from the registry while grouping by factor variables.
library(data.table)
ids = tab[, .SD[sample(nrow(.SD), 5)], by = c("problem", "algorithm", "y")]
setkeyv(ids, "job.id")
submitJobs(ids, reg = tmp)
waitForJobs(reg = tmp)
# We "simulate" some more realistic runtimes here to demonstrate the functionality:
# - Algorithm "ncol" is 5 times more expensive than "nrow"
# - x has no effect on the runtime
# - If y is "a" or "b", the runtimes are really high
runtime = function(algorithm, x, y) {
  ifelse(algorithm == "nrow", 100L, 500L) + 1000L * (y %in% letters[1:2])
}
tmp$status[ids, done := done + tab[ids, runtime(algorithm, x, y)]]
rjoin(sjoin(tab, ids), getJobStatus(ids, reg = tmp)[, c("job.id", "time.running")])
# Estimate runtimes:
est = estimateRuntimes(tab, reg = tmp)
print(est)
rjoin(tab, est$runtimes)
print(est, n = 10)
# Submit jobs with longest runtime first:
ids = est$runtimes[type == "estimated"][order(runtime, decreasing = TRUE)]
print(ids)
## Not run:
submitJobs(ids, reg = tmp)
## End(Not run)
# Group jobs into chunks with runtime < 1h
ids = est$runtimes[type == "estimated"]
ids[, chunk := binpack(runtime, 3600)]
print(ids)
print(ids[, list(runtime = sum(runtime)), by = chunk])
## Not run:
submitJobs(ids, reg = tmp)
## End(Not run)
# Group jobs into 10 chunks with similar runtime
ids = est$runtimes[type == "estimated"]
ids[, chunk := lpt(runtime, 10)]
print(ids[, list(runtime = sum(runtime)), by = chunk])
Executes a single job (as created by makeJob
) and returns
its result. Also works for Experiments.
execJob(job)
job |
[ |
Result of the job.
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(identity, 1:2, reg = tmp)
job = makeJob(1, reg = tmp)
execJob(job)
These functions are used to find and filter jobs, depending on either their parameters (findJobs
and
findExperiments
), their tags (findTagged
), or their computational status (all other functions,
see getStatus
for an overview).
Note that findQueued
, findRunning
, findOnSystem
and findExpired
are somewhat heuristic
and may report misleading results, depending on the state of the system and the ClusterFunctions
implementation.
See JoinTables
for convenient set operations (unions, intersects, differences) on tables with job ids.
findJobs(expr, ids = NULL, reg = getDefaultRegistry())
findExperiments( ids = NULL, prob.name = NA_character_, prob.pattern = NA_character_, algo.name = NA_character_, algo.pattern = NA_character_, prob.pars, algo.pars, repls = NULL, reg = getDefaultRegistry() )
findSubmitted(ids = NULL, reg = getDefaultRegistry())
findNotSubmitted(ids = NULL, reg = getDefaultRegistry())
findStarted(ids = NULL, reg = getDefaultRegistry())
findNotStarted(ids = NULL, reg = getDefaultRegistry())
findDone(ids = NULL, reg = getDefaultRegistry())
findNotDone(ids = NULL, reg = getDefaultRegistry())
findErrors(ids = NULL, reg = getDefaultRegistry())
findOnSystem(ids = NULL, reg = getDefaultRegistry())
findRunning(ids = NULL, reg = getDefaultRegistry())
findQueued(ids = NULL, reg = getDefaultRegistry())
findExpired(ids = NULL, reg = getDefaultRegistry())
findTagged(tags = character(0L), ids = NULL, reg = getDefaultRegistry())
expr |
[ |
ids |
[ |
reg |
[ |
prob.name |
[ |
prob.pattern |
[ |
algo.name |
[ |
algo.pattern |
[ |
prob.pars |
[ |
algo.pars |
[ |
repls |
[ |
tags |
[ |
[data.table] with column “job.id” containing matched jobs.
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(identity, i = 1:3, reg = tmp)
ids = findNotSubmitted(reg = tmp)

# get all jobs:
findJobs(reg = tmp)

# filter for jobs with parameter i >= 2
findJobs(i >= 2, reg = tmp)

# filter on the computational status
findSubmitted(reg = tmp)
findNotDone(reg = tmp)

# filter on tags
addJobTags(2:3, "my_tag", reg = tmp)
findTagged(tags = "my_tag", reg = tmp)

# combine filter functions using joins
# -> jobs which are not done and not tagged (using an anti-join):
ajoin(findNotDone(reg = tmp), findTagged("my_tag", reg = tmp))
getDefaultRegistry returns the registry currently set as default (or stops with an exception if none is set). setDefaultRegistry sets a registry as default.
getDefaultRegistry()
setDefaultRegistry(reg)
reg |
[ |
Other Registry: clearRegistry(), loadRegistry(), makeRegistry(), removeRegistry(), saveRegistry(), sweepRegistry(), syncRegistry()
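The interplay of the two functions can be sketched as follows. This is only an illustration: the registry names reg1 and reg2 are made up, and both registries live in temporary directories.

```r
library(batchtools)

# Two independent temporary registries (file.dir = NA uses a temp directory)
reg1 = makeRegistry(file.dir = NA, make.default = FALSE)
reg2 = makeRegistry(file.dir = NA, make.default = FALSE)

# Make reg1 the default: functions called without 'reg' now operate on it
setDefaultRegistry(reg1)
batchMap(identity, 1:3)

# Switch the default to reg2
setDefaultRegistry(reg2)
batchMap(identity, 1:2)

n1 = nrow(findJobs(reg = reg1))  # jobs defined in reg1
n2 = nrow(findJobs())            # jobs defined in the current default (reg2)
```

Passing reg explicitly, as most examples in this manual do, avoids depending on whichever registry happens to be the default.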
Extracts error messages from the internal data base and returns them in a table.
getErrorMessages(
  ids = NULL,
  missing.as.error = FALSE,
  reg = getDefaultRegistry()
)
ids |
[ |
missing.as.error |
[ |
reg |
[ |
[data.table] with columns “job.id”, “terminated” (logical), “error” (logical) and “message” (string).
Other debug: getStatus(), grepLogs(), killJobs(), resetJobs(), showLog(), testJob()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
fun = function(i) if (i == 3) stop(i) else i
ids = batchMap(fun, i = 1:5, reg = tmp)
submitJobs(1:4, reg = tmp)
waitForJobs(1:4, reg = tmp)
getErrorMessages(ids, reg = tmp)
getErrorMessages(ids, missing.as.error = TRUE, reg = tmp)
getJobStatus returns the internal table which stores information about the computational status of jobs, getJobPars a table with the job parameters, getJobResources a table with the resources which were set to submit the jobs, and getJobTags the tags of the jobs (see Tags).
getJobTable returns all these tables joined.
getJobTable(ids = NULL, reg = getDefaultRegistry())
getJobStatus(ids = NULL, reg = getDefaultRegistry())
getJobResources(ids = NULL, reg = getDefaultRegistry())
getJobPars(ids = NULL, reg = getDefaultRegistry())
getJobTags(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
reg |
[ |
[data.table] with the following columns (not necessarily in this order):
Unique Job ID as integer.
Time the job was submitted to the batch system as POSIXct.
Time the job was started on the batch system as POSIXct.
Time the job terminated (successfully or with an error) as POSIXct.
Either NA if the job terminated successfully or the error message.
Estimate of the memory usage.
Batch ID as reported by the scheduler.
Log file. If missing, defaults to [job.hash].log.
Unique string identifying the job or chunk.
Time in seconds (as difftime) the job was queued.
Time in seconds (as difftime) the job was running.
List of parameters/arguments for this job.
List of computational resources set for this job.
Tags as joined string, delimited by “,”.
Only for ExperimentRegistry: the problem identifier.
Only for ExperimentRegistry: the algorithm identifier.
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
f = function(x) if (x < 0) stop("x must be >= 0") else sqrt(x)
batchMap(f, x = c(-1, 0, 1), reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
addJobTags(1:2, "tag1", reg = tmp)
addJobTags(2, "tag2", reg = tmp)

# Complete table:
getJobTable(reg = tmp)

# Job parameters:
getJobPars(reg = tmp)

# Set and retrieve tags:
getJobTags(reg = tmp)

# Job parameters with tags right-joined:
rjoin(getJobPars(reg = tmp), getJobTags(reg = tmp))
This function gives an encompassing overview over the computational status on your system. The status can be one or many of the following:
“defined”: Jobs which are defined via batchMap or addExperiments, but are not yet submitted.
“submitted”: Jobs which are submitted to the batch system via submitJobs, scheduled for execution.
“started”: Jobs which have been started.
“done”: Jobs which terminated successfully.
“error”: Jobs which terminated with an exception.
“running”: Jobs which are listed by the cluster functions to be running on the live system. Not supported for all cluster functions.
“queued”: Jobs which are listed by the cluster functions to be queued on the live system. Not supported for all cluster functions.
“system”: Jobs which are listed by the cluster functions to be queued or running. Not supported for all cluster functions.
“expired”: Jobs which have been submitted, but vanished from the live system. Note that this is determined heuristically and may include some false positives.
Here, a job which terminated successfully counts towards the jobs which are submitted, started and done.
To retrieve the corresponding job ids, see findJobs.
getStatus(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
reg |
[ |
[data.table] (with class “Status” for printing).
Other debug: getErrorMessages(), grepLogs(), killJobs(), resetJobs(), showLog(), testJob()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
fun = function(i) if (i == 3) stop(i) else i
ids = batchMap(fun, i = 1:5, reg = tmp)
submitJobs(ids = 1:4, reg = tmp)
waitForJobs(reg = tmp)
tab = getStatus(reg = tmp)
print(tab)
str(tab)
Crawls through log files and reports jobs with lines matching the pattern.
See showLog for an example.
grepLogs(
  ids = NULL,
  pattern,
  ignore.case = FALSE,
  fixed = FALSE,
  reg = getDefaultRegistry()
)
ids |
[ |
pattern |
[ |
ignore.case |
[ |
fixed |
[ |
reg |
[ |
[data.table] with columns “job.id” and “message”.
Other debug: getErrorMessages(), getStatus(), killJobs(), resetJobs(), showLog(), testJob()
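A minimal sketch of searching logs, assuming the default interactive cluster functions (which write log files) are in use. The job function and the search string are made up for illustration.

```r
library(batchtools)

tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Each job writes a line to its log that mentions its own input
fun = function(i) {
  message("processing input ", i)
  i
}
ids = batchMap(fun, i = 1:3, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)

# Search the logs for a fixed string (no regular expression matching)
hits = grepLogs(pattern = "processing input 2", fixed = TRUE, reg = tmp)
print(hits)
```

Setting fixed = TRUE is a convenient choice when the search string contains characters that would otherwise be interpreted as regular expression syntax.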
Set custom names for jobs. These are passed to the template as ‘job.name’.
If no custom name is set (or any of the job names of the chunk is missing),
the job hash is used as job name.
Individual job names can be accessed via jobs$job.name.
setJobNames(ids = NULL, names, reg = getDefaultRegistry())
getJobNames(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
names |
[ |
reg |
[ |
setJobNames returns NULL invisibly, getJobNames returns a data.table with columns job.id and job.name.
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
ids = batchMap(identity, 1:10, reg = tmp)
setJobNames(ids, letters[1:nrow(ids)], reg = tmp)
getJobNames(reg = tmp)
These helper functions perform join operations on data tables. Most of them are basically one-liners. See https://rpubs.com/ronasta/join_data_tables for an overview of join operations in data.table or, alternatively, dplyr's vignette on two-table verbs.
ijoin(x, y, by = NULL)
ljoin(x, y, by = NULL)
rjoin(x, y, by = NULL)
ojoin(x, y, by = NULL)
sjoin(x, y, by = NULL)
ajoin(x, y, by = NULL)
ujoin(x, y, all.y = FALSE, by = NULL)
x |
[ |
y |
[ |
by |
[
You may pass a named character vector to merge on columns with different names in |
all.y |
[logical(1)] |
[data.table] with key identical to by.
# Create two tables for demonstration
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(identity, x = 1:6, reg = tmp)
x = getJobPars(reg = tmp)
y = findJobs(x >= 2 & x <= 5, reg = tmp)
y$extra.col = head(letters, nrow(y))

# Inner join: similar to intersect(): keep all columns of x and y with common matches
ijoin(x, y)

# Left join: use all ids from x, keep all columns of x and y
ljoin(x, y)

# Right join: use all ids from y, keep all columns of x and y
rjoin(x, y)

# Outer join: similar to union(): keep all columns of x and y with matches in x or y
ojoin(x, y)

# Semi join: filter x with matches in y
sjoin(x, y)

# Anti join: filter x with matches not in y
ajoin(x, y)

# Updating join: Replace values in x with values in y
ujoin(x, y)
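The description of by mentions a named character vector for joining on columns with different names. The following sketch illustrates this on two small made-up tables. It assumes that the names of by refer to columns of x and the values to columns of y, which should be checked against the installed version.

```r
library(batchtools)
library(data.table)

# Two tables whose id columns have different names (made-up demo data)
x = data.table(x.id = 1:4, value = c(10, 20, 30, 40))
y = data.table(y.id = c(2L, 4L, 5L), label = c("b", "d", "e"))

# Named 'by' (assumption: names are columns of x, values are columns of y)
by = c("x.id" = "y.id")

inner = ijoin(x, y, by = by)  # rows of x with a partner in y, plus y's columns
anti = ajoin(x, y, by = by)   # rows of x without a partner in y

print(inner)
print(anti)
```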
Kill jobs which are currently running on the batch system.
In case of an error when killing, the function tries - after a short sleep - to kill the remaining batch jobs again. If this fails three times for some jobs, the function gives up. Jobs that could be successfully killed are reset in the Registry.
killJobs(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
reg |
[ |
[data.table] with columns “job.id”, the corresponding “batch.id” and the logical flag “killed” indicating success.
Other debug: getErrorMessages(), getStatus(), grepLogs(), resetJobs(), showLog(), testJob()
Loads a registry from its file.dir.
Multiple R sessions accessing the same registry simultaneously can lead to database inconsistencies.
This is especially dangerous if the same file.dir is accessed from multiple machines, e.g. via a mount.
If you just need to check on the status or peek into some preliminary results while another process is still submitting or waiting for pending results, you can load the registry in a read-only mode. All operations that need to change the registry will raise an exception in this mode. Files communicated back by the computational nodes are parsed to update the registry in memory while the registry on the file system remains unchanged.
A heuristic tries to detect if the registry has been altered in the background by another process and in this case automatically restricts the current registry to read-only mode.
However, you should not rely on this heuristic to work flawlessly.
Thus, set writeable to TRUE if and only if you are absolutely sure that other state-changing processes are terminated.
If you need write access, load the registry with writeable set to TRUE.
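The difference between the two modes can be sketched as follows. The registry location is a hypothetical temporary directory, and the sketch assumes that submitJobs is among the operations rejected in read-only mode.

```r
library(batchtools)

# Create a registry in a fresh (hypothetical) location and define some jobs
dir = tempfile("registry_")
reg = makeRegistry(file.dir = dir, make.default = FALSE)
batchMap(identity, 1:3, reg = reg)

# Default is read-only: inspecting the registry works ...
ro = loadRegistry(dir, make.default = FALSE)
print(getStatus(reg = ro))

# ... but state-changing operations are expected to raise an exception
res = try(submitJobs(reg = ro), silent = TRUE)

# Request write access once no other process modifies the registry
rw = loadRegistry(dir, make.default = FALSE, writeable = TRUE)
submitJobs(reg = rw)
waitForJobs(reg = rw)
```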
loadRegistry(
  file.dir,
  work.dir = NULL,
  conf.file = findConfFile(),
  make.default = TRUE,
  writeable = FALSE
)
file.dir |
[ If you pass |
work.dir |
[ The provided path will get normalized unless it is given relative to the home directory (i.e., starting with “~”). Note that some templates do not handle relative paths well. |
conf.file |
[ The file lookup is implemented in the internal (but exported) function
Set to |
make.default |
[ |
writeable |
[ |
[Registry].
Other Registry: clearRegistry(), getDefaultRegistry(), makeRegistry(), removeRegistry(), saveRegistry(), sweepRegistry(), syncRegistry()
Loads the result of a single job.
loadResult(id, reg = getDefaultRegistry())
id |
[ |
reg |
[ |
[ANY]. The stored result.
Other Results: batchMapResults(), reduceResultsList(), reduceResults()
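A short sketch of loading a single result after the jobs have finished. The job function is made up for illustration.

```r
library(batchtools)

tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x) x * 2, x = 1:3, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)

# Load the stored result of the job with id 2 (input x = 2)
res = loadResult(2, reg = tmp)
print(res)
```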
This is the constructor used to create custom cluster functions. Note that some standard implementations for TORQUE, Slurm, LSF, SGE, etc. ship with the package.
makeClusterFunctions(
  name,
  submitJob,
  killJob = NULL,
  listJobsQueued = NULL,
  listJobsRunning = NULL,
  array.var = NA_character_,
  store.job.collection = FALSE,
  store.job.files = FALSE,
  scheduler.latency = 0,
  fs.latency = 0,
  hooks = list()
)
name |
[ |
submitJob |
[ |
killJob |
[ |
listJobsQueued |
[ |
listJobsRunning |
[ |
array.var |
[ |
store.job.collection |
[ |
store.job.files |
[ |
scheduler.latency |
[ |
fs.latency |
[ |
hooks |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE()
Other ClusterFunctionsHelper: cfBrewTemplate(), cfHandleUnknownSubmitError(), cfKillJob(), cfReadBrewTemplate(), makeSubmitJobResult(), runOSCommand()
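As a rough sketch of how the constructor might be used, the following defines a toy backend whose submit function simply runs each job collection in the current session. The name "Sketch", the batch id "cfSketch" and the assumption that submitJob receives the registry and a JobCollection as (reg, jc) are illustrative, not a definitive implementation.

```r
library(batchtools)

# Hypothetical submit function: run the job collection right away in the
# current session and report success to batchtools
submitJob = function(reg, jc) {
  doJobCollection(jc, output = jc$log.file)
  makeSubmitJobResult(status = 0L, batch.id = "cfSketch")
}

cf = makeClusterFunctions(name = "Sketch", submitJob = submitJob)

# Attach the custom cluster functions to a temporary registry and use them
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
tmp$cluster.functions = cf
batchMap(function(x) x + 1, x = 1:3, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
res = unlist(reduceResultsList(reg = tmp))
print(res)
```

A backend for a real scheduler would additionally provide killJob and the listing functions so that jobs on the system can be tracked.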
Cluster functions for Docker/Docker Swarm (https://docs.docker.com/engine/swarm/).
The submitJob function executes docker [docker.args] run --detach=true [image.args] [resources] [image] [cmd].
Arguments docker.args, image.args and image can be set on construction.
The resources part takes the named resources ncpus and memory from submitJobs and maps them to the arguments --cpu-shares and --memory (in Megabytes). The resource threads is mapped to the environment variables “OMP_NUM_THREADS” and “OPENBLAS_NUM_THREADS”.
To reliably identify jobs in the swarm, jobs are labeled with “batchtools=[job.hash]” and named using the current login name (label “user”) and the job hash (label “batchtools”).
listJobsRunning uses docker [docker.args] ps --format={{.ID}} to filter for running jobs.
killJobs uses docker [docker.args] kill [batch.id] to kill running jobs.
These cluster functions use a Hook to remove finished jobs before a new submit and every time the Registry is synchronized (using syncRegistry).
This is currently required because docker does not remove terminated containers automatically.
Use docker ps -a --filter 'label=batchtools' --filter 'status=exited' to identify terminated containers and remove them manually (or use a cron job).
makeClusterFunctionsDocker(
  image,
  docker.args = character(0L),
  image.args = character(0L),
  scheduler.latency = 1,
  fs.latency = 65
)
image |
[ |
docker.args |
[ |
image.args |
[ |
scheduler.latency |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
All jobs are executed sequentially using the current R process in which submitJobs is called.
Thus, submitJob blocks the session until the job has finished.
The main use of this ClusterFunctions implementation is to test and debug programs on a local computer.
Listing jobs returns an empty vector (as no jobs can be running when you call this) and killJob is not implemented for the same reasons.
makeClusterFunctionsInteractive(
  external = FALSE,
  write.logs = TRUE,
  fs.latency = 0
)
external |
[ |
write.logs |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
Cluster functions for LSF (https://www.ibm.com/products/hpc-workload-management).
Job files are created based on the brew template template. This file is processed with brew and then submitted to the queue using the bsub command. Jobs are killed using the bkill command and the list of running jobs is retrieved using bjobs -u $USER -w. The user must have the appropriate privileges to submit, delete and list jobs on the cluster (this is usually the case).
The template file can access all resources passed to submitJobs as well as all variables stored in the JobCollection.
It is the template file's job to choose a queue for the job and handle the desired resource allocations.
makeClusterFunctionsLSF(
  template = "lsf",
  scheduler.latency = 1,
  fs.latency = 65
)
template |
[
|
scheduler.latency |
[ |
fs.latency |
[ |
Array jobs are currently not supported.
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
Jobs are spawned asynchronously using the functions mcparallel and mccollect (both in the parallel package).
Does not work on Windows, use makeClusterFunctionsSocket instead.
makeClusterFunctionsMulticore(ncpus = NA_integer_, fs.latency = 0)
ncpus |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
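A minimal sketch of using the multicore backend with a temporary registry, assuming a non-Windows machine with at least two cores. The job function is made up for illustration.

```r
library(batchtools)

tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Replace the registry's cluster functions with the multicore backend,
# limited to two parallel R processes
tmp$cluster.functions = makeClusterFunctionsMulticore(ncpus = 2)

batchMap(function(x) x^2, x = 1:4, reg = tmp)
submitJobs(reg = tmp)   # returns without waiting for the jobs to finish
waitForJobs(reg = tmp)  # block until all jobs have terminated
res = unlist(reduceResultsList(reg = tmp))
print(res)
```

On Windows, the same sketch should work with makeClusterFunctionsSocket(ncpus = 2) in place of the multicore constructor.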
Cluster functions for OpenLava.
Job files are created based on the brew template template. This file is processed with brew and then submitted to the queue using the bsub command. Jobs are killed using the bkill command and the list of running jobs is retrieved using bjobs -u $USER -w. The user must have the appropriate privileges to submit, delete and list jobs on the cluster (this is usually the case).
The template file can access all resources passed to submitJobs as well as all variables stored in the JobCollection.
It is the template file's job to choose a queue for the job and handle the desired resource allocations.
makeClusterFunctionsOpenLava(
  template = "openlava",
  scheduler.latency = 1,
  fs.latency = 65
)
template |
[
|
scheduler.latency |
[ |
fs.latency |
[ |
Array jobs are currently not supported.
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
Cluster functions for Univa Grid Engine / Oracle Grid Engine / Sun Grid Engine (https://www.univa.com/).
Job files are created based on the brew template template. This file is processed with brew and then submitted to the queue using the qsub command. Jobs are killed using the qdel command and the list of running jobs is retrieved using qselect. The user must have the appropriate privileges to submit, delete and list jobs on the cluster (this is usually the case).
The template file can access all resources passed to submitJobs as well as all variables stored in the JobCollection.
It is the template file's job to choose a queue for the job and handle the desired resource allocations.
makeClusterFunctionsSGE(
  template = "sge",
  nodename = "localhost",
  scheduler.latency = 1,
  fs.latency = 65
)
template |
[
|
nodename |
[
|
scheduler.latency |
[ |
fs.latency |
[ |
Array jobs are currently not supported.
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
Cluster functions for Slurm (https://slurm.schedmd.com/).
Job files are created based on the brew template template. This file is processed with brew and then submitted to the queue using the sbatch command. Jobs are killed using the scancel command and the list of running jobs is retrieved using squeue. The user must have the appropriate privileges to submit, delete and list jobs on the cluster (this is usually the case).
The template file can access all resources passed to submitJobs as well as all variables stored in the JobCollection.
It is the template file's job to choose a queue for the job and handle the desired resource allocations.
Note that you might have to specify the cluster name here if you do not want to use the default, otherwise the commands for listing and killing jobs will not work.
makeClusterFunctionsSlurm(
  template = "slurm",
  array.jobs = TRUE,
  nodename = "localhost",
  scheduler.latency = 1,
  fs.latency = 65
)
template |
[
|
array.jobs |
[ |
nodename |
[
|
scheduler.latency |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
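Scheduler backends such as this one are typically selected in a configuration file that is sourced when a registry is created (see the conf.file argument). The fragment below is only a sketch: the resource names walltime, memory and ncpus are assumptions that must match whatever your template file actually reads, and a template named "slurm" has to be available on your system.

```r
# Sketch of a batchtools configuration file (e.g. batchtools.conf.R)

# Use Slurm as backend; the template "slurm" must be provided by the user
cluster.functions = makeClusterFunctionsSlurm(template = "slurm", array.jobs = TRUE)

# Resources used if submitJobs() is called without explicit resources.
# The names below are assumptions and depend on the template in use.
default.resources = list(walltime = 3600, memory = 1024, ncpus = 1)
```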
Jobs are spawned asynchronously using the package snow.
makeClusterFunctionsSocket(ncpus = NA_integer_, fs.latency = 65)
ncpus |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
Jobs are spawned by starting multiple R sessions via Rscript over SSH.
If the hostname of the Worker equals “localhost”, Rscript is called directly so that you do not need to have an SSH client installed.
makeClusterFunctionsSSH(workers, fs.latency = 65)
workers |
[ |
fs.latency |
[ |
If you use a custom “.ssh/config” file, make sure your ProxyCommand passes ‘-q’ to ssh, otherwise each output will end with the message “Killed by signal 1” and this will break the communication with the nodes.
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctionsTORQUE(), makeClusterFunctions()
## Not run:
# cluster functions for multicore execution on the local machine
makeClusterFunctionsSSH(list(Worker$new("localhost", ncpus = 2)))

## End(Not run)
Cluster functions for TORQUE/PBS (https://adaptivecomputing.com/cherry-services/torque-resource-manager/).
Job files are created based on the brew template template. This file is processed with brew and then submitted to the queue using the qsub command. Jobs are killed using the qdel command and the list of running jobs is retrieved using qselect. The user must have the appropriate privileges to submit, delete and list jobs on the cluster (this is usually the case).
The template file can access all resources passed to submitJobs as well as all variables stored in the JobCollection.
It is the template file's job to choose a queue for the job and handle the desired resource allocations.
makeClusterFunctionsTORQUE(
  template = "torque",
  scheduler.latency = 1,
  fs.latency = 65
)
template |
[
|
scheduler.latency |
[ |
fs.latency |
[ |
Other ClusterFunctions: makeClusterFunctionsDocker(), makeClusterFunctionsInteractive(), makeClusterFunctionsLSF(), makeClusterFunctionsMulticore(), makeClusterFunctionsOpenLava(), makeClusterFunctionsSGE(), makeClusterFunctionsSSH(), makeClusterFunctionsSlurm(), makeClusterFunctionsSocket(), makeClusterFunctions()
makeExperimentRegistry constructs a special Registry which is suitable for the definition of large-scale computer experiments.
Each experiment consists of a Problem and an Algorithm.
These can be parametrized with addExperiments to actually define computational jobs.
makeExperimentRegistry(
  file.dir = "registry",
  work.dir = getwd(),
  conf.file = findConfFile(),
  packages = character(0L),
  namespaces = character(0L),
  source = character(0L),
  load = character(0L),
  seed = NULL,
  make.default = TRUE
)
file.dir |
[ If you pass |
work.dir |
[ The provided path will get normalized unless it is given relative to the home directory (i.e., starting with “~”). Note that some templates do not handle relative paths well. |
conf.file |
[ The file lookup is implemented in the internal (but exported) function
Set to |
packages |
[ |
namespaces |
[ |
source |
[ |
load |
[ |
seed |
[ |
make.default |
[ |
[ExperimentRegistry].
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)

# Define one problem, two algorithms and add them with some parameters:
addProblem(reg = tmp, "p1",
  fun = function(job, data, n, mean, sd, ...) rnorm(n, mean = mean, sd = sd))
addAlgorithm(reg = tmp, "a1",
  fun = function(job, data, instance, ...) mean(instance))
addAlgorithm(reg = tmp, "a2",
  fun = function(job, data, instance, ...) median(instance))
ids = addExperiments(reg = tmp, list(p1 = data.table::CJ(n = c(50, 100), mean = -2:2, sd = 1:4)))

# Overview over defined experiments:
tmp$problems
tmp$algorithms
summarizeExperiments(reg = tmp)
summarizeExperiments(reg = tmp, by = c("problem", "algorithm", "n"))
ids = findExperiments(prob.pars = (n == 50), reg = tmp)
print(unwrap(getJobPars(ids, reg = tmp)))

# Submit jobs
submitJobs(reg = tmp)
waitForJobs(reg = tmp)

# Reduce the results of algorithm a1
ids.mean = findExperiments(algo.name = "a1", reg = tmp)
reduceResults(ids.mean, fun = function(aggr, res, ...) c(aggr, res), reg = tmp)

# Join info table with all results and calculate mean of results
# grouped by n and algorithm
ids = findDone(reg = tmp)
pars = unwrap(getJobPars(ids, reg = tmp))
results = unwrap(reduceResultsDataTable(ids, fun = function(res) list(res = res), reg = tmp))
tab = ljoin(pars, results)
tab[, list(mres = mean(res)), by = c("n", "algorithm")]
Jobs and Experiments are abstract objects which hold all information necessary to execute a single computational job for a Registry or ExperimentRegistry, respectively.
They can be created using the constructor makeJob which takes a single job id.
Jobs and Experiments are passed to reduce functions like reduceResults.
Furthermore, Experiments can be used in the functions of the Problem and Algorithm.
Jobs and Experiments hold the following information:
job.id: Job ID as integer.
pars: Job parameters as named list. For ExperimentRegistry, the parameters are divided into the sublists “prob.pars” and “algo.pars”.
seed: Seed which is set via doJobCollection as scalar integer.
resources: Computational resources which were set for this job as named list.
external.dir: Path to a directory which is created exclusively for this job. You can store external files here. The directory is persistent between multiple restarts of the job and can be cleaned by calling resetJobs.
fun: Job only: User function passed to batchMap.
prob.name: Experiments only: Problem id.
algo.name: Experiments only: Algorithm id.
problem: Experiments only: Problem.
instance: Experiments only: Problem instance.
algorithm: Experiments only: Algorithm.
repl: Experiments only: Replication number.
Note that the slots “pars”, “fun”, “algorithm” and “problem” lazy-load required files from the file system and construct the object on the first access. The realizations are cached for all slots except “instance” (which might be stochastic).
Jobs and Experiments can be executed manually with execJob.
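The lazy-loading and caching behaviour described above can be pictured with active bindings in base R. The following is only a sketch of the idea, not the batchtools internals: make_lazy_job, load_pars and make_instance are hypothetical names introduced for illustration.

```r
# Hypothetical stand-in for a Job/Experiment object; NOT the batchtools internals.
make_lazy_job = function(load_pars, make_instance) {
  job = new.env()
  cache = new.env()
  # "pars": constructed on first access, then served from the cache
  makeActiveBinding("pars", function() {
    if (is.null(cache$pars))
      cache$pars = load_pars()
    cache$pars
  }, job)
  # "instance": never cached, because generating it may be stochastic
  makeActiveBinding("instance", function() make_instance(), job)
  job
}

counter = new.env()
counter$loads = 0L
job = make_lazy_job(
  load_pars = function() {
    counter$loads = counter$loads + 1L  # count simulated file reads
    list(n = 5)
  },
  make_instance = function() runif(1)
)

job$pars
job$pars
counter$loads  # the loader ran only once
```

Accessing job$instance twice calls make_instance twice, which mirrors why a stochastic problem instance is not served from a cache.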
makeJob(id, reader = NULL, reg = getDefaultRegistry())
id |
[ |
reader |
[ |
reg |
[ |
[Job | Experiment].
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x, y) x + y, x = 1:2, more.args = list(y = 99), reg = tmp)
submitJobs(resources = list(foo = "bar"), reg = tmp)
job = makeJob(1, reg = tmp)
print(job)

# Get the parameters:
job$pars

# Get the job resources:
job$resources

# Execute the job locally:
execJob(job)
makeJobCollection takes multiple job ids and creates an object of class “JobCollection” which holds all necessary information for the calculation with doJobCollection. It is implemented as an environment with the following variables:
file.dir of the Registry.
work.dir of the Registry.
Unique identifier of the job. Used to create names on the file system.
data.table holding individual job information. See examples.
Location of the designated log file for this job.
Named list of specified computational resources.
Location of the job description file (saved with saveRDS) on the file system.
integer(1): Seed of the Registry.
character with required packages to load via require.
character with required packages to load via requireNamespace.
character with list of files to source before execution.
character with list of files to load before execution.
character(1) of the array environment variable specified by the cluster functions.
logical(1) signaling if jobs were submitted using chunks.as.arrayjobs.
If your ClusterFunctions uses a template, brew will be executed in the environment of such a collection. Thus all variables available inside the job can be used in the template.
makeJobCollection(ids = NULL, resources = list(), reg = getDefaultRegistry())
ids |
[ |
resources |
[ |
reg |
[ |
[JobCollection].
Other JobCollection: doJobCollection()
tmp = makeRegistry(file.dir = NA, make.default = FALSE, packages = "methods")
batchMap(identity, 1:5, reg = tmp)

# resources are usually set in submitJobs()
jc = makeJobCollection(1:3, resources = list(foo = "bar"), reg = tmp)
ls(jc)
jc$resources
makeRegistry constructs the inter-communication object for all functions in batchtools.
All communication transactions are processed via the file system:
All information required to run a job is stored as JobCollection in a file in a subdirectory of the file.dir directory.
Each job also stores its results as well as computational status information (start time, end time, error message, ...) on the file system, where it is regularly parsed and merged by the master using syncRegistry.
After integrating the new information into the Registry, the Registry is serialized to the file system via saveRegistry.
Both syncRegistry and saveRegistry are called internally whenever required.
Therefore it should be safe to quit the R session at any time.
Work can later be resumed by calling loadRegistry which de-serializes the registry from the file system.
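The save-and-resume cycle described above might look as follows. This is a minimal sketch that assumes the default interactive cluster functions and uses a temporary directory purely for demonstration; a real project would pick a persistent file.dir.

```r
library(batchtools)

# Create a registry in a fresh (temporary) directory and define three jobs.
dir = tempfile("registry")
reg = makeRegistry(file.dir = dir, make.default = FALSE)
batchMap(function(x) x^2, x = 1:3, reg = reg)

# Simulate quitting the session: drop the in-memory object ...
rm(reg)

# ... and restore it later from the file system.
reg = loadRegistry(file.dir = dir, writeable = TRUE, make.default = FALSE)
getStatus(reg = reg)
```

The job definitions survive because they were written to file.dir when the jobs were created; nothing has to be redefined after loading.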
The registry created last is saved in the package namespace (unless make.default is set to FALSE) and can be retrieved via getDefaultRegistry.
Canceled jobs and jobs submitted multiple times may leave stray files behind.
These can be swept using sweepRegistry.
clearRegistry completely erases all jobs from a registry, including log files and results, and thus allows you to start over.
makeRegistry( file.dir = "registry", work.dir = getwd(), conf.file = findConfFile(), packages = character(0L), namespaces = character(0L), source = character(0L), load = character(0L), seed = NULL, make.default = TRUE )
file.dir |
[ If you pass |
work.dir |
[ The provided path will get normalized unless it is given relative to the home directory (i.e., starting with “~”). Note that some templates do not handle relative paths well. |
conf.file |
[ The file lookup is implemented in the internal (but exported) function
Set to |
packages |
[ |
namespaces |
[ |
source |
[ |
load |
[ |
seed |
[ |
make.default |
[ |
Currently batchtools understands the following options set via the configuration file:
cluster.functions: As returned by a constructor, e.g. makeClusterFunctionsSlurm.
default.resources: List of resources to use. Will be overruled by resources specified via submitJobs.
temp.dir: Path to directory to use for temporary registries.
sleep: Custom sleep function. See waitForJobs.
expire.after: Number of iterations before treating jobs as expired in waitForJobs.
compress: Compression algorithm to use via saveRDS.
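How default.resources interact with resources passed at submit time can be approximated in base R. The sketch below uses utils::modifyList as a stand-in for the merge (an assumption, the package may combine the lists differently), and the resource names and values are made up for illustration.

```r
# Hypothetical values; in practice default.resources would come from the
# configuration file and resources from the call to submitJobs().
default.resources = list(walltime = 3600, memory = 1024, ncpus = 1)
resources = list(memory = 4096, ncpus = 4)

# Entries given at submit time win; defaults fill in everything else.
effective = utils::modifyList(default.resources, resources)
str(effective)
```

Here walltime keeps its default while memory and ncpus are overruled by the values given for the submission.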
[environment] of class “Registry” with the following slots:
file.dir [path]: File directory.
work.dir [path]: Working directory.
temp.dir [path]: Temporary directory. Used if file.dir is NA to create temporary registries.
packages [character()]: Packages to load on the slaves.
namespaces [character()]: Namespaces to load on the slaves.
seed [integer(1)]: Registry seed. Before each job is executed, the seed seed + job.id is set.
cluster.functions [cluster.functions]: Usually set in your conf.file. Set via a call to makeClusterFunctions. See example.
default.resources [named list()]: Usually set in your conf.file. Named list of default resources.
max.concurrent.jobs [integer(1)]: Usually set in your conf.file. Maximum number of concurrent jobs for a single user and current registry on the system. submitJobs will try to respect this setting. The resource “max.concurrent.jobs” has higher precedence.
defs [data.table]: Table with job definitions (i.e. parameters).
status [data.table]: Table holding information about the computational status. Also see getJobStatus.
resources [data.table]: Table holding information about the computational resources used for the job. Also see getJobResources.
tags [data.table]: Table holding information about tags. See Tags.
hash [character(1)]: Unique hash which changes each time the registry gets saved to the file system. Can be utilized to invalidate the cache of knitr.
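The seeding rule for the seed slot (seed + job.id is set before each job) is what makes reruns of a job reproducible. A base R sketch of the idea, with assumed values and a hypothetical helper run_job; the package may set and restore the seed through its own mechanism:

```r
# Assumed values for illustration only.
registry.seed = 123L
job.ids = 1:3

# Each job draws its random numbers after seeding with seed + job.id.
run_job = function(job.id) {
  set.seed(registry.seed + job.id)
  rnorm(1)
}

first.run = vapply(job.ids, run_job, numeric(1))
second.run = vapply(job.ids, run_job, numeric(1))  # e.g. after resetting and resubmitting
identical(first.run, second.run)
```

Each job gets its own random number stream, yet repeating a job with the same id and registry seed reproduces its draws.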
Other Registry: clearRegistry(), getDefaultRegistry(), loadRegistry(), removeRegistry(), saveRegistry(), sweepRegistry(), syncRegistry()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
print(tmp)

# Set cluster functions to interactive mode and start jobs in external R sessions
tmp$cluster.functions = makeClusterFunctionsInteractive(external = TRUE)

# Change packages to load
tmp$packages = c("MASS")
saveRegistry(reg = tmp)
This function is only intended for use in your own cluster functions implementation.
Use this function in your implementation of makeClusterFunctions to create a return value for the submitJob function.
makeSubmitJobResult( status, batch.id, log.file = NA_character_, msg = NA_character_ )
status |
[ |
batch.id |
[ |
log.file |
[ |
msg |
[ |
[SubmitJobResult]. A list, containing status, batch.id and msg.
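A custom submitJob function could build its return value roughly as sketched below. The command "echo" only stands in for a real scheduler call that prints a batch id, and the convention assumed here (status 0 for success, a larger non-zero code such as 101 for a failed submission) should be checked against the argument documentation; the sketch also assumes a Unix-like system where "echo" is on the search path.

```r
library(batchtools)

# Sketch of a submitJob function as it could be passed to makeClusterFunctions().
# "echo" merely stands in for a real scheduler command that prints a batch id.
submitJob = function(reg, jc) {
  res = runOSCommand("echo", "12345")
  if (res$exit.code > 0L) {
    # Non-zero status signals a failed submission (the code 101 is an assumption).
    return(makeSubmitJobResult(status = 101L, batch.id = NA_character_,
      msg = paste(res$output, collapse = "\n")))
  }
  makeSubmitJobResult(status = 0L, batch.id = res$output[1L])
}

result = submitJob(reg = NULL, jc = NULL)
result$status
result$batch.id
```

A real implementation would use reg and jc, for example to fill a template and to pass the resulting job file to the scheduler.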
Other ClusterFunctionsHelper: cfBrewTemplate(), cfHandleUnknownSubmitError(), cfKillJob(), cfReadBrewTemplate(), makeClusterFunctions(), runOSCommand()
A version of Reduce for Registry objects which iterates over finished jobs and aggregates them.
All jobs must have terminated, otherwise an error is raised.
reduceResults(fun, ids = NULL, init, ..., reg = getDefaultRegistry())
fun |
[ |
ids |
[ |
init |
[ |
... |
[ |
reg |
[ |
Aggregated results in the same order as provided ids. Return type depends on the user function. If ids is empty, reduceResults returns init (if available) or NULL otherwise.
If you have thousands of jobs, disabling the progress bar (options(batchtools.progress = FALSE)) can significantly increase the performance.
Other Results: batchMapResults(), loadResult(), reduceResultsList()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(a, b) list(sum = a+b, prod = a*b), a = 1:3, b = 1:3, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)

# Extract element sum from each result
reduceResults(function(aggr, res) c(aggr, res$sum), init = list(), reg = tmp)

# Aggregate element sum via '+'
reduceResults(function(aggr, res) aggr + res$sum, init = 0, reg = tmp)

# Aggregate element prod via '*' where parameter b < 3
reduce = function(aggr, res, job) {
  if (job$pars$b >= 3)
    return(aggr)
  aggr * res$prod
}
reduceResults(reduce, init = 1, reg = tmp)

# Reduce to data.frame() (inefficient, use reduceResultsDataTable() instead)
reduceResults(rbind, init = data.frame(), reg = tmp)

# Reduce to data.frame by collecting results first, then utilize vectorization of rbind:
res = reduceResultsList(fun = as.data.frame, reg = tmp)
do.call(rbind, res)

# Reduce with custom combine function:
comb = function(x, y) list(sum = x$sum + y$sum, prod = x$prod * y$prod)
reduceResults(comb, reg = tmp)

# The same with neutral element NULL
comb = function(x, y) if (is.null(x)) y else list(sum = x$sum + y$sum, prod = x$prod * y$prod)
reduceResults(comb, init = NULL, reg = tmp)

# Alternative: Reduce in list, reduce manually in a 2nd step
res = reduceResultsList(reg = tmp)
Reduce(comb, res)
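The aggregation performed in these examples is an ordinary left fold. The base R sketch below mimics it on a plain list that stands in for the stored results of the three jobs; fold is a hypothetical helper, and the treatment of a missing init (the first result becomes the starting value) is an assumption inferred from the comb example rather than a statement about the implementation.

```r
# Stand-ins for the results stored by three finished jobs.
results = list(
  list(sum = 2, prod = 1),
  list(sum = 4, prod = 4),
  list(sum = 6, prod = 9)
)

# Minimal fold: "aggr" carries the state, "res" is the next result.
fold = function(fun, results, init) {
  if (length(results) == 0L)
    return(if (missing(init)) NULL else init)
  if (missing(init)) {
    aggr = results[[1L]]
    results = results[-1L]
  } else {
    aggr = init
  }
  for (res in results)
    aggr = fun(aggr, res)
  aggr
}

fold(function(aggr, res) aggr + res$sum, results, init = 0)
comb = function(x, y) list(sum = x$sum + y$sum, prod = x$prod * y$prod)
fold(comb, results)
```

The sketch leaves out the optional job argument that a reduce function may declare to inspect the parameters of the job which produced res.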
Applies a function on the results of your finished jobs and thereby collects them in a list or data.table.
The latter requires the provided function to return a list (or data.frame) of scalar values.
See rbindlist for features and limitations of the aggregation.
If not all jobs are terminated, the respective result will be NULL.
reduceResultsList(ids = NULL, fun = NULL, ..., missing.val, reg = getDefaultRegistry())
reduceResultsDataTable(ids = NULL, fun = NULL, ..., missing.val, reg = getDefaultRegistry())
ids |
[ |
fun |
[ |
... |
[ |
missing.val |
[ |
reg |
[ |
reduceResultsList returns a list of the results in the same order as the provided ids.
reduceResultsDataTable returns a data.table with columns “job.id” and additional result columns created via rbindlist, sorted by “job.id”.
If you have thousands of jobs, disabling the progress bar (options(batchtools.progress = FALSE)) can significantly increase the performance.
Other Results: batchMapResults(), loadResult(), reduceResults()
### Example 1 - reduceResultsList
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x) x^2, x = 1:10, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
reduceResultsList(fun = sqrt, reg = tmp)

### Example 2 - reduceResultsDataTable
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)

# add first problem
fun = function(job, data, n, mean, sd, ...) rnorm(n, mean = mean, sd = sd)
addProblem("rnorm", fun = fun, reg = tmp)

# add second problem
fun = function(job, data, n, lambda, ...) rexp(n, rate = lambda)
addProblem("rexp", fun = fun, reg = tmp)

# add first algorithm
fun = function(instance, method, ...) if (method == "mean") mean(instance) else median(instance)
addAlgorithm("average", fun = fun, reg = tmp)

# add second algorithm
fun = function(instance, ...) sd(instance)
addAlgorithm("deviation", fun = fun, reg = tmp)

# define problem and algorithm designs
library(data.table)
prob.designs = algo.designs = list()
prob.designs$rnorm = CJ(n = 100, mean = -1:1, sd = 1:5)
prob.designs$rexp = data.table(n = 100, lambda = 1:5)
algo.designs$average = data.table(method = c("mean", "median"))
algo.designs$deviation = data.table()

# add experiments and submit
addExperiments(prob.designs, algo.designs, reg = tmp)
submitJobs(reg = tmp)

# collect results and join them with problem and algorithm parameters
res = ijoin(
  getJobPars(reg = tmp),
  reduceResultsDataTable(reg = tmp, fun = function(x) list(res = x))
)
unwrap(res, sep = ".")
Remove Experiments from an ExperimentRegistry.
This function automatically checks if any of the jobs to remove is either pending or running.
However, if the implemented heuristic fails, this can lead to inconsistencies in the data base.
Use with care while jobs are running.
removeExperiments(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
reg |
[ |
[data.table] of removed job ids, invisibly.
Other Experiment: addExperiments(), summarizeExperiments()
All files will be erased from the file system, including all results.
If you wish to remove only intermediate files, use sweepRegistry.
removeRegistry(wait = 5, reg = getDefaultRegistry())
wait |
[ |
reg |
[ |
[character(1)]: Path of the deleted file directory.
Other Registry: clearRegistry(), getDefaultRegistry(), loadRegistry(), makeRegistry(), saveRegistry(), sweepRegistry(), syncRegistry()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
removeRegistry(0, tmp)
Resets the computational state of jobs in the Registry.
This function automatically checks if any of the jobs to reset is either pending or running.
However, if the implemented heuristic fails, this can lead to inconsistencies in the data base.
Use with care while jobs are running.
resetJobs(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
reg |
[ |
[data.table] of job ids which have been reset.
See JoinTables for examples on working with job tables.
Other debug: getErrorMessages(), getStatus(), grepLogs(), killJobs(), showLog(), testJob()
Hooks allow you to trigger function calls on specific events.
They can be specified via the ClusterFunctions and are triggered on the following events:
pre.sync: function(reg, fns, ...). Run before synchronizing the registry on the master. fns is the character vector of paths to the update files.
post.sync: function(reg, updates, ...). Run after synchronizing the registry on the master. updates is the data.table of processed updates.
pre.submit.job: function(reg, ...). Run on the master before a job is submitted to the scheduler.
post.submit.job: function(reg, ...). Run on the master after a job is successfully submitted to the scheduler.
pre.submit: function(reg, ...). Run before any job is submitted to the scheduler.
post.submit: function(reg, ...). Run after jobs are submitted to the scheduler.
pre.do.collection: function(reg, reader, ...). Run before starting the job collection on the slave. reader is an internal cache object.
post.do.collection: function(reg, updates, reader, ...). Run after all jobs in the chunk are terminated on the slave. updates is a data.table of updates which will be merged with the Registry by the master. reader is an internal cache object.
pre.kill: function(reg, ids, ...). Run before any job is killed.
post.kill: function(reg, ids, ...). Run after jobs are killed. ids is the return value of killJobs.
runHook(obj, hook, ...)
obj |
[Registry | JobCollection] |
hook |
[ |
... |
[ANY] |
Return value of the called function, or NULL if there is no hook with the specified ID.
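Registering a hook and triggering it by hand might look like the sketch below. It assumes that makeClusterFunctions accepts a named list of functions through a hooks argument and that a hook function may return an arbitrary value; the cluster functions named "Demo" and their dummy submitJob are hypothetical and never submit anything.

```r
library(batchtools)

reg = makeRegistry(file.dir = NA, make.default = FALSE)

# Hypothetical cluster functions whose only purpose is to carry a hook.
cf = makeClusterFunctions(
  name = "Demo",
  submitJob = function(reg, jc) makeSubmitJobResult(status = 0L, batch.id = "demo"),
  hooks = list(pre.submit = function(reg, ...) "pre.submit was called")
)
reg$cluster.functions = cf

runHook(reg, "pre.submit")  # value returned by the hook function
runHook(reg, "post.kill")   # no hook registered under this ID
```

In regular use the hooks are triggered by the package itself on the listed events; calling runHook directly is mainly useful when writing or testing your own cluster functions.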
This is a helper function to run arbitrary OS commands on local or remote machines.
The interface is similar to system2, but it always returns the exit status and the output.
runOSCommand( sys.cmd, sys.args = character(0L), stdin = "", nodename = "localhost" )
sys.cmd |
[ |
sys.args |
[ |
stdin |
[ |
nodename |
[ |
[named list] with “sys.cmd”, “sys.args”, “exit.code” (integer), “output” (character).
Other ClusterFunctionsHelper: cfBrewTemplate(), cfHandleUnknownSubmitError(), cfKillJob(), cfReadBrewTemplate(), makeClusterFunctions(), makeSubmitJobResult()
## Not run: 
runOSCommand("ls")
runOSCommand("ls", "-al")
runOSCommand("notfound")

## End(Not run)
Stores the registry on the file system in its “file.dir” (specified for construction in makeRegistry, can be accessed via reg$file.dir).
This function is usually called internally whenever needed.
saveRegistry(reg = getDefaultRegistry())
reg |
[ |
[logical(1)]: TRUE if the registry was saved, FALSE otherwise (if the registry is read-only).
Other Registry: clearRegistry(), getDefaultRegistry(), loadRegistry(), makeRegistry(), removeRegistry(), sweepRegistry(), syncRegistry()
showLog opens the log in the pager. For customization, see file.show.
getLog returns the log as character vector.
showLog(id, reg = getDefaultRegistry())
getLog(id, reg = getDefaultRegistry())
id |
[ |
reg |
[ |
Nothing.
Other debug: getErrorMessages(), getStatus(), grepLogs(), killJobs(), resetJobs(), testJob()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Create some dummy jobs
fun = function(i) {
  if (i == 3) stop(i)
  if (i %% 2 == 1) warning("That's odd.")
}
ids = batchMap(fun, i = 1:5, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)
getStatus(reg = tmp)

writeLines(getLog(ids[1], reg = tmp))
## Not run: 
showLog(ids[1], reg = tmp)

## End(Not run)

grepLogs(pattern = "warning", ignore.case = TRUE, reg = tmp)
Submits defined jobs to the batch system.
After submitting the jobs, you can use waitForJobs to wait for the termination of jobs or call reduceResultsList/reduceResults to collect partial results.
The progress can be monitored with getStatus.
submitJobs( ids = NULL, resources = list(), sleep = NULL, reg = getDefaultRegistry() )
ids |
[ |
resources |
[ |
sleep |
[ |
reg |
[ |
[data.table] with columns “job.id” and “chunk”.
You can pass arbitrary resources to submitJobs() which then are available in the cluster function template.
Some resources' names are standardized and it is good practice to stick to the following nomenclature to avoid confusion:
Upper time limit in seconds for jobs before they get killed by the scheduler. Can be passed as additional column as part of ids to set per-job resources.
Memory limit in Mb. If jobs exceed this limit, they are usually killed by the scheduler. Can be passed as additional column as part of ids to set per-job resources.
Number of (physical) CPUs to use on the slave. Can be passed as additional column as part of ids to set per-job resources.
Number of threads to use via OpenMP. Used to set environment variable “OMP_NUM_THREADS”. Can be passed as additional column as part of ids to set per-job resources.
Maximum size of the pointer protection stack, see Memory.
Number of threads to use for the BLAS backend. Used to set environment variables “MKL_NUM_THREADS” and “OPENBLAS_NUM_THREADS”. Can be passed as additional column as part of ids to set per-job resources.
Enable memory measurement for jobs. Comes with a small runtime overhead.
Execute chunks as array jobs.
Start a parallelMap backend on the slave.
Start a foreach backend on the slave.
Resource used for Slurm to select the set of clusters to run sbatch/squeue/scancel on.
Multiple jobs can be grouped (chunked) together to be executed sequentially on the batch system as a single batch job.
This is especially useful to avoid overburdening the scheduler by submitting thousands of jobs simultaneously.
To chunk jobs together, job ids must be provided as data.frame with columns “job.id” and “chunk” (integer).
All jobs with the same chunk number will be executed sequentially inside the same batch job.
The utility functions chunk, binpack and lpt can assist in grouping jobs.
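The shape of such a table can be seen in a small base R sketch. The chunk numbers are computed by hand here so that the example has no dependencies; in practice the helpers named above would take over this step, and the chunk size of four is an arbitrary choice.

```r
# Ten job ids, grouped into chunks of (at most) four jobs each.
ids = data.frame(job.id = 1:10)
ids$chunk = (ids$job.id - 1L) %/% 4L + 1L
ids

# Jobs that would run sequentially inside the same batch job.
split(ids$job.id, ids$chunk)
```

A table of this form can then be passed as ids to submitJobs, which results in three batch jobs instead of ten.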
If your cluster supports array jobs, you can set the resource chunks.as.arrayjobs to TRUE in order to execute chunks as job arrays on the cluster.
For each chunk of size n, batchtools creates a JobCollection of (possibly heterogeneous) jobs which is submitted to the scheduler as a single array job with n repetitions.
For each repetition, the JobCollection is first read from the file system, then subsetted to the i-th job using the environment variable reg$cluster.functions$array.var (depending on the cluster backend, defined automatically) and finally executed.
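The subsetting step for one repetition of an array job can be sketched in base R as follows. The variable name DEMO_ARRAY_TASK_ID and the job table are made up for illustration; on a real system the scheduler sets the variable named by array.var for every repetition.

```r
# Hypothetical name; the real variable depends on the scheduler.
array.var = "DEMO_ARRAY_TASK_ID"

# Stand-in for the job table of a JobCollection holding a chunk of three jobs.
jobs = data.frame(job.id = c(11L, 12L, 13L), x = c(0.1, 0.2, 0.3))

# The scheduler would set the variable for each repetition; here we fake repetition 2.
Sys.setenv(DEMO_ARRAY_TASK_ID = "2")
i = as.integer(Sys.getenv(array.var))

# Only the i-th job of the chunk is executed in this repetition.
jobs[i, ]
```

Every repetition reads the same collection but picks a different row, which is why the jobs of one chunk may be heterogeneous.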
Jobs are submitted in the order of chunks, i.e. jobs which have chunk number sort(unique(ids$chunk))[1] first, then jobs with chunk number sort(unique(ids$chunk))[2] and so on. If no chunks are provided, jobs are submitted in the order of ids$job.id.
If requested, submitJobs tries to limit the number of concurrent jobs of the user by waiting until jobs terminate before submitting new ones.
This can be controlled by setting “max.concurrent.jobs” in the configuration file (see Registry) or by setting the resource “max.concurrent.jobs” to the maximum number of jobs to run simultaneously.
If both are set, the setting via the resource takes precedence over the setting in the configuration.
Setting the resource measure.memory to TRUE turns on memory measurement: gc is called directly before and after the job and the difference is stored in the internal database. Note that this is just a rough estimate and works reliably neither for external code like C/C++ nor in combination with threading.
Inner parallelization is typically done via threading, sockets or MPI. Two backends are supported to assist in setting up inner parallelization.
The first package is parallelMap.
If you set the resource “pm.backend” to “multicore”, “socket” or “mpi”, parallelStart is called on the slave before the first job in the chunk is started and parallelStop is called after the last job terminated.
This way, the resources for inner parallelization can be set and get automatically stored just like other computational resources.
The function provided by the user just has to call parallelMap to start parallelization using the preconfigured backend.
To control the number of CPUs, you have to set the resource ncpus.
Otherwise ncpus defaults to the number of available CPUs (as reported by detectCores) on the executing machine for multicore and socket mode and defaults to the return value of mpi.universe.size-1 for MPI.
Your template must be set up to handle the parallelization, e.g. request the right number of CPUs or start R with mpirun.
You may pass further options like level to parallelStart via the named list “pm.opts”.
The second supported parallelization backend is foreach.
If you set the resource “foreach.backend” to “seq” (sequential mode), “parallel” (doParallel) or “mpi” (doMPI), the requested foreach backend is automatically registered on the slave.
Again, the resource ncpus is used to determine the number of CPUs.
Neither the namespace of parallelMap nor the namespace of foreach is attached.
You have to do this manually via library or let the registry load the packages for you.
If you have a large number of jobs, disabling the progress bar (options(batchtools.progress = FALSE)) can significantly increase the performance of submitJobs.
### Example 1: Submit subsets of jobs
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# toy function which fails if x is even and an input file does not exist
fun = function(x, fn) if (x %% 2 == 0 && !file.exists(fn)) stop("file not found") else x

# define jobs via batchMap
fn = tempfile()
ids = batchMap(fun, 1:20, reg = tmp, fn = fn)

# submit some jobs
ids = 1:10
submitJobs(ids, reg = tmp)
waitForJobs(ids, reg = tmp)
getStatus(reg = tmp)

# create the required file and re-submit failed jobs
file.create(fn)
submitJobs(findErrors(ids, reg = tmp), reg = tmp)
getStatus(reg = tmp)

# submit remaining jobs which have not yet been submitted
ids = findNotSubmitted(reg = tmp)
submitJobs(ids, reg = tmp)
getStatus(reg = tmp)

# collect results
reduceResultsList(reg = tmp)

### Example 2: Using memory measurement
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Toy function which creates a large matrix and returns the column means
fun = function(n, p) colMeans(matrix(runif(n*p), n, p))

# Arguments to fun:
args = data.table::CJ(n = c(1e4, 1e5), p = c(10, 50)) # like expand.grid()
print(args)

# Map function to create jobs
ids = batchMap(fun, args = args, reg = tmp)

# Set resources: enable memory measurement
res = list(measure.memory = TRUE)

# Submit jobs using the currently configured cluster functions
submitJobs(ids, resources = res, reg = tmp)

# Retrieve information about memory, combine with parameters
info = ijoin(getJobStatus(reg = tmp)[, .(job.id, mem.used)], getJobPars(reg = tmp))
print(unwrap(info))

# Combine job info with results -> each job is aggregated using mean()
unwrap(ijoin(info, reduceResultsDataTable(fun = function(res) list(res = mean(res)), reg = tmp)))

### Example 3: Multicore execution on the slave
tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Function which sleeps 10 seconds, i-times
f = function(i) {
  parallelMap::parallelMap(Sys.sleep, rep(10, i))
}

# Create one job with parameter i=4
ids = batchMap(f, i = 4, reg = tmp)

# Set resources: Use parallelMap in multicore mode with 4 CPUs
# batchtools internally loads the namespace of parallelMap and then
# calls parallelStart() before the job and parallelStop() right
# after the last job in the chunk terminated.
res = list(pm.backend = "multicore", ncpus = 4)

## Not run: 
# Submit the job and wait for it
submitJobs(resources = res, reg = tmp)
waitForJobs(reg = tmp)

# If successful, the running time should be ~10s
getJobTable(reg = tmp)[, .(job.id, time.running)]

# There should also be a note in the log:
grepLogs(pattern = "parallelMap", reg = tmp)

## End(Not run)
Returns a frequency table of defined experiments. See ExperimentRegistry for an example.
summarizeExperiments(
  ids = NULL,
  by = c("problem", "algorithm"),
  reg = getDefaultRegistry()
)
ids |
[ |
by |
[ |
reg |
[ |
[data.table] of frequencies.
Other Experiment: addExperiments(), removeExperiments()
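As a minimal sketch of how the frequency table comes about, the following defines one problem and two algorithms in a temporary registry and counts the resulting experiments. The names p1, a1 and a2 and the parameter values for n are illustrative assumptions, not part of the package.

```r
library(batchtools)

# Temporary experiment registry (file.dir = NA uses a temporary directory)
tmp = makeExperimentRegistry(file.dir = NA, make.default = FALSE)

# One toy problem and two toy algorithms (names are illustrative)
addProblem("p1", fun = function(job, data, n, ...) rnorm(n), reg = tmp)
addAlgorithm("a1", fun = function(job, data, instance, ...) mean(instance), reg = tmp)
addAlgorithm("a2", fun = function(job, data, instance, ...) median(instance), reg = tmp)

# 3 problem parameter settings x 2 algorithms = 6 experiments
design = list(p1 = data.table::data.table(n = c(10, 20, 30)))
ids = addExperiments(prob.designs = design, reg = tmp)

# Frequencies per problem and algorithm (the default grouping)
tab = summarizeExperiments(reg = tmp)
print(tab)

# Finer grouping which also includes the problem parameter n
tab.n = summarizeExperiments(by = c("problem", "algorithm", "n"), reg = tmp)
print(tab.n)
```

Grouping by an additional parameter such as n is a quick way to check that a design was expanded as intended before any jobs are submitted.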
Canceled jobs and jobs submitted multiple times may leave stray files behind. This function checks the registry for consistency and removes obsolete files and redundant database entries.
sweepRegistry(reg = getDefaultRegistry())
reg |
[ |
Other Registry: clearRegistry(), getDefaultRegistry(), loadRegistry(), makeRegistry(), removeRegistry(), saveRegistry(), syncRegistry()
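A minimal sketch of a sweep on a temporary registry follows. It assumes the default interactive cluster functions (no user configuration file that selects a scheduler), and the squaring function is only a placeholder workload.

```r
library(batchtools)

# Temporary registry with three toy jobs
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x) x^2, x = 1:3, reg = tmp)
submitJobs(reg = tmp)
waitForJobs(reg = tmp)

# Check the registry for consistency and remove obsolete files
sweepRegistry(reg = tmp)

# Results of finished jobs should still be available afterwards
res = unlist(reduceResultsList(reg = tmp))
print(res)
```

In this toy case there is usually nothing to clean up; the call becomes relevant after jobs have been canceled or re-submitted.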
Parses update files written by the slaves to the file system and updates the internal database.
syncRegistry(reg = getDefaultRegistry())
reg |
[ |
[logical(1)]: TRUE if the state has changed, FALSE otherwise.
Other Registry: clearRegistry(), getDefaultRegistry(), loadRegistry(), makeRegistry(), removeRegistry(), saveRegistry(), sweepRegistry()
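The sketch below shows one way to call the synchronization by hand and inspect its return value. It assumes the default interactive cluster functions; whether TRUE or FALSE is returned depends on whether other functions have already synchronized the registry, so no particular value is implied here.

```r
library(batchtools)

# Temporary registry with two toy jobs
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(identity, x = 1:2, reg = tmp)
submitJobs(reg = tmp)

# Parse pending update files; the flag reports whether the state changed
changed = syncRegistry(reg = tmp)
print(changed)

# The status overview reflects the synchronized state
print(getStatus(reg = tmp))
```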
Add and remove arbitrary tags to jobs.
addJobTags(ids = NULL, tags, reg = getDefaultRegistry())

removeJobTags(ids = NULL, tags, reg = getDefaultRegistry())

getUsedJobTags(ids = NULL, reg = getDefaultRegistry())
ids |
[ |
tags |
[ |
reg |
[ |
[data.table] with job ids affected (invisible).
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
ids = batchMap(sqrt, x = -3:3, reg = tmp)

# Add new tag to all ids
addJobTags(ids, "needs.computation", reg = tmp)
getJobTags(reg = tmp)

# Add more tags
addJobTags(findJobs(x < 0, reg = tmp), "x.neg", reg = tmp)
addJobTags(findJobs(x > 0, reg = tmp), "x.pos", reg = tmp)
getJobTags(reg = tmp)

# Submit first 5 jobs and remove tag if successful
ids = submitJobs(1:5, reg = tmp)
if (waitForJobs(reg = tmp))
  removeJobTags(ids, "needs.computation", reg = tmp)
getJobTags(reg = tmp)

# Grep for warning message and add a tag
addJobTags(grepLogs(pattern = "NaNs produced", reg = tmp), "div.zero", reg = tmp)
getJobTags(reg = tmp)

# All tags where tag x.neg is set:
ids = findTagged("x.neg", reg = tmp)
getUsedJobTags(ids, reg = tmp)
Starts a single job on the local machine.
testJob(id, external = FALSE, reg = getDefaultRegistry())
id |
[ |
external |
[ If |
reg |
[ |
Returns the result of the job if successful.
Other debug: getErrorMessages(), getStatus(), grepLogs(), killJobs(), resetJobs(), showLog()
tmp = makeRegistry(file.dir = NA, make.default = FALSE)
batchMap(function(x) if (x == 2) xxx else x, 1:2, reg = tmp)
testJob(1, reg = tmp)
## Not run: 
testJob(2, reg = tmp)

## End(Not run)
Some functions (e.g., getJobPars, getJobResources or reduceResultsDataTable) return a data.table with columns of type list.
These columns can be unnested/unwrapped with this function.
The contents of these columns will be transformed to a data.table and cbind-ed to the input data.frame x, replacing the original nested column.
unwrap(x, cols = NULL, sep = NULL)

flatten(x, cols = NULL, sep = NULL)
x |
[ |
cols |
[ |
sep |
[ |
[data.table].
There is a name clash with function flatten in package purrr.
For this reason, using flatten is discouraged in favor of unwrap.
x = data.table::data.table(
  id = 1:3,
  values = list(list(a = 1, b = 3), list(a = 2, b = 2), list(a = 3))
)
unwrap(x)
unwrap(x, sep = ".")
This function simply waits until all jobs have terminated.
waitForJobs(
  ids = NULL,
  sleep = NULL,
  timeout = 604800,
  expire.after = NULL,
  stop.on.error = FALSE,
  stop.on.expire = FALSE,
  reg = getDefaultRegistry()
)
ids |
[ |
sleep |
[ |
timeout |
[ |
expire.after |
[ |
stop.on.error |
[ |
stop.on.expire |
[ |
reg |
[ |
[logical(1)]. Returns TRUE if all jobs terminated successfully and FALSE if either the timeout is reached or at least one job terminated with an exception or expired.
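To illustrate the return value, here is a small sketch in which one of three jobs deliberately fails. It assumes the default interactive cluster functions, where errors are recorded per job instead of aborting the submission; the toy function and its error message are made up for this example.

```r
library(batchtools)

tmp = makeRegistry(file.dir = NA, make.default = FALSE)

# Toy function which throws an error for x == 2
fun = function(x) if (x == 2) stop("x must not be 2") else x
ids = batchMap(fun, x = 1:3, reg = tmp)
submitJobs(ids, reg = tmp)

# Not all jobs terminated successfully, so the flag should be FALSE
ok = waitForJobs(ids, reg = tmp)
print(ok)

# Inspect which jobs failed and why
failed = findErrors(reg = tmp)
print(getErrorMessages(failed, reg = tmp))
```

Using the flag as a guard, as in `if (waitForJobs(reg = tmp)) ...`, keeps follow-up steps such as collecting results from running on a partially failed batch.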
R6Class to create local and remote Linux workers.
An R6Class generator object [Worker].
nodename
Host name. Set via constructor.
ncpus
Number of CPUs. Set via constructor and defaults to a heuristic which tries to detect the number of CPUs of the machine.
max.load
Maximum load average (of the last 5 min). Set via constructor and defaults to the number of CPUs of the machine.
status
Status of the worker; one of “unknown”, “available”, “max.cpus” and “max.load”.
new(nodename, ncpus, max.load)
Constructor.
update(reg)
Update the worker status.
list(reg)
List running jobs.
start(reg, fn, outfile)
Start job collection in file “fn” and output to “outfile”.
kill(reg, batch.id)
Kill job matching the “batch.id”.
## Not run: 
# create a worker for the local machine and use 4 CPUs.
Worker$new("localhost", ncpus = 4)

## End(Not run)