lifion-kinesis
Advanced tools
Comparing version 1.0.6 to 1.0.7
@@ -10,2 +10,27 @@ ## Changelog | ||
### v1.0.7 (2019-05-22) | ||
- [`#138`](https://github.com/lifion/lifion-kinesis/pull/138): Update dependency aws-sdk to ^2.460.0 | ||
- [`#139`](https://github.com/lifion/lifion-kinesis/pull/139): Refactor stream creation on put record(s) | ||
- [`#137`](https://github.com/lifion/lifion-kinesis/pull/137): Update dependency aws-sdk to ^2.458.0 | ||
- [`#134`](https://github.com/lifion/lifion-kinesis/pull/134): Update dependency codecov to ^3.5.0 | ||
- [`#135`](https://github.com/lifion/lifion-kinesis/pull/135): Update dependency lint-staged to ^8.1.7 | ||
- [`#133`](https://github.com/lifion/lifion-kinesis/pull/133): Update dependency aws-sdk to ^2.455.0 | ||
- [`#132`](https://github.com/lifion/lifion-kinesis/pull/132): Update dependency husky to ^2.3.0 | ||
- [`#131`](https://github.com/lifion/lifion-kinesis/pull/131): Update dependency aws-sdk to ^2.454.0 | ||
- [`#130`](https://github.com/lifion/lifion-kinesis/pull/130): Update dependency prettier to ^1.17.1 | ||
- [`#129`](https://github.com/lifion/lifion-kinesis/pull/129): Update dependency jsdoc-to-markdown to v5 | ||
- [`#128`](https://github.com/lifion/lifion-kinesis/pull/128): Update dependency aws-sdk to ^2.453.0 | ||
- [`#125`](https://github.com/lifion/lifion-kinesis/pull/125): Update dependency aws-sdk to ^2.452.0 | ||
- [`#126`](https://github.com/lifion/lifion-kinesis/pull/126): Update dependency codecov to ^3.4.0 | ||
- [`#124`](https://github.com/lifion/lifion-kinesis/pull/124): Update dependency jest to ^24.8.0 | ||
- [`#123`](https://github.com/lifion/lifion-kinesis/pull/123): Update dependency lint-staged to ^8.1.6 | ||
- [`#122`](https://github.com/lifion/lifion-kinesis/pull/122): Update dependency aws-sdk to ^2.451.0 | ||
- [`#121`](https://github.com/lifion/lifion-kinesis/pull/121): Update dependency jest-junit to ^6.4.0 | ||
- [`#120`](https://github.com/lifion/lifion-kinesis/pull/120): Update dependency husky to ^2.2.0 | ||
- [`#118`](https://github.com/lifion/lifion-kinesis/pull/118): Update dependency aws-sdk to ^2.447.0 | ||
- [`#119`](https://github.com/lifion/lifion-kinesis/pull/119): Update dependency short-uuid to ^3.1.1 | ||
- [`2d05ec4`](https://github.com/lifion/lifion-kinesis/commit/2d05ec4cb692d7463327bdd463ebeb0c4a867879): Improve resilience of the fan-out consumer | ||
- [`6ccc463`](https://github.com/lifion/lifion-kinesis/commit/6ccc46302fe345d8fb400f2267d19d344d45de75): Remove extra linting rules | ||
### v1.0.6 (2019-04-29) | ||
@@ -12,0 +37,0 @@ |
@@ -29,2 +29,13 @@ 'use strict'; | ||
function shouldBailRetry(err) { | ||
const { code } = err; | ||
return ( | ||
code === 'MissingParameter' || | ||
code === 'MissingRequiredParameter' || | ||
code === 'MultipleValidationErrors' || | ||
code === 'UnexpectedParameter' || | ||
code === 'ValidationException' | ||
); | ||
} | ||
class PreProcess extends Transform { | ||
@@ -52,9 +63,11 @@ constructor({ requestFlags }) { | ||
class PostProcess extends Writable { | ||
constructor({ markShardAsDepleted, pushToStream, setCheckpoint, shardId }) { | ||
constructor({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) { | ||
super({ objectMode: true }); | ||
Object.assign(internal(this), { | ||
abort, | ||
markShardAsDepleted, | ||
pushToStream, | ||
setCheckpoint, | ||
shardId | ||
shardId, | ||
timeoutId: null | ||
}); | ||
@@ -64,7 +77,16 @@ } | ||
async _write(chunk, encoding, callback) { | ||
const { markShardAsDepleted, pushToStream, setCheckpoint, shardId } = internal(this); | ||
const { continuationSequenceNumber, records } = chunk; | ||
const { | ||
abort, | ||
markShardAsDepleted, | ||
pushToStream, | ||
setCheckpoint, | ||
shardId, | ||
timeoutId | ||
} = internal(this); | ||
clearTimeout(timeoutId); | ||
internal(this).timeoutId = setTimeout(abort, 10000); | ||
const { continuationSequenceNumber } = chunk; | ||
if (continuationSequenceNumber !== undefined) { | ||
await setCheckpoint(continuationSequenceNumber); | ||
if (records.length > 0) pushToStream(null, { ...chunk, shardId }); | ||
pushToStream(null, { ...chunk, shardId }); | ||
callback(); | ||
@@ -135,3 +157,2 @@ } else { | ||
const { | ||
checkpoint, | ||
client, | ||
@@ -156,3 +177,2 @@ compression, | ||
let pipelineError; | ||
const requestFlags = {}; | ||
@@ -187,4 +207,11 @@ let stream; | ||
await stateStore.storeShardCheckpoint(shardId, sequenceNumber, shardsPath, shardsPathNames); | ||
privateProps.checkpoint = sequenceNumber; | ||
}; | ||
const abort = () => { | ||
if (privateProps.request) { | ||
privateProps.request.abort(); | ||
} | ||
}; | ||
do { | ||
@@ -196,2 +223,4 @@ if (requestFlags.isEventStream === false) { | ||
const { checkpoint } = privateProps; | ||
stream = httpClient.stream('/', { | ||
@@ -219,19 +248,25 @@ body: JSON.stringify({ | ||
new RecordsDecoder({ compression }), | ||
new PostProcess({ markShardAsDepleted, pushToStream, setCheckpoint, shardId }) | ||
new PostProcess({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) | ||
]); | ||
} catch (err) { | ||
const { code, isRetryable, message } = err; | ||
if (isRetryable) { | ||
logger.warn(`Pipeline closed with retryable error: [${code}] ${message}`); | ||
const { code, message, requestId, statusCode } = err; | ||
console.warn('pipeline erorr', code); | ||
if (!shouldBailRetry(err)) { | ||
logger.warn( | ||
`Trying to recover from AWS.Kinesis error…\n${[ | ||
`\t- Message: ${message}`, | ||
`\t- Request ID: ${requestId}`, | ||
`\t- Code: ${code} (${statusCode})`, | ||
`\t- Stream: ${streamName}` | ||
].join('\n')}` | ||
); | ||
} else { | ||
pushToStream(err); | ||
logger.error(`Pipeline closed with error: [${code}] ${message}`); | ||
privateProps.retryPipeline = false; | ||
} | ||
pipelineError = err; | ||
} | ||
} while (privateProps.retryPipeline && (!pipelineError || pipelineError.isRetryable)); | ||
} while (privateProps.retryPipeline); | ||
if (privateProps.request) { | ||
privateProps.request.abort(); | ||
} | ||
abort(); | ||
} | ||
@@ -238,0 +273,0 @@ |
'use strict'; | ||
const HEARTBEAT_INTERVAL = 30 * 1000; | ||
const HEARTBEAT_INTERVAL = 20 * 1000; | ||
const HEARTBEAT_FAILURE_TIMEOUT = HEARTBEAT_INTERVAL * 2; | ||
@@ -23,4 +23,4 @@ | ||
const heartbeat = async () => { | ||
await stateStore.registerConsumer(); | ||
await stateStore.clearOldConsumers(HEARTBEAT_FAILURE_TIMEOUT); | ||
await stateStore.registerConsumer(); | ||
logger.debug('Heartbeat sent.'); | ||
@@ -27,0 +27,0 @@ privateProps.timeoutId = setTimeout(heartbeat, HEARTBEAT_INTERVAL); |
@@ -39,11 +39,18 @@ /** | ||
async function ensureStreamInitialized(instance) { | ||
async function ensureStreamInitialized(instance, streamNameParam) { | ||
const privateProps = internal(instance); | ||
const { encryption, streamArn, streamCreatedOn, streamName, tags } = privateProps; | ||
let streamProps; | ||
const { encryption, streamArn, streamCreatedOn, tags } = privateProps; | ||
if (streamArn && streamCreatedOn) return; | ||
if (!streamNameParam || streamNameParam === streamName) { | ||
if (streamArn && streamCreatedOn) return; | ||
streamProps = privateProps; | ||
Object.assign(privateProps, await ensureStreamExists(streamProps)); | ||
} else { | ||
streamProps = Object.assign({}, privateProps, { streamName: streamNameParam }); | ||
await ensureStreamExists(streamProps); | ||
} | ||
Object.assign(privateProps, await ensureStreamExists(privateProps)); | ||
if (encryption) await ensureStreamEncription(privateProps); | ||
if (tags) await confirmStreamTags(privateProps); | ||
if (encryption) await ensureStreamEncription(streamProps); | ||
if (tags) await confirmStreamTags(streamProps); | ||
} | ||
@@ -309,7 +316,14 @@ | ||
const { client, recordsEncoder } = privateProps; | ||
await ensureStreamInitialized(this); | ||
return client.putRecord({ | ||
const params = { | ||
...(await recordsEncoder(record)), | ||
StreamName: streamName || privateProps.streamName | ||
}); | ||
}; | ||
try { | ||
const result = await client.putRecord(params); | ||
return result; | ||
} catch (err) { | ||
if (err.code !== 'ResourceNotFoundException') throw err; | ||
await ensureStreamInitialized(this, streamName); | ||
return client.putRecord(params); | ||
} | ||
} | ||
@@ -334,7 +348,19 @@ | ||
const { client, recordsEncoder } = privateProps; | ||
await ensureStreamInitialized(this); | ||
return client.putRecords({ | ||
const params = { | ||
Records: await Promise.all(records.map(recordsEncoder)), | ||
StreamName: streamName || privateProps.streamName | ||
}); | ||
}; | ||
try { | ||
const result = await client.putRecords(params); | ||
return result; | ||
} catch (err) { | ||
if ( | ||
err.code === 'ResourceNotFoundException' || | ||
(err.code === 'UnknownError' && client.isEndpointLocal()) | ||
) { | ||
await ensureStreamInitialized(this, streamName); | ||
return client.putRecords(params); | ||
} | ||
throw err; | ||
} | ||
} | ||
@@ -341,0 +367,0 @@ |
@@ -374,4 +374,10 @@ 'use strict'; | ||
} | ||
isEndpointLocal() { | ||
const { client } = internal(this); | ||
const { host } = client.endpoint; | ||
return host.includes('localhost') || host.includes('localstack'); | ||
} | ||
} | ||
module.exports = KinesisClient; |
@@ -116,5 +116,5 @@ 'use strict'; | ||
module.exports = { | ||
RecordsDecoder, | ||
getRecordsDecoder, | ||
getRecordsEncoder, | ||
RecordsDecoder | ||
getRecordsEncoder | ||
}; |
@@ -171,5 +171,6 @@ 'use strict'; | ||
const usagesToClear = Object.keys(enhancedConsumers).filter(consumerName => | ||
oldConsumers.includes(enhancedConsumers[consumerName].isUsedBy) | ||
); | ||
const usagesToClear = Object.keys(enhancedConsumers).filter(consumerName => { | ||
const { isUsedBy } = enhancedConsumers[consumerName]; | ||
return oldConsumers.includes(isUsedBy) || !consumerIds.includes(isUsedBy); | ||
}); | ||
@@ -271,15 +272,19 @@ if (usagesToClear.length === 0) return; | ||
async updateConsumerIsActive(isActive) { | ||
const { client, consumerGroup, consumerId, streamName } = internal(this); | ||
await client.update({ | ||
ExpressionAttributeNames: { | ||
'#a': 'consumers', | ||
'#b': consumerId, | ||
'#c': 'isActive' | ||
}, | ||
ExpressionAttributeValues: { | ||
':z': isActive | ||
}, | ||
Key: { consumerGroup, streamName }, | ||
UpdateExpression: 'SET #a.#b.#c = :z' | ||
}); | ||
const { client, consumerGroup, consumerId, logger, streamName } = internal(this); | ||
try { | ||
await client.update({ | ||
ExpressionAttributeNames: { | ||
'#a': 'consumers', | ||
'#b': consumerId, | ||
'#c': 'isActive' | ||
}, | ||
ExpressionAttributeValues: { | ||
':z': isActive | ||
}, | ||
Key: { consumerGroup, streamName }, | ||
UpdateExpression: 'SET #a.#b.#c = :z' | ||
}); | ||
} catch (err) { | ||
logger.debug("Can't update the is consumer active flag."); | ||
} | ||
} | ||
@@ -286,0 +291,0 @@ |
{ | ||
"name": "lifion-kinesis", | ||
"version": "1.0.6", | ||
"version": "1.0.7", | ||
"description": "Lifion client for Amazon Kinesis Data streams", | ||
@@ -36,3 +36,3 @@ "keywords": [ | ||
"async-retry": "^1.2.3", | ||
"aws-sdk": "^2.445.0", | ||
"aws-sdk": "^2.460.0", | ||
"aws4": "^1.8.0", | ||
@@ -45,3 +45,3 @@ "fast-deep-equal": "^2.0.1", | ||
"project-name": "^1.0.0", | ||
"short-uuid": "^3.1.0" | ||
"short-uuid": "^3.1.1" | ||
}, | ||
@@ -52,13 +52,13 @@ "devDependencies": { | ||
"check-engines": "^1.5.0", | ||
"codecov": "^3.3.0", | ||
"codecov": "^3.5.0", | ||
"eslint": "^5.16.0", | ||
"eslint-config-lifion": "^1.1.0", | ||
"eslint-config-lifion": "^1.2.2", | ||
"eslint-plugin-sort-destructure-keys": "^1.3.0", | ||
"husky": "^2.1.0", | ||
"jest": "^24.7.1", | ||
"jest-junit": "^6.3.0", | ||
"jsdoc-to-markdown": "^4.0.1", | ||
"lint-staged": "^8.1.5", | ||
"husky": "^2.3.0", | ||
"jest": "^24.8.0", | ||
"jest-junit": "^6.4.0", | ||
"jsdoc-to-markdown": "^5.0.0", | ||
"lint-staged": "^8.1.7", | ||
"npm-watch": "^0.6.0", | ||
"prettier": "^1.17.0", | ||
"prettier": "^1.17.1", | ||
"semver": "^6.0.0" | ||
@@ -71,12 +71,3 @@ }, | ||
"eslintConfig": { | ||
"extends": "lifion", | ||
"plugins": [ | ||
"sort-destructure-keys" | ||
], | ||
"rules": { | ||
"jsdoc/require-returns-description": "off", | ||
"no-await-in-loop": "off", | ||
"sort-destructure-keys/sort-destructure-keys": "error", | ||
"sort-keys": "error" | ||
} | ||
"extends": "lifion" | ||
}, | ||
@@ -83,0 +74,0 @@ "husky": { |
142285
2816
Updatedaws-sdk@^2.460.0
Updatedshort-uuid@^3.1.1