Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 148 additions & 87 deletions mosgim/mosg/lcp_solver.py
Original file line number Diff line number Diff line change
@@ -1,116 +1,178 @@
import sys
from typing import Any

from loguru import logger

import numpy as np
import lemkelcp as lcp
import lemkelcp as lcp
import scipy.special as sp
from scipy.sparse import csr_matrix

from tqdm import tqdm


def logger_configuration() -> None:
def configure_logger() -> None:
"""Configures the global logger settings."""
logger.remove()

logger.add(
sys.stdout, colorize=True, format="(<level>{level}</level>) [<green>{time:HH:mm:ss}</green>] ➤ <level>{message}</level>")
sys.stdout,
colorize=True,
format=(
"(<level>{level}</level>) [<green>{time:HH:mm:ss}</green>] "
"➤ <level>{message}</level>"
),
)


class CreateLCP:
def __init__(self, nbig: int, mbig: int, nT: int) -> None:
class LCPProblemConstructor:
"""
Constructs components for a LCP.
"""
def __init__(self, max_harmonic_order: int, max_harmonic_degree: int, num_time_steps: int) -> None:
"""
Initializes the LCP problem constructor.

Parameters
----------
nbig : int
Max order of spherical harmonic expansion
mbig : int
Max degree of spherical harmonic expansion (0 <= mbig <= nbig)
nT : int
Number of time steps
max_harmonic_order : int
Max order of spherical harmonic expansion.
max_harmonic_degree : int
Max degree of spherical harmonic expansion (0 <= max_harmonic_degree <= max_harmonic_order).
num_time_steps : int
Number of time steps.
"""
self.__nbig = nbig
self.__mbig = mbig
self.__nT = nT

self.__vcoefs = np.vectorize(self.__calc_coefs, excluded=[
'M', 'N'], otypes=[np.ndarray])

def __calc_coefs(self, M, N, theta, phi):
self._max_harmonic_order: int = max_harmonic_order
self._max_harmonic_degree: int = max_harmonic_degree
self._num_time_steps: int = num_time_steps

self._vectorized_calculate_coefficients = np.vectorize(
self._calculate_coefficients,
excluded=['harmonics_degrees_mesh', 'harmonics_orders_mesh'],
otypes=[np.ndarray]
)

def _calculate_coefficients(
self,
harmonics_degrees_mesh: np.ndarray,
harmonics_orders_mesh: np.ndarray,
local_time_rad: np.ndarray,
colatitude_rad: np.ndarray
) -> np.ndarray:
"""
Calculates spherical harmonic coefficients.

Parameters
----------
M
Meshgrid of harmonics degrees
N
Meshgrid of harmonics orders
theta
Array of LTs of IPPs in rads
phi
Array of co latitudes of IPPs in rads
harmonics_degrees_mesh : np.ndarray
Meshgrid of harmonics degrees.
harmonics_orders_mesh : np.ndarray
Meshgrid of harmonics orders.
local_time_rad : np.ndarray
Array of Local Times (LT) of Ionospheric Pierce Points (IPPs) in radians.
colatitude_rad : np.ndarray
Array of colatitudes of IPPs in radians.
"""
n_coefs = len(M)
a = np.zeros(n_coefs)

# complex harmonics on meshgrid
Ymn = sp.sph_harm(np.abs(M), N, theta, phi)

# introducing real basis according to scipy normalization
a[M < 0] = Ymn[M < 0].imag * np.sqrt(2) * (-1.) ** M[M < 0]
a[M > 0] = Ymn[M > 0].real * np.sqrt(2) * (-1.) ** M[M > 0]
a[M == 0] = Ymn[M == 0].real

return a

def construct(self, theta, phi, timeindex):
num_coefficients: int = len(harmonics_degrees_mesh)
coefficients: np.ndarray = np.zeros(num_coefficients)

spherical_harmonics_complex: np.ndarray = sp.sph_harm(
np.abs(harmonics_degrees_mesh),
harmonics_orders_mesh,
local_time_rad,
colatitude_rad
)

m_less_than_zero_idx: np.ndarray = harmonics_degrees_mesh < 0
m_greater_than_zero_idx: np.ndarray = harmonics_degrees_mesh > 0
m_equals_zero_idx: np.ndarray = harmonics_degrees_mesh == 0

coefficients[m_less_than_zero_idx] = (
spherical_harmonics_complex[m_less_than_zero_idx].imag *
np.sqrt(2) *
(-1.) ** harmonics_degrees_mesh[m_less_than_zero_idx]
)
coefficients[m_greater_than_zero_idx] = (
spherical_harmonics_complex[m_greater_than_zero_idx].real *
np.sqrt(2) *
(-1.) ** harmonics_degrees_mesh[m_greater_than_zero_idx]
)
coefficients[m_equals_zero_idx] = spherical_harmonics_complex[m_equals_zero_idx].real

return coefficients

def construct_matrix_a(
self,
local_time_rad: np.ndarray,
colatitude_rad: np.ndarray,
time_indices: np.ndarray
):
"""
Constructs the matrix A for the LCP problem.

Parameters
----------
theta
Array of LTs of IPPs in rads
phi
Array of co latitudes of IPPs in rads
timeindex
Number of time steps
local_time_rad : np.ndarray
Array of LTs of IPPs in radians.
colatitude_rad : np.ndarray
Array of colatitudes of IPPs in radians.
time_indices : np.ndarray
Array of time step indices for each IPP.
"""

# Construct matrix of the problem (A)
n_ind = np.arange(0, self.__nbig + 1, 1)
m_ind = np.arange(-self.__mbig, self.__mbig + 1, 1)
M, N = np.meshgrid(m_ind, n_ind)
Y = sp.sph_harm(np.abs(M), N, 0, 0)
idx = np.isfinite(Y)
M = M[idx]
N = N[idx]
n_coefs = len(M)

