diff --git a/R/Functions.R b/R/Functions.R index 99e21de..7dd3baa 100644 --- a/R/Functions.R +++ b/R/Functions.R @@ -332,110 +332,124 @@ uniqueSeq <- function(seqs) { # ***************************************************************************** # inter-clone-distance vs intra-clone-distance -calculateInterVsIntra <- function(db, - clone, - vjl_gps, - junction = "junction", - cdr3 = FALSE, - cdr3_col = NA, - nproc = 1, - verbose = FALSE) { - ### Create cluster of nproc size and export namespaces - if(nproc == 1) { - # If needed to run on a single core/cpu then, register DoSEQ - # (needed for 'foreach' in non-parallel mode) +calculateInterVsIntra <- function( + db, + clone, + vjl_gps, + junction = "junction", + cdr3 = FALSE, + cdr3_col = NA, + nproc = 1, + verbose = FALSE +) { + if (nproc == 1) { registerDoSEQ() - } else if( nproc > 1 ) { - cluster <- parallel::makeCluster(nproc, type="PSOCK") - registerDoParallel(cluster,cores=nproc) + } else if (nproc > 1) { + cluster <- parallel::makeCluster(nproc, type = "PSOCK") + registerDoParallel(cluster, cores = nproc) } else { stop('Nproc must be positive.') } - - ### export function to clusters + DNAMatrix_gap0 <- getDNAMatrix(gap = 0) - if (nproc > 1) { - export_functions <- list("pairwiseDist", "DNAMatrix_gap0", "uniqueSeq", "stri_split_fixed") - parallel::clusterExport(cluster, export_functions, envir=environment()) + seq_col <- if (cdr3) cdr3_col else junction + + # this index is later used to quickly get the rows of db corresponding to a clone + clone_to_idx <- split(seq_along(db[[clone]]), db[[clone]]) + + if (nproc > 1) { + parallel::clusterExport( + cluster, + list( + "pairwiseDist", + "DNAMatrix_gap0", + "stri_split_fixed", + "db", + "clone", + "seq_col", + "clone_to_idx" + ), + envir = environment() + ) } - - n_groups <- nrow(vjl_gps) - ### check the progress bar + + n_groups <- nrow(vjl_gps) if (verbose) { pb <- progressBar(n_groups) } - + k <- NULL - # open dataframes - vec_ff <- foreach(k=1:n_groups, - .combine="c", - .errorhandling='stop') %dopar% { - - # ********************************************************************************* - clones <- stringi::stri_split_fixed(vjl_gps$clone_id[k], ",")[[1]] - l <- vjl_gps$junction_length[k] - n_clones <- length(clones) - in_clones <- db[[clone]] %in% clones - seqs <- db[[ifelse(cdr3, cdr3_col, junction)]][in_clones] - names(seqs) <- db[[clone]][in_clones] - seqs <- uniqueSeq(seqs) - ### calculate distance matrix among all seqs - dist_mtx <- pairwiseDist(seqs, dist_mat=DNAMatrix_gap0) - ### prealoocate a vector = no. of max-dist in each clone (intra) + no. of min-dist between clones (inter) - nrow_f <- n_clones + n_clones*(n_clones-1)/2 - vec_f <- rep(NA, nrow_f) - ### calculate minimum and maximum distance in each clone - n <- 0 - if (n_clones == 1) { - n <- n + 1 - vec_f[n] <- max(dist_mtx)/l - names(vec_f)[n] <- paste(clones[1], "NA", "intra", sep="_") - } else { - for (i in 1:(n_clones-1)) { - xx <- dist_mtx[rownames(dist_mtx) == clones[i], colnames(dist_mtx) == clones[i]] - n <- n + 1 - vec_f[n] <- max(xx)/l - names(vec_f)[n] <- paste(clones[i], "NA", "intra", sep="_") - for (j in (i+1):n_clones) { - xy <- dist_mtx[rownames(dist_mtx) == clones[i], colnames(dist_mtx) == clones[j]] - n <- n + 1 - vec_f[n] <- min(xy)/l - names(vec_f)[n] <- paste(clones[i], clones[j], "inter", sep="_") - } - } - yy <- dist_mtx[rownames(dist_mtx) == clones[j], colnames(dist_mtx) == clones[j]] - n <- n + 1 - vec_f[n] <- max(yy)/l - names(vec_f)[n] <- paste(clones[j], "NA", "intra", sep="_") - } - - # update progress - if (verbose) { pb$tick() } - - return(vec_f) - # ********************************************************************************* - } - - ### stop the cluster - if (nproc > 1) { parallel::stopCluster(cluster) } - - # convert to a data.frame - db_dff <- data.frame(keyName = names(vec_ff), - distance = vec_ff, - row.names=NULL) - db_dff$label <- "intra" - db_dff$label[grepl("inter", db_dff$keyName)] <- "inter" - clones_xy <- data.frame(matrix(unlist(stringi::stri_split_fixed(db_dff$keyName, "_", n=3)), - nrow=nrow(db_dff), - byrow=T), - stringsAsFactors=FALSE) - db_dff <- cbind(clones_xy, db_dff) - db_dff$keyName <- NULL - colnames(db_dff)[colnames(db_dff) == "X1"] <- "clone_id_x" - colnames(db_dff)[colnames(db_dff) == "X2"] <- "clone_id_y" - db_dff$X3 <- NULL - - # return results + db_dff <- foreach( + k = 1:n_groups, + .combine = "rbind", + .errorhandling = 'stop' + ) %dopar% + { + clones <- stringi::stri_split_fixed(vjl_gps$clone_id[k], ",")[[1]] + jlen <- vjl_gps$junction_length[k] + + # get te sequences and clone ids for all sequences in the group of clones + row_idx <- unlist(clone_to_idx[clones], use.names = FALSE) + seqs <- db[[seq_col]][row_idx] + cids <- db[[clone]][row_idx] + + # This dedup is faster + # than using a dataframe and calling distinct() + key <- paste0(cids, "\t", seqs) + keep <- !duplicated(key) + seqs <- seqs[keep] + names(seqs) <- cids[keep] + + dist_mtx <- pairwiseDist(seqs, dist_mat = DNAMatrix_gap0) + + rn <- rownames(dist_mtx) + clone_list <- unique(rn) + n_clones <- length(clone_list) + clone_idx <- split(seq_along(rn), rn)[clone_list] + + n_rows <- n_clones + n_clones * (n_clones - 1) %/% 2 + clone_x <- character(n_rows) + clone_y <- character(n_rows) + dists <- numeric(n_rows) + labels <- character(n_rows) + + r <- 0L + for (i in seq_along(clone_list)) { + ii <- clone_idx[[i]] + r <- r + 1L + clone_x[r] <- clone_list[i] + clone_y[r] <- NA_character_ + dists[r] <- max(dist_mtx[ii, ii, drop = FALSE]) / jlen + labels[r] <- "intra" + + if (i < n_clones) { + for (j in (i + 1L):n_clones) { + jj <- clone_idx[[j]] + r <- r + 1L + clone_x[r] <- clone_list[i] + clone_y[r] <- clone_list[j] + dists[r] <- min(dist_mtx[ii, jj, drop = FALSE]) / jlen + labels[r] <- "inter" + } + } + } + + if (verbose) { + pb$tick() + } + + data.frame( + clone_id_x = clone_x, + clone_id_y = clone_y, + distance = dists, + label = labels, + stringsAsFactors = FALSE + ) + } + + if (nproc > 1) { + parallel::stopCluster(cluster) + } return(db_dff) } # *****************************************************************************