-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.js
More file actions
113 lines (92 loc) · 2.68 KB
/
server.js
File metadata and controls
113 lines (92 loc) · 2.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
const dbName = "autobot";
const collSignalSourceName = "signal_source";
const MongoClient = require('mongodb').MongoClient;
const MongoOplog = require('mongo-oplog');
const assert = require('assert');
const dgram = require('dgram');
const dbUrlBegin = "mongodb://localhost:27017/";
const dbUrl = dbUrlBegin + dbName;
let signalSources = {};
let oplogSignalSources = 0;
function main() {
MongoClient.connect(dbUrl, {reconnectTries: 999, reconnectInterval: 5000}, (err, db) => {
if (err) {
console.log(err.message);
setTimeout(main, 5000);
return;
}
setTimeout(() => {
console.log("Connected successfully to mongodb server");
let signalSource = db.db(dbName).collection(collSignalSourceName);
signalSource.find().each((err, item) => {
if (item == null) {
beginSignalsListening();
} else {
let _id = item._id;
delete item._id;
signalSources[_id] = item;
}
});
db.on('reconnect', oplogStart);
db.on('timeout', oplogStop);
db.on('close', oplogStop);
}, 5000);
});
}
function oplogStart() {
let firstStart = (oplogSignalSources == 0);
if (firstStart) {
oplogSignalSources = MongoOplog(dbUrlBegin + "local", {ns: dbName + "." + collSignalSourceName});
}
oplogSignalSources.stop(() => {
oplogSignalSources.tail(() => {
//console.log("Oplog started");
});
});
if (firstStart) {
oplogSignalSources.on('update', (obj) => {
let doc = obj.o;
let _id = doc._id;
let ss = signalSources[_id];
if (doc.port != ss.port) {
const oldAddress = ss.socket.address();
ss.port = doc.port;
ss.socket.close(() => {
console.log(`Unbind from ${oldAddress.address}:${oldAddress.port}`);
listenFor(_id);
});
}
ss.ip = doc.ip;
ss.pwdhash = doc.pwdhash;
});
}
}
function oplogStop() {
if (!oplogSignalSources) {
return;
}
oplogSignalSources.stop(() => {
//console.log("Oplog stopped");
});
}
function listenFor(_id) {
const ss = signalSources[_id];
const socket = dgram.createSocket('udp4');
ss.socket = socket;
socket.bind(ss.port, () => {
const address = socket.address();
console.log(`Server listening ${address.address}:${address.port}`);
socket.on('message', (msg, rinfo) => {
console.log(`Server got to ${address.port}: ${msg} from ${rinfo.address}:${rinfo.port}`);
});
});
}
function beginSignalsListening() {
assert.notEqual(Object.keys(signalSources).length, 0, "Signal source list is empty");
oplogStart();
for (let _id in signalSources) {
listenFor(_id);
}
}
console.log("Starting server...");
main();