Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 144 additions & 118 deletions src/ElecSolver/FrequencySystemBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,75 +22,110 @@ def __init__(self,impedence_coords,impedence_data,mutuals_coords,mutuals_data):
mutual value between impedences impedence_data[mutuals_coords[0,i]] impedence_data[mutuals_coords[1,i]]
The mutual follows the order given in impedence coords
"""
all_points = np.unique(impedence_coords)
if all_points.shape != np.max(impedence_coords)+1:
IndexError("There is one or multiple lonely nodes please clean your impedence graph")
self.impedence_coords = impedence_coords
self.impedence_data = impedence_data
self.mutuals_coords = mutuals_coords
self.mutuals_data = mutuals_data
self.size = np.max(impedence_coords)+1
self.number_intensities = self.impedence_data.shape[0]
## making graph and checking number of subgraphs
unique_coords = np.unique(self.impedence_coords,axis=1)
self.current_sources_coords=np.zeros((2,0),dtype=int)
self.current_sources_data=np.array([],dtype=int)
self.voltage_sources_coords=np.zeros((2,0),dtype=int)
self.voltage_sources_data=np.array([],dtype=int)
self.source_count = 0
## initializing second member as empty
self.rhs = (np.array([]),(np.array([],dtype=int),))

self.analysed=False

def graph_analysis(self):
self.all_coords = np.concatenate((self.impedence_coords,self.voltage_sources_coords),axis=1)
all_points = np.unique(self.all_coords)
if all_points.shape != np.max(self.all_coords)+1:
print("Warning: There is one or multiple lonely nodes please clean your impedence graph")
self.all_impedences = np.concatenate([self.impedence_data,self.voltage_sources_data],axis=0)

# actual number of node in the system
self.size = np.max(self.all_coords)+1
# number of intensities
self.number_intensities = self.all_impedences.shape[0]
## making graph and checking number of subgraphs for ground enforcing and intensity checking
unique_coords = np.unique(self.all_coords,axis=1)
sym_graph = np.concatenate((unique_coords,np.stack((unique_coords[1],unique_coords[0]),axis=0)),axis=1)
links = np.ones(sym_graph.shape[1])
self.graph = nx.from_scipy_sparse_array(coo_matrix((links,(sym_graph[0],sym_graph[1]))))
## keep the subgraphs
self.list_of_subgraphs = [ list(sub) for sub in nx.connected_components(self.graph)]
self.number_of_subsystems = len(self.list_of_subgraphs)
## location of ground
self.affected_potentials = [-1]*self.number_of_subsystems
## by default remove 1 node equation per subsytem otherwise system is singular
self.deleted_equation_current = [subsystem[0] for subsystem in self.list_of_subgraphs]
## number of tension sources
self.source_count = 0
self.source_signs = np.array([],dtype=int)
self.rhs = (np.array([]),(np.array([],dtype=int),))
## shifter for intensities equations
rescaler = np.zeros(self.size)
rescaler[self.deleted_equation_current]=1
rescaler = -np.cumsum(rescaler)
self.rescaler =rescaler.astype(int)
offset_j = self.impedence_data.shape[0]
## offsets for simplifying building the system
offset_j = self.all_impedences.shape[0]
offset_i = self.size-len(self.deleted_equation_current)
self.offset_i = offset_i
self.offset_j = offset_j

## State -> analysed
self.analysed=True

def set_ground(self,*args):
"""Function to affect a ground to subsystems
If the system already has a ground provided then a warning is displayed and ground reaffected
"""
## Running graph analysis if not done
if not self.analysed:
self.graph_analysis()
for index in args:
for pivot,subsystem in enumerate(self.list_of_subgraphs):
if index in subsystem:
if self.affected_potentials[pivot]!=-1:
print(f"Subsystem {pivot} already add a mass, reaffecting the value")
print(f"Subsystem {pivot} already add a ground, reaffecting the value")
self.affected_potentials[pivot]=index
break

def affect_potentials(self):
"""Function to check whether the masses were affected and assign some if not
"""Function to check whether the grounds were all affected and assign some if some are missing
"""
## Running graph analysis if not done
if not self.analysed:
self.graph_analysis()
for i in range(len(self.affected_potentials)):
if -1 == self.affected_potentials[i]:
self.affected_potentials[i]= self.list_of_subgraphs[i][0]
print(f"Subsytem {i} has not been affected to the mass, we chose {self.list_of_subgraphs[i][0]}")
print(f"Subsytem {i} has not been affected to the ground, we chose {self.list_of_subgraphs[i][0]}")

def build_system(self):
"""Building sytem assuming that data was given as COO matrices
Fully vectorized for best performance
it is faster but less understandable
"""
## affecting masses if need be
## Running graph analysis if not done
if not self.analysed:
self.graph_analysis()
## affecting grounds if need be
self.affect_potentials()
## Building rhs
self.build_second_member()
## Building a sparse COO matrix


## Building all vectorized values necessary
i_s_vals = np.max(self.impedence_coords,axis=0)
j_s_vals = np.min(self.impedence_coords,axis=0)
values = self.impedence_data



## node laws
i_s_vals = np.max(self.all_coords,axis=0)
j_s_vals = np.min(self.all_coords,axis=0)
data_nodes = np.concatenate((np.ones(self.number_intensities),-np.ones(self.number_intensities)),axis=0)
j_s_nodes = np.tile(np.arange(self.number_intensities,dtype=int),(2,))
i_s_nodes =np.concatenate((i_s_vals,j_s_vals),axis=0)


# Removing one current equation per subsytem
mask_removed_eq = ~np.isin(i_s_nodes,self.deleted_equation_current)
data_nodes = data_nodes[mask_removed_eq]
Expand All @@ -101,8 +136,11 @@ def build_system(self):


## Kirchoff
i_s_edges = self.offset_i + np.concatenate([np.arange(self.number_intensities,dtype=int)]*3,axis=0 )
j_s_edges = np.concatenate([self.offset_j+i_s_vals,self.offset_j+j_s_vals,np.arange(self.number_intensities,dtype=int)],axis=0)
i_s_vals = np.max(self.impedence_coords,axis=0)
j_s_vals = np.min(self.impedence_coords,axis=0)
values = self.impedence_data
i_s_edges = self.offset_i + np.concatenate([np.arange(self.impedence_data.shape[0],dtype=int)]*3,axis=0 )
j_s_edges = np.concatenate([self.offset_j+i_s_vals,self.offset_j+j_s_vals,np.arange(self.impedence_data.shape[0],dtype=int)],axis=0)
data_edges = np.concatenate([np.ones(values.shape[0],dtype=complex),-np.ones(values.shape[0],dtype=complex),values],axis=0)


Expand All @@ -116,19 +154,79 @@ def build_system(self):
data_additionnal = np.tile(self.mutuals_data*sign[self.mutuals_coords[0]]*sign[self.mutuals_coords[1]],(2,))


## mass equations (1 per subsystem)
i_s_mass = np.arange(self.offset_i+self.offset_j,self.offset_i+self.offset_j+len(self.affected_potentials))
j_s_mass = self.offset_j+np.array(self.affected_potentials)
data_mass = np.ones(len(self.affected_potentials))
## ground equations (1 per subsystem)
i_s_ground = np.arange(self.offset_i+self.offset_j,self.size+self.offset_j) - self.source_count
j_s_ground = self.offset_j+np.array(self.affected_potentials)
data_ground = np.ones(len(self.affected_potentials))

## voltage sources equations
i_s_sources = np.tile(np.arange(0,self.voltage_sources_data.shape[0]),2)+self.number_intensities+self.size - self.source_count
j_s_sources = np.concatenate((self.offset_j + self.voltage_sources_coords[0],self.offset_j+self.voltage_sources_coords[1]),axis=0)
data_source = np.concatenate((np.ones_like(self.voltage_sources_data),-np.ones_like(self.voltage_sources_data)))

i_s = np.concatenate((i_s_nodes,i_s_edges,i_s_additionnal,i_s_mass),axis=0)
j_s = np.concatenate((j_s_nodes,j_s_edges,j_s_additionnal,j_s_mass),axis=0)
data = np.concatenate((data_nodes,data_edges,data_additionnal,data_mass),axis=0)

