diff --git a/examples/demo.py b/examples/demo.py new file mode 100644 index 0000000..6cb88fe --- /dev/null +++ b/examples/demo.py @@ -0,0 +1,67 @@ +import logging +import os + +from dotenv import load_dotenv + +from tinygrid import ERCOT, ERCOTAuth, ERCOTAuthConfig +from tinygrid.ercot.archive import ERCOTArchiveBundle + +load_dotenv() + +logging.basicConfig(level=logging.INFO) + +logger = logging.getLogger(__name__) + +DEBUG = False + + +def main(): + auth = ERCOTAuth( + ERCOTAuthConfig( + username=os.getenv("ERCOT_USERNAME"), + password=os.getenv("ERCOT_PASSWORD"), + subscription_key=os.getenv("ERCOT_SUBSCRIPTION_KEY"), + ) + ) + + ercot = ERCOT(auth=auth) + + logger.info("TEST get_60_day_dam_disclosure") + reports = ercot.get_60_day_dam_disclosure("2025-10-30") + for name, df in reports.items(): + logger.debug(f"{name}") + logger.debug(df.head()) + logger.debug("*" * 30) + + logger.info("TEST get_60_day_dam_disclosure") + reports = ercot.get_60_day_sced_disclosure("2025-10-30") + logger.debug(reports) + + start_ts = "2025-12-20" + end_ts = "2025-12-30" + + logger.info("TEST get_system_wide_actuals") + results = ercot.get_system_wide_actuals(start=start_ts, end=end_ts) + logger.info(results.head()) + logger.debug(results) + + logger.info("TEST get_se_load") + results = ercot.get_se_load(start=start_ts, end=end_ts) + logger.debug(results) + + logger.info("TEST get_se_dc_tie_flows") + results = ercot.get_se_dc_tie_flows(start_ts, end_ts) + logger.debug(results) + + bundle_client = ERCOTArchiveBundle(ercot=ercot) + bundles = bundle_client.bundles("NP4-33-CD") + for bundle_link in bundles.links: + logger.debug(f"{bundle_link = }") + logger.debug("*" * 30) + + dfs = bundle_client.one_bundle(bundles.links[0]) + logger.debug(dfs) + + +if __name__ == "__main__": + main() diff --git a/examples/ercot_app.ipynb b/examples/ercot_app.ipynb new file mode 100644 index 0000000..72677d9 --- /dev/null +++ b/examples/ercot_app.ipynb @@ -0,0 +1,119 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 2, + "id": "ad6e4c18", + "metadata": {}, + "outputs": [], + "source": [ + "%load_ext autoreload\n", + "%autoreload 2" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "e0a56088", + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "\n", + "import pandas as pd\n", + "from dotenv import load_dotenv\n", + "\n", + "from tinygrid import ERCOT, ERCOTAuth, ERCOTAuthConfig, LocationType, Market\n", + "\n", + "# Load environment variables from .env file\n", + "load_dotenv()\n", + "pd.set_option(\"display.max_columns\", 20)\n", + "pd.set_option(\"display.width\", 200)\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "b4e2d805", + "metadata": {}, + "outputs": [], + "source": [ + "auth = ERCOTAuth(ERCOTAuthConfig(\n", + " username=os.getenv(\"ERCOT_USERNAME\"),\n", + " password=os.getenv(\"ERCOT_PASSWORD\"),\n", + " subscription_key=os.getenv(\"ERCOT_SUBSCRIPTION_KEY\"),\n", + "))\n", + "\n", + "ercot = ERCOT(auth=auth)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "8f410d19", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'https://api.ercot.com/api/public-reports'" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ercot.base_url" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ea9b771d", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "No archives found for 
/np3-966-er/60_dam_gen_res_data from 2025-12-30 23:00:00-06:00 to 2025-12-31 23:00:00-06:00\n" + ] + } + ], + "source": [ + "ercot.get_60_day_dam_disclosure(\"2025-11-1\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6d01935c", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.14.0" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/ercot_demo.ipynb b/examples/ercot_demo.ipynb index 895bbc9..3a092b3 100644 --- a/examples/ercot_demo.ipynb +++ b/examples/ercot_demo.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 27, + "execution_count": 51, "metadata": {}, "outputs": [ { @@ -55,7 +55,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 52, "metadata": { "execution": { "iopub.execute_input": "2025-12-28T16:14:56.927060Z", @@ -89,7 +89,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 53, "metadata": { "execution": { "iopub.execute_input": "2025-12-28T16:14:57.854493Z", diff --git a/pyercot/README.md b/pyercot/README.md index ccf7123..8ebf098 100644 --- a/pyercot/README.md +++ b/pyercot/README.md @@ -1,7 +1,9 @@ # pyercot + A client library for accessing ERCOT Public API Client/Developer Documentation ## Usage + First, create a client: ```python @@ -64,6 +66,7 @@ client = AuthenticatedClient( ``` Things to know: + 1. Every path/method combo becomes a Python module with four functions: 1. `sync`: Blocking request that returns parsed data (if successful) or `None` 1. `sync_detailed`: Blocking request that always returns a `Request`, optionally with `parsed` set if the request was successful. @@ -110,13 +113,16 @@ client.set_httpx_client(httpx.Client(base_url="https://api.example.com", proxies ``` ## Building / publishing this package + This project uses [uv](https://github.com/astral-sh/uv) to manage dependencies and packaging. Here are the basics: + 1. Update the metadata in `pyproject.toml` (e.g. authors, version). -2. If you're using a private repository: https://docs.astral.sh/uv/guides/integration/alternative-indexes/ +2. If you're using a private repository: 3. Build a distribution with `uv build`, builds `sdist` and `wheel` by default. 1. Publish the client with `uv publish`, see documentation for publishing to private indexes. If you want to install this client into another project without publishing it (e.g. for development) then: + 1. If that project **is using uv**, you can simply do `uv add ` from that project 1. If that project is not using uv: 1. Build a wheel with `uv build --wheel`. 
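Since the generated endpoint examples are elided from the README hunks above, here is a minimal sketch of the four-function pattern the pyercot README describes. The module path `pyercot.api.some_tag.get_some_report` is a placeholder rather than a real generated module, and the token and base URL are illustrative (the base URL matches the one shown in the notebook above).

```python
from pyercot import AuthenticatedClient

# Placeholder import: substitute one of the generated path/method modules.
from pyercot.api.some_tag import get_some_report

client = AuthenticatedClient(
    base_url="https://api.ercot.com/api/public-reports",
    token="YOUR_BEARER_TOKEN",
)

with client as client:
    # Blocking call that returns parsed data (or None on failure).
    data = get_some_report.sync(client=client)
    # Blocking call that always returns the full response; `.parsed` is set on success.
    detailed = get_some_report.sync_detailed(client=client)
```

The `asyncio` and `asyncio_detailed` variants follow the same calling convention.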
diff --git a/pyproject.toml b/pyproject.toml
index 6bb1c2d..e26dddd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,7 @@ dependencies = [
    "tenacity>=8.0.0",
    "pandas>=2.0.0",
    "openpyxl>=3.1.5",
+    "returns>=0.26.0",
]

[project.urls]
@@ -107,6 +108,7 @@ dev = [
    "kaleido>=1.2.0",
    "nbformat>=5.10.4",
    "plotly>=6.5.0",
+    "ty>=0.0.8",
]

[tool.setuptools.packages.find]
diff --git a/tests/test_ercot_helpers.py b/tests/test_ercot_helpers.py
index b5233c1..bf6ebff 100644
--- a/tests/test_ercot_helpers.py
+++ b/tests/test_ercot_helpers.py
@@ -422,57 +422,23 @@ def _needs_historical(self, start: pd.Timestamp, market: str) -> bool:  # type:
    archive.fetch_historical.assert_called_once()


-def test_get_60_day_dam_disclosure_uses_archive(
-    monkeypatch: pytest.MonkeyPatch,
-) -> None:
+def test_get_60_day_dam_disclosure_uses_archive() -> None:
    client = ERCOT()
    archive = MagicMock()
-    archive.fetch_historical.return_value = pd.DataFrame(
-        {"DeliveryDate": ["2024-01-01"]}
-    )
-    monkeypatch.setattr(client, "_get_archive", lambda: archive)
-
-    for method_name in [
-        "get_dam_gen_res_as_offers",
-        "get_dam_load_res_data",
-        "get_dam_load_res_as_offers",
-        "get_dam_energy_only_offers",
-        "get_dam_energy_only_offer_awards",
-        "get_dam_energy_bids",
-        "get_dam_energy_bid_awards",
-        "get_dam_ptp_obl_bids",
-        "get_dam_ptp_obl_bid_awards",
-        "get_dam_ptp_obl_opt",
-        "get_dam_ptp_obl_opt_awards",
-    ]:
-        monkeypatch.setattr(
-            client, method_name, lambda **kwargs: pd.DataFrame({"dummy": [1]})
-        )
-
-    reports = client.get_60_day_dam_disclosure("today")
-
-    assert "dam_gen_resource" in reports
+    # fetch_historical now returns a mapping of CSV name to DataFrame.
+    archive.fetch_historical.return_value = {
+        "60d_DAM_Gen_Res_Data-fixture.csv": pd.DataFrame(
+            {"DeliveryDate": ["2024-01-01"]}
+        )
+    }
+    # Wire the mock archive onto the client so no real request is made.
+    client._archive = archive
+
+    reports = client.get_60_day_dam_disclosure("today")
+
+    assert "60d_dam_gen" in reports
    archive.fetch_historical.assert_called_once()


-def test_get_60_day_sced_disclosure(monkeypatch: pytest.MonkeyPatch) -> None:
+def test_get_60_day_sced_disclosure() -> None:
    client = ERCOT()
-
-    archive = MagicMock()
-    archive.fetch_historical.return_value = pd.DataFrame(
-        {"DeliveryDate": ["2024-01-01"]}
-    )
-    monkeypatch.setattr(client, "_get_archive", lambda: archive)
-
-    monkeypatch.setattr(
-        client, "get_sced_gen_res_data", lambda **kwargs: pd.DataFrame({"a": [1]})
-    )
-    monkeypatch.setattr(
-        client, "get_load_res_data_in_sced", lambda **kwargs: pd.DataFrame({"b": [2]})
-    )
-    reports = client.get_60_day_sced_disclosure("today")
-    assert "sced_smne" in reports
-    archive.fetch_historical.assert_called_once()
+    documents = MagicMock()
+    documents.return_value = {
+        "60d_sced_gen_res_data-fixture.csv": pd.DataFrame(
+            {"DeliveryDate": ["2024-01-01"]}
+        )
+    }
+    # The SCED disclosure now goes through the documents mixin.
+    client.get_60d_sced_disclosure = documents
+
+    reports = client.get_60_day_sced_disclosure("today")
+
+    assert "60d_sced_gen_res_data-fixture.csv" in reports
+    documents.assert_called_once()
diff --git a/tinygrid/ercot/__init__.py b/tinygrid/ercot/__init__.py
index 4a64af9..1a74065 100644
--- a/tinygrid/ercot/__init__.py
+++ b/tinygrid/ercot/__init__.py
@@ -177,7 +177,7 @@ class ERCOT(
    ERCOTDocumentsMixin,
    ERCOTDashboardMixin,
):
-    """ERCOT (Electric Reliability Council of Texas) SDK client.
+    """ERCOT (Electric Reliability Council of Texas) SDK client.

    Provides a clean, intuitive interface for accessing ERCOT grid data
    without needing to know about endpoint paths, API categories, or
    client lifecycle management.
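Before the `tinygrid/ercot/api.py` hunks below, a usage sketch of the renamed helpers, mirroring `examples/demo.py` (the env-var names and date range are taken from that example):

```python
import os

from dotenv import load_dotenv

from tinygrid import ERCOT, ERCOTAuth, ERCOTAuthConfig

load_dotenv()

auth = ERCOTAuth(
    ERCOTAuthConfig(
        username=os.getenv("ERCOT_USERNAME"),
        password=os.getenv("ERCOT_PASSWORD"),
        subscription_key=os.getenv("ERCOT_SUBSCRIPTION_KEY"),
    )
)
ercot = ERCOT(auth=auth)

# Each renamed helper takes a start/end date range and returns a DataFrame.
actuals = ercot.get_system_wide_actuals(start="2025-12-20", end="2025-12-30")
se_load = ercot.get_se_load(start="2025-12-20", end="2025-12-30")
dc_ties = ercot.get_se_dc_tie_flows("2025-12-20", "2025-12-30")
```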
diff --git a/tinygrid/ercot/api.py b/tinygrid/ercot/api.py index 199640e..72adb45 100644 --- a/tinygrid/ercot/api.py +++ b/tinygrid/ercot/api.py @@ -610,7 +610,7 @@ def get_solar_forecast( # System-Wide Generation and Transmission Data # ============================================================================ - def get_dc_tie_flows( + def get_se_dc_tie_flows( self, start: str | pd.Timestamp = "today", end: str | pd.Timestamp | None = None, @@ -640,27 +640,23 @@ def get_dc_tie_flows( dc_ties = ercot.get_dc_tie_flows(start="2024-01-15") ``` """ - start_ts, end_ts = parse_date_range(start, end) - - # DC tie data via archive API - df = self._get_archive().fetch_historical( - endpoint="/np6-626-cd/dc_tie", - start=start_ts, - end=end_ts, - ) - df = filter_by_date(df, start_ts, end_ts) - return standardize_columns(df) + start_ts, end_ts = parse_date_range(start, end) + date_column = "TAGCLAST_TIME" + df = self.get_state_estimator_dc_ties_flows_report(start_ts, end_ts) + df.sort_values(date_column, ascending=True, inplace=True) + return filter_by_date(df, start_ts, end_ts, date_column=date_column) - def get_total_generation( + def get_se_load( self, start: str | pd.Timestamp = "today", end: str | pd.Timestamp | None = None, ) -> pd.DataFrame: """Get total ERCOT system generation. - Provides the total MW generation across the ERCOT system from - the state estimator. + The aggregate generation of real and reactive power + (MW and MVAR) for the entire ERCOT system. + EMIL ID: NP6-625-CD @@ -682,26 +678,20 @@ def get_total_generation( ``` """ start_ts, end_ts = parse_date_range(start, end) - - # Total generation via archive API - df = self._get_archive().fetch_historical( - endpoint="/np6-625-cd/se_totalgen", - start=start_ts, - end=end_ts, - ) - - df = filter_by_date(df, start_ts, end_ts) - return standardize_columns(df) + date_column = "SE_EXE_TIME" + df = self.get_state_estimator_load_report(start_ts, end_ts) + df.sort_values(date_column, ascending=True, inplace=True) + return filter_by_date(df, start_ts, end_ts, date_column=date_column) def get_system_wide_actuals( self, start: str | pd.Timestamp = "today", end: str | pd.Timestamp | None = None, ) -> pd.DataFrame: - """Get system-wide actual values per SCED interval. + """Aggregated ERCOT system actual demand by 15-minute interval, + for the most recent hour. - Provides actual system-wide metrics from each SCED execution - including load, generation, and reserves. + Note: ERCOT only retains 7-days worth of data on this endpoint EMIL ID: NP6-235-CD @@ -714,7 +704,7 @@ def get_system_wide_actuals( Note: This data is only available through the archive API for historical - dates. Real-time access may require pyercot updates. + dates. Example: ```python @@ -723,16 +713,10 @@ def get_system_wide_actuals( ``` """ start_ts, end_ts = parse_date_range(start, end) - - # System-wide actuals via archive API - df = self._get_archive().fetch_historical( - endpoint="/np6-235-cd/sys_wide_actuals", - start=start_ts, - end=end_ts, - ) - - df = filter_by_date(df, start_ts, end_ts) - return standardize_columns(df) + df = self.get_system_wide_actuals_docs(start_ts, end_ts) + df = filter_by_date(df, start_ts, end_ts, date_column="DeliveryDate") + df.sort_values("TimeEnding", ascending=True, inplace=True) + return df # ============================================================================ # 60-Day Disclosure Reports @@ -747,20 +731,6 @@ def get_60_day_dam_disclosure( ERCOT publishes these reports with a 60-day delay. 
        This method automatically adjusts the date to fetch the correct historical data.

-        Returns a dictionary containing multiple DataFrames:
-        - dam_gen_resource: Generation resource data
-        - dam_gen_resource_as_offers: Generation resource AS offers
-        - dam_load_resource: Load resource data
-        - dam_load_resource_as_offers: Load resource AS offers
-        - dam_energy_only_offers: Energy-only offers
-        - dam_energy_only_offer_awards: Energy-only offer awards
-        - dam_energy_bids: Energy bids
-        - dam_energy_bid_awards: Energy bid awards
-        - dam_ptp_obligation_bids: PTP obligation bids
-        - dam_ptp_obligation_bid_awards: PTP obligation bid awards
-        - dam_ptp_obligation_options: PTP obligation options
-        - dam_ptp_obligation_option_awards: PTP obligation option awards
-
        Args:
            date: Date to fetch disclosure for (data is 60 days delayed)
@@ -788,28 +758,18 @@
        archive = self._get_archive()

        # Fetch from archive
-        df = archive.fetch_historical(
+        archives: dict[str, pd.DataFrame] = archive.fetch_historical(
            endpoint="/np3-966-er/60_dam_gen_res_data",
            start=report_date,
            end=end_date,
        )

-        # For now, return a single DataFrame
-        # Full implementation would parse the zip and extract multiple files
-        return {
-            "dam_gen_resource": df,
-            "dam_gen_resource_as_offers": self.get_dam_gen_res_as_offers(),
-            "dam_load_resource": self.get_dam_load_res_data(),
-            "dam_load_resource_as_offers": self.get_dam_load_res_as_offers(),
-            "dam_energy_only_offers": self.get_dam_energy_only_offers(),
-            "dam_energy_only_offer_awards": self.get_dam_energy_only_offer_awards(),
-            "dam_energy_bids": self.get_dam_energy_bids(),
-            "dam_energy_bid_awards": self.get_dam_energy_bid_awards(),
-            "dam_ptp_obligation_bids": self.get_dam_ptp_obl_bids(),
-            "dam_ptp_obligation_bid_awards": self.get_dam_ptp_obl_bid_awards(),
-            "dam_ptp_obligation_options": self.get_dam_ptp_obl_opt(),
-            "dam_ptp_obligation_option_awards": self.get_dam_ptp_obl_opt_awards(),
-        }
+        results = {}
+        for filename, df in archives.items():
+            new_key = f"60d_dam_{filename.lower().split('_')[2].split('-')[0]}"
+            results[new_key] = df
+
+        return results

    def get_60_day_sced_disclosure(
        self,
@@ -820,10 +780,20 @@
        ERCOT publishes these reports with a 60-day delay.
        This method automatically adjusts the date to fetch the correct historical data.

-        Returns a dictionary containing:
-        - sced_gen_resource: SCED generation resource data
-        - sced_load_resource: SCED load resource data
-        - sced_smne: SCED SMNE generation resource data
+        This report contains all 60-day disclosure data related to SCED.
+
+        The following individual files are included in the report:
+        - 60d_HDL_LDL_ManOverride
+        - 60d_Load_Resource_Data_in_SCED
+        - 60d_SCED_DSR_Load_Data
+        - 60d_SCED_EOC_Updates_in_OpHour
+        - 60d_SCED_Gen_Resource_Data
+        - 60d_SCED_QSE_Self_Arranged
+        - 60d_SCED_SMNE_GEN_RES
+
+        Returns a dictionary keyed by the lower-cased report file names,
+        e.g.
60d_hdl_ldl_man_override Args: date: Date to fetch disclosure for (data is 60 days delayed) @@ -837,28 +807,7 @@ def get_60_day_sced_disclosure( # Get SCED disclosure reports = ercot.get_60_day_sced_disclosure("2024-01-15") - - # Access specific reports - gen_data = reports["sced_gen_resource"] ``` """ date_ts = parse_date(date) - - # Data is published 60 days after the operating day - report_date = date_ts + pd.Timedelta(days=60) - end_date = report_date + pd.Timedelta(days=1) - - archive = self._get_archive() - - # Fetch SMNE data from archive - smne_df = archive.fetch_historical( - endpoint="/np3-965-er/60_sced_smne_gen_res", - start=report_date, - end=end_date, - ) - - return { - "sced_gen_resource": self.get_sced_gen_res_data(), - "sced_load_resource": self.get_load_res_data_in_sced(), - "sced_smne": smne_df, - } + return self.get_60d_sced_disclosure(date_ts) diff --git a/tinygrid/ercot/archive.py b/tinygrid/ercot/archive.py index c866ca5..e1ced2c 100644 --- a/tinygrid/ercot/archive.py +++ b/tinygrid/ercot/archive.py @@ -5,16 +5,13 @@ import io import logging from concurrent.futures import ThreadPoolExecutor, as_completed -from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING from zipfile import ZipFile -import httpx import pandas as pd from attrs import define, field from ..constants.ercot import PUBLIC_API_BASE_URL -from ..errors import GridAPIError, GridRetryExhaustedError from ..utils.dates import format_api_datetime if TYPE_CHECKING: @@ -29,7 +26,7 @@ DEFAULT_ARCHIVE_PAGE_SIZE = 1000 -@dataclass +@define class ArchiveLink: """Represents a link to an archived document.""" @@ -63,7 +60,7 @@ class ERCOTArchive: ``` """ - client: ERCOT + ercot: ERCOT batch_size: int = field(default=MAX_BATCH_SIZE) max_concurrent: int = field(default=5) timeout: float = field(default=60.0) @@ -98,7 +95,7 @@ def get_archive_links( "page": page, } - response = self._make_request(url, params) + response = self.ercot.make_request(url, params) if page == 1: meta = response.get("_meta", {}) @@ -145,13 +142,17 @@ def bulk_download( url = f"{PUBLIC_API_BASE_URL}/archive/{emil_id}/download" results: list[tuple[io.BytesIO, str] | None] = [None] * len(doc_ids) + logger.debug(f"Fetching from {url = }") + logger.debug(f"Batch size = {self.batch_size}") + # Batch the downloads for batch_start in range(0, len(doc_ids), self.batch_size): batch_end = min(batch_start + self.batch_size, len(doc_ids)) batch = doc_ids[batch_start:batch_end] payload = {"docIds": batch} - response_bytes = self._make_request( + logger.debug(f"{payload = }") + response_bytes = self.ercot.make_request( url, payload, method="POST", parse_json=False ) @@ -182,7 +183,7 @@ def fetch_historical( start: pd.Timestamp, end: pd.Timestamp, add_post_datetime: bool = False, - ) -> pd.DataFrame: + ) -> dict[str, pd.DataFrame]: """Fetch historical data from archive. Combines archive link fetching and bulk download into a single operation. 
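Because `fetch_historical` now returns a mapping of CSV name to DataFrame instead of a single concatenated frame (signature change above, parsing changes in the next hunk), a caller-side sketch using the same NP3-966-ER endpoint that `get_60_day_dam_disclosure` uses; the client is constructed unauthenticated for brevity, so attach `ERCOTAuth` as in `examples/demo.py` for real requests:

```python
import pandas as pd

from tinygrid import ERCOT
from tinygrid.ercot.archive import ERCOTArchive

archive = ERCOTArchive(ercot=ERCOT())

# The 60-day DAM disclosure for an operating day is published ~60 days later.
report_date = pd.Timestamp("2025-10-30") + pd.Timedelta(days=60)
frames = archive.fetch_historical(
    endpoint="/np3-966-er/60_dam_gen_res_data",
    start=report_date,
    end=report_date + pd.Timedelta(days=1),
)

# Keys are the CSV file names found inside the downloaded zip archives.
for name, df in frames.items():
    print(name, df.shape)
```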
@@ -202,9 +203,11 @@ def fetch_historical( # Get archive links links = self.get_archive_links(emil_id, start, end) + logger.info(f"Fetching {len(links)} links") + if not links: logger.warning(f"No archives found for {endpoint} from {start} to {end}") - return pd.DataFrame() + return {} # Extract doc IDs and bulk download doc_ids = [link.doc_id for link in links] @@ -212,27 +215,47 @@ def fetch_historical( files = self.bulk_download(doc_ids, emil_id) + logger.info(f"Downloaded {len(files)} for {emil_id = }") + # Parse CSVs from zip files - dfs: list[pd.DataFrame] = [] + dfs: dict[str, pd.DataFrame] = {} for bytes_io, filename in files: + logger.debug(f"{filename = }") try: doc_id = filename.split(".")[0] - df = pd.read_csv(bytes_io, compression="zip") - - if add_post_datetime and doc_id in post_datetimes: - df["postDatetime"] = post_datetimes[doc_id] - - dfs.append(df) + with ZipFile(bytes_io) as zfile: + if len(zfile.namelist()) > 1: + files = self._extract_multiple_files_from_zip(zfile) + for file, name in files: + logger.debug(f"Extracting {name = }") + df = pd.read_csv(file) + if add_post_datetime and doc_id in post_datetimes: + df["postDatetime"] = post_datetimes[doc_id] + dfs[name] = df + else: + df = pd.read_csv(bytes_io, compression="zip") + if add_post_datetime and doc_id in post_datetimes: + df["postDatetime"] = post_datetimes[doc_id] + dfs["archive"] = df except Exception as e: logger.warning(f"Failed to parse {filename}: {e}") - if not dfs: - return pd.DataFrame() + return dfs - result = pd.concat(dfs, ignore_index=True) - logger.info(f"Fetched {len(result)} records from {len(files)} archives") - - return result + def _extract_multiple_files_from_zip( + self, zfile: ZipFile + ) -> list[tuple[io.BytesIO, str]]: + results = [] + with zfile as zf: + for inner_name in zf.namelist(): + with zf.open(inner_name) as inner_file: + results.append( + ( + io.BytesIO(inner_file.read()), + inner_name, + ) + ) + return results def fetch_historical_parallel( self, @@ -284,69 +307,88 @@ def fetch_historical_parallel( def _download_single(self, link: ArchiveLink) -> pd.DataFrame: """Download a single archive file.""" - response = self._make_request(link.url, parse_json=False) + response = self.ercot.make_request(link.url, parse_json=False) return pd.read_csv(io.BytesIO(response), compression="zip") - def _make_request( - self, - url: str, - params: dict[str, Any] | None = None, - method: str = "GET", - parse_json: bool = True, - ) -> dict[str, Any] | bytes: - """Make an authenticated request to the ERCOT API. 
- Args: - url: Request URL - params: Query parameters (GET) or body (POST) - method: HTTP method - parse_json: If True, parse response as JSON +@define +class BundleLink: + doc_id: str + publish_date: pd.Timestamp + download_url: str - Returns: - Parsed JSON dict or raw bytes - """ - headers = self._get_auth_headers() - - try: - with httpx.Client(timeout=self.timeout) as http_client: - if method == "POST": - response = http_client.post(url, json=params, headers=headers) - else: - response = http_client.get(url, params=params, headers=headers) - - if response.status_code == 429: - raise GridRetryExhaustedError( - "Rate limited by ERCOT API", - status_code=429, - endpoint=url, - ) - if response.status_code != 200: - raise GridAPIError( - f"ERCOT API returned {response.status_code}", - status_code=response.status_code, - response_body=response.text[:500], - endpoint=url, +@define +class Bundles: + emil_id: str + links: list[BundleLink] + + +@define +class ERCOTArchiveBundle: + """The most effective way to download historic data in large quantities""" + + ercot: ERCOT + + def bundles(self, emil_id: str) -> Bundles: + bundle_url = f"{PUBLIC_API_BASE_URL}/bundle/{emil_id}" + response = self.ercot.make_request(bundle_url, parse_json=True) + bundles = response.get("bundles", []) + links = [] + for bundle in bundles: + doc_id = bundle.get("docId") + post_datetime = bundle.get("postDatetime") + download_url = bundle["_links"]["endpoint"]["href"] + if doc_id is not None and post_datetime is not None and download_url: + links.append( + BundleLink( + doc_id=str(doc_id), + publish_date=post_datetime, + download_url=download_url, ) + ) + return Bundles(emil_id=emil_id, links=links) - if parse_json: - return response.json() - return response.content + def one_bundle(self, bundle_link: BundleLink) -> pd.DataFrame: + """Download a single bundle (zip of zips of CSVs) and extract all CSVs as DataFrames. - except httpx.TimeoutException as e: - raise GridAPIError(f"Request timed out: {e}", endpoint=url) from e - except httpx.RequestError as e: - raise GridAPIError(f"Request failed: {e}", endpoint=url) from e + Args: + bundle_link: BundleLink - def _get_auth_headers(self) -> dict[str, str]: - """Get authentication headers from the client.""" - if self.client.auth is None: - return {} + Returns: + DataFrame containing all rows from all CSV files in all inner zip archives + """ + dataframes: list[pd.DataFrame] = [] + response = self.ercot.make_request(bundle_link.download_url, parse_json=False) + # Unzip the outer zip (contains inner zips) + with ZipFile(io.BytesIO(response)) as zfile: + for inner_zip_name in zfile.namelist(): + with zfile.open(inner_zip_name) as inner_zip_bytes: + with ZipFile(inner_zip_bytes) as inner_zip: + for csv_name in inner_zip.namelist(): + with inner_zip.open(csv_name) as csv_file: + try: + df = pd.read_csv(csv_file) + dataframes.append(df) + except Exception as e: + logger.error( + f"Failed to parse CSV {csv_name} in bundle {bundle_link.doc_id}: {e}" + ) + if not dataframes: + return pd.DataFrame() + return pd.concat(dataframes, ignore_index=True) - token = self.client.auth.get_token() - subscription_key = self.client.auth.get_subscription_key() + def all(self, bundles: Bundles) -> list[pd.DataFrame]: + """Download all bundle links from the given Bundle and extract all CSVs as DataFrames. 
- return { - "Authorization": f"Bearer {token}", - "Ocp-Apim-Subscription-Key": subscription_key, - } + Args: + bundles: Bundles + + Returns: + List of DataFrames, one per CSV file in all inner zip archives + """ + all_dataframes: list[pd.DataFrame] = [] + for bundle_link in bundles.links: + dfs = self.one(bundle_link) + all_dataframes.extend(dfs) + return all_dataframes diff --git a/tinygrid/ercot/client.py b/tinygrid/ercot/client.py index f730682..e0cb67c 100644 --- a/tinygrid/ercot/client.py +++ b/tinygrid/ercot/client.py @@ -8,6 +8,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import TYPE_CHECKING, Any +import httpx import pandas as pd from attrs import define, field from pyercot.errors import UnexpectedStatus @@ -123,6 +124,7 @@ def _get_client(self) -> ERCOTClient | AuthenticatedClient: Returns: Configured ERCOTClient or AuthenticatedClient instance """ + if self.auth is not None: # Ensure we have a valid token (will refresh if expired) try: @@ -648,7 +650,7 @@ def _get_archive(self) -> ERCOTArchive: if not hasattr(self, "_archive") or self._archive is None: from .archive import ERCOTArchive - self._archive = ERCOTArchive(client=self) + self._archive = ERCOTArchive(ercot=self) return self._archive def _call_endpoint_model( @@ -740,3 +742,58 @@ def _product_history_to_dataframe(self, response: dict[str, Any]) -> pd.DataFram if not archives: return pd.DataFrame() return pd.DataFrame(archives) + + def make_request( + self, + url: str, + params: dict[str, Any] | None = None, + method: str = "GET", + parse_json: bool = True, + ) -> dict[str, Any] | bytes: + """Make an authenticated request to the ERCOT API. + + Args: + url: Request URL + params: Query parameters (GET) or body (POST) + method: HTTP method + parse_json: If True, parse response as JSON + + Returns: + Parsed JSON dict or raw bytes + """ + + try: + client = self._get_client().get_httpx_client() + + request = client.build_request( + method=method, + url=url, + params=params if method == "GET" else None, + json=params if method == "POST" else None, + ) + + response = client.send(request) + + if response.status_code == 429: + raise GridRetryExhaustedError( + "Rate limited by ERCOT API", + status_code=429, + endpoint=url, + ) + + if response.status_code != 200: + raise GridAPIError( + f"ERCOT API returned {response.status_code}", + status_code=response.status_code, + response_body=response.text[:500], + endpoint=url, + ) + + if parse_json: + return response.json() + return response.content + + except httpx.TimeoutException as e: + raise GridAPIError(f"Request timed out: {e}", endpoint=url) from e + except httpx.RequestError as e: + raise GridAPIError(f"Request failed: {e}", endpoint=url) from e diff --git a/tinygrid/ercot/documents.py b/tinygrid/ercot/documents.py index 5810636..95819a7 100644 --- a/tinygrid/ercot/documents.py +++ b/tinygrid/ercot/documents.py @@ -52,6 +52,13 @@ def build_download_url(doc_id: str) -> str: "settlement_points_mapping": 10008, # NP4-160-SG # Load Zone info "load_zone_info": 10000, # NP4-33-CD + # 60-Day SCED Disclosure Reports, + "60d_sced_disclosure_reports": 13052, # NP3-695-ER + # System-Wide Demand + "system_wide_actuals": 12340, + # State Estimator Reports + "state_estimator_load_report": 12358, + "state_estimator_dc_ties_flows_report": 12359, } @@ -158,6 +165,8 @@ def _get_documents( "_": int(pd.Timestamp.now().timestamp() * 1000), # Cache buster } + logger.info(f"{params = }") + try: with httpx.Client(timeout=30.0) as client: response = 
client.get(MIS_BASE_URL, params=params) @@ -191,30 +200,39 @@ def _get_document( self, report_type_id: int, date: pd.Timestamp | None = None, + max_docs=1000, latest: bool = True, - ) -> Document | None: + keep_all: bool = False, + ) -> Document | None | list[Document]: """Fetch a single document from MIS. Args: report_type_id: The MIS report type ID date: Optional date to filter by latest: If True, return the most recent document + keep_all: If True, return all documents starting at date Returns: - Document object or None if not found + Document object (or list) or None if not found """ + if keep_all and latest: + raise ValueError("latest and keep_all are mutually exclusive, pick one") + documents = self._get_documents( report_type_id=report_type_id, date_from=date, - date_to=date + pd.Timedelta(days=1) if date else None, - max_documents=10, + max_documents=max_docs, ) + logger.info(f"Found {len(documents)} docs for {report_type_id}") + if not documents: return None + if keep_all: + return documents + if latest: - # Return most recent by publish date return max(documents, key=lambda d: d.publish_date) return documents[0] @@ -379,6 +397,129 @@ def get_dam_spp_historical(self, year: int) -> pd.DataFrame: return self.read_doc(target_doc) + def get_60d_sced_disclosure( + self, date: pd.Timestamp | str + ) -> dict[str, pd.DataFrame]: + import zipfile + + doc = self._get_document(REPORT_TYPE_IDS["60d_sced_disclosure_reports"], date) + + logger.info(f"Latest = {doc}") + + if not doc: + logger.warning(f"No get_60d_sced_disclosure found for {date}") + return {} + + try: + with httpx.Client(timeout=120.0) as client: + response = client.get(doc.url) + response.raise_for_status() + content = response.content + except Exception as e: + logger.error(f"Failed to download 60-day SCED disclosures: {e}") + return {} + + # Read all CSV files from the ZIP + results: dict[str, pd.DataFrame] = {} + + try: + with zipfile.ZipFile(io.BytesIO(content)) as zf: + for name in zf.namelist(): + if not name.lower().endswith(".csv"): + logger.warning(f"Skipping non-csv file: {name}") + continue + with zf.open(name) as csv_file: + df = pd.read_csv(csv_file) + results[name.lower()] = df + return results + + except Exception as e: + logger.error("Failed to extract zip file for 60-day SCED Disclosure") + raise e + + def get_system_wide_actuals_docs( + self, start_ts: pd.Timestamp, end_ts: pd.Timestamp + ) -> pd.DataFrame: + logger.info(f"Fetching from {start_ts} to {end_ts}") + docs = self._get_document( + REPORT_TYPE_IDS["system_wide_actuals"], + start_ts, + # TODO: Make this an enum to avoid value errors + keep_all=True, + latest=False, + ) + if not docs or not isinstance(docs, list): + logger.warning("Failed to find docs") + return pd.DataFrame() + logger.debug(f"all docs = {docs}") + keep_docs = [ + doc + for doc in docs + if start_ts <= doc.publish_date < end_ts + and doc.constructed_name.endswith("csv.zip") + ] + logger.info(f"Keeping {len(keep_docs)} documents in date range") + + results = [] + with httpx.Client() as client: + for doc in keep_docs: + resp = client.get(doc.url) + content = resp.content + df = pd.read_csv(io.BytesIO(content), compression="zip") + results.append(df) + + df = pd.concat(results) + return df + + def get_state_estimator_load_report( + self, start_ts: pd.Timestamp, end_ts: pd.Timestamp + ) -> pd.DataFrame: + logger.info(f"Fetching from {start_ts} to {end_ts}") + return self._get_reports( + start_ts, end_ts, REPORT_TYPE_IDS["state_estimator_load_report"] + ) + + def 
get_state_estimator_dc_ties_flows_report( + self, start_ts: pd.Timestamp, end_ts: pd.Timestamp + ): + logger.info(f"Fetching from {start_ts} to {end_ts}") + return self._get_reports( + start_ts, + end_ts, + REPORT_TYPE_IDS["state_estimator_dc_ties_flows_report"], + ) + + def _get_reports(self, start_ts, end_ts, report_type_id) -> pd.DataFrame: + docs = self._get_document( + report_type_id, + start_ts, + # TODO: Make this an enum to avoid value errors + keep_all=True, + latest=False, + ) + if not docs or not isinstance(docs, list): + logger.warning("Failed to find docs") + return pd.DataFrame() + logger.debug(f"all docs = {docs}") + keep_docs = [ + doc + for doc in docs + if start_ts <= doc.publish_date < end_ts + and doc.constructed_name.endswith("csv.zip") + ] + logger.info(f"Keeping {len(keep_docs)} documents in date range") + + results = [] + with httpx.Client() as client: + for doc in keep_docs: + resp = client.get(doc.url) + content = resp.content + df = pd.read_csv(io.BytesIO(content), compression="zip") + results.append(df) + + df = pd.concat(results) + return df + def get_settlement_point_mapping(self) -> dict[str, pd.DataFrame]: """Get the current settlement point mapping. diff --git a/tinygrid/ercot/transforms.py b/tinygrid/ercot/transforms.py index b7dc49f..c48a01b 100644 --- a/tinygrid/ercot/transforms.py +++ b/tinygrid/ercot/transforms.py @@ -126,7 +126,7 @@ def filter_by_date( actual_col = None for col in [ date_column, - "DeliveryDate", # Historical archive format + "DeliveryDate", "Delivery Date", "Oper Day", "OperDay", diff --git a/uv.lock b/uv.lock index dcd06ea..d756bc7 100644 --- a/uv.lock +++ b/uv.lock @@ -2050,6 +2050,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/8e/67/afbb0978d5399bc9ea200f1d4489a23c9a1dad4eee6376242b8182389c79/respx-0.22.0-py2.py3-none-any.whl", hash = "sha256:631128d4c9aba15e56903fb5f66fb1eff412ce28dd387ca3a81339e52dbd3ad0", size = 25127, upload-time = "2024-12-19T22:33:57.837Z" }, ] +[[package]] +name = "returns" +version = "0.26.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/c2/6dda7ef39464568152e35c766a8b49ab1cdb1b03a5891441a7c2fa40dc61/returns-0.26.0.tar.gz", hash = "sha256:180320e0f6e9ea9845330ccfc020f542330f05b7250941d9b9b7c00203fcc3da", size = 105300, upload-time = "2025-07-24T13:11:21.772Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/57/4d/a7545bf6c62b0dbe5795f22ea9e88cc070fdced5c34663ebc5bed2f610c0/returns-0.26.0-py3-none-any.whl", hash = "sha256:7cae94c730d6c56ffd9d0f583f7a2c0b32cfe17d141837150c8e6cff3eb30d71", size = 160515, upload-time = "2025-07-24T13:11:20.041Z" }, +] + [[package]] name = "rfc3339-validator" version = "0.1.4" @@ -2386,6 +2398,7 @@ dependencies = [ { name = "pandas" }, { name = "pyercot" }, { name = "python-dotenv" }, + { name = "returns" }, { name = "tenacity" }, ] @@ -2408,6 +2421,7 @@ dev = [ { name = "kaleido" }, { name = "nbformat" }, { name = "plotly" }, + { name = "ty" }, ] [package.metadata] @@ -2423,6 +2437,7 @@ requires-dist = [ { name = "pytest-cov", marker = "extra == 'dev'", specifier = ">=4.1.0" }, { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "respx", marker = "extra == 'dev'", specifier = ">=0.21.0" }, + { name = "returns", specifier = ">=0.26.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.1.0" }, { name = "tenacity", specifier = ">=8.0.0" }, ] @@ -2435,6 +2450,7 @@ dev = [ { name = "kaleido", specifier = 
">=1.2.0" }, { name = "nbformat", specifier = ">=5.10.4" }, { name = "plotly", specifier = ">=6.5.0" }, + { name = "ty", specifier = ">=0.0.8" }, ] [[package]] @@ -2514,6 +2530,31 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/00/c0/8f5d070730d7836adc9c9b6408dec68c6ced86b304a9b26a14df072a6e8c/traitlets-5.14.3-py3-none-any.whl", hash = "sha256:b74e89e397b1ed28cc831db7aea759ba6640cb3de13090ca145426688ff1ac4f", size = 85359, upload-time = "2024-04-19T11:11:46.763Z" }, ] +[[package]] +name = "ty" +version = "0.0.8" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/9d/59e955cc39206a0d58df5374808785c45ec2a8a2a230eb1638fbb4fe5c5d/ty-0.0.8.tar.gz", hash = "sha256:352ac93d6e0050763be57ad1e02087f454a842887e618ec14ac2103feac48676", size = 4828477, upload-time = "2025-12-29T13:50:07.193Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/69/2b/dd61f7e50a69c72f72c625d026e9ab64a0db62b2dd32e7426b520e2429c6/ty-0.0.8-py3-none-linux_armv6l.whl", hash = "sha256:a289d033c5576fa3b4a582b37d63395edf971cdbf70d2d2e6b8c95638d1a4fcd", size = 9853417, upload-time = "2025-12-29T13:50:08.979Z" }, + { url = "https://files.pythonhosted.org/packages/90/72/3f1d3c64a049a388e199de4493689a51fc6aa5ff9884c03dea52b4966657/ty-0.0.8-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:788ea97dc8153a94e476c4d57b2551a9458f79c187c4aba48fcb81f05372924a", size = 9657890, upload-time = "2025-12-29T13:50:27.867Z" }, + { url = "https://files.pythonhosted.org/packages/71/d1/08ac676bd536de3c2baba0deb60e67b3196683a2fabebfd35659d794b5e9/ty-0.0.8-py3-none-macosx_11_0_arm64.whl", hash = "sha256:1b5f1f3d3e230f35a29e520be7c3d90194a5229f755b721e9092879c00842d31", size = 9180129, upload-time = "2025-12-29T13:50:22.842Z" }, + { url = "https://files.pythonhosted.org/packages/af/93/610000e2cfeea1875900f73a375ba917624b0a008d4b8a6c18c894c8dbbc/ty-0.0.8-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6da9ed377fbbcec0a3b60b2ca5fd30496e15068f47cef2344ba87923e78ba996", size = 9683517, upload-time = "2025-12-29T13:50:18.658Z" }, + { url = "https://files.pythonhosted.org/packages/05/04/bef50ba7d8580b0140be597de5cc0ba9a63abe50d3f65560235f23658762/ty-0.0.8-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7d0a2bdce5e701d19eb8d46d9da0fe31340f079cecb7c438f5ac6897c73fc5ba", size = 9676279, upload-time = "2025-12-29T13:50:25.207Z" }, + { url = "https://files.pythonhosted.org/packages/aa/b9/2aff1ef1f41b25898bc963173ae67fc8f04ca666ac9439a9c4e78d5cc0ff/ty-0.0.8-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ef9078799d26d3cc65366e02392e2b78f64f72911b599e80a8497d2ec3117ddb", size = 10073015, upload-time = "2025-12-29T13:50:35.422Z" }, + { url = "https://files.pythonhosted.org/packages/df/0e/9feb6794b6ff0a157c3e6a8eb6365cbfa3adb9c0f7976e2abdc48615dd72/ty-0.0.8-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:54814ac39b4ab67cf111fc0a236818155cf49828976152378347a7678d30ee89", size = 10961649, upload-time = "2025-12-29T13:49:58.717Z" }, + { url = "https://files.pythonhosted.org/packages/f4/3b/faf7328b14f00408f4f65c9d01efe52e11b9bcc4a79e06187b370457b004/ty-0.0.8-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c4baf0a80398e8b6c68fa36ff85045a50ede1906cd4edb41fb4fab46d471f1d4", size = 10676190, upload-time = "2025-12-29T13:50:01.11Z" }, + { url = 
"https://files.pythonhosted.org/packages/64/a5/cfeca780de7eeab7852c911c06a84615a174d23e9ae08aae42a645771094/ty-0.0.8-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ac8e23c3faefc579686799ef1649af8d158653169ad5c3a7df56b152781eeb67", size = 10438641, upload-time = "2025-12-29T13:50:29.664Z" }, + { url = "https://files.pythonhosted.org/packages/0e/8d/8667c7e0ac9f13c461ded487c8d7350f440cd39ba866d0160a8e1b1efd6c/ty-0.0.8-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b558a647a073d0c25540aaa10f8947de826cb8757d034dd61ecf50ab8dbd77bf", size = 10214082, upload-time = "2025-12-29T13:50:31.531Z" }, + { url = "https://files.pythonhosted.org/packages/f8/11/e563229870e2c1d089e7e715c6c3b7605a34436dddf6f58e9205823020c2/ty-0.0.8-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:8c0104327bf480508bd81f320e22074477df159d9eff85207df39e9c62ad5e96", size = 9664364, upload-time = "2025-12-29T13:50:05.443Z" }, + { url = "https://files.pythonhosted.org/packages/b1/ad/05b79b778bf5237bcd7ee08763b226130aa8da872cbb151c8cfa2e886203/ty-0.0.8-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:496f1cb87261dd1a036a5609da80ee13de2e6ee4718a661bfa2afb91352fe528", size = 9679440, upload-time = "2025-12-29T13:50:11.289Z" }, + { url = "https://files.pythonhosted.org/packages/12/b5/23ba887769c4a7b8abfd1b6395947dc3dcc87533fbf86379d3a57f87ae8f/ty-0.0.8-py3-none-musllinux_1_2_i686.whl", hash = "sha256:2c488031f92a075ae39d13ac6295fdce2141164ec38c5d47aa8dc24ee3afa37e", size = 9808201, upload-time = "2025-12-29T13:50:21.003Z" }, + { url = "https://files.pythonhosted.org/packages/f8/90/5a82ac0a0707db55376922aed80cd5fca6b2e6d6e9bcd8c286e6b43b4084/ty-0.0.8-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:90d6f08c5982fa3e802b8918a32e326153519077b827f91c66eea4913a86756a", size = 10313262, upload-time = "2025-12-29T13:50:03.306Z" }, + { url = "https://files.pythonhosted.org/packages/14/f7/ff97f37f0a75db9495ddbc47738ec4339837867c4bfa145bdcfbd0d1eb2f/ty-0.0.8-py3-none-win32.whl", hash = "sha256:d7f460ad6fc9325e9cc8ea898949bbd88141b4609d1088d7ede02ce2ef06e776", size = 9254675, upload-time = "2025-12-29T13:50:33.35Z" }, + { url = "https://files.pythonhosted.org/packages/af/51/eba5d83015e04630002209e3590c310a0ff1d26e1815af204a322617a42e/ty-0.0.8-py3-none-win_amd64.whl", hash = "sha256:1641fb8dedc3d2da43279d21c3c7c1f80d84eae5c264a1e8daa544458e433c19", size = 10131382, upload-time = "2025-12-29T13:50:13.719Z" }, + { url = "https://files.pythonhosted.org/packages/38/1c/0d8454ff0f0f258737ecfe84f6e508729191d29663b404832f98fa5626b7/ty-0.0.8-py3-none-win_arm64.whl", hash = "sha256:ec74f022f315bede478ecae1277a01ab618e6500c1d68450d7883f5cd6ed554a", size = 9636374, upload-time = "2025-12-29T13:50:16.344Z" }, +] + [[package]] name = "typing-extensions" version = "4.15.0"