11 changes: 4 additions & 7 deletions .github/workflows/pull-request.yml
@@ -13,19 +13,16 @@ jobs:
with:
fetch-depth: 0

- name: Set up Python 3.7.x
- name: Set up Python 3.9.x
uses: actions/setup-python@v4
with:
python-version: "3.7.16"
python-version: "3.9.20"

- name: Install dependencies
run: pip install .[dev]

- name: Lint
run: make lint

- name: Format
run: make format
- name: Checks
run: make check

- name: Test
run: make test
13 changes: 5 additions & 8 deletions .github/workflows/release.yml
@@ -7,12 +7,10 @@ on:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:

release-latest:

permissions:
id-token: write # to verify the deployment originates from an appropriate source
contents: write # To allow pushing tags/etc.
id-token: write # to verify the deployment originates from an appropriate source
contents: write # To allow pushing tags/etc.

# Specify runner + deployment step
runs-on: ubuntu-22.04
@@ -59,7 +57,7 @@ jobs:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.7.16"
python-version: "3.9.20"
- name: Install dependencies
run: pip install .[dev]
- name: Build docs
@@ -70,14 +68,13 @@
path: docs/_build/html

release-docs:

# Add a dependency to the build job
needs: build-docs

# Grant GITHUB_TOKEN the permissions required to make a Pages deployment
permissions:
pages: write # to deploy to Pages
id-token: write # to verify the deployment originates from an appropriate source
pages: write # to deploy to Pages
id-token: write # to verify the deployment originates from an appropriate source

# Deploy to the github-pages environment
environment:
20 changes: 20 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,20 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.7.2
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
# Run the formatter.
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v5.0.0 # Use the ref you want to point at
hooks:
- id: trailing-whitespace
- id: check-yaml
- id: pretty-format-json
args:
- "--autofix"
- "--indent=4"
- id: requirements-txt-fixer
13 changes: 3 additions & 10 deletions Makefile
@@ -5,16 +5,9 @@ FILES := $(shell git diff --name-only --diff-filter=AM $$(git merge-base origin/
test:
pytest -vv

.PHONY: lint
lint:
ruff check .

.PHONY: format
format:
ruff format .

.PHONY: tidy
tidy: format lint
.PHONY: check
check:
pre-commit run -a

# Removes the directory that contains bytecode cache files
# that are automatically generated by python.
9 changes: 5 additions & 4 deletions pyproject.toml
@@ -38,12 +38,13 @@ dependencies = [
dynamic = ["description", "version"]
name = "syncsparkpy"
readme = "README.md"
requires-python = ">=3.7"
requires-python = ">=3.9"

[project.optional-dependencies]
dev = [
"Sphinx==4.3.0",
"deepdiff==6.3.0",
"pre-commit==4.0.1",
"pytest-asyncio==0.21.0",
"pytest-env==0.8.1",
"pytest==7.2.0",
@@ -73,7 +74,7 @@ pythonpath = ["."]
[tool.ruff]
exclude = ["artifacts/*"]
line-length = 100
target-version = "py37"
target-version = "py39"
[tool.ruff.lint]
ignore = ["E501"]
preview = true
@@ -91,12 +92,12 @@ extend-immutable-calls = [
"fastapi.Security",
]
[tool.ruff.lint.mccabe]
max-complexity = 10
max-complexity = 20


[tool.pyright]
pythonPlatform = "All"
pythonVersion = "3.7"
pythonVersion = "3.9"
reportUnnecessaryTypeIgnoreComment = "error"
typeCheckingMode = "standard"
useLibraryCodeForTypes = false
2 changes: 1 addition & 1 deletion sync/__init__.py
@@ -1,5 +1,5 @@
"""Library for leveraging the power of Sync"""

__version__ = "1.11.7"
__version__ = "2.0.0"

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
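
Editorial aside, not part of this diff: the TIME_FORMAT constant shown above is a standard strftime pattern, so it plugs into the datetime API roughly as sketched below; the sample timestamp is made up purely for illustration.

# Illustration only; assumes the TIME_FORMAT constant from sync/__init__.py above.
from datetime import datetime, timezone

TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Format the current UTC time in the library's timestamp format.
stamp = datetime.now(timezone.utc).strftime(TIME_FORMAT)  # e.g. "2024-11-05T17:32:10Z"

# Parse such a timestamp back into a (naive) datetime.
parsed = datetime.strptime("2024-11-05T17:32:10Z", TIME_FORMAT)
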
59 changes: 30 additions & 29 deletions sync/_databricks.py
@@ -8,10 +8,11 @@
import time
import zipfile
from collections import defaultdict
from collections.abc import Collection
from datetime import datetime, timezone
from pathlib import Path
from time import sleep
from typing import Collection, Dict, List, Optional, Set, Tuple, Union
from typing import Optional, Union
from urllib.parse import urlparse

import boto3 as boto
@@ -70,9 +71,9 @@ def get_cluster(cluster_id: str) -> Response[dict]:
def create_submission_with_cluster_info(
run_id: str,
project_id: str,
cluster: Dict,
cluster_info: Dict,
cluster_activity_events: Dict,
cluster: dict,
cluster_info: dict,
cluster_activity_events: dict,
plan_type: DatabricksPlanType,
compute_type: DatabricksComputeType,
skip_eventlog: bool = False,
@@ -185,7 +186,7 @@ def create_submission_for_run(

def _create_submission(
cluster_id: str,
tasks: List[dict],
tasks: list[dict],
plan_type: str,
compute_type: str,
project_id: str,
@@ -215,12 +216,12 @@

def _get_run_information(
cluster_id: str,
tasks: List[dict],
tasks: list[dict],
plan_type: str,
compute_type: str,
allow_failed_tasks: bool = False,
allow_incomplete_cluster_report: bool = False,
) -> Response[Tuple[DatabricksClusterReport, bytes]]:
) -> Response[tuple[DatabricksClusterReport, bytes]]:
if not allow_failed_tasks and any(
task["state"].get("result_state") != "SUCCESS" for task in tasks
):
@@ -243,7 +244,7 @@ def _get_run_information(
return cluster_report_response


def _get_event_log_from_cluster(cluster: Dict, tasks: List[Dict]) -> Response[bytes]:
def _get_event_log_from_cluster(cluster: dict, tasks: list[dict]) -> Response[bytes]:
spark_context_id = _get_run_spark_context_id(tasks)
end_time = max(task["end_time"] for task in tasks)
eventlog_response = _get_eventlog(cluster, spark_context_id.result, end_time)
@@ -257,7 +258,7 @@ def _get_event_log_from_cluster(cluster: Dict, tasks: List[Dict]) -> Response[by


def _maybe_get_event_log_from_cluster(
cluster: Dict, tasks: List[Dict], dbfs_eventlog_file_size: Union[int, None]
cluster: dict, tasks: list[dict], dbfs_eventlog_file_size: Union[int, None]
) -> Response[bytes]:
spark_context_id = _get_run_spark_context_id(tasks)
end_time = max(task["end_time"] for task in tasks)
@@ -330,7 +331,7 @@ def get_cluster_report(

def _get_cluster_report(
cluster_id: str,
cluster_tasks: List[dict],
cluster_tasks: list[dict],
plan_type: str,
compute_type: str,
allow_incomplete: bool,
@@ -342,7 +343,7 @@ def _create_cluster_report(
cluster: dict,
cluster_info: dict,
cluster_activity_events: dict,
tasks: List[dict],
tasks: list[dict],
plan_type: DatabricksPlanType,
compute_type: DatabricksComputeType,
) -> DatabricksClusterReport:
@@ -366,7 +367,7 @@ def handle_successful_job_run(
project_id: Union[str, None] = None,
allow_incomplete_cluster_report: bool = False,
exclude_tasks: Union[Collection[str], None] = None,
) -> Response[Dict[str, str]]:
) -> Response[dict[str, str]]:
"""Create's Sync project submissions for each eligible cluster in the run (see :py:func:`~record_run`)

If project ID is provided only submit run data for the cluster tagged with it, or the only cluster if there is such.
@@ -455,7 +456,7 @@ def record_run(
project_id: Union[str, None] = None,
allow_incomplete_cluster_report: bool = False,
exclude_tasks: Union[Collection[str], None] = None,
) -> Response[Dict[str, str]]:
) -> Response[dict[str, str]]:
"""Create's Sync project submissions for each eligible cluster in the run.

If project ID is provided only submit run data for the cluster tagged with it, or the only cluster if there is such.
@@ -508,11 +509,11 @@


def _record_project_clusters(
project_cluster_tasks: Dict[str, Tuple[str, List[dict]]],
project_cluster_tasks: dict[str, tuple[str, list[dict]]],
plan_type: str,
compute_type: str,
allow_incomplete_cluster_report: bool,
) -> Dict[str, str]:
) -> dict[str, str]:
"""Creates project submissions/predictions and returns a map of project IDs to the new submissions/predictions IDs"""
result_ids = {}
for cluster_project_id, (cluster_id, tasks) in project_cluster_tasks.items():
@@ -792,7 +793,7 @@ def get_project_cluster_settings(
return Response(result=cluster_template)


def run_job_object(job: dict) -> Response[Tuple[str, str]]:
def run_job_object(job: dict) -> Response[tuple[str, str]]:
"""Create a Databricks one-off run based on the job configuration.

:param job: Databricks job object
@@ -1134,7 +1135,7 @@ def _wait_for_cluster_termination(

def _cluster_log_destination(
cluster: dict,
) -> Union[Tuple[str, str, str, str], Tuple[None, None, None, None]]:
) -> Union[tuple[str, str, str, str], tuple[None, None, None, None]]:
cluster_log_conf = cluster.get("cluster_log_conf", {})
s3_log_url = cluster_log_conf.get("s3", {}).get("destination")
dbfs_log_url = cluster_log_conf.get("dbfs", {}).get("destination")
@@ -1156,7 +1157,7 @@ def _cluster_log_destination(
return None, None, None, None


def _get_job_cluster(tasks: List[dict], job_clusters: list) -> Response[dict]:
def _get_job_cluster(tasks: list[dict], job_clusters: list) -> Response[dict]:
if len(tasks) == 1:
return _get_task_cluster(tasks[0], job_clusters)

@@ -1173,7 +1174,7 @@ def _get_job_cluster(tasks: List[dict], job_clusters: list) -> Response[dict]:
def _get_project_job_clusters(
job: dict,
exclude_tasks: Union[Collection[str], None] = None,
) -> Dict[str, Tuple[Tuple[str], dict]]:
) -> dict[str, tuple[tuple[str], dict]]:
"""Returns a mapping of project IDs to cluster paths and clusters.

Cluster paths are tuples that can be used to locate clusters in a job object, e.g.
@@ -1215,7 +1216,7 @@ def _get_project_cluster_tasks(
project_id: Optional[str] = None,
cluster_path: Optional[str] = None,
exclude_tasks: Union[Collection[str], None] = None,
) -> Dict[str, Tuple[str, List[dict]]]:
) -> dict[str, tuple[str, list[dict]]]:
"""Returns a mapping of project IDs to cluster-ID-tasks pairs"""
project_cluster_tasks = _get_cluster_tasks(run, exclude_tasks)

@@ -1265,7 +1266,7 @@ def _get_project_cluster_tasks(
def _get_cluster_tasks(
run: dict,
exclude_tasks: Union[Collection[str], None] = None,
) -> Dict[str, Dict[str, Tuple[str, List[dict]]]]:
) -> dict[str, dict[str, tuple[str, list[dict]]]]:
"""Returns a mapping of project IDs to cluster paths to cluster IDs and tasks"""
job_clusters = {c["job_cluster_key"]: c["new_cluster"] for c in run.get("job_clusters", [])}

@@ -1311,7 +1312,7 @@ def _get_cluster_tasks(
return result_cluster_project_tasks


def _get_run_spark_context_id(tasks: List[dict]) -> Response[str]:
def _get_run_spark_context_id(tasks: list[dict]) -> Response[str]:
context_ids = {
task["cluster_instance"]["spark_context_id"] for task in tasks if "cluster_instance" in task
}
@@ -1341,7 +1342,7 @@ def _get_task_cluster(task: dict, clusters: list) -> Response[dict]:
return Response(result=cluster)


def _s3_contents_have_all_rollover_logs(contents: List[dict], run_end_time_seconds: float):
def _s3_contents_have_all_rollover_logs(contents: list[dict], run_end_time_seconds: float):
final_rollover_log = contents and next(
(
content
@@ -1380,16 +1381,16 @@ def _dbfs_directory_has_all_rollover_logs(contents: dict, run_end_time_millis: f
)


def _dbfs_any_file_has_zero_size(dbfs_contents: Dict) -> bool:
def _dbfs_any_file_has_zero_size(dbfs_contents: dict) -> bool:
any_zeros = any(file["file_size"] == 0 for file in dbfs_contents["files"])
if any_zeros:
logger.info("One or more dbfs event log files has a file size of zero")
return any_zeros


def _check_total_file_size_changed(
last_total_file_size: int, dbfs_contents: Dict
) -> Tuple[bool, int]:
last_total_file_size: int, dbfs_contents: dict
) -> tuple[bool, int]:
new_total_file_size = sum([file.get("file_size", 0) for file in dbfs_contents.get("files", {})])
if new_total_file_size == last_total_file_size:
return False, new_total_file_size
@@ -1687,9 +1688,9 @@ def get_all_cluster_events(cluster_id: str):


def _update_monitored_timelines(
running_instance_ids: Set[str],
active_timelines_by_id: Dict[str, dict],
) -> Tuple[Dict[str, dict], List[dict]]:
running_instance_ids: set[str],
active_timelines_by_id: dict[str, dict],
) -> tuple[dict[str, dict], list[dict]]:
"""
Shared monitoring method for both Azure and Databricks to reduce complexity.
Compares the current running instances (keyed by id) to the running
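
Editorial note, not part of the PR: the annotation changes throughout sync/_databricks.py follow the style that the new Python 3.9 floor allows, namely built-in generics (PEP 585) and collections.abc imports in place of most typing aliases, with Optional and Union kept as before. Below is a minimal sketch of that style; summarize_tasks is a made-up helper used only to illustrate the pattern, not a function from this library.

# Hypothetical example of the Python 3.9+ annotation style adopted in this diff.
from collections.abc import Collection
from typing import Optional, Union

def summarize_tasks(
    tasks: list[dict],                                   # built-in generic, not typing.List[dict]
    exclude_tasks: Union[Collection[str], None] = None,  # Union/Optional still come from typing
    project_id: Optional[str] = None,
) -> dict[str, tuple[str, list[dict]]]:                  # dict/tuple/list replace Dict/Tuple/List
    """Group tasks by cluster ID, skipping any excluded task keys."""
    excluded = set(exclude_tasks or ())
    grouped: dict[str, tuple[str, list[dict]]] = {}
    for task in tasks:
        if task.get("task_key") in excluded:
            continue
        cluster_id = task.get("cluster_instance", {}).get("cluster_id", "unknown")
        grouped.setdefault(cluster_id, (project_id or "", []))[1].append(task)
    return grouped

With target-version = "py39" set in pyproject.toml, ruff's pyupgrade-style rules (if enabled in the lint selection) can apply this kind of rewrite automatically.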