i_s = np.concatenate((i_s_nodes,i_s_edges,i_s_additionnal,i_s_ground,i_s_sources),axis=0)
j_s = np.concatenate((j_s_nodes,j_s_edges,j_s_additionnal,j_s_ground,j_s_sources),axis=0)
data = np.concatenate((data_nodes,data_edges,data_additionnal,data_ground,data_source),axis=0)

self.system = (data,(i_s.astype(int),j_s.astype(int)))
return self.system

def build_second_member(self,check=True):
"""General second member builder, need to be called after graph analysis and voltage and current sources tests

Parameters
----------
check : bool, optional
Whether to check or not that current injections append on the same subsystem, setting to False speeds up, by default True

Returns
-------
tuple
rhs tuple in case it is needed

Raises
------
IndexError
raised when current injection does not belong to the same subsystem
"""
## Testing current
if check:
for i in range(self.current_sources_coords.shape[1]):
input_node=self.current_sources_coords[0,i]
output_node=self.current_sources_coords[1,i]
if self.number_of_subsystems>=2:
valid = False
for system in self.list_of_subgraphs:
if input_node in system and output_node in system:
valid =True
else:
continue
if not valid:
raise IndexError("Nodes indicated do not belong to the same subsystem")


## Building current injection
in_current_nodes = self.current_sources_coords[0]
in_current_data = - self.current_sources_data
out_current_nodes = self.current_sources_coords[1]
out_current_data = self.current_sources_data
current_nodes = np.concatenate((in_current_nodes,out_current_nodes),axis=0)
current_data = np.concatenate((in_current_data,out_current_data),axis=0)

# Removing deleted node equations
mask_removed_eq = ~np.isin(current_nodes,self.deleted_equation_current)
current_nodes = current_nodes[mask_removed_eq]
current_data = current_data[mask_removed_eq]
current_nodes = current_nodes + self.rescaler[current_nodes]

## Building voltage rhs
voltage_nodes = np.arange(0,self.voltage_sources_data.shape[0])+self.number_intensities+self.size - self.source_count
voltage_data = self.voltage_sources_data

self.rhs=(np.concatenate((current_data,voltage_data),axis=0),(np.concatenate((current_nodes,voltage_nodes),axis=0),))
return self.rhs

def get_system(self,sparse_rhs=False):
"""Function to get the system
Parameters:
Expand All @@ -143,9 +241,9 @@ def get_system(self,sparse_rhs=False):
rhs: np.ndarray
Second member of the system
"""
size = self.number_intensities+self.size
(data_rhs,(nodes,)) = self.rhs
sys = coo_matrix(self.system)
size = sys.shape[0]
sys = coo_matrix(self.system,shape=(size,size))
if sparse_rhs:
(data_rhs,(nodes,)) = self.rhs
rhs = coo_array((data_rhs,(nodes,)),shape=(size,))
Expand All @@ -156,7 +254,7 @@ def get_system(self,sparse_rhs=False):
np.add.at(rhs, nodes, data_rhs)
return sys,rhs

def build_second_member_intensity(self,intensity,input_node,output_node):
def add_current_source(self,intensity,input_node,output_node):
"""Function to build a second member for the scenario of current injection in the sparse system

Parameters
Expand All @@ -167,106 +265,34 @@ def build_second_member_intensity(self,intensity,input_node,output_node):
which node to take for injection
output_node : int
which node for current retrieval

Returns
-------
Tuple(np.array,(np.array))
second member of the linear system in a COO like format
"""
data = []
nodes = []
if self.number_of_subsystems>=2:
valid = False
for system in self.list_of_subgraphs:
if input_node in system and output_node in system:
valid =True
else:
continue
if not valid:
raise IndexError("Nodes indicated do not belong to the same subsystem")

if input_node not in self.deleted_equation_current:
nodes.append(input_node+self.rescaler[input_node])
data.append(-intensity) # inbound intensity
if output_node not in self.deleted_equation_current:
nodes.append(output_node+self.rescaler[output_node])
data.append(intensity) # outbound intensity
(data_rhs,(nodes_rhs,)) = self.rhs
self.rhs = (np.append(data_rhs,np.array(data),axis=0),(np.append(nodes_rhs,np.array(nodes),axis=0),))
return self.rhs
self.current_sources_coords = np.append(self.current_sources_coords,np.array([[input_node],[output_node]]),axis=1)
self.current_sources_data = np.append(self.current_sources_data,np.array([intensity]))