len_rhs = len(phi)

a = self.__vcoefs(M=M, N=N, theta=theta, phi=phi)

logger.info(f"coefs done {n_coefs}")

# prepare (A) in csr sparse format
data = np.empty(len_rhs * n_coefs)
rowi = np.empty(len_rhs * n_coefs)
coli = np.empty(len_rhs * n_coefs)

for i in tqdm(range(0, len_rhs, 1)):
data[i * n_coefs: (i + 1) * n_coefs] = a[i]
rowi[i * n_coefs: (i + 1) * n_coefs] = i * \
np.ones(n_coefs).astype('int32')

coli[i * n_coefs: (i + 1) * n_coefs] = np.arange(timeindex[i]
* n_coefs, (timeindex[i] + 1) * n_coefs, 1).astype('int32')

A = csr_matrix((data, (rowi, coli)), shape=(
len_rhs, (self.__nT + 1) * n_coefs))

logger.success("matrix (A) done")

return A

order_indices: np.ndarray = np.arange(0, self._max_harmonic_order + 1, 1)
degree_indices: np.ndarray = np.arange(-self._max_harmonic_degree, self._max_harmonic_degree + 1, 1)

harmonics_degrees_mesh, harmonics_orders_mesh = np.meshgrid(degree_indices, order_indices)

spherical_harmonics_test: np.ndarray = sp.sph_harm(
np.abs(harmonics_degrees_mesh), harmonics_orders_mesh, 0, 0
)
finite_indices: np.ndarray = np.isfinite(spherical_harmonics_test)

valid_harmonics_degrees_mesh: np.ndarray = harmonics_degrees_mesh[finite_indices]
valid_harmonics_orders_mesh: np.ndarray = harmonics_orders_mesh[finite_indices]
num_coefficients: int = len(valid_harmonics_degrees_mesh)

num_rhs_elements: int = len(colatitude_rad)

calculated_coefficients: np.ndarray = self._vectorized_calculate_coefficients(
harmonics_degrees_mesh=valid_harmonics_degrees_mesh,
harmonics_orders_mesh=valid_harmonics_orders_mesh,
local_time_rad=local_time_rad,
colatitude_rad=colatitude_rad
)

logger.info(f"Coefficient calculation done. Number of coefficients: {num_coefficients}")

num_non_zero_elements: int = num_rhs_elements * num_coefficients
data: np.ndarray = np.empty(num_non_zero_elements)
row_indices: np.ndarray = np.empty(num_non_zero_elements, dtype=np.int32)
col_indices: np.ndarray = np.empty(num_non_zero_elements, dtype=np.int32)

for i in tqdm(range(num_rhs_elements), desc="Constructing sparse matrix A"):
start_idx: int = i * num_coefficients
end_idx: int = (i + 1) * num_coefficients

data[start_idx:end_idx] = calculated_coefficients[i]
row_indices[start_idx:end_idx] = i

current_time_index: int = time_indices[i]
col_start_offset: int = current_time_index * num_coefficients
col_indices[start_idx:end_idx] = np.arange(
col_start_offset, col_start_offset + num_coefficients, 1
)

matrix_a_shape = (num_rhs_elements, (self._num_time_steps + 1) * num_coefficients)
matrix_a = csr_matrix(
(data, (row_indices, col_indices)),
shape=matrix_a_shape
)

logger.success("Matrix (A) construction done.")
return matrix_a

def create_lcp(data):
logger_configuration()
configure_logger()

nT = 24

Expand All @@ -123,7 +185,7 @@ def create_lcp(data):
time_m = np.array([int(_ / (len(colat) * len(mlt)))
for _ in range(len(colat) * len(mlt) * (nT + 1))])

G = CreateLCP(nbig=15, mbig=15, nT=nT).construct(
G = LCPProblemConstructor(nbig=15, mbig=15, nT=nT).construct(
theta=np.deg2rad(mlt_m),
phi=np.deg2rad(colat_m),
timeindex=time_m
Expand All @@ -148,4 +210,3 @@ def create_lcp(data):
c = data['res'] + NGT.dot(sol[0])
w = G.dot(c)
return c

46 changes: 39 additions & 7 deletions mosgim/utils/time_util.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,43 @@
"""
Utilities for calculating time differences in seconds.

This module provides vectorized functions to compute:
- The number of seconds elapsed since the beginning of the day for a given time.
- The number of seconds between two specified time points.
"""
from datetime import datetime

import numpy as np

@np.vectorize
def sec_of_day(time):
day_start = time.replace(hour=0, minute=0, second=0, microsecond=0)
return (time - day_start).total_seconds()

@np.vectorize(otypes=[float])
def seconds_since_midnight(time_obj: datetime) -> float:
"""
Calculates the number of seconds elapsed since midnight for the given datetime object.

Args:
time_obj: The datetime object for which to calculate the seconds.

Returns:
The number of seconds since the start of the day (00:00:00).
"""
day_start = time_obj.replace(hour=0, minute=0, second=0, microsecond=0)
return (time_obj - day_start).total_seconds()


def seconds_in_interval(
end_time: datetime, start_time: datetime
) -> float:
"""
Calculates the number of seconds between two datetime objects.

The calculation is `end_time` - `start_time`.

Args:
end_time: The later datetime object.
start_time: The earlier (reference) datetime object.

def sec_of_interval(time, time0):
return (time - time0).total_seconds()
sec_of_interval = np.vectorize(sec_of_interval, excluded='time0')
Returns:
The time difference in seconds.
"""
return (end_time - start_time).total_seconds()