@bonniernews/b0rker
Advanced tools
Comparing version 11.0.0 to 11.1.0
@@ -7,3 +7,5 @@ { | ||
"logLevel": "debug", | ||
"logJson": false | ||
"logJson": false, | ||
"truncateLog": true, | ||
"pretty": true | ||
}, | ||
@@ -10,0 +12,0 @@ "deadLetterTopic": "dead-letter-topic", |
@@ -21,3 +21,3 @@ export default function validateConfig(config) { | ||
for (const i in o) { | ||
if ((typeof o[i]) === "object" && !Array.isArray(o[i])) { | ||
if (typeof o[i] === "object" && !Array.isArray(o[i])) { | ||
const fo = flattenObject(o[i]); | ||
@@ -24,0 +24,0 @@ for (const j in fo) { |
import assert from "assert"; | ||
import { logger } from "lu-logger"; | ||
import { findAttribute, findOrReject } from "./find-attributes.js"; | ||
import httpClient from "./http-client.js"; | ||
import { findAttribute, findOrReject } from "./find-attributes.js"; | ||
@@ -7,0 +7,0 @@ export default function buildContext(correlationId, key) { |
@@ -0,3 +1,3 @@ | ||
import { PubSub } from "@google-cloud/pubsub"; | ||
import config from "exp-config"; | ||
import { PubSub } from "@google-cloud/pubsub"; | ||
import { logger } from "lu-logger"; | ||
@@ -10,4 +10,6 @@ | ||
export function shouldSendToDlx(req, { nextTime = false } = {}) { | ||
return req.attributes.retryCount + (nextTime ? 1 : 0) > maxRetries; | ||
export function shouldSendToDlx({ attributes: { skipRetries, retryCount } = {} }, { nextTime = false } = {}) { | ||
const secondOrLaterAttemptWithoutRetries = skipRetries && retryCount > 0; | ||
const maxRetriesReached = retryCount + (nextTime ? 1 : 0) > maxRetries; | ||
return secondOrLaterAttemptWithoutRetries || maxRetriesReached; | ||
} | ||
@@ -37,5 +39,7 @@ | ||
logger.error("Max retries reached, sending to DLX."); | ||
const retryMessage = err?.extraMessage ? `Max retries reached. Last message: "${err.extraMessage}"` : "Max retries reached"; | ||
const retryMessage = err?.extraMessage | ||
? `Max retries reached. Last message: "${err.extraMessage}"` | ||
: "Max retries reached"; | ||
await sendToDlx(req, retryMessage); | ||
return res.status(200).send({ type: "dlx", message: retryMessage }); | ||
} |
@@ -0,7 +1,7 @@ | ||
import assert from "assert"; | ||
import axios from "axios"; | ||
import config from "exp-config"; | ||
import GoogleAuth from "google-auth-library"; | ||
import { logger } from "lu-logger"; | ||
import util from "util"; | ||
import config from "exp-config"; | ||
import axios from "axios"; | ||
import assert from "assert"; | ||
import { logger } from "lu-logger"; | ||
@@ -8,0 +8,0 @@ import parseUrl from "./utils/parse-url.js"; |
@@ -133,2 +133,2 @@ import { Firestore } from "@google-cloud/firestore"; | ||
export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen }; | ||
export { completedChild, messageAlreadySeen, parentIsComplete, removeParent, storeParent }; |
@@ -102,2 +102,2 @@ // we try and copy the functionality of firestore here as best we can | ||
export { storeParent, completedChild, parentIsComplete, removeParent, messageAlreadySeen, getDB, clearDB }; | ||
export { clearDB, completedChild, getDB, messageAlreadySeen, parentIsComplete, removeParent, storeParent }; |
@@ -35,2 +35,3 @@ import * as uuid from "uuid"; | ||
queue: req.header("x-cloudtasks-queuename"), | ||
skipRetries: Boolean(req.headers["x-no-retry"]), | ||
}; | ||
@@ -37,0 +38,0 @@ return next(); |
@@ -79,4 +79,3 @@ import { CloudTasksClient } from "@google-cloud/tasks"; | ||
const url = `${selfUrl}/v2/${taskUrl.replace(/^\//, "")}`; | ||
const correlationId = | ||
headers.correlationId || headers["correlation-id"] || debugMeta.getDebugMeta().correlationId; | ||
const correlationId = headers.correlationId || headers["correlation-id"] || debugMeta.getDebugMeta().correlationId; | ||
const newHeaders = { | ||
@@ -83,0 +82,0 @@ "Content-Type": "application/json", |
@@ -42,3 +42,6 @@ import { Router as expressRouter } from "express"; | ||
for (const { key, func } of sequenceIterator(sequence)) { | ||
router.post(buildUrl(namespace, name, key), messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`), schema)); | ||
router.post( | ||
buildUrl(namespace, name, key), | ||
messageHandler(func, nextKeyMapper(`${namespace}.${name}.${key}`), schema) | ||
); | ||
} | ||
@@ -92,8 +95,12 @@ // Allow to start a sequence/sub-sequence by posting to the sequence name | ||
if (nextKey) { | ||
await publishTask( | ||
keyToUrl(nextKey), | ||
nextBody, | ||
{ correlationId, parentCorrelationId, grandParentCorrelationId, siblingCount, parentSiblingCount, runId }, | ||
queue | ||
); | ||
const headers = { | ||
correlationId, | ||
parentCorrelationId, | ||
grandParentCorrelationId, | ||
siblingCount, | ||
parentSiblingCount, | ||
runId, | ||
}; | ||
await publishTask(keyToUrl(nextKey), nextBody, headers, queue); | ||
} else { | ||
@@ -100,0 +107,0 @@ logger.info("No more steps in this sequence"); |
function parseUrl(baseUrl, path) { | ||
const baseUrlWithProtocol = baseUrl.startsWith("http") ? baseUrl : `https://${baseUrl}`; | ||
const baseUrlWithoutEndSlash = baseUrlWithProtocol.endsWith("/") ? baseUrlWithProtocol.slice(0, -1) : baseUrlWithProtocol; | ||
const baseUrlWithoutEndSlash = baseUrlWithProtocol.endsWith("/") | ||
? baseUrlWithProtocol.slice(0, -1) | ||
: baseUrlWithProtocol; | ||
const url = new URL(`${baseUrlWithoutEndSlash}${path}`); | ||
@@ -5,0 +7,0 @@ return url.href; |
{ | ||
"name": "@bonniernews/b0rker", | ||
"version": "11.0.0", | ||
"version": "11.1.0", | ||
"engines": { | ||
@@ -5,0 +5,0 @@ "node": ">=18" |
import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test"; | ||
import { CloudTasksClient } from "@google-cloud/tasks"; | ||
import config from "exp-config"; | ||
import { createSandbox } from "sinon"; | ||
import request from "supertest"; | ||
import { createSandbox } from "sinon"; | ||
import { CloudTasksClient } from "@google-cloud/tasks"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -54,3 +54,3 @@ const maxRetries = config.maxRetries || 10; | ||
And("there sequence should have been processed", () => { | ||
And("the sequence should have been processed", () => { | ||
response.messages | ||
@@ -125,2 +125,3 @@ .map(({ url }) => url) | ||
retryCount: (maxRetries + 1).toString(), | ||
skipRetries: "false", | ||
}, | ||
@@ -130,2 +131,89 @@ }); | ||
}); | ||
Scenario("A message runs once and then doesn't retry because of no-retry header", () => { | ||
let broker; | ||
Given("broker is initiated with a recipe", () => { | ||
broker = start({ | ||
startServer: false, | ||
recipes: [ | ||
{ | ||
namespace: "sequence", | ||
name: "test", | ||
sequence: [ | ||
route(".perform.http-step", (message, { retryIf }) => { | ||
retryIf(true); | ||
}), | ||
], | ||
}, | ||
], | ||
}); | ||
}); | ||
And("we can publish cloud tasks", () => { | ||
fakeCloudTasks.enablePublish(broker); | ||
}); | ||
And("we can publish pubsub messages", () => { | ||
fakePubSub.enablePublish(broker); | ||
}); | ||
let firstResponse; | ||
When("a specific message is received the first time", async () => { | ||
firstResponse = await request(broker) | ||
.post("/v2/sequence/test/perform.http-step") | ||
.send({}) | ||
.set({ "correlation-id": "some-epic-id", "x-no-retry": "true" }); | ||
await fakeCloudTasks.processMessages(); | ||
}); | ||
Then("the status code should be 400", () => { | ||
firstResponse.statusCode.should.eql(400, firstResponse.text); | ||
firstResponse.body.should.eql({ type: "retry" }); | ||
}); | ||
And("there should be no more processed messages", () => { | ||
fakeCloudTasks.recordedMessages().length.should.eql(0); | ||
}); | ||
But("the message should not have have been sent to the DLX", () => { | ||
fakePubSub.recordedMessages().length.should.eql(0); | ||
}); | ||
let secondReponse; | ||
When("the message is received the second time", async () => { | ||
secondReponse = await request(broker) | ||
.post("/v2/sequence/test/perform.http-step") | ||
.send({}) | ||
.set({ "correlation-id": "some-epic-id", "x-no-retry": "true", "x-cloudtasks-taskretrycount": 1 }); | ||
await fakeCloudTasks.processMessages(); | ||
}); | ||
Then("the status code should be 200 OK", () => { | ||
secondReponse.statusCode.should.eql(200, secondReponse.text); | ||
secondReponse.body.should.eql({ type: "dlx", message: "Max retries reached" }); | ||
}); | ||
But("there should be no more processed messages", () => { | ||
fakeCloudTasks.recordedMessages().length.should.eql(0); | ||
}); | ||
And("the message should have been sent to the DLX", () => { | ||
fakePubSub.recordedMessages().length.should.eql(1); | ||
fakePubSub.recordedMessages()[0].should.deep.eql({ | ||
deliveryAttempt: 1, | ||
message: { error: { message: "Max retries reached" } }, | ||
topic: config.deadLetterTopic, | ||
attributes: { | ||
correlationId: "some-epic-id", | ||
key: "sequence.test.perform.http-step", | ||
origin: "cloudTasks", | ||
runId: fakePubSub.recordedMessages()[0].attributes.runId, | ||
appName: config.appName, | ||
relativeUrl: "sequence/test/perform.http-step", | ||
retryCount: "1", | ||
skipRetries: "true", | ||
}, | ||
}); | ||
}); | ||
}); | ||
}); | ||
@@ -199,2 +287,3 @@ | ||
retryCount: maxRetries.toString(), | ||
skipRetries: "false", | ||
}, | ||
@@ -263,2 +352,3 @@ }); | ||
origin: "cloudTasks", | ||
skipRetries: "false", | ||
}, | ||
@@ -322,2 +412,3 @@ deliveryAttempt: 1, | ||
origin: "cloudTasks", | ||
skipRetries: "false", | ||
}, | ||
@@ -324,0 +415,0 @@ deliveryAttempt: 1, |
@@ -1,6 +0,6 @@ | ||
import nock from "nock"; | ||
import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test"; | ||
import config from "exp-config"; | ||
import nock from "nock"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -7,0 +7,0 @@ const triggerMessage = { |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import config from "exp-config"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -6,0 +6,0 @@ const { queues } = config.cloudTasks; |
@@ -0,5 +1,5 @@ | ||
import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test"; | ||
import config from "exp-config"; | ||
import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -6,0 +6,0 @@ const triggerMessage = { |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import config from "exp-config"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
const server = { | ||
startServer: false, | ||
recipes: [ | ||
{ | ||
namespace: "sequence", | ||
name: "test", | ||
sequence: [ | ||
route(".perform.first", () => ({ type: "first", id: "1" })), | ||
route(".perform.second", () => ({ type: "second", id: "2" })), | ||
route(".perform.third", () => ({ type: "third", id: "3" })), | ||
], | ||
}, | ||
], | ||
}; | ||
const expectedMessages = [ | ||
"/v2/sequence/test/perform.second", | ||
"/v2/sequence/test/perform.third", | ||
"/v2/sequence/test/processed", | ||
]; | ||
Feature("Resending a stuck message", () => { | ||
@@ -14,20 +35,7 @@ afterEachScenario(() => { | ||
Given("broker is initiated with a recipe", () => { | ||
broker = start({ | ||
startServer: false, | ||
recipes: [ | ||
{ | ||
namespace: "sequence", | ||
name: "test", | ||
sequence: [ | ||
route(".perform.first", () => ({ type: "first", id: "1" })), | ||
route(".perform.second", () => ({ type: "second", id: "2" })), | ||
route(".perform.third", () => ({ type: "third", id: "3" })), | ||
], | ||
}, | ||
], | ||
}); | ||
broker = start(server); | ||
}); | ||
let response; | ||
When("a trigger message is received", async () => { | ||
When("a trigger message is received on the resend endpoint", async () => { | ||
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", { | ||
@@ -48,43 +56,20 @@ relativeUrl: "/sequence/test/perform.second", | ||
And("there sequence should have been processed", () => { | ||
response.messages | ||
.map(({ url }) => url) | ||
.should.eql([ | ||
"/v2/sequence/test/perform.second", | ||
"/v2/sequence/test/perform.third", | ||
"/v2/sequence/test/processed", | ||
]); | ||
And("the sequence should have been processed", () => { | ||
response.messages.map(({ url }) => url).should.eql(expectedMessages); | ||
}); | ||
And("the resend number should be included in the task names", () => { | ||
const queue = config.cloudTasks.queues.default; | ||
const [ taskName1, taskName2, taskName3 ] = response.messages.map(({ taskName }) => taskName); | ||
taskName1.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re1`)); | ||
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`)); | ||
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`)); | ||
checkTaskNames(response.messages); | ||
}); | ||
}); | ||
Scenario("Resending a message again", () => { | ||
Scenario("Resending a message for the third time", () => { | ||
const resendNumber = 3; | ||
let broker; | ||
Given("broker is initiated with a recipe", () => { | ||
broker = start({ | ||
startServer: false, | ||
recipes: [ | ||
{ | ||
namespace: "sequence", | ||
name: "test", | ||
sequence: [ | ||
route(".perform.first", () => ({ type: "first", id: "1" })), | ||
route(".perform.second", () => ({ type: "second", id: "2" })), | ||
route(".perform.third", () => ({ type: "third", id: "3" })), | ||
], | ||
}, | ||
], | ||
}); | ||
broker = start(server); | ||
}); | ||
let response; | ||
When("a trigger message is received", async () => { | ||
When("a trigger message is received on the resend endpoint the third time", async () => { | ||
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", { | ||
@@ -96,3 +81,3 @@ relativeUrl: "/sequence/test/perform.second", | ||
}, | ||
headers: { siblingCount: 3, "correlation-id": "some-epic-id", resendNumber: 3 }, | ||
headers: { siblingCount: 3, "correlation-id": "some-epic-id", resendNumber }, | ||
queue: config.cloudTasks.queues.default, | ||
@@ -106,10 +91,4 @@ }); | ||
And("there sequence should have been processed", () => { | ||
response.messages | ||
.map(({ url }) => url) | ||
.should.eql([ | ||
"/v2/sequence/test/perform.second", | ||
"/v2/sequence/test/perform.third", | ||
"/v2/sequence/test/processed", | ||
]); | ||
And("the sequence should have been processed", () => { | ||
response.messages.map(({ url }) => url).should.eql(expectedMessages); | ||
}); | ||
@@ -122,10 +101,59 @@ | ||
And("the resend number should be included in the task names", () => { | ||
const queue = config.cloudTasks.queues.default; | ||
const [ taskName1, taskName2, taskName3 ] = response.messages.map(({ taskName }) => taskName); | ||
checkTaskNames(response.messages, resendNumber); | ||
}); | ||
}); | ||
taskName1.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re4`)); | ||
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`)); | ||
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`)); | ||
Scenario("Resending a message without retries", () => { | ||
const noRetryHeader = (headers) => headers["x-no-retry"]; | ||
let broker; | ||
Given("broker is initiated with a recipe", () => { | ||
broker = start(server); | ||
}); | ||
let response; | ||
When("a trigger message is received with the no retry header set", async () => { | ||
response = await fakeCloudTasks.runSequence(broker, "/v2/resend", { | ||
relativeUrl: "/sequence/test/perform.second", | ||
body: { | ||
attributes: { foo: "bar" }, | ||
data: [ { type: "first", id: "1" } ], | ||
}, | ||
headers: { siblingCount: 3, "correlation-id": "some-epic-id", "x-no-retry": "true" }, | ||
queue: config.cloudTasks.queues.default, | ||
}); | ||
}); | ||
Then("the status code should be 201 created", () => { | ||
response.firstResponse.statusCode.should.eql(201, response.text); | ||
}); | ||
And("the sequence should have been processed", () => { | ||
response.messages.map(({ url }) => url).should.eql(expectedMessages); | ||
}); | ||
And("the resend number should be included in the task names", () => { | ||
checkTaskNames(response.messages); | ||
}); | ||
And("the no retry header should be included in the task header for the first task", () => { | ||
noRetryHeader(response.messages[0].headers).should.eql("true"); | ||
}); | ||
And("the no retry header should not be included in the task header for the remaining tasks", () => { | ||
response.messages | ||
.filter((_, index) => index > 0) | ||
.forEach(({ headers }) => should.not.exist(noRetryHeader(headers))); | ||
}); | ||
}); | ||
}); | ||
function checkTaskNames(messages, resendNumber = 0) { | ||
const queue = config.cloudTasks.queues.default; | ||
const [ taskName1, taskName2, taskName3 ] = messages.map(({ taskName }) => taskName); | ||
taskName1.should.match( | ||
new RegExp(`${queue}/tasks/sequence_test_perform_second__.*__some-epic-id__re${resendNumber + 1}`) | ||
); | ||
taskName2.should.match(new RegExp(`${queue}/tasks/sequence_test_perform_third__.*__some-epic-id`)); | ||
taskName3.should.match(new RegExp(`${queue}/tasks/sequence_test_processed__.*__some-epic-id`)); | ||
} |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -5,0 +5,0 @@ const triggerMessage = { |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -5,0 +5,0 @@ Feature("Broker sequence with 'run'", () => { |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -5,0 +5,0 @@ const triggerMessage = { |
import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test"; | ||
import nock from "nock"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
import jobStorage from "../../lib/job-storage/index.js"; | ||
@@ -6,0 +6,0 @@ |
import { fakeCloudTasks, fakeGcpAuth } from "@bonniernews/lu-test"; | ||
import nock from "nock"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
import jobStorage from "../../lib/job-storage/index.js"; | ||
@@ -6,0 +6,0 @@ |
@@ -5,3 +5,3 @@ import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -8,0 +8,0 @@ const triggerMessage = { |
import { fakeCloudTasks } from "@bonniernews/lu-test"; | ||
import { start, route } from "../../index.js"; | ||
import { route, start } from "../../index.js"; | ||
@@ -5,0 +5,0 @@ const triggerMessage = { |
@@ -93,9 +93,9 @@ import { fakeCloudTasks, fakePubSub } from "@bonniernews/lu-test"; | ||
fakePubSub.recordedMessages().length.should.eql(1); | ||
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "type" is required, "id" is required'); | ||
fakePubSub | ||
.recordedMessages()[0] | ||
.message.error.message.should.eql('Validation error: "type" is required, "id" is required'); | ||
}); | ||
And("the sequence should not be processed", () => { | ||
response.messages | ||
.map(({ url }) => url) | ||
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]); | ||
response.messages.map(({ url }) => url).should.eql([ "/v2/sequence/test-order/perform.step-1" ]); | ||
}); | ||
@@ -145,11 +145,13 @@ }); | ||
fakePubSub.recordedMessages().length.should.eql(1); | ||
fakePubSub.recordedMessages()[0].message.error.message.should.eql('Validation error: "attributes.foo" must be a string, "attributes.iShouldNotBeHere" is not allowed'); | ||
fakePubSub | ||
.recordedMessages()[0] | ||
.message.error.message.should.eql( | ||
'Validation error: "attributes.foo" must be a string, "attributes.iShouldNotBeHere" is not allowed' | ||
); | ||
}); | ||
And("the sequence should not be processed", () => { | ||
response.messages | ||
.map(({ url }) => url) | ||
.should.eql([ "/v2/sequence/test-order/perform.step-1" ]); | ||
response.messages.map(({ url }) => url).should.eql([ "/v2/sequence/test-order/perform.step-1" ]); | ||
}); | ||
}); | ||
}); |
@@ -0,8 +1,5 @@ | ||
import chai from "chai"; | ||
import events from "events"; | ||
import "mocha-cakes-2"; | ||
import nock from "nock"; | ||
import "mocha-cakes-2"; | ||
import chai from "chai"; | ||
import path from "path"; | ||
import fs from "fs"; | ||
import { fileURLToPath } from "url"; | ||
@@ -24,7 +21,2 @@ // Make sure dates are displayed in the correct timezone | ||
const logFile = path.join(fileURLToPath(import.meta.url), "..", "..", "..", "logs", "test.log"); | ||
if (fs.existsSync(logFile)) { | ||
fs.unlinkSync(logFile); | ||
} | ||
nock.enableNetConnect(/(localhost|127\.0\.0\.1):\d+/); |
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
190167
4860
5