Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.39.0] - 2026-04-02
### Updated
- Refactored Chronicle modules to use centralized `chronicle_request` and `chronicle_paginated_request` helper functions for improved code consistency and maintainability
- Standardized `as_list` parameter support across paginated API methods

## [0.38.0] - 2026-03-31
### Added
- CLI local configuration support with `--local` flag for config set and view commands
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "secops"
version = "0.38.0"
version = "0.39.0"
description = "Python SDK for wrapping the Google SecOps API for common use cases"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
94 changes: 45 additions & 49 deletions src/secops/chronicle/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
CaseList,
CasePriority,
)
from secops.chronicle.utils.format_utils import (
format_resource_id,
remove_none_values,
)
from secops.chronicle.utils.request_utils import (
chronicle_paginated_request,
chronicle_request,
Expand Down Expand Up @@ -58,32 +62,29 @@ def get_cases(
Raises:
APIError: If the API request fails
"""
params: dict[str, Any] = {"pageSize": str(page_size)}

if page_token:
params["pageToken"] = page_token
params = remove_none_values(
{
"pageSize": str(page_size),
"pageToken": page_token,
"tenantId": tenant_id,
}
)

if start_time:
params["createTime.startTime"] = start_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)

if end_time:
params["createTime.endTime"] = end_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)

if case_ids:
for case_id in case_ids:
params["caseId"] = case_id

if asset_identifiers:
for asset in asset_identifiers:
params["assetId"] = asset

if tenant_id:
params["tenantId"] = tenant_id

return chronicle_request(
client,
method="GET",
Expand Down Expand Up @@ -296,17 +297,15 @@ def execute_bulk_close(
f"Valid values: {valid_values}"
) from ve

body: dict[str, Any] = {
"casesIds": case_ids,
"closeReason": close_reason,
}

if root_cause is not None:
body["rootCause"] = root_cause
if close_comment is not None:
body["closeComment"] = close_comment
if dynamic_parameters is not None:
body["dynamicParameters"] = dynamic_parameters
body = remove_none_values(
{
"casesIds": case_ids,
"closeReason": close_reason,
"rootCause": root_cause,
"closeComment": close_comment,
"dynamicParameters": dynamic_parameters,
}
)

return chronicle_request(
client,
Expand Down Expand Up @@ -363,21 +362,20 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case:
Raises:
APIError: If the API request fails
"""
if not case_name.startswith("projects/"):
endpoint_path = f"cases/{case_name}"
else:
endpoint_path = case_name
endpoint_path = format_resource_id(case_name)

params: dict[str, Any] = {}
if expand:
params["expand"] = expand
params = remove_none_values(
{
"expand": expand,
}
)

data = chronicle_request(
client,
method="GET",
endpoint_path=endpoint_path,
endpoint_path=f"cases/{endpoint_path}",
api_version=APIVersion.V1BETA,
params=params,
params=params or None,
error_message="Failed to get case",
)

Expand Down Expand Up @@ -418,15 +416,14 @@ def list_cases(
Raises:
APIError: If the API request fails
"""
extra_params: dict[str, Any] = {}
if filter_query:
extra_params["filter"] = filter_query
if order_by:
extra_params["orderBy"] = order_by
if expand:
extra_params["expand"] = expand
if distinct_by:
extra_params["distinctBy"] = distinct_by
extra_params = remove_none_values(
{
"filter": filter_query,
"orderBy": order_by,
"expand": expand,
"distinctBy": distinct_by,
}
)

return chronicle_paginated_request(
client,
Expand All @@ -435,7 +432,7 @@ def list_cases(
items_key="cases",
page_size=page_size,
page_token=page_token,
extra_params=extra_params if extra_params else None,
extra_params=extra_params or None,
as_list=as_list,
)

Expand Down Expand Up @@ -500,10 +497,7 @@ def patch_case(
APIError: If the API request fails
ValueError: If an invalid priority value is provided
"""
if not case_name.startswith("projects/"):
endpoint_path = f"cases/{case_name}"
else:
endpoint_path = case_name
endpoint_path = format_resource_id(case_name)

if "priority" in case_data and isinstance(case_data["priority"], str):
case_priority = case_data["priority"]
Expand All @@ -519,17 +513,19 @@ def patch_case(
f"Valid values: {valid_values}"
) from ve

params: dict[str, Any] = {}
if update_mask:
params["updateMask"] = update_mask
params = remove_none_values(
{
"updateMask": update_mask,
}
)

data = chronicle_request(
client,
method="PATCH",
endpoint_path=endpoint_path,
endpoint_path=f"cases/{endpoint_path}",
api_version=APIVersion.V1BETA,
json=case_data,
params=params if params else None,
params=params or None,
error_message="Failed to patch case",
)

Expand Down
Loading
Loading