Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

lifion-kinesis

Package Overview
Dependencies
Maintainers
3
Versions
30
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

lifion-kinesis - npm Package Compare versions

Comparing version 1.0.6 to 1.0.7

25

CHANGELOG.md

@@ -10,2 +10,27 @@ ## Changelog

### v1.0.7 (2019-05-22)
- [`#138`](https://github.com/lifion/lifion-kinesis/pull/138): Update dependency aws-sdk to ^2.460.0
- [`#139`](https://github.com/lifion/lifion-kinesis/pull/139): Refactor stream creation on put record(s)
- [`#137`](https://github.com/lifion/lifion-kinesis/pull/137): Update dependency aws-sdk to ^2.458.0
- [`#134`](https://github.com/lifion/lifion-kinesis/pull/134): Update dependency codecov to ^3.5.0
- [`#135`](https://github.com/lifion/lifion-kinesis/pull/135): Update dependency lint-staged to ^8.1.7
- [`#133`](https://github.com/lifion/lifion-kinesis/pull/133): Update dependency aws-sdk to ^2.455.0
- [`#132`](https://github.com/lifion/lifion-kinesis/pull/132): Update dependency husky to ^2.3.0
- [`#131`](https://github.com/lifion/lifion-kinesis/pull/131): Update dependency aws-sdk to ^2.454.0
- [`#130`](https://github.com/lifion/lifion-kinesis/pull/130): Update dependency prettier to ^1.17.1
- [`#129`](https://github.com/lifion/lifion-kinesis/pull/129): Update dependency jsdoc-to-markdown to v5
- [`#128`](https://github.com/lifion/lifion-kinesis/pull/128): Update dependency aws-sdk to ^2.453.0
- [`#125`](https://github.com/lifion/lifion-kinesis/pull/125): Update dependency aws-sdk to ^2.452.0
- [`#126`](https://github.com/lifion/lifion-kinesis/pull/126): Update dependency codecov to ^3.4.0
- [`#124`](https://github.com/lifion/lifion-kinesis/pull/124): Update dependency jest to ^24.8.0
- [`#123`](https://github.com/lifion/lifion-kinesis/pull/123): Update dependency lint-staged to ^8.1.6
- [`#122`](https://github.com/lifion/lifion-kinesis/pull/122): Update dependency aws-sdk to ^2.451.0
- [`#121`](https://github.com/lifion/lifion-kinesis/pull/121): Update dependency jest-junit to ^6.4.0
- [`#120`](https://github.com/lifion/lifion-kinesis/pull/120): Update dependency husky to ^2.2.0
- [`#118`](https://github.com/lifion/lifion-kinesis/pull/118): Update dependency aws-sdk to ^2.447.0
- [`#119`](https://github.com/lifion/lifion-kinesis/pull/119): Update dependency short-uuid to ^3.1.1
- [`2d05ec4`](https://github.com/lifion/lifion-kinesis/commit/2d05ec4cb692d7463327bdd463ebeb0c4a867879): Improve resilience of the fan-out consumer
- [`6ccc463`](https://github.com/lifion/lifion-kinesis/commit/6ccc46302fe345d8fb400f2267d19d344d45de75): Remove extra linting rules
### v1.0.6 (2019-04-29)

@@ -12,0 +37,0 @@

67

lib/fan-out-consumer.js

@@ -29,2 +29,13 @@ 'use strict';

function shouldBailRetry(err) {
const { code } = err;
return (
code === 'MissingParameter' ||
code === 'MissingRequiredParameter' ||
code === 'MultipleValidationErrors' ||
code === 'UnexpectedParameter' ||
code === 'ValidationException'
);
}
class PreProcess extends Transform {

@@ -52,9 +63,11 @@ constructor({ requestFlags }) {

class PostProcess extends Writable {
constructor({ markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
constructor({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId }) {
super({ objectMode: true });
Object.assign(internal(this), {
abort,
markShardAsDepleted,
pushToStream,
setCheckpoint,
shardId
shardId,
timeoutId: null
});

@@ -64,7 +77,16 @@ }

async _write(chunk, encoding, callback) {
const { markShardAsDepleted, pushToStream, setCheckpoint, shardId } = internal(this);
const { continuationSequenceNumber, records } = chunk;
const {
abort,
markShardAsDepleted,
pushToStream,
setCheckpoint,
shardId,
timeoutId
} = internal(this);
clearTimeout(timeoutId);
internal(this).timeoutId = setTimeout(abort, 10000);
const { continuationSequenceNumber } = chunk;
if (continuationSequenceNumber !== undefined) {
await setCheckpoint(continuationSequenceNumber);
if (records.length > 0) pushToStream(null, { ...chunk, shardId });
pushToStream(null, { ...chunk, shardId });
callback();

@@ -135,3 +157,2 @@ } else {

const {
checkpoint,
client,

@@ -156,3 +177,2 @@ compression,

let pipelineError;
const requestFlags = {};

@@ -187,4 +207,11 @@ let stream;

await stateStore.storeShardCheckpoint(shardId, sequenceNumber, shardsPath, shardsPathNames);
privateProps.checkpoint = sequenceNumber;
};
const abort = () => {
if (privateProps.request) {
privateProps.request.abort();
}
};
do {

@@ -196,2 +223,4 @@ if (requestFlags.isEventStream === false) {

const { checkpoint } = privateProps;
stream = httpClient.stream('/', {

@@ -219,19 +248,25 @@ body: JSON.stringify({

new RecordsDecoder({ compression }),
new PostProcess({ markShardAsDepleted, pushToStream, setCheckpoint, shardId })
new PostProcess({ abort, markShardAsDepleted, pushToStream, setCheckpoint, shardId })
]);
} catch (err) {
const { code, isRetryable, message } = err;
if (isRetryable) {
logger.warn(`Pipeline closed with retryable error: [${code}] ${message}`);
const { code, message, requestId, statusCode } = err;
console.warn('pipeline erorr', code);
if (!shouldBailRetry(err)) {
logger.warn(
`Trying to recover from AWS.Kinesis error…\n${[
`\t- Message: ${message}`,
`\t- Request ID: ${requestId}`,
`\t- Code: ${code} (${statusCode})`,
`\t- Stream: ${streamName}`
].join('\n')}`
);
} else {
pushToStream(err);
logger.error(`Pipeline closed with error: [${code}] ${message}`);
privateProps.retryPipeline = false;
}
pipelineError = err;
}
} while (privateProps.retryPipeline && (!pipelineError || pipelineError.isRetryable));
} while (privateProps.retryPipeline);
if (privateProps.request) {
privateProps.request.abort();
}
abort();
}

@@ -238,0 +273,0 @@

'use strict';
const HEARTBEAT_INTERVAL = 30 * 1000;
const HEARTBEAT_INTERVAL = 20 * 1000;
const HEARTBEAT_FAILURE_TIMEOUT = HEARTBEAT_INTERVAL * 2;

@@ -23,4 +23,4 @@

const heartbeat = async () => {
await stateStore.registerConsumer();
await stateStore.clearOldConsumers(HEARTBEAT_FAILURE_TIMEOUT);
await stateStore.registerConsumer();
logger.debug('Heartbeat sent.');

@@ -27,0 +27,0 @@ privateProps.timeoutId = setTimeout(heartbeat, HEARTBEAT_INTERVAL);

@@ -39,11 +39,18 @@ /**

async function ensureStreamInitialized(instance) {
async function ensureStreamInitialized(instance, streamNameParam) {
const privateProps = internal(instance);
const { encryption, streamArn, streamCreatedOn, streamName, tags } = privateProps;
let streamProps;
const { encryption, streamArn, streamCreatedOn, tags } = privateProps;
if (streamArn && streamCreatedOn) return;
if (!streamNameParam || streamNameParam === streamName) {
if (streamArn && streamCreatedOn) return;
streamProps = privateProps;
Object.assign(privateProps, await ensureStreamExists(streamProps));
} else {
streamProps = Object.assign({}, privateProps, { streamName: streamNameParam });
await ensureStreamExists(streamProps);
}
Object.assign(privateProps, await ensureStreamExists(privateProps));
if (encryption) await ensureStreamEncription(privateProps);
if (tags) await confirmStreamTags(privateProps);
if (encryption) await ensureStreamEncription(streamProps);
if (tags) await confirmStreamTags(streamProps);
}

@@ -309,7 +316,14 @@

const { client, recordsEncoder } = privateProps;
await ensureStreamInitialized(this);
return client.putRecord({
const params = {
...(await recordsEncoder(record)),
StreamName: streamName || privateProps.streamName
});
};
try {
const result = await client.putRecord(params);
return result;
} catch (err) {
if (err.code !== 'ResourceNotFoundException') throw err;
await ensureStreamInitialized(this, streamName);
return client.putRecord(params);
}
}

@@ -334,7 +348,19 @@

const { client, recordsEncoder } = privateProps;
await ensureStreamInitialized(this);
return client.putRecords({
const params = {
Records: await Promise.all(records.map(recordsEncoder)),
StreamName: streamName || privateProps.streamName
});
};
try {
const result = await client.putRecords(params);
return result;
} catch (err) {
if (
err.code === 'ResourceNotFoundException' ||
(err.code === 'UnknownError' && client.isEndpointLocal())
) {
await ensureStreamInitialized(this, streamName);
return client.putRecords(params);
}
throw err;
}
}

@@ -341,0 +367,0 @@

@@ -374,4 +374,10 @@ 'use strict';

}
isEndpointLocal() {
const { client } = internal(this);
const { host } = client.endpoint;
return host.includes('localhost') || host.includes('localstack');
}
}
module.exports = KinesisClient;

@@ -116,5 +116,5 @@ 'use strict';

module.exports = {
RecordsDecoder,
getRecordsDecoder,
getRecordsEncoder,
RecordsDecoder
getRecordsEncoder
};

@@ -171,5 +171,6 @@ 'use strict';

const usagesToClear = Object.keys(enhancedConsumers).filter(consumerName =>
oldConsumers.includes(enhancedConsumers[consumerName].isUsedBy)
);
const usagesToClear = Object.keys(enhancedConsumers).filter(consumerName => {
const { isUsedBy } = enhancedConsumers[consumerName];
return oldConsumers.includes(isUsedBy) || !consumerIds.includes(isUsedBy);
});

@@ -271,15 +272,19 @@ if (usagesToClear.length === 0) return;

async updateConsumerIsActive(isActive) {
const { client, consumerGroup, consumerId, streamName } = internal(this);
await client.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'isActive'
},
ExpressionAttributeValues: {
':z': isActive
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :z'
});
const { client, consumerGroup, consumerId, logger, streamName } = internal(this);
try {
await client.update({
ExpressionAttributeNames: {
'#a': 'consumers',
'#b': consumerId,
'#c': 'isActive'
},
ExpressionAttributeValues: {
':z': isActive
},
Key: { consumerGroup, streamName },
UpdateExpression: 'SET #a.#b.#c = :z'
});
} catch (err) {
logger.debug("Can't update the is consumer active flag.");
}
}

@@ -286,0 +291,0 @@

{
"name": "lifion-kinesis",
"version": "1.0.6",
"version": "1.0.7",
"description": "Lifion client for Amazon Kinesis Data streams",

@@ -36,3 +36,3 @@ "keywords": [

"async-retry": "^1.2.3",
"aws-sdk": "^2.445.0",
"aws-sdk": "^2.460.0",
"aws4": "^1.8.0",

@@ -45,3 +45,3 @@ "fast-deep-equal": "^2.0.1",

"project-name": "^1.0.0",
"short-uuid": "^3.1.0"
"short-uuid": "^3.1.1"
},

@@ -52,13 +52,13 @@ "devDependencies": {

"check-engines": "^1.5.0",
"codecov": "^3.3.0",
"codecov": "^3.5.0",
"eslint": "^5.16.0",
"eslint-config-lifion": "^1.1.0",
"eslint-config-lifion": "^1.2.2",
"eslint-plugin-sort-destructure-keys": "^1.3.0",
"husky": "^2.1.0",
"jest": "^24.7.1",
"jest-junit": "^6.3.0",
"jsdoc-to-markdown": "^4.0.1",
"lint-staged": "^8.1.5",
"husky": "^2.3.0",
"jest": "^24.8.0",
"jest-junit": "^6.4.0",
"jsdoc-to-markdown": "^5.0.0",
"lint-staged": "^8.1.7",
"npm-watch": "^0.6.0",
"prettier": "^1.17.0",
"prettier": "^1.17.1",
"semver": "^6.0.0"

@@ -71,12 +71,3 @@ },

"eslintConfig": {
"extends": "lifion",
"plugins": [
"sort-destructure-keys"
],
"rules": {
"jsdoc/require-returns-description": "off",
"no-await-in-loop": "off",
"sort-destructure-keys/sort-destructure-keys": "error",
"sort-keys": "error"
}
"extends": "lifion"
},

@@ -83,0 +74,0 @@ "husky": {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc