Comparing version 2.1.0 to 2.2.0
# CHANGELOG | ||
- v2.2.0 2020-05-27 | ||
- Updated garbage collection to delete orphaned message chunks by \_id | ||
- Added new option queue.maxQueueTime (default 30 days) to delete unprocessed entries from queue | ||
- v2.1.0 2020-05-27 | ||
@@ -4,0 +9,0 @@ |
@@ -42,3 +42,6 @@ 'use strict'; | ||
// default zone for any other mail not specified by zone | ||
defaultZone: 'default' | ||
defaultZone: 'default', | ||
// remove messages from queue if not delivered or bounced before maxQueueTime | ||
maxQueueTime: 30 * 24 * 3600 * 1000 | ||
}, | ||
@@ -386,4 +389,4 @@ | ||
connectionCache: { | ||
ttl: 5, // how long should a connection kept open. Given in seconds | ||
reuseCount: 100, // how often should a connection be reused | ||
ttl: 5, // how long should a connection kept open. Given in seconds | ||
reuseCount: 100 // how often should a connection be reused | ||
} | ||
@@ -390,0 +393,0 @@ |
@@ -339,6 +339,3 @@ 'use strict'; | ||
let type = (req.params.type || '') | ||
.toString() | ||
.toLowerCase() | ||
.trim(); | ||
let type = (req.params.type || '').toString().toLowerCase().trim(); | ||
@@ -406,8 +403,3 @@ type = ['deferred', 'queued'].includes(type) ? type : 'queued'; | ||
if ( | ||
(req.query.body || '') | ||
.toString() | ||
.trim() | ||
.toLowerCase() === 'yes' | ||
) { | ||
if ((req.query.body || '').toString().trim().toLowerCase() === 'yes') { | ||
// stream only body | ||
@@ -414,0 +406,0 @@ res.writeHead(200, { |
@@ -13,2 +13,3 @@ 'use strict'; | ||
const GridFSBucket = require('mongodb').GridFSBucket; | ||
const ObjectID = require('mongodb').ObjectID; | ||
const internalCounters = require('./counters'); | ||
@@ -382,3 +383,3 @@ const yaml = require('js-yaml'); | ||
zone = zone || (this.options.defaultZone || 'default'); | ||
zone = zone || this.options.defaultZone || 'default'; | ||
options = options || {}; | ||
@@ -732,2 +733,14 @@ | ||
releaseDeliveryAsync(delivery) { | ||
return new Promise((resolve, reject) => { | ||
this.releaseDelivery(delivery, (err, result) => { | ||
if (err) { | ||
reject(err); | ||
} else { | ||
resolve(result); | ||
} | ||
}); | ||
}); | ||
} | ||
/** | ||
@@ -872,7 +885,8 @@ * Method that marks a message as deferred. This message is removed from the active queued | ||
clearGarbage(callback) { | ||
async clearGarbage() { | ||
let collection = this.mongodb.collection(this.options.collection); | ||
let r; | ||
// clear message locks | ||
collection.updateMany( | ||
// Clear message locks | ||
r = await collection.updateMany( | ||
{ | ||
@@ -895,100 +909,82 @@ locked: true, | ||
multi: true | ||
}, | ||
(err, r) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
); | ||
if (r && r.modifiedCount) { | ||
log.verbose('GC', 'Released %s expired locks for queued messages', r.modifiedCount); | ||
} | ||
if (this.options.maxQueueTime && this.options.maxQueueTime > 0) { | ||
// Release messages queued longer than allowed | ||
let releaseObjectId = ObjectID.createFromTime(Math.round((Date.now() - this.options.maxQueueTime) / 1000)); | ||
let cursor = await collection.find( | ||
{ | ||
_id: { | ||
$lte: releaseObjectId | ||
}, | ||
// skip messages that are currenlty being processed | ||
locked: false | ||
}, | ||
{ | ||
projection: { | ||
_id: 1, | ||
id: true, | ||
seq: true, | ||
_lock: true | ||
} | ||
} | ||
); | ||
if (r.result.n) { | ||
log.verbose('GC', 'Released %s expired locks for queued messages', r.result.n); | ||
let delivery; | ||
while ((delivery = await cursor.next())) { | ||
try { | ||
let deleted = await this.releaseDeliveryAsync(delivery); | ||
if (deleted) { | ||
log.info('GC', 'Cleaned up %s from queue', delivery.id); | ||
} | ||
} catch (err) { | ||
log.info('GC', 'Failed to cleaned up %s.%s from queue. %s', delivery.id, delivery.seq, err.message); | ||
} | ||
} | ||
await cursor.close(); | ||
} | ||
collection.findOne( | ||
{}, | ||
{ | ||
sort: { created: 1 }, | ||
projection: { | ||
created: 1 | ||
} | ||
}, | ||
(err, delivery) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
// Find the oldest queued message | ||
let delivery = await collection.findOne( | ||
{}, | ||
{ | ||
sort: { _id: 1 }, | ||
projection: { | ||
_id: 1 | ||
} | ||
} | ||
); | ||
let clearUntil; | ||
if (!delivery) { | ||
// if message queue is empty delete everything older than 10 minutes | ||
clearUntil = Date.now(); | ||
} else { | ||
clearUntil = Number(delivery.created && delivery.created.getTime && delivery.created.getTime()); | ||
} | ||
let clearUntil; | ||
if (delivery) { | ||
clearUntil = delivery._id.getTimestamp().getTime(); | ||
} else { | ||
// empty queue, so delete everything | ||
clearUntil = Date.now(); | ||
} | ||
if (!clearUntil) { | ||
// safe bet in case the first refKey returned something strange | ||
clearUntil = Date.now() - 10 * 24 * 3600 * 1000; // anything older than last 10 days | ||
} else { | ||
// remove messages stored before 10 minutes of the first id | ||
clearUntil -= 10 * 60 * 1000; | ||
} | ||
// 10 minute shift just in case | ||
clearUntil -= 10 * 60 * 1000; | ||
let firstObjectId = false; | ||
let deleted = 0; | ||
let untilObjectId = ObjectID.createFromTime(Math.round(new Date(clearUntil).getTime() / 1000)); | ||
let query = { | ||
_id: { | ||
$lte: untilObjectId | ||
} | ||
}; | ||
let query = { | ||
uploadDate: { | ||
$lte: new Date(clearUntil) | ||
} | ||
}; | ||
r = await this.mongodb.collection(this.options.gfs + '.files').deleteMany(query); | ||
if (r && r.deletedCount) { | ||
log.info('GC', 'Cleared %s expired files from GridStore', r.deletedCount); | ||
} | ||
let cursor = this.mongodb | ||
.collection(this.options.gfs + '.files') | ||
.find(query) | ||
.project({ | ||
_id: true, | ||
filename: true | ||
}); | ||
let deleteNext = () => { | ||
cursor.next((err, doc) => { | ||
if (err) { | ||
return callback(err); | ||
} | ||
if (!doc) { | ||
if (deleted) { | ||
log.info('GC', 'Cleared %s expired files from GridStore', deleted); | ||
} | ||
return cursor.close(() => { | ||
if (!firstObjectId) { | ||
return callback(); | ||
} | ||
// delete orphan chunks (should not exist, just in case) | ||
this.mongodb.collection(this.options.gfs + '.chunks').deleteMany( | ||
{ | ||
_id: { | ||
$lte: firstObjectId | ||
} | ||
}, | ||
callback | ||
); | ||
}); | ||
} | ||
if (!firstObjectId) { | ||
firstObjectId = doc._id; | ||
} | ||
log.verbose('GC', '%s DELEXPIRED', doc.filename.split(' ').pop()); | ||
this.gridstore.delete(doc._id, err => { | ||
if (err) { | ||
return cursor.close(() => callback(err)); | ||
} | ||
deleted++; | ||
deleteNext(); | ||
}); | ||
}); | ||
}; | ||
deleteNext(); | ||
} | ||
); | ||
} | ||
); | ||
r = await this.mongodb.collection(this.options.gfs + '.chunks').deleteMany(query); | ||
if (r && r.deletedCount) { | ||
log.info('GC', 'Cleared %s expired chunks from GridStore', r.deletedCount); | ||
} | ||
} | ||
@@ -1002,12 +998,15 @@ | ||
let startTimer = Date.now(); | ||
this.clearGarbage(err => { | ||
let timeDiff = (Date.now() - startTimer) / 1000; | ||
if (err) { | ||
this.clearGarbage() | ||
.then(() => { | ||
let timeDiff = (Date.now() - startTimer) / 1000; | ||
log.verbose('GC', '[%ss]', timeDiff); | ||
this.garbageTimer = setTimeout(() => this.checkGarbage(), 60 * 1000); | ||
this.garbageTimer.unref(); | ||
}) | ||
.catch(err => { | ||
let timeDiff = (Date.now() - startTimer) / 1000; | ||
log.error('GC', '[%ss] %s', timeDiff, err.message); | ||
} else if (timeDiff > 1.0) { | ||
log.info('GC', 'Garbage collecting duration %ss', timeDiff); | ||
} | ||
this.garbageTimer = setTimeout(() => this.checkGarbage(), 10 * 1000); | ||
this.garbageTimer.unref(); | ||
}); | ||
this.garbageTimer = setTimeout(() => this.checkGarbage(), 5 * 60 * 1000); | ||
this.garbageTimer.unref(); | ||
}); | ||
} | ||
@@ -1084,3 +1083,3 @@ | ||
if (!this.options.disableGC) { | ||
this.garbageTimer = setTimeout(() => this.checkGarbage(), 10 * 1000); | ||
this.garbageTimer = setTimeout(() => this.checkGarbage(), 60 * 1000); | ||
this.garbageTimer.unref(); | ||
@@ -1087,0 +1086,0 @@ } |
{ | ||
"name": "zone-mta", | ||
"private": false, | ||
"version": "2.1.0", | ||
"version": "2.2.0", | ||
"description": "Tiny outbound MTA", | ||
@@ -16,3 +16,2 @@ "main": "app.js", | ||
"dependencies": { | ||
"13": "0.0.0", | ||
"crc-32": "1.2.0", | ||
@@ -19,0 +18,0 @@ "dnscache": "1.0.2", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23
494354
9366