diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/databricks.yml b/acceptance/bundle/deploy/wal/chain-10-jobs/databricks.yml new file mode 100644 index 0000000000..2652cdbed6 --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/databricks.yml @@ -0,0 +1,117 @@ +bundle: + name: wal-chain-test + +resources: + jobs: + # Linear chain: job_01 -> job_02 -> ... -> job_10 + # Execution order: job_01 first, job_10 last + job_01: + name: "job-01" + description: "first in chain" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_02: + name: "job-02" + description: "depends on ${resources.jobs.job_01.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_03: + name: "job-03" + description: "depends on ${resources.jobs.job_02.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_04: + name: "job-04" + description: "depends on ${resources.jobs.job_03.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_05: + name: "job-05" + description: "depends on ${resources.jobs.job_04.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_06: + name: "job-06" + description: "depends on ${resources.jobs.job_05.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_07: + name: "job-07" + description: "depends on ${resources.jobs.job_06.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_08: + name: "job-08" + description: "depends on ${resources.jobs.job_07.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_09: + name: "job-09" + description: "depends on ${resources.jobs.job_08.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_10: + name: "job-10" + description: "depends on ${resources.jobs.job_09.id}" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/out.test.toml b/acceptance/bundle/deploy/wal/chain-10-jobs/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/output.txt b/acceptance/bundle/deploy/wal/chain-10-jobs/output.txt new file mode 100644 index 0000000000..4c4d781c80 --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/output.txt @@ -0,0 +1,73 @@ +=== First deploy 
(crashes on job_10) === + +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] + +=== WAL content after crash === +{"lineage":"[UUID]","serial": [SERIAL]} +{"k":"resources.jobs.job_01","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"first in chain","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-01","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]}}} +{"k":"resources.jobs.job_02","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-02","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_01","label":"${resources.jobs.job_01.id}"}]}} +{"k":"resources.jobs.job_03","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-03","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_02","label":"${resources.jobs.job_02.id}"}]}} +{"k":"resources.jobs.job_04","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-04","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_03","label":"${resources.jobs.job_03.id}"}]}} +{"k":"resources.jobs.job_05","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-05","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_04","label":"${resources.jobs.job_04.id}"}]}} 
+{"k":"resources.jobs.job_06","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-06","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_05","label":"${resources.jobs.job_05.id}"}]}} +{"k":"resources.jobs.job_07","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-07","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_06","label":"${resources.jobs.job_06.id}"}]}} +{"k":"resources.jobs.job_08","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-08","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_07","label":"${resources.jobs.job_07.id}"}]}} +{"k":"resources.jobs.job_09","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/state/metadata.json"},"description":"depends on 1001","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-09","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files/test.py"},"task_key":"task"}]},"depends_on":[{"node":"resources.jobs.job_08","label":"${resources.jobs.job_08.id}"}]}} + +=== Number of jobs saved in WAL === +9 + +=== Bundle summary (reads from WAL) === +Name: wal-chain-test +Target: default +Workspace: + User: [USERNAME] + Path: /Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default +Resources: + Jobs: + job_01: + Name: job-01 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_02: + Name: job-02 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_03: + Name: job-03 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_04: + Name: job-04 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_05: + Name: job-05 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_06: + Name: job-06 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_07: + Name: job-07 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_08: + Name: job-08 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_09: + Name: job-09 + URL: [DATABRICKS_URL]/jobs/1001?o=[NUMID] + job_10: + Name: job-10 + 
URL: (not deployed) + +=== Second deploy (recovery) === + +>>> [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-chain-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +=== WAL after successful deploy === +WAL deleted (expected) diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/script b/acceptance/bundle/deploy/wal/chain-10-jobs/script new file mode 100644 index 0000000000..6cf2dd32f0 --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/script @@ -0,0 +1,22 @@ +echo "=== First deploy (crashes on job_10) ===" +trace errcode $CLI bundle deploy + +echo "" +echo "=== WAL content after crash ===" +cat .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "No WAL file" + +echo "" +echo "=== Number of jobs saved in WAL ===" +grep -c '"k":"resources.jobs' .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "0" + +echo "" +echo "=== Bundle summary (reads from WAL) ===" +$CLI bundle summary + +echo "" +echo "=== Second deploy (recovery) ===" +trace $CLI bundle deploy --force-lock + +echo "" +echo "=== WAL after successful deploy ===" +cat .databricks/bundle/default/resources.json.wal 2>/dev/null || echo "WAL deleted (expected)" diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/test.py b/acceptance/bundle/deploy/wal/chain-10-jobs/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/chain-10-jobs/test.toml b/acceptance/bundle/deploy/wal/chain-10-jobs/test.toml new file mode 100644 index 0000000000..c4308521be --- /dev/null +++ b/acceptance/bundle/deploy/wal/chain-10-jobs/test.toml @@ -0,0 +1,17 @@ +# Linear chain: job_01 -> job_02 -> ... 
-> job_10 +# Let first 9 jobs/create succeed, then kill on the 10th + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +KillCallerOffset = 9 +KillCaller = 1 +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "POST /api/2.2/jobs/reset" +Response.Body = '{}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' + diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml new file mode 100644 index 0000000000..cc9024fada --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/databricks.yml @@ -0,0 +1,25 @@ +bundle: + name: wal-corrupted-test + +resources: + jobs: + valid_job: + name: "valid-job" + tasks: + - task_key: "task-a" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + another_valid: + name: "another-valid" + tasks: + - task_key: "task-b" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt new file mode 100644 index 0000000000..f5e7f346d8 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/output.txt @@ -0,0 +1,24 @@ +=== Creating state file with serial 5 === +=== Creating WAL with corrupted LAST entry === +=== WAL content === +{"lineage":"test-lineage-123","serial": [SERIAL]} +{"k":"resources.jobs.valid_job","v":{"__id__": "[ID]","state":{"name":"valid-job"}}} +{"k":"resources.jobs.another_valid","v":{"__id__": "[ID]","state":{"name":"another-valid"}}} +not valid json - corrupted last line (partial write from crash) +=== Deploy (should recover valid entries, skip corrupted last line) === + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! +=== Final state (should have recovered entries) === +{ + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.another_valid", + "resources.jobs.valid_job" + ] +} +=== WAL after successful deploy === +WAL deleted (expected) diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/script b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script new file mode 100644 index 0000000000..fc36ed754f --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/script @@ -0,0 +1,37 @@ +echo "=== Creating state file with serial 5 ===" +mkdir -p .databricks/bundle/default +cat > .databricks/bundle/default/resources.json << 'EOF' +{ + "state_version": 1, + "cli_version": "0.0.0", + "lineage": "test-lineage-123", + "serial": 5, + "state": {} +} +EOF + +echo "=== Creating WAL with corrupted LAST entry ===" +# Corrupted last line is expected (partial write from crash) and should be skipped. +# Valid entries before it should be recovered. 
+cat > .databricks/bundle/default/resources.json.wal << 'EOF' +{"lineage":"test-lineage-123","serial":6} +{"k":"resources.jobs.valid_job","v":{"__id__":"1111","state":{"name":"valid-job"}}} +{"k":"resources.jobs.another_valid","v":{"__id__":"2222","state":{"name":"another-valid"}}} +not valid json - corrupted last line (partial write from crash) +EOF + +echo "=== WAL content ===" +cat .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (should recover valid entries, skip corrupted last line) ===" +trace $CLI bundle deploy 2>&1 + +echo "=== Final state (should have recovered entries) ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys | sort)}' + +echo "=== WAL after successful deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists (unexpected)" +else + echo "WAL deleted (expected)" +fi diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml new file mode 100644 index 0000000000..9c9ab5a30b --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-entry/test.toml @@ -0,0 +1,14 @@ +# WAL with corrupted LAST entry - valid entries should be recovered, corrupted last line skipped. + +[[Server]] +Pattern = "POST /api/2.2/jobs/reset" +Response.Body = '{}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get?job_id=1111" +Response.Body = '{"job_id": 1111, "settings": {"name": "valid-job"}}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get?job_id=2222" +Response.Body = '{"job_id": 2222, "settings": {"name": "another-valid"}}' + diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/databricks.yml b/acceptance/bundle/deploy/wal/corrupted-wal-middle/databricks.yml new file mode 100644 index 0000000000..aef2c714ec --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/databricks.yml @@ -0,0 +1,25 @@ +bundle: + name: wal-corrupted-middle-test + +resources: + jobs: + job_one: + name: "job-one" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_two: + name: "job-two" + tasks: + - task_key: "task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/out.test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-middle/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/output.txt b/acceptance/bundle/deploy/wal/corrupted-wal-middle/output.txt new file mode 100644 index 0000000000..4396aade67 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/output.txt @@ -0,0 +1,25 @@ +=== Creating state file with serial 5 === +=== Creating WAL with corrupted MIDDLE entry === +=== WAL content === +{"lineage":"test-lineage-456","serial": [SERIAL]} +{"k":"resources.jobs.job_one","v":{"__id__": 
"[ID]","state":{"name":"job-one"}}} +not valid json - CORRUPTED MIDDLE LINE +{"k":"resources.jobs.job_two","v":{"__id__": "[ID]","state":{"name":"job-two"}}} +=== Deploy (WAL should be deleted due to middle corruption) === + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-corrupted-middle-test/default/files... +Warn: Failed to read WAL file, deleting and proceeding: WAL line 3: corrupted entry in middle of WAL: invalid character 'o' in literal null (expecting 'u') +Deploying resources... +Updating deployment state... +Deployment complete! +=== Final state (fresh deploy, not recovered from WAL) === +{ + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.job_one", + "resources.jobs.job_two" + ] +} +=== WAL after deploy === +WAL deleted (expected - due to middle corruption) diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/script b/acceptance/bundle/deploy/wal/corrupted-wal-middle/script new file mode 100644 index 0000000000..46dc1922d1 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/script @@ -0,0 +1,37 @@ +echo "=== Creating state file with serial 5 ===" +mkdir -p .databricks/bundle/default +cat > .databricks/bundle/default/resources.json << 'EOF' +{ + "state_version": 1, + "cli_version": "0.0.0", + "lineage": "test-lineage-456", + "serial": 5, + "state": {} +} +EOF + +echo "=== Creating WAL with corrupted MIDDLE entry ===" +# Corruption in the middle is NOT expected (only last line can be partial write). +# This should cause WAL to be deleted entirely, no recovery. +cat > .databricks/bundle/default/resources.json.wal << 'EOF' +{"lineage":"test-lineage-456","serial":6} +{"k":"resources.jobs.job_one","v":{"__id__":"1111","state":{"name":"job-one"}}} +not valid json - CORRUPTED MIDDLE LINE +{"k":"resources.jobs.job_two","v":{"__id__":"2222","state":{"name":"job-two"}}} +EOF + +echo "=== WAL content ===" +cat .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (WAL should be deleted due to middle corruption) ===" +trace $CLI bundle deploy 2>&1 + +echo "=== Final state (fresh deploy, not recovered from WAL) ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys | sort)}' + +echo "=== WAL after deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists (unexpected)" +else + echo "WAL deleted (expected - due to middle corruption)" +fi diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.py b/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.toml b/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.toml new file mode 100644 index 0000000000..8aa40be8d7 --- /dev/null +++ b/acceptance/bundle/deploy/wal/corrupted-wal-middle/test.toml @@ -0,0 +1,13 @@ +# WAL with corrupted MIDDLE entry - WAL should be deleted, no recovery. +# Corruption in the middle is unexpected (not a partial write from crash). +# The entire WAL is discarded and a fresh deploy happens. 
+ +# Since WAL is discarded, jobs will be created fresh (not recovered) +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 9999}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get?job_id=9999" +Response.Body = '{"job_id": 9999, "settings": {"name": "fresh-job"}}' + diff --git a/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml new file mode 100644 index 0000000000..31480454c5 --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/databricks.yml @@ -0,0 +1,27 @@ +bundle: + name: wal-crash-test + +resources: + jobs: + job_a: + name: "test-job-a" + description: "first job" + tasks: + - task_key: "task-a" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_b: + name: "test-job-b" + description: "depends on ${resources.jobs.job_a.id}" + tasks: + - task_key: "task-b" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/crash-after-create/output.txt b/acceptance/bundle/deploy/wal/crash-after-create/output.txt new file mode 100644 index 0000000000..9ab9f4cf9c --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/output.txt @@ -0,0 +1,34 @@ +=== First deploy (crashes after job_a create, before job_b) === + +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] +=== WAL should exist after crash === +WAL exists (expected) +{"lineage":"[UUID]","serial": [SERIAL]} +{"k":"resources.jobs.job_a","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/state/metadata.json"},"description":"first job","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"test-job-a","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files/test.py"},"task_key":"task-a"}]}}} +=== State file after crash (should be empty) === +{ + "serial": [SERIAL], + "state_keys": [] +} +=== Second deploy (should recover from WAL and complete) === + +>>> [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-crash-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+=== State file after recovery === +{ + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.job_a", + "resources.jobs.job_b" + ] +} +=== WAL file after successful deploy === +WAL file deleted (expected) diff --git a/acceptance/bundle/deploy/wal/crash-after-create/script b/acceptance/bundle/deploy/wal/crash-after-create/script new file mode 100644 index 0000000000..d09f6ab06e --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/script @@ -0,0 +1,26 @@ +echo "=== First deploy (crashes after job_a create, before job_b) ===" +trace errcode $CLI bundle deploy + +echo "=== WAL should exist after crash ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists (expected)" + cat .databricks/bundle/default/resources.json.wal +else + echo "WAL missing (unexpected)" +fi + +echo "=== State file after crash (should be empty) ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' + +echo "=== Second deploy (should recover from WAL and complete) ===" +trace $CLI bundle deploy --force-lock + +echo "=== State file after recovery ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' + +echo "=== WAL file after successful deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL file exists (unexpected)" +else + echo "WAL file deleted (expected)" +fi diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.py b/acceptance/bundle/deploy/wal/crash-after-create/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/crash-after-create/test.toml b/acceptance/bundle/deploy/wal/crash-after-create/test.toml new file mode 100644 index 0000000000..5023224e57 --- /dev/null +++ b/acceptance/bundle/deploy/wal/crash-after-create/test.toml @@ -0,0 +1,17 @@ +# WAL recovery after real crash. First deploy creates job_a then crashes. +# Second deploy recovers from WAL and completes successfully. +# job_b depends on job_a, so jobs/get is called after job_a's SaveState. 
+ +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "POST /api/2.2/jobs/reset" +Response.Body = '{}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +KillCaller = 1 +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' + diff --git a/acceptance/bundle/deploy/wal/empty-wal/databricks.yml b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml new file mode 100644 index 0000000000..147a1e1482 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: wal-empty-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/empty-wal/out.test.toml b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/empty-wal/output.txt b/acceptance/bundle/deploy/wal/empty-wal/output.txt new file mode 100644 index 0000000000..21b6851080 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/output.txt @@ -0,0 +1,22 @@ +=== Creating state directory === +=== Creating empty WAL file === +=== Empty WAL file exists === +[FILE_INFO] .databricks/bundle/default/resources.json.wal +=== Deploy (should handle empty WAL gracefully) === + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-empty-test/default/files... +Warn: Failed to read WAL file, deleting and proceeding: WAL file is empty +Deploying resources... +Updating deployment state... +Deployment complete! 
+=== Checking WAL file after deploy === +Empty WAL deleted (expected) +=== State file content === +{ + "lineage": "[UUID]", + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.test_job" + ] +} diff --git a/acceptance/bundle/deploy/wal/empty-wal/script b/acceptance/bundle/deploy/wal/empty-wal/script new file mode 100644 index 0000000000..f693753ac7 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/script @@ -0,0 +1,21 @@ +echo "=== Creating state directory ===" +mkdir -p .databricks/bundle/default + +echo "=== Creating empty WAL file ===" +touch .databricks/bundle/default/resources.json.wal + +echo "=== Empty WAL file exists ===" +ls -la .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (should handle empty WAL gracefully) ===" +trace $CLI bundle deploy + +echo "=== Checking WAL file after deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL file exists (unexpected)" +else + echo "Empty WAL deleted (expected)" +fi + +echo "=== State file content ===" +cat .databricks/bundle/default/resources.json | jq -S '{lineage: .lineage, serial: .serial, state_keys: (.state | keys)}' diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.py b/acceptance/bundle/deploy/wal/empty-wal/test.py new file mode 100644 index 0000000000..11b15b1a45 --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/test.py @@ -0,0 +1 @@ +print("hello") diff --git a/acceptance/bundle/deploy/wal/empty-wal/test.toml b/acceptance/bundle/deploy/wal/empty-wal/test.toml new file mode 100644 index 0000000000..b97264c2be --- /dev/null +++ b/acceptance/bundle/deploy/wal/empty-wal/test.toml @@ -0,0 +1,13 @@ +# Empty WAL file should be deleted and deploy should proceed normally. + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' + +[[Repls]] +Old = '-rw[^ ]+ \d+ [^ ]+ [^ ]+ \d+ [A-Z][a-z]+ \d+ \d+:\d+' +New = '[FILE_INFO]' diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml new file mode 100644 index 0000000000..67079aaef8 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: wal-future-serial-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/output.txt b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt new file mode 100644 index 0000000000..b0e5bda558 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/output.txt @@ -0,0 +1,13 @@ +=== Creating state file (serial=2) === +=== Creating WAL with future serial (serial=5, expected=3) === +=== WAL content === +{"lineage":"test-lineage-123","serial": [SERIAL]} +{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}} +=== Deploy (should fail with corruption error) === + +>>> 
errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-future-serial-test/default/files... +Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL serial (5) is ahead of expected (3), state may be corrupted + + +Exit code: [KILLED] diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/script b/acceptance/bundle/deploy/wal/future-serial-wal/script new file mode 100644 index 0000000000..7b1784b0c6 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/script @@ -0,0 +1,28 @@ +echo "=== Creating state file (serial=2) ===" +mkdir -p .databricks/bundle/default +cat > .databricks/bundle/default/resources.json << 'EOF' +{ + "state_version": 1, + "cli_version": "0.0.0", + "lineage": "test-lineage-123", + "serial": 2, + "state": { + "resources.jobs.test_job": { + "__id__": "1001", + "state": {"name": "test-job"} + } + } +} +EOF + +echo "=== Creating WAL with future serial (serial=5, expected=3) ===" +cat > .databricks/bundle/default/resources.json.wal << 'EOF' +{"lineage":"test-lineage-123","serial":5} +{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}} +EOF + +echo "=== WAL content ===" +cat .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (should fail with corruption error) ===" +trace errcode $CLI bundle deploy diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/test.py b/acceptance/bundle/deploy/wal/future-serial-wal/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/future-serial-wal/test.toml b/acceptance/bundle/deploy/wal/future-serial-wal/test.toml new file mode 100644 index 0000000000..424fe2f127 --- /dev/null +++ b/acceptance/bundle/deploy/wal/future-serial-wal/test.toml @@ -0,0 +1,4 @@ +# WAL with serial ahead of state - indicates corruption, should error. +# State has serial=2, WAL has serial=5 (expected would be 3). + +# No server stubs needed - deploy should fail before any API calls. 
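Note: the stale-wal, future-serial-wal, and lineage-mismatch fixtures all exercise the same header check during WAL recovery. Below is a minimal sketch of that check under assumed names (`walHeader` and `validateWALHeader` are hypothetical, not the CLI's actual implementation); the error strings are taken from the expected outputs in these fixtures.

package internal

import "fmt"

// walHeader mirrors the first line of the WAL files written in these tests,
// e.g. {"lineage":"test-lineage-123","serial":6}.
type walHeader struct {
	Lineage string `json:"lineage"`
	Serial  int    `json:"serial"`
}

// validateWALHeader decides whether a WAL may be replayed on top of the
// persisted state. It returns (false, nil) for a stale WAL (ignore and
// delete), (true, nil) when the WAL is exactly one serial ahead, and an
// error for a lineage mismatch or a serial that has run ahead of the state.
func validateWALHeader(stateLineage string, stateSerial int, hdr walHeader) (bool, error) {
	if hdr.Lineage != stateLineage {
		return false, fmt.Errorf("WAL lineage (%s) does not match state lineage (%s)", hdr.Lineage, stateLineage)
	}
	expected := stateSerial + 1
	switch {
	case hdr.Serial < expected:
		// Leftover from an older, already-persisted deploy: safe to discard.
		return false, nil
	case hdr.Serial > expected:
		return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", hdr.Serial, expected)
	}
	return true, nil
}

With state serial 2, a WAL at serial 1 is stale and silently discarded (stale-wal), serial 3 is the only recoverable value, and serial 5 fails with the corruption error (future-serial-wal); a lineage mismatch always fails, regardless of serial (lineage-mismatch).
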
diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml new file mode 100644 index 0000000000..014ec7f886 --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: wal-lineage-mismatch-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt new file mode 100644 index 0000000000..7f6c3a89bd --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/output.txt @@ -0,0 +1,13 @@ +=== Creating state file with lineage-A === +=== Creating WAL with lineage-B (mismatch) === +=== WAL content === +{"lineage":"wal-lineage-bbb","serial": [SERIAL]} +{"k":"resources.jobs.test_job","v":{"__id__": "[ID]","state":{"name":"test-job"}}} +=== Deploy (should fail with lineage mismatch error) === + +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-lineage-mismatch-test/default/files... +Error: reading state from [TEST_TMP_DIR]/.databricks/bundle/default/resources.json: WAL recovery failed: WAL lineage (wal-lineage-bbb) does not match state lineage (state-lineage-aaa) + + +Exit code: [KILLED] diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/script b/acceptance/bundle/deploy/wal/lineage-mismatch/script new file mode 100644 index 0000000000..b241246e6c --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/script @@ -0,0 +1,28 @@ +echo "=== Creating state file with lineage-A ===" +mkdir -p .databricks/bundle/default +cat > .databricks/bundle/default/resources.json << 'EOF' +{ + "state_version": 1, + "cli_version": "0.0.0", + "lineage": "state-lineage-aaa", + "serial": 1, + "state": { + "resources.jobs.test_job": { + "__id__": "1001", + "state": {"name": "test-job"} + } + } +} +EOF + +echo "=== Creating WAL with lineage-B (mismatch) ===" +cat > .databricks/bundle/default/resources.json.wal << 'EOF' +{"lineage":"wal-lineage-bbb","serial":2} +{"k":"resources.jobs.test_job","v":{"__id__":"1001","state":{"name":"test-job"}}} +EOF + +echo "=== WAL content ===" +cat .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (should fail with lineage mismatch error) ===" +trace errcode $CLI bundle deploy diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.py b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml new file mode 100644 index 0000000000..509cc82f09 --- /dev/null +++ b/acceptance/bundle/deploy/wal/lineage-mismatch/test.toml @@ -0,0 +1,4 @@ +# WAL with different lineage than state - should error. 
+# State has lineage "state-lineage-aaa", WAL has lineage "wal-lineage-bbb". + +# No server stubs needed - deploy should fail before any API calls. diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml b/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml new file mode 100644 index 0000000000..3dc96ed856 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/databricks.yml @@ -0,0 +1,27 @@ +bundle: + name: wal-multi-crash-test + +resources: + jobs: + job_a: + name: "test-job-a" + description: "first job" + tasks: + - task_key: "task-a" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_b: + name: "test-job-b" + description: "depends on ${resources.jobs.job_a.id}" + tasks: + - task_key: "task-b" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml b/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/output.txt b/acceptance/bundle/deploy/wal/multiple-crashes/output.txt new file mode 100644 index 0000000000..33dd984b74 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/output.txt @@ -0,0 +1,39 @@ +=== First deploy (crashes after job_a create) === + +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] +=== WAL after first crash === +WAL exists +{"lineage":"[UUID]","serial": [SERIAL]} +{"k":"resources.jobs.job_a","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/state/metadata.json"},"description":"first job","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"test-job-a","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files/test.py"},"task_key":"task-a"}]}}} +=== Second deploy (crashes during job_a update) === + +>>> errcode [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files... +Deploying resources... +[PROCESS_KILLED] + +Exit code: [KILLED] +=== WAL after second crash === +WAL still exists +=== Third deploy (should succeed) === + +>>> [CLI] bundle deploy --force-lock +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-multi-crash-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+=== Final state === +{ + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.job_a", + "resources.jobs.job_b" + ] +} +=== WAL after successful deploy === +WAL deleted (expected) diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/script b/acceptance/bundle/deploy/wal/multiple-crashes/script new file mode 100644 index 0000000000..0adcd2a980 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/script @@ -0,0 +1,29 @@ +echo "=== First deploy (crashes after job_a create) ===" +trace errcode $CLI bundle deploy + +echo "=== WAL after first crash ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists" + cat .databricks/bundle/default/resources.json.wal +fi + +echo "=== Second deploy (crashes during job_a update) ===" +trace errcode $CLI bundle deploy --force-lock + +echo "=== WAL after second crash ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL still exists" +fi + +echo "=== Third deploy (should succeed) ===" +trace $CLI bundle deploy --force-lock + +echo "=== Final state ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' + +echo "=== WAL after successful deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists (unexpected)" +else + echo "WAL deleted (expected)" +fi diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/test.py b/acceptance/bundle/deploy/wal/multiple-crashes/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/multiple-crashes/test.toml b/acceptance/bundle/deploy/wal/multiple-crashes/test.toml new file mode 100644 index 0000000000..c5981d6720 --- /dev/null +++ b/acceptance/bundle/deploy/wal/multiple-crashes/test.toml @@ -0,0 +1,19 @@ +# Multiple real crashes during deployment - WAL should persist until successful finalize. 
+# First deploy: crashes after job_a create (kill on jobs/get) +# Second deploy: crashes during job_a update (kill on jobs/reset) +# Third deploy: succeeds (both counters exhausted) + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "POST /api/2.2/jobs/reset" +KillCaller = 1 +Response.Body = '{}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +KillCaller = 1 +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' + diff --git a/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml b/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml new file mode 100644 index 0000000000..413705d40c --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: wal-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml b/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/normal-deploy/output.txt b/acceptance/bundle/deploy/wal/normal-deploy/output.txt new file mode 100644 index 0000000000..ccb189ff09 --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/output.txt @@ -0,0 +1,16 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! +=== Checking WAL file after deploy === +WAL file deleted after successful deploy (expected) +=== State file content === +{ + "lineage": "[UUID]", + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.test_job" + ] +} diff --git a/acceptance/bundle/deploy/wal/normal-deploy/script b/acceptance/bundle/deploy/wal/normal-deploy/script new file mode 100644 index 0000000000..5acc4d9b58 --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/script @@ -0,0 +1,12 @@ +trace $CLI bundle deploy + +echo "=== Checking WAL file after deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL file exists (unexpected - should be deleted after Finalize)" + cat .databricks/bundle/default/resources.json.wal +else + echo "WAL file deleted after successful deploy (expected)" +fi + +echo "=== State file content ===" +cat .databricks/bundle/default/resources.json | jq -S '{lineage: .lineage, serial: .serial, state_keys: (.state | keys)}' diff --git a/acceptance/bundle/deploy/wal/normal-deploy/test.py b/acceptance/bundle/deploy/wal/normal-deploy/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/normal-deploy/test.toml b/acceptance/bundle/deploy/wal/normal-deploy/test.toml new file mode 100644 index 0000000000..1299046974 --- /dev/null +++ b/acceptance/bundle/deploy/wal/normal-deploy/test.toml @@ -0,0 +1,9 @@ +# WAL is created during deploy, used for state tracking, and deleted after Finalize. 
+ +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' diff --git a/acceptance/bundle/deploy/wal/stale-wal/databricks.yml b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml new file mode 100644 index 0000000000..6b24f6fd26 --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/databricks.yml @@ -0,0 +1,15 @@ +bundle: + name: wal-stale-test + +resources: + jobs: + test_job: + name: "test-job" + tasks: + - task_key: "test-task" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/stale-wal/out.test.toml b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/stale-wal/output.txt b/acceptance/bundle/deploy/wal/stale-wal/output.txt new file mode 100644 index 0000000000..682534de7c --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/output.txt @@ -0,0 +1,22 @@ +=== Creating state directory === +=== Creating state file (serial=2) === +=== Creating stale WAL with old serial (serial=1) === +=== WAL content before deploy === +{"lineage":"stale-test-lineage","serial": [SERIAL]} +{"k":"resources.jobs.stale_job","v":{"__id__": "[ID]","state":{"name":"stale-job"}}} +=== Deploy (should ignore stale WAL) === + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-stale-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! 
+=== Checking WAL file after deploy === +Stale WAL deleted (expected) +=== State file should NOT contain stale_job === +{ + "serial": [SERIAL], + "state_keys": [ + "resources.jobs.test_job" + ] +} diff --git a/acceptance/bundle/deploy/wal/stale-wal/script b/acceptance/bundle/deploy/wal/stale-wal/script new file mode 100644 index 0000000000..d814639a00 --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/script @@ -0,0 +1,40 @@ +echo "=== Creating state directory ===" +mkdir -p .databricks/bundle/default + +echo "=== Creating state file (serial=2) ===" +cat > .databricks/bundle/default/resources.json << 'EOF' +{ + "state_version": 1, + "cli_version": "0.0.0", + "lineage": "stale-test-lineage", + "serial": 2, + "state": { + "resources.jobs.test_job": { + "__id__": "1001", + "state": {"name": "test-job"} + } + } +} +EOF + +echo "=== Creating stale WAL with old serial (serial=1) ===" +cat > .databricks/bundle/default/resources.json.wal << 'EOF' +{"lineage":"stale-test-lineage","serial":1} +{"k":"resources.jobs.stale_job","v":{"__id__":"9999","state":{"name":"stale-job"}}} +EOF + +echo "=== WAL content before deploy ===" +cat .databricks/bundle/default/resources.json.wal + +echo "=== Deploy (should ignore stale WAL) ===" +trace $CLI bundle deploy + +echo "=== Checking WAL file after deploy ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL file exists (unexpected)" +else + echo "Stale WAL deleted (expected)" +fi + +echo "=== State file should NOT contain stale_job ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.py b/acceptance/bundle/deploy/wal/stale-wal/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/stale-wal/test.toml b/acceptance/bundle/deploy/wal/stale-wal/test.toml new file mode 100644 index 0000000000..934683ba6d --- /dev/null +++ b/acceptance/bundle/deploy/wal/stale-wal/test.toml @@ -0,0 +1,9 @@ +# Deploy with a stale WAL (old serial) - WAL should be deleted and ignored. 
+ +[[Server]] +Pattern = "POST /api/2.2/jobs/reset" +Response.Body = '{}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml b/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml new file mode 100644 index 0000000000..86376fd7ba --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/databricks.yml @@ -0,0 +1,27 @@ +bundle: + name: wal-summary-test + +resources: + jobs: + job_a: + name: "job-a" + description: "first job" + tasks: + - task_key: "task-a" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 + job_b: + name: "job-b" + description: "depends on ${resources.jobs.job_a.id}" + tasks: + - task_key: "task-b" + spark_python_task: + python_file: ./test.py + new_cluster: + spark_version: 15.4.x-scala2.12 + node_type_id: i3.xlarge + num_workers: 0 diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml b/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml new file mode 100644 index 0000000000..54146af564 --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/output.txt b/acceptance/bundle/deploy/wal/summary-after-crash/output.txt new file mode 100644 index 0000000000..9a2644a60b --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/output.txt @@ -0,0 +1,29 @@ +=== Deploy (job_a created and saved, then crash on jobs/get) === + +>>> errcode [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-summary-test/default/files... +Deploying resources... 
+[PROCESS_KILLED] + +Exit code: [KILLED] +=== State directory contents after crash === +deployment.json +resources.json +resources.json.wal +sync-snapshots +=== WAL should exist after crash === +WAL exists (expected) +{"lineage":"[UUID]","serial": [SERIAL]} +{"k":"resources.jobs.job_a","v":{"__id__": "[ID]","state":{"deployment":{"kind":"BUNDLE","metadata_file_path":"/Workspace/Users/[USERNAME]/.bundle/wal-summary-test/default/state/metadata.json"},"description":"first job","edit_mode":"UI_LOCKED","format":"MULTI_TASK","max_concurrent_runs":1,"name":"job-a","queue":{"enabled":true},"tasks":[{"new_cluster":{"node_type_id":"[NODE_TYPE_ID]","num_workers":0,"spark_version":"15.4.x-scala2.12"},"spark_python_task":{"python_file":"/Workspace/Users/[USERNAME]/.bundle/wal-summary-test/default/files/test.py"},"task_key":"task-a"}]}}} +=== State file after crash === +{ + "serial": [SERIAL], + "state_keys": [] +} +=== Bundle summary (should show job_a from WAL) === + +>>> [CLI] bundle summary -o json +{ + "job_a_id": "1001", + "job_b_id": null +} diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/script b/acceptance/bundle/deploy/wal/summary-after-crash/script new file mode 100644 index 0000000000..3b007062c6 --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/script @@ -0,0 +1,19 @@ +echo "=== Deploy (job_a created and saved, then crash on jobs/get) ===" +trace errcode $CLI bundle deploy + +echo "=== State directory contents after crash ===" +ls .databricks/bundle/default/ + +echo "=== WAL should exist after crash ===" +if [ -f ".databricks/bundle/default/resources.json.wal" ]; then + echo "WAL exists (expected)" + cat .databricks/bundle/default/resources.json.wal +else + echo "WAL missing (unexpected)" +fi + +echo "=== State file after crash ===" +cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}' + +echo "=== Bundle summary (should show job_a from WAL) ===" +trace $CLI bundle summary -o json | jq '{job_a_id: .resources.jobs.job_a.id, job_b_id: .resources.jobs.job_b.id}' diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/test.py b/acceptance/bundle/deploy/wal/summary-after-crash/test.py new file mode 100644 index 0000000000..1ff8e07c70 --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/test.py @@ -0,0 +1 @@ +print("test") diff --git a/acceptance/bundle/deploy/wal/summary-after-crash/test.toml b/acceptance/bundle/deploy/wal/summary-after-crash/test.toml new file mode 100644 index 0000000000..961030e981 --- /dev/null +++ b/acceptance/bundle/deploy/wal/summary-after-crash/test.toml @@ -0,0 +1,14 @@ +# Bundle summary should show resources recovered from WAL after a real crash. +# job_b depends on job_a, so after job_a is created and SaveState is called, +# refreshRemoteState calls jobs/get to fetch job_a's state for job_b's reference. +# We kill on jobs/get - AFTER job_a's SaveState, so WAL contains job_a. + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.Body = '{"job_id": 1001}' + +[[Server]] +Pattern = "GET /api/2.2/jobs/get" +KillCaller = 1 +Response.Body = '{"job_id": 1001, "settings": {"name": "test-job"}}' + diff --git a/acceptance/bundle/deploy/wal/test.toml b/acceptance/bundle/deploy/wal/test.toml new file mode 100644 index 0000000000..1632ddb195 --- /dev/null +++ b/acceptance/bundle/deploy/wal/test.toml @@ -0,0 +1,48 @@ +# WAL (Write-Ahead Log) tests verify crash recovery during bundle deployment. 
+# These tests simulate process crashes using KillCaller and verify state recovery.
+# They run only with the direct engine, since the WAL is a direct-engine feature.
+
+Local = true
+Env.DATABRICKS_CLI_TEST_PID = "1"
+
+[EnvMatrix]
+DATABRICKS_BUNDLE_ENGINE = ["direct"]
+
+[[Repls]]
+Old = 'script: line \d+:\s+\d+ Killed(: 9)?\s+"\$@"'
+New = '[PROCESS_KILLED]'
+
+[[Repls]]
+Old = '(\n>>> errcode [^\n]+\n)\nExit code:'
+New = """${1}[PROCESS_KILLED]
+
+Exit code:"""
+
+[[Repls]]
+Old = 'Exit code: (137|1)'
+New = 'Exit code: [KILLED]'
+
+[[Repls]]
+Old = "\r"
+New = ''
+
+[[Repls]]
+Old = '"lineage":\s*"[0-9a-f-]+"'
+New = '"lineage": "[UUID]"'
+
+[[Repls]]
+Old = '"serial":\s*\d+'
+New = '"serial": [SERIAL]'
+
+[[Repls]]
+Old = '"__id__":\s*"\d+"'
+New = '"__id__": "[ID]"'
+
+[[Repls]]
+Old = '"job_id":\s*"\d+"'
+New = '"job_id": "[ID]"'
+
+# Strip single-node cluster warnings (they appear in varying order and aren't relevant to WAL tests)
+[[Repls]]
+Old = '(?s)Warning: Single node cluster.*?ResourceClass: SingleNode\n \n\n'
+New = ''
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
new file mode 100644
index 0000000000..457a2d3e96
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/databricks.yml
@@ -0,0 +1,15 @@
+bundle:
+  name: wal-delete-test
+
+resources:
+  jobs:
+    test_job:
+      name: "test-job"
+      tasks:
+        - task_key: "test-task"
+          spark_python_task:
+            python_file: ./test.py
+          new_cluster:
+            spark_version: 15.4.x-scala2.12
+            node_type_id: i3.xlarge
+            num_workers: 0
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
new file mode 100644
index 0000000000..54146af564
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/out.test.toml
@@ -0,0 +1,5 @@
+Local = true
+Cloud = false
+
+[EnvMatrix]
+  DATABRICKS_BUNDLE_ENGINE = ["direct"]
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/output.txt b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
new file mode 100644
index 0000000000..8f52732d3e
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/output.txt
@@ -0,0 +1,21 @@
+=== Creating state directory ===
+=== Creating state file (job exists) ===
+=== Creating WAL with delete entry (simulating crash during delete) ===
+=== WAL content ===
+{"lineage":"delete-test-lineage","serial": [SERIAL]}
+{"k":"resources.jobs.test_job","v":null}
+=== Updating config to remove job ===
+=== Deploy (should recover delete from WAL) ===
+
+>>> [CLI] bundle deploy
+Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/wal-delete-test/default/files...
+Deploying resources...
+Updating deployment state...
+Deployment complete!
+=== Final state (should have no jobs) ===
+{
+  "serial": [SERIAL],
+  "state_keys": []
+}
+=== WAL after successful deploy ===
+WAL deleted (expected)
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/script b/acceptance/bundle/deploy/wal/wal-with-delete/script
new file mode 100644
index 0000000000..f840355267
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/script
@@ -0,0 +1,48 @@
+echo "=== Creating state directory ==="
+mkdir -p .databricks/bundle/default
+
+echo "=== Creating state file (job exists) ==="
+cat > .databricks/bundle/default/resources.json << 'EOF'
+{
+  "state_version": 1,
+  "cli_version": "0.0.0",
+  "lineage": "delete-test-lineage",
+  "serial": 1,
+  "state": {
+    "resources.jobs.test_job": {
+      "__id__": "1001",
+      "state": {"name": "test-job"}
+    }
+  }
+}
+EOF
+
+echo "=== Creating WAL with delete entry (simulating crash during delete) ==="
+cat > .databricks/bundle/default/resources.json.wal << 'EOF'
+{"lineage":"delete-test-lineage","serial":2}
+{"k":"resources.jobs.test_job","v":null}
+EOF
+
+echo "=== WAL content ==="
+cat .databricks/bundle/default/resources.json.wal
+
+echo "=== Updating config to remove job ==="
+cat > databricks.yml << 'EOF'
+bundle:
+  name: wal-delete-test
+
+resources: {}
+EOF
+
+echo "=== Deploy (should recover delete from WAL) ==="
+trace $CLI bundle deploy
+
+echo "=== Final state (should have no jobs) ==="
+cat .databricks/bundle/default/resources.json | jq -S '{serial: .serial, state_keys: (.state | keys)}'
+
+echo "=== WAL after successful deploy ==="
+if [ -f ".databricks/bundle/default/resources.json.wal" ]; then
+  echo "WAL exists (unexpected)"
+else
+  echo "WAL deleted (expected)"
+fi
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/test.py b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
new file mode 100644
index 0000000000..1ff8e07c70
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/test.py
@@ -0,0 +1 @@
+print("test")
diff --git a/acceptance/bundle/deploy/wal/wal-with-delete/test.toml b/acceptance/bundle/deploy/wal/wal-with-delete/test.toml
new file mode 100644
index 0000000000..4f81ae4695
--- /dev/null
+++ b/acceptance/bundle/deploy/wal/wal-with-delete/test.toml
@@ -0,0 +1,7 @@
+# WAL recovery after a crash during a delete operation (simulated).
+# The delete was recorded in the WAL but not finalized. Deploy should complete the delete.
+# Note: Real crash testing for delete is not possible because there's no API call
+# after DeleteState (unlike create, which has refreshRemoteState after SaveState).
+
+# No server stubs are needed: the delete already happened remotely (and was recorded in the WAL),
+# so recovery completes it without further API calls.
diff --git a/acceptance/internal/config.go b/acceptance/internal/config.go
index afa13a0d8e..7336af49c3 100644
--- a/acceptance/internal/config.go
+++ b/acceptance/internal/config.go
@@ -151,6 +151,12 @@ type ServerStub struct {
 // Useful for testing crash recovery scenarios where first deploy crashes but retry succeeds.
 // Requires DATABRICKS_CLI_TEST_PID=1 to be set in the test environment.
 KillCaller int
+
+ // Number of requests to let pass before starting to kill.
+ // Combined with KillCaller, this creates a window: requests 1 to Offset succeed,
+ // requests Offset+1 to Offset+KillCaller are killed, and the rest succeed.
+ // Example: KillCallerOffset=9, KillCaller=1 means let 9 requests pass, kill the 10th.
+ KillCallerOffset int } // FindConfigs finds all the config relevant for this test, diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go index c484dd59a7..2356098a5e 100644 --- a/acceptance/internal/prepare_server.go +++ b/acceptance/internal/prepare_server.go @@ -184,8 +184,9 @@ func startLocalServer(t *testing.T, s.ResponseCallback = logResponseCallback(t) } - // Track remaining kill counts per pattern (for KillCaller > 0) + // Track remaining kill counts and offset counts per pattern (for KillCaller > 0) killCounters := make(map[string]int) + offsetCounters := make(map[string]int) killCountersMu := &sync.Mutex{} for ind := range stubs { @@ -196,9 +197,10 @@ func startLocalServer(t *testing.T, items := strings.Split(stub.Pattern, " ") require.Len(t, items, 2) - // Initialize kill counter for this pattern + // Initialize kill counter and offset counter for this pattern if stub.KillCaller > 0 { killCounters[stub.Pattern] = stub.KillCaller + offsetCounters[stub.Pattern] = stub.KillCallerOffset } s.Handle(items[0], items[1], func(req testserver.Request) any { @@ -219,7 +221,7 @@ func startLocalServer(t *testing.T, } } - if shouldKillCaller(stub, killCounters, killCountersMu) { + if shouldKillCaller(stub, offsetCounters, killCounters, killCountersMu) { killCaller(t, stub.Pattern, req.Headers) } @@ -232,12 +234,19 @@ func startLocalServer(t *testing.T, return s.URL } -func shouldKillCaller(stub ServerStub, killCounters map[string]int, mu *sync.Mutex) bool { +func shouldKillCaller(stub ServerStub, offsetCounters, killCounters map[string]int, mu *sync.Mutex) bool { if stub.KillCaller <= 0 { return false } mu.Lock() defer mu.Unlock() + + // Still in offset period? Let this request pass. + if offsetCounters[stub.Pattern] > 0 { + offsetCounters[stub.Pattern]-- + return false + } + if killCounters[stub.Pattern] <= 0 { return false } diff --git a/acceptance/selftest/kill_caller/offset/out.test.toml b/acceptance/selftest/kill_caller/offset/out.test.toml new file mode 100644 index 0000000000..d560f1de04 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/out.test.toml @@ -0,0 +1,5 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["terraform", "direct"] diff --git a/acceptance/selftest/kill_caller/offset/output.txt b/acceptance/selftest/kill_caller/offset/output.txt new file mode 100644 index 0000000000..03407dd0d8 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/output.txt @@ -0,0 +1,33 @@ + +>>> [CLI] current-user me +{ + "id":"123", + "userName":"test@example.com" +} +Attempt 1 done - success (offset) + +>>> [CLI] current-user me +{ + "id":"123", + "userName":"test@example.com" +} +Attempt 2 done - success (offset) + +>>> errcode [CLI] current-user me +[PROCESS_KILLED] + +Exit code: [KILLED] +Attempt 3 done - killed + +>>> errcode [CLI] current-user me +[PROCESS_KILLED] + +Exit code: [KILLED] +Attempt 4 done - killed + +>>> [CLI] current-user me +{ + "id":"123", + "userName":"test@example.com" +} +Attempt 5 done - success (past kill window) diff --git a/acceptance/selftest/kill_caller/offset/script b/acceptance/selftest/kill_caller/offset/script new file mode 100644 index 0000000000..3411e87480 --- /dev/null +++ b/acceptance/selftest/kill_caller/offset/script @@ -0,0 +1,17 @@ +# First 2 attempts should succeed (offset period) +trace $CLI current-user me +echo "Attempt 1 done - success (offset)" + +trace $CLI current-user me +echo "Attempt 2 done - success (offset)" + +# Attempts 3-4 should be killed +trace errcode $CLI 
current-user me
+echo "Attempt 3 done - killed"
+
+trace errcode $CLI current-user me
+echo "Attempt 4 done - killed"
+
+# Attempt 5 should succeed again
+trace $CLI current-user me
+echo "Attempt 5 done - success (past kill window)"
diff --git a/acceptance/selftest/kill_caller/offset/test.toml b/acceptance/selftest/kill_caller/offset/test.toml
new file mode 100644
index 0000000000..5eab09dbfa
--- /dev/null
+++ b/acceptance/selftest/kill_caller/offset/test.toml
@@ -0,0 +1,11 @@
+# Let the first 2 requests pass, kill the next 2, then allow the rest
+[[Server]]
+Pattern = "GET /api/2.0/preview/scim/v2/Me"
+KillCallerOffset = 2
+KillCaller = 2
+Response.Body = '''
+{
+  "id": "123",
+  "userName": "test@example.com"
+}
+'''
diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go
index 993eb7238a..14161de32e 100644
--- a/bundle/direct/bundle_apply.go
+++ b/bundle/direct/bundle_apply.go
@@ -22,7 +22,12 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa
 }
 
 if len(plan.Plan) == 0 {
- // Avoid creating state file if nothing to deploy
+ // If state was recovered from the WAL, we still need to finalize to commit the recovered state
+ if b.StateDB.RecoveredFromWAL() {
+  if err := b.StateDB.Finalize(); err != nil {
+   logdiag.LogError(ctx, err)
+  }
+ }
 return
 }
diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go
index bd8cfa24c3..e72ca4baf6 100644
--- a/bundle/direct/bundle_plan.go
+++ b/bundle/direct/bundle_plan.go
@@ -41,7 +41,7 @@ func (b *DeploymentBundle) init(client *databricks.WorkspaceClient) error {
 // ValidatePlanAgainstState validates that a plan's lineage and serial match the current state.
 // This should be called early in the deployment process, before any file operations.
 // If the plan has no lineage (first deployment), validation is skipped.
-func ValidatePlanAgainstState(statePath string, plan *deployplan.Plan) error {
+func ValidatePlanAgainstState(ctx context.Context, statePath string, plan *deployplan.Plan) error {
 // If plan has no lineage, this is a first deployment before any state exists
 // No validation needed
 if plan.Lineage == "" {
@@ -49,7 +49,7 @@ }
 
 var stateDB dstate.DeploymentState
- err := stateDB.Open(statePath)
+ err := stateDB.Open(ctx, statePath)
 if err != nil {
 // If state file doesn't exist but plan has lineage, something is wrong
 if os.IsNotExist(err) {
@@ -74,7 +74,7 @@ }
 
 // InitForApply initializes the DeploymentBundle for applying a pre-computed plan.
 // This is used when --plan is specified to skip the planning phase.
 func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.WorkspaceClient, statePath string, plan *deployplan.Plan) error {
- err := b.StateDB.Open(statePath)
+ err := b.StateDB.Open(ctx, statePath)
 if err != nil {
 return fmt.Errorf("reading state from %s: %w", statePath, err)
 }
@@ -110,7 +110,7 @@ func (b *DeploymentBundle) InitForApply(ctx context.Context, client *databricks.
} func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks.WorkspaceClient, configRoot *config.Root, statePath string) (*deployplan.Plan, error) { - err := b.StateDB.Open(statePath) + err := b.StateDB.Open(ctx, statePath) if err != nil { return nil, fmt.Errorf("reading state from %s: %w", statePath, err) } diff --git a/bundle/direct/dstate/state.go b/bundle/direct/dstate/state.go index c105e9c49c..a0c9e9e480 100644 --- a/bundle/direct/dstate/state.go +++ b/bundle/direct/dstate/state.go @@ -12,15 +12,18 @@ import ( "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/statemgmt/resourcestate" "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/log" "github.com/google/uuid" ) const currentStateVersion = 1 type DeploymentState struct { - Path string - Data Database - mu sync.Mutex + Path string + Data Database + mu sync.Mutex + wal *WAL + recoveredFromWAL bool } type Database struct { @@ -56,17 +59,26 @@ func (db *DeploymentState) SaveState(key, newID string, state any, dependsOn []d db.Data.State = make(map[string]ResourceEntry) } - jsonMessage, err := json.MarshalIndent(state, " ", " ") + jsonMessage, err := json.MarshalIndent(state, "", " ") if err != nil { return err } - db.Data.State[key] = ResourceEntry{ + entry := ResourceEntry{ ID: newID, State: json.RawMessage(jsonMessage), DependsOn: dependsOn, } + if err := db.ensureWALOpen(); err != nil { + return fmt.Errorf("failed to open WAL: %w", err) + } + if err := db.wal.writeEntry(key, &entry); err != nil { + return fmt.Errorf("failed to write WAL entry: %w", err) + } + + db.Data.State[key] = entry + return nil } @@ -79,11 +91,48 @@ func (db *DeploymentState) DeleteState(key string) error { return nil } + if err := db.ensureWALOpen(); err != nil { + return fmt.Errorf("failed to open WAL: %w", err) + } + if err := db.wal.writeEntry(key, nil); err != nil { + return fmt.Errorf("failed to write WAL entry: %w", err) + } + delete(db.Data.State, key) return nil } +// ensureWALOpen opens the WAL file and writes the header if not already done. +// Must be called while holding db.mu. +func (db *DeploymentState) ensureWALOpen() error { + if db.wal != nil { + return nil + } + + wal, err := openWAL(db.Path) + if err != nil { + return err + } + + lineage := db.Data.Lineage + if lineage == "" { + lineage = uuid.New().String() + db.Data.Lineage = lineage + } + + // WAL serial is the NEXT serial (current + 1) + walSerial := db.Data.Serial + 1 + + if err := wal.writeHeader(lineage, walSerial); err != nil { + wal.close() + return err + } + + db.wal = wal + return nil +} + func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) { db.AssertOpened() db.mu.Lock() @@ -97,7 +146,7 @@ func (db *DeploymentState) GetResourceEntry(key string) (ResourceEntry, bool) { return result, ok } -func (db *DeploymentState) Open(path string) error { +func (db *DeploymentState) Open(ctx context.Context, path string) error { db.mu.Lock() defer db.mu.Unlock() @@ -114,17 +163,35 @@ func (db *DeploymentState) Open(path string) error { // Create new database with serial=0, will be incremented to 1 in Finalize() db.Data = NewDatabase("", 0) db.Path = path - return nil + + // Write state file immediately to ensure it exists before any WAL operations. + // This guarantees we have a base state file for recovery validation. 
+ if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return fmt.Errorf("failed to create state directory: %w", err) + } + if err := db.unlockedSave(); err != nil { + return err + } + } else { + return err } - return err + } else { + err = json.Unmarshal(data, &db.Data) + if err != nil { + return err + } + db.Path = path } - err = json.Unmarshal(data, &db.Data) + recovered, err := recoverFromWAL(ctx, path, &db.Data) if err != nil { - return err + return fmt.Errorf("WAL recovery failed: %w", err) + } + if recovered { + log.Infof(ctx, "Recovered deployment state from WAL") + db.recoveredFromWAL = true } - db.Path = path return nil } @@ -132,14 +199,32 @@ func (db *DeploymentState) Finalize() error { db.mu.Lock() defer db.mu.Unlock() - // Generate lineage on first save + // Generate lineage on first save (if WAL wasn't opened) if db.Data.Lineage == "" { db.Data.Lineage = uuid.New().String() } db.Data.Serial++ - return db.unlockedSave() + err := db.unlockedSave() + if err != nil { + return err + } + + if db.wal != nil { + if err := db.wal.truncate(); err != nil { + return fmt.Errorf("failed to truncate WAL: %w", err) + } + db.wal = nil + } else { + // No WAL was opened, but we should still clean up any stale WAL file + wp := walPath(db.Path) + if err := os.Remove(wp); err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove stale WAL file: %w", err) + } + } + + return nil } func (db *DeploymentState) AssertOpened() { @@ -148,6 +233,12 @@ func (db *DeploymentState) AssertOpened() { } } +// RecoveredFromWAL returns true if state was recovered from WAL during Open(). +// This is used to determine if Finalize() should be called even with an empty plan. +func (db *DeploymentState) RecoveredFromWAL() bool { + return db.recoveredFromWAL +} + func (db *DeploymentState) ExportState(ctx context.Context) resourcestate.ExportedResourcesMap { result := make(resourcestate.ExportedResourcesMap) for key, entry := range db.Data.State { diff --git a/bundle/direct/dstate/wal.go b/bundle/direct/dstate/wal.go new file mode 100644 index 0000000000..37dd1bffa2 --- /dev/null +++ b/bundle/direct/dstate/wal.go @@ -0,0 +1,198 @@ +package dstate + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "os" + + "github.com/databricks/cli/libs/log" +) + +type WALHeader struct { + Lineage string `json:"lineage"` + Serial int `json:"serial"` +} + +type WALEntry struct { + K string `json:"k"` + V *ResourceEntry `json:"v,omitempty"` // nil means delete +} + +type WAL struct { + path string + file *os.File +} + +func walPath(statePath string) string { + return statePath + ".wal" +} + +func openWAL(statePath string) (*WAL, error) { + wp := walPath(statePath) + f, err := os.OpenFile(wp, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return nil, fmt.Errorf("failed to open WAL file %q: %w", wp, err) + } + return &WAL{path: wp, file: f}, nil +} + +func (w *WAL) writeHeader(lineage string, serial int) error { + header := WALHeader{ + Lineage: lineage, + Serial: serial, + } + return w.writeJSON(header) +} + +func (w *WAL) writeEntry(key string, entry *ResourceEntry) error { + walEntry := WALEntry{ + K: key, + V: entry, + } + return w.writeJSON(walEntry) +} + +func (w *WAL) writeJSON(v any) error { + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal WAL entry: %w", err) + } + data = append(data, '\n') + + _, err = w.file.Write(data) + if err != nil { + return fmt.Errorf("failed to write WAL entry: %w", err) + } + + return nil +} + +func 
(w *WAL) close() error { + if w.file != nil { + return w.file.Close() + } + return nil +} + +func (w *WAL) truncate() error { + if w.file != nil { + w.file.Close() + w.file = nil + } + err := os.Remove(w.path) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to remove WAL file %q: %w", w.path, err) + } + return nil +} + +func readWAL(ctx context.Context, statePath string) (*WALHeader, []WALEntry, error) { + wp := walPath(statePath) + f, err := os.Open(wp) + if err != nil { + return nil, nil, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + var lines [][]byte + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + lineCopy := make([]byte, len(line)) + copy(lineCopy, line) + lines = append(lines, lineCopy) + } + if err := scanner.Err(); err != nil { + return nil, nil, fmt.Errorf("failed to read WAL file: %w", err) + } + + if len(lines) == 0 { + return nil, nil, errors.New("WAL file is empty") + } + + var header WALHeader + if err := json.Unmarshal(lines[0], &header); err != nil { + return nil, nil, fmt.Errorf("failed to parse WAL header: %w", err) + } + + var entries []WALEntry + for i := 1; i < len(lines); i++ { + lineNum := i + 1 + isLastLine := i == len(lines)-1 + + var e WALEntry + if err := json.Unmarshal(lines[i], &e); err != nil { + if isLastLine { + log.Debugf(ctx, "WAL line %d: skipping corrupted last entry: %v", lineNum, err) + continue + } + return nil, nil, fmt.Errorf("WAL line %d: corrupted entry in middle of WAL: %w", lineNum, err) + } + + if e.K == "" { + if isLastLine { + log.Debugf(ctx, "WAL line %d: skipping last entry with empty key", lineNum) + continue + } + return nil, nil, fmt.Errorf("WAL line %d: entry with empty key in middle of WAL", lineNum) + } + + entries = append(entries, e) + } + + return &header, entries, nil +} + +func recoverFromWAL(ctx context.Context, statePath string, db *Database) (bool, error) { + wp := walPath(statePath) + + if _, err := os.Stat(wp); os.IsNotExist(err) { + return false, nil + } + + header, entries, err := readWAL(ctx, statePath) + if err != nil { + log.Warnf(ctx, "Failed to read WAL file, deleting and proceeding: %v", err) + os.Remove(wp) + return false, nil + } + + expectedSerial := db.Serial + 1 + if header.Serial < expectedSerial { + log.Debugf(ctx, "Deleting stale WAL (serial %d < expected %d)", header.Serial, expectedSerial) + os.Remove(wp) + return false, nil + } + + if header.Serial > expectedSerial { + return false, fmt.Errorf("WAL serial (%d) is ahead of expected (%d), state may be corrupted", header.Serial, expectedSerial) + } + + if db.Lineage != "" && header.Lineage != "" && db.Lineage != header.Lineage { + return false, fmt.Errorf("WAL lineage (%s) does not match state lineage (%s)", header.Lineage, db.Lineage) + } + + if db.Lineage == "" && header.Lineage != "" { + db.Lineage = header.Lineage + } + + if db.State == nil { + db.State = make(map[string]ResourceEntry) + } + + for _, entry := range entries { + if entry.V != nil { + db.State[entry.K] = *entry.V + } else { + delete(db.State, entry.K) + } + } + + return true, nil +} diff --git a/bundle/direct/dstate/wal_test.go b/bundle/direct/dstate/wal_test.go new file mode 100644 index 0000000000..9c2250c830 --- /dev/null +++ b/bundle/direct/dstate/wal_test.go @@ -0,0 +1,497 @@ +package dstate + +import ( + "context" + "encoding/json" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle/deployplan" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func 
TestWALPath(t *testing.T) { + assert.Equal(t, "/path/to/state.json.wal", walPath("/path/to/state.json")) +} + +func TestWALWriteAndRead(t *testing.T) { + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + wal, err := openWAL(statePath) + require.NoError(t, err) + + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + + entry1 := &ResourceEntry{ + ID: "12345", + State: json.RawMessage(`{"name":"job1"}`), + } + err = wal.writeEntry("resources.jobs.job1", entry1) + require.NoError(t, err) + + entry2 := &ResourceEntry{ + ID: "67890", + State: json.RawMessage(`{"name":"job2"}`), + } + err = wal.writeEntry("resources.jobs.job2", entry2) + require.NoError(t, err) + + err = wal.writeEntry("resources.jobs.old_job", nil) + require.NoError(t, err) + + err = wal.close() + require.NoError(t, err) + + ctx := context.Background() + header, entries, err := readWAL(ctx, statePath) + require.NoError(t, err) + + assert.Equal(t, "test-lineage", header.Lineage) + assert.Equal(t, 1, header.Serial) + + require.Len(t, entries, 3) + + assert.Equal(t, "resources.jobs.job1", entries[0].K) + require.NotNil(t, entries[0].V) + assert.Equal(t, "12345", entries[0].V.ID) + + assert.Equal(t, "resources.jobs.job2", entries[1].K) + require.NotNil(t, entries[1].V) + assert.Equal(t, "67890", entries[1].V.ID) + + assert.Equal(t, "resources.jobs.old_job", entries[2].K) + assert.Nil(t, entries[2].V) +} + +func TestWALTruncate(t *testing.T) { + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + + _, err = os.Stat(walFilePath) + require.NoError(t, err) + + err = wal.truncate() + require.NoError(t, err) + + _, err = os.Stat(walFilePath) + assert.True(t, os.IsNotExist(err)) +} + +func TestRecoverFromWAL_NoWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + db := NewDatabase("", 0) + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.False(t, recovered) +} + +func TestRecoverFromWAL_ValidWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + + entry := &ResourceEntry{ + ID: "12345", + State: json.RawMessage(`{"name":"job1"}`), + } + err = wal.writeEntry("resources.jobs.job1", entry) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + db := NewDatabase("", 0) + + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.True(t, recovered) + + assert.Equal(t, "test-lineage", db.Lineage) + require.Contains(t, db.State, "resources.jobs.job1") + assert.Equal(t, "12345", db.State["resources.jobs.job1"].ID) +} + +func TestRecoverFromWAL_StaleWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + db := NewDatabase("test-lineage", 2) // serial 2 makes WAL stale + + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.False(t, recovered) + + _, 
err = os.Stat(walFilePath) + assert.True(t, os.IsNotExist(err)) +} + +func TestRecoverFromWAL_FutureWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 5) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + db := NewDatabase("test-lineage", 0) + + _, err = recoverFromWAL(ctx, statePath, &db) + assert.Error(t, err) + assert.Contains(t, err.Error(), "ahead of expected") +} + +func TestRecoverFromWAL_LineageMismatch(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("lineage-A", 1) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + db := NewDatabase("lineage-B", 0) + + _, err = recoverFromWAL(ctx, statePath, &db) + assert.Error(t, err) + assert.Contains(t, err.Error(), "lineage") +} + +func TestRecoverFromWAL_DeleteOperation(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + + entry := &ResourceEntry{ + ID: "12345", + State: json.RawMessage(`{"name":"job1"}`), + } + err = wal.writeEntry("resources.jobs.job1", entry) + require.NoError(t, err) + + err = wal.writeEntry("resources.jobs.job1", nil) + require.NoError(t, err) + + err = wal.close() + require.NoError(t, err) + + db := NewDatabase("", 0) + + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.True(t, recovered) + + assert.NotContains(t, db.State, "resources.jobs.job1") +} + +func TestDeploymentState_WALIntegration(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, nil) + require.NoError(t, err) + + _, err = os.Stat(walFilePath) + require.NoError(t, err) + + header, entries, err := readWAL(ctx, statePath) + require.NoError(t, err) + assert.Equal(t, 1, header.Serial) + require.Len(t, entries, 1) + assert.Equal(t, "resources.jobs.job1", entries[0].K) + assert.Equal(t, "12345", entries[0].V.ID) + + err = db.Finalize() + require.NoError(t, err) + + _, err = os.Stat(walFilePath) + assert.True(t, os.IsNotExist(err)) + + data, err := os.ReadFile(statePath) + require.NoError(t, err) + var savedDB Database + err = json.Unmarshal(data, &savedDB) + require.NoError(t, err) + assert.Equal(t, 1, savedDB.Serial) + assert.Contains(t, savedDB.State, "resources.jobs.job1") +} + +func TestDeploymentState_WALRecoveryOnOpen(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + initialDB := NewDatabase("test-lineage", 5) + initialDB.State["resources.jobs.existing"] = ResourceEntry{ + ID: "existing-id", + State: json.RawMessage(`{"name":"existing"}`), + } + data, err := json.Marshal(initialDB) + require.NoError(t, err) + err = os.WriteFile(statePath, data, 0o600) + require.NoError(t, err) + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 6) + require.NoError(t, err) + 
entry := &ResourceEntry{ + ID: "new-id", + State: json.RawMessage(`{"name":"new"}`), + } + err = wal.writeEntry("resources.jobs.new", entry) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + var db DeploymentState + err = db.Open(ctx, statePath) + require.NoError(t, err) + + assert.Contains(t, db.Data.State, "resources.jobs.existing") + assert.Contains(t, db.Data.State, "resources.jobs.new") + assert.Equal(t, "new-id", db.Data.State["resources.jobs.new"].ID) +} + +func TestDeploymentState_DeleteStateWritesWAL(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, nil) + require.NoError(t, err) + + err = db.DeleteState("resources.jobs.job1") + require.NoError(t, err) + + _, entries, err := readWAL(ctx, statePath) + require.NoError(t, err) + + require.Len(t, entries, 2) + assert.Equal(t, "resources.jobs.job1", entries[1].K) + assert.Nil(t, entries[1].V) + + err = db.Finalize() + require.NoError(t, err) + + data, err := os.ReadFile(statePath) + require.NoError(t, err) + var savedDB Database + err = json.Unmarshal(data, &savedDB) + require.NoError(t, err) + assert.NotContains(t, savedDB.State, "resources.jobs.job1") +} + +func TestDeploymentState_WALWithDependsOn(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + dependsOn := []deployplan.DependsOnEntry{ + {Node: "resources.clusters.cluster1", Label: "${resources.clusters.cluster1.id}"}, + } + + err = db.SaveState("resources.jobs.job1", "12345", map[string]string{"name": "job1"}, dependsOn) + require.NoError(t, err) + + _, entries, err := readWAL(ctx, statePath) + require.NoError(t, err) + + require.Len(t, entries, 1) + require.NotNil(t, entries[0].V) + require.Len(t, entries[0].V.DependsOn, 1) + assert.Equal(t, "resources.clusters.cluster1", entries[0].V.DependsOn[0].Node) +} + +func TestRecoverFromWAL_CorruptedMiddleLine(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + content := `{"lineage":"test","serial":1} +{"k":"resources.jobs.job1","v":{"__id__":"12345","state":{}}} +not valid json +{"k":"resources.jobs.job2","v":{"__id__":"67890","state":{}}} +` + err := os.WriteFile(walFilePath, []byte(content), 0o600) + require.NoError(t, err) + + db := NewDatabase("", 0) + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.False(t, recovered) + assert.Empty(t, db.State) + + _, err = os.Stat(walFilePath) + assert.True(t, os.IsNotExist(err)) +} + +func TestRecoverFromWAL_CorruptedLastLine(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + content := `{"lineage":"test","serial":1} +{"k":"resources.jobs.job1","v":{"__id__":"12345","state":{}}} +{"k":"resources.jobs.job2","v":{"__id__":"67890","state":{}}} +not valid json +` + err := os.WriteFile(walFilePath, []byte(content), 0o600) + require.NoError(t, err) + + db := NewDatabase("", 0) + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.True(t, recovered) + + assert.Contains(t, db.State, 
"resources.jobs.job1") + assert.Contains(t, db.State, "resources.jobs.job2") + assert.Equal(t, "12345", db.State["resources.jobs.job1"].ID) + assert.Equal(t, "67890", db.State["resources.jobs.job2"].ID) +} + +func TestDeploymentState_RecoveredFromWALFlag(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + initialDB := NewDatabase("test-lineage", 0) + data, err := json.Marshal(initialDB) + require.NoError(t, err) + err = os.WriteFile(statePath, data, 0o600) + require.NoError(t, err) + + wal, err := openWAL(statePath) + require.NoError(t, err) + err = wal.writeHeader("test-lineage", 1) + require.NoError(t, err) + err = wal.writeEntry("resources.jobs.job1", &ResourceEntry{ID: "123", State: json.RawMessage(`{}`)}) + require.NoError(t, err) + err = wal.close() + require.NoError(t, err) + + var db DeploymentState + err = db.Open(ctx, statePath) + require.NoError(t, err) + + assert.True(t, db.RecoveredFromWAL()) +} + +func TestRecoverFromWAL_LineageAdoption(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + content := `{"lineage":"adopted-lineage","serial":1} +{"k":"resources.jobs.job1","v":{"__id__":"12345","state":{}}} +` + err := os.WriteFile(walFilePath, []byte(content), 0o600) + require.NoError(t, err) + + db := NewDatabase("", 0) // empty lineage + recovered, err := recoverFromWAL(ctx, statePath, &db) + require.NoError(t, err) + assert.True(t, recovered) + assert.Equal(t, "adopted-lineage", db.Lineage) +} + +func TestReadWAL_EmptyFile(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + walFilePath := walPath(statePath) + + err := os.WriteFile(walFilePath, []byte(""), 0o600) + require.NoError(t, err) + + _, _, err = readWAL(ctx, statePath) + assert.Error(t, err) + assert.Contains(t, err.Error(), "empty") +} + +func TestDeploymentState_MultipleOperationsSameKey(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + statePath := filepath.Join(dir, "resources.json") + + var db DeploymentState + err := db.Open(ctx, statePath) + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "111", map[string]string{"v": "1"}, nil) + require.NoError(t, err) + + err = db.DeleteState("resources.jobs.job1") + require.NoError(t, err) + + err = db.SaveState("resources.jobs.job1", "222", map[string]string{"v": "2"}, nil) + require.NoError(t, err) + + _, entries, err := readWAL(ctx, statePath) + require.NoError(t, err) + require.Len(t, entries, 3) + assert.Equal(t, "111", entries[0].V.ID) + assert.Nil(t, entries[1].V) + assert.Equal(t, "222", entries[2].V.ID) + + err = db.Finalize() + require.NoError(t, err) + + entry, ok := db.GetResourceEntry("resources.jobs.job1") + require.True(t, ok) + assert.Equal(t, "222", entry.ID) +} diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index c30b133d97..c73aa672f7 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -65,7 +65,7 @@ func (d *DeploymentUnit) SetRemoteState(remoteState any) error { } func (b *DeploymentBundle) ExportState(ctx context.Context, path string) (resourcestate.ExportedResourcesMap, error) { - err := b.StateDB.Open(path) + err := b.StateDB.Open(ctx, path) if err != nil { return nil, err } diff --git a/cmd/bundle/utils/process.go b/cmd/bundle/utils/process.go index 44f5e2b73a..bd71fdcb74 100644 --- a/cmd/bundle/utils/process.go +++ b/cmd/bundle/utils/process.go 
@@ -210,7 +210,7 @@ func ProcessBundleRet(cmd *cobra.Command, opts ProcessOptions) (*bundle.Bundle, // Validate that the plan's lineage and serial match the current state // This must happen before any file operations _, localPath := b.StateFilenameDirect(ctx) - err = direct.ValidatePlanAgainstState(localPath, plan) + err = direct.ValidatePlanAgainstState(ctx, localPath, plan) if err != nil { logdiag.LogError(ctx, err) return b, stateDesc, root.ErrAlreadyPrinted
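
For readers tracing the dstate changes, here is a minimal, illustrative sketch of the WAL lifecycle this diff implements. It is not part of the patch itself; it assumes only the dstate API visible above (Open, SaveState, Finalize, RecoveredFromWAL), and the state path and resource key are placeholders borrowed from the acceptance tests.

// Illustrative sketch only: walks the WAL lifecycle added in this diff.
// The state path and resource key below are placeholders, not fixed values.
package main

import (
	"context"
	"fmt"

	"github.com/databricks/cli/bundle/direct/dstate"
)

func main() {
	ctx := context.Background()
	var db dstate.DeploymentState

	// Open loads resources.json (creating an empty one if missing), then
	// replays resources.json.wal when its header matches: same lineage and
	// serial equal to the state serial + 1. A stale WAL (lower serial) is
	// deleted; a future WAL (higher serial) is an error.
	if err := db.Open(ctx, ".databricks/bundle/default/resources.json"); err != nil {
		panic(err)
	}
	if db.RecoveredFromWAL() {
		// A prior deploy crashed after writing WAL entries; Finalize must run
		// even for an empty plan so the recovered entries reach resources.json.
		fmt.Println("recovered pending entries from WAL")
	}

	// SaveState appends a JSONL record to the WAL before touching in-memory
	// state; the first write also appends the header, e.g.:
	//   {"lineage":"<uuid>","serial":2}
	//   {"k":"resources.jobs.job1","v":{"__id__":"1001","state":{...}}}
	//   {"k":"resources.jobs.job1","v":null}   // a delete
	if err := db.SaveState("resources.jobs.job1", "1001", map[string]string{"name": "job1"}, nil); err != nil {
		panic(err)
	}

	// Finalize bumps the serial, rewrites resources.json, and removes the WAL.
	// A crash before this line leaves the WAL for the next Open to replay.
	if err := db.Finalize(); err != nil {
		panic(err)
	}
}

This is the invariant the acceptance tests above exercise: summary-after-crash observes job_a via WAL replay after a simulated kill, and wal-with-delete completes a recorded delete on the next deploy without any API traffic.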