Use bigrquerystorage for downloads (#604)
hadley authored Sep 30, 2024
1 parent 3a93d82 commit 01c574e
Showing 20 changed files with 508 additions and 132 deletions.
15 changes: 8 additions & 7 deletions DESCRIPTION
@@ -12,9 +12,9 @@ Description: Easily talk to Google's 'BigQuery' database from R.
License: MIT + file LICENSE
URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery
BugReports: https://github.com/r-dbi/bigrquery/issues
Depends:
Depends:
R (>= 4.0)
Imports:
Imports:
bit64,
brio,
cli,
@@ -29,8 +29,9 @@ Imports:
prettyunits,
rlang (>= 1.1.0),
tibble,
nanoparquet (> 0.3.1)
nanoparquet (>= 0.3.1)
Suggests:
bigrquerystorage (>= 1.1.0.9000),
blob,
covr,
dbplyr (>= 2.4.0),
@@ -41,9 +42,7 @@ Suggests:
testthat (>= 3.1.5),
wk (>= 0.3.2),
withr
Remotes:
r-lib/nanoparquet
LinkingTo:
LinkingTo:
cli,
cpp11,
rapidjsonr
@@ -54,7 +53,7 @@ Config/testthat/start-first: bq-table, dplyr
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.3.2
Collate:
Collate:
'bigrquery-package.R'
'bq-auth.R'
'bq-dataset.R'
@@ -84,3 +83,5 @@ Collate:
'import-standalone-types-check.R'
'utils.R'
'zzz.R'
Remotes:
meztez/bigrquerystorage
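
The DESCRIPTION changes above add bigrquerystorage to `Suggests`, with a `Remotes` entry pointing at its GitHub repository. A minimal sketch of installing that optional backend, assuming the remotes package is used (the `Remotes` field itself does not prescribe a tool):

```r
# Optional backend used by bq_table_download() when api = "arrow".
# The development version referenced in Suggests (>= 1.1.0.9000) is assumed
# to live at the GitHub repository named in Remotes.
# install.packages("remotes")
remotes::install_github("meztez/bigrquerystorage")
```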
1 change: 1 addition & 0 deletions NAMESPACE
@@ -88,6 +88,7 @@ export(bq_perform_extract)
export(bq_perform_load)
export(bq_perform_query)
export(bq_perform_query_dry_run)
export(bq_perform_query_schema)
export(bq_perform_upload)
export(bq_project_datasets)
export(bq_project_jobs)
6 changes: 6 additions & 0 deletions NEWS.md
@@ -1,6 +1,12 @@
# bigrquery (development version)

* If the bigrquerystorage package is installed, `bq_table_download()` (and
hence `collect()`, `dbGetQuery()` and `dbFetch()`) will use it. This will
drastically improve the speed of downloading large datasets. A big thanks
to @meztez for creating the bigrquerystorage package!

* The `bq_perform_upload()` function now allows users to choose the transmission format (JSON or PARQUET) for data sent to BigQuery (@apalacio9502, #608).

* bigrquery now requires R 4.0, in line with our version support principles.

# bigrquery 1.5.1
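A short usage sketch of the behaviour this NEWS entry describes, based on the `api` and `billing` arguments added to `bq_table_download()` in the R/bq-download.R diff below; the billing project id is a placeholder.

```r
library(bigrquery)

# With bigrquerystorage installed, the arrow API is chosen automatically.
# Public datasets need an explicit billing project under this API.
df <- bq_table_download(
  "publicdata.samples.natality",
  n_max = 35000,
  billing = "my-billing-project"  # placeholder
)

# Opt back into the slower JSON API, e.g. to keep using start_index/page_size.
df_json <- bq_table_download(
  "publicdata.samples.natality",
  n_max = 1000,
  api = "json"
)
```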
109 changes: 84 additions & 25 deletions R/bq-download.R
@@ -1,61 +1,77 @@
#' Download table data
#'
#' This retrieves rows in chunks of `page_size`. It is most suitable for results
#' of smaller queries (<100 MB, say). For larger queries, it is better to
#' export the results to a CSV file stored on google cloud and use the
#' bq command line tool to download locally.
#' @description
#' This function provides two ways to download data from BigQuery, transferring
#' data using either JSON or arrow, depending on the `api` argument. If
#' bigrquerystorage is installed, `api = "arrow"` is used by default because it
#' is so much faster (but see the limitations below); you can also pick an API
#' explicitly with `api = "json"` or `api = "arrow"`.
#'
#' @section Complex data:
#' bigrquery will retrieve nested and repeated columns in to list-columns
#' ## Arrow API
#'
#' The arrow API is much faster, but has heavier dependencies: bigrquerystorage
#' requires the arrow package, which can be tricky to compile on Linux (but you
#' should usually be able to get a binary from
#' [Posit Public Package Manager](https://posit.co/products/cloud/public-package-manager/)).
#'
#' There's one known limitation of `api = "arrow"`: when querying public data,
#' you'll now need to provide a `billing` project.
#'
#' ## JSON API
#'
#' The JSON API retrieves rows in chunks of `page_size`. It is most suitable
#' for results of smaller queries (<100 MB, say). Unfortunately, due to
#' limitations in the BigQuery API, you may need to vary this parameter
#' depending on the complexity of the underlying data.
#'
#' The JSON API will convert nested and repeated columns into list-columns
#' as follows:
#'
#' * Repeated values (arrays) will become a list-column of vectors.
#' * Records will become list-columns of named lists.
#' * Repeated records will become list-columns of data frames.
#'
#' @section Larger datasets:
#' In my timings, this code takes around 1 minute per 100 MB of data.
#' If you need to download considerably more than this, I recommend:
#'
#' * Export a `.csv` file to Cloud Storage using [bq_table_save()].
#' * Use the `gsutil` command line utility to download it.
#' * Read the csv file into R with `readr::read_csv()` or `data.table::fread()`.
#'
#' Unfortunately you cannot export nested or repeated formats into CSV, and
#' the formats that BigQuery supports that do allow nested/repeated values
#' (Avro and ndjson) are not well supported in R.
#'
#' @return Because data retrieval may generate list-columns and the `data.frame`
#' print method can have problems with list-columns, this method returns
#' a tibble. If you need a `data.frame`, coerce the results with
#' [as.data.frame()].
#' @param x A [bq_table]
#' @param n_max Maximum number of results to retrieve. Use `Inf` to retrieve all
#' rows.
#' @param page_size The number of rows requested per chunk. It is recommended to
#' leave this unspecified until you have evidence that the `page_size`
#' selected automatically by `bq_table_download()` is problematic.
#' @param page_size (JSON only) The number of rows requested per chunk. It is
#' recommended to leave this unspecified until you have evidence that the
#' `page_size` selected automatically by `bq_table_download()` is problematic.
#'
#' When `page_size = NULL` bigrquery determines a conservative, natural chunk
#' size empirically. If you specify the `page_size`, it is important that each
#' chunk fits on one page, i.e. that the requested row limit is low enough to
#' prevent the API from paginating based on response size.
#' @param start_index Starting row index (zero-based).
#' @param max_connections Number of maximum simultaneous connections to
#' BigQuery servers.
#' @param start_index (JSON only) Starting row index (zero-based).
#' @param max_connections (JSON only) Number of maximum simultaneous
#' connections to BigQuery servers.
#' @param api Which API to use? The `"json"` API works wherever bigrquery
#' does, but is slow and can require fiddling with the `page_size` parameter.
#' The `"arrow"` API is faster and more reliable, but only works if you
#' have also installed the bigrquerystorage package.
#'
#' Because the `"arrow"` API is so much faster, it will be used automatically
#' if the bigrquerystorage package is installed.
#' @inheritParams api-job
#' @param bigint The R type that BigQuery's 64-bit integer types should be
#' mapped to. The default is `"integer"`, which returns R's `integer` type,
#' but results in `NA` for values above/below +/- 2147483647. `"integer64"`
#' returns a [bit64::integer64], which allows the full range of 64 bit
#' integers.
#' @param billing (Arrow only) Project to bill; defaults to the project of `x`,
#' and typically only needs to be specified if you're working with public
#' datasets.
#' @param max_results `r lifecycle::badge("deprecated")` Deprecated. Please use
#' `n_max` instead.
#' @section Google BigQuery API documentation:
#' * [list](https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list)
#' @export
#' @examplesIf bq_testable()
#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000)
#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000, billing = bq_test_project())
bq_table_download <-
function(x,
n_max = Inf,
@@ -64,20 +80,55 @@ bq_table_download <-
max_connections = 6L,
quiet = NA,
bigint = c("integer", "integer64", "numeric", "character"),
api = c("json", "arrow"),
billing = x$project,
max_results = deprecated()) {
x <- as_bq_table(x)
check_number_whole(n_max, min = 0, allow_infinite = TRUE)
check_number_whole(start_index, min = 0)
check_number_whole(max_connections, min = 1)
quiet <- check_quiet(quiet)
bigint <- arg_match(bigint)
api <- check_api(api)

if (lifecycle::is_present(max_results)) {
lifecycle::deprecate_warn(
"1.4.0", "bq_table_download(max_results)", "bq_table_download(n_max)"
)
n_max <- max_results
}

if (api == "arrow") {
check_installed("bigrquerystorage", "required to download using arrow API")
if (!missing(page_size)) {
cli::cli_warn(
'{.arg page_size} is ignored when {.code api == "arrow"}',
call = environment()
)
}
if (!missing(start_index)) {
cli::cli_warn(
'{.arg start_index} is ignored when {.code api == "arrow"}',
call = environment()
)
}
if (!missing(max_connections)) {
cli::cli_warn(
'{.arg max_connections} is ignored when {.code api == "arrow"}',
call = environment()
)
}

return(bigrquerystorage::bqs_table_download(
x = toString(x),
parent = billing,
n_max = n_max,
quiet = quiet,
bigint = bigint,
as_tibble = TRUE
))
}

params <- set_row_params(
nrow = bq_table_nrow(x),
n_max = n_max,
@@ -202,6 +253,14 @@ bq_table_download <-
parse_postprocess(table_data, bigint = bigint)
}

check_api <- function(api = c("json", "arrow"), error_call = caller_env()) {
if (identical(api, c("json", "arrow"))) {
if (has_bigrquerystorage()) "arrow" else "json"
} else {
arg_match(api, error_call = error_call)
}
}

# This function is a modified version of
# https://github.com/r-dbi/RPostgres/blob/master/R/PqResult.R
parse_postprocess <- function(df, bigint) {
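The "Larger datasets" section of the documentation above recommends exporting to Cloud Storage rather than downloading via either API. A sketch of that workflow, with placeholder project, bucket, and file names, and assuming the gsutil CLI is available:

```r
library(bigrquery)

# 1. Export the table to CSV shards in a Cloud Storage bucket (placeholders).
tb <- bq_table("my-project", "my_dataset", "big_table")
bq_table_save(tb, "gs://my-bucket/big_table-*.csv")

# 2. Download the shards outside R (or via system2()):
#      gsutil cp 'gs://my-bucket/big_table-*.csv' data/

# 3. Read them back into R.
files <- list.files("data", pattern = "^big_table-.*\\.csv$", full.names = TRUE)
big_table <- do.call(rbind, lapply(files, readr::read_csv))
```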
72 changes: 58 additions & 14 deletions R/bq-perform.R
@@ -210,7 +210,7 @@ export_json <- function(values) {
#' Google Cloud.
#'
#' For Google Cloud Storage URIs: Each URI can contain one
#' `'*'`` wildcard character and it must come after the 'bucket' name.
#' `'*'` wildcard character and it must come after the 'bucket' name.
#' Size limits related to load jobs apply to external data sources.
#'
#' For Google Cloud Bigtable URIs: Exactly one URI can be specified and
@@ -358,21 +358,13 @@ bq_perform_query_dry_run <- function(query, billing,
parameters = NULL,
use_legacy_sql = FALSE) {

check_string(query)
check_string(billing)
check_bool(use_legacy_sql)

query <- list(
query = unbox(query),
useLegacySql = unbox(use_legacy_sql)
query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = use_legacy_sql
)
if (!is.null(parameters)) {
parameters <- as_bq_params(parameters)
query$queryParameters <- as_json(parameters)
}
if (!is.null(default_dataset)) {
query$defaultDataset <- datasetReference(default_dataset)
}

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))
@@ -386,6 +378,58 @@ bq_perform_query_dry_run <- function(query, billing,
structure(bytes, class = "bq_bytes")
}

#' @export
#' @rdname api-perform
bq_perform_query_schema <- function(query, billing,
...,
default_dataset = NULL,
parameters = NULL) {

query <- bq_perform_query_data(
query = query,
default_dataset = default_dataset,
parameters = parameters,
use_legacy_sql = FALSE
)

url <- bq_path(billing, jobs = "")
body <- list(configuration = list(query = query, dryRun = unbox(TRUE)))

res <- bq_post(
url,
body = bq_body(body, ...),
query = list(fields = "statistics")
)
# https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
res$statistics$query$schema$fields
}

bq_perform_query_data <- function(query,
...,
default_dataset = NULL,
parameters = NULL,
use_legacy_sql = FALSE,
call = caller_env()) {
check_string(query, error_call = call)
check_bool(use_legacy_sql, error_call = call)

query <- list(
query = unbox(query),
useLegacySql = unbox(use_legacy_sql)
)
if (!is.null(parameters)) {
parameters <- as_bq_params(parameters)
query$queryParameters <- as_json(parameters)
}
if (!is.null(default_dataset)) {
query$defaultDataset <- datasetReference(default_dataset)
}

query
}



#' @export
#' @rdname api-perform
bq_perform_copy <- function(src, dest,
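A sketch of calling the newly exported `bq_perform_query_schema()` added above; it performs a dry run and returns the raw list of schema fields from the job statistics. The billing project id is a placeholder.

```r
library(bigrquery)

fields <- bq_perform_query_schema(
  "SELECT 1 AS x, 'a' AS y",
  billing = "my-billing-project"  # placeholder
)

# Each element is a TableFieldSchema-style list with name/type/mode entries.
vapply(fields, function(f) f$name, character(1))
# expected: "x" "y"
```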
2 changes: 1 addition & 1 deletion R/dbi-connection.R
@@ -318,7 +318,7 @@ setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq)

dbReadTable_bq <- function(conn, name, ...) {
tb <- as_bq_table(conn, name)
bq_table_download(tb, ...)
bq_table_download(tb, ..., api = "json")
}

#' @rdname DBI
31 changes: 22 additions & 9 deletions R/dbi-result.R
@@ -100,18 +100,31 @@ setMethod(
"dbFetch", "BigQueryResult",
function(res, n = -1, ...) {
check_number_whole(n, min = -1, allow_infinite = TRUE)
if (n == -1) n <- Inf

if (n == -1 || n == Inf) {
if (has_bigrquerystorage() && n == Inf && res@cursor$cur() == 0) {
# https://github.com/meztez/bigrquerystorage/issues/48
n <- res@cursor$left()

# If possible, download complete dataset using arrow
data <- bq_table_download(res@bq_table,
n_max = n,
bigint = res@bigint,
quiet = res@quiet,
api = "arrow"
)
} else {
# Otherwise, fall back to slower JSON API
data <- bq_table_download(res@bq_table,
n_max = n,
start_index = res@cursor$cur(),
page_size = res@page_size,
bigint = res@bigint,
quiet = res@quiet,
api = "json"
)
}

data <- bq_table_download(res@bq_table,
n_max = n,
start_index = res@cursor$cur(),
page_size = res@page_size,
bigint = res@bigint,
quiet = res@quiet
)

res@cursor$adv(nrow(data))

data
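The `dbFetch()` changes above route a whole-result fetch through the arrow API when bigrquerystorage is available, and fall back to the JSON API for chunked fetches. A sketch of both paths through DBI, with placeholder project and table names:

```r
library(DBI)

con <- dbConnect(
  bigrquery::bigquery(),
  project = "my-project",   # placeholder
  dataset = "my_dataset",   # placeholder
  billing = "my-project"    # placeholder
)

# Fetches the complete result in one go: eligible for the arrow fast path.
full <- dbGetQuery(con, "SELECT * FROM big_table")

# Chunked fetching keeps using the JSON API.
res <- dbSendQuery(con, "SELECT * FROM big_table")
chunk <- dbFetch(res, n = 1000)
dbClearResult(res)
dbDisconnect(con)
```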