access-watch
Advanced tools
Comparing version 0.8.3 to 0.10.1
{ | ||
"name": "access-watch", | ||
"version": "0.8.3", | ||
"version": "0.10.1", | ||
"description": "Open Source Web Traffic Manager", | ||
@@ -31,12 +31,13 @@ "repository": { | ||
"access-watch-sdk": "^0.1.4", | ||
"access-watch-ui": "^1.4.1", | ||
"ajv": "^5.5.2", | ||
"axios": "^0.17.0", | ||
"access-watch-ui": "^1.7.1", | ||
"ajv": "^6.1.1", | ||
"axios": "^0.18.0", | ||
"date-format-lite": "^17.7.0", | ||
"ejs": "^2.5.7", | ||
"elasticsearch": "^13.3.1", | ||
"elasticsearch": "^14.1.0", | ||
"express": "^4.16.2", | ||
"express-ws": "^3.0.0", | ||
"fs-extra": "^5.0.0", | ||
"immutable": "^4.0.0-rc.9", | ||
"lodash.merge": "^4.6.0", | ||
"lodash.merge": "^4.6.1", | ||
"lodash.omit": "^4.5.0", | ||
@@ -47,3 +48,3 @@ "lru-cache": "^4.1.1", | ||
"proxy-addr": "^2.0.2", | ||
"rc": "^1.2.4", | ||
"rc": "^1.2.5", | ||
"syslog-parse": "^1.3.1", | ||
@@ -53,10 +54,10 @@ "tail": "^1.2.3", | ||
"uuid": "^3.2.1", | ||
"ws": "^3.3.3" | ||
"ws": "^4.0.0" | ||
}, | ||
"devDependencies": { | ||
"eslint": "^4.16.0", | ||
"eslint": "^4.18.0", | ||
"eslint-config-prettier": "^2.9.0", | ||
"eslint-plugin-node": "^5.2.1", | ||
"eslint-plugin-prettier": "^2.5.0", | ||
"mocha": "^4.1.0", | ||
"eslint-plugin-node": "^6.0.0", | ||
"eslint-plugin-prettier": "^2.6.0", | ||
"mocha": "^5.0.1", | ||
"prettier": "^1.10.2" | ||
@@ -63,0 +64,0 @@ }, |
 | ||
[](https://travis-ci.org/access-watch/access-watch) | ||
[](https://greenkeeper.io/) | ||
[](http://slack.access.watch/) | ||
@@ -5,0 +6,0 @@ |
const express = require('express'); | ||
const pipeline = require('../lib/pipeline'); | ||
const monitoring = require('../lib/monitoring'); | ||
@@ -10,5 +10,5 @@ const app = express(); | ||
app.get('/monitoring', (req, res) => { | ||
res.send(pipeline.monitoring()); | ||
res.send(monitoring.getAllComputed()); | ||
}); | ||
module.exports = app; |
@@ -34,2 +34,3 @@ const path = require('path'); | ||
}, | ||
session: config.session, | ||
}, | ||
@@ -36,0 +37,0 @@ config.ui |
const express = require('express'); | ||
const expressWs = require('express-ws'); | ||
const uuid = require('uuid/v4'); | ||
const monitoring = require('../lib/monitoring'); | ||
@@ -8,4 +9,25 @@ const app = express(); | ||
app.streamToWebsocket = (endpoint, stream) => { | ||
app.streamToWebsocket = ( | ||
endpoint, | ||
stream, | ||
{ name = `WebSocket: ${endpoint}`, monitoringEnabled = false } = {} | ||
) => { | ||
const clients = {}; | ||
let monitor; | ||
if (monitoringEnabled) { | ||
monitor = monitoring.registerOutput({ name }); | ||
} | ||
const updateMonitoringStatus = () => { | ||
if (monitor) { | ||
const clientsSize = Object.keys(clients).length; | ||
if (clientsSize) { | ||
monitor.status = `${clientsSize} client${ | ||
clientsSize > 1 ? 's' : '' | ||
} listening on #API_WEBSOCKET_URL#${endpoint}`; | ||
} else { | ||
monitor.status = `Waiting for clients on #API_WEBSOCKET_URL#${endpoint}`; | ||
} | ||
} | ||
}; | ||
updateMonitoringStatus(); | ||
@@ -15,4 +37,6 @@ app.ws(endpoint, client => { | ||
clients[clientId] = client; | ||
updateMonitoringStatus(); | ||
client.on('close', () => { | ||
delete clients[clientId]; | ||
updateMonitoringStatus(); | ||
}); | ||
@@ -24,2 +48,5 @@ }); | ||
if (client.readyState === 1 /* === WebSocket.OPEN */) { | ||
if (monitor) { | ||
monitor.hit(); | ||
} | ||
client.send(JSON.stringify(log)); | ||
@@ -26,0 +53,0 @@ } |
@@ -6,2 +6,3 @@ const rc = require('rc'); | ||
port: 3000, | ||
app: {}, | ||
pipeline: { | ||
@@ -14,2 +15,3 @@ allowedLateness: 60, | ||
directory: path.resolve(__dirname, '../data'), | ||
saveInterval: 60 * 60 * 1000, | ||
}, | ||
@@ -34,2 +36,3 @@ metrics: { | ||
}, | ||
timerange: false, | ||
}, | ||
@@ -55,5 +58,8 @@ hub: { | ||
}, | ||
robots: { | ||
timerange: false, | ||
}, | ||
}, | ||
elasticsearch: { | ||
retention: 5, | ||
retention: 7, | ||
logsIndexName: 'access-watch-logs', | ||
@@ -60,0 +66,0 @@ configuration: {}, |
@@ -52,3 +52,3 @@ const elasticsearch = require('elasticsearch'); | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
@@ -55,0 +55,0 @@ }); |
@@ -25,7 +25,7 @@ const Tail = require('tail').Tail; | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
}); | ||
tail.on('error', err => { | ||
pipeline.error(err); | ||
pipeline.log(err, 'error'); | ||
}); | ||
@@ -32,0 +32,0 @@ tail.watch(); |
@@ -18,3 +18,3 @@ const { fromJS } = require('immutable'); | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
@@ -21,0 +21,0 @@ }); |
@@ -60,2 +60,3 @@ const dgram = require('dgram'); | ||
}) { | ||
let udpServer, tcpServer; | ||
return { | ||
@@ -71,12 +72,22 @@ name: name, | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
}; | ||
if (!protocol || protocol === 'udp') { | ||
createUdpServer({ pipeline, name, port, handler }); | ||
udpServer = createUdpServer({ pipeline, name, port, handler }); | ||
} | ||
if (!protocol || protocol === 'tcp') { | ||
createTcpServer({ pipeline, name, port, handler }); | ||
tcpServer = createTcpServer({ pipeline, name, port, handler }); | ||
} | ||
}, | ||
stop: () => { | ||
const promises = []; | ||
if (udpServer) { | ||
promises.push(new Promise(resolve => udpServer.close(resolve))); | ||
} | ||
if (tcpServer) { | ||
promises.push(new Promise(resolve => tcpServer.close(resolve))); | ||
} | ||
return Promise.all(promises); | ||
}, | ||
}; | ||
@@ -83,0 +94,0 @@ } |
@@ -27,3 +27,3 @@ const syslogParse = require('syslog-parse'); | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
@@ -30,0 +30,0 @@ }; |
@@ -20,3 +20,3 @@ const WebSocket = require('ws'); | ||
} catch (err) { | ||
pipeline.error(err); | ||
pipeline.reject(err); | ||
} | ||
@@ -26,4 +26,4 @@ }); | ||
const setupClientWebSocket = ({ pipeline, address, listenSocket }) => { | ||
let socket = new WebSocket(address); | ||
const setupClientWebSocket = ({ pipeline, address, options, listenSocket }) => { | ||
let socket = new WebSocket(address, [], options); | ||
pipeline.status(null, 'Waiting for connection to ' + address); | ||
@@ -40,2 +40,3 @@ socket.on('open', () => { | ||
listenSocket(socket); | ||
return socket; | ||
}; | ||
@@ -52,2 +53,3 @@ | ||
path, | ||
options, | ||
type = 'client', | ||
@@ -57,2 +59,3 @@ parse, | ||
}) { | ||
let client; | ||
return { | ||
@@ -63,3 +66,8 @@ name: `${name} ${type}`, | ||
if (type === 'client') { | ||
setupClientWebSocket({ pipeline, address, listenSocket }); | ||
client = setupClientWebSocket({ | ||
pipeline, | ||
address, | ||
options, | ||
listenSocket, | ||
}); | ||
} else if (type === 'server') { | ||
@@ -69,5 +77,8 @@ setupServerWebSocket({ pipeline, path, listenSocket }); | ||
const errMsg = 'WebSocket type is either client or server'; | ||
pipeline.error(new Error(errMsg), errMsg); | ||
pipeline.log(new Error(errMsg), 'error'); | ||
} | ||
}, | ||
stop: () => { | ||
if (client) client.close(); | ||
}, | ||
}; | ||
@@ -74,0 +85,0 @@ } |
@@ -5,3 +5,3 @@ /** | ||
const path = require('path'); | ||
const fs = require('fs'); | ||
const fs = require('fs-extra'); | ||
@@ -22,2 +22,3 @@ const config = require('../constants'); | ||
clearInterval(this.gc); | ||
return Promise.resolve(); | ||
} | ||
@@ -52,6 +53,22 @@ } | ||
/** | ||
* Write the Javascript value as a JSON file at `path`. | ||
* Write `data` in JSON in of the file at `path` | ||
*/ | ||
function writeJSONFile(path, value) { | ||
fs.writeFileSync(path, JSON.stringify(value, null, 2)); | ||
function writeJSONFile(path, data) { | ||
return fs | ||
.writeJson(path + '.new', data) | ||
.then(() => | ||
fs.pathExists(path).then(exists => { | ||
if (exists) fs.rename(path, path + '.previous'); | ||
}) | ||
) | ||
.then(() => | ||
fs.pathExists(path + '.new').then(exists => { | ||
if (exists) fs.rename(path + '.new', path); | ||
}) | ||
) | ||
.then(() => | ||
fs.pathExists(path + '.previous').then(exists => { | ||
if (exists) fs.unlink(path + '.previous'); | ||
}) | ||
); | ||
} | ||
@@ -64,10 +81,30 @@ | ||
constructor(name, Klass, gcInterval) { | ||
const start = process.hrtime(); | ||
const filePath = path.resolve(config.data.directory, name + '.json'); | ||
super(Klass.deserialize(readJSONFile(filePath)), gcInterval); | ||
const data = readJSONFile(filePath); | ||
const db = Klass.deserialize(data); | ||
super(db, gcInterval); | ||
const end = process.hrtime(start); | ||
const elapsed = end[0] + Math.round(end[1] / 1000000) / 1000; | ||
console.log(`Loaded ${filePath} in ${elapsed}s`); | ||
this.filePath = filePath; | ||
this.saveInterval = setInterval( | ||
() => this.save().catch(console.error), | ||
config.data.saveInterval | ||
); | ||
} | ||
save() { | ||
const start = process.hrtime(); | ||
const data = this.db.serialize(); | ||
return writeJSONFile(this.filePath, data).then(() => { | ||
const end = process.hrtime(start); | ||
const elapsed = end[0] + Math.round(end[1] / 1000000) / 1000; | ||
console.log(`Saved ${this.filePath} in ${elapsed}s`); | ||
}); | ||
} | ||
close() { | ||
super.close(); | ||
writeJSONFile(this.filePath, this.db.serialize()); | ||
clearInterval(this.saveInterval); | ||
return Promise.all([super.close(), this.save()]); | ||
} | ||
@@ -109,6 +146,8 @@ } | ||
function close() { | ||
const promises = []; | ||
for (var name in connections) { | ||
connections[name].close(); | ||
promises.push(connections[name].close()); | ||
delete connections[name]; | ||
} | ||
return Promise.all(promises); | ||
} | ||
@@ -115,0 +154,0 @@ |
@@ -15,3 +15,3 @@ /** | ||
const { now, complement, iso } = require('./util'); | ||
const { Speed } = require('./speed'); | ||
const monitoring = require('./monitoring'); | ||
@@ -24,25 +24,7 @@ const config = require('../constants'); | ||
// Basic "error stream", we might want to do something more sophisticated here | ||
const onError = error => { | ||
console.log(error); | ||
if (error.has('reason')) { | ||
console.trace(error.get('reason')); | ||
function log(log, severity = 'warn') { | ||
console[severity](log); | ||
if (severity === 'error') { | ||
console.trace(log); | ||
} | ||
}; | ||
/** | ||
* Build an error event and send it to the error stream. | ||
* | ||
* Optionally takes the `event` that caused the error. | ||
*/ | ||
function error(reason, event) { | ||
let errorEvent = Map({ | ||
name: 'error', | ||
time: now(), | ||
reason: reason, | ||
}); | ||
if (event) { | ||
errorEvent = errorEvent.set('event', event); | ||
} | ||
onError(errorEvent); | ||
} | ||
@@ -69,3 +51,3 @@ | ||
} catch (reason) { | ||
error(reason, event); | ||
log(reason, 'warn'); | ||
} | ||
@@ -93,3 +75,3 @@ } | ||
.then(value => forward(stream, event.set('data', value))) | ||
.catch(reason => error(reason, event)); | ||
.catch(reason => log(reason, 'warn')); | ||
} else { | ||
@@ -304,2 +286,3 @@ forward(stream, event.set('data', res)); | ||
this.stream = null; | ||
this.log = log; | ||
} | ||
@@ -309,13 +292,9 @@ | ||
this.inputs.push(input); | ||
this.monitors.push({ | ||
accepted: { | ||
per_minute: new Speed(60, 15), | ||
per_hour: new Speed(3600, 24), | ||
}, | ||
rejected: { | ||
per_minute: new Speed(60, 15), | ||
per_hour: new Speed(3600, 24), | ||
}, | ||
status: 'Not started', | ||
}); | ||
this.monitors.push( | ||
monitoring.register({ | ||
speeds: ['accepted', 'rejected'], | ||
name: input.name, | ||
type: 'input', | ||
}) | ||
); | ||
} | ||
@@ -327,2 +306,3 @@ | ||
this.inputs.map(function(input, idx) { | ||
const monitor = pipeline.monitors[idx]; | ||
input.start({ | ||
@@ -338,19 +318,15 @@ success: function(log) { | ||
}); | ||
pipeline.monitors[idx].accepted.per_minute.hit(event.get('time')); | ||
pipeline.monitors[idx].accepted.per_hour.hit(event.get('time')); | ||
monitor.hit('accepted', event.get('time')); | ||
pipeline.handleEvent(event); | ||
} else { | ||
pipeline.monitors[idx].rejected.per_minute.hit(now()); | ||
pipeline.monitors[idx].rejected.per_hour.hit(now()); | ||
pipeline.handleError( | ||
new Error( | ||
`Invalid message: ${validator.errorsText(validate.errors)}` | ||
) | ||
monitor.hit('rejected'); | ||
pipeline.log( | ||
`Invalid message: ${validator.errorsText(validate.errors)}`, | ||
'warn' | ||
); | ||
} | ||
}, | ||
error: function(error) { | ||
pipeline.monitors[idx].rejected.per_minute.hit(now()); | ||
pipeline.monitors[idx].rejected.per_hour.hit(now()); | ||
pipeline.handleError(error); | ||
reject: function(reason) { | ||
monitor.hit('rejected'); | ||
pipeline.log(reason, 'warn'); | ||
}, | ||
@@ -362,4 +338,5 @@ status: function(err, msg) { | ||
console.log(input.name + ': ' + msg); | ||
pipeline.monitors[idx].status = msg; | ||
monitor.status = msg; | ||
}, | ||
log: pipeline.log, | ||
}); | ||
@@ -369,22 +346,13 @@ }); | ||
stop() { | ||
return Promise.all( | ||
this.inputs.filter(input => input.stop).map(input => input.stop()) | ||
); | ||
} | ||
handleEvent(event) { | ||
forward(this.stream, event); | ||
} | ||
handleError(err) { | ||
error(err); | ||
} | ||
monitoring() { | ||
return this.monitors.map((monitor, idx) => { | ||
return Map(monitor) | ||
.set('name', this.inputs[idx].name) | ||
.updateIn(['accepted', 'per_minute'], s => s.compute()) | ||
.updateIn(['accepted', 'per_hour'], s => s.compute()) | ||
.updateIn(['rejected', 'per_minute'], s => s.compute()) | ||
.updateIn(['rejected', 'per_hour'], s => s.compute()); | ||
}); | ||
} | ||
} | ||
module.exports = new Pipeline(); |
@@ -9,2 +9,3 @@ const config = require('../constants'); | ||
config.logs.memory.retention = 0; | ||
config.session.timerange = true; | ||
@@ -17,1 +18,31 @@ stream.map(elasticsearch.index); | ||
}); | ||
const transformSession = sessionName => session => ({ | ||
count: session.count, | ||
reputation: session.reputation, | ||
id: session.id, | ||
[sessionName]: session, | ||
}); | ||
const searchFns = { | ||
robot: elasticsearch.searchRobots, | ||
address: elasticsearch.searchAddresses, | ||
}; | ||
app.get('/sessions/:type', (req, res) => { | ||
const { params, query } = req; | ||
const { start, end, filter } = query; | ||
const { type } = params; | ||
if (start && end) { | ||
const esQuery = { start, end }; | ||
if (filter) { | ||
const [key, value] = filter.split(':'); | ||
esQuery[key] = value; | ||
} | ||
if (searchFns[type]) { | ||
searchFns[type](esQuery).then(sessions => | ||
res.send(sessions.map(transformSession(type))) | ||
); | ||
} | ||
} | ||
}); |
const app = require('../apps/websocket'); | ||
const rawStream = require('../lib/pipeline'); | ||
const pipeline = require('../lib/pipeline'); | ||
const memoryIndex = require('../plugins/memory-logs'); | ||
const { logIsAugmented } = require('../lib/util'); | ||
app.streamToWebsocket('/logs/raw', rawStream); | ||
// Expose raw logs | ||
app.streamToWebsocket('/logs/raw', pipeline, { | ||
name: 'WebSocket: raw logs', | ||
monitoringEnabled: true, | ||
}); | ||
const { stream: augmentedStream } = require('../pipeline/augmented'); | ||
app.streamToWebsocket('/logs/augmented', augmentedStream); | ||
// Expose augmented logs | ||
app.streamToWebsocket('/logs/augmented', augmentedStream, { | ||
name: 'WebSocket: augmented logs', | ||
monitoringEnabled: true, | ||
}); | ||
// Expose augmented logs for dashboard | ||
app.streamToWebsocket('/logs', augmentedStream.filter(logIsAugmented)); | ||
// Keep logs in memory and expose as API endpoint | ||
augmentedStream.map(memoryIndex.index); | ||
@@ -15,0 +29,0 @@ |
@@ -9,2 +9,4 @@ const { Map } = require('immutable'); | ||
const config = require('../constants'); | ||
/** | ||
@@ -110,11 +112,15 @@ * Assign a session to the log event, create it if necessary. | ||
app.get('/sessions/:type', (req, res) => { | ||
res.send( | ||
session.list({ | ||
type: req.params.type, | ||
sort: req.query.sort || 'count', | ||
filter: (req.query.filter && parseFilter(req.query, 'filter')) || null, | ||
limit: (req.query.limit && parseInt(req.query.limit)) || 100, | ||
}) | ||
); | ||
app.get('/sessions/:type', (req, res, next) => { | ||
if (config.session.timerange && req.query.start && req.query.end) { | ||
next(); | ||
} else { | ||
res.send( | ||
session.list({ | ||
type: req.params.type, | ||
sort: req.query.sort || 'count', | ||
filter: (req.query.filter && parseFilter(req.query, 'filter')) || null, | ||
limit: (req.query.limit && parseInt(req.query.limit)) || 100, | ||
}) | ||
); | ||
} | ||
}); | ||
@@ -121,0 +127,0 @@ |
require('date-format-lite'); | ||
const omit = require('lodash.omit'); | ||
const elasticsearch = require('elasticsearch'); | ||
const { database } = require('access-watch-sdk'); | ||
const logsIndexConfig = require('./logs-index-config.json'); | ||
const config = require('../../constants'); | ||
const monitoring = require('../../lib/monitoring'); | ||
const { iso } = require('../../lib/util'); | ||
const monitor = monitoring.registerOutput({ name: 'Elasticsearch' }); | ||
const { logsIndexName, retention } = config.elasticsearch; | ||
const accessWatchSdkDatabase = database(); | ||
@@ -20,12 +26,22 @@ const generateIndexName = date => | ||
const reportOnError = promise => | ||
promise.catch(e => { | ||
// Remove the eventual Error: that might come from the error from ES | ||
const errString = e.message.replace('Error :', ''); | ||
monitor.status = `Error: ${errString}`; | ||
console.error(`Elasticsearch error: ${errString}`); | ||
}); | ||
const createIndexIfNotExists = client => index => { | ||
if (!indexesDb[index]) { | ||
indexesDb[index] = client.indices.exists({ index }).then(exists => { | ||
if (!exists) { | ||
return client.indices.create({ | ||
index, | ||
body: logsIndexConfig, | ||
}); | ||
} | ||
}); | ||
indexesDb[index] = reportOnError( | ||
client.indices.exists({ index }).then(exists => { | ||
if (!exists) { | ||
return client.indices.create({ | ||
index, | ||
body: logsIndexConfig, | ||
}); | ||
} | ||
}) | ||
); | ||
} | ||
@@ -41,8 +57,15 @@ return indexesDb[index]; | ||
createIndexIfNotExists(client)(index).then(() => { | ||
client.index({ | ||
index, | ||
type: 'log', | ||
routing: log.getIn(['address', 'value']), | ||
body: log.toJS(), | ||
}); | ||
reportOnError( | ||
client | ||
.index({ | ||
index, | ||
type: 'log', | ||
routing: log.getIn(['address', 'value']), | ||
body: log.toJS(), | ||
}) | ||
.then(() => { | ||
monitor.status = 'Connected and indexing'; | ||
monitor.hit(); | ||
}) | ||
); | ||
}); | ||
@@ -57,26 +80,30 @@ } else { | ||
const indexesGc = client => () => { | ||
client.indices.get({ index: '*' }).then(indices => { | ||
Object.keys(indices) | ||
.filter(i => i.indexOf(logsIndexName) !== -1) | ||
.forEach(index => { | ||
const indexDate = new Date(getIndexDate(index)); | ||
const gcDate = getGcDate(); | ||
if (indexDate.getTime() < gcDate.getTime()) { | ||
client.indices.delete({ index }); | ||
} | ||
}); | ||
}); | ||
reportOnError( | ||
client.indices.get({ index: '*' }).then(indices => { | ||
Object.keys(indices) | ||
.filter(i => i.indexOf(logsIndexName) !== -1) | ||
.forEach(index => { | ||
const indexDate = new Date(getIndexDate(index)); | ||
const gcDate = getGcDate(); | ||
if (indexDate.getTime() < gcDate.getTime()) { | ||
reportOnError(client.indices.delete({ index })); | ||
} | ||
}); | ||
}) | ||
); | ||
}; | ||
const reservedSearchTerms = ['start', 'end', 'limit']; | ||
const reservedSearchTerms = ['start', 'end', 'limit', 'aggs']; | ||
const searchLogs = client => (query = {}) => { | ||
const { start, end, limit: size } = query; | ||
const search = client => (query = {}) => { | ||
const { start, end, limit: size = 50, aggs } = query; | ||
const queryMatch = omit(query, reservedSearchTerms); | ||
let bool = { | ||
filter: { | ||
exists: { | ||
field: 'identity.id', | ||
filter: [ | ||
{ | ||
exists: { | ||
field: 'identity.id', | ||
}, | ||
}, | ||
}, | ||
], | ||
}; | ||
@@ -109,12 +136,17 @@ const body = { | ||
if (start || end) { | ||
bool.filter.range = { | ||
'request.time': Object.assign( | ||
start ? { gte: start } : {}, | ||
end ? { lte: end } : {} | ||
), | ||
}; | ||
bool.filter.push({ | ||
range: { | ||
'request.time': Object.assign( | ||
start ? { gte: iso(start) } : {}, | ||
end ? { lte: iso(end) } : {} | ||
), | ||
}, | ||
}); | ||
} | ||
if (aggs) { | ||
body.aggs = aggs; | ||
} | ||
body.query = { bool }; | ||
return client | ||
.search({ | ||
return reportOnError( | ||
client.search({ | ||
index: `${logsIndexName}-*`, | ||
@@ -125,10 +157,67 @@ type: 'log', | ||
}) | ||
.then(({ hits }) => { | ||
if (hits) { | ||
return hits.hits.map(({ _source }) => _source); | ||
} | ||
return []; | ||
}); | ||
); | ||
}; | ||
const searchLogs = client => (query = {}) => | ||
search(client)(query).then(({ hits }) => { | ||
if (hits) { | ||
return hits.hits.map(({ _source }) => _source); | ||
} | ||
return []; | ||
}); | ||
const searchSessions = ({ | ||
fetchFn, | ||
sessionId, | ||
queryConstants = {}, | ||
}) => client => (query = {}) => | ||
search(client)( | ||
Object.assign( | ||
{ | ||
aggs: { | ||
sessions: { | ||
terms: { | ||
field: sessionId, | ||
size: query.limit || 50, | ||
}, | ||
}, | ||
}, | ||
limit: 0, | ||
}, | ||
queryConstants, | ||
query | ||
) | ||
) | ||
.then(({ aggregations: { sessions: { buckets } } }) => | ||
buckets.map(({ key, doc_count }) => ({ | ||
id: key, | ||
count: doc_count, | ||
})) | ||
) | ||
.then(sessions => | ||
Promise.all(sessions.map(({ id }) => fetchFn(id))).then(sessionsData => | ||
sessionsData.map((sessionData, i) => | ||
Object.assign( | ||
{ | ||
count: sessions[i].count, | ||
}, | ||
sessionData | ||
) | ||
) | ||
) | ||
); | ||
const searchRobots = searchSessions({ | ||
queryConstants: { | ||
'identity.type': 'robot', | ||
}, | ||
fetchFn: id => accessWatchSdkDatabase.getRobot({ uuid: id }), | ||
sessionId: 'robot.id', | ||
}); | ||
const searchAddresses = searchSessions({ | ||
fetchFn: address => accessWatchSdkDatabase.getAddress(address), | ||
sessionId: 'address.value', | ||
}); | ||
const logsEndpoint = client => { | ||
@@ -145,2 +234,3 @@ const search = searchLogs(client); | ||
const gc = indexesGc(esClient); | ||
monitor.status = 'Started'; | ||
setImmediate(gc); | ||
@@ -151,2 +241,4 @@ setInterval(gc, 24 * 3600 * 1000); | ||
searchLogs: searchLogs(esClient), | ||
searchRobots: searchRobots(esClient), | ||
searchAddresses: searchAddresses(esClient), | ||
logsEndpoint: logsEndpoint(esClient), | ||
@@ -153,0 +245,0 @@ }; |
44
start.js
@@ -0,1 +1,2 @@ | ||
const http = require('http'); | ||
const path = require('path'); | ||
@@ -20,3 +21,4 @@ const express = require('express'); | ||
const app = express(); | ||
expressWs(app); | ||
const httpServer = http.createServer(app); | ||
expressWs(app, httpServer); | ||
@@ -29,9 +31,10 @@ app.use( | ||
app.set('port', process.env.PORT || accessWatch.constants.port); | ||
Object.keys(accessWatch.constants.app).forEach(key => { | ||
app.set(key, accessWatch.constants.app[key]); | ||
}); | ||
app.listen(app.get('port'), () => { | ||
console.log( | ||
'HTTP and Websocket Server listening on port %d', | ||
app.get('port') | ||
); | ||
const port = process.env.PORT || accessWatch.constants.port; | ||
httpServer.listen(port, () => { | ||
console.log(`HTTP and Websocket Server listening on port ${port}`); | ||
}); | ||
@@ -45,6 +48,27 @@ | ||
let shutdownInProgress; | ||
function shutdown() { | ||
if (!shutdownInProgress) { | ||
shutdownInProgress = true; | ||
Promise.all([ | ||
httpServer.close(), | ||
accessWatch.pipeline.stop(), | ||
accessWatch.database.close(), | ||
]) | ||
.then(() => { | ||
process.exit(); | ||
}) | ||
.catch(console.error); | ||
} | ||
} | ||
process.on('SIGTERM', () => { | ||
console.log('SIGTERM'); | ||
shutdown(); | ||
}); | ||
process.on('SIGINT', () => { | ||
accessWatch.database.close(); | ||
// eslint-disable-next-line no-process-exit | ||
process.exit(); | ||
console.log('SIGINT'); | ||
shutdown(); | ||
}); |
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
168669
73
4302
85
23
4
+ Addedfs-extra@^5.0.0
+ Addedagentkeepalive@3.5.3(transitive)
+ Addedajv@6.12.6(transitive)
+ Addedelasticsearch@14.2.2(transitive)
+ Addedfast-deep-equal@3.1.3(transitive)
+ Addedfs-extra@5.0.0(transitive)
+ Addedgraceful-fs@4.2.11(transitive)
+ Addedhumanize-ms@1.2.1(transitive)
+ Addedjson-schema-traverse@0.4.1(transitive)
+ Addedjsonfile@4.0.0(transitive)
+ Addedpunycode@2.3.1(transitive)
+ Addeduniversalify@0.1.2(transitive)
+ Addeduri-js@4.4.1(transitive)
+ Addedws@4.1.0(transitive)
- Removedagentkeepalive@2.2.0(transitive)
- Removedajv@5.5.2(transitive)
- Removedaxios@0.17.1(transitive)
- Removedco@4.6.0(transitive)
- Removedelasticsearch@13.3.1(transitive)
- Removedfast-deep-equal@1.1.0(transitive)
- Removedjson-schema-traverse@0.3.1(transitive)
- Removedws@3.3.3(transitive)
Updatedaccess-watch-ui@^1.7.1
Updatedajv@^6.1.1
Updatedaxios@^0.18.0
Updatedelasticsearch@^14.1.0
Updatedlodash.merge@^4.6.1
Updatedrc@^1.2.5
Updatedws@^4.0.0