Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

zone-mta

Package Overview
Dependencies
Maintainers
1
Versions
334
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

zone-mta - npm Package Compare versions

Comparing version 2.1.0 to 2.2.0

5

CHANGELOG.md
# CHANGELOG
- v2.2.0 2020-05-27
- Updated garbage collection to delete orphaned message chunks by \_id
- Added new option queue.maxQueueTime (default 30 days) to delete unprocessed entries from queue
- v2.1.0 2020-05-27

@@ -4,0 +9,0 @@

9

config/default.js

@@ -42,3 +42,6 @@ 'use strict';

// default zone for any other mail not specified by zone
defaultZone: 'default'
defaultZone: 'default',
// remove messages from queue if not delivered or bounced before maxQueueTime
maxQueueTime: 30 * 24 * 3600 * 1000
},

@@ -386,4 +389,4 @@

connectionCache: {
ttl: 5, // how long should a connection kept open. Given in seconds
reuseCount: 100, // how often should a connection be reused
ttl: 5, // how long should a connection kept open. Given in seconds
reuseCount: 100 // how often should a connection be reused
}

@@ -390,0 +393,0 @@

@@ -339,6 +339,3 @@ 'use strict';

let type = (req.params.type || '')
.toString()
.toLowerCase()
.trim();
let type = (req.params.type || '').toString().toLowerCase().trim();

@@ -406,8 +403,3 @@ type = ['deferred', 'queued'].includes(type) ? type : 'queued';

if (
(req.query.body || '')
.toString()
.trim()
.toLowerCase() === 'yes'
) {
if ((req.query.body || '').toString().trim().toLowerCase() === 'yes') {
// stream only body

@@ -414,0 +406,0 @@ res.writeHead(200, {

@@ -13,2 +13,3 @@ 'use strict';

const GridFSBucket = require('mongodb').GridFSBucket;
const ObjectID = require('mongodb').ObjectID;
const internalCounters = require('./counters');

@@ -382,3 +383,3 @@ const yaml = require('js-yaml');

zone = zone || (this.options.defaultZone || 'default');
zone = zone || this.options.defaultZone || 'default';
options = options || {};

@@ -732,2 +733,14 @@

releaseDeliveryAsync(delivery) {
return new Promise((resolve, reject) => {
this.releaseDelivery(delivery, (err, result) => {
if (err) {
reject(err);
} else {
resolve(result);
}
});
});
}
/**

@@ -872,7 +885,8 @@ * Method that marks a message as deferred. This message is removed from the active queued

clearGarbage(callback) {
async clearGarbage() {
let collection = this.mongodb.collection(this.options.collection);
let r;
// clear message locks
collection.updateMany(
// Clear message locks
r = await collection.updateMany(
{

@@ -895,100 +909,82 @@ locked: true,

multi: true
},
(err, r) => {
if (err) {
return callback(err);
}
);
if (r && r.modifiedCount) {
log.verbose('GC', 'Released %s expired locks for queued messages', r.modifiedCount);
}
if (this.options.maxQueueTime && this.options.maxQueueTime > 0) {
// Release messages queued longer than allowed
let releaseObjectId = ObjectID.createFromTime(Math.round((Date.now() - this.options.maxQueueTime) / 1000));
let cursor = await collection.find(
{
_id: {
$lte: releaseObjectId
},
// skip messages that are currenlty being processed
locked: false
},
{
projection: {
_id: 1,
id: true,
seq: true,
_lock: true
}
}
);
if (r.result.n) {
log.verbose('GC', 'Released %s expired locks for queued messages', r.result.n);
let delivery;
while ((delivery = await cursor.next())) {
try {
let deleted = await this.releaseDeliveryAsync(delivery);
if (deleted) {
log.info('GC', 'Cleaned up %s from queue', delivery.id);
}
} catch (err) {
log.info('GC', 'Failed to cleaned up %s.%s from queue. %s', delivery.id, delivery.seq, err.message);
}
}
await cursor.close();
}
collection.findOne(
{},
{
sort: { created: 1 },
projection: {
created: 1
}
},
(err, delivery) => {
if (err) {
return callback(err);
}
// Find the oldest queued message
let delivery = await collection.findOne(
{},
{
sort: { _id: 1 },
projection: {
_id: 1
}
}
);
let clearUntil;
if (!delivery) {
// if message queue is empty delete everything older than 10 minutes
clearUntil = Date.now();
} else {
clearUntil = Number(delivery.created && delivery.created.getTime && delivery.created.getTime());
}
let clearUntil;
if (delivery) {
clearUntil = delivery._id.getTimestamp().getTime();
} else {
// empty queue, so delete everything
clearUntil = Date.now();
}
if (!clearUntil) {
// safe bet in case the first refKey returned something strange
clearUntil = Date.now() - 10 * 24 * 3600 * 1000; // anything older than last 10 days
} else {
// remove messages stored before 10 minutes of the first id
clearUntil -= 10 * 60 * 1000;
}
// 10 minute shift just in case
clearUntil -= 10 * 60 * 1000;
let firstObjectId = false;
let deleted = 0;
let untilObjectId = ObjectID.createFromTime(Math.round(new Date(clearUntil).getTime() / 1000));
let query = {
_id: {
$lte: untilObjectId
}
};
let query = {
uploadDate: {
$lte: new Date(clearUntil)
}
};
r = await this.mongodb.collection(this.options.gfs + '.files').deleteMany(query);
if (r && r.deletedCount) {
log.info('GC', 'Cleared %s expired files from GridStore', r.deletedCount);
}
let cursor = this.mongodb
.collection(this.options.gfs + '.files')
.find(query)
.project({
_id: true,
filename: true
});
let deleteNext = () => {
cursor.next((err, doc) => {
if (err) {
return callback(err);
}
if (!doc) {
if (deleted) {
log.info('GC', 'Cleared %s expired files from GridStore', deleted);
}
return cursor.close(() => {
if (!firstObjectId) {
return callback();
}
// delete orphan chunks (should not exist, just in case)
this.mongodb.collection(this.options.gfs + '.chunks').deleteMany(
{
_id: {
$lte: firstObjectId
}
},
callback
);
});
}
if (!firstObjectId) {
firstObjectId = doc._id;
}
log.verbose('GC', '%s DELEXPIRED', doc.filename.split(' ').pop());
this.gridstore.delete(doc._id, err => {
if (err) {
return cursor.close(() => callback(err));
}
deleted++;
deleteNext();
});
});
};
deleteNext();
}
);
}
);
r = await this.mongodb.collection(this.options.gfs + '.chunks').deleteMany(query);
if (r && r.deletedCount) {
log.info('GC', 'Cleared %s expired chunks from GridStore', r.deletedCount);
}
}

@@ -1002,12 +998,15 @@

let startTimer = Date.now();
this.clearGarbage(err => {
let timeDiff = (Date.now() - startTimer) / 1000;
if (err) {
this.clearGarbage()
.then(() => {
let timeDiff = (Date.now() - startTimer) / 1000;
log.verbose('GC', '[%ss]', timeDiff);
this.garbageTimer = setTimeout(() => this.checkGarbage(), 60 * 1000);
this.garbageTimer.unref();
})
.catch(err => {
let timeDiff = (Date.now() - startTimer) / 1000;
log.error('GC', '[%ss] %s', timeDiff, err.message);
} else if (timeDiff > 1.0) {
log.info('GC', 'Garbage collecting duration %ss', timeDiff);
}
this.garbageTimer = setTimeout(() => this.checkGarbage(), 10 * 1000);
this.garbageTimer.unref();
});
this.garbageTimer = setTimeout(() => this.checkGarbage(), 5 * 60 * 1000);
this.garbageTimer.unref();
});
}

@@ -1084,3 +1083,3 @@

if (!this.options.disableGC) {
this.garbageTimer = setTimeout(() => this.checkGarbage(), 10 * 1000);
this.garbageTimer = setTimeout(() => this.checkGarbage(), 60 * 1000);
this.garbageTimer.unref();

@@ -1087,0 +1086,0 @@ }

{
"name": "zone-mta",
"private": false,
"version": "2.1.0",
"version": "2.2.0",
"description": "Tiny outbound MTA",

@@ -16,3 +16,2 @@ "main": "app.js",

"dependencies": {
"13": "0.0.0",
"crc-32": "1.2.0",

@@ -19,0 +18,0 @@ "dnscache": "1.0.2",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc