Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@bonniernews/b0rker

Package Overview
Dependencies
Maintainers
0
Versions
79
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bonniernews/b0rker - npm Package Compare versions

Comparing version 11.0.0 to 11.1.0

4

config/test.json

@@ -7,3 +7,5 @@ {

"logLevel": "debug",
"logJson": false
"logJson": false,
"truncateLog": true,
"pretty": true
},

@@ -10,0 +12,0 @@ "deadLetterTopic": "dead-letter-topic",

@@ -21,3 +21,3 @@ export default function validateConfig(config) {

for (const i in o) {
if ((typeof o[i]) === "object" && !Array.isArray(o[i])) {
if (typeof o[i] === "object" && !Array.isArray(o[i])) {
const fo = flattenObject(o[i]);

@@ -24,0 +24,0 @@ for (const j in fo) {

import assert from "assert";
import { logger } from "lu-logger";
import { findAttribute, findOrReject } from "./find-attributes.js";
import httpClient from "./http-client.js";
import { findAttribute, findOrReject } from "./find-attributes.js";

@@ -7,0 +7,0 @@ export default function buildContext(correlationId, key) {

@@ -0,3 +1,3 @@

import { PubSub } from "@google-cloud/pubsub";
import config from "exp-config";
import { PubSub } from "@google-cloud/pubsub";
import { logger } from "lu-logger";

@@ -10,4 +10,6 @@

export function shouldSendToDlx(req, { nextTime = false } = {}) {
return req.attributes.retryCount + (nextTime ? 1 : 0) > maxRetries;
export function shouldSendToDlx({ attributes: { skipRetries, retryCount } = {} }, { nextTime = false } = {}) {
const secondOrLaterAttemptWithoutRetries = skipRetries && retryCount > 0;
const maxRetriesReached = retryCount + (nextTime ? 1 : 0) > maxRetries;
return secondOrLaterAttemptWithoutRetries || maxRetriesReached;
}

@@ -37,5 +39,7 @@

logger.error("Max retries reached, sending to DLX.");
const retryMessage = err?.extraMessage ? `Max retries reached. Last message: "${err.extraMessage}"` : "Max retries reached";
const retryMessage = err?.extraMessage
? `Max retries reached. Last message: "${err.extraMessage}"`
: "Max retries reached";
await sendToDlx(req, retryMessage);
return res.status(200).send({ type: "dlx", message: retryMessage });
}

@@ -0,7 +1,7 @@

import assert from "assert";
import axios from "axios";
import config from "exp-config";
import GoogleAuth from "google-auth-library";
import { logger } from "lu-logger";
import util from "util";
import config from "exp-config";
import axios from "axios";
import assert from "assert";
import { logger } from "lu-logger";

@@ -8,0 +8,0 @@ import parseUrl from "./utils/parse-url.js";

@@ -133,2 +133,2 @@ import { Firestore } from "@google-cloud/firestore";

export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen };
export { completedChild, messageAlreadySeen, parentIsComplete, removeParent, storeParent };

@@ -102,2 +102,2 @@ // we try and copy the functionality of firestore here as best we can

export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen, getDB, clearDB };
export { clearDB, completedChild, getDB, messageAlreadySeen, parentIsComplete, removeParent, storeParent };

@@ -35,2 +35,3 @@ import * as uuid from "uuid";

queue: req.header("x-cloudtasks-queuename"),
skipRetries: Boolean(req.headers["x-no-retry"]),
};

@@ -37,0 +38,0 @@ return next();

@@ -79,4 +79,3 @@ import { CloudTasksClient } from "@google-cloud/tasks";

const url = `${selfUrl}/v2/${taskUrl.replace(/^\//, "")}`;
const correlationId =
headers.correlationId || headers["correlation-id"] || debugMeta.getDebugMeta().correlationId;
const correlationId = headers.correlationId || headers["correlation-id"] || debugMeta.getDebugMeta().correlationId;
const newHeaders = {

@@ -83,0 +82,0 @@ "Content-Type": "application/json",

@@ -42,3 +42,6 @@ import { Router as expressRouter } from "express";

for (const { key, func } of sequenceIterator(sequence)) {
router.post(buildUrl(namespace, name, key), messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`), schema));
router.post(
buildUrl(namespace, name, key),
messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`), schema)
);
}

@@ -92,8 +95,12 @@ // Allow to start a sequence/sub-sequence by posting to the sequence name

if (nextKey) {
await publishTask(
keyToUrl(nextKey),
nextBody,
{ correlationId, parentCorrelationId, grandParentCorrelationId, siblingCount, parentSiblingCount, runId },
queue
);
const headers = {
correlationId,
parentCorrelationId,
grandParentCorrelationId,
siblingCount,
parentSiblingCount,
runId,
};
await publishTask(keyToUrl(nextKey), nextBody, headers, queue);
} else {

@@ -100,0 +107,0 @@ logger.info("No more steps in this sequence");

function parseUrl(baseUrl, path) {
const baseUrlWithProtocol = baseUrl.startsWith("http") ? baseUrl : `https://${baseUrl}`;
const baseUrlWithoutEndSlash = baseUrlWithProtocol.endsWith("/") ? baseUrlWithProtocol.slice(0, -1) : baseUrlWithProtocol;
const baseUrlWithoutEndSlash = baseUrlWithProtocol.endsWith("/")
? baseUrlWithProtocol.slice(0, -1)
: baseUrlWithProtocol;
const url = new URL(`${baseUrlWithoutEndSlash}${path}`);

@@ -5,0 +7,0 @@ return url.href;

{
"name": "@bonniernews/b0rker",
"version": "11.0.0",
"version": "11.1.0",
"engines": {

@@ -5,0 +5,0 @@ "node": ">=18"

import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test";
import { CloudTasksClient } from "@google-cloud/tasks";
import config from "exp-config";
import { createSandbox } from "sinon";
import request from "supertest";
import { createSandbox } from "sinon";
import { CloudTasksClient } from "@google-cloud/tasks";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -54,3 +54,3 @@ const maxRetries = config.maxRetries || 10;

And("there sequence should have been processed", () => {
And("the sequence should have been processed", () => {
response.messages

@@ -125,2 +125,3 @@ .map(({ url }) => url)

retryCount: (maxRetries + 1).toString(),
skipRetries: "false",
},

@@ -130,2 +131,89 @@ });

});
Scenario("A message runs once and then doesn't retry because of no-retry header", () => {
let broker;
Given("broker is initiated with a recipe", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test",
sequence: [
route(".perform.http-step", (message, { retryIf }) => {
retryIf(true);
}),
],
},
],
});
});
And("we can publish cloud tasks", () => {
fakeCloudTasks.enablePublish(broker);
});
And("we can publish pubsub messages", () => {
fakePubSub.enablePublish(broker);
});
let firstResponse;
When("a specific message is received the first time", async () => {
firstResponse = await request(broker)
.post("/v2/sequence/test/perform.http-step")
.send({})
.set({ "correlation-id": "some-epic-id", "x-no-retry": "true" });
await fakeCloudTasks.processMessages();
});
Then("the status code should be 400", () => {
firstResponse.statusCode.should.eql(400, firstResponse.text);
firstResponse.body.should.eql({ type: "retry" });
});
And("there should be no more processed messages", () => {
fakeCloudTasks.recordedMessages().length.should.eql(0);
});
But("the message should not have have been sent to the DLX", () => {
fakePubSub.recordedMessages().length.should.eql(0);
});
let secondReponse;
When("the message is received the second time", async () => {
secondReponse = await request(broker)
.post("/v2/sequence/test/perform.http-step")
.send({})
.set({ "correlation-id": "some-epic-id", "x-no-retry": "true", "x-cloudtasks-taskretrycount": 1 });
await fakeCloudTasks.processMessages();
});
Then("the status code should be 200 OK", () => {
secondReponse.statusCode.should.eql(200, secondReponse.text);
secondReponse.body.should.eql({ type: "dlx", message: "Max retries reached" });
});
But("there should be no more processed messages", () => {
fakeCloudTasks.recordedMessages().length.should.eql(0);
});
And("the message should have been sent to the DLX", () => {
fakePubSub.recordedMessages().length.should.eql(1);
fakePubSub.recordedMessages()[0].should.deep.eql({
deliveryAttempt: 1,
message: { error: { message: "Max retries reached" } },
topic: config.deadLetterTopic,
attributes: {
correlationId: "some-epic-id",
key: "sequence.test.perform.http-step",
origin: "cloudTasks",
runId: fakePubSub.recordedMessages()[0].attributes.runId,
appName: config.appName,
relativeUrl: "sequence/test/perform.http-step",
retryCount: "1",
skipRetries: "true",
},
});
});
});
});

