-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDatastore.java
More file actions
164 lines (136 loc) · 4.31 KB
/
Datastore.java
File metadata and controls
164 lines (136 loc) · 4.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import java.net.Socket;
import java.util.Collections;
import java.util.HashSet;
/**
 * Mutable record describing one datastore: its port, a boolean index flag,
 * its socket and the set of file names currently attributed to it, together
 * with the bookkeeping used while a rebalance is being planned (files to
 * send, files to keep, files to remove, and a counter).
 *
 * <p>The class performs no synchronization of its own. The collection getters
 * hand out the live internal sets, not copies.
 *
 * <p>NOTE(review): the exact meaning of the {@code index} flag is not visible
 * in this file - confirm against callers.
 */
public class Datastore {

    private int port;
    private boolean index;
    private Socket socket;
    private HashSet<String> fileNames;

    // Incremented by newSend, newKeep and newReceive; reset by newRebalance.
    private int rebalanceCount = 0;
    private HashSet<RebalanceFile> sendFiles = new HashSet<>();
    private HashSet<String> keepFiles = new HashSet<>();
    private HashSet<String> removeFiles = new HashSet<>();

    /**
     * Creates a datastore record.
     *
     * @param port      port number of the datastore
     * @param index     initial value of the index flag
     * @param socket    socket associated with the datastore
     * @param fileNames set of file names; stored by reference, not copied.
     *                  Must be non-null before any method that reads or
     *                  modifies the file names is called.
     */
    public Datastore(int port, boolean index, Socket socket, HashSet<String> fileNames) {
        this.port = port;
        this.index = index;
        this.socket = socket;
        this.fileNames = fileNames;
    }

    /** @return the port number of this datastore */
    public int getPort() {
        return port;
    }

    /** @param port the new port number */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the current value of the index flag */
    public boolean getIndex() {
        return index;
    }

    /** @param index the new value of the index flag */
    public void setIndex(boolean index) {
        this.index = index;
    }

    /** @return the socket associated with this datastore */
    public Socket getSocket() {
        return socket;
    }

    /** @param socket the new socket */
    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    /** @return the live internal set of file names (not a copy) */
    public HashSet<String> getFileNames() {
        return fileNames;
    }

    /**
     * Replaces the file-name set with the given set, stored by reference.
     *
     * @param fileNames the new set of file names
     */
    public void setFileNames(HashSet<String> fileNames) {
        this.fileNames = fileNames;
    }

    /**
     * Replaces the file-name set with a fresh set built from the given array.
     *
     * <p>The result is an empty set when the array is {@code null}, has no
     * elements, or has the empty string as its first element. Otherwise every
     * element of the array is added.
     *
     * @param fileNames file names to store; may be {@code null} or empty
     */
    public void setFileNames(String[] fileNames) {
        this.fileNames = new HashSet<>();
        // Guard before touching index 0: a null or zero-length array simply
        // means "no files" and must not throw.
        if (fileNames != null && fileNames.length > 0 && !fileNames[0].isEmpty()) {
            Collections.addAll(this.fileNames, fileNames);
        }
    }

    /** @param fileName file name to add to the file-name set */
    public void addFileName(String fileName) {
        this.fileNames.add(fileName);
    }

    /** @param fileName file name to remove from the file-name set */
    public void removeFileName(String fileName) {
        this.fileNames.remove(fileName);
    }

    /**
     * Resets the rebalance bookkeeping: the counter goes back to zero, the
     * send/keep/remove sets are replaced with fresh ones, and the remove set
     * is seeded with every current file name. {@link #finishRebalance()} later
     * drops from the remove set any file that was marked to send or keep.
     */
    public void newRebalance() {
        rebalanceCount = 0;
        sendFiles = new HashSet<>();
        keepFiles = new HashSet<>();
        removeFiles = new HashSet<>(fileNames);
    }

    /**
     * Adds a {@code RebalanceFile} built from the file name and this
     * datastore's port to the send set, and increments the rebalance counter.
     *
     * @param fileName name of the file to send
     */
    public void newSend(String fileName) {
        sendFiles.add(new RebalanceFile(fileName, port));
        rebalanceCount++;
    }

    /** @return the live internal set of files to send (not a copy) */
    public HashSet<RebalanceFile> getSendFiles() {
        return sendFiles;
    }

    /** @return the live internal set of file names to keep (not a copy) */
    public HashSet<String> getKeepFiles() {
        return keepFiles;
    }

    /** @return the live internal set of file names to remove (not a copy) */
    public HashSet<String> getRemoveFiles() {
        return removeFiles;
    }

    /** @param removeFile file name to drop from the remove set */
    public void removeRemoveFile(String removeFile) {
        removeFiles.remove(removeFile);
    }

    /**
     * Tells whether some entry in the send set has the given file name.
     *
     * @param fileName file name to look for
     * @return {@code true} if an entry's file name equals {@code fileName}
     */
    public boolean containsSendFile(String fileName) {
        for (RebalanceFile sendFile : sendFiles) {
            if (sendFile.getFileName().equals(fileName)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Adds the file name to the keep set and increments the rebalance counter.
     *
     * @param fileName name of the file to keep
     */
    public void newKeep(String fileName) {
        keepFiles.add(fileName);
        rebalanceCount++;
    }

    /** Increments the rebalance counter without touching any of the sets. */
    public void newReceive() {
        rebalanceCount++;
    }

    /** @return the current value of the rebalance counter */
    public int getRebalanceCount() {
        return rebalanceCount;
    }

    /**
     * @param fileName file name to look for
     * @return {@code true} if the file-name set contains {@code fileName}
     */
    public boolean alreadyStores(String fileName) {
        return fileNames.contains(fileName);
    }

    /**
     * Finalizes the remove set and renders the rebalance message.
     *
     * <p>First, every file name that is also in the keep set or matches an
     * entry in the send set is dropped from the remove set (the set is
     * modified in place). Then the message is built as:
     *
     * <pre>
     * REBALANCE sendCount {file portCount port...} removeCount {file...}
     * </pre>
     *
     * Send entries with no destination ports are left out and are not counted
     * in {@code sendCount}. Entries appear in the iteration order of the
     * underlying sets.
     *
     * @return the rebalance message string
     */
    public String finishRebalance() {
        // A file that is being sent on or explicitly kept must not be removed.
        HashSet<String> retained = new HashSet<>();
        for (String candidate : removeFiles) {
            if (keepFiles.contains(candidate) || containsSendFile(candidate)) {
                retained.add(candidate);
            }
        }
        removeFiles.removeAll(retained);

        StringBuilder details = new StringBuilder();
        int sendCount = 0;
        for (RebalanceFile sendFile : sendFiles) {
            if (sendFile.getDestinationPorts().size() > 0) {
                // Only entries that actually have somewhere to go are emitted.
                sendCount++;
                details.append(" ").append(sendFile.getFileName())
                        .append(" ").append(sendFile.getDestinationPorts().size());
                for (Integer destinationPort : sendFile.getDestinationPorts()) {
                    details.append(" ").append(destinationPort);
                }
            }
        }

        details.append(" ").append(removeFiles.size());
        for (String removeFile : removeFiles) {
            details.append(" ").append(removeFile);
        }
        return "REBALANCE" + " " + sendCount + details;
    }
}