-
Notifications
You must be signed in to change notification settings - Fork 1
/
websocket.js
88 lines (84 loc) · 2.63 KB
/
websocket.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import websocketStream from "websocket-stream/stream";
import ram from "random-access-memory";
import hyperdrive from "@jimpick/hyperdrive-hyperdb-backend";
import hyperdiscovery from "hyperdiscovery";
import pump from "pump";
import dumpWriters from "./src/lib/dumpWriters";
let maxArchives = 100;
let archives = {};
export function cleanup() {
setInterval(function() {
const sortedArchives = Object.values(archives).sort(
(a, b) => a.lastAccess - b.lastAccess
);
sortedArchives.forEach((entry, index) => {
const { archive, lastAccess, clients } = entry;
const key = archive.key && archive.key.toString("hex");
const peers = archive.db.source.peers.length;
console.log(
` ${index} ${lastAccess} ${key} (${clients} clients, ${peers} peers)`
);
});
if (sortedArchives.length > maxArchives) {
for (let i = 0; i < sortedArchives.length - maxArchives; i++) {
const archive = sortedArchives[i].archive;
const key = archive.key && archive.key.toString("hex");
console.log(`Releasing ${i} ${key}`);
sortedArchives[i].cancel();
}
}
}, 60 * 1000);
}
export function cb(ws, req) {
let archiveKey = req.params.key;
console.log("Websocket initiated for", archiveKey);
let archive;
if (archives[archiveKey]) {
archive = archives[archiveKey].archive;
archives[archiveKey].lastAccess = Date.now();
} else {
archive = hyperdrive(ram, archiveKey);
archives[archiveKey] = {
archive,
lastAccess: Date.now(),
cancel,
clients: 0
};
archive.on("ready", () => {
console.log("archive ready");
// Join swarm
const sw = hyperdiscovery(archive);
archives[archiveKey].swarm = sw;
sw.on("connection", (peer, info) => {
console.log("Swarm connection", info);
});
const watcher = archive.db.watch(() => {
console.log("Archive updated:", archive.key.toString("hex"));
dumpWriters(archive);
});
watcher.on("error", err => {
console.error("Watcher error", err);
});
});
}
archive.ready(() => {
archives[archiveKey].clients += 1;
const stream = websocketStream(ws);
pump(
stream,
archive.replicate({ encrypt: false, live: true }),
stream,
err => {
console.log("pipe finished for " + archiveKey, err && err.message);
archives[archiveKey].clients -= 1;
}
);
});
function cancel() {
console.log(`Cancelling ${archiveKey}`);
const sw = archives[archiveKey].swarm;
if (sw) sw.close();
archive.db.source.peers.forEach(peer => peer.end());
delete archives[archiveKey];
}
}