@@ -199,2 +287,3 @@

retryCount: maxRetries.toString(),
skipRetries: "false",
},

@@ -263,2 +352,3 @@ });

origin: "cloudTasks",
skipRetries: "false",
},

@@ -322,2 +412,3 @@ deliveryAttempt: 1,

origin: "cloudTasks",
skipRetries: "false",
},

@@ -324,0 +415,0 @@ deliveryAttempt: 1,

@@ -1,6 +0,6 @@

import nock from "nock";
import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test";
import config from "exp-config";
import nock from "nock";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -7,0 +7,0 @@ const triggerMessage = {

import { fakeCloudTasks } from "@bonniernews/lu-test";
import config from "exp-config";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -6,0 +6,0 @@ const { queues } = config.cloudTasks;

@@ -0,5 +1,5 @@

import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test";
import config from "exp-config";
import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -6,0 +6,0 @@ const triggerMessage = {

import { fakeCloudTasks } from "@bonniernews/lu-test";
import config from "exp-config";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";
const server = {
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test",
sequence: [
route(".perform.first", () => ({ type: "first", id: "1" })),
route(".perform.second", () => ({ type: "second", id: "2" })),
route(".perform.third", () => ({ type: "third", id: "3" })),
],
},
],
};
const expectedMessages = [
"/v2/sequence/test/perform.second",
"/v2/sequence/test/perform.third",
"/v2/sequence/test/processed",
];
Feature("Resending a stuck message", () => {

@@ -14,20 +35,7 @@ afterEachScenario(() => {

Given("broker is initiated with a recipe", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test",
sequence: [
route(".perform.first", () => ({ type: "first", id: "1" })),
route(".perform.second", () => ({ type: "second", id: "2" })),
route(".perform.third", () => ({ type: "third", id: "3" })),
],
},
],
});
broker = start(server);
});
let response;
When("a trigger message is received", async () => {
When("a trigger message is received on the resend endpoint", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", {

@@ -48,43 +56,20 @@ relativeUrl: "/sequence/test/perform.second",

And("there sequence should have been processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([
"/v2/sequence/test/perform.second",
"/v2/sequence/test/perform.third",
"/v2/sequence/test/processed",
]);
And("the sequence should have been processed", () => {
response.messages.map(({ url }) => url).should.eql(expectedMessages);
});
And("the resend number should be included in the task names", () => {
const queue = config.cloudTasks.queues.default;
const [ taskName1, taskName2, taskName3 ] = response.messages.map(({ taskName }) => taskName);
taskName1.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re1`));
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`));
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`));
checkTaskNames(response.messages);
});
});
Scenario("Resending a message again", () => {
Scenario("Resending a message for the third time", () => {
const resendNumber = 3;
let broker;
Given("broker is initiated with a recipe", () => {
broker = start({
startServer: false,
recipes: [
{
namespace: "sequence",
name: "test",
sequence: [
route(".perform.first", () => ({ type: "first", id: "1" })),
route(".perform.second", () => ({ type: "second", id: "2" })),
route(".perform.third", () => ({ type: "third", id: "3" })),
],
},
],
});
broker = start(server);
});
let response;
When("a trigger message is received", async () => {
When("a trigger message is received on the resend endpoint the third time", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", {

@@ -96,3 +81,3 @@ relativeUrl: "/sequence/test/perform.second",

},
headers: { siblingCount: 3, "correlation-id": "some-epic-id", resendNumber: 3 },
headers: { siblingCount: 3, "correlation-id": "some-epic-id", resendNumber },
queue: config.cloudTasks.queues.default,

@@ -106,10 +91,4 @@ });

And("there sequence should have been processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([
"/v2/sequence/test/perform.second",
"/v2/sequence/test/perform.third",
"/v2/sequence/test/processed",
]);
And("the sequence should have been processed", () => {
response.messages.map(({ url }) => url).should.eql(expectedMessages);
});

@@ -122,10 +101,59 @@

And("the resend number should be included in the task names", () => {
const queue = config.cloudTasks.queues.default;
const [ taskName1, taskName2, taskName3 ] = response.messages.map(({ taskName }) => taskName);
checkTaskNames(response.messages, resendNumber);
});
});
taskName1.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re4`));
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`));
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`));
Scenario("Resending a message without retries", () => {
const noRetryHeader = (headers) => headers["x-no-retry"];
let broker;
Given("broker is initiated with a recipe", () => {
broker = start(server);
});
let response;
When("a trigger message is received with the no retry header set", async () => {
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", {
relativeUrl: "/sequence/test/perform.second",
body: {
attributes: { foo: "bar" },
data: [ { type: "first", id: "1" } ],
},
headers: { siblingCount: 3, "correlation-id": "some-epic-id", "x-no-retry": "true" },
queue: config.cloudTasks.queues.default,
});
});
Then("the status code should be 201 created", () => {
response.firstResponse.statusCode.should.eql(201, response.text);
});
And("the sequence should have been processed", () => {
response.messages.map(({ url }) => url).should.eql(expectedMessages);
});
And("the resend number should be included in the task names", () => {
checkTaskNames(response.messages);
});
And("the no retry header should be included in the task header for the first task", () => {
noRetryHeader(response.messages[0].headers).should.eql("true");
});
And("the no retry header should not be included in the task header for the remaining tasks", () => {
response.messages
.filter((_, index) => index > 0)
.forEach(({ headers }) => should.not.exist(noRetryHeader(headers)));
});
});
});
function checkTaskNames(messages, resendNumber = 0) {
const queue = config.cloudTasks.queues.default;
const [ taskName1, taskName2, taskName3 ] = messages.map(({ taskName }) => taskName);
taskName1.should.match(
new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re${resendNumber + 1}`)
);
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`));
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`));
}
import { fakeCloudTasks } from "@bonniernews/lu-test";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -5,0 +5,0 @@ const triggerMessage = {

import { fakeCloudTasks } from "@bonniernews/lu-test";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -5,0 +5,0 @@ Feature("Broker sequence with 'run'", () => {

import { fakeCloudTasks } from "@bonniernews/lu-test";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -5,0 +5,0 @@ const triggerMessage = {

import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test";
import nock from "nock";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";
import jobStorage from "../../lib/job-storage/index.js";

@@ -6,0 +6,0 @@

import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test";
import nock from "nock";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";
import jobStorage from "../../lib/job-storage/index.js";

@@ -6,0 +6,0 @@

@@ -5,3 +5,3 @@ import { fakeCloudTasks } from "@bonniernews/lu-test";

import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -8,0 +8,0 @@ const triggerMessage = {

import { fakeCloudTasks } from "@bonniernews/lu-test";
import { start, route } from "../../index.js";
import { route, start } from "../../index.js";

@@ -5,0 +5,0 @@ const triggerMessage = {

@@ -93,9 +93,9 @@ import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test";

fakePubSub.recordedMessages().length.should.eql(1);
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "type" is required, "id" is required');
fakePubSub
.recordedMessages()[0]
.message.error.message.should.eql('Validation error: "type" is required, "id" is required');
});
And("the sequence should not be processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
response.messages.map(({ url }) => url).should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
});

@@ -145,11 +145,13 @@ });

fakePubSub.recordedMessages().length.should.eql(1);
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "attributes.foo" must be a string, "attributes.iShouldNotBeHere" is not allowed');
fakePubSub
.recordedMessages()[0]
.message.error.message.should.eql(
'Validation error: "attributes.foo" must be a string, "attributes.iShouldNotBeHere" is not allowed'
);
});
And("the sequence should not be processed", () => {
response.messages
.map(({ url }) => url)
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
response.messages.map(({ url }) => url).should.eql([ "/v2/sequence/test-order/perform.step-1" ]);
});
});
});

@@ -0,8 +1,5 @@

import chai from "chai";
import events from "events";
import "mocha-cakes-2";
import nock from "nock";
import "mocha-cakes-2";
import chai from "chai";
import path from "path";
import fs from "fs";
import { fileURLToPath } from "url";

@@ -24,7 +21,2 @@ // Make sure dates are displayed in the correct timezone

const logFile = path.join(fileURLToPath(import.meta.url), "..", "..", "..", "logs", "test.log");
if (fs.existsSync(logFile)) {
fs.unlinkSync(logFile);
}
nock.enableNetConnect(/(localhost|127\.0\.0\.1):\d+/);
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc