diff --git a/.build-tools.cson b/.build-tools.cson
new file mode 100644
index 0000000..df4b394
--- /dev/null
+++ b/.build-tools.cson
@@ -0,0 +1,37 @@
+providers: [
+  {
+    key: "bt"
+    config:
+      commands: [
+        {
+          project: "/home/django/Documents/Datama/datama-core"
+          source: "/home/django/Documents/Datama/datama-core/.build-tools.cson"
+          name: "build & install"
+          command: "R CMD INSTALL . --build"
+          wd: "."
+          env: {}
+          modifier:
+            shell:
+              command: "sh -c"
+          environment:
+            name: "child_process"
+            config:
+              stdoe: "both"
+          stdout:
+            pipeline: []
+          stderr:
+            pipeline: []
+          output:
+            console:
+              close_success: true
+          queue_in_buffer: true
+          colors: true
+          stdin: true
+          linter:
+            no_trace: false
+          immediate: false
+          version: 2
+        }
+      ]
+  }
+]
diff --git a/DESCRIPTION b/DESCRIPTION
index 454d847..99a9e35 100644
--- a/DESCRIPTION
+++ b/DESCRIPTION
@@ -1,9 +1,10 @@
 Package: bigQueryR
 Title: Interface with Google BigQuery with Shiny Compatibility
-Version: 0.5.0.9000
+Version: 0.5.0.9002
 Authors@R: c(person("Mark", "Edmondson", email = "r@sunholo.com",
                     role = c("aut", "cre")),
-             person("Hadley", "Wickham", , "hadley@rstudio.com", role = "ctb")
+             person("Hadley", "Wickham", , "hadley@rstudio.com", role = "ctb"),
+             person("Django", "Unchained", , "django@datama.fr", role = "ctb")
              )
 Description: Interface with 'Google BigQuery', see <https://cloud.google.com/bigquery/>
     for more information.
diff --git a/NEWS.md b/NEWS.md
index 9b1dfe8..25e8c07 100644
--- a/NEWS.md
+++ b/NEWS.md
@@ -1,3 +1,8 @@
+# bigQueryR 0.5.0.9001
+
+* Added a `dryRun` parameter, which lets a query report the estimated size of the overall computation (in bytes) without running it.
+* Added a `timeoutMs` parameter for large (synchronous) queries. Asynchronous queries are not affected by this change.
+
 # bigQueryR 0.5.0.9000
 
 * Add missing numeric type for BigQuery schema parsing (#65)
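A usage sketch of the two parameters described in the NEWS entry above. The
argument names match the bqr_query() signature later in this diff; the
project, dataset and query values are made up, and the shape of the dryRun
return value is an assumption, as the code handling it is not part of this hunk.

    # Hedged sketch, not part of the diff.
    library(bigQueryR)

    # dryRun = TRUE asks BigQuery to validate the query and estimate its cost
    # instead of running it; treating the result as estimated bytes processed
    # is an assumption here.
    estimate <- bqr_query(projectId = "my-project",       # hypothetical
                          datasetId = "my_dataset",       # hypothetical
                          query     = "SELECT * FROM big_table",
                          dryRun    = TRUE)

    # timeoutMs bounds how long a synchronous query may run (default 600s).
    result <- bqr_query(projectId = "my-project",
                        datasetId = "my_dataset",
                        query     = "SELECT * FROM big_table",
                        timeoutMs = 30 * 1000)            # 30 seconds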
diff --git a/R/dataParseFunctions.R b/R/dataParseFunctions.R
index 0a90e8f..d8560c3 100644
--- a/R/dataParseFunctions.R
+++ b/R/dataParseFunctions.R
@@ -15,20 +15,64 @@ parse_bqr_query <- function(x){
   schema <- x$schema$fields
   ## ffs
 
+  # RECORD fields arrive flattened: each sub-field becomes its own column,
+  # named "<record>.<sub-field>", so count columns and build names accordingly.
+  ncol <- 0
+  col_name <- c()
+  for (i in seq_along(schema$name)){
+    if(tolower(schema[i,]$type) != "record"){
+      ncol <- ncol + 1
+      col_name <- c(col_name, schema[i,]$name)
+    } else {
+      ncol <- ncol + nrow(schema[i,]$fields[[1]])
+      col_name <- c(col_name,
+                    paste0(schema[i,]$name, ".", schema[i,]$fields[[1]]$name))
+    }
+  }
+
   data_f <- as.data.frame(matrix(unlist(unlist(x$rows)),
-                                 ncol = length(schema$name),
-                                 byrow = TRUE),
+                                 ncol = ncol,
+                                 byrow = TRUE),
                           stringsAsFactors = FALSE)
 
   types <- tolower(schema$type)
-  converter_funcs <- converter[types]
-
-  for(i in seq_along(converter_funcs)){
-    data_f[,i] <- converter_funcs[[i]](data_f[, i])
+  # Parse the schema and convert the data frame column by column with the
+  # functions defined in the converter variable.
+  #   schema: the schema to apply
+  #   df:     the data frame to convert
+  # Returns the converted data frame.
+  converter_funcs <- function(schema, df){
+    data_f <- df
+    types <- tolower(schema$type)  # lower-case so types match the converter names
+    # Track the current df column separately: once a RECORD is seen, schema
+    # rows and df columns no longer line up one-to-one.
+    col <- 1
+    for (k in seq_along(types)){
+      type <- types[k]
+      if(type != "record"){
+        # Base case: apply the converter registered for this type, e.g. for
+        # types c("string", "integer"), column 1 is parsed as string, column 2
+        # as integer.
+        data_f[,col] <- converter[[type]](data_f[,col])
+        col <- col + 1
+      } else {
+        # A RECORD "C" with sub-fields "bars1" (integer) and "bars2" (string)
+        # was flattened above into columns C.bars1, C.bars2 at positions
+        # col to col+1 (see the col_name loop). Extract that sub-data-frame,
+        # recurse with the record's sub-schema, then skip past its n columns.
+        sub_schema <- schema[k,]$fields[[1]]
+        n <- nrow(sub_schema)
+        data_f[,col:(col+n-1)] <- converter_funcs(sub_schema, data_f[,col:(col+n-1)])
+        col <- col + n
+      }
+    }
+    return(data_f)
   }
 
-  names(data_f) <- schema$name
+  data_f <- converter_funcs(schema, data_f)
+
+  names(data_f) <- col_name
 
   out <- data_f
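To make the flattening and recursive conversion concrete, here is a toy,
self-contained illustration (not from the diff; the two-entry converter list
stands in for the package's internal converter):

    # A schema with a plain STRING field "a" and a RECORD "c" whose
    # sub-fields are "bars1" (INTEGER) and "bars2" (STRING).
    converter <- list(string = as.character, integer = as.integer)  # stand-in

    sub_schema <- data.frame(name = c("bars1", "bars2"),
                             type = c("INTEGER", "STRING"),
                             stringsAsFactors = FALSE)
    schema <- data.frame(name = c("a", "c"),
                         type = c("STRING", "RECORD"),
                         stringsAsFactors = FALSE)
    schema$fields <- list(NULL, sub_schema)   # list column, as the API returns

    # The col_name loop above flattens this to three columns:
    #   "a"  "c.bars1"  "c.bars2"
    # and converter_funcs() then applies as.character to column 1,
    # as.integer to column 2 and as.character to column 3.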
message("All data fetched.") }