2 changes: 2 additions & 0 deletions .gitignore
@@ -10,6 +10,8 @@
*.rds
*.db
*.log
*.parquet
*.csv

# Byte-compiled / optimized / DLL files
__pycache__/
165 changes: 128 additions & 37 deletions deep4cast/datasets.py
@@ -1,94 +1,185 @@
from fastparquet import ParquetFile
import numpy as np
import pandas as pd
from torch.utils.data import Dataset

from deep4cast import transforms


class TimeSeriesDataset(Dataset):
"""Takes a list of time series and provides access to windowed subseries for
training.
"""Provides windowed subseries for training. Each time series is split into
lookback and horizon examples, with the number of examples in each
calculated using ``lookback``, ``horizon`` and ``step``. Requires either
* a list of ``numpy`` arrays or
* a path to parquet files and a CSV containing partition index and partition length.

Arguments:
* time_series (list): List of time series ``numpy`` arrays.
* lookback (int): Number of time steps used as input for forecasting.
* horizon (int): Number of time steps to forecast.
* step (int): Time step size between consecutive examples.
* transform (``transforms.Compose``): Specific transformations to apply to time series examples.
* static_covs (list): Static covariates for each item in ``time_series`` list.
* transform (``transforms.Compose``): List of transformations to apply to time series examples.
* thinning (float): Fraction of examples to include.
* split (str): Optional, specifies ``train`` or ``test`` split.
* time_series (list): List of time series ``numpy`` arrays.
* path_parquet (str): File location of partitioned parquet files.
* path_metadata (list): List of CSV file locations containing time series id and length.

"""
    def __init__(self,
                 lookback,
                 horizon,
                 step,
                 transform,
                 static_covs=None,
                 thinning=1.0,
                 split=None,
                 time_series=None,
                 path_parquet=None,
                 path_metadata=None):
        self.lookback = lookback
        self.horizon = horizon
        self.step = step
        self.transform = transform
        self.static_covs = static_covs
        self.split = split
        self.path_parquet = path_parquet
        self.time_series = time_series

        if path_parquet:
            self._examples_parquet(path_metadata=path_metadata)
        elif time_series:
            self._examples_array()

        # Store the number of training examples
        self._len = int(len(self.example_ids) * thinning)
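        # Note on thinning (illustrative numbers, not from this change): with
        # thinning=0.5 and 1,000 windowed examples, __len__ reports 500, so a
        # DataLoader epoch only visits the first half of the example ids.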

    def _examples_parquet(self, path_metadata):
        """Takes file locations of metadata about the length of each time
        series and calculates the number of examples in each.

        Arguments:
            * path_metadata (list): List of CSV files containing time series id and length.

        """
        path_file = ParquetFile(self.path_parquet)
        self.partitions = path_file.info['partitions'][0]

        # Slice each time series into examples, assigning IDs to each
        example_ids = {}
        n_dropped = 0
        for file_meta in path_metadata:
            with open(file_meta) as infile:
                for line in infile:
                    line = line.strip('\n')
                    line = line.split(',')
                    index = line[0]  # Partition name
                    length = int(line[1])  # Length of time series
                    # Withhold the horizon for testing
                    if self.split == 'train':
                        length -= self.horizon
                    # At least the horizon is required for zero-padding
                    if length < self.horizon:
                        n_dropped += 1
                        continue
                    example_ids = self._hashmap(
                        index=index,
                        length=length,
                        example_ids=example_ids)

        # Inform user about time series that were too short
        if n_dropped > 0:
            print('Dropped {} time series due to length.'.format(n_dropped))

        self.example_ids = example_ids

    def _examples_array(self):
        """Takes a list of time series and calculates the number of examples
        in each.

        """
        n_dropped = 0
        example_ids = {}
        for i, ts in enumerate(self.time_series):
            # Time series shorter than the forecast horizon need to be dropped.
            if ts.shape[-1] < self.horizon:
                n_dropped += 1
                continue
            # Slice each time series into examples, assigning IDs to each
            example_ids = self._hashmap(
                index=i,
                length=ts.shape[-1],
                example_ids=example_ids)

        # Inform user about time series that were too short
        if n_dropped > 0:
            print('Dropped {} time series due to length.'.format(n_dropped))

        self.example_ids = example_ids

    def _hashmap(self, index, length, example_ids):
        """Creates a dictionary of windowed examples indexed on the time series
        and location within the time series.

        Arguments:
            * index: Either list index or parquet partition name.
            * length (int): Length of the indexed time series.
            * example_ids (dict): Dictionary where the key is the example
                number and the value is the tuple of
                (time series index, start position for example slice).

        """
        last_id = len(example_ids)

        # Only use the last lookback + horizon window for the test case
        if self.split == 'test':
            length = length - self.lookback - self.horizon
            length = max((length, 0))
            example_ids[last_id] = (index, length)

            return example_ids

        num_examples = (length - self.lookback - self.horizon + self.step) // self.step
        # For short time series we will zero pad the input
        num_examples = max((num_examples, 1))
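        # Worked example (illustrative numbers): with lookback=20, horizon=10,
        # step=5 and a series of length 100, this yields
        # (100 - 20 - 10 + 5) // 5 = 15 examples starting at offsets
        # 0, 5, ..., 70; the test split instead keeps the single window
        # starting at max(100 - 20 - 10, 0) = 70.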
        # (time series index, start position for example slice)
        for j in range(num_examples):
            example_ids[last_id + j] = (index, j * self.step)

        return example_ids

    def __len__(self):
        return self._len

    def __getitem__(self, idx):
        # Get time series
        ts_id, lookback_id = self.example_ids[idx]

        if self.path_parquet:
            path_file = self.path_parquet + self.partitions + '=' + ts_id + '/part.0.parquet'
            ts = ParquetFile(path_file)
            ts = ts.to_pandas()
            ts = ts.values.T
        elif self.time_series:
            ts = self.time_series[ts_id]

        # Prepare input and target. Zero pad if necessary.
        if ts.shape[-1] < self.lookback + self.horizon:
            # If the time series is too short, we zero pad
            X = ts[:, :-self.horizon]
            X = np.pad(
                X,
                pad_width=((0, 0), (self.lookback - X.shape[-1], 0)),
                mode='constant',
                constant_values=0
            )
            y = ts[:, -self.horizon:]
        else:
            X = ts[:, lookback_id:lookback_id + self.lookback]
            y = ts[:, lookback_id + self.lookback:lookback_id + self.lookback + self.horizon]

        # Create the input and output for the sample
        sample = {'X': X, 'y': y}
        sample = self.transform(sample)

        # Static covariates can be attached
        if self.static_covs is not None:
            sample['X_stat'] = self.static_covs[ts_id]

        return sample
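
A minimal usage sketch of the two construction paths (array-backed and parquet-backed); the shapes, file paths, batch size, and the empty ``transforms.Compose`` are illustrative assumptions, not part of this change:

import numpy as np
from torch.utils.data import DataLoader

from deep4cast import transforms
from deep4cast.datasets import TimeSeriesDataset

# Array-backed training set: three univariate series of assumed lengths.
series = [np.random.randn(1, n) for n in (120, 90, 45)]
train_data = TimeSeriesDataset(
    lookback=24,
    horizon=12,
    step=1,
    transform=transforms.Compose([]),  # assumed to act as a no-op
    time_series=series,
    split='train',
)
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)

# Parquet-backed test set: both paths are placeholders for a partitioned
# dataset and its metadata CSV on disk.
test_data = TimeSeriesDataset(
    lookback=24,
    horizon=12,
    step=1,
    transform=transforms.Compose([]),
    path_parquet='data/series.parquet/',
    path_metadata=['data/metadata.csv'],
    split='test',
)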
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -22,4 +22,4 @@
}
autoclass_content = "both"
use_system_site_packages = True
autodoc_mock_imports = ["numpy", "torch"]
autodoc_mock_imports = ["numpy", "torch", "pandas", "fastparquet"]