From 778b723247eeffcabff7409108bde521b312248a Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 27 Feb 2026 07:41:19 -0800 Subject: [PATCH 1/2] docs: update all Python SDK/CLI references to v5.0.0 Migrate all documentation from the old Manager-based Python SDK (v4.x) to the new Client + Organization architecture (v5.0.0). Updates CLI commands from the old argparse patterns to the new Click-based ` ` pattern across 12 files. Co-Authored-By: Claude Opus 4.6 --- .../adapters/tutorials/webhook-adapter.md | 2 +- .../endpoint-agent/uninstallation.md | 12 +- .../troubleshooting/non-responding-sensors.md | 17 +- .../tutorials/writing-testing-rules.md | 20 +- docs/4-data-queries/query-cli.md | 2 +- .../extensions/labs/ai-agent-engine.md | 13 +- .../extensions/labs/playbook.md | 30 +- .../extensions/using-extensions.md | 4 +- docs/5-integrations/services/replay.md | 30 +- docs/6-developer-guide/sdk-overview.md | 252 +-- docs/6-developer-guide/sdks/index.md | 22 +- docs/6-developer-guide/sdks/python-sdk.md | 1888 +++++------------ 12 files changed, 740 insertions(+), 1552 deletions(-) diff --git a/docs/2-sensors-deployment/adapters/tutorials/webhook-adapter.md b/docs/2-sensors-deployment/adapters/tutorials/webhook-adapter.md index d51c6a17..f2d3cf81 100644 --- a/docs/2-sensors-deployment/adapters/tutorials/webhook-adapter.md +++ b/docs/2-sensors-deployment/adapters/tutorials/webhook-adapter.md @@ -51,7 +51,7 @@ After creating the webhook, you will be provided with a geo-dependent URL, respe * Python SDK: ```python -python3 -c "import limacharlie; print(limacharlie.Manager().getOrgURLs()['hooks'])" +python3 -c "from limacharlie.client import Client; from limacharlie.sdk.organization import Organization; print(Organization(Client()).get_urls()['hooks'])" ``` ## Using the webhook adapter diff --git a/docs/2-sensors-deployment/endpoint-agent/uninstallation.md b/docs/2-sensors-deployment/endpoint-agent/uninstallation.md index dc704f68..b8b44e70 100644 --- a/docs/2-sensors-deployment/endpoint-agent/uninstallation.md +++ b/docs/2-sensors-deployment/endpoint-agent/uninstallation.md @@ -21,10 +21,14 @@ On Windows, the command defaults to uninstalling the sensor as if installed from To run the uninstall command against *all* Sensors, a simple loop with the SDK in Python would work: ```python -import limacharlie -lc = limacharlie.Manager() -for sensor in lc.sensors(): - sensor.task( 'uninstall' ) +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization + +client = Client() +org = Organization(client) +for sensor_info in org.list_sensors(): + sensor = org.get_sensor(sensor_info["sid"]) + sensor.task("uninstall") ``` ### Using a D&R Rule diff --git a/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md b/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md index 9de15d11..1aaec6ec 100644 --- a/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md +++ b/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md @@ -7,8 +7,9 @@ A common request is to alert an administrator if a Sensor that normally forwards ## Example Playbook Code ```python -import limacharlie import time +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor # LimaCharlie D&R Rule to trigger this playbook # every 30 minutes. @@ -28,19 +29,19 @@ import time SENSOR_SELECTOR = "plat == windows and `server` in tags" DATA_WITHIN = 10 * 60 * 1000 # 10 minutes -def notify_missing_data(sdk: limacharlie.Limacharlie, sensor: limacharlie.Sensor): +def notify_missing_data(sdk: Organization, sensor: Sensor): # TODO: Implement this, but it's optional if all you want is a detection # since those will be generated automatically. pass -def get_relevant_sensors(sdk: limacharlie.Limacharlie) -> list[limacharlie.Sensor]: - sensors = sdk.sensors(selector=SENSOR_SELECTOR) +def get_relevant_sensors(sdk: Organization) -> list[Sensor]: + sensors = sdk.list_sensors(selector=SENSOR_SELECTOR) relevant_sensors = [] - for sensor in sensors: - relevant_sensors.append(sensor) + for sensor_info in sensors: + relevant_sensors.append(sdk.get_sensor(sensor_info["sid"])) return relevant_sensors -def playbook(sdk: limacharlie.Limacharlie, data: dict) -> dict | None: +def playbook(sdk: Organization, data: dict) -> dict | None: # Get the sensors we care about. relevant_sensors = get_relevant_sensors(sdk) @@ -49,7 +50,7 @@ def playbook(sdk: limacharlie.Limacharlie, data: dict) -> dict | None: # For each sensor, check if we've received data within that time period. for sensor in relevant_sensors: # To do that we will get the data overview and see if a recent time stamp is present. - data_overview = sensor.getHistoricOverview(int(time.time() - DATA_WITHIN), int(time.time())) + data_overview = sensor.get_overview(int(time.time() - DATA_WITHIN), int(time.time())) after = int(time.time() * 1000) - DATA_WITHIN for timestamp in data_overview: if timestamp > after: diff --git a/docs/3-detection-response/tutorials/writing-testing-rules.md b/docs/3-detection-response/tutorials/writing-testing-rules.md index a4e84492..a1ffe397 100644 --- a/docs/3-detection-response/tutorials/writing-testing-rules.md +++ b/docs/3-detection-response/tutorials/writing-testing-rules.md @@ -78,7 +78,7 @@ The Organization ID (OID) identifies uniquely your organization while the API ke ### Login to the CLI -Back in your terminal, log in with your credentials: `limacharlie login`. +Back in your terminal, log in with your credentials: `limacharlie auth login`. 1. When asked for the Organization ID, paste your OID from the previous step. 2. When asked for a name for this access, you can leave it blank to set the default credentials. @@ -264,7 +264,9 @@ respond: name: T1196 ``` -Now validate: `limacharlie replay --validate --rule-content T1196.rule` +Now validate the rule structure. Save the detect and respond components to separate files (`T1196_detect.yaml` and `T1196_respond.yaml`), then run: + +`limacharlie dr validate --detect T1196_detect.yaml --respond T1196_respond.yaml` After a few seconds, you should see a response with `success: true` if the rule validates properly. @@ -312,7 +314,7 @@ should still NOT match because it's not a `.cpl`. Now we can run our 3 samples against the rule using Replay, -`limacharlie replay --rule-content T1196.rule --events positive.json` should output a result +`limacharlie dr test --input-file T1196.rule --events positive.json` should output a result indicating the event matched (by actioning the `report`) like: ```json @@ -328,7 +330,7 @@ indicating the event matched (by actioning the `report`) like: ... ``` -`limacharlie replay --rule-content T1196.rule --events negative-1.json` should output a result +`limacharlie dr test --input-file T1196.rule --events negative-1.json` should output a result indicating the event did NOT match like: ```json @@ -341,7 +343,7 @@ indicating the event did NOT match like: } ``` -`limacharlie replay --rule-content T1196.rule --events negative-2.json` be the same as `negative-1.json`. +`limacharlie dr test --input-file T1196.rule --events negative-2.json` be the same as `negative-1.json`. ### Testing Historical Data @@ -353,7 +355,11 @@ costs associated. Running our rule against the last week of data is simple: -`limacharlie replay --rule-content T1196.rule --entire-org --last-seconds 604800` +```bash +START=$(date -d '7 days ago' +%s) +END=$(date +%s) +limacharlie replay run --detect-file T1196_detect.yaml --respond-file T1196_respond.yaml --start $START --end $END +``` No matches should look like that: @@ -375,5 +381,5 @@ Once your rule is done and you've evaluated various events for matches, you can Now is the time to push the new rule to production, the easy part. -Simply run `limacharlie dr add --rule-name T1196 --rule-file T1196.rule` +Simply run `limacharlie dr set --key T1196 --input-file T1196.rule` and confirm it is operational by running `limacharlie dr list`. diff --git a/docs/4-data-queries/query-cli.md b/docs/4-data-queries/query-cli.md index cddb8122..65f9c712 100644 --- a/docs/4-data-queries/query-cli.md +++ b/docs/4-data-queries/query-cli.md @@ -1,6 +1,6 @@ # Query with CLI -The command line interface found in the Python CLI/SDK can be invoked like `limacharlie query` once installed (`pip install limacharlie`). +The command line interface found in the Python CLI/SDK can be invoked like `limacharlie search` once installed (`pip install limacharlie`). ## Context diff --git a/docs/5-integrations/extensions/labs/ai-agent-engine.md b/docs/5-integrations/extensions/labs/ai-agent-engine.md index deb01da6..f65ae231 100644 --- a/docs/5-integrations/extensions/labs/ai-agent-engine.md +++ b/docs/5-integrations/extensions/labs/ai-agent-engine.md @@ -53,13 +53,14 @@ Here is an example D&R rule starting a new invocation of a playbook. ### Python example ```python -# Import LC SDK -import limacharlie import json -# Instantiate the SDK with default creds. -lc = limacharlie.Manager() -# Instantiate the Extension manager object. -ext = limacharlie.Extension(lc) +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.extensions import Extensions + +client = Client() +org = Organization(client) +ext = Extensions(org) # Issue a request to the "ext-ai-agent-engine" extension for the "my-agent-name" agent. response = ext.request("ext-ai-agent-engine", "start_session", { diff --git a/docs/5-integrations/extensions/labs/playbook.md b/docs/5-integrations/extensions/labs/playbook.md index 49059d84..841bc95e 100644 --- a/docs/5-integrations/extensions/labs/playbook.md +++ b/docs/5-integrations/extensions/labs/playbook.md @@ -54,12 +54,13 @@ Here is an example D&R rule starting a new invocation of a playbook. ### Python example ```python -# Import LC SDK -import limacharlie -# Instantiate the SDK with default creds. -lc = limacharlie.Manager() -# Instantiate the Extension manager object. -ext = limacharlie.Extension(lc) +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.extensions import Extensions + +client = Client() +org = Organization(client) +ext = Extensions(org) # Issue a request to the "ext-playbook" extension. response = ext.request("ext-playbook", "run_playbook", { @@ -78,7 +79,7 @@ print(response) A playbook is a normal python script. The only required component is a top level function called `playbook` which takes 2 arguments: -* `sdk`: an instance of the LC Python SDK ( `limacharlie.Manager()` ) pre-authenticated to the relevant Organization based on the credentials provided, if any, `None` otherwise. +* `sdk`: an instance of the LC Python SDK (`Organization` from `limacharlie.sdk.organization`) pre-authenticated to the relevant Organization based on the credentials provided, if any, `None` otherwise. * `data`: the optional JSON dictionary provided as context to your playbook. The function must return a dictionary with the following optional keys: @@ -95,13 +96,13 @@ This allows your playbook to return information about its execution, return data The following is a sample playbook that sends a webhook to an external product with a secret stored in LimaCharlie, and it returns the data as the response from the playbook. ```python -import limacharlie import json import urllib.request +from limacharlie.sdk.hive import Hive def playbook(sdk, data): # Get the secret we need from LimaCharlie. - mySecret = limacharlie.Hive(sdk, "secret").get("my-secret-name").data["secret"] + mySecret = Hive(sdk, "secret").get("my-secret-name").data["secret"] # Send the Webhook. request = urllib.request.Request("https://example.com/webhook", data=json.dumps(data).encode('utf-8'), headers={ @@ -133,8 +134,6 @@ When a playbook generates a detection, you can customize the detection category The following example checks if a server sensor has missed a check-in and creates a detection with a custom category name: ```python -import limacharlie - def playbook(sdk, data): if not sdk: return {"error": "LC API key required"} @@ -145,8 +144,9 @@ def playbook(sdk, data): threshold = 3600 # 1 hour in seconds missing_sensors = [] - for sensor in sdk.sensors(): - info = sensor.getInfo() + for sensor_info in sdk.list_sensors(): + sensor = sdk.get_sensor(sensor_info["sid"]) + info = sensor.get_info() last_seen = info.get('last_seen', 0) if (current_time - last_seen) > threshold: missing_sensors.append({ @@ -169,7 +169,7 @@ def playbook(sdk, data): return { "data": {"status": "all sensors checked in"} } -```python +``` **Important:** The `cat` field must be placed at the **top level** of the return dictionary, alongside `detection`, not inside it. When this playbook creates a detection, it will appear in the Detections UI with the category name "Server-Sensor-Missing-Check-In" instead of the default "playbook-detection". @@ -227,7 +227,7 @@ hives: return {"error": "LC API key required to list sensors"} return { "data": { - "sensors": [s.getInfo() for s in sdk.sensors()] + "sensors": [sdk.get_sensor(s["sid"]).get_info() for s in sdk.list_sensors()] } } usr_mtd: diff --git a/docs/5-integrations/extensions/using-extensions.md b/docs/5-integrations/extensions/using-extensions.md index 555edf1c..a41d9985 100644 --- a/docs/5-integrations/extensions/using-extensions.md +++ b/docs/5-integrations/extensions/using-extensions.md @@ -14,7 +14,7 @@ Configurations are a great way of storing rarely-written settings for an Extensi The structure of the configuration for a given Extension is published by the Extension via its "schema". -Schemas are available through the [Schema API](https://api.limacharlie.io/static/swagger/#/Extension-Schema/getExtensionSchema) or the LimaCharlie CLI: `limacharlie extension get_schema --help`. +Schemas are available through the [Schema API](https://api.limacharlie.io/static/swagger/#/Extension-Schema/getExtensionSchema) or the LimaCharlie CLI: `limacharlie extension schema --help`. ### Requests @@ -22,7 +22,7 @@ Requests are, as the name implies, direct individual requests to an Extension. A The "action" and "payload" entirely depends on the Extension it is destined to. The list of actions and individual payload structures available for an Extension is documented by each Extension using the "schema" they publish. -Schemas are available through the [Schema API](https://api.limacharlie.io/static/swagger/#/Extension-Schema/getExtensionSchema) or the LimaCharlie CLI: `limacharlie extension get_schema --help`. +Schemas are available through the [Schema API](https://api.limacharlie.io/static/swagger/#/Extension-Schema/getExtensionSchema) or the LimaCharlie CLI: `limacharlie extension schema --help`. ## Interacting diff --git a/docs/5-integrations/services/replay.md b/docs/5-integrations/services/replay.md index c7f3c4bb..70c08a17 100644 --- a/docs/5-integrations/services/replay.md +++ b/docs/5-integrations/services/replay.md @@ -43,7 +43,7 @@ The returned data from the API contains the following: ### Query Language -To use Replay in LCQL Mode (LimaCharlie Query Language), you can specify your query in the `query` parameter of the Replay Request (defined below) when using the REST interface, or you can use the LimaCharlie Python SDK/CLI's [query interface](https://github.com/refractionPOINT/python-limacharlie/blob/master/limacharlie/Query.py): `limacharlie query --help`. +To use Replay in LCQL Mode (LimaCharlie Query Language), you can specify your query in the `query` parameter of the Replay Request (defined below) when using the REST interface, or you can use the LimaCharlie Python SDK/CLI's [query interface](https://github.com/refractionPOINT/python-limacharlie/blob/master/limacharlie/Query.py): `limacharlie search --help`. ### Python CLI @@ -52,30 +52,32 @@ The [Python CLI](https://github.com/refractionPOINT/python-limacharlie) gives yo Sample command line to query one sensor: ``` -limacharlie-replay --sid 9cbed57a-6d6a-4af0-b881-803a99b177d9 --start 1556568500 --end 1556568600 --rule-content ./test_rule.txt +limacharlie replay run --detect-file ./test_detect.yaml --respond-file ./test_respond.yaml --start 1556568500 --end 1556568600 ``` Sample command line to query an entire organization: ``` -limacharlie-replay --entire-org --start 1555359000 --end 1556568600 --rule-name my-rule-name +limacharlie replay run --name my-rule-name --start 1555359000 --end 1556568600 ``` -If specifying a rule as content with the `--rule-content`, the format should be - in `JSON` or `YAML` like: +When specifying a rule via `--detect-file` and `--respond-file`, each file should be in `JSON` or `YAML` format. For example, a detect file: ```yaml -detect: - event: DNS_REQUEST - op: is - path: event/DOMAIN_NAME - value: www.dilbert.com -respond: - - action: report - name: dilbert-is-here +event: DNS_REQUEST +op: is +path: event/DOMAIN_NAME +value: www.dilbert.com ``` -Instead of specifying the `--entire-org` or `--sid` flags, you may use events from a local file via the `--events` flag. +And a respond file: + +```yaml +- action: report + name: dilbert-is-here +``` + +Instead of replaying against an entire organization, you may use events from a local file via the `limacharlie dr test` command with the `--events` flag. We invite you to look at the command line usage itself, as the tool evolves. diff --git a/docs/6-developer-guide/sdk-overview.md b/docs/6-developer-guide/sdk-overview.md index d2108ada..3d773a49 100644 --- a/docs/6-developer-guide/sdk-overview.md +++ b/docs/6-developer-guide/sdk-overview.md @@ -174,7 +174,7 @@ In addition to the PyPi distribution we also offer a pre-built Docker image on D docker run refractionpoint/limacharlie:latest whoami # Using a specific version (Docker image tag matches the library version) -docker run refractionpoint/limacharlie:4.9.13 whoami +docker run refractionpoint/limacharlie:5.0.0 whoami # If you already have a credential file locally, you can mount it inside the Docker container docker run -v ${HOME}/.limacharlie:/root/.limacharlie:ro refractionpoint/limacharlie:latest whoami @@ -187,20 +187,20 @@ Authenticating to use the SDK / CLI can be done in a few ways. **Option 1 - Logging In** The simplest is to login to an Organization using an [API key](../7-administration/access/api-keys.md). -Use `limacharlie login` to store credentials locally. You will need an `OID` (Organization ID) and an API key, and (optionally) a `UID` (User ID), all of which you can get from the Access Management --> REST API section of the web interface. +Use `limacharlie auth login` to store credentials locally. You will need an `OID` (Organization ID) and an API key, and (optionally) a `UID` (User ID), all of which you can get from the Access Management --> REST API section of the web interface. The login interface supports named environments, or a default one used when no environment is selected. -To list available environments: +To list available organizations: ```bash -limacharlie use +limacharlie auth list-orgs ``` -Setting a given environment in the current shell session can be done like this: +Setting a given organization in the current shell session can be done like this: ```bash -limacharlie use my-dev-org +limacharlie auth use-org my-dev-org ``` You can also specify a `UID` (User ID) during login to use a *user* API key representing @@ -211,162 +211,122 @@ You can also specify a `UID` (User ID) during login to use a *user* API key repr ### SDK -The root of the functionality in the SDK is from the `Manager` object. It holds the credentials and is tied to a specific LimaCharlie Organization. +The SDK is organized around two main objects: `Client` (handles authentication and HTTP) and `Organization` (provides access to all org-scoped operations). -You can authenticate the `Manager` using an `oid` (and optionally a `uid`), along with either a `secret_api_key` or `jwt` directly. Alternatively you can just use an environment name (as specified in `limacharlie login`). If no creds are provided, the `Manager` will try to use the default environment and credentials. +You can authenticate the `Client` using an `oid` and `api_key` directly, or use environment variables and config file credentials. If no credentials are provided, the `Client` resolves them from the environment (as configured via `limacharlie auth login`). #### Importing ```python -import limacharlie +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor YARA_SIG = 'https://raw.githubusercontent.com/Yara-Rules/rules/master/Malicious_Documents/Maldoc_PDF.yar' # Create an instance of the SDK. -mgr = limacharlie.Manager() +client = Client() +org = Organization(client) # Get a list of all the sensors in the current Organization. -all_sensors = mgr.sensors() +all_sensors = list(org.list_sensors()) -# Select the first sensor in the list. -sensor = all_sensors[0] +# Select the first sensor. +sensor = org.get_sensor(all_sensors[0]["sid"]) # Tag this sensor with a tag for 10 minutes. -sensor.tag( 'suspicious', ttl = 60 * 10 ) +sensor.add_tag('suspicious', ttl=60 * 10) # Send a task to the sensor (unidirectionally, not expecting a response). -sensor.task( 'os_processes' ) +sensor.task('os_processes') # Send a yara scan to that sensor for processes "evil.exe". -sensor.task( 'yara_scan -e *evil.exe ' + YARA_SIG ) -``` - -#### Use of gevent - -Note that the SDK uses the `gevent` package which sometimes has issues with other - packages that operate at a low level in python. For example, Jupyter notebooks - may see freezing on importing `limacharlie` and require a tweak to load: - -```json -{ - "display_name": "IPython 2 w/gevent", - "language": "python", - "argv": [ - "python", - "-c", "from gevent.monkey import patch_all; patch_all(thread=False); from ipykernel.kernelapp import main; main()", - "-f", - "{connection_file}" - ] -} +sensor.task('yara_scan -e *evil.exe ' + YARA_SIG) ``` ### Components -#### Manager +#### Client -This is a the general component that provides access to the managing functions of the API like querying sensors online, creating and removing Outputs etc. +The `Client` handles HTTP communication with the LimaCharlie API, including JWT generation/refresh, retry with exponential backoff, and rate limit awareness. Import from `limacharlie.client`. -#### Firehose +#### Organization -The `Firehose` is a simple object that listens on a port for LimaCharlie.io data. Under the hood it creates a Syslog Output on limacharlie.io pointing to itself and removes it on shutdown. Data from limacharlie.io is added to `firehose.queue` (a `gevent Queue`) as it is received. +The `Organization` is the main entry point for all org-scoped operations: listing sensors, managing rules, accessing hives, outputs, users, extensions, and more. Import from `limacharlie.sdk.organization`. -It is a basic building block of automation for limacharlie.io. +#### Sensor -#### Spout +The `Sensor` object is returned by `org.get_sensor(sid)`. -Much like the `Firehose`, the Spout receives data from LimaCharlie.io, the difference - is that the `Spout` does not require opening a local port to listen actively on. Instead - it leverages `stream.limacharlie.io` to receive the data stream over HTTPS. +It supports `task`, `hostname`, `add_tag`, `remove_tag`, `get_tags`, `isolate`, `rejoin`, and more. This is the main way to interact with a specific sensor. Import from `limacharlie.sdk.sensor`. -A `Spout` is automatically created when you instantiate a `Manager` with the -`is_interactive = True` and `inv_id = XXXX` arguments in order to provide real-time - feedback from tasking sensors. +The `task` function sends a task to the sensor unidirectionally, meaning it does not + receive the response from the sensor (if any). -#### Sensor +#### Firehose -This is the object returned by `manager.sensor( sensor_id )`. +The `Firehose` is a TLS push-mode streaming listener. Under the hood it creates an Output on limacharlie.io pointing to itself and removes it on shutdown. Import from `limacharlie.sdk.firehose`. -It supports a `task`, `hostname`, `tag`, `untag`, `getTags` and more functions. This - is the main way to interact with a specific sensor. +#### Spout -The `task` function sends a task to the sensor unidirectionally, meaning it does not - receive the response from the sensor (if any). If you want to interact with a sensor - in real-time, use the interactive mode (as mentioned in the `Spout`) and use either - the `request` function to receive replies through a `FutureResults` object or the -`simpleRequest` to wait for the response and receive it as a return value. +Much like the `Firehose`, the `Spout` receives data from LimaCharlie.io, the difference + is that the `Spout` does not require opening a local port to listen actively on. Instead + it leverages `stream.limacharlie.io` to receive the data stream over HTTPS. Import from `limacharlie.sdk.spout`. -#### Artifacts +#### Hive -The `Artifacts` is a helpful class to upload artifacts to LimaCharlie without going through a sensor. +The `Hive` provides access to LimaCharlie's key-value configuration store. Used for D&R rules, secrets, playbooks, and more. Import from `limacharlie.sdk.hive`. #### Extensions -The `Extensions` can be used to subscribe to and manage extensions within your org. +The `Extensions` class manages extension subscriptions and requests. ```python -import limacharlie -from limacharlie import Extension +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.extensions import Extensions -mgr = limacharlie.Manager() -ext = Extension(mgr) +client = Client() +org = Organization(client) +ext = Extensions(org) ext.subscribe('binlib') ``` -#### Payloads +#### Search -The `Payloads` can be used to manage various executable [payloads](../2-sensors-deployment/endpoint-agent/payloads.md) accessible to sensors. +The `Search` object allows you to execute LCQL queries, validate queries, estimate costs, and manage saved queries. Import from `limacharlie.sdk.search`. #### Replay -The `Replay` object allows you to interact with [Replay](../5-integrations/services/replay.md) jobs managed by LimaCharlie. These allow you to re-run D&R Rules on historical data. - -Sample command line to query one sensor: - -``` -limacharlie-replay --sid 9cbed57a-6d6a-4af0-b881-803a99b177d9 --start 1556568500 --end 1556568600 --rule-content ./test_rule.txt -``` - -Sample command line to query an entire organization: - -``` -limacharlie-replay --entire-org --start 1555359000 --end 1556568600 --rule-name my-rule-name -``` +The `Replay` object allows you to interact with [Replay](../5-integrations/services/replay.md) jobs managed by LimaCharlie. These allow you to re-run D&R Rules on historical data. Import from `limacharlie.sdk.replay`. -#### Search +#### Artifacts -The `Search` object allows you to perform an IOC search across multiple organizations. +The `Artifacts` class is used to upload, list, and download artifacts. Import from `limacharlie.sdk.artifacts`. -#### SpotCheck +#### Payloads -The `SpotCheck` object (sometimes called Fleet Check) allows you to manage an active (query sensors directly as opposed to searching on indexed historical data) search for various IOCs on an organization's sensors. +The `Payloads` can be used to manage various executable [payloads](../2-sensors-deployment/endpoint-agent/payloads.md) accessible to sensors. #### Configs The `Configs` is used to retrieve an organization's configuration as a config file, or apply - an existing config file to an organization. This is the concept of Infrastructure as Code. - -#### Webhook - -The `Webhook` object demonstrates handling [webhooks emitted by the LimaCharlie cloud](../2-sensors-deployment/adapters/tutorials/webhook-adapter.md), including verifying the shared-secret signing of the webhooks. + an existing config file to an organization. This is the concept of Infrastructure as Code. Import from `limacharlie.sdk.configs`. ### Examples: -* [Basic Manager Operations](https://github.com/refractionPOINT/python-limacharlie/blob/master/samples/demo_manager.py) -* [Basic Firehose Operations](https://github.com/refractionPOINT/python-limacharlie/blob/master/samples/demo_firehose.py) -* [Basic Spout Operations](https://github.com/refractionPOINT/python-limacharlie/blob/master/samples/demo_spout.py) -* [Basic Integrated Operations](https://github.com/refractionPOINT/python-limacharlie/blob/master/samples/demo_interactive_sensor.py) * [Sample Configs](https://github.com/refractionPOINT/python-limacharlie/tree/master/limacharlie/sample_configs) ### Command Line Interface -Many of the objects available as part of the LimaCharlie Python SDK also support various command line interfaces. +The CLI uses a `limacharlie ` command pattern. Every command supports `--help` for detailed usage and `--ai-help` for AI-optimized explanations. -#### Query +#### Search / Query [LimaCharlie Query Language (LCQL)](../4-data-queries/lcql-examples.md) provides a flexible, intuitive and interactive way to explore your data in LimaCharlie. ```bash -limacharlie query --help +limacharlie search --help ``` #### ARLs @@ -377,72 +337,54 @@ ARLs can be used in the [YARA manager](../5-integrations/extensions/limacharlie/ Testing an ARL before applying it somewhere can be helpful to shake out access or authentication errors beforehand. You can test an ARL and see what files are fetched, and their contents, by running the following command: -```powershell -limacharlie get-arl -a [github,Yara-Rules/rules/email] +```bash +limacharlie arl get -a [github,Yara-Rules/rules/email] ``` -#### Firehose - -Listens on interface `1.2.3.4`, port `9424` for incoming connections from LimaCharlie.io. - Receives only events from hosts tagged with `fh_test`. +#### Streaming -```python -python -m limacharlie.Firehose 1.2.3.4:9424 event -n firehose_test -t fh_test --oid c82e5c17-d519-4ef5-a4ac-caa4a95d31ca -``` - -#### Spout +Stream events, detections, or audit logs in real-time. Uses pull-mode spouts (HTTPS) or push-mode firehose listeners (TLS). -Behaves similarly to the Firehose, but instead of listening from an internet accessible port, it connects to the `stream.limacharlie.io` service to stream the output over HTTPS. This means the Spout allows you to get ad-hoc output like the Firehose, but it also works through NATs and proxies. +```bash +# Stream events (pull-mode via stream.limacharlie.io, works through NATs and proxies) +limacharlie stream events +limacharlie stream events --tag server -It is MUCH more convenient for short term ad-hoc outputs, but it is less reliable than a Firehose for very large amounts of data. +# Stream detections +limacharlie stream detections -```python -python -m limacharlie.Spout event --oid c82e5c17-d519-4ef5-a4ac-caa4a95d31ca +# Stream audit logs +limacharlie stream audit ``` -#### Configs +#### Sync (Infrastructure as Code) -The `fetch` command will get a list of the Detection & Response rules in your - organization and will write them to the config file specified or the default - config file `lc_conf.yaml` in YAML format. +The `pull` command will fetch the organization configuration and write it to a local YAML file. ```bash -limacharlie configs fetch --oid c82e5c17-d519-4ef5-a4ac-c454a95d31ca` +limacharlie sync pull --oid c82e5c17-d519-4ef5-a4ac-c454a95d31ca ``` -Then `push` can upload the rules specified in the config file (or the default one) - to your organization. The optional `--force` argument will remove active rules not - found in the config file. The `--dry-run` simulates the sync and displays the changes - that would occur. - -The `--config` allows you to specify an alternate config file and the `--api-key` allows - you to specify a file on disk where the API should be read from (otherwise, of if `-` is - specified as a file, the API Key is read from STDIN). +Then `push` can upload the configuration specified in the YAML file to your organization. The `--dry-run` simulates the sync and displays the changes that would occur. ```bash -limacharlie configs push --dry-run --oid c82e5c17-d519-4ef5-a4ac-c454a95d31ca --config /path/to/template.yaml --all --ignore-inaccessible +limacharlie sync push --dry-run --oid c82e5c17-d519-4ef5-a4ac-c454a95d31ca --config /path/to/template.yaml ``` -All these capabilities are also supported directly by the `limacharlie.Configs` object. +All these capabilities are also supported directly by the `Configs` SDK class (`limacharlie.sdk.configs`). -The Sync functionality currently supports all common useful configurations. The `--no-rules` and `--no-outputs` flags can be used to ignore one or the other in config files and sync. Additional flags are also supported, see `limacharlie configs --help`. +The Sync functionality supports all common useful configurations. Use the hive flags (`--hive-dr-general`, `--hive-fp`, `--outputs`, etc.) to control which resource types are synced. See `limacharlie sync --help` for all options. -To understand better the config format, do a `fetch` from your organization. Notice the use of the `include` +To understand better the config format, do a `pull` from your organization. Notice the use of the `include` statement. Using this statement you can combine multiple config files together, making it ideal for the management of complex rule sets and their versioning. #### Spot Checks -Used to perform Organization-wide checks for specific indicators of compromise. Available as a custom API `SpotCheck` object or as a module from the command line. Supports many types of IoCs like file names, directories, registry keys, file hashes and YARA signatures. - -```python -python -m limacharlie.SpotCheck --no-macos --no-linux --tags vip --file c:\\evil.exe` -``` +Used to perform Organization-wide checks for specific indicators of compromise. Supports many types of IoCs like file names, directories, registry keys, file hashes and YARA signatures. -For detailed usage: - -```python -python -m limacharlie.SpotCheck --help +```bash +limacharlie spotcheck --help ``` #### Search @@ -461,20 +403,12 @@ Shortcut utility to manage extensions. limacharlie extension --help ``` -#### Artifact Upload - -Shortcut utility to upload and retrieve Artifacts within LimaCharlie with just the CLI (no agent). - -```bash -limacharlie artifacts --help -``` - -#### Artifact Download +#### Artifacts -Shortcut utility to download Artifact Collection in LimaCharlie locally. +Shortcut utility to upload, list, and download Artifacts within LimaCharlie. ```bash -limacharlie artifacts get_original --help +limacharlie artifact --help ``` #### Replay @@ -498,8 +432,8 @@ limacharlie dr --help Print out to STDOUT events or detections matching the parameter. ```bash -limacharlie events --help -limacharlie detections --help +limacharlie event --help +limacharlie detection --help ``` #### List Sensors @@ -507,36 +441,36 @@ limacharlie detections --help Print out all basic sensor information for all sensors matching the [selector](../8-reference/sensor-selector-expressions.md). ```bash -limacharlie sensors --selector 'plat == windows' +limacharlie sensor list --selector 'plat == windows' ``` -#### Invite Users +#### Add Users -Invite single or multiple users to LimaCharlie. Invited users will be sent an email to confirm their address, enable the account and create a new password. +Add single or multiple users to a LimaCharlie organization. Added users will be sent an email to confirm their address, enable the account and create a new password. -Keep in mind that this actions operates in the user context which means you need to use user scoped API key. For more information on how to obtain one, see +Keep in mind that this action operates in the user context which means you need to use a user scoped API key. For more information on how to obtain one, see -Invite a single user: +Add a single user: ```bash -limacharlie users invite --email=user1@example.com +limacharlie user add --email user1@example.com ``` -Invite multiple users: +Add multiple users: ```bash -limacharlie users invite --email=user1@example.com,user2@example.com,user3@example.com +limacharlie user add --email user1@example.com,user2@example.com,user3@example.com ``` -Invite multiple users from new line delimited entries in a text file: +Add multiple users from new line delimited entries in a text file: ```bash -cat users_to_invite.txt +cat users_to_add.txt user1@example.com user2@example.com user3@example.com ``` ```bash -limacharlie users invite --file=users_to_invite.txt +limacharlie user add --file users_to_add.txt ``` diff --git a/docs/6-developer-guide/sdks/index.md b/docs/6-developer-guide/sdks/index.md index bdcbb126..ce59cf51 100644 --- a/docs/6-developer-guide/sdks/index.md +++ b/docs/6-developer-guide/sdks/index.md @@ -41,7 +41,7 @@ pip install limacharlie ``` **Key Features:** -- Manager class for all platform operations +- Client + Organization architecture for all platform operations - Sensor tasking and management - Real-time streaming (Firehose/Spout) - Detection rule management via Hive @@ -52,19 +52,17 @@ pip install limacharlie ### Python ```python -import limacharlie +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization -# Initialize the manager -manager = limacharlie.Manager( - oid='your-org-id', - secret_api_key='your-api-key' -) +# Initialize client and organization +client = Client(oid='your-org-id', api_key='your-api-key') +org = Organization(client) -# List all online sensors -sensors = manager.sensors() -for sensor in sensors: - if sensor.isOnline(): - print(f"Online: {sensor.sid}") +# List all sensors +for sensor_info in org.list_sensors(): + sensor = org.get_sensor(sensor_info["sid"]) + print(f"Sensor: {sensor.sid} - {sensor.hostname}") ``` ### Go diff --git a/docs/6-developer-guide/sdks/python-sdk.md b/docs/6-developer-guide/sdks/python-sdk.md index 81639edc..a245039e 100644 --- a/docs/6-developer-guide/sdks/python-sdk.md +++ b/docs/6-developer-guide/sdks/python-sdk.md @@ -5,15 +5,17 @@ 2. [Installation](#installation) 3. [Authentication](#authentication) 4. [Core Classes](#core-classes) -5. [Manager Class](#manager-class) +5. [Organization](#organization) 6. [Sensor Management](#sensor-management) 7. [Detection and Response Rules](#detection-and-response-rules) 8. [Real-time Data Streaming](#real-time-data-streaming) -9. [Artifacts and Payloads](#artifacts-and-payloads) -10. [Event Ingestion](#event-ingestion) -11. [Advanced Features](#advanced-features) -12. [Error Handling](#error-handling) -13. [Complete Examples](#complete-examples) +9. [Artifacts](#artifacts) +10. [Hive Operations](#hive-operations) +11. [Search (LCQL)](#search-lcql) +12. [Extensions](#extensions) +13. [Infrastructure as Code](#infrastructure-as-code) +14. [Error Handling](#error-handling) +15. [Complete Examples](#complete-examples) ## Overview @@ -25,16 +27,17 @@ The LimaCharlie Python SDK provides a comprehensive interface for interacting wi - **Detection & Response**: Create and manage detection rules with automated response actions - **Real-time Streaming**: Receive events, detections, and audit logs in real-time - **Artifact Management**: Collect and manage forensic artifacts -- **Event Processing**: Ingest and query historical events -- **API Integration**: Full access to LimaCharlie REST API endpoints +- **LCQL Queries**: Execute, validate, and save search queries +- **Hive Storage**: Key-value configuration store for rules, secrets, playbooks, and more +- **Infrastructure as Code**: Pull/push org configuration as version-controlled YAML files ### SDK Version -Current version: 4.9.24 +Current version: 5.0.0 ## Installation ### Requirements -- Python 3.6 or higher +- Python 3.9 or higher - pip package manager ### Install via pip @@ -46,17 +49,18 @@ pip install limacharlie ```bash git clone https://github.com/refractionPOINT/python-limacharlie.git cd python-limacharlie -pip install -r requirements.txt -python setup.py install +pip install -e ".[dev]" ``` ### Dependencies -```python -# Core dependencies -pyyaml>=5.1 -requests>=2.20.0 -python-dateutil>=2.8.0 -``` +Core dependencies (automatically installed): + +- `requests` - HTTP client +- `click` - CLI framework +- `pyyaml` - YAML parsing +- `jmespath` - Data filtering +- `rich` - Terminal output formatting +- `cryptography` - Encryption ## Authentication @@ -66,19 +70,29 @@ The LimaCharlie SDK supports multiple authentication methods: 1. **API Key Authentication** (Recommended for automation) 2. **OAuth Authentication** (For user-based access) -3. **JWT Token Authentication** (For temporary access) -4. **Environment-based Authentication** (Using configuration files) +3. **Environment-based Authentication** (Using configuration files) + +### Credential Resolution Order + +Credentials are resolved in priority order (highest first): + +1. Explicit parameters passed to `Client()` +2. `LC_OID`, `LC_API_KEY`, `LC_UID` environment variables +3. Named environment from `LC_CURRENT_ENV` (or 'default') +4. Default credentials in `~/.limacharlie` config file ### API Key Authentication ```python -import limacharlie +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization # Direct API key authentication -manager = limacharlie.Manager( +client = Client( oid='YOUR_ORGANIZATION_ID', # UUID format - secret_api_key='YOUR_API_KEY' # UUID format + api_key='YOUR_API_KEY' # UUID format ) +org = Organization(client) ``` ### Environment-based Authentication @@ -86,13 +100,18 @@ manager = limacharlie.Manager( The SDK can read credentials from environment variables or configuration files: ```python +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization + # Using environment variables # Set: LC_OID="your-org-id" # Set: LC_API_KEY="your-api-key" -manager = limacharlie.Manager() +client = Client() +org = Organization(client) # Using a specific environment from config file -manager = limacharlie.Manager(environment='production') +client = Client(environment='production') +org = Organization(client) ``` ### Configuration File Format @@ -109,340 +128,244 @@ env: production: oid: "12345678-1234-1234-1234-123456789012" api_key: "87654321-4321-4321-4321-210987654321" - + staging: oid: "87654321-4321-4321-4321-210987654321" api_key: "12345678-1234-1234-1234-123456789012" - - dev: - oid: "11111111-1111-1111-1111-111111111111" - api_key: "22222222-2222-2222-2222-222222222222" -``` - -### OAuth Authentication - -```python -# OAuth configuration in ~/.limacharlie -oauth: - client_id: "your-client-id" - client_secret: "your-client-secret" - refresh_token: "your-refresh-token" - id_token: "your-id-token" ``` ### User-based Authentication ```python +from limacharlie.client import Client + # Authenticate as a user instead of organization -manager = limacharlie.Manager( +client = Client( uid='USER_ID', - secret_api_key='USER_API_KEY' + api_key='USER_API_KEY' ) ``` ## Core Classes -### Class Hierarchy +### Architecture ``` limacharlie -├── Manager # Main entry point for API operations -├── Sensor # Individual sensor management -├── Firehose # Real-time data streaming (push) -├── Spout # Real-time data streaming (pull) -├── Configs # Configuration management -├── Payloads # Artifact and payload management -├── Logs/Artifacts # Log and artifact retrieval -├── Hive # Hive data storage -├── Extensions # Add-on extensions -├── Billing # Billing and usage information -├── User # User management -├── UserPreferences # User preferences -├── Webhook # Webhook management -├── WebhookSender # Send data to webhooks -├── Jobs # Background job management -├── Query # Query builder for LCQL -├── Replay # Event replay functionality -├── Search # Search operations -├── SpotCheck # Spot check operations -└── Replicants # Service replicants (external services) +├── client.Client # HTTP client with JWT and retry logic +├── sdk/ +│ ├── organization.Organization # Main entry point for org operations +│ ├── sensor.Sensor # Individual sensor management +│ ├── hive.Hive # Key-value configuration store +│ ├── hive.HiveRecord # Individual hive record +│ ├── dr_rules.DRRules # D&R rule management +│ ├── fp_rules.FPRules # False positive rule management +│ ├── search.Search # LCQL query execution +│ ├── firehose.Firehose # Real-time push streaming +│ ├── spout.Spout # Real-time pull streaming +│ ├── extensions.Extensions # Extension management +│ ├── artifacts.Artifacts # Artifact collection/retrieval +│ ├── payloads.Payloads # Payload management +│ ├── replay.Replay # Rule replay +│ ├── configs.Configs # Infrastructure as Code +│ └── ... # Additional modules +└── errors # Custom exception hierarchy ``` -## Manager Class +### Client -The `Manager` class is the primary entry point for interacting with LimaCharlie. - -### Initialization +The `Client` handles HTTP communication, JWT generation/refresh, retry with exponential backoff, and rate limit awareness. ```python -import limacharlie +from limacharlie.client import Client -# Basic initialization -manager = limacharlie.Manager( - oid='ORG_ID', - secret_api_key='API_KEY' -) +# Basic usage +client = Client(oid='ORG_ID', api_key='API_KEY') -# Advanced initialization with all parameters -manager = limacharlie.Manager( - oid='ORG_ID', # Organization ID (UUID) - secret_api_key='API_KEY', # API Key (UUID) - environment='production', # Named environment from config - inv_id='investigation-123', # Investigation ID for tracking - print_debug_fn=print, # Debug output function - is_interactive=True, # Enable interactive mode - extra_params={'key': 'value'}, # Additional parameters - jwt='existing-jwt-token', # Use existing JWT - uid='USER_ID', # User ID for user-based auth - onRefreshAuth=callback_func, # Callback on auth refresh - isRetryQuotaErrors=True # Auto-retry on quota errors -) +# As context manager +with Client(oid='ORG_ID', api_key='API_KEY') as client: + data = client.request("GET", "sensors") ``` -### Core Manager Methods +## Organization -#### Organization Management +The `Organization` class is the primary entry point for interacting with a LimaCharlie organization. + +### Core Methods ```python +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization + +client = Client(oid='ORG_ID', api_key='API_KEY') +org = Organization(client) + # Get organization information -org_info = manager.getOrgInfo() -# Returns: {'oid': '...', 'name': '...', 'tier': '...', ...} +org_info = org.get_info() +# Returns: {'oid': '...', 'name': '...', ...} # Get organization URLs -urls = manager.getOrgURLs() -# Returns: {'webapp': 'https://...', 'api': 'https://...', ...} +urls = org.get_urls() +# Returns: {'webapp': 'https://...', 'api': 'https://...', 'hooks': '...', ...} -# Get organization configuration -config = manager.getOrgConfig() -# Returns: {'sensor_quota': 100, 'retention': 30, ...} - -# Update organization configuration -manager.setOrgConfig({ - 'sensor_quota': 200, - 'retention': 60 -}) +# Get usage statistics +stats = org.get_stats() -# Get billing information -billing = limacharlie.Billing(manager) -billing_info = billing.getInfo() -``` +# Get organization configuration +value = org.get_config('config_name') -#### Sensor Listing and Search +# Set organization configuration +org.set_config('config_name', 'new_value') -```python -# List all sensors -sensors = manager.sensors() -# Returns: {'sensor-id-1': {...}, 'sensor-id-2': {...}, ...} +# Get organization errors +errors = org.get_errors() -# List sensors with details -detailed_sensors = manager.sensors(with_details=True) +# Dismiss an error +org.dismiss_error('component_name') -# Get sensor count -online_count = manager.getOnlineHosts() -# Returns: 42 +# Get MITRE ATT&CK coverage report +mitre = org.get_mitre_report() -# Search sensors by hostname -matching_sensors = manager.getHostnames(['web-server-*', 'db-*']) -# Returns: {'web-server-01': 'sensor-id-1', ...} +# Get event schemas +schemas = org.get_schemas(platform='windows') +schema = org.get_schema('NEW_PROCESS') -# Get sensors by tag -tagged_sensors = manager.getSensorsWithTag('production') -# Returns: ['sensor-id-1', 'sensor-id-2', ...] - -# Get sensor information -sensor_info = manager.getSensorInfo('SENSOR_ID') -# Returns: {'hostname': '...', 'platform': '...', 'architecture': '...', ...} +# Current identity and permissions +identity = org.who_am_i() ``` -#### Installation Keys +### User Management ```python -# List installation keys -keys = manager.getInstallationKeys() -# Returns: [{'key': '...', 'tags': [...], 'desc': '...', ...}, ...] - -# Create new installation key -new_key = manager.setInstallationKey( - key_name='prod-servers', - tags=['production', 'server'], - desc='Production server deployment key' -) -# Returns: 'INSTALLATION_KEY_STRING' - -# Delete installation key -manager.delInstallationKey('INSTALLATION_KEY_ID') -``` - -#### Outputs Management +# List organization users +users = org.get_users() +# Returns: ['user1@example.com', 'user2@example.com', ...] -```python -# List all outputs -outputs = manager.outputs() -# Returns: {'output1': {...}, 'output2': {...}, ...} - -# Add new output -manager.add_output( - name='siem_output', - module='syslog', - type='detect', - config={ - 'dest_host': 'siem.example.com', - 'dest_port': 514, - 'protocol': 'tcp' - } -) +# Add a user +org.add_user('new_user@example.com') -# Delete output -manager.del_output('siem_output') -``` +# Remove a user +org.remove_user('user@example.com') -#### Artifact Collection +# Get user permissions +permissions = org.get_user_permissions() -```python -# List available artifacts -artifacts = manager.getArtifacts( - start_time=1234567890, # Unix timestamp - end_time=1234567999 -) +# Grant a permission +org.add_user_permission('user@example.com', 'dr.set') -# Get specific artifact -artifact_data = manager.getArtifact('ARTIFACT_ID') +# Revoke a permission +org.remove_user_permission('user@example.com', 'dr.set') -# Get artifact as JSON -artifact_json = manager.getArtifactAsJson('ARTIFACT_ID') +# Set a predefined role (Owner, Administrator, Operator, Viewer, Basic) +org.set_user_role('user@example.com', 'Operator') ``` -#### Service Management +### Organization Lifecycle ```python -# List services -services = manager.getServices() +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization -# Subscribe to a service -manager.subscribeToService('virustotal') +client = Client(uid='USER_ID', api_key='USER_API_KEY') +org = Organization(client) -# Unsubscribe from service -manager.unsubscribeFromService('virustotal') +# List accessible organizations +orgs = org.list_accessible_orgs() -# Get service configuration -service_config = manager.getServiceConfig('virustotal') +# Check name availability +availability = Organization.check_name(client, 'my-new-org') -# Update service configuration -manager.setServiceConfig('virustotal', { - 'api_key': 'YOUR_VT_API_KEY' -}) +# Create a new organization +new_org = Organization.create_org(client, 'my-new-org', location='us') ``` ## Sensor Management The `Sensor` class provides detailed control over individual sensors. -### Creating a Sensor Object +### Listing and Getting Sensors ```python -# Create from Manager -sensor = manager.sensor('SENSOR_ID') +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization -# Direct instantiation -from limacharlie import Sensor -sensor = Sensor(manager, 'SENSOR_ID') +client = Client() +org = Organization(client) -# With detailed info pre-loaded -sensor = Sensor(manager, 'SENSOR_ID', detailedInfo={...}) +# List all sensors (returns generator of dicts) +for sensor_info in org.list_sensors(): + print(sensor_info['sid'], sensor_info.get('hostname')) + +# List with a selector filter +for sensor_info in org.list_sensors(selector='plat == windows'): + print(sensor_info['sid']) + +# List online sensors only +for sensor_info in org.online_sensors(): + print(sensor_info['sid']) + +# Get a specific sensor object +sensor = org.get_sensor('SENSOR_ID') ``` ### Sensor Properties and Methods -#### Getting Sensor Information - ```python -# Check if sensor is online -is_online = sensor.isOnline() -# Returns: True/False +sensor = org.get_sensor('SENSOR_ID') -# Get sensor hostname -hostname = sensor.getHostname() -# Returns: 'web-server-01' +# Get full sensor information +info = sensor.get_info() +# Returns: {'hostname': '...', 'plat': ..., 'arch': ..., ...} -# Get platform information -platform = sensor.getPlatform() -# Returns: 'windows', 'linux', 'macos', etc. +# Check platform +if sensor.is_windows: + print("Windows sensor") +elif sensor.is_linux: + print("Linux sensor") +elif sensor.is_macos: + print("macOS sensor") -# Get architecture -architecture = sensor.getArchitecture() -# Returns: 'x64', 'x86', 'arm64', etc. +# Get hostname +hostname = sensor.hostname -# Get full sensor information -info = sensor.getInfo() -# Returns: {'hostname': '...', 'platform': '...', 'last_seen': ..., ...} +# Check if sensor is online +is_online = sensor.is_online() -# Get sensor tags -tags = sensor.getTags() -# Returns: ['production', 'web-server', ...] +# Wait for sensor to come online (blocking) +came_online = sensor.wait_online(timeout=300) # 5 minutes ``` -#### Sending Tasks to Sensors +### Sending Tasks to Sensors ```python -# Send a single task -response = sensor.task('os_info') +# Send a single task (fire-and-forget) +sensor.task('os_processes') # Send multiple tasks -responses = sensor.task([ - 'os_info', - 'os_packages', - 'os_services' -]) +sensor.task(['os_info', 'os_processes', 'os_services']) # Task with investigation ID -sensor.setInvId('investigation-123') -response = sensor.task('os_processes') - -# Direct task with inv_id -response = sensor.task('os_processes', inv_id='investigation-456') +sensor.task('os_processes', inv_id='investigation-123') ``` -#### Interactive Task Execution +### Tag Management ```python -# Manager must be in interactive mode -manager = limacharlie.Manager( - oid='ORG_ID', - secret_api_key='API_KEY', - is_interactive=True, - inv_id='investigation-123' -) - -sensor = manager.sensor('SENSOR_ID') - -# Request with future results -future = sensor.request('os_info') - -# Wait for results -result = future.getNewResponses(timeout=30) - -# Simple request (blocking) -result = sensor.simpleRequest('os_info', timeout=30) +# Get sensor tags +tags = sensor.get_tags() +# Returns: ['production', 'web-server', ...] -# Request with completion tracking -results = sensor.simpleRequest( - ['os_info', 'os_processes'], - timeout=60, - until_completion=True # Wait for all tasks to complete -) +# Add a tag +sensor.add_tag('critical') -# Request with callback -def process_response(event): - print(f"Received: {event}") +# Add a tag with TTL (auto-removed after N seconds) +sensor.add_tag('investigating', ttl=3600) -sensor.simpleRequest( - 'os_processes', - timeout=30, - until_completion=process_response # Called for each response -) +# Remove a tag +sensor.remove_tag('test') ``` -#### Sensor Isolation +### Network Isolation ```python # Isolate sensor from network @@ -452,1207 +375,526 @@ sensor.isolate() sensor.rejoin() # Check isolation status -is_isolated = sensor.isIsolated() +is_isolated = sensor.is_isolated() ``` -#### Sensor Management +### Sensor Events ```python -# Tag management -sensor.addTag('critical') -sensor.removeTag('test') +import time -# Wait for sensor to come online -came_online = sensor.waitToComeOnline(timeout=300) # 5 minutes -# Returns: True if online, False if timeout +# Get historical events for a sensor +end = int(time.time()) +start = end - 3600 # 1 hour ago -# Delete sensor -sensor.delete() +for event in sensor.get_events(start=start, end=end, event_type='NEW_PROCESS', limit=100): + print(event) -# Drain sensor (mark for deletion after offline) -sensor.drain() +# Get event overview/timeline +overview = sensor.get_overview(start=start, end=end) ``` -### Available Sensor Tasks - -Common tasks that can be sent to sensors: +### Sensor Lifecycle ```python -# System Information -'os_info' # Get OS information -'os_packages' # List installed packages -'os_services' # List running services -'os_processes' # List running processes -'os_autoruns' # List autorun entries -'os_drivers' # List loaded drivers - -# File Operations -'file_info ' # Get file information -'file_get ' # Retrieve file content -'file_del ' # Delete file -'file_mov ' # Move file -'file_hash ' # Get file hash - -# Process Operations -'mem_map ' # Get process memory map -'mem_read
' # Read process memory -'mem_strings ' # Get process strings -'kill ' # Kill process - -# Network Operations -'netstat' # Get network connections -'dns_resolve ' # Resolve DNS - -# Registry Operations (Windows) -'reg_list ' # List registry keys -'reg_get ' # Get registry value - -# History and Forensics -'history_dump' # Dump sensor history -'hidden_module_scan' # Scan for hidden modules -'exec_oob_scan' # Out-of-band executable scan +# Delete sensor permanently +sensor.delete() ``` ## Detection and Response Rules -**Note**: The direct D&R rule methods (`rules()`, `add_rule()`, `del_rule()`, `add_fp()`, `del_fp()`) are deprecated. Please use the Hive accessors instead for managing Detection & Response rules. See the Hive Operations section for details on using the key-value storage system for rule management. +D&R rules can be managed through the Hive system or the `DRRules` convenience class. -## Real-time Data Streaming - -### Firehose (Push-based Streaming) - -The Firehose class receives data pushed from LimaCharlie: +### Using Hive (Recommended) ```python -import limacharlie - -manager = limacharlie.Manager('ORG_ID', 'API_KEY') - -# Create a Firehose for events -firehose = limacharlie.Firehose( - manager, - listen_on='0.0.0.0:4443', # Listen interface and port - data_type='event', # event, detect, or audit - name='my_firehose', # Output name in LC - public_dest='1.2.3.4:4443', # Public IP:port for LC to connect - ssl_cert='/path/to/cert.pem', # Optional SSL cert - ssl_key='/path/to/key.pem', # Optional SSL key - is_parse=True, # Parse JSON automatically - max_buffer=1024, # Max messages to buffer - inv_id='investigation-123', # Filter by investigation ID - tag='production', # Filter by sensor tag - sid='SENSOR_ID' # Filter by sensor ID -) +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.hive import Hive, HiveRecord -# Start receiving data -firehose.start() - -# Process events -while True: - event = firehose.queue.get() - print(f"Received event: {event}") - - # Process event - if event.get('event_type') == 'PROCESS_START': - process_path = event.get('event', {}).get('FILE_PATH') - print(f"Process started: {process_path}") - -# Shutdown -firehose.shutdown() -``` +client = Client() +org = Organization(client) -### Spout (Pull-based Streaming) +# Access D&R rules hive +hive = Hive(org, "dr-general") -The Spout class pulls data from LimaCharlie: +# List all rules +rules = hive.list() +for name, record in rules.items(): + print(name, record.enabled) -```python -# Create a Spout for detections -spout = limacharlie.Spout( - manager, - data_type='detect', # event, detect, audit, or tailored - is_parse=True, # Parse JSON - inv_id='investigation-123', - tag='critical', - cat='malware', # Detection category filter - sid='SENSOR_ID', - is_compressed=True, # Use compression - rel_parent_events=True, # Include parent events - rel_child_events=True # Include child events +# Get a specific rule +record = hive.get("my-detection-rule") +print(record.data) # {'detect': {...}, 'respond': [...]} + +# Create or update a rule +new_rule = HiveRecord( + name="my-new-rule", + data={ + "detect": { + "event": "NEW_PROCESS", + "op": "contains", + "path": "event/COMMAND_LINE", + "value": "mimikatz" + }, + "respond": [ + {"action": "report", "name": "mimikatz-detected"} + ] + } ) +hive.set(new_rule) -# Process detections -for detection in spout: - print(f"Detection: {detection['detect_name']}") - print(f"Sensor: {detection['sid']}") - print(f"Event: {detection['event']}") - - # Take action based on detection - if detection['detect_name'] == 'ransomware_behavior': - sensor = manager.sensor(detection['sid']) - sensor.isolate() - -# Shutdown -spout.shutdown() +# Delete a rule +hive.delete("my-old-rule") ``` -### Tailored Event Streaming +### Using DRRules Convenience Class ```python -# Stream specific event types -spout = limacharlie.Spout( - manager, - data_type='tailored', - event_type=['PROCESS_START', 'NETWORK_CONNECT'], - is_parse=True -) +from limacharlie.sdk.dr_rules import DRRules -for event in spout: - if event['event_type'] == 'PROCESS_START': - print(f"Process: {event['event']['FILE_PATH']}") - elif event['event_type'] == 'NETWORK_CONNECT': - print(f"Connection: {event['event']['DESTINATION']}") -``` +dr = DRRules(org) + +# List rules (general namespace) +rules = dr.list() + +# Get a rule +rule = dr.get("my-rule") + +# Create a rule +dr.create("my-rule", { + "detect": {"event": "NEW_PROCESS", "op": "exists", "path": "event/FILE_PATH"}, + "respond": [{"action": "report", "name": "process-detected"}] +}) -## Artifacts and Payloads +# Delete a rule +dr.delete("my-rule") +``` -### Managing Artifacts +### False Positive Rules ```python -# Initialize Artifacts/Logs manager -artifacts = limacharlie.Artifacts(manager) - -# List artifacts in time range -artifact_list = artifacts.listArtifacts( - start_time=1234567890, - end_time=1234567999, - sid='SENSOR_ID', # Optional: filter by sensor - type='log' # Optional: filter by type -) +from limacharlie.sdk.fp_rules import FPRules -# Get specific artifact -artifact_data = artifacts.getArtifact('ARTIFACT_ID') +fp = FPRules(org) -# Get artifact metadata -metadata = artifacts.getArtifactMetadata('ARTIFACT_ID') +# List FP rules +fp_rules = fp.list() -# Delete artifact -artifacts.deleteArtifact('ARTIFACT_ID') +# Create an FP rule +fp.create("my-fp-rule", { + "op": "is", + "path": "detect/event/FILE_PATH", + "value": "C:\\Windows\\System32\\svchost.exe" +}) ``` -### Payloads Management +### Replay (Testing Rules Against Historical Data) ```python -# Initialize Payloads manager -payloads = limacharlie.Payloads(manager) +from limacharlie.sdk.replay import Replay -# List available payloads -payload_list = payloads.list() +replay = Replay(org) -# Create a new payload -payload_id = payloads.create( - name='custom_script', - data=b'#!/bin/bash\necho "Hello World"', - description='Custom bash script' +# Run a rule against historical data +result = replay.run( + rule_name="my-rule", + start=1700000000, + end=1700100000, ) -# Get payload -payload_data = payloads.get('PAYLOAD_ID') - -# Delete payload -payloads.delete('PAYLOAD_ID') - -# Use payload in a task -sensor.task(f'run_payload {payload_id}') +# Test a rule against sample events +events = [{"routing": {...}, "event": {...}}] +result = replay.scan_events( + events, + rule_content={"detect": {...}, "respond": [...]}, +) ``` -## Event Ingestion +## Real-time Data Streaming -### Ingesting Custom Events +### Spout (Pull-based Streaming) -```python -# Ingest a single event -manager.ingestEvent({ - 'event_type': 'CUSTOM_EVENT', - 'timestamp': 1234567890, - 'data': { - 'key': 'value', - 'severity': 'high' - } -}) +The `Spout` pulls data from `stream.limacharlie.io` over HTTPS. Works through NATs and proxies. Best for short-term ad-hoc streaming. -# Batch ingestion -events = [ - { - 'event_type': 'CUSTOM_LOGIN', - 'timestamp': 1234567890, - 'user': 'admin', - 'source_ip': '192.168.1.100' - }, - { - 'event_type': 'CUSTOM_FILE_ACCESS', - 'timestamp': 1234567891, - 'file': '/etc/passwd', - 'user': 'admin' - } -] +```python +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.spout import Spout -for event in events: - manager.ingestEvent(event) -``` +client = Client() +org = Organization(client) -### Ingesting Third-party Logs +# Stream events +spout = Spout(org, "my-spout", data_type="event", tag="production") -```python -# Ingest syslog events -def ingest_syslog(syslog_line): - # Parse syslog format - parsed = parse_syslog(syslog_line) - - manager.ingestEvent({ - 'event_type': 'SYSLOG', - 'timestamp': parsed['timestamp'], - 'hostname': parsed['hostname'], - 'facility': parsed['facility'], - 'severity': parsed['severity'], - 'message': parsed['message'] - }) - -# Ingest Windows Event Logs -def ingest_windows_event(event_xml): - # Parse Windows Event XML - parsed = parse_windows_event(event_xml) - - manager.ingestEvent({ - 'event_type': 'WINDOWS_EVENT', - 'timestamp': parsed['TimeCreated'], - 'event_id': parsed['EventID'], - 'provider': parsed['Provider'], - 'level': parsed['Level'], - 'data': parsed['EventData'] - }) +try: + while True: + data = spout.pull(timeout=5) + if data is not None: + print(data) +finally: + spout.shutdown() ``` -## Advanced Features - -### Hive Operations +### Firehose (Push-based Streaming) -The Hive is LimaCharlie's key-value storage system: +The `Firehose` creates a TLS server that LimaCharlie connects to and pushes data. Best for large-scale, long-running streaming. ```python -# Initialize Hive -hive = limacharlie.Hive(manager) +from limacharlie.sdk.firehose import Firehose -# Store data -hive_record = limacharlie.HiveRecord( - hive_name='threat_intel', - key='malware_hash_123', - data={ - 'hash': 'abc123...', - 'type': 'ransomware', - 'first_seen': 1234567890 - }, - ttl=86400 # TTL in seconds +# Create a firehose listener +fh = Firehose( + org, + listen_on="0.0.0.0:4443", + data_type="event", + name="my-firehose", + public_dest="1.2.3.4:4443", + max_buffer=1024, + tag="production", ) -hive.set(hive_record) - -# Get data -record = hive.get('threat_intel', 'malware_hash_123') -print(record.data) - -# List keys -keys = hive.list('threat_intel') -# Delete data -hive.delete('threat_intel', 'malware_hash_123') - -# Bulk operations -records = [ - limacharlie.HiveRecord('intel', f'hash_{i}', {'value': i}) - for i in range(100) -] -hive.setBulk(records) +try: + while True: + data = fh.get(timeout=5) + if data is not None: + print(data) +finally: + fh.shutdown() ``` -### Query Language (LCQL) +## Artifacts ```python -from limacharlie import Query +from limacharlie.sdk.artifacts import Artifacts -# Build queries programmatically -query = Query() +artifacts = Artifacts(org) -# Simple query -results = manager.query( - query='event_type == "PROCESS_START" AND event.FILE_PATH contains "powershell"', - limit=100 -) +# List artifacts +for artifact in artifacts.list(sensor='SENSOR_ID', start=start_time, end=end_time): + print(artifact) -# Complex query with time range -results = manager.query( - query=''' - event_type == "NETWORK_CONNECT" - AND event.DESTINATION contains "suspicious.com" - AND timestamp > now() - 86400 - ''', - start_time=time.time() - 86400, - end_time=time.time(), - limit=1000 -) +# Get download URL for an artifact +url = artifacts.get_download_url('ARTIFACT_ID') -# Query with aggregation -results = manager.query( - query='event_type == "PROCESS_START"', - select=['event.FILE_PATH', 'count()'], - group_by=['event.FILE_PATH'], - order_by='count() DESC' -) +# Download an artifact to a local file +artifacts.download('ARTIFACT_ID', '/path/to/output') ``` -### Replay Functionality +## Hive Operations + +The Hive is LimaCharlie's key-value storage system used for D&R rules, secrets, playbooks, SOPs, lookups, and more. ```python -# Initialize Replay -replay = limacharlie.Replay(manager) - -# Start replay of historical events -replay.start( - start_time=time.time() - 3600, # 1 hour ago - end_time=time.time(), - filters={ - 'event_type': ['PROCESS_START', 'FILE_CREATE'], - 'sid': 'SENSOR_ID' - }, - rules=['suspicious_process', 'ransomware_behavior'] # Test specific rules -) +from limacharlie.sdk.hive import Hive, HiveRecord -# Get replay results -results = replay.getResults() -for result in results: - print(f"Rule triggered: {result['rule_name']}") - print(f"Event: {result['event']}") -``` +# Access a hive (e.g., secrets) +hive = Hive(org, "secret") -### Jobs Management +# List all records +records = hive.list() +for name, record in records.items(): + print(name, record.data) -```python -# Create a job -job = limacharlie.Job(manager) - -job_id = job.create( - name='daily_scan', - schedule='0 2 * * *', # Cron format - task='full_scan', - sensors=['SENSOR_ID_1', 'SENSOR_ID_2'] +# Get a specific record +record = hive.get("my-secret") +print(record.data) + +# Get metadata only (without data payload) +metadata = hive.get_metadata("my-secret") + +# Create or update a record +new_record = HiveRecord( + name="my-secret", + data={"secret": "my-secret-value"}, + enabled=True, + tags=["automation"], + comment="API key for external service", ) +hive.set(new_record) -# List jobs -jobs = job.list() +# Set expiry on a record +hive.expire("temp-record", expiry=1700000000) -# Get job status -status = job.getStatus('JOB_ID') +# Enable/disable a record +hive.set_enabled("my-rule", enabled=False) -# Cancel job -job.cancel('JOB_ID') +# Delete a record +hive.delete("old-record") ``` -### Extensions Management +## Search (LCQL) ```python -# Initialize Extensions -extensions = limacharlie.Extension(manager) - -# List available extensions -available = extensions.list() +from limacharlie.sdk.search import Search +import time -# Install extension -extensions.install('velociraptor') +search = Search(org) + +end = int(time.time()) +start = end - 3600 # 1 hour ago + +# Execute a query (returns generator) +for result in search.execute( + query="event.FILE_PATH ends with .exe", + start_time=start, + end_time=end, + stream="event", + limit=100, +): + print(result) + +# Validate a query +validation = search.validate( + query="event.FILE_PATH ends with .exe", + start_time=start, + end_time=end, + stream="event", +) -# Configure extension -extensions.configure('velociraptor', { - 'api_key': 'YOUR_API_KEY', - 'server_url': 'https://velociraptor.example.com' -}) +# Estimate query cost +estimate = search.estimate( + query="event.FILE_PATH ends with .exe", + start_time=start, + end_time=end, + stream="event", +) -# Uninstall extension -extensions.uninstall('velociraptor') +# Saved queries +search.save_query("my-query", "event.FILE_PATH ends with .exe", description="Find executables") +saved = search.list_saved_queries() +search.delete_saved_query("my-query") ``` +## Extensions -## Error Handling +```python +from limacharlie.sdk.extensions import Extensions -### Exception Types +ext = Extensions(org) -```python -from limacharlie.utils import LcApiException +# List subscribed extensions +subscribed = ext.list_subscribed() -try: - manager = limacharlie.Manager('INVALID_OID', 'INVALID_KEY') -except LcApiException as e: - print(f"API Error: {e}") - # Handle authentication error +# List all available extensions +available = ext.get_all() -try: - sensor = manager.sensor('INVALID_SENSOR_ID') - sensor.task('os_info') -except LcApiException as e: - if 'not found' in str(e): - print("Sensor not found") - elif 'offline' in str(e): - print("Sensor is offline") - else: - print(f"Unexpected error: {e}") -``` +# Subscribe to an extension +ext.subscribe('ext-reliable-tasking') -### Retry Logic +# Unsubscribe from an extension +ext.unsubscribe('ext-reliable-tasking') -```python -import time -from limacharlie.utils import LcApiException - -def retry_operation(func, max_retries=3, delay=1): - """Retry an operation with exponential backoff""" - for attempt in range(max_retries): - try: - return func() - except LcApiException as e: - if 'rate limit' in str(e).lower(): - wait_time = delay * (2 ** attempt) - print(f"Rate limited, waiting {wait_time}s...") - time.sleep(wait_time) - else: - raise - raise Exception(f"Failed after {max_retries} attempts") - -# Usage -result = retry_operation(lambda: manager.sensors()) +# Get extension schema +schema = ext.get_schema('ext-reliable-tasking') + +# Make a request to an extension +response = ext.request( + extension_name='ext-reliable-tasking', + action='task', + data={ + 'task': 'os_version', + 'selector': 'plat == windows', + 'ttl': 3600, + }, +) ``` -### Handling Quota Errors +## Infrastructure as Code ```python -# Enable automatic retry on quota errors -manager = limacharlie.Manager( - oid='ORG_ID', - secret_api_key='API_KEY', - isRetryQuotaErrors=True # Auto-retry on HTTP 429 -) +from limacharlie.sdk.configs import Configs -# Manual handling -try: - sensors = manager.sensors() -except LcApiException as e: - if e.http_code == 429: # Too Many Requests - retry_after = e.headers.get('Retry-After', 60) - print(f"Quota exceeded, retry after {retry_after}s") - time.sleep(int(retry_after)) - sensors = manager.sensors() # Retry -``` +configs = Configs(org) -## Complete Examples +# Pull organization configuration to a local file +configs.pull(config_file='lc_conf.yaml') -### Example 1: Automated Threat Hunting +# Push configuration from a local file +configs.push(config_file='lc_conf.yaml', dry_run=True) -```python -import limacharlie -import time -from datetime import datetime, timedelta - -class ThreatHunter: - def __init__(self, org_id, api_key): - self.manager = limacharlie.Manager(org_id, api_key) - self.suspicious_processes = [ - 'mimikatz.exe', - 'lazagne.exe', - 'pwdump.exe', - 'procdump.exe' - ] - - def hunt_suspicious_processes(self): - """Hunt for suspicious processes across all sensors""" - print("Starting threat hunt...") - - # Get all online sensors - sensors = self.manager.sensors() - online_sensors = [ - sid for sid, info in sensors.items() - if info.get('online', False) - ] - - print(f"Found {len(online_sensors)} online sensors") - - findings = [] - - for sid in online_sensors: - sensor = self.manager.sensor(sid) - hostname = sensor.getHostname() - - print(f"Scanning {hostname}...") - - # Get running processes - try: - result = sensor.simpleRequest('os_processes', timeout=30) - if result and 'processes' in result: - for process in result['processes']: - process_name = process.get('name', '').lower() - for suspicious in self.suspicious_processes: - if suspicious.lower() in process_name: - findings.append({ - 'sensor': sid, - 'hostname': hostname, - 'process': process_name, - 'pid': process.get('pid'), - 'path': process.get('path') - }) - print(f" [!] Found suspicious process: {process_name}") - except Exception as e: - print(f" Error scanning {hostname}: {e}") - - return findings - - def monitor_detections(self, duration=3600): - """Monitor for detections in real-time""" - print(f"Monitoring detections for {duration} seconds...") - - spout = limacharlie.Spout( - self.manager, - data_type='detect', - is_parse=True - ) - - start_time = time.time() - - try: - for detection in spout: - if time.time() - start_time > duration: - break - - print(f"\n[DETECTION] {detection['detect_name']}") - print(f" Sensor: {detection['hostname']} ({detection['sid']})") - print(f" Time: {datetime.fromtimestamp(detection['ts'])}") - - # Auto-respond to critical detections - if detection.get('priority', 0) >= 4: - sensor = self.manager.sensor(detection['sid']) - print(f" [!] High priority detection - isolating sensor") - sensor.isolate() - - # Collect forensics - sensor.task([ - 'history_dump', - 'os_processes', - 'netstat' - ]) - finally: - spout.shutdown() - - def generate_hunt_report(self, findings): - """Generate a hunting report""" - report = { - 'timestamp': datetime.now().isoformat(), - 'total_sensors': len(self.manager.sensors()), - 'findings_count': len(findings), - 'findings': findings, - 'recommendations': [] - } - - if findings: - report['recommendations'].append( - "Suspicious processes detected - investigate immediately" - ) - report['recommendations'].append( - "Consider isolating affected systems" - ) - report['recommendations'].append( - "Review process execution history on affected systems" - ) - - return report - -# Usage -if __name__ == "__main__": - hunter = ThreatHunter('ORG_ID', 'API_KEY') - - # Hunt for suspicious processes - findings = hunter.hunt_suspicious_processes() - - # Generate report - report = hunter.generate_hunt_report(findings) - print(f"\nHunt Report: {report}") - - # Monitor for new detections - hunter.monitor_detections(duration=3600) +# Push with force (remove resources not in config) +configs.push(config_file='lc_conf.yaml', is_force=True) ``` -### Example 2: Incident Response Automation +## Error Handling -```python -import limacharlie -import json -from datetime import datetime -import hashlib - -class IncidentResponder: - def __init__(self, org_id, api_key): - self.manager = limacharlie.Manager( - org_id, - api_key, - is_interactive=True, - inv_id=f"incident_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - ) - self.artifacts = limacharlie.Artifacts(self.manager) - self.hive = limacharlie.Hive(self.manager) - - def respond_to_incident(self, sensor_id, incident_type): - """Orchestrate incident response""" - print(f"Responding to {incident_type} on sensor {sensor_id}") - - sensor = self.manager.sensor(sensor_id) - hostname = sensor.getHostname() - - # Create incident record - incident = { - 'id': hashlib.md5(f"{sensor_id}{datetime.now()}".encode()).hexdigest(), - 'type': incident_type, - 'sensor': sensor_id, - 'hostname': hostname, - 'timestamp': datetime.now().isoformat(), - 'status': 'investigating', - 'artifacts': [], - 'actions': [] - } - - # Step 1: Isolate if critical - if incident_type in ['ransomware', 'data_theft', 'backdoor']: - print(f" Isolating {hostname}...") - sensor.isolate() - incident['actions'].append({ - 'action': 'isolate', - 'timestamp': datetime.now().isoformat() - }) - - # Step 2: Collect forensics - print(" Collecting forensic data...") - forensics_tasks = [ - 'os_processes', - 'netstat', - 'os_autoruns', - 'history_dump' - ] - - for task in forensics_tasks: - try: - result = sensor.simpleRequest(task, timeout=60) - if result: - # Store in Hive for analysis - self.hive.set(limacharlie.HiveRecord( - hive_name='incidents', - key=f"{incident['id']}_{task}", - data=result, - ttl=2592000 # 30 days - )) - incident['artifacts'].append(task) - except Exception as e: - print(f" Failed to collect {task}: {e}") - - # Step 3: Collect memory dump for critical incidents - if incident_type in ['ransomware', 'backdoor']: - print(" Collecting memory dump...") - try: - sensor.task('os_memory_dump') - incident['actions'].append({ - 'action': 'memory_dump', - 'timestamp': datetime.now().isoformat() - }) - except Exception as e: - print(f" Memory dump failed: {e}") - - # Step 4: Kill malicious processes - if incident_type == 'malware': - self.kill_malicious_processes(sensor) - incident['actions'].append({ - 'action': 'kill_processes', - 'timestamp': datetime.now().isoformat() - }) - - # Step 5: Update incident status - incident['status'] = 'contained' - self.hive.set(limacharlie.HiveRecord( - hive_name='incidents', - key=incident['id'], - data=incident, - ttl=7776000 # 90 days - )) - - return incident - - def kill_malicious_processes(self, sensor): - """Kill known malicious processes""" - malicious_patterns = [ - 'mimikatz', - 'cobalt', - 'empire', - 'meterpreter' - ] - - try: - result = sensor.simpleRequest('os_processes', timeout=30) - if result and 'processes' in result: - for process in result['processes']: - process_name = process.get('name', '').lower() - for pattern in malicious_patterns: - if pattern in process_name: - print(f" Killing {process_name} (PID: {process['pid']})") - sensor.task(f"kill {process['pid']}") - except Exception as e: - print(f" Error killing processes: {e}") - - def analyze_network_connections(self, sensor_id): - """Analyze network connections for IOCs""" - sensor = self.manager.sensor(sensor_id) - - try: - result = sensor.simpleRequest('netstat', timeout=30) - suspicious_connections = [] - - if result and 'connections' in result: - for conn in result['connections']: - # Check for suspicious ports - if conn.get('dst_port') in [4444, 5555, 31337, 1337]: - suspicious_connections.append(conn) - - # Check for known C2 IPs (would come from threat intel) - # This is a placeholder - real implementation would check against threat intel - if conn.get('dst_ip') in ['1.2.3.4', '5.6.7.8']: - suspicious_connections.append(conn) - - return suspicious_connections - except Exception as e: - print(f"Error analyzing network: {e}") - return [] - - def generate_incident_timeline(self, incident_id): - """Generate timeline of incident events""" - timeline = [] - - # Get incident data from Hive - incident = self.hive.get('incidents', incident_id) - - if incident: - # Query events around incident time - results = self.manager.query( - query=f"sid == '{incident.data['sensor']}'", - start_time=datetime.fromisoformat(incident.data['timestamp']).timestamp() - 3600, - end_time=datetime.fromisoformat(incident.data['timestamp']).timestamp() + 3600, - limit=1000 - ) - - for event in results: - timeline.append({ - 'timestamp': event['ts'], - 'event_type': event['event_type'], - 'summary': self.summarize_event(event) - }) - - return sorted(timeline, key=lambda x: x['timestamp']) - - def summarize_event(self, event): - """Create human-readable event summary""" - event_type = event.get('event_type') - - if event_type == 'PROCESS_START': - return f"Process started: {event.get('event', {}).get('FILE_PATH', 'unknown')}" - elif event_type == 'NETWORK_CONNECT': - return f"Network connection to {event.get('event', {}).get('DESTINATION', 'unknown')}" - elif event_type == 'FILE_CREATE': - return f"File created: {event.get('event', {}).get('FILE_PATH', 'unknown')}" - else: - return f"Event: {event_type}" - -# Usage -if __name__ == "__main__": - responder = IncidentResponder('ORG_ID', 'API_KEY') - - # Respond to a ransomware incident - incident = responder.respond_to_incident( - sensor_id='SENSOR_ID', - incident_type='ransomware' - ) - - print(f"\nIncident Response Complete:") - print(json.dumps(incident, indent=2)) - - # Generate timeline - timeline = responder.generate_incident_timeline(incident['id']) - print(f"\nIncident Timeline: {len(timeline)} events") -``` +### Exception Hierarchy -### Example 3: Compliance and Audit Automation +The SDK uses a structured exception hierarchy with actionable suggestions: ```python -import limacharlie -import csv -from datetime import datetime, timedelta -import json - -class ComplianceAuditor: - def __init__(self, org_id, api_key): - self.manager = limacharlie.Manager(org_id, api_key) - self.required_software = { - 'windows': ['Windows Defender', 'BitLocker'], - 'linux': ['auditd', 'aide'], - 'macos': ['XProtect', 'Gatekeeper'] - } - - def audit_sensor_compliance(self, sensor_id): - """Audit individual sensor for compliance""" - sensor = self.manager.sensor(sensor_id) - - compliance_report = { - 'sensor_id': sensor_id, - 'hostname': sensor.getHostname(), - 'platform': sensor.getPlatform(), - 'timestamp': datetime.now().isoformat(), - 'checks': {}, - 'compliant': True - } - - # Check 1: Sensor online status - compliance_report['checks']['sensor_online'] = { - 'status': sensor.isOnline(), - 'required': True - } - if not sensor.isOnline(): - compliance_report['compliant'] = False - - # Check 2: Required software installed - if sensor.isOnline(): - platform = sensor.getPlatform() - required = self.required_software.get(platform, []) - - try: - result = sensor.simpleRequest('os_packages', timeout=60) - installed_packages = [] - - if result and 'packages' in result: - installed_packages = [p.get('name', '') for p in result['packages']] - - for software in required: - found = any(software.lower() in pkg.lower() for pkg in installed_packages) - compliance_report['checks'][f'software_{software}'] = { - 'status': found, - 'required': True - } - if not found: - compliance_report['compliant'] = False - except Exception as e: - compliance_report['checks']['software_audit'] = { - 'status': False, - 'error': str(e) - } - compliance_report['compliant'] = False - - # Check 3: Security patches - if sensor.isOnline(): - try: - # Check for critical patches (platform-specific) - if platform == 'windows': - result = sensor.simpleRequest('os_patches', timeout=60) - if result: - missing_critical = [ - p for p in result.get('missing', []) - if p.get('severity') == 'Critical' - ] - compliance_report['checks']['critical_patches'] = { - 'status': len(missing_critical) == 0, - 'missing_count': len(missing_critical), - 'required': True - } - if missing_critical: - compliance_report['compliant'] = False - except Exception as e: - pass # Not all platforms support patch checking - - return compliance_report - - def audit_organization_compliance(self): - """Audit entire organization for compliance""" - print("Starting organization-wide compliance audit...") - - sensors = self.manager.sensors() - audit_results = [] - - for sid in sensors: - print(f"Auditing sensor {sid}...") - report = self.audit_sensor_compliance(sid) - audit_results.append(report) - - # Generate summary - summary = { - 'timestamp': datetime.now().isoformat(), - 'total_sensors': len(sensors), - 'compliant_sensors': sum(1 for r in audit_results if r['compliant']), - 'non_compliant_sensors': sum(1 for r in audit_results if not r['compliant']), - 'compliance_rate': 0, - 'details': audit_results - } - - if summary['total_sensors'] > 0: - summary['compliance_rate'] = ( - summary['compliant_sensors'] / summary['total_sensors'] * 100 - ) - - return summary - - def generate_audit_log(self, days=30): - """Generate audit log for compliance reporting""" - - end_time = datetime.now() - start_time = end_time - timedelta(days=days) - - # Query audit events - audit_events = self.manager.query( - query='event_type == "AUDIT"', - start_time=start_time.timestamp(), - end_time=end_time.timestamp(), - limit=10000 - ) - - # Process and categorize audit events - audit_log = { - 'period': { - 'start': start_time.isoformat(), - 'end': end_time.isoformat() - }, - 'events': { - 'authentication': [], - 'authorization': [], - 'configuration_changes': [], - 'data_access': [] - }, - 'summary': {} - } - - for event in audit_events: - event_data = event.get('event', {}) - audit_type = event_data.get('type', 'unknown') - - if 'auth' in audit_type.lower(): - audit_log['events']['authentication'].append(event) - elif 'config' in audit_type.lower(): - audit_log['events']['configuration_changes'].append(event) - elif 'access' in audit_type.lower(): - audit_log['events']['data_access'].append(event) - else: - audit_log['events']['authorization'].append(event) - - # Generate summary statistics - audit_log['summary'] = { - 'total_events': len(audit_events), - 'authentication_events': len(audit_log['events']['authentication']), - 'configuration_changes': len(audit_log['events']['configuration_changes']), - 'data_access_events': len(audit_log['events']['data_access']) - } - - return audit_log - - def export_compliance_report(self, audit_results, filename='compliance_report.csv'): - """Export compliance audit results to CSV""" - - with open(filename, 'w', newline='') as csvfile: - fieldnames = [ - 'sensor_id', 'hostname', 'platform', - 'compliant', 'timestamp', 'issues' - ] - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - - writer.writeheader() - for result in audit_results['details']: - issues = [] - for check, status in result['checks'].items(): - if not status.get('status', False): - issues.append(check) - - writer.writerow({ - 'sensor_id': result['sensor_id'], - 'hostname': result['hostname'], - 'platform': result['platform'], - 'compliant': result['compliant'], - 'timestamp': result['timestamp'], - 'issues': ', '.join(issues) - }) - - print(f"Compliance report exported to {filename}") - -# Usage -if __name__ == "__main__": - auditor = ComplianceAuditor('ORG_ID', 'API_KEY') - - # Perform organization-wide audit - audit_results = auditor.audit_organization_compliance() - - print(f"\nCompliance Audit Summary:") - print(f" Total Sensors: {audit_results['total_sensors']}") - print(f" Compliant: {audit_results['compliant_sensors']}") - print(f" Non-Compliant: {audit_results['non_compliant_sensors']}") - print(f" Compliance Rate: {audit_results['compliance_rate']:.1f}%") - - # Export results - auditor.export_compliance_report(audit_results) - - # Generate audit log - audit_log = auditor.generate_audit_log(days=30) - print(f"\nAudit Log Summary (Last 30 days):") - print(json.dumps(audit_log['summary'], indent=2)) +from limacharlie.errors import ( + LimaCharlieError, # Base exception (exit code 1) + AuthenticationError, # Auth failures (exit code 2) + NotFoundError, # Resource not found (exit code 3) + ValidationError, # Input validation (exit code 4) + RateLimitError, # Rate limit hit (exit code 5) + PermissionDeniedError, # Permission denied (exit code 2) + ApiError, # General API errors (exit code 1) + ConfigError, # Configuration errors (exit code 1) +) ``` -## Best Practices +### Usage -### Performance Optimization - -1. **Batch Operations** ```python -# Good: Batch sensor queries -sensors = manager.sensors(with_details=True) +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.errors import ( + AuthenticationError, + NotFoundError, + RateLimitError, + LimaCharlieError, +) -# Bad: Individual queries for each sensor -for sid in sensor_ids: - sensor = manager.sensor(sid) - info = sensor.getInfo() +try: + client = Client(oid='ORG_ID', api_key='API_KEY') + org = Organization(client) + sensor = org.get_sensor('SENSOR_ID') + sensor.task('os_info') +except AuthenticationError as e: + print(f"Auth failed: {e}") + # Suggestion: Run 'limacharlie auth login' to configure credentials +except NotFoundError as e: + print(f"Not found: {e}") +except RateLimitError as e: + print(f"Rate limited, retry after: {e.retry_after}s") +except LimaCharlieError as e: + print(f"Error: {e}") + if e.suggestion: + print(f"Suggestion: {e.suggestion}") ``` -2. **Use Streaming for Large Datasets** -```python -# Good: Stream events -spout = limacharlie.Spout(manager, 'event') -for event in spout: - process_event(event) +### Built-in Retry Logic -# Bad: Query all events at once -events = manager.query('*', limit=1000000) -``` +The `Client` automatically retries on HTTP 429 (rate limit) and 504 (gateway timeout) with exponential backoff. No manual retry logic is needed for transient errors. -3. **Connection Pooling** -```python -# Reuse Manager instance -manager = limacharlie.Manager('ORG_ID', 'API_KEY') -# Use this manager instance throughout your application -``` +## Complete Examples -### Security Best Practices +### Example 1: Automated Sensor Inventory -1. **Credential Management** ```python -# Good: Use environment variables or secure vaults -import os -api_key = os.environ.get('LC_API_KEY') +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization -# Bad: Hardcode credentials -api_key = 'hardcoded-api-key' -``` +client = Client() +org = Organization(client) -2. **Least Privilege** -```python -# Create API keys with minimal required permissions -# Use read-only keys when write access isn't needed -``` +# Build a sensor inventory +inventory = {} +for sensor_info in org.list_sensors(): + sid = sensor_info['sid'] + sensor = org.get_sensor(sid) + info = sensor.get_info() + tags = sensor.get_tags() -3. **Audit Logging** -```python -# Enable audit logging for all operations -manager = limacharlie.Manager( - oid='ORG_ID', - secret_api_key='API_KEY', - print_debug_fn=audit_logger.log -) + inventory[sid] = { + 'hostname': info.get('hostname', 'unknown'), + 'platform': info.get('plat', 'unknown'), + 'tags': tags, + 'online': sensor.is_online(), + } + +print(f"Total sensors: {len(inventory)}") +for sid, data in inventory.items(): + print(f" {data['hostname']} ({sid[:8]}...) - {data['platform']} - tags: {data['tags']}") ``` -### Error Handling Best Practices +### Example 2: D&R Rule Deployment -1. **Graceful Degradation** ```python -def get_sensor_info_safe(sensor_id): - try: - sensor = manager.sensor(sensor_id) - return sensor.getInfo() - except LcApiException: - return {'error': 'Unable to retrieve sensor info'} -``` +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.hive import Hive, HiveRecord -2. **Retry with Backoff** -```python -@retry(max_attempts=3, backoff_factor=2) -def reliable_task(sensor, command): - return sensor.task(command) -``` +client = Client() +org = Organization(client) +hive = Hive(org, "dr-general") -3. **Timeout Handling** -```python -# Always set reasonable timeouts -result = sensor.simpleRequest('os_processes', timeout=60) +# Deploy a detection rule +rule = HiveRecord( + name="detect-mimikatz", + data={ + "detect": { + "op": "and", + "event": "NEW_PROCESS", + "rules": [ + {"op": "is windows"}, + { + "op": "contains", + "path": "event/COMMAND_LINE", + "value": "mimikatz", + "case sensitive": False, + } + ] + }, + "respond": [ + {"action": "report", "name": "mimikatz-detected"}, + {"action": "task", "command": "history_dump"}, + ] + }, + enabled=True, + tags=["threat-hunting"], + comment="Detect mimikatz execution", +) +hive.set(rule) +print("Rule deployed successfully") ``` -## Troubleshooting - -### Common Issues and Solutions +### Example 3: Real-time Detection Monitoring -1. **Authentication Failures** ```python -# Check API key format -assert len(api_key) == 36 # UUID format -assert api_key.count('-') == 4 +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.spout import Spout -# Verify organization ID -assert len(oid) == 36 # UUID format -``` - -2. **Sensor Offline** -```python -# Wait for sensor to come online -if not sensor.isOnline(): - came_online = sensor.waitToComeOnline(timeout=300) - if not came_online: - print("Sensor did not come online") -``` +client = Client() +org = Organization(client) -3. **Rate Limiting** -```python -# Handle rate limits -manager = limacharlie.Manager( - oid='ORG_ID', - secret_api_key='API_KEY', - isRetryQuotaErrors=True -) -``` +# Stream detections in real-time +spout = Spout(org, "detection-monitor", data_type="detect") -4. **Network Issues** -```python -# Check connectivity -import requests try: - response = requests.get('https://api.limacharlie.io/health') - assert response.status_code == 200 -except: - print("Cannot reach LimaCharlie API") + while True: + detection = spout.pull(timeout=10) + if detection is not None: + routing = detection.get('routing', {}) + print(f"Detection: {detection.get('cat', 'unknown')}") + print(f" Sensor: {routing.get('hostname', 'unknown')} ({routing.get('sid', 'unknown')})") + + # Auto-isolate on critical detections + if 'ransomware' in detection.get('cat', '').lower(): + sid = routing.get('sid') + if sid: + sensor = org.get_sensor(sid) + sensor.isolate() + print(f" [!] Sensor isolated due to ransomware detection") +finally: + spout.shutdown() ``` -## API Reference Links +### Example 4: Extension Request (Playbook Execution) -- [LimaCharlie REST API Documentation](https://api.limacharlie.io/openapi) -- [LimaCharlie Python SDK GitHub](https://github.com/refractionPOINT/python-limacharlie) -- [LimaCharlie Documentation](https://docs.limacharlie.io) -- [LimaCharlie Web Console](https://app.limacharlie.io) - -## Support - -For SDK issues or questions: -- GitHub Issues: https://github.com/refractionPOINT/python-limacharlie/issues -- LimaCharlie Support: support@limacharlie.io -- Community Slack: https://slack.limacharlie.io +```python +from limacharlie.client import Client +from limacharlie.sdk.organization import Organization +from limacharlie.sdk.extensions import Extensions ---- +client = Client() +org = Organization(client) +ext = Extensions(org) -## See Also +# Trigger a playbook via the ext-playbook extension +response = ext.request("ext-playbook", "run_playbook", { + "name": "my-playbook", + "credentials": "hive://secret/my-api-key", + "data": { + "target_sensor": "SENSOR_ID", + "action": "investigate", + } +}) -- [SDK Overview](index.md) -- [Go SDK](go-sdk.md) -- [API Keys](../../7-administration/access/api-keys.md) +print(response) +``` From 133e3a64c766ee747a5336b94c1163c41dbf9b8e Mon Sep 17 00:00:00 2001 From: Maxime Lamothe-Brassard Date: Fri, 27 Feb 2026 07:46:52 -0800 Subject: [PATCH 2/2] docs: fix incorrect SDK method calls found during review Replace non-existent `org.get_sensor(sid)` with correct `Sensor(org, sid)` constructor across all files. Also remove non-existent `hive.expire()`, `hive.set_enabled()`, `search.save_query()` methods and replace with correct Hive-based patterns. Fix `org.online_sensors()` to the actual `org.get_online_sensors()` method. Co-Authored-By: Claude Opus 4.6 --- .../endpoint-agent/uninstallation.md | 3 +- .../troubleshooting/non-responding-sensors.md | 2 +- .../extensions/labs/playbook.md | 7 +++- docs/6-developer-guide/sdk-overview.md | 4 +- docs/6-developer-guide/sdks/index.md | 3 +- docs/6-developer-guide/sdks/python-sdk.md | 41 +++++++++++-------- 6 files changed, 35 insertions(+), 25 deletions(-) diff --git a/docs/2-sensors-deployment/endpoint-agent/uninstallation.md b/docs/2-sensors-deployment/endpoint-agent/uninstallation.md index b8b44e70..c4c30435 100644 --- a/docs/2-sensors-deployment/endpoint-agent/uninstallation.md +++ b/docs/2-sensors-deployment/endpoint-agent/uninstallation.md @@ -23,11 +23,12 @@ To run the uninstall command against *all* Sensors, a simple loop with the SDK i ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor client = Client() org = Organization(client) for sensor_info in org.list_sensors(): - sensor = org.get_sensor(sensor_info["sid"]) + sensor = Sensor(org, sensor_info["sid"]) sensor.task("uninstall") ``` diff --git a/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md b/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md index 1aaec6ec..b65f5a52 100644 --- a/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md +++ b/docs/2-sensors-deployment/troubleshooting/non-responding-sensors.md @@ -38,7 +38,7 @@ def get_relevant_sensors(sdk: Organization) -> list[Sensor]: sensors = sdk.list_sensors(selector=SENSOR_SELECTOR) relevant_sensors = [] for sensor_info in sensors: - relevant_sensors.append(sdk.get_sensor(sensor_info["sid"])) + relevant_sensors.append(Sensor(sdk, sensor_info["sid"])) return relevant_sensors def playbook(sdk: Organization, data: dict) -> dict | None: diff --git a/docs/5-integrations/extensions/labs/playbook.md b/docs/5-integrations/extensions/labs/playbook.md index 841bc95e..db6237fb 100644 --- a/docs/5-integrations/extensions/labs/playbook.md +++ b/docs/5-integrations/extensions/labs/playbook.md @@ -134,6 +134,8 @@ When a playbook generates a detection, you can customize the detection category The following example checks if a server sensor has missed a check-in and creates a detection with a custom category name: ```python +from limacharlie.sdk.sensor import Sensor + def playbook(sdk, data): if not sdk: return {"error": "LC API key required"} @@ -145,7 +147,7 @@ def playbook(sdk, data): missing_sensors = [] for sensor_info in sdk.list_sensors(): - sensor = sdk.get_sensor(sensor_info["sid"]) + sensor = Sensor(sdk, sensor_info["sid"]) info = sensor.get_info() last_seen = info.get('last_seen', 0) if (current_time - last_seen) > threshold: @@ -222,12 +224,13 @@ hives: my-playbook: data: python: |- + from limacharlie.sdk.sensor import Sensor def playbook(sdk, data): if not sdk: return {"error": "LC API key required to list sensors"} return { "data": { - "sensors": [sdk.get_sensor(s["sid"]).get_info() for s in sdk.list_sensors()] + "sensors": [Sensor(sdk, s["sid"]).get_info() for s in sdk.list_sensors()] } } usr_mtd: diff --git a/docs/6-developer-guide/sdk-overview.md b/docs/6-developer-guide/sdk-overview.md index 3d773a49..60643240 100644 --- a/docs/6-developer-guide/sdk-overview.md +++ b/docs/6-developer-guide/sdk-overview.md @@ -232,7 +232,7 @@ org = Organization(client) all_sensors = list(org.list_sensors()) # Select the first sensor. -sensor = org.get_sensor(all_sensors[0]["sid"]) +sensor = Sensor(org, all_sensors[0]["sid"]) # Tag this sensor with a tag for 10 minutes. sensor.add_tag('suspicious', ttl=60 * 10) @@ -256,7 +256,7 @@ The `Organization` is the main entry point for all org-scoped operations: listin #### Sensor -The `Sensor` object is returned by `org.get_sensor(sid)`. +A `Sensor` is created with `Sensor(org, sid)`. It supports `task`, `hostname`, `add_tag`, `remove_tag`, `get_tags`, `isolate`, `rejoin`, and more. This is the main way to interact with a specific sensor. Import from `limacharlie.sdk.sensor`. diff --git a/docs/6-developer-guide/sdks/index.md b/docs/6-developer-guide/sdks/index.md index ce59cf51..433269bc 100644 --- a/docs/6-developer-guide/sdks/index.md +++ b/docs/6-developer-guide/sdks/index.md @@ -54,6 +54,7 @@ pip install limacharlie ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor # Initialize client and organization client = Client(oid='your-org-id', api_key='your-api-key') @@ -61,7 +62,7 @@ org = Organization(client) # List all sensors for sensor_info in org.list_sensors(): - sensor = org.get_sensor(sensor_info["sid"]) + sensor = Sensor(org, sensor_info["sid"]) print(f"Sensor: {sensor.sid} - {sensor.hostname}") ``` diff --git a/docs/6-developer-guide/sdks/python-sdk.md b/docs/6-developer-guide/sdks/python-sdk.md index a245039e..475fb789 100644 --- a/docs/6-developer-guide/sdks/python-sdk.md +++ b/docs/6-developer-guide/sdks/python-sdk.md @@ -288,6 +288,7 @@ The `Sensor` class provides detailed control over individual sensors. ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor client = Client() org = Organization(client) @@ -300,18 +301,19 @@ for sensor_info in org.list_sensors(): for sensor_info in org.list_sensors(selector='plat == windows'): print(sensor_info['sid']) -# List online sensors only -for sensor_info in org.online_sensors(): - print(sensor_info['sid']) +# Get list of online sensor IDs +online_sids = org.get_online_sensors() +for sid in online_sids: + print(sid) # Get a specific sensor object -sensor = org.get_sensor('SENSOR_ID') +sensor = Sensor(org, 'SENSOR_ID') ``` ### Sensor Properties and Methods ```python -sensor = org.get_sensor('SENSOR_ID') +sensor = Sensor(org, 'SENSOR_ID') # Get full sensor information info = sensor.get_info() @@ -614,12 +616,6 @@ new_record = HiveRecord( ) hive.set(new_record) -# Set expiry on a record -hive.expire("temp-record", expiry=1700000000) - -# Enable/disable a record -hive.set_enabled("my-rule", enabled=False) - # Delete a record hive.delete("old-record") ``` @@ -661,10 +657,16 @@ estimate = search.estimate( stream="event", ) -# Saved queries -search.save_query("my-query", "event.FILE_PATH ends with .exe", description="Find executables") -saved = search.list_saved_queries() -search.delete_saved_query("my-query") +# Saved queries are managed through Hive +from limacharlie.sdk.hive import Hive, HiveRecord +query_hive = Hive(org, "query") +query_hive.set(HiveRecord( + name="my-query", + data={"query": "event.FILE_PATH ends with .exe", "stream": "event"}, + enabled=True, +)) +saved = query_hive.list() +query_hive.delete("my-query") ``` ## Extensions @@ -742,6 +744,7 @@ from limacharlie.errors import ( ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor from limacharlie.errors import ( AuthenticationError, NotFoundError, @@ -752,7 +755,7 @@ from limacharlie.errors import ( try: client = Client(oid='ORG_ID', api_key='API_KEY') org = Organization(client) - sensor = org.get_sensor('SENSOR_ID') + sensor = Sensor(org, 'SENSOR_ID') sensor.task('os_info') except AuthenticationError as e: print(f"Auth failed: {e}") @@ -778,6 +781,7 @@ The `Client` automatically retries on HTTP 429 (rate limit) and 504 (gateway tim ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor client = Client() org = Organization(client) @@ -786,7 +790,7 @@ org = Organization(client) inventory = {} for sensor_info in org.list_sensors(): sid = sensor_info['sid'] - sensor = org.get_sensor(sid) + sensor = Sensor(org, sid) info = sensor.get_info() tags = sensor.get_tags() @@ -848,6 +852,7 @@ print("Rule deployed successfully") ```python from limacharlie.client import Client from limacharlie.sdk.organization import Organization +from limacharlie.sdk.sensor import Sensor from limacharlie.sdk.spout import Spout client = Client() @@ -868,7 +873,7 @@ try: if 'ransomware' in detection.get('cat', '').lower(): sid = routing.get('sid') if sid: - sensor = org.get_sensor(sid) + sensor = Sensor(org, sid) sensor.isolate() print(f" [!] Sensor isolated due to ransomware detection") finally: