@cumulus/cumulus-ecs-task
Advanced tools
Comparing version 1.2.5 to 1.3.0
@@ -104,3 +104,3 @@ #!/usr/bin/env node | ||
else { | ||
log.error('You must provider one of the following options: activity-arn, sqs-url, lambda-input'); | ||
log.error('You must provide one of the following options: activity-arn, sqs-url, lambda-input'); | ||
process.exit(1); | ||
@@ -107,0 +107,0 @@ } |
@@ -9,2 +9,9 @@ # cumulus-ecs-task change log | ||
## [v1.3.0] | ||
### Added | ||
- **CUMULUS-1418** | ||
- Added ability to use lambda layers if they are configured for the target lambda | ||
- Added logic to allow CMA to utilize default `cumulus-message-adapter` location, else expect it in /opt/ | ||
## [v1.2.5] | ||
@@ -81,3 +88,4 @@ | ||
[Unreleased]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.5...HEAD | ||
[Unreleased]: https://github.com/nasa/cumulus-ecs-task/compare/v1.3.0...HEAD | ||
[v1.3.0]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.5...v1.3.0 | ||
[v1.2.5]: https://github.com/nasa/cumulus-ecs-task/compare/v1.2.4...v1.2.5 | ||
@@ -84,0 +92,0 @@ [v1.1.2]: https://github.com/nasa/cumulus-ecs-task/compare/v1.1.2...v1.2.0 |
112
index.js
@@ -8,3 +8,6 @@ /* eslint max-len: "off" */ | ||
const path = require('path'); | ||
const execSync = require('child_process').execSync; | ||
const { promisify } = require('util'); | ||
const { exec } = require('child_process'); | ||
const execPromise = promisify(exec); | ||
const assert = require('assert'); | ||
@@ -20,2 +23,4 @@ const pRetry = require('p-retry'); | ||
const region = process.env.AWS_DEFAULT_REGION || 'us-east-1'; | ||
const layersDefaultDirectory = '/opt/'; | ||
AWS.config.update({ region: region }); | ||
@@ -30,3 +35,2 @@ | ||
const FUNCTION_NAME_FIELD = 6; | ||
return lambdaId.split(':')[FUNCTION_NAME_FIELD]; | ||
@@ -38,6 +42,2 @@ } | ||
// eslint-disable-next-line require-jsdoc | ||
const getLogSenderFromLambdaId = (lambdaId) => | ||
`cumulus-ecs-task/${getFunctionName(lambdaId)}`; | ||
/** | ||
@@ -52,3 +52,2 @@ * Download a URL to file | ||
const file = fs.createWriteStream(destinationFilename); | ||
return new Promise((resolve, reject) => { | ||
@@ -65,2 +64,6 @@ file.on('error', reject); | ||
// eslint-disable-next-line require-jsdoc | ||
const getLogSenderFromLambdaId = (lambdaId) => | ||
`cumulus-ecs-task/${getFunctionName(lambdaId)}`; | ||
/** | ||
@@ -89,6 +92,25 @@ * Download a URL and save it to a file. If an ETIMEDOUT error is received, | ||
/** | ||
* Downloads an array of layers from AWS | ||
* | ||
* @param {Array<Object>} layers - list of layer config objects to download | ||
* @param {Array<string>} layersDir - path to download the files to, generally '/opt' | ||
* @returns {Promise<Array>} - returns an array of promises that resolve to a | ||
* filepath strings to downloaded layer .zips | ||
*/ | ||
async function downloadLayers(layers, layersDir) { | ||
const layerDownloadPromises = layers.map((layer) => { | ||
log.info(`Adding layer ${JSON.stringify(layer)} to container`); | ||
const filePath = `${layersDir}/${getFunctionName(layer.LayerArn)}.zip`; | ||
return downloadFile(layer.Content.Location, filePath).then(() => filePath); | ||
}); | ||
return await Promise.all(layerDownloadPromises); | ||
} | ||
/** | ||
* Download the zip file of a lambda function from AWS | ||
* and it's associated layer .zip files, if any. | ||
* | ||
* @param {string} arn - the arn of the lambda function | ||
* @param {strind} workDir - the dir to download the lambda function to | ||
* @param {string} workDir - the dir to download the lambda function to | ||
* @param {string} layersDir - the dir layers will be downloaded to | ||
* @returns {Promise<Object>} returns an object that includes `filepath`, | ||
@@ -99,4 +121,5 @@ * `moduleFileName`, `moduleFunctionName` arguments. | ||
* The `moduleFunctionName` is the name of the exported function to call in the module. | ||
* The `layerPaths` is an array of filepaths to downloaded layer zip files | ||
**/ | ||
async function getLambdaZip(arn, workDir) { | ||
async function getLambdaSource(arn, workDir, layersDir) { | ||
const lambda = new AWS.Lambda({ apiVersion: '2015-03-31' }); | ||
@@ -112,10 +135,17 @@ | ||
let layerPaths = []; | ||
if (data.Configuration.Layers) { | ||
const layers = data.Configuration.Layers; | ||
const layerConfigPromises = layers.map((layer) => lambda.getLayerVersionByArn({ Arn: layer.Arn }).promise()); | ||
const layerConfigs = await Promise.all(layerConfigPromises); | ||
layerPaths = await downloadLayers(layerConfigs, layersDir); | ||
} | ||
const filepath = path.join(workDir, 'fn.zip'); | ||
await downloadFile(codeUrl, filepath); | ||
return { | ||
filepath, | ||
moduleFileName, | ||
moduleFunctionName | ||
moduleFunctionName, | ||
layerPaths | ||
}; | ||
@@ -125,3 +155,22 @@ } | ||
/** | ||
* Downloads and extracts the code of a lambda function from its zip file | ||
* Given a task dir, detects if the CMA is present in that | ||
* directory. Sets CUMULUS_MESSAGE_ADAPTER_DIR env variable to that | ||
* directory, else sets it to the default layersDir used | ||
* by lambda layers. | ||
* | ||
* @param {string} taskDir - The path to the ECS task source | ||
* @param {string} layerDir - The directory layers are extracted to | ||
* @returns {undefined} - no return value | ||
*/ | ||
function setCumulusMessageAdapterPath(taskDir, layerDir) { | ||
const CmaPath = `${taskDir}/cumulus-message-adapter`; | ||
const adapterPath = fs.existsSync(CmaPath) ? CmaPath : layerDir; | ||
log.info(`Setting CMA path to ${adapterPath}`); | ||
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = adapterPath; | ||
} | ||
/** | ||
* Downloads and extracts the code of a lambda function and it's associated layers | ||
* into expected locations on the filesystem | ||
* | ||
@@ -131,9 +180,14 @@ * @param {string} lambdaArn - the arn of the lambda function | ||
* @param {string} taskDir - the dir where the lambda function will be located | ||
* @param {string} layerDir - the dir where layers are to be extracted/used. Generally /opt. | ||
* @returns {Promise<Function>} the `handler` which is the javascript function | ||
* that will run in the ECS service | ||
**/ | ||
async function downloadLambdaHandler(lambdaArn, workDir, taskDir) { | ||
const resp = await getLambdaZip(lambdaArn, workDir); | ||
async function installLambdaFunction(lambdaArn, workDir, taskDir, layerDir) { | ||
const resp = await getLambdaSource(lambdaArn, workDir, layerDir); | ||
const unzipPromises = resp.layerPaths.map((layerFilePath) => execPromise(`unzip -o ${layerFilePath} -d ${layerDir}`)); | ||
unzipPromises.push(execPromise(`unzip -o ${resp.filepath} -d ${taskDir}`)); | ||
await Promise.all(unzipPromises); | ||
execSync(`unzip -o ${resp.filepath} -d ${taskDir}`); | ||
setCumulusMessageAdapterPath(taskDir, layerDir); | ||
const task = require(`${taskDir}/${resp.moduleFileName}`); //eslint-disable-line global-require | ||
@@ -282,3 +336,5 @@ return task[resp.moduleFunctionName]; | ||
assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required'); | ||
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string'); | ||
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory; | ||
const lambdaArn = options.lambdaArn; | ||
@@ -291,9 +347,5 @@ const event = options.lambdaInput; | ||
// the cumulus-message-adapter dir is in an unexpected place, | ||
// so tell the adapter where to find it | ||
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`; | ||
log.info('Downloading the Lambda function'); | ||
try { | ||
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir); | ||
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir); | ||
const output = await handleResponse(event, handler); | ||
@@ -329,2 +381,3 @@ log.info('task executed successfully'); | ||
assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required'); | ||
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string'); | ||
@@ -337,2 +390,4 @@ const sqs = new AWS.SQS({ apiVersion: '2016-11-23' }); | ||
const workDir = options.workDirectory; | ||
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory; | ||
const runForever = isBoolean(options.runForever) ? options.runForever : true; | ||
@@ -342,8 +397,5 @@ | ||
// the cumulus-message-adapter dir is in an unexpected place, | ||
// so tell the adapter where to find it | ||
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`; | ||
log.info('Downloading the Lambda function'); | ||
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir); | ||
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir); | ||
@@ -405,2 +457,3 @@ let sigTermReceived = false; | ||
* @param {string} options.workDirectory - the directory to use for downloading the lambda zip file | ||
* @param {string} options.layersDir - the directory to use for extracting lambda layers. Defaults to /opt | ||
* @param {boolean} [options.runForever=true] - whether to poll the activity forever (defaults to true) | ||
@@ -415,2 +468,4 @@ * @returns {Promise<undefined>} undefined | ||
assert(options.workDirectory && typeof options.workDirectory === 'string', 'options.workDirectory string is required'); | ||
assert(!options.layersDirectory || typeof options.layersDirectory === 'string', 'options.layersDir should be a string'); | ||
if (options.heartbeat) { | ||
@@ -425,2 +480,3 @@ assert(Number.isInteger(options.heartbeat), 'options.heartbeat must be an integer'); | ||
const heartbeatInterval = options.heartbeat; | ||
const layersDir = options.layersDirectory ? options.layersDirectory : layersDefaultDirectory; | ||
@@ -431,8 +487,4 @@ const runForever = isBoolean(options.runForever) ? options.runForever : true; | ||
// the cumulus-message-adapter dir is in an unexpected place, | ||
// so tell the adapter where to find it | ||
process.env.CUMULUS_MESSAGE_ADAPTER_DIR = `${taskDir}/cumulus-message-adapter/`; | ||
log.info('Downloading the Lambda function'); | ||
const handler = await downloadLambdaHandler(lambdaArn, workDir, taskDir); | ||
const handler = await installLambdaFunction(lambdaArn, workDir, taskDir, layersDir); | ||
@@ -439,0 +491,0 @@ let sigTermReceived = false; |
{ | ||
"name": "@cumulus/cumulus-ecs-task", | ||
"version": "1.2.5", | ||
"version": "1.3.0", | ||
"description": "Run lambda functions in ECS", | ||
@@ -23,3 +23,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"aws-sdk": "^2.203.0", | ||
"aws-sdk": "^2.503.0", | ||
"cliclopts": "^1.1.1", | ||
@@ -26,0 +26,0 @@ "lodash.isboolean": "^3.0.3", |
@@ -17,6 +17,12 @@ 'use strict'; | ||
t.context.lambdaZip = path.join(t.context.tempDir, 'remoteLambda.zip'); | ||
t.context.layerZip = path.join(t.context.tempDir, 'fakeLayer.zip'); | ||
t.context.lambdaCMAZip = path.join(t.context.tempDir, 'fakeCMALayer.zip'); | ||
t.context.taskDirectory = path.join(t.context.tempDir, 'task'); | ||
t.context.workDirectory = path.join(t.context.tempDir, '.tmp-work'); | ||
t.context.layerDirectory = path.join(t.context.tempDir, 'layers'); | ||
fs.mkdirpSync(t.context.taskDirectory); | ||
fs.mkdirpSync(t.context.workDirectory); | ||
fs.mkdirpSync(t.context.layerDirectory); | ||
@@ -34,4 +40,30 @@ // zip fake lambda | ||
// zip fake layer file | ||
await new Promise((resolve, reject) => { | ||
const output = fs.createWriteStream(t.context.layerZip); | ||
const archive = archiver('zip'); | ||
output.on('close', resolve); | ||
output.on('error', reject); | ||
archive.pipe(output); | ||
archive.file(path.join(__dirname, 'data/layerDataFile.txt'), { name: 'fakeLayer.txt' }); | ||
archive.finalize(); | ||
}); | ||
// zip CMA injected layer file | ||
await new Promise((resolve, reject) => { | ||
const output = fs.createWriteStream(t.context.lambdaCMAZip); | ||
const archive = archiver('zip'); | ||
output.on('close', resolve); | ||
output.on('error', reject); | ||
archive.pipe(output); | ||
archive.file(path.join(__dirname, 'data/fakeLambda.js'), { name: 'fakeLambda.js' }); | ||
archive.file(path.join(__dirname, 'data/cumulus-message-adapter.txt'), | ||
{ name: 'cumulus-message-adapter' }); | ||
archive.finalize(); | ||
}); | ||
t.context.lambdaZipUrlPath = '/lambda'; | ||
t.context.getLayerUrlPath = '/getLayer'; | ||
nock('https://example.com') | ||
@@ -41,2 +73,6 @@ .get(t.context.lambdaZipUrlPath) | ||
nock('https://example.com') | ||
.get(t.context.getLayerUrlPath) | ||
.reply(200, () => fs.createReadStream(t.context.layerZip)); | ||
t.context.expectedOutput = [ | ||
@@ -46,4 +82,13 @@ 'fakeLambda', | ||
]; | ||
t.context.stub = sinon.stub(AWS, 'Lambda') | ||
.returns({ | ||
getLayerVersionByArn: () => ({ | ||
promise: async () => ({ | ||
LayerArn: 'notARealArn', | ||
Content: { | ||
Location: `https://example.com${t.context.getLayerUrlPath}` | ||
} | ||
}) | ||
}), | ||
getFunction: () => ({ | ||
@@ -55,3 +100,4 @@ promise: async () => ({ | ||
Configuration: { | ||
Handler: t.context.expectedOutput.join('.') | ||
Handler: t.context.expectedOutput.join('.'), | ||
Layers: ['notARealArn'] | ||
} | ||
@@ -76,8 +122,56 @@ }) | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
t.deepEqual(event, output); | ||
}); | ||
test.serial('layers are extracted into target directory', async (t) => { | ||
const event = { hi: 'bye' }; | ||
await runTask({ | ||
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function', | ||
lambdaInput: event, | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
t.true(fs.existsSync(`${t.context.layerDirectory}/fakeLayer.txt`)); | ||
}); | ||
test.serial('CMA environment variable is set if CMA is not present', async (t) => { | ||
const event = { hi: 'bye' }; | ||
await runTask({ | ||
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function', | ||
lambdaInput: event, | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
t.is(process.env.CUMULUS_MESSAGE_ADAPTER_DIR, t.context.layerDirectory); | ||
}); | ||
test.serial('CMA environment variable is set if CMA is present', async (t) => { | ||
const event = { hi: 'bye' }; | ||
nock.cleanAll(); | ||
nock('https://example.com') | ||
.get(t.context.lambdaZipUrlPath) | ||
.reply(200, () => fs.createReadStream(t.context.lambdaCMAZip)); | ||
nock('https://example.com') | ||
.get(t.context.getLayerUrlPath) | ||
.reply(200, () => fs.createReadStream(t.context.layerZip)); | ||
await runTask({ | ||
lambdaArn: 'arn:aws:lambda:region:account-id:function:fake-function', | ||
lambdaInput: event, | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
t.is(process.env.CUMULUS_MESSAGE_ADAPTER_DIR, | ||
`${t.context.taskDirectory}/cumulus-message-adapter`); | ||
}); | ||
test.serial('test failed task run', async (t) => { | ||
@@ -89,3 +183,4 @@ const event = { hi: 'bye', error: 'it failed' }; | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
@@ -124,2 +219,3 @@ const error = await t.throws(promise); | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory, | ||
runForever: false | ||
@@ -163,2 +259,3 @@ }); | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory, | ||
runForever: false | ||
@@ -181,2 +278,6 @@ }); | ||
nock('https://example.com') | ||
.get(t.context.getLayerUrlPath) | ||
.reply(200, () => fs.createReadStream(t.context.layerZip)); | ||
const event = { hi: 'bye' }; | ||
@@ -188,3 +289,4 @@ | ||
taskDirectory: t.context.taskDirectory, | ||
workDirectory: t.context.workDirectory | ||
workDirectory: t.context.workDirectory, | ||
layersDirectory: t.context.layerDirectory | ||
}); | ||
@@ -191,0 +293,0 @@ |
61377
20
977
Updatedaws-sdk@^2.503.0