Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ RUN install -d -m 777 /usr/local/var/run/watchman

WORKDIR "/opt/mirror"
COPY --from=mirror-builder /tmp/mirror/mirror ./
COPY --from=mirror-builder /tmp/mirror/build/libs/mirror-all.jar ./
COPY --from=mirror-builder /tmp/mirror/build/libs/mirror.jar ./
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure these references are fixed in master; can you change back to the mirror-all?

RUN chmod a+s /usr/sbin/useradd /usr/sbin/groupadd
ADD docker/docker-entrypoint.sh docker-entrypoint.sh
ENTRYPOINT ["./docker-entrypoint.sh"]
3 changes: 1 addition & 2 deletions mirror
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
#!/bin/bash

SCRIPT_DIRECTORY="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
MAIN=mirror.Mirror
JAR=mirror-all.jar
JAR=mirror.jar
OPTS="-Xmx2G -XX:+HeapDumpOnOutOfMemoryError"

if [ -e ${SCRIPT_DIRECTORY}/${JAR} ]; then
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/mirror/Mirror.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public static class MirrorClientCommand extends BaseCommand {
@Option(name = { "-li", "--use-internal-patterns" }, description = "use hardcoded include/excludes that generally work well for internal repos")
public boolean useInternalPatterns;

@Option(name = { "-sd", "--sync-direction"}, description = "direction to sync files, defaults to \"BOTH\", allowed values: \"INBOUND\", \"OUTBOUND\", \"BOTH\"")
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, so INBOUND is from the perspective of the client, so means "download from server, but don't upload"? Wondering if something like FROM_SERVER or TO_SERVER would be clearer, but that sounds awkward.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess once we get into the MirrorSession side of things, we do need something that is flippable/complement-able, as you're doing.

public SyncDirection syncDirection = SyncDirection.BOTH;

@Override
protected void runIfChecksOkay() {
try {
Expand All @@ -193,7 +196,8 @@ protected void runIfChecksOkay() {
new ConnectionDetector.Impl(channelFactory),
watcherFactory,
new NativeFileAccess(Paths.get(localRoot).toAbsolutePath()),
channelFactory);
channelFactory,
syncDirection);
client.startSession();
// dumb way of waiting until they hit control-c
CountDownLatch cl = new CountDownLatch(1);
Expand Down
19 changes: 17 additions & 2 deletions src/main/java/mirror/MirrorClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MirrorClient {
private final FileWatcherFactory watcherFactory;
private final FileAccess fileAccess;
private final ChannelFactory channelFactory;
private final SyncDirection syncDirection;
private volatile TaskLogic sessionStarter;
private volatile MirrorSession session;

Expand All @@ -44,12 +45,24 @@ public MirrorClient(
FileWatcherFactory watcherFactory,
FileAccess fileAccess,
ChannelFactory channelFactory) {
this(paths, taskFactory, detector, watcherFactory, fileAccess, channelFactory, SyncDirection.BOTH);
}

public MirrorClient(
MirrorPaths paths,
TaskFactory taskFactory,
ConnectionDetector detector,
FileWatcherFactory watcherFactory,
FileAccess fileAccess,
ChannelFactory channelFactory,
SyncDirection syncDirection) {
this.paths = paths;
this.taskFactory = taskFactory;
this.detector = detector;
this.watcherFactory = watcherFactory;
this.fileAccess = fileAccess;
this.channelFactory = channelFactory;
this.syncDirection = syncDirection;
}

/** Connects to the server and starts a sync session. */
Expand All @@ -73,7 +86,7 @@ private void startSession(ChannelFactory channelFactory, CountDownLatch onFailur
return;
}

session = new MirrorSession(taskFactory, paths, fileAccess, watcherFactory);
session = new MirrorSession(taskFactory, paths, fileAccess, watcherFactory, syncDirection);
session.addStoppedCallback(channel::shutdownNow);
// Automatically re-connect when we're disconnected
session.addStoppedCallback(() -> {
Expand All @@ -98,7 +111,9 @@ private void startSession(ChannelFactory channelFactory, CountDownLatch onFailur
.setRemotePath(paths.remoteRoot.toString())
.setClientId(getClientId())
.setVersion(Mirror.getVersion())
.addAllState(localState);
.addAllState(localState)
.setAllowInbound(syncDirection.getAllowInbound())
.setAllowOutbound(syncDirection.getAllowOutbound());
paths.addParameters(req);
withTimeout(stub).initialSync(req.build(), new StreamObserver<InitialSyncResponse>() {
@Override
Expand Down
20 changes: 19 additions & 1 deletion src/main/java/mirror/MirrorServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,16 @@ public synchronized void initialSync(InitialSyncRequest request, StreamObserver<
sessions.get(sessionId).stop();
}

//This is the sync direction from the client's point of view. We need to use the complement to construct the session.
SyncDirection syncDirection = getSyncDirection(request);

log.info("Starting new session " + sessionId);
MirrorSession session = new MirrorSession(taskFactory, paths, fileAccessFactory.newFileAccess(paths.root.toAbsolutePath()), watcherFactory);
MirrorSession session = new MirrorSession(
taskFactory,
paths,
fileAccessFactory.newFileAccess(paths.root.toAbsolutePath()),
watcherFactory,
syncDirection.getComplement());

sessions.put(sessionId, session);
session.addStoppedCallback(() -> {
Expand Down Expand Up @@ -201,4 +209,14 @@ private void sendErrorIfClockDriftExists(TimeCheckRequest request, StreamObserve
}
responseObserver.onCompleted();
}

private SyncDirection getSyncDirection(InitialSyncRequest request) {
if (request.getAllowOutbound() && !request.getAllowInbound()) {
return SyncDirection.OUTBOUND;
}
if (!request.getAllowOutbound() && request.getAllowInbound()) {
return SyncDirection.INBOUND;
}
return SyncDirection.BOTH;
}
}
8 changes: 6 additions & 2 deletions src/main/java/mirror/MirrorSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,20 @@ public class MirrorSession {
private final FileWatcher fileWatcher;
private final UpdateTree tree;
private final SyncLogic syncLogic;
private final SyncDirection syncDirection;
private volatile SaveToRemote saveToRemote;
private volatile OutgoingConnection outgoingChanges;

public MirrorSession(TaskFactory taskFactory, MirrorPaths paths, FileAccess fileAccess, FileWatcherFactory fileWatcherFactory) {
public MirrorSession(TaskFactory taskFactory, MirrorPaths paths, FileAccess fileAccess, FileWatcherFactory fileWatcherFactory, SyncDirection syncDirection) {
this.fileAccess = fileAccess;
this.fileWatcher = fileWatcherFactory.newWatcher(paths, queues.incomingQueue);
this.tree = UpdateTree.newRoot(paths);
this.syncDirection = syncDirection;

// Run all our tasks in a pool so they are terminated together
taskPool = taskFactory.newTaskPool();

syncLogic = new SyncLogic(queues, fileAccess, tree);
syncLogic = new SyncLogic(queues, fileAccess, tree, syncDirection);
// started in diffAndStartPolling

saveToLocal = new SaveToLocal(queues, fileAccess);
Expand Down Expand Up @@ -79,6 +81,7 @@ public List<Update> calcInitialState() throws Exception {
// We've drained the initial state, so we can tell FileWatcher to start polling now.
// This will start filling up the queue, but not technically start processing/sending
// updates to the remote (see #startPolling).
// TODO: We don't need to watch files if we're inbound only - we do need the initial state list though
start(fileWatcher);

initialUpdates.forEach(u -> tree.addLocal(u));
Expand All @@ -93,6 +96,7 @@ public List<Update> calcInitialState() throws Exception {
seedRemote.add(n.restorePath(n.getLocal()));
}
});
// TODO: maybe interrupt the watcher here?
return seedRemote;
}

Expand Down
59 changes: 59 additions & 0 deletions src/main/java/mirror/SyncDirection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package mirror;

public enum SyncDirection {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Protobuf supports enums:

https://developers.google.com/protocol-buffers/docs/proto#enum

If we put this enum in there, then you could just send the sync direction as-is over the wire, and not have to encode it/decode it as the separate allow inbound / allow outbound properties.

INBOUND {
@Override
public boolean getAllowInbound() {
return true;
}

@Override
public boolean getAllowOutbound() {
return false;
}

@Override
public SyncDirection getComplement() {
return OUTBOUND;
}
},
OUTBOUND {
@Override
public boolean getAllowInbound() {
return false;
}

@Override
public boolean getAllowOutbound() {
return true;
}

@Override
public SyncDirection getComplement() {
return INBOUND;
}
},
BOTH {
@Override
public boolean getAllowInbound() {
return true;
}

@Override
public boolean getAllowOutbound() {
return true;
}

@Override
public SyncDirection getComplement() {
return BOTH;
}
};


public abstract boolean getAllowInbound();

public abstract boolean getAllowOutbound();

public abstract SyncDirection getComplement();
}
6 changes: 4 additions & 2 deletions src/main/java/mirror/SyncLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class SyncLogic implements TaskLogic {
private final Queues queues;
private final FileAccess fileAccess;
private final UpdateTree tree;
private final SyncDirection syncDirection;

public SyncLogic(Queues queues, FileAccess fileAccess, UpdateTree tree) {
public SyncLogic(Queues queues, FileAccess fileAccess, UpdateTree tree, SyncDirection syncDirection) {
this.queues = queues;
this.fileAccess = fileAccess;
this.tree = tree;
this.syncDirection = syncDirection;
}

@Override
Expand Down Expand Up @@ -95,7 +97,7 @@ private void handleUpdate(Update u) throws InterruptedException {
}

private void diff() throws InterruptedException {
DiffResults r = new UpdateTreeDiff(tree).diff();
DiffResults r = new UpdateTreeDiff(tree, syncDirection).diff();
for (Update u : r.saveLocally) {
queues.saveToLocal.put(u);
}
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/mirror/UpdateTreeDiff.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ public String toString() {
}

private final UpdateTree tree;
private final SyncDirection syncDirection;

public UpdateTreeDiff(UpdateTree tree) {
public UpdateTreeDiff(UpdateTree tree, SyncDirection syncDirection) {
this.tree = tree;
this.syncDirection = syncDirection;
}

public DiffResults diff() {
Expand All @@ -65,7 +67,7 @@ private void diff(DiffResults results, Node node) {
Update local = node.getLocal();
Update remote = node.getRemote();

if (node.isLocalNewer()) {
if (node.isLocalNewer() && syncDirection.getAllowOutbound()) {
if (!node.shouldIgnore()) {
debugIfEnabled(node, "isLocalNewer");
if (local.getDelete() && node.isParentDeleted()) {
Expand All @@ -75,7 +77,7 @@ private void diff(DiffResults results, Node node) {
}
}
node.setRemote(local);
} else if (node.isRemoteNewer()) {
} else if (node.isRemoteNewer() && syncDirection.getAllowInbound()) {
// if we were a directory, and this is now a file, do an explicit delete first
if (local != null && !node.isSameType() && !local.getDelete() && !remote.getDelete()) {
Update delete = local.toBuilder().setDelete(true).build();
Expand All @@ -99,6 +101,7 @@ private void diff(DiffResults results, Node node) {
// should rarely/never happen (although it did happen when a bug existed), but
// if the remote side sends over data that exactly matches what we already have,
// we won't save but, which is fine, but make sure we free it from memory
// This will also occur if the other side sends a message that our direction forbids (which shouldn't happen)
node.clearData();
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/main/proto/mirror.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ message InitialSyncRequest {
string remotePath = 1;
int64 currentTime = 7 [deprecated=true];
string version = 8;
bool allowInbound = 10;
bool allowOutbound = 11;
repeated string includes = 3;
repeated string excludes = 4;
repeated string debugPrefixes = 5;
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/mirror/MirrorSessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class MirrorSessionTest {
private final FileWatcherFactory fileWatcherFactory = Mockito.mock(FileWatcherFactory.class);
private final FileWatcher fileWatcher = Mockito.mock(FileWatcher.class);
private final StubTaskFactory taskFactory = new StubTaskFactory();
private final SyncDirection syncDirection = SyncDirection.BOTH;
private MirrorSession session;

@Before
Expand All @@ -33,7 +34,8 @@ public void before() throws Exception {
taskFactory,
new MirrorPaths(root, null, new PathRules("*.jar"), new PathRules(), false, new ArrayList<>()),
fileAccess,
fileWatcherFactory);
fileWatcherFactory,
syncDirection);
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/mirror/SyncLogicTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public class SyncLogicTest {
private final StubObserver<Update> outgoing = new StubObserver<>();
private final StubFileAccess fileAccess = new StubFileAccess();
private final UpdateTree tree = UpdateTree.newRoot();
private final SyncLogic l = new SyncLogic(queues, fileAccess, tree);
private final SyncDirection syncDirection = SyncDirection.BOTH;
private final SyncLogic l = new SyncLogic(queues, fileAccess, tree, syncDirection);

@Test
public void sendLocalChangeToRemote() throws Exception {
Expand Down
Loading