Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 108 additions & 94 deletions R/Functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -332,110 +332,124 @@ uniqueSeq <- function(seqs) {

# *****************************************************************************
# inter-clone-distance vs intra-clone-distance
calculateInterVsIntra <- function(db,
clone,
vjl_gps,
junction = "junction",
cdr3 = FALSE,
cdr3_col = NA,
nproc = 1,
verbose = FALSE) {
### Create cluster of nproc size and export namespaces
if(nproc == 1) {
# If needed to run on a single core/cpu then, register DoSEQ
# (needed for 'foreach' in non-parallel mode)
calculateInterVsIntra <- function(
db,
clone,
vjl_gps,
junction = "junction",
cdr3 = FALSE,
cdr3_col = NA,
nproc = 1,
verbose = FALSE
) {
if (nproc == 1) {
registerDoSEQ()
} else if( nproc > 1 ) {
cluster <- parallel::makeCluster(nproc, type="PSOCK")
registerDoParallel(cluster,cores=nproc)
} else if (nproc > 1) {
cluster <- parallel::makeCluster(nproc, type = "PSOCK")
registerDoParallel(cluster, cores = nproc)
} else {
stop('Nproc must be positive.')
}

### export function to clusters

DNAMatrix_gap0 <- getDNAMatrix(gap = 0)
if (nproc > 1) {
export_functions <- list("pairwiseDist", "DNAMatrix_gap0", "uniqueSeq", "stri_split_fixed")
parallel::clusterExport(cluster, export_functions, envir=environment())
seq_col <- if (cdr3) cdr3_col else junction

# this index is later used to quickly get the rows of db corresponding to a clone
clone_to_idx <- split(seq_along(db[[clone]]), db[[clone]])

if (nproc > 1) {
parallel::clusterExport(
cluster,
list(
"pairwiseDist",
"DNAMatrix_gap0",
"stri_split_fixed",
"db",
"clone",
"seq_col",
"clone_to_idx"
),
envir = environment()
)
}

n_groups <- nrow(vjl_gps)
### check the progress bar

n_groups <- nrow(vjl_gps)
if (verbose) {
pb <- progressBar(n_groups)
}

k <- NULL
# open dataframes
vec_ff <- foreach(k=1:n_groups,
.combine="c",
.errorhandling='stop') %dopar% {

# *********************************************************************************
clones <- stringi::stri_split_fixed(vjl_gps$clone_id[k], ",")[[1]]
l <- vjl_gps$junction_length[k]
n_clones <- length(clones)
in_clones <- db[[clone]] %in% clones
seqs <- db[[ifelse(cdr3, cdr3_col, junction)]][in_clones]
names(seqs) <- db[[clone]][in_clones]
seqs <- uniqueSeq(seqs)
### calculate distance matrix among all seqs
dist_mtx <- pairwiseDist(seqs, dist_mat=DNAMatrix_gap0)
### prealoocate a vector = no. of max-dist in each clone (intra) + no. of min-dist between clones (inter)
nrow_f <- n_clones + n_clones*(n_clones-1)/2
vec_f <- rep(NA, nrow_f)
### calculate minimum and maximum distance in each clone
n <- 0
if (n_clones == 1) {
n <- n + 1
vec_f[n] <- max(dist_mtx)/l
names(vec_f)[n] <- paste(clones[1], "NA", "intra", sep="_")
} else {
for (i in 1:(n_clones-1)) {
xx <- dist_mtx[rownames(dist_mtx) == clones[i], colnames(dist_mtx) == clones[i]]
n <- n + 1
vec_f[n] <- max(xx)/l
names(vec_f)[n] <- paste(clones[i], "NA", "intra", sep="_")
for (j in (i+1):n_clones) {
xy <- dist_mtx[rownames(dist_mtx) == clones[i], colnames(dist_mtx) == clones[j]]
n <- n + 1
vec_f[n] <- min(xy)/l
names(vec_f)[n] <- paste(clones[i], clones[j], "inter", sep="_")
}
}
yy <- dist_mtx[rownames(dist_mtx) == clones[j], colnames(dist_mtx) == clones[j]]
n <- n + 1
vec_f[n] <- max(yy)/l
names(vec_f)[n] <- paste(clones[j], "NA", "intra", sep="_")
}

# update progress
if (verbose) { pb$tick() }

return(vec_f)
# *********************************************************************************
}

### stop the cluster
if (nproc > 1) { parallel::stopCluster(cluster) }

# convert to a data.frame
db_dff <- data.frame(keyName = names(vec_ff),
distance = vec_ff,
row.names=NULL)
db_dff$label <- "intra"
db_dff$label[grepl("inter", db_dff$keyName)] <- "inter"
clones_xy <- data.frame(matrix(unlist(stringi::stri_split_fixed(db_dff$keyName, "_", n=3)),
nrow=nrow(db_dff),
byrow=T),
stringsAsFactors=FALSE)
db_dff <- cbind(clones_xy, db_dff)
db_dff$keyName <- NULL
colnames(db_dff)[colnames(db_dff) == "X1"] <- "clone_id_x"
colnames(db_dff)[colnames(db_dff) == "X2"] <- "clone_id_y"
db_dff$X3 <- NULL

# return results
db_dff <- foreach(
k = 1:n_groups,
.combine = "rbind",
.errorhandling = 'stop'
) %dopar%
{
clones <- stringi::stri_split_fixed(vjl_gps$clone_id[k], ",")[[1]]
jlen <- vjl_gps$junction_length[k]

# get te sequences and clone ids for all sequences in the group of clones
row_idx <- unlist(clone_to_idx[clones], use.names = FALSE)
seqs <- db[[seq_col]][row_idx]
cids <- db[[clone]][row_idx]

# This dedup is faster
# than using a dataframe and calling distinct()
key <- paste0(cids, "\t", seqs)
keep <- !duplicated(key)
seqs <- seqs[keep]
names(seqs) <- cids[keep]

dist_mtx <- pairwiseDist(seqs, dist_mat = DNAMatrix_gap0)

rn <- rownames(dist_mtx)
clone_list <- unique(rn)
n_clones <- length(clone_list)
clone_idx <- split(seq_along(rn), rn)[clone_list]

n_rows <- n_clones + n_clones * (n_clones - 1) %/% 2
clone_x <- character(n_rows)
clone_y <- character(n_rows)
dists <- numeric(n_rows)
labels <- character(n_rows)

r <- 0L
for (i in seq_along(clone_list)) {
ii <- clone_idx[[i]]
r <- r + 1L
clone_x[r] <- clone_list[i]
clone_y[r] <- NA_character_
dists[r] <- max(dist_mtx[ii, ii, drop = FALSE]) / jlen
labels[r] <- "intra"

if (i < n_clones) {
for (j in (i + 1L):n_clones) {
jj <- clone_idx[[j]]
r <- r + 1L
clone_x[r] <- clone_list[i]
clone_y[r] <- clone_list[j]
dists[r] <- min(dist_mtx[ii, jj, drop = FALSE]) / jlen
labels[r] <- "inter"
}
}
}

if (verbose) {
pb$tick()
}

data.frame(
clone_id_x = clone_x,
clone_id_y = clone_y,
distance = dists,
label = labels,
stringsAsFactors = FALSE
)
}

if (nproc > 1) {
parallel::stopCluster(cluster)
}
return(db_dff)
}
# *****************************************************************************
Expand Down