Bug fixes & changes #77

Open · wants to merge 5 commits into master
37 changes: 37 additions & 0 deletions .build-tools.cson
@@ -0,0 +1,37 @@
providers: [
  {
    key: "bt"
    config:
      commands: [
        {
          project: "/home/django/Documents/Datama/datama-core"
          source: "/home/django/Documents/Datama/datama-core/.build-tools.cson"
          name: "build & install"
          command: "R CMD INSTALL . --build"
          wd: "."
          env: {}
          modifier:
            shell:
              command: "sh -c"
            environment:
              name: "child_process"
              config:
                stdoe: "both"
                stdout:
                  pipeline: []
                stderr:
                  pipeline: []
            output:
              console:
                close_success: true
                queue_in_buffer: true
                colors: true
                stdin: true
              linter:
                no_trace: false
                immediate: false
          version: 2
        }
      ]
  }
]
5 changes: 3 additions & 2 deletions DESCRIPTION
@@ -1,9 +1,10 @@
Package: bigQueryR
Title: Interface with Google BigQuery with Shiny Compatibility
Version: 0.5.0.9000
Version: 0.5.0.9002
Authors@R: c(person("Mark", "Edmondson",email = "r@sunholo.com",
role = c("aut", "cre")),
person("Hadley", "Wickham", , "hadley@rstudio.com", role = "ctb")
person("Hadley", "Wickham", , "hadley@rstudio.com", role = "ctb"),
person("Django", "Unchained", , "django@datama.fr", role = "ctb")
)
Description: Interface with 'Google BigQuery',
see <https://cloud.google.com/bigquery/> for more information.
5 changes: 5 additions & 0 deletions NEWS.md
@@ -1,3 +1,8 @@
# bigQueryR 0.5.0.9001

* Added a dryRun parameter for BigQuery queries, which lets a query report the estimated size of the overall computation (in bytes) without being run.
* Added a timeoutMs parameter for large (synchronous) queries. Asynchronous queries are not affected by this change.

# bigQueryR 0.5.0.9000

* Add missing numeric type for BigQuery schema parsing (#65)
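To make the new dryRun and timeoutMs options concrete, here is a minimal usage sketch, assuming this branch is installed; the project, dataset and table names are placeholders.

library(bigQueryR)

# With dryRun = TRUE the query is not executed; BigQuery only reports the
# estimated number of bytes the query would process.
estimate <- bqr_query(projectId = "my-project",
                      datasetId = "my_dataset",
                      query = "SELECT * FROM my_table",
                      useLegacySql = FALSE,
                      dryRun = TRUE)

# timeoutMs bounds how long a large synchronous query may wait for its result;
# asynchronous queries (bqr_query_asynch) are unaffected.
result <- bqr_query(projectId = "my-project",
                    datasetId = "my_dataset",
                    query = "SELECT * FROM my_table",
                    useLegacySql = FALSE,
                    timeoutMs = 5 * 60 * 1000)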
58 changes: 51 additions & 7 deletions R/dataParseFunctions.R
@@ -15,20 +15,64 @@ parse_bqr_query <- function(x){

schema <- x$schema$fields
## ffs
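# Work out how many columns the flattened result has and what to name them:
# a RECORD field contributes one column per sub-field, named "<record>.<sub-field>".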
ncol <- 0
col_name <- c()
for (i in seq_along(schema$name)){
if(tolower(schema[i,]$type) != "record"){
ncol <- ncol + 1
col_name <- c(col_name,schema[i,]$name)
}else{
ncol <- ncol + nrow(schema[i,]$fields[[1]])
col_name <- c(col_name,paste0(schema[i,]$name,".",paste0(schema[i,]$fields[[1]]$name)))
}
}

data_f <- as.data.frame(matrix(unlist(unlist(x$rows)),
ncol = length(schema$name),
byrow = TRUE),
ncol = ncol,
byrow = TRUE),
stringsAsFactors = FALSE)

types <- tolower(schema$type)

converter_funcs <- converter[types]

for(i in seq_along(converter_funcs)){
data_f[,i] <- converter_funcs[[i]](data_f[, i])
# /**
# * This function parses the schema and converts a data.frame column by column,
# * using the type-conversion functions defined in the converter variable.
# * @param list schema the schema to use
# * @param list df the data.frame to apply the schema to
# * @return list the converted data.frame
# */
converter_funcs <- function(schema,df){
data_f <- df
types <- tolower(schema$type) # Convert types to lower case to match them against the converter names.
col <- 1 # Current column in the flattened data.frame; a RECORD occupies several columns.
# Loop through all the types, which are ordered the same way as the df columns.
for (k in seq_along(types)){
type <- types[k]
# Base case for the recursion: the type is not a RECORD (a column of columns).
if(type != "record"){
# Convert the column matching this type.
# E.g. if the types are string, int, string then column 1 is converted as string, column 2 as int, etc.
data_f[,col] <- converter[[type]](data_f[,col]) # converter[[type]] is the conversion function for this type
col <- col + 1
}else{
# RECORD case.
# If we have a RECORD named "C" whose fields are "Bars1" of type [int] and "Bars2" of type [str],
# then the df holds C.Bars1 and C.Bars2 at columns col to col+1,
# because records were flattened by the col_name loop above:
# [A,B,[C.Bars1,C.Bars2]] becomes [A,B,C.Bars1,C.Bars2] where A and B are plain columns.
# So here we extract the sub-data.frame spanning columns col to col+n-1, where n is the number
# of rows in the RECORD's sub-schema, and apply the same function recursively to that subset.
sub_schema <- schema[k,]$fields[[1]]
n <- nrow(sub_schema)
data_f[,col:(col+n-1)] <- converter_funcs(sub_schema,data_f[,col:(col+n-1), drop = FALSE])
col <- col + n
}
}
return(data_f)
}

names(data_f) <- schema$name
# converter_funcs <- converter[types]
data_f <- converter_funcs(schema,data_f)

names(data_f) <- col_name

out <- data_f

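As a quick illustration of the flattening described in the comments above, here is a toy sketch; the schema below is made up for illustration and is not real API output.

# A made-up schema: A (STRING), B (INTEGER) and a RECORD "C" with two sub-fields.
schema <- data.frame(name = c("A", "B", "C"),
                     type = c("STRING", "INTEGER", "RECORD"),
                     stringsAsFactors = FALSE)
schema$fields <- list(NULL, NULL,
                      data.frame(name = c("Bars1", "Bars2"),
                                 type = c("INTEGER", "STRING"),
                                 stringsAsFactors = FALSE))

# Same column-naming rule as in the patch: plain fields keep their name,
# RECORD fields expand to one "<record>.<sub-field>" column per sub-field.
col_name <- c()
for (i in seq_along(schema$name)){
  if(tolower(schema[i,]$type) != "record"){
    col_name <- c(col_name, schema[i,]$name)
  }else{
    col_name <- c(col_name, paste0(schema[i,]$name, ".", schema[i,]$fields[[1]]$name))
  }
}
col_name
# [1] "A"       "B"       "C.Bars1" "C.Bars2"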
26 changes: 21 additions & 5 deletions R/query.R
@@ -6,6 +6,7 @@
#' @param maxResults Max number per page of results. Set total rows with LIMIT in your query.
#' @param useLegacySql Whether the query you pass is legacy SQL or not. Default TRUE
#' @param useQueryCache Whether to use the query cache. Default TRUE, set to FALSE for realtime queries.
#' @param callbacks A list of two functions, callbacks$processing and callbacks$done. processing(pageNumber) is called with the current page number (int) while results are paged through, and done() is called once all data has been fetched.
#'
#' @return a data.frame.
#' If there is an SQL error, a data.frame with
@@ -37,9 +38,20 @@ bqr_query <- function(projectId = bqr_get_global_project(),
useLegacySql = TRUE,
useQueryCache = TRUE,
dryRun = FALSE,
timeoutMs = 600*1000){
timeoutMs = 600*1000,
callbacks=NULL){
check_bq_auth()

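# If no valid callbacks list is supplied, fall back to the previous behaviour:
# message the current page number while paging and "All data fetched." at the end.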
if(is.null(callbacks) || !( is.list(callbacks) && "processing" %in% names(callbacks) && "done" %in% names(callbacks) ) ){
default_processing <- function(i){
message("Page #: ", i)
}
default_done <- function(){
message("All data fetched.")
}
callbacks <- list(processing=default_processing,done=default_done)
}

if(endsWith(query, ".sql")){
query <- readChar(query, nchars = file.info(query)$size)
}
@@ -82,7 +94,7 @@ bqr_query <- function(projectId = bqr_get_global_project(),
data_parse_function = parse_bqr_query,
checkTrailingSlash = FALSE)
data <- try(q(the_body = body,
path_arguments = list(projects = projectId)))
path_arguments = list(projects = projectId)))
}

if(is.error(data)) {
@@ -92,24 +104,28 @@
}

pageToken <- attr(data, "pageToken")

if(!is.null(pageToken)){
message("Paging through query results")
jobId <- attr(data, "jobReference")$jobId
location <- attr(data, "jobReference")$location
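# Include the job's location in the paging requests so that getQueryResults
# is routed to the region the job ran in.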
pr <- googleAuthR::gar_api_generator("https://www.googleapis.com/bigquery/v2",
"GET",
path_args = list(projects = projectId,
queries = jobId),
pars_args = list(pageToken = pageToken),
pars_args = list(pageToken = pageToken, location = location),
data_parse_function = parse_bqr_query)
i <- 1
while(!is.null(pageToken)){
message("Page #: ", i)
# message("Page #: ", i)
callbacks$processing(i)
data_page <- pr(pars_arguments = list(pageToken = pageToken))
data <- rbind(data, data_page)
pageToken <- attr(data_page, "pageToken")
i <- i + 1
}
message("All data fetched.")
callbacks$done()
# message("All data fetched.")

}

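The new callbacks argument lets callers plug their own progress reporting into the paging loop. A minimal sketch, assuming this branch is installed and using placeholder project, dataset and query values:

my_callbacks <- list(
  processing = function(pageNumber) {
    message("Fetched page ", pageNumber, "...")
  },
  done = function() {
    message("Query finished, all pages fetched.")
  }
)

data <- bqr_query(projectId = "my-project",
                  datasetId = "my_dataset",
                  query = "SELECT * FROM my_table",
                  useLegacySql = FALSE,
                  callbacks = my_callbacks)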