Skip to content

Commit 2c0dffa

Browse files
committed
Introduces measure config dataclasses
1 parent 60d3957 commit 2c0dffa

1 file changed

Lines changed: 318 additions & 0 deletions

File tree

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
"""
2+
This file is part of CLIMADA.
3+
4+
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
5+
6+
CLIMADA is free software: you can redistribute it and/or modify it under the
7+
terms of the GNU General Public License as published by the Free
8+
Software Foundation, version 3.
9+
10+
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
11+
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
12+
PARTICULAR PURPOSE. See the GNU General Public License for more details.
13+
14+
You should have received a copy of the GNU General Public License along
15+
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
16+
17+
---
18+
19+
Define configuration dataclasses for Measure reading and writing.
20+
"""
21+
22+
from __future__ import annotations
23+
24+
import dataclasses
25+
import logging
26+
from abc import ABC
27+
from dataclasses import asdict, dataclass, field, fields
28+
from datetime import datetime
29+
from typing import TYPE_CHECKING, Dict, Optional, Tuple, Union
30+
31+
import pandas as pd
32+
33+
from climada.util.string_parsers import parse_color, parse_mapping_string, parse_range
34+
35+
if TYPE_CHECKING:
36+
from climada.entity.measures.base import Measure
37+
from climada.entity.measures.cost_income import CostIncome
38+
39+
LOGGER = logging.getLogger(__name__)
40+
41+
42+
@dataclass
43+
class _ModifierConfig(ABC):
44+
def to_dict(self):
45+
# 1. Get the current values as a dict
46+
current_data = asdict(self)
47+
48+
# 2. Identify fields where the current value differs from the default
49+
non_default_data = {}
50+
for f in fields(self):
51+
current_value = getattr(self, f.name)
52+
53+
# Logic to get the default value (handling both default and default_factory)
54+
default_value = f.default
55+
if (
56+
f.default_factory is not field().default_factory
57+
): # Check if factory exists
58+
default_value = f.default_factory()
59+
60+
if current_value != default_value:
61+
non_default_data[f.name] = current_data[f.name]
62+
63+
non_default_data.pop("haz_type", None)
64+
return non_default_data
65+
66+
@classmethod
67+
def from_dict(cls, d: dict):
68+
filtered = cls._filter_dict_to_fields(d)
69+
return cls(**filtered)
70+
71+
@classmethod
72+
def _filter_dict_to_fields(cls, d: dict):
73+
"""Filter out values that do not match the dataclass fields."""
74+
filtered = dict(
75+
filter(lambda k: k[0] in [f.name for f in fields(cls)], d.items())
76+
)
77+
return filtered
78+
79+
def _filter_out_default_fields(self):
80+
non_defaults = {}
81+
defaults = {}
82+
for f in fields(self):
83+
val = getattr(self, f.name)
84+
default = f.default
85+
if f.default_factory is not field().default_factory:
86+
default = f.default_factory()
87+
88+
if val != default:
89+
non_defaults[f.name] = val
90+
else:
91+
defaults[f.name] = val
92+
return non_defaults, defaults
93+
94+
def __repr__(self) -> str:
95+
non_defaults, defaults = self._filter_out_default_fields()
96+
ndf_fields_str = (
97+
"\n\t\t\t".join(f"{k}={v!r}" for k, v in non_defaults.items())
98+
if non_defaults
99+
else None
100+
)
101+
fields_str = (
102+
"\n\t\t\t".join(f"{k}={v!r}" for k, v in defaults.items())
103+
if defaults
104+
else None
105+
)
106+
fields = (
107+
"(" "\n\t\tNon default fields:" f"\n\t\t\t{ndf_fields_str}"
108+
if ndf_fields_str
109+
else "()"
110+
)
111+
return f"{self.__class__.__name__}{fields}"
112+
113+
114+
@dataclass(repr=False)
115+
class ImpfsetModifierConfig(_ModifierConfig):
116+
"""Configuration for impact function modifiers."""
117+
118+
haz_type: str
119+
impf_ids: Optional[Union[int, str, list[Union[int, str]]]] = None
120+
impf_mdd_mult: float = 1.0
121+
impf_mdd_add: float = 0.0
122+
impf_paa_mult: float = 1.0
123+
impf_paa_add: float = 0.0
124+
impf_int_mult: float = 1.0
125+
impf_int_add: float = 0.0
126+
new_impfset_path: Optional[str] = None
127+
"""Excel filepath for new impfset."""
128+
129+
def __post_init__(self):
130+
if self.new_impfset_path is not None and any(
131+
[
132+
self.impf_mdd_add,
133+
self.impf_mdd_mult,
134+
self.impf_paa_add,
135+
self.impf_paa_mult,
136+
self.impf_int_add,
137+
self.impf_int_mult,
138+
]
139+
):
140+
LOGGER.warning(
141+
"Both new impfset object and impfset modifiers are provided, "
142+
"modifiers will be applied after changing the impfset."
143+
)
144+
145+
146+
@dataclass(repr=False)
147+
class HazardModifierConfig(_ModifierConfig):
148+
"""Configuration for impact function modifiers."""
149+
150+
haz_type: str
151+
haz_int_mult: Optional[float] = 1.0
152+
haz_int_add: Optional[float] = 0.0
153+
new_hazard_path: Optional[str] = None
154+
"""HDF5 filepath for new hazard."""
155+
impact_rp_cutoff: Optional[float] = None
156+
157+
def __post_init__(self):
158+
if self.new_hazard_path is not None and any(
159+
[self.haz_int_mult, self.haz_int_add, self.impact_rp_cutoff]
160+
):
161+
LOGGER.warning(
162+
"Both new hazard object and hazard modifiers are provided, "
163+
"modifiers will be applied after changing the hazard."
164+
)
165+
166+
167+
@dataclass(repr=False)
168+
class ExposuresModifierConfig(_ModifierConfig):
169+
"""Configuration for impact function modifiers."""
170+
171+
reassign_impf_id: Optional[Dict[str, Dict[int | str, int | str]]] = None
172+
set_to_zero: Optional[list[int]] = None
173+
new_exposures_path: Optional[str] = None
174+
"""HDF5 filepath for new exposure"""
175+
176+
def __post_init__(self):
177+
if self.new_exposures_path is not None and any(
178+
[self.reassign_impf_id, self.set_to_zero]
179+
):
180+
LOGGER.warning(
181+
"Both new exposures object and exposures modifiers are provided, "
182+
"modifiers will be applied after changing the exposures."
183+
)
184+
185+
186+
@dataclass(repr=False)
187+
class CostIncomeConfig(_ModifierConfig):
188+
"""Serializable configuration for CostIncome."""
189+
190+
mkt_price_year: Optional[int] = field(default_factory=lambda: datetime.today().year)
191+
init_cost: float = 0.0
192+
periodic_cost: float = 0.0
193+
periodic_income: float = 0.0
194+
cost_yearly_growth_rate: float = 0.0
195+
income_yearly_growth_rate: float = 0.0
196+
freq: str = "Y"
197+
custom_cash_flows: Optional[list[dict]] = None
198+
199+
def to_cost_income(self) -> CostIncome:
200+
df = None
201+
if self.custom_cash_flows is not None:
202+
df = pd.DataFrame(self.custom_cash_flows)
203+
df["date"] = pd.to_datetime(df["date"])
204+
return CostIncome(
205+
mkt_price_year=self.mkt_price_year,
206+
init_cost=self.init_cost,
207+
periodic_cost=self.periodic_cost,
208+
periodic_income=self.periodic_income,
209+
cost_yearly_growth_rate=self.cost_yearly_growth_rate,
210+
income_yearly_growth_rate=self.income_yearly_growth_rate,
211+
custom_cash_flows=df,
212+
freq=self.freq,
213+
)
214+
215+
@classmethod
216+
def from_cost_income(cls, ci: CostIncome) -> "CostIncomeConfig":
217+
"""Round-trip from a live CostIncome object."""
218+
custom = None
219+
if ci.custom_cash_flows is not None:
220+
custom = (
221+
ci.custom_cash_flows.reset_index()
222+
.rename(columns={"index": "date"})
223+
.assign(date=lambda df: df["date"].dt.strftime("%Y-%m-%d"))
224+
.to_dict(orient="records")
225+
)
226+
return cls(
227+
mkt_price_year=ci.mkt_price_year.year, # datetime → int
228+
init_cost=abs(ci.init_cost), # stored negative → positive
229+
periodic_cost=abs(ci.periodic_cost),
230+
periodic_income=ci.periodic_income,
231+
cost_yearly_growth_rate=ci.cost_growth_rate,
232+
income_yearly_growth_rate=ci.income_growth_rate,
233+
freq=ci.freq,
234+
custom_cash_flows=custom,
235+
)
236+
237+
238+
@dataclass(repr=False)
239+
class MeasureConfig(_ModifierConfig):
240+
name: str
241+
haz_type: str
242+
impfset_modifier: ImpfsetModifierConfig
243+
hazard_modifier: HazardModifierConfig
244+
exposures_modifier: ExposuresModifierConfig
245+
cost_income: CostIncomeConfig
246+
implementation_duration: Optional[str] = None
247+
color_rgb: Optional[Tuple[float, float, float]] = None
248+
249+
def __repr__(self) -> str:
250+
fields_str = "\n\t".join(f"{k}={v!r}" for k, v in self.__dict__.items())
251+
return f"{self.__class__.__name__}(\n\t{fields_str})"
252+
253+
def to_dict(self) -> dict:
254+
return {
255+
"name": self.name,
256+
"haz_type": self.haz_type,
257+
**self.impfset_modifier.to_dict(),
258+
**self.hazard_modifier.to_dict(),
259+
**self.exposures_modifier.to_dict(),
260+
**self.cost_income.to_dict(),
261+
"implementation_duration": self.implementation_duration,
262+
"color_rgb": list(self.color_rgb) if self.color_rgb is not None else None,
263+
}
264+
265+
@classmethod
266+
def from_dict(cls, d: dict) -> "MeasureConfig":
267+
color = d.get("color_rgb")
268+
return cls(
269+
name=d["name"],
270+
haz_type=d["haz_type"],
271+
impfset_modifier=ImpfsetModifierConfig.from_dict(d),
272+
hazard_modifier=HazardModifierConfig.from_dict(d),
273+
exposures_modifier=ExposuresModifierConfig.from_dict(d),
274+
cost_income=CostIncomeConfig.from_dict(d),
275+
implementation_duration=d.get("implementation_duration"),
276+
color_rgb=(
277+
tuple(color) if color is not None and not pd.isna(color) else None
278+
),
279+
)
280+
281+
def to_yaml(self, path: str) -> None:
282+
import yaml
283+
284+
with open(path, "w") as f:
285+
yaml.dump(
286+
{"measures": [self.to_dict()]},
287+
f,
288+
default_flow_style=False,
289+
sort_keys=False,
290+
)
291+
292+
@classmethod
293+
def from_yaml(cls, path: str) -> "MeasureConfig":
294+
import yaml
295+
296+
with open(path) as f:
297+
return cls.from_dict(yaml.safe_load(f)["measures"][0])
298+
299+
@classmethod
300+
def from_row(
301+
cls, row: pd.Series, haz_type: Optional[str] = None
302+
) -> "MeasureConfig":
303+
"""Build a MeasureConfig from a legacy Excel row."""
304+
row_dict = row.to_dict()
305+
return cls.from_dict(row_dict)
306+
307+
308+
def _serialize_modifier_dict(d: dict) -> dict:
309+
"""Stringify keys, convert tuples to lists for JSON."""
310+
return {str(k): list(v) for k, v in d.items()}
311+
312+
313+
def _deserialize_modifier_dict(d: dict) -> dict:
314+
"""Restore int keys where possible, values back to tuples."""
315+
return {
316+
(int(k) if isinstance(k, str) and k.isdigit() else k): tuple(v)
317+
for k, v in d.items()
318+
}

0 commit comments

Comments
 (0)