def build_second_member_tension(self,tension,input_node,output_node):
"""building second member in the case of an enforced tension
To be able to enforce a tension we need to remove one of the equation in the system
and add the tension enforced equation
By default we remove the first kirchoff law within the correct subsystem
Warning this function changes the system, please re-get the system before solving it (call get_system)
def add_voltage_source(self,voltage,input_node,output_node):
"""Adding a voltage source to the system. This adds one equation and one degree of freedom in the system (the source intensity)

Parameters
----------
tension : complex
enforced tension
voltage : complex
enforced voltage
input_node : int
node where the tension is enforce
node where the voltage is enforced
output_node : int
node from where the tension is enforced
node from where the voltage is enforced

Returns
-------
Tuple(np.array,(np.array))
second member of the linear system in a COO like format
"""
targeted_subsystem = 0
if self.number_of_subsystems>=2:
valid = False
for index_sub,system in enumerate(self.list_of_subgraphs):
if input_node in system and output_node in system:
valid =True
targeted_subsystem=index_sub
else:
continue
if not valid:
raise IndexError("Nodes indicated do not belong to the same subsystem")
print("Warning this function changes the system, please re-evaluate it before solving the system (call get_system)")
(data,(i_s,j_s)) = self.system
(data_rhs,(nodes,)) = self.rhs

new_eq = self.number_intensities+self.size +self.source_count
sign = np.sign(output_node-input_node)

## Adding the new equation in S1
if input_node in self.deleted_equation_current:
data = np.append(data,np.array([1.,-1.,-sign]),axis=0)
i_s = np.append(i_s,np.array([new_eq,new_eq,output_node+self.rescaler[output_node]]),axis=0)
j_s = np.append(j_s,np.array([self.offset_j + input_node,self.offset_j+output_node,new_eq]),axis=0)
elif output_node in self.deleted_equation_current:
data = np.append(data,np.array([1.,-1.,sign]),axis=0)
i_s = np.append(i_s,np.array([new_eq,new_eq,input_node+self.rescaler[input_node]]),axis=0)
j_s = np.append(j_s,np.array([self.offset_j + input_node,self.offset_j+output_node,new_eq]),axis=0)
else:
data = np.append(data,np.array([1.,-1.,sign,-sign]),axis=0)
i_s = np.append(i_s,np.array([new_eq,new_eq,input_node+self.rescaler[input_node],output_node+self.rescaler[output_node]]),axis=0)
j_s = np.append(j_s,np.array([self.offset_j + input_node,self.offset_j+output_node,new_eq,new_eq]),axis=0)
self.voltage_sources_coords = np.append(self.voltage_sources_coords,np.array([[input_node],[output_node]]),axis=1)
self.voltage_sources_data = np.append(self.voltage_sources_data,np.array([voltage]))
self.source_count+=1

## second member value
nodes= np.append(nodes, [new_eq],axis=0)
data_rhs= np.append(data_rhs, [tension],axis=0)

## reaffecting the systems and second member
self.system = (data,(i_s,j_s))
self.rhs = (data_rhs,(nodes,))

## counter for tension source
self.source_count +=1
self.source_signs=np.append(self.source_signs,[sign])
return self.rhs

def build_intensity_and_voltage_from_vector(self,sol):
sign = np.sign(self.impedence_coords[1]-self.impedence_coords[0])
sign = np.sign(self.all_coords[1]-self.all_coords[0])
if self.source_count!=0:
return SolutionFrequency(sol[:self.number_intensities]*sign,
sol[self.number_intensities:-self.source_count],
sol[...,-self.source_count:]*self.source_signs
return SolutionFrequency(sol[:self.number_intensities-self.source_count]*sign[:self.number_intensities-self.source_count],
sol[self.number_intensities:],
sol[...,self.number_intensities-self.source_count:self.number_intensities]*sign[self.number_intensities-self.source_count:self.number_intensities]
)

else:
Expand Down
Loading
Loading