Skip to content

Commit

Permalink
Add ingest script
Browse files Browse the repository at this point in the history
  • Loading branch information
Damonamajor committed Aug 8, 2024
1 parent 9a473af commit 85810aa
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 79 deletions.
11 changes: 11 additions & 0 deletions Analysis_helpers.R → analyses/Analysis_helpers.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,14 @@ model_fetch_run_subset <- function(run_id, year, analyses_paths, append_run_id =
tictoc::toc()
return(data_list)
}


#' Rename a global data-frame variable by swapping its name suffix
#'
#' If a data frame named `var_name` exists in the global environment and its
#' name ends in `_<suffix>`, re-bind it under the name with that ending
#' replaced by `new_suffix`, then remove the old binding. Variables that do
#' not exist globally, are not data frames, or do not match the suffix are
#' left untouched.
#'
#' @param var_name Character scalar: name of the variable to rename.
#' @param suffix Suffix (without the leading underscore) the name must end
#'   with, e.g. a run ID. Note: interpreted as a regular expression, so
#'   regex metacharacters in the suffix are not escaped.
#' @param new_suffix Replacement text for the matched `_<suffix>` ending
#'   (include a leading underscore if one is desired, e.g. `"_new"`).
#' @return Invisibly `NULL`; called for its side effect on `.GlobalEnv`.
rename_var <- function(var_name, suffix, new_suffix) {
  # Look up the name in .GlobalEnv only (inherits = FALSE). The original
  # code searched the whole calling/search path, so exists()/get() could
  # match a non-global binding while the subsequent rm(envir = .GlobalEnv)
  # failed or the wrong object was copied.
  if (exists(var_name, envir = .GlobalEnv, inherits = FALSE) &&
      is.data.frame(get(var_name, envir = .GlobalEnv, inherits = FALSE))) {
    if (grepl(paste0("_", suffix, "$"), var_name)) {
      new_name <- sub(paste0("_", suffix, "$"), new_suffix, var_name)
      assign(
        new_name,
        get(var_name, envir = .GlobalEnv, inherits = FALSE),
        envir = .GlobalEnv
      )
      # Drop the old binding only after the new one is in place
      rm(list = var_name, envir = .GlobalEnv)
    }
  }
  invisible(NULL)
}
119 changes: 40 additions & 79 deletions analyses/new-feature-template.qmd
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,16 @@ cpp11::cpp_source(code = "
")
ignore_sigpipes()
# Initialize a dictionary of file paths. See misc/file_dict.csv for details
paths <- model_file_dict(model_params$run_id, model_params$year)
```

```{r download_new_data}
base_paths <- model_file_dict(params$run_id, params$run_id_year)
comp_paths <- model_file_dict(params$comparison_run_id, params$comparison_run_id_year)
comparison_paths <- model_file_dict(
params$comparison_run_id,
params$comparison_run_id_year
)
run_id <- params$run_id
comparison_run_id <- params$comparison_run_id
analyses_paths <- list(
output = list(
Expand All @@ -82,7 +84,7 @@ analyses_paths <- list(
),
list(
s3 = base_paths$output$performance_test$s3,
key = "performance"
key = "performance_test"
),
list(
s3 = base_paths$output$shap$s3,
Expand All @@ -91,112 +93,71 @@ analyses_paths <- list(
)
)
comp_paths <- list(
source("analyses/Analysis_helpers.R")
data_new <- model_fetch_run_subset(
params$run_id,
params$run_id_year,
analyses_paths, TRUE
)
list2env(data_new, envir = .GlobalEnv)
rm(data_new)
comparison_paths <- list(
output = list(
list(
s3 = comp_paths$output$assessment_card$s3,
s3 = base_paths$output$assessment_card$s3,
key = "assessment_card"
),
list(
s3 = comp_paths$output$assessment_pin$s3,
s3 = base_paths$output$assessment_pin$s3,
key = "assessment_pin"
),
list(
s3 = comp_paths$output$metadata$s3,
s3 = base_paths$output$metadata$s3,
key = "metadata"
),
list(
s3 = comp_paths$output$performance_test$s3,
key = "performance"
s3 = base_paths$output$performance_test$s3,
key = "performance_test"
),
list(
s3 = comp_paths$output$shap$s3,
s3 = base_paths$output$shap$s3,
key = "shap"
)
)
)
fetch_analyses <- function(run_id, year, analyses_paths) {
tictoc::tic(paste0("Fetched run: ", run_id))
s3_objs <- grep("s3://", unlist(analyses_paths$output), value = TRUE)
bucket <- strsplit(s3_objs[1], "/")[[1]][3]
data_comparison <- model_fetch_run_subset(
params$comparison_run_id,
params$comparison_run_id_year,
comparison_paths, TRUE
)
data_list <- list()
list2env(data_comparison, envir = .GlobalEnv)
for (analyses_path in analyses_paths$output) {
is_directory <- endsWith(analyses_path$s3, "/")
if (is_directory) {
partitioned_by_run <- endsWith(analyses_path$s3, paste0("run_id=", run_id, "/"))
if (partitioned_by_run) {
dir_path <- analyses_path$s3
} else {
dir_path <- paste0(analyses_path$s3, "year=", year, "/run_id=", run_id, "/")
}
rm(data_comparison)
message("Now fetching: ", dir_path)
objs_prefix <- sub(paste0("s3://", bucket, "/"), "", dir_path)
objs <- aws.s3::get_bucket_df(bucket, objs_prefix)
objs <- dplyr::filter(objs, Size > 0)
if (nrow(objs) > 0) {
combined_data <- NULL
for (key in objs$Key) {
message("Now fetching: ", key)
local_temp_path <- file.path(tempdir(), basename(key))
aws.s3::save_object(key, bucket = bucket, file = local_temp_path)
# Read the Parquet file and append it to combined_data
temp_data <- arrow::read_parquet(local_temp_path)
if (is.null(combined_data)) {
combined_data <- temp_data
} else {
combined_data <- dplyr::bind_rows(combined_data, temp_data)
}
}
data_list[[analyses_path$key]] <- combined_data
} else {
warning(analyses_path$key, " does not exist for this run")
}
} else {
message("Now fetching: ", analyses_path$s3)
if (aws.s3::object_exists(analyses_path$s3, bucket = bucket)) {
local_temp_path <- file.path(tempdir(), basename(analyses_path$s3))
aws.s3::save_object(analyses_path$s3, bucket = bucket, file = local_temp_path)
data_list[[analyses_path$key]] <- arrow::read_parquet(local_temp_path)
} else {
warning(analyses_path$key, " does not exist for this run")
}
}
}
all_vars <- ls()
tictoc::toc()
return(data_list)
# Iterate over all variables and rename if necessary
for (var_name in all_vars) {
rename_var(var_name, params$run_id, "_new")
rename_var(var_name, params$comparison_run_id, "_comparison")
}
data <- fetch_analyses(params$run_id, params$run_id_year, analyses_paths)
data_comparison <- fetch_analyses(params$comparison_run_id, params$comparison_run_id_year, comp_paths)
performance <- data$performance
metadata <- data$metadata
shap <- data$shap
assessment_card <- data$assessment_card %>%
select(meta_pin, meta_card_num, pred_card_initial_fmv, meta_township_code, !!sym(params$added_feature))
assessment_card_full <- data$assessment_card
assessment_pin <- data$assessment_pin
lockfile_assessment <- metadata$dvc_md5_assessment_data
lockfile_assessment <- metadata_new$dvc_md5_assessment_data
# Define S3 paths for assessment data
s3_path_assessment <- paste0(
"s3://ccao-data-dvc-us-east-1/files/md5/",
substr(lockfile_assessment, 1, 2), "/",
substr(lockfile_assessment, 3, nchar(lockfile_assessment))
)
assessment_data <- s3read_using(FUN = read_parquet, object = s3_path_assessment)
assessment_data_new <- read_parquet(s3_path_assessment)
```

```{r download_comparison_data}
Expand Down

0 comments on commit 85810aa

Please sign in to comment.