diff --git a/R/BulkLoad.R b/R/BulkLoad.R index 58cdcf89..5b22bf30 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -64,7 +64,7 @@ checkBulkLoadCredentials <- function(connection) { envSet <- FALSE container <- FALSE - if (Sys.getenv("AZR_STORAGE_ACCOUNT") != "" && Sys.getenv("AZR_ACCOUNT_KEY") != "" && Sys.setenv("AZR_CONTAINER_NAME") != "") { + if (Sys.getenv("AZR_STORAGE_ACCOUNT") != "" && Sys.getenv("AZR_ACCOUNT_KEY") != "" && Sys.getenv("AZR_CONTAINER_NAME") != "") { envSet <- TRUE } @@ -391,7 +391,11 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { csvFileName <- tempfile("spark_insert_", fileext = ".csv") write.csv(x = data, na = "", file = csvFileName, row.names = FALSE, quote = TRUE) + destinationCsvFileName <- basename(csvFileName) on.exit(unlink(csvFileName)) + + sqlDataTypes <- sapply(data, getSqlDataTypes, dbms = connection@dbms) + selectFields <- paste0(.sql.qescape(names(data), TRUE), "::", sqlDataTypes, collapse = ", ") azureEndpoint <- getAzureEndpoint() containers <- AzureStor::list_storage_containers(azureEndpoint) @@ -399,13 +403,13 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { AzureStor::storage_upload( targetContainer, src=csvFileName, - dest=csvFileName + dest=destinationCsvFileName ) on.exit( AzureStor::delete_storage_file( targetContainer, - file = csvFileName, + file = destinationCsvFileName, confirm = FALSE ), add = TRUE @@ -416,8 +420,9 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { packageName = "DatabaseConnector", dbms = "spark", sqlTableName = sqlTableName, - fileName = basename(csvFileName), - azureAccountKey = Sys.getenv("AZR_ACCOUNT_KEY"), + selectFields = selectFields, + fileName = destinationCsvFileName, + azureContainerName = Sys.getenv("AZR_CONTAINER_NAME"), azureStorageAccount = Sys.getenv("AZR_STORAGE_ACCOUNT") ) @@ -426,7 +431,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) }, error = function(e) { - abort("Error in DataBricks bulk upload. Please check DataBricks/Azure Storage access.") + abort(paste("Error in DataBricks bulk upload. Please check DataBricks/Azure Storage access.\n", e)) } ) delta <- Sys.time() - start diff --git a/R/InsertTable.R b/R/InsertTable.R index 99a5f702..a86ec95f 100644 --- a/R/InsertTable.R +++ b/R/InsertTable.R @@ -131,7 +131,11 @@ validateInt64Insert <- function() { #' Credentials are configured directly into the System Environment using the #' following keys: Sys.setenv("AZR_STORAGE_ACCOUNT" = #' "some_azure_storage_account", "AZR_ACCOUNT_KEY" = "some_secret_account_key", "AZR_CONTAINER_NAME" = -#' "some_container_name"). +#' "some_container_name"). Prerequisites for Azure Databricks instances: Create an Access Connector +#' for Azure Databricks to provide a secure bridge between Unity Catalog and Azure Data Lake +#' Storage (ADLS Gen2), create the required storage credentials using the access connector, +#' and configure the bulk-loading storage account as an external location using the access +#' connector and storage credentials. #' #' PDW: The MPP bulk loading relies upon the client #' having a Windows OS and the DWLoader exe installed, and the following permissions granted: --Grant @@ -269,7 +273,7 @@ insertTable.DatabaseConnectorJdbcConnection <- function(connection, } } isSqlReservedWord(c(tableName, colnames(data)), warn = TRUE) - useBulkLoad <- (bulkLoad && dbms %in% c("hive", "redshift") && createTable) || + useBulkLoad <- (bulkLoad && dbms %in% c("hive", "redshift", "spark") && createTable) || (bulkLoad && dbms %in% c("pdw", "postgresql") && !tempTable) useCtasHack <- dbms %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0 && !useBulkLoad if (dbms == "bigquery" && useCtasHack && is.null(tempEmulationSchema)) { diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index e9b43853..ea1756b8 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -1,10 +1,10 @@ COPY INTO @sqlTableName -FROM 'abfss://@azureStorageAccount.dfs.core.windows.net/@fileName' -WITH ( - CREDENTIAL (AZURE_SAS_TOKEN = '@azureAccountKey') -) +FROM ( + SELECT @selectFields + FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' +) FILEFORMAT = CSV FORMAT_OPTIONS ( - 'header' = 'true', - 'inferSchema' = 'true' -); + 'header' = 'true' +) +COPY_OPTIONS('mergeSchema' = 'true');