From fbb6251576f0dbcf4e0eed860c359756f5a91c05 Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Mon, 6 Apr 2026 13:49:02 -0400 Subject: [PATCH 01/10] Enable bulk load for Spark --- R/InsertTable.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/InsertTable.R b/R/InsertTable.R index 99a5f702..064d0bfb 100644 --- a/R/InsertTable.R +++ b/R/InsertTable.R @@ -269,7 +269,7 @@ insertTable.DatabaseConnectorJdbcConnection <- function(connection, } } isSqlReservedWord(c(tableName, colnames(data)), warn = TRUE) - useBulkLoad <- (bulkLoad && dbms %in% c("hive", "redshift") && createTable) || + useBulkLoad <- (bulkLoad && dbms %in% c("hive", "redshift", "spark") && createTable) || (bulkLoad && dbms %in% c("pdw", "postgresql") && !tempTable) useCtasHack <- dbms %in% c("pdw", "redshift", "bigquery", "hive") && createTable && nrow(data) > 0 && !useBulkLoad if (dbms == "bigquery" && useCtasHack && is.null(tempEmulationSchema)) { From 04984e6a806c13d5a11b1c7ad0f842464c846023 Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Mon, 6 Apr 2026 13:56:47 -0400 Subject: [PATCH 02/10] Fix Azure cred check --- R/BulkLoad.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index 58cdcf89..63faa663 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -64,7 +64,7 @@ checkBulkLoadCredentials <- function(connection) { envSet <- FALSE container <- FALSE - if (Sys.getenv("AZR_STORAGE_ACCOUNT") != "" && Sys.getenv("AZR_ACCOUNT_KEY") != "" && Sys.setenv("AZR_CONTAINER_NAME") != "") { + if (Sys.getenv("AZR_STORAGE_ACCOUNT") != "" && Sys.getenv("AZR_ACCOUNT_KEY") != "" && Sys.getenv("AZR_CONTAINER_NAME") != "") { envSet <- TRUE } From 04dd91d7a36421c89f2f3a1601c29a0d2790fec2 Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Mon, 6 Apr 2026 15:39:40 -0400 Subject: [PATCH 03/10] Add Azure container name to copy SQL --- R/BulkLoad.R | 1 + inst/sql/sql_server/sparkCopy.sql | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index 63faa663..f6c8ea2c 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -418,6 +418,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { sqlTableName = sqlTableName, fileName = basename(csvFileName), azureAccountKey = Sys.getenv("AZR_ACCOUNT_KEY"), + azureContainerName = Sys.getenv("AZR_CONTAINER_NAME"), azureStorageAccount = Sys.getenv("AZR_STORAGE_ACCOUNT") ) diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index e9b43853..e961b55f 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -1,5 +1,5 @@ COPY INTO @sqlTableName -FROM 'abfss://@azureStorageAccount.dfs.core.windows.net/@fileName' +FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' WITH ( CREDENTIAL (AZURE_SAS_TOKEN = '@azureAccountKey') ) From 6187038334d805946296d49ab14e5d6b5c251d9c Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Mon, 6 Apr 2026 15:56:50 -0400 Subject: [PATCH 04/10] Include verbose error in output --- R/BulkLoad.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index f6c8ea2c..3ea77539 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -427,7 +427,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { DatabaseConnector::executeSql(connection = connection, sql = sql, reportOverallTime = FALSE) }, error = function(e) { - abort("Error in DataBricks bulk upload. Please check DataBricks/Azure Storage access.") + abort(paste("Error in DataBricks bulk upload. Please check DataBricks/Azure Storage access.\n", e)) } ) delta <- Sys.time() - start From c9772c94ff809702098b300f7a8b00b20c9ac2d2 Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Tue, 12 May 2026 10:57:52 -0400 Subject: [PATCH 05/10] Remove path from destination file --- R/BulkLoad.R | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index 3ea77539..2aaa7b44 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -391,6 +391,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { csvFileName <- tempfile("spark_insert_", fileext = ".csv") write.csv(x = data, na = "", file = csvFileName, row.names = FALSE, quote = TRUE) + destinationCsvFileName <- basename(csvFileName) on.exit(unlink(csvFileName)) azureEndpoint <- getAzureEndpoint() @@ -399,13 +400,13 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { AzureStor::storage_upload( targetContainer, src=csvFileName, - dest=csvFileName + dest=destinationCsvFileName ) on.exit( AzureStor::delete_storage_file( targetContainer, - file = csvFileName, + file = destinationCsvFileName, confirm = FALSE ), add = TRUE @@ -416,7 +417,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { packageName = "DatabaseConnector", dbms = "spark", sqlTableName = sqlTableName, - fileName = basename(csvFileName), + fileName = destinationCsvFileName, azureAccountKey = Sys.getenv("AZR_ACCOUNT_KEY"), azureContainerName = Sys.getenv("AZR_CONTAINER_NAME"), azureStorageAccount = Sys.getenv("AZR_STORAGE_ACCOUNT") From a4a294f119072fb366fb2459d290834274735e2e Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Tue, 12 May 2026 12:28:20 -0400 Subject: [PATCH 06/10] Modify SQL to remove cred passing --- R/BulkLoad.R | 1 - inst/sql/sql_server/sparkCopy.sql | 3 --- 2 files changed, 4 deletions(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index 2aaa7b44..ec814b6f 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -418,7 +418,6 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { dbms = "spark", sqlTableName = sqlTableName, fileName = destinationCsvFileName, - azureAccountKey = Sys.getenv("AZR_ACCOUNT_KEY"), azureContainerName = Sys.getenv("AZR_CONTAINER_NAME"), azureStorageAccount = Sys.getenv("AZR_STORAGE_ACCOUNT") ) diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index e961b55f..dddf3f3e 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -1,8 +1,5 @@ COPY INTO @sqlTableName FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' -WITH ( - CREDENTIAL (AZURE_SAS_TOKEN = '@azureAccountKey') -) FILEFORMAT = CSV FORMAT_OPTIONS ( 'header' = 'true', From a8759c6ae79af823d2304008271e2d058f23b9aa Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Tue, 12 May 2026 14:41:50 -0400 Subject: [PATCH 07/10] Add mergeSchema for bulk load --- inst/sql/sql_server/sparkCopy.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index dddf3f3e..e4165368 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -4,4 +4,5 @@ FILEFORMAT = CSV FORMAT_OPTIONS ( 'header' = 'true', 'inferSchema' = 'true' -); +) +COPY_OPTIONS('mergeSchema' = 'true'); From 1c120ec7c664337b2a65a2dec0c6f8a58480c7a7 Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Tue, 12 May 2026 14:59:23 -0400 Subject: [PATCH 08/10] Remove inferSchema --- inst/sql/sql_server/sparkCopy.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index e4165368..ebb30604 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -2,7 +2,6 @@ COPY INTO @sqlTableName FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' FILEFORMAT = CSV FORMAT_OPTIONS ( - 'header' = 'true', - 'inferSchema' = 'true' + 'header' = 'true' ) COPY_OPTIONS('mergeSchema' = 'true'); From de81bd2fdf7599ed899064f7f09434025cb685da Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Tue, 12 May 2026 21:32:24 -0400 Subject: [PATCH 09/10] Add explicit casts when bulk loading --- R/BulkLoad.R | 4 ++++ inst/sql/sql_server/sparkCopy.sql | 5 ++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/R/BulkLoad.R b/R/BulkLoad.R index ec814b6f..5b22bf30 100644 --- a/R/BulkLoad.R +++ b/R/BulkLoad.R @@ -393,6 +393,9 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { write.csv(x = data, na = "", file = csvFileName, row.names = FALSE, quote = TRUE) destinationCsvFileName <- basename(csvFileName) on.exit(unlink(csvFileName)) + + sqlDataTypes <- sapply(data, getSqlDataTypes, dbms = connection@dbms) + selectFields <- paste0(.sql.qescape(names(data), TRUE), "::", sqlDataTypes, collapse = ", ") azureEndpoint <- getAzureEndpoint() containers <- AzureStor::list_storage_containers(azureEndpoint) @@ -417,6 +420,7 @@ bulkLoadSpark <- function(connection, sqlTableName, data) { packageName = "DatabaseConnector", dbms = "spark", sqlTableName = sqlTableName, + selectFields = selectFields, fileName = destinationCsvFileName, azureContainerName = Sys.getenv("AZR_CONTAINER_NAME"), azureStorageAccount = Sys.getenv("AZR_STORAGE_ACCOUNT") diff --git a/inst/sql/sql_server/sparkCopy.sql b/inst/sql/sql_server/sparkCopy.sql index ebb30604..ea1756b8 100644 --- a/inst/sql/sql_server/sparkCopy.sql +++ b/inst/sql/sql_server/sparkCopy.sql @@ -1,5 +1,8 @@ COPY INTO @sqlTableName -FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' +FROM ( + SELECT @selectFields + FROM 'abfss://@azureContainerName@@azureStorageAccount.dfs.core.windows.net/@fileName' +) FILEFORMAT = CSV FORMAT_OPTIONS ( 'header' = 'true' From 0977eb41d4374a2b5405f68e6b944a954bab36eb Mon Sep 17 00:00:00 2001 From: Anthony Sena Date: Wed, 13 May 2026 08:56:04 -0400 Subject: [PATCH 10/10] Add documentation for Databricks configuration --- R/InsertTable.R | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/R/InsertTable.R b/R/InsertTable.R index 064d0bfb..a86ec95f 100644 --- a/R/InsertTable.R +++ b/R/InsertTable.R @@ -131,7 +131,11 @@ validateInt64Insert <- function() { #' Credentials are configured directly into the System Environment using the #' following keys: Sys.setenv("AZR_STORAGE_ACCOUNT" = #' "some_azure_storage_account", "AZR_ACCOUNT_KEY" = "some_secret_account_key", "AZR_CONTAINER_NAME" = -#' "some_container_name"). +#' "some_container_name"). Prerequisites for Azure Databricks instances: Create an Access Connector +#' for Azure Databricks to provide a secure bridge between Unity Catalog and Azure Data Lake +#' Storage (ADLS Gen2), create the required storage credentials using the access connector, +#' and configure the bulk-loading storage account as an external location using the access +#' connector and storage credentials. #' #' PDW: The MPP bulk loading relies upon the client #' having a Windows OS and the DWLoader exe installed, and the following permissions granted: --Grant