Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cumulus/cumulus-ecs-task

Package Overview
Dependencies
Maintainers
7
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cumulus/cumulus-ecs-task - npm Package Compare versions

Comparing version 1.2.5 to 1.3.0

tests/data/cumulus-message-adapter.txt

2

bin/service.js

@@ -104,3 +104,3 @@ #!/usr/bin/env node

else {
log.error('You must provider one of the following options: activity-arn, sqs-url, lambda-input');
log.error('You must provide one of the following options: activity-arn, sqs-url, lambda-input');
process.exit(1);

@@ -107,0 +107,0 @@ }

@@ -9,2 +9,9 @@ # cumulus-ecs-task change log

## [v1.3.0]
### Added
- **CUMULUS-1418**
- Added ability to use lambda layers if they are configured for the target lambda
- Added logic to allow CMA to utilize default `cumulus-message-adapter` location, else expect it in /opt/
## [v1.2.5]

@@ -81,3 +88,4 @@

[Unreleased]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.5...HEAD
[Unreleased]: https://github.com/nasa/cumulus-ecs-task/compare/v1.3.0...HEAD
[v1.3.0]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.5...v1.3.0
[v1.2.5]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.4...v1.2.5

@@ -84,0 +92,0 @@ [v1.1.2]: https://github.com/nasa/cumulus-ecs-task/compare/v1.1.2...v1.2.0

@@ -8,3 +8,6 @@ /* eslint max-len: "off" */

const path = require('path');
const execSync = require('child_process').execSync;
const { promisify } = require('util');
const { exec } = require('child_process');
const execPromise = promisify(exec);
const assert = require('assert');

@@ -20,2 +23,4 @@ const pRetry = require('p-retry');

const region = process.env.AWS_DEFAULT_REGION || 'us-east-1';
const layersDefaultDirectory = '/opt/';
AWS.config.update({ region: region });

@@ -30,3 +35,2 @@

const FUNCTION_NAME_FIELD = 6;
return lambdaId.split(':')[FUNCTION_NAME_FIELD];

@@ -38,6 +42,2 @@ }

// eslint-disable-next-line require-jsdoc
const getLogSenderFromLambdaId = (lambdaId) =>
`cumulus-ecs-task/${getFunctionName(lambdaId)}`;
/**

@@ -52,3 +52,2 @@ * Download a URL to file

const file = fs.createWriteStream(destinationFilename);
return new Promise((resolve, reject) => {

@@ -65,2 +64,6 @@ file.on('error', reject);

// eslint-disable-next-line require-jsdoc
const getLogSenderFromLambdaId = (lambdaId) =>
`cumulus-ecs-task/${getFunctionName(lambdaId)}`;
/**

@@ -89,6 +92,25 @@ * Download a URL and save it to a file. If an ETIMEDOUT error is received,

/**
* Downloads an array of layers from AWS
*
* @param {Array<Object>} layers - list of layer config objects to download
* @param {Array<string>} layersDir - path to download the files to, generally '/opt'
* @returns {Promise<Array>} - returns an array of promises that resolve to a
* filepath strings to downloaded layer .zips
*/
async function downloadLayers(layers, layersDir) {
const layerDownloadPromises = layers.map((layer) => {
log.info(`Adding layer ${JSON.stringify(layer)} to container`);
const filePath = `${layersDir}/${getFunctionName(layer.LayerArn)}.zip`;
return downloadFile(layer.Content.Location, filePath).then(() => filePath);
});
return await Promise.all(layerDownloadPromises);
}
/**
* Download the zip file of a lambda function from AWS
* and it's associated layer .zip files, if any.
*
* @param {string} arn - the arn of the lambda function
* @param {strind} workDir - the dir to download the lambda function to
* @param {string} workDir - the dir to download the lambda function to
* @param {string} layersDir - the dir layers will be downloaded to
* @returns {Promise<Object>} returns an object that includes `filepath`,

@@ -99,4 +121,5 @@ * `moduleFileName`, `moduleFunctionName` arguments.

* The `moduleFunctionName` is the name of the exported function to call in the module.
* The `layerPaths` is an array of filepaths to downloaded layer zip files
**/
async function getLambdaZip(arn, workDir) {
async function getLambdaSource(arn, workDir, layersDir) {
const lambda = new AWS.Lambda({ apiVersion: '2015-03-31' });

@@ -112,10 +135,17 @@

let layerPaths = [];
if (data.Configuration.Layers) {
const layers = data.Configuration.Layers;
const layerConfigPromises = layers.map((layer) => lambda.getLayerVersionByArn({ Arn: layer.Arn }).promise());
const layerConfigs = await Promise.all(layerConfigPromises);
layerPaths = await downloadLayers(layerConfigs, layersDir);
}
const filepath = path.join(workDir, 'fn.zip');
await downloadFile(codeUrl, filepath);
return {
filepath,
moduleFileName,
moduleFunctionName
moduleFunctionName,
layerPaths
};

@@ -125,3 +155,22 @@ }

/**
* Downloads and extracts the code of a lambda function from its zip file
* Given a task dir, detects if the CMA is present in that
* directory. Sets CUMULUS_MESSAGE_ADAPTER_DIR env variable to that
* directory, else sets it to the default layersDir used
* by lambda layers.
*
* @param {string} taskDir - The path to the ECS task source
* @param {string} layerDir - The directory layers are extracted to
* @returns {undefined} - no return value
*/
function setCumulusMessageAdapterPath(taskDir, layerDir) {
const CmaPath = `${taskDir}/cumulus-message-adapter`;
const adapterPath = fs.existsSync(CmaPath) ? CmaPath : layerDir;
log.info(`Setting CMA path to ${adapterPath}`);
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = adapterPath;
}
/**
* Downloads and extracts the code of a lambda function and it's associated layers
* into expected locations on the filesystem
*

@@ -131,9 +180,14 @@ * @param {string} lambdaArn - the arn of the lambda function

* @param {string} taskDir - the dir where the lambda function will be located
* @param {string} layerDir - the dir where layers are to be extracted/used. Generally /opt.
* @returns {Promise<Function>} the `handler` which is the javascript function
* that will run in the ECS service
**/
async function downloadLambdaHandler(lambdaArn, workDir, taskDir) {
const resp = await getLambdaZip(lambdaArn, workDir);
async function installLambdaFunction(lambdaArn, workDir, taskDir, layerDir) {
const resp = await getLambdaSource(lambdaArn, workDir, layerDir);
const unzipPromises = resp.layerPaths.map((layerFilePath) => execPromise(`unzip -o ${layerFilePath} -d ${layerDir}`));
unzipPromises.push(execPromise(`unzip -o ${resp.filepath} -d ${taskDir}`));
await Promise.all(unzipPromises);
execSync(`unzip -o ${resp.filepath} -d ${taskDir}`);
setCumulusMessageAdapterPath(taskDir, layerDir);
const task = require(`${taskDir}/${resp.moduleFileName}`); //eslint-disable-line global-require

@@ -282,3 +336,5 @@ return task[resp.moduleFunctionName];

assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required');
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string');
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;
const lambdaArn = options.lambdaArn;

@@ -291,9 +347,5 @@ const event = options.lambdaInput;

// the cumulus-message-adapter dir is in an unexpected place,
// so tell the adapter where to find it
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`;
log.info('Downloading the Lambda function');
try {
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir);
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);
const output = await handleResponse(event, handler);

@@ -329,2 +381,3 @@ log.info('task executed successfully');

assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required');
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string');

@@ -337,2 +390,4 @@ const sqs = new AWS.SQS({ apiVersion: '2016-11-23' });

const workDir = options.workDirectory;
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;
const runForever = isBoolean(options.runForever) ? options.runForever : true;

@@ -342,8 +397,5 @@

// the cumulus-message-adapter dir is in an unexpected place,
// so tell the adapter where to find it
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`;
log.info('Downloading the Lambda function');
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir);
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);

@@ -405,2 +457,3 @@ let sigTermReceived = false;

* @param {string} options.workDirectory - the directory to use for downloading the lambda zip file
* @param {string} options.layersDir - the directory to use for extracting lambda layers. Defaults to /opt
* @param {boolean} [options.runForever=true] - whether to poll the activity forever (defaults to true)

@@ -415,2 +468,4 @@ * @returns {Promise<undefined>} undefined

assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required');
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string');
if (options.heartbeat) {

@@ -425,2 +480,3 @@ assert(Number.isInteger(options.heartbeat), 'options.heartbeat must be an integer');

const heartbeatInterval = options.heartbeat;
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory;

@@ -431,8 +487,4 @@ const runForever = isBoolean(options.runForever) ? options.runForever : true;

// the cumulus-message-adapter dir is in an unexpected place,
// so tell the adapter where to find it
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`;
log.info('Downloading the Lambda function');
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir);
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir);

@@ -439,0 +491,0 @@ let sigTermReceived = false;

{
"name": "@cumulus/cumulus-ecs-task",
"version": "1.2.5",
"version": "1.3.0",
"description": "Run lambda functions in ECS",

@@ -23,3 +23,3 @@ "main": "index.js",

"dependencies": {
"aws-sdk": "^2.203.0",
"aws-sdk": "^2.503.0",
"cliclopts": "^1.1.1",

@@ -26,0 +26,0 @@ "lodash.isboolean": "^3.0.3",

@@ -17,6 +17,12 @@ 'use strict';

t.context.lambdaZip = path.join(t.context.tempDir, 'remoteLambda.zip');
t.context.layerZip = path.join(t.context.tempDir, 'fakeLayer.zip');
t.context.lambdaCMAZip = path.join(t.context.tempDir, 'fakeCMALayer.zip');
t.context.taskDirectory = path.join(t.context.tempDir, 'task');
t.context.workDirectory = path.join(t.context.tempDir, '.tmp-work');
t.context.layerDirectory = path.join(t.context.tempDir, 'layers');
fs.mkdirpSync(t.context.taskDirectory);
fs.mkdirpSync(t.context.workDirectory);
fs.mkdirpSync(t.context.layerDirectory);

@@ -34,4 +40,30 @@ // zip fake lambda

// zip fake layer file
await new Promise((resolve, reject) => {
const output = fs.createWriteStream(t.context.layerZip);
const archive = archiver('zip');
output.on('close', resolve);
output.on('error', reject);
archive.pipe(output);
archive.file(path.join(__dirname, 'data/layerDataFile.txt'), { name: 'fakeLayer.txt' });
archive.finalize();
});
// zip CMA injected layer file
await new Promise((resolve, reject) => {
const output = fs.createWriteStream(t.context.lambdaCMAZip);
const archive = archiver('zip');
output.on('close', resolve);
output.on('error', reject);
archive.pipe(output);
archive.file(path.join(__dirname, 'data/fakeLambda.js'), { name: 'fakeLambda.js' });
archive.file(path.join(__dirname, 'data/cumulus-message-adapter.txt'),
{ name: 'cumulus-message-adapter' });
archive.finalize();
});
t.context.lambdaZipUrlPath = '/lambda';
t.context.getLayerUrlPath = '/getLayer';
nock('https://example.com')

@@ -41,2 +73,6 @@ .get(t.context.lambdaZipUrlPath)

nock('https://example.com')
.get(t.context.getLayerUrlPath)
.reply(200, () => fs.createReadStream(t.context.layerZip));
t.context.expectedOutput = [

@@ -46,4 +82,13 @@ 'fakeLambda',

];
t.context.stub = sinon.stub(AWS, 'Lambda')
.returns({
getLayerVersionByArn: () => ({
promise: async () => ({
LayerArn: 'notARealArn',
Content: {
Location: `https://example.com${t.context.getLayerUrlPath}`
}
})
}),
getFunction: () => ({

@@ -55,3 +100,4 @@ promise: async () => ({

Configuration: {
Handler: t.context.expectedOutput.join('.')
Handler: t.context.expectedOutput.join('.'),
Layers: ['notARealArn']
}

@@ -76,8 +122,56 @@ })

taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});
t.deepEqual(event, output);
});
test.serial('layers are extracted into target directory', async (t) => {
const event = { hi: 'bye' };
await runTask({
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function',
lambdaInput: event,
taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});
t.true(fs.existsSync(`${t.context.layerDirectory}/fakeLayer.txt`));
});
test.serial('CMA environment variable is set if CMA is not present', async (t) => {
const event = { hi: 'bye' };
await runTask({
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function',
lambdaInput: event,
taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});
t.is(process.env.CUMULUS_MESSAGE_ADAPTER_DIR, t.context.layerDirectory);
});
test.serial('CMA environment variable is set if CMA is present', async (t) => {
const event = { hi: 'bye' };
nock.cleanAll();
nock('https://example.com')
.get(t.context.lambdaZipUrlPath)
.reply(200, () => fs.createReadStream(t.context.lambdaCMAZip));
nock('https://example.com')
.get(t.context.getLayerUrlPath)
.reply(200, () => fs.createReadStream(t.context.layerZip));
await runTask({
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function',
lambdaInput: event,
taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});
t.is(process.env.CUMULUS_MESSAGE_ADAPTER_DIR,
`${t.context.taskDirectory}/cumulus-message-adapter`);
});
test.serial('test failed task run', async (t) => {

@@ -89,3 +183,4 @@ const event = { hi: 'bye', error: 'it failed' };

taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});

@@ -124,2 +219,3 @@ const error = await t.throws(promise);

workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory,
runForever: false

@@ -163,2 +259,3 @@ });

workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory,
runForever: false

@@ -181,2 +278,6 @@ });

nock('https://example.com')
.get(t.context.getLayerUrlPath)
.reply(200, () => fs.createReadStream(t.context.layerZip));
const event = { hi: 'bye' };

@@ -188,3 +289,4 @@

taskDirectory: t.context.taskDirectory,
workDirectory: t.context.workDirectory
workDirectory: t.context.workDirectory,
layersDirectory: t.context.layerDirectory
});

@@ -191,0 +293,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc