Skip to content

Commit

Permalink
Merge branch 'main' into 1315
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau authored Sep 9, 2024
2 parents 8e0390b + 6411f1f commit 47f734c
Show file tree
Hide file tree
Showing 27 changed files with 230 additions and 78 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ S3method(store_tar_path,tar_cloud)
S3method(store_unload,default)
S3method(store_unload,tar_aws_file)
S3method(store_unload,tar_gcp_file)
S3method(store_unload,tar_repository_cas_file)
S3method(store_unmarshal_object,default)
S3method(store_unmarshal_object,tar_keras)
S3method(store_unmarshal_object,tar_store_format_custom)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Replace `pingr` dependency with `base::socketConnection()` for local URL utilities (#1317, #1318, @Adafede).
* Implement `tar_repository_cas()`, `tar_repository_cas_local()`, and `tar_repository_cas_local_gc()` for content-addressable storage (#1232, #1314, @noamross).
* Add `tar_format_get()` to make implementing CAS systems easier.
* Implement `error = "trim"` in `tar_target()` and `tar_option_set()` (#1310, #1311, @hadley).
* Use the file system type to decide whether to trust time stamps (#1315, @hadley, @gaborcsardi).
* Deprecate `format = "file_fast"` in favor of the above (#1315).
* Deprecate `trust_object_timestamps` in favor of the more unified `trust_timestamps` in `tar_option_set()` (#1315).
Expand Down
4 changes: 3 additions & 1 deletion R/class_active.R
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ active_class <- R6::R6Class(
target <- pipeline_get_target(self$pipeline, name)
target_debug(target)
target_update_depend(target, self$pipeline, self$meta)
if (target_should_run(target, self$meta)) {
if (counter_exists_name(self$scheduler$trimmed, name)) {
self$scheduler$trim(target, self$pipeline)
} else if (target_should_run(target, self$meta)) {
self$flush_upload_meta_file(target)
self$run_target(name)
} else {
Expand Down
1 change: 1 addition & 0 deletions R/class_aws_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,6 @@ store_read_object.tar_aws_file <- function(store) {
#' @export
store_unload.tar_aws_file <- function(store, target) {
  # Remove the locally staged copy of the AWS file target, then fall
  # through to the next method, which drops the in-memory value.
  local_paths <- as.character(target$value$object)
  unlink(local_paths)
  NextMethod()
}
# nocov end
5 changes: 3 additions & 2 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -339,14 +339,15 @@ builder_handle_error <- function(target, pipeline, scheduler, meta) {
target$settings$error,
continue = builder_error_continue(target, scheduler),
abridge = scheduler$abridge(target),
trim = scheduler$trim(target, pipeline),
stop = builder_error_exit(target, pipeline, scheduler, meta),
null = builder_error_null(target, pipeline, scheduler, meta),
workspace = builder_error_exit(target, pipeline, scheduler, meta)
)
}

builder_error_continue <- function(target, scheduler) {
target$value <- NULL
store_unload(store = target$store, target = target)
scheduler$reporter$report_error(target$metrics$error)
}

Expand Down Expand Up @@ -444,7 +445,7 @@ builder_unload_value <- function(target) {
clear <- identical(settings$deployment, "worker") &&
identical(settings$storage, "worker")
if (clear) {
target$value <- NULL
store_unload(store = target$store, target = target)
}
}

Expand Down
1 change: 1 addition & 0 deletions R/class_gcp_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,6 @@ store_read_object.tar_gcp_file <- function(store) {
#' @export
store_unload.tar_gcp_file <- function(store, target) {
  # Delete the local copy of the GCP file target before delegating to
  # the next method, which clears the cached value.
  local_paths <- as.character(target$value$object)
  unlink(local_paths)
  NextMethod()
}
# nocov end
2 changes: 1 addition & 1 deletion R/class_options.R
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ options_class <- R6::R6Class(
deprecate_error_workspace(error)
tar_assert_flag(
error,
c("stop", "continue", "abridge", "workspace", "null")
c("stop", "continue", "null", "abridge", "trim", "workspace")
)
},
validate_memory = function(memory) {
Expand Down
2 changes: 1 addition & 1 deletion R/class_pattern.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pattern_new <- function(

#' @export
target_get_children.tar_pattern <- function(target) {
target$junction$splits
as.character(target$junction$splits)
}

#' @export
Expand Down
1 change: 0 additions & 1 deletion R/class_pipeline.R
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ pipeline_register_loaded <- function(pipeline, names) {
pipeline_unload_target <- function(pipeline, name) {
target <- pipeline_get_target(pipeline, name)
store_unload(target$store, target)
target$value <- NULL
counter_del_name(pipeline$loaded, name)
counter_del_name(pipeline$transient, name)
}
Expand Down
22 changes: 18 additions & 4 deletions R/class_scheduler.R
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ scheduler_init <- function(
)
reporter <- reporter_init(reporter, seconds_interval = seconds_reporter)
backoff <- tar_options$get_backoff()
trimmed <- counter_init()
scheduler_new(
graph = graph,
queue = queue,
progress = progress,
reporter = reporter,
backoff = backoff
backoff = backoff,
    trimmed = trimmed
)
}

Expand All @@ -52,9 +54,10 @@ scheduler_new <- function(
queue = NULL,
progress = NULL,
reporter = NULL,
backoff = NULL
backoff = NULL,
trimmed = NULL
) {
scheduler_class$new(graph, queue, progress, reporter, backoff)
scheduler_class$new(graph, queue, progress, reporter, backoff, trimmed)
}

scheduler_class <- R6::R6Class(
Expand All @@ -68,18 +71,21 @@ scheduler_class <- R6::R6Class(
progress = NULL,
reporter = NULL,
backoff = NULL,
trimmed = NULL,
initialize = function(
graph = NULL,
queue = NULL,
progress = NULL,
reporter = NULL,
backoff = NULL
backoff = NULL,
trimmed = NULL
) {
self$graph <- graph
self$queue <- queue
self$progress <- progress
self$reporter <- reporter
self$backoff <- backoff
self$trimmed <- trimmed
},
count_unfinished_deps = function(name) {
deps <- self$graph$produce_upstream(name)
Expand All @@ -98,12 +104,20 @@ scheduler_class <- R6::R6Class(
self$progress$abridge()
self$queue$abridge()
},
trim = function(target, pipeline) {
parent_name <- target_get_parent(target)
parent_target <- pipeline_get_target(pipeline, parent_name)
downstream <- self$graph$produce_downstream(parent_name)
siblings <- target_get_children(parent_target)
counter_set_names(self$trimmed, c(downstream, siblings))
},
validate = function() {
self$graph$validate()
self$queue$validate()
self$progress$validate()
self$reporter$validate()
self$backoff$validate()
counter_validate(self$trimmed)
}
)
)
8 changes: 7 additions & 1 deletion R/class_stem.R
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,13 @@ target_produce_record.tar_stem <- function(target, pipeline, meta) {
}

#' @export
target_skip.tar_stem <- function(target, pipeline, scheduler, meta, active) {
target_skip.tar_stem <- function(
target,
pipeline,
scheduler,
meta,
active
) {
NextMethod()
stem_restore_buds(target, pipeline, scheduler, meta)
}
Expand Down
1 change: 1 addition & 0 deletions R/class_store.R
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ store_unload <- function(store, target) {

#' @export
store_unload.default <- function(store, target) {
  # Release the target's cached value so it is eligible for garbage
  # collection.
  # NOTE(review): this assumes `target` has reference semantics
  # (environment/R6) so the mutation is visible to the caller — confirm.
  target$value <- NULL
}

store_marshal_object <- function(store, object) {
Expand Down
8 changes: 6 additions & 2 deletions R/class_store_repository_cas.R
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ store_hash_late.tar_repository_cas <- function(store) {

#' @export
store_upload_object.tar_repository_cas <- function(store) {
store_upload_object_cas(store, store$file$stage)
}

store_upload_object_cas <- function(store, path) {
on.exit(unlink(store$file$stage, recursive = TRUE, force = TRUE))
tar_assert_scalar(
store$file$path,
path,
msg = paste(
"for a tar_repository_cas() target, the output must be",
"a single file or single directory."
Expand All @@ -40,7 +44,7 @@ store_upload_object.tar_repository_cas <- function(store) {
store_repository_cas_call_method(
store = store,
text = store$methods_repository$upload,
args = list(key = store$file$hash, path = store$file$stage)
args = list(key = store$file$hash, path = path)
)
}

Expand Down
27 changes: 7 additions & 20 deletions R/class_store_repository_cas_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,7 @@ store_hash_late.tar_repository_cas_file <- function(store) {

#' @export
store_upload_object.tar_repository_cas_file <- function(store) {
tar_assert_scalar(
store$file$path,
msg = paste(
"for a tar_repository_cas() target, the output must be",
"a single file or single directory."
)
)
store_repository_cas_call_method(
store = store,
text = store$methods_repository$upload,
args = list(key = store$file$hash, path = store$file$path)
)
tar_assert_true(
all(file.exists(store$file$path)),
msg = paste0(
"CAS repository upload deleted file ",
store$file$path,
". Uploads should not delete format = \"file\" output files."
)
)
store_upload_object_cas(store, store$file$path)
}

#' @export
Expand All @@ -46,3 +27,9 @@ store_read_object.tar_repository_cas_file <- function(store) {
file_move(from = scratch, to = store$file$path)
store$file$path
}

#' @export
store_unload.tar_repository_cas_file <- function(store, target) {
  # Unlink the local file(s) backing the CAS file target, then let the
  # next method run to release the in-memory value.
  local_paths <- as.character(target$value$object)
  unlink(local_paths)
  NextMethod()
}
8 changes: 7 additions & 1 deletion R/class_target.R
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,13 @@ target_produce_record <- function(target, pipeline, meta) {
UseMethod("target_produce_record")
}

target_skip <- function(target, pipeline, scheduler, meta, active) {
target_skip <- function(
target,
pipeline,
scheduler,
meta,
active
) {
UseMethod("target_skip")
}

Expand Down
2 changes: 1 addition & 1 deletion R/tar_progress_branches.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#' and the following columns.
#' * `name`: name of the pattern.
#' * `progress`: progress status: `"dispatched"`, `"completed"`,
#' `"cancelled"`, or `"errored"`.
#' `"canceled"`, or `"errored"`.
#' * `branches`: number of branches in the progress category.
#' * `total`: total number of branches planned for the whole pattern.
#' Values within the same pattern should all be equal.
Expand Down
13 changes: 3 additions & 10 deletions R/tar_repository_cas.R
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@
#'
#' Some functions may need to be adapted and configured based on other
#' inputs. For example, you may want to define
#' `upload = \(key, path) file.move(path, file.path(folder, key))`
#' `upload = \(key, path) file.rename(path, file.path(folder, key))`
#' but do not want to hard-code a value of `folder` when you write the
#' underlying function. `substitute()` can help inject values into the
#' body of a function. For example:
#'
#' ```
#' upload <- \(key, path) {}
#' body(upload) <- substitute(
#' file.move(path, file.path(folder, key)),
#' file.rename(path, file.path(folder, key)),
#' list(folder = "my_cas")
#' )
#' print(upload)
Expand All @@ -130,13 +130,6 @@
#' location. `key` denotes the name of the destination data object
#' in the CAS system.
#'
#' In the case of `format = "file"`, `upload` must not delete or move
#' the original file at `path`. In other words, `path` must still
#' exist in its original form after `upload` finishes.
#' But in the case of non-`"file"` targets,
#' `path` is a staging area which is automatically removed
#' after upload, so `upload` can safely remove `path` if needed.
#'
#' To differentiate between
#' `format = "file"` targets and non-`"file"` targets, the `upload`
#' method can use [tar_format_get()]. For example, to make
Expand Down Expand Up @@ -204,7 +197,7 @@
#' if (!file.exists("cas")) {
#' dir.create("cas", recursive = TRUE)
#' }
#' file.copy(path, file.path("cas", key))
#' file.rename(path, file.path("cas", key))
#' },
#' download = function(key, path) {
#' file.copy(file.path("cas", key), path)
Expand Down
7 changes: 2 additions & 5 deletions R/tar_repository_cas_local.R
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,8 @@ tar_cas_u <- function(cas, key, path) {
cas <- cas %|||% path_cas_dir(tar_runtime$store)
to <- file.path(cas, key)
if (!file.exists(to)) {
if (identical(tar_format_get(), "file")) {
file_copy(path, to) # Defined in R/utils_files.R for files & dirs.
} else {
file_move(path, to) # Defined in R/utils_files.R for files & dirs.
}
# Defined in R/utils_files.R. Works on both files and directories.
file_move(path, to)
}
}

Expand Down
17 changes: 14 additions & 3 deletions R/tar_target.R
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,24 @@
#' stops and throws an error. Options:
#' * `"stop"`: the whole pipeline stops and throws an error.
#' * `"continue"`: the whole pipeline keeps going.
#' * `"null"`: The errored target continues and returns `NULL`.
#' The data hash is deliberately wrong so the target is not
#' up to date for the next run of the pipeline.
#' * `"abridge"`: any currently running targets keep running,
#' but no new targets launch after that.
#' * `"trim"`: all currently running targets stay running. A queued
#' target is allowed to start if:
#'
#' 1. It is not downstream of the error, and
#' 2. It is not a sibling branch from the same [tar_target()] call
#' (if the error happened in a dynamic branch).
#'
#' The idea is to avoid starting any new work that the immediate error
#' impacts. `error = "trim"` is just like `error = "abridge"`,
#' but it allows potentially healthy regions of the dependency graph
#' to begin running.
#' (Visit <https://books.ropensci.org/targets/debugging.html>
#' to learn how to debug targets using saved workspaces.)
#' * `"null"`: The errored target continues and returns `NULL`.
#' The data hash is deliberately wrong so the target is not
#' up to date for the next run of the pipeline.
#' @param memory Character of length 1, memory strategy.
#' If `"persistent"`, the target stays in memory
#' until the end of the pipeline (unless `storage` is `"worker"`,
Expand Down
2 changes: 1 addition & 1 deletion R/tar_target_raw.R
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ tar_target_raw <- function(
tar_assert_flag(iteration, c("vector", "list", "group"))
tar_assert_flag(
error,
c("stop", "continue", "abridge", "workspace", "null")
c("stop", "continue", "null", "abridge", "trim", "workspace")
)
deprecate_error_workspace(error)
tar_assert_flag(memory, c("persistent", "transient"))
Expand Down
18 changes: 15 additions & 3 deletions man/tar_option_set.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 47f734c

Please sign in to comment.