Conversation
for more information, see https://pre-commit.ci
for more information, see https://pre-commit.ci
| latent_space_selection: | ||
| Key or Keys which specifies the latent or feature space used for computing the conditional variance. | ||
| A single key has to be a latent space in :attr:`~anndata.AnnData.obsm` or | ||
| a gene in :attr:`~anndata.AnnData.var_names`. |
There was a problem hiding this comment.
a feature in ..., because we might also store proteins/ATAC, etc.
| Key or Keys which specifies the latent or feature space used for computing the conditional variance. | ||
| A single key has to be a latent space in :attr:`~anndata.AnnData.obsm` or | ||
| a gene in :attr:`~anndata.AnnData.var_names`. | ||
| A set of keys has to be a subset of genes in :attr:`~anndata.AnnData.var_names`. |
There was a problem hiding this comment.
Type hinting doesn't say set, but list.
| source: K, | ||
| target: K, | ||
| forward: bool = True, | ||
| latent_space_selection: Union[str, list[str]] = "X_pca", |
There was a problem hiding this comment.
let's not call it latent space. can also be raw space, e.g. gene space. Also, the types are not clear
| mask = [var_name in latent_space_selection for var_name in self.adata.var_names] | ||
| latent_space = self.adata[:, mask].X.toarray() | ||
| else: | ||
| raise KeyError("Unknown latent space selection.") |
There was a problem hiding this comment.
When we have a key error, we want to print what the wrong key is.
| filter_value = source if forward else target | ||
| opposite_filter_value = target if forward else source | ||
|
|
||
| if isinstance(latent_space_selection, str): | ||
| if latent_space_selection in self.adata.obsm: | ||
| latent_space = self.adata.obsm[latent_space_selection] | ||
| elif latent_space_selection in self.adata.var_names: | ||
| latent_space = self.adata[:, latent_space_selection in self.adata.var_names].X.toarray() | ||
| else: | ||
| raise KeyError("Gene/Latent space not found.") | ||
| elif type(latent_space_selection) in [list, np.ndarray]: | ||
| mask = [var_name in latent_space_selection for var_name in self.adata.var_names] | ||
| latent_space = self.adata[:, mask].X.toarray() | ||
| else: | ||
| raise KeyError("Unknown latent space selection.") |
There was a problem hiding this comment.
Let's make this a function (within the function)
| ) | ||
|
|
||
| cond_var = [] | ||
| for i in range(cond_dists.shape[1]): # type: ignore[union-attr] |
|
|
||
| batch_size = batch_size if batch_size is not None else len(df) | ||
| func = self.push if forward else self.pull | ||
| for batch in range(0, len(df), batch_size): |
There was a problem hiding this comment.
why do we actually do this ? :)
| batch_size=batch_size, | ||
| ) | ||
| if key_added is None: | ||
| assert isinstance(out, pd.DataFrame) |
There was a problem hiding this comment.
check for some properties, e.g. no NaN, non-negativity
| @pytest.mark.parametrize("key_added", [None, "test"]) | ||
| @pytest.mark.parametrize("batch_size", [None, 2]) | ||
| @pytest.mark.parametrize("latent_space_selection", ["X_pca", "KLF12", ["KLF12", "Dlip3", "Dref"]]) | ||
| def test_compute_variance_pipeline( |
There was a problem hiding this comment.
also check for raise Error with wrong attributes.
Added function
compute_variance()as well as a test, similar tocompute_entropy().