Skip to content
This repository was archived by the owner on Aug 19, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractGraphCommitJob extends AbstractJob implements Runnable {
public abstract class AbstractGraphCommitJob extends AbstractJob<Void> {

private Logger logger = LoggerFactory.getLogger(AbstractGraphCommitJob.class);

Expand Down Expand Up @@ -60,7 +60,7 @@ public GraphMetadata.Id getDstGraphId() {
}

@Override
public void run() {
public Void call() {
logger.debug("Starting commit on "
+ srcGraphId
+ " to "
Expand Down Expand Up @@ -92,10 +92,12 @@ public void run() {
} catch (Throwable t) {
setJobState(jobId, JobMetadata.ERROR, t.getMessage());
logger.error("error running job: " + t.getMessage());
return;
return null;
}

logger.debug("finished job: " + jobId);

return null;
}

protected void copyIndices(DendriteGraph srcGraph, DendriteGraph dstGraph) {
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/org/lab41/dendrite/jobs/AbstractGraphUpdateJob.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
package org.lab41.dendrite.jobs;

import com.thinkaurelius.titan.core.TitanTransaction;
import com.thinkaurelius.titan.core.attribute.FullDouble;
import com.tinkerpop.blueprints.Vertex;
import com.tinkerpop.blueprints.oupls.jung.GraphJung;
import com.tinkerpop.blueprints.util.wrappers.batch.BatchGraph;
import org.lab41.dendrite.jobs.AbstractJob;
import org.lab41.dendrite.metagraph.DendriteGraph;
import org.lab41.dendrite.metagraph.DendriteGraphBatchWrapper;
import org.lab41.dendrite.metagraph.DendriteGraphTx;
import org.lab41.dendrite.metagraph.MetaGraph;
import org.lab41.dendrite.metagraph.models.JobMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractGraphUpdateJob extends AbstractJob {
public abstract class AbstractGraphUpdateJob extends AbstractJob<Void> {

static Logger logger = LoggerFactory.getLogger(AbstractGraphUpdateJob.class);

Expand All @@ -26,7 +19,8 @@ public AbstractGraphUpdateJob(MetaGraph metaGraph, JobMetadata.Id jobId, Dendrit
this.graph = graph;
}

public void run() {
@Override
public Void call() throws Exception {
logger.debug("Starting " + getClass().getSimpleName() + " analysis on "
+ graph.getId()
+ " job " + jobId
Expand Down Expand Up @@ -59,9 +53,11 @@ public void run() {
}

logger.debug("Finished analysis on " + jobId);

return null;
}

protected abstract void updateGraph();
protected abstract void updateGraph() throws Exception;

protected void createIndices() { }

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/lab41/dendrite/jobs/AbstractJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractJob {
import java.util.concurrent.Callable;

public abstract class AbstractJob<V> implements Callable<V> {

static Logger logger = LoggerFactory.getLogger(AbstractJob.class);

Expand Down
75 changes: 75 additions & 0 deletions src/main/java/org/lab41/dendrite/jobs/ExportEdgeListJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2014 In-Q-Tel/Lab41
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.lab41.dendrite.jobs;

import com.tinkerpop.blueprints.Direction;
import com.tinkerpop.blueprints.Edge;
import com.tinkerpop.blueprints.Vertex;
import org.lab41.dendrite.metagraph.DendriteGraph;
import org.lab41.dendrite.metagraph.MetaGraph;
import org.lab41.dendrite.metagraph.models.JobMetadata;

import java.io.BufferedOutputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;

public class ExportEdgeListJob extends AbstractJob<Void> {

private static final String UTF8 = "UTF-8";
private static final byte[] TAB;
private static final byte[] NEWLINE;

static {
try {
TAB = "\t".getBytes(UTF8);
NEWLINE = "\n".getBytes(UTF8);
} catch (UnsupportedEncodingException e) {
throw new IllegalArgumentException("Can not find " + UTF8 + " encoding");
}
}

private final DendriteGraph graph;
private final OutputStream outputStream;

public ExportEdgeListJob(MetaGraph metaGraph, JobMetadata.Id jobId, DendriteGraph graph, OutputStream outputStream) {
super(metaGraph, jobId);

this.graph = graph;
this.outputStream = outputStream;
}

@Override
public Void call() throws Exception {

try (BufferedOutputStream bos = new BufferedOutputStream(outputStream)) {
for (Edge edge : graph.getEdges()) {
Vertex outVertex = edge.getVertex(Direction.OUT);
Vertex inVertex = edge.getVertex(Direction.IN);
String label = edge.getLabel();

bos.write(outVertex.getId().toString().getBytes(UTF8));
bos.write(TAB);
bos.write(inVertex.getId().toString().getBytes(UTF8));
bos.write(TAB);
bos.write(label.getBytes(UTF8));
bos.write(NEWLINE);
}
}

return null;
}
}
4 changes: 2 additions & 2 deletions src/main/java/org/lab41/dendrite/jobs/FaunusJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class FaunusJob extends AbstractJob implements Callable<Object> {
public class FaunusJob extends AbstractJob<Void> {

private enum State { ACTIVE, DONE, ERROR }

Expand All @@ -34,7 +34,7 @@ public FaunusJob(MetaGraph metaGraph, JobMetadata.Id jobId, FaunusPipeline faunu
}

@Override
public Object call() throws Exception {
public Void call() throws Exception {
FaunusCompiler compiler = faunusPipeline.getCompiler();
FaunusJobControl jobControl = new FaunusJobControl(faunusPipeline.getGraph(), compiler.getJobs());

Expand Down
113 changes: 113 additions & 0 deletions src/main/java/org/lab41/dendrite/jobs/snap/AbstractSnapJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright 2014 In-Q-Tel/Lab41
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.lab41.dendrite.jobs.snap;

import org.apache.commons.io.IOUtils;
import org.lab41.dendrite.jobs.AbstractGraphUpdateJob;
import org.lab41.dendrite.jobs.ExportEdgeListJob;
import org.lab41.dendrite.metagraph.DendriteGraph;
import org.lab41.dendrite.metagraph.MetaGraph;
import org.lab41.dendrite.metagraph.models.JobMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;

public abstract class AbstractSnapJob extends AbstractGraphUpdateJob {

private static final Logger logger = LoggerFactory.getLogger(AbstractSnapJob.class);

protected AbstractSnapJob(MetaGraph metaGraph, JobMetadata.Id jobId, DendriteGraph graph) {
super(metaGraph, jobId, graph);
}

@Override
protected void updateGraph() throws Exception {
Path tmpDir = Files.createTempDirectory("tmp");

try {
Path inputPath = tmpDir.resolve("input");

try {
runExport(inputPath);

Path outputPath = tmpDir.resolve("output");

try {
runSnap(inputPath, outputPath);

runImport(outputPath);
} finally {
Files.delete(outputPath);
}

} finally {
Files.delete(inputPath);
}

} finally {
Files.delete(tmpDir);
}
}

private void runExport(Path exportPath) throws Exception {
try (OutputStream os = new FileOutputStream(exportPath.toFile())) {
ExportEdgeListJob exportEdgeListJob = new ExportEdgeListJob(metaGraph, jobId, graph, os);
exportEdgeListJob.call();
}
}

private void runSnap(Path inputFile, Path outputFile) throws Exception {
String cmd = getSnapCommand(inputFile, outputFile);

logger.debug("running: " + cmd);

Process p = Runtime.getRuntime().exec(new String[]{"bash", "-c", cmd});

int exitStatus = p.waitFor();

logger.debug("snap finished with ", exitStatus);

if (exitStatus != 0) {
String stdout = IOUtils.toString(p.getInputStream());
String stderr = IOUtils.toString(p.getErrorStream());

throw new Exception("Snap process failed: [" + exitStatus + "]:\n" + stdout + "\n" + stderr);
}
}

protected abstract String getSnapCommand(Path inputFile, Path outputFile);

private void runImport(Path outputFile) throws IOException {
BufferedReader br = new BufferedReader(new FileReader(outputFile.toFile()));
String line;

while ((line = br.readLine()) != null) {
// skip over any comments.
if (line.startsWith("#")) {
continue;
}

String[] parts = line.split("\t");
importLine(parts);
}
}

protected abstract void importLine(String[] parts);
}
99 changes: 99 additions & 0 deletions src/main/java/org/lab41/dendrite/jobs/snap/CentralityJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2014 In-Q-Tel/Lab41
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.lab41.dendrite.jobs.snap;

import com.thinkaurelius.titan.core.attribute.FullDouble;
import com.tinkerpop.blueprints.Vertex;
import org.lab41.dendrite.metagraph.DendriteGraph;
import org.lab41.dendrite.metagraph.MetaGraph;
import org.lab41.dendrite.metagraph.models.JobMetadata;

import java.nio.file.Path;

public class CentralityJob extends AbstractSnapJob {

private static final String DEGREE_KEY = "snapDegrees";
private static final String CLOSENESS_KEY = "snapCloseness";
private static final String BETWEENNESS_KEY = "snapBetweenness";
private static final String EIGENVECTOR_KEY = "snapEigenvector";
private static final String NETWORK_CONSTRAINT_KEY = "snapNetworkConstraint";
private static final String CLUSTERING_COEFFICIENT_KEY = "snapClusteringCoefficient";
private static final String PAGERANK_KEY = "snapPageRank";
private static final String HUB_SCORE_KEY = "snapHupScore";
private static final String AUTHORITY_SCORE_KEY = "snapAuthorityScore";

private final Path pathToExecutable;

public CentralityJob(MetaGraph metaGraph,
JobMetadata.Id jobId,
DendriteGraph graph,
Path pathToSnap) {
super(metaGraph, jobId, graph);

this.pathToExecutable = pathToSnap.resolve("centrality").resolve("centrality");
}

@Override
protected String getSnapCommand(Path inputFile, Path outputFile) {
return pathToExecutable.toString() +
" -i:" + inputFile +
" -o:" + outputFile;
}

@Override
protected void importLine(String[] parts) {
String id = parts[0];
double degree = Double.valueOf(parts[1]);
double closeness = Double.valueOf(parts[2]);
double betweenness = Double.valueOf(parts[3]);
double eigenvector = Double.valueOf(parts[4]);
double networkConstraint = Double.valueOf(parts[5]);
double clusteringCoefficient = Double.valueOf(parts[6]);
double pagerank = Double.valueOf(parts[7]);
double hubScore = Double.valueOf(parts[8]);
double authorityScore = Double.valueOf(parts[9]);

// feed snap output as input for updating each vertex
Vertex vertex = graph.getVertex(id);
vertex.setProperty(DEGREE_KEY, degree);
vertex.setProperty(CLOSENESS_KEY, closeness);
vertex.setProperty(BETWEENNESS_KEY, betweenness);
vertex.setProperty(EIGENVECTOR_KEY, eigenvector);
vertex.setProperty(NETWORK_CONSTRAINT_KEY, networkConstraint);
vertex.setProperty(CLUSTERING_COEFFICIENT_KEY, clusteringCoefficient);
vertex.setProperty(PAGERANK_KEY, pagerank);
vertex.setProperty(HUB_SCORE_KEY, hubScore);
vertex.setProperty(AUTHORITY_SCORE_KEY, authorityScore);
}

@Override
protected void createIndices() {
createVertexIndex(DEGREE_KEY, FullDouble.class);
createVertexIndex(CLOSENESS_KEY, FullDouble.class);
createVertexIndex(BETWEENNESS_KEY, FullDouble.class);
createVertexIndex(EIGENVECTOR_KEY, FullDouble.class);
createVertexIndex(NETWORK_CONSTRAINT_KEY, FullDouble.class);
createVertexIndex(CLUSTERING_COEFFICIENT_KEY, FullDouble.class);
createVertexIndex(PAGERANK_KEY, FullDouble.class);
createVertexIndex(HUB_SCORE_KEY, FullDouble.class);
createVertexIndex(AUTHORITY_SCORE_KEY, FullDouble.class);
createVertexIndex(DEGREE_KEY, FullDouble.class);
createVertexIndex(DEGREE_KEY, FullDouble.class);
createVertexIndex(DEGREE_KEY, FullDouble.class);
createVertexIndex(DEGREE_KEY, FullDouble.class);
}
}
Loading