@backstroke/worker
Advanced tools
Comparing version 1.1.0 to 2.0.0
{ | ||
"name": "@backstroke/worker", | ||
"version": "1.1.0", | ||
"version": "2.0.0", | ||
"dependencies": { | ||
@@ -14,2 +14,4 @@ "bluebird": "^3.5.0", | ||
"start:once": "node src/index.js --once", | ||
"start-dev": "nodemon src/index.js", | ||
"start-dev:once": "nodemon src/index.js --once", | ||
"test": "NODE_ENV=test jest --watch", | ||
@@ -20,4 +22,5 @@ "test-ci": "CI=true NODE_ENV=test jest" | ||
"jest": "^20.0.4", | ||
"nodemon": "^1.12.1", | ||
"sinon": "^3.2.1" | ||
} | ||
} |
@@ -27,7 +27,14 @@ # Backstroke Worker | ||
### Environment variables | ||
- `GITHUB_TOKEN`: The Github token for the user that creates pull requests. When deployed, this | ||
is a token for [backstroke-bot](https://github.com/backstroke-bot). | ||
- `REDIS_URL`: A url to a redis instance with a rsmq queue inside. Takes the form of | ||
- `GITHUB_TOKEN` (required): The Github token for the user that creates pull requests. When | ||
deployed, this is a token for [backstroke-bot](https://github.com/backstroke-bot). | ||
- `REDIS_URL` (required): A url to a redis instance with a rsmq queue inside. Takes the form of | ||
`redis://user:password@host:port`. | ||
- `THROTTLE`: Provide an optional delay between handling each webhook operation. This is | ||
potentially handy to keep a worker from exhausing the rate limit on a token. | ||
## Arguments | ||
- `--pr mock`: Tell the worker not to actually make pull requests, but only log out when it is about | ||
to make a pull request. This is handy for repeated testing or for testing against repositories | ||
that you don't own. This option is off by default. | ||
## Running tests | ||
@@ -34,0 +41,0 @@ ``` |
@@ -29,3 +29,3 @@ const GitHubApi = require('github'); | ||
function getForksForRepo(user, args) { | ||
const github = new GitHubApi({}); | ||
const github = new GitHubApi({timeout: 5000}); | ||
github.authenticate({type: "oauth", token: user.accessToken}); | ||
@@ -36,5 +36,5 @@ | ||
if (err) { | ||
reject(err); | ||
reject(new Error(`Couldn't get forks for repository ${args.owner}/${args.repo}: ${err.message ? err.message : err}`)); | ||
} else { | ||
resolve(res); | ||
resolve(res.data); | ||
} | ||
@@ -45,2 +45,18 @@ }); | ||
// Return the smallest number of api calls required to exhaust the rate limit. | ||
function checkRateLimit() { | ||
const github = new GitHubApi({timeout: 5000}); | ||
github.authenticate({type: "oauth", token: process.env.GITHUB_TOKEN}); | ||
return new Promise((resolve, reject) => { | ||
github.misc.getRateLimit({}, (err, res) => { | ||
if (err) { | ||
reject(new Error(`Couldn't fetch token rate limit: ${err.message ? err.message : err}`)); | ||
} else { | ||
resolve(res.data.resources.core.remaining); | ||
} | ||
}); | ||
}); | ||
} | ||
// Given a repository `user/repo` and a provider that the repo is located on (ex: `github`), | ||
@@ -50,9 +66,13 @@ // determine if the repo opted out. | ||
return new Promise((resolve, reject) => { | ||
github.search.issues({ | ||
q: `repo:${owner}/${repo} is:pr label:optout`, | ||
github.issues.getForRepo({ | ||
owner, repo, | ||
labels: 'optout', | ||
per_page: 1, | ||
}, (err, issues) => { | ||
if (err) { | ||
reject(err); | ||
if (err && err.errors && err.errors.find(i => i.code === 'invalid')) { | ||
reject(new Error(`Repository ${owner}/${repo} doesn't exist.`)); | ||
} else if (err) { | ||
reject(new Error(`Couldn't search issues on repository ${owner}/${repo}: ${err.message ? err.message : err}`)); | ||
} else { | ||
resolve(issues.total_count > 0); | ||
resolve(issues.data.length > 0); | ||
} | ||
@@ -74,3 +94,3 @@ }); | ||
-------- | ||
Created by [Backstroke](http://backstroke.us) (I'm a bot!) | ||
Created by [Backstroke](http://backstroke.co) (I'm a bot!) | ||
`.replace('\n', ''); | ||
@@ -80,3 +100,3 @@ | ||
async function createPullRequest(user, link, fork, debug, didRepoOptOut, githubPullRequestsCreate) { | ||
const github = new GitHubApi({}); | ||
const github = new GitHubApi({timeout: 5000}); | ||
if (!process.env.GITHUB_TOKEN) { | ||
@@ -93,2 +113,3 @@ if (process.env.NODE_ENV !== 'test') { | ||
const didOptOut = await didRepoOptOut(github, fork.owner, fork.repo); | ||
// Do we have permission to make a pull request on the child? | ||
@@ -106,3 +127,3 @@ if (didOptOut) { | ||
head: `${link.upstreamOwner}:${link.upstreamBranch}`, | ||
base: fork.branch, | ||
base: link.forkType === 'fork-all' ? link.upstreamBranch : link.forkBranch, | ||
body: generatePullRequestBody(link.upstreamOwner, link.upstreamRepo, link.upstreamBranch), | ||
@@ -117,3 +138,3 @@ maintainer_can_modify: false, | ||
// Still reject anything else | ||
reject(err); | ||
reject(new Error(`Couldn't create pull request on repository ${link.forkOwner}/${link.forkRepo}: ${err.message ? err.message : err}`)); | ||
} else { | ||
@@ -131,2 +152,4 @@ resolve(`Successfully created pull request on ${link.forkOwner}/${link.forkRepo}`); | ||
createPullRequest, | ||
didRepoOptOut, | ||
checkRateLimit, | ||
}; |
@@ -11,5 +11,5 @@ const args = require('minimist')(process.argv.slice(2)); | ||
const mockCreatePullRequest = async (...args) => console.log(' *', require('chalk').green('MOCK CREATE PR'), args); | ||
const createPullRequest = require('./helpers').createPullRequest; | ||
const didRepoOptOut = require('./helpers').didRepoOptOut; | ||
const checkRateLimit = require('./helpers').checkRateLimit; | ||
@@ -19,4 +19,5 @@ const githubPullRequestsCreate = github => github.pullRequests.create | ||
const ONE_HOUR_IN_SECONDS = 60 * 60; | ||
const debug = require('debug')('backstroke:webhook-status-store'); | ||
const WebhookStatusStore = { | ||
set(webhookId, status, expiresIn=ONE_HOUR_IN_SECONDS) { | ||
set(webhookId, status, expiresIn=24*ONE_HOUR_IN_SECONDS) { | ||
return new Promise((resolve, reject) => { | ||
@@ -29,2 +30,21 @@ redis.set(`webhook:status:${webhookId}`, JSON.stringify(status), 'EX', expiresIn, (err, id) => { | ||
resolve(id); | ||
// Notate how the operation went | ||
if (status.status === 'OK') { | ||
// Finally, increment the success / error metrics | ||
debug(`Incrementing webhook:stats:successes key...`); | ||
redis.incr(`webhook:stats:successes`, err => { | ||
if (err) { | ||
debug(`Error incrementing webhook webhook:stats:successes key: ${err}`); | ||
} | ||
}); | ||
} else if (status.status === 'ERROR') { | ||
// Finally, increment the error metric | ||
debug(`Incrementing webhook:stats:errors key... => ${JSON.stringify(status)}`); | ||
redis.incr(`webhook:stats:errors`, err => { | ||
if (err) { | ||
debug(`Error incrementing webhook webhook:stats:errors key: ${err}`); | ||
} | ||
}); | ||
} | ||
} | ||
@@ -102,2 +122,14 @@ }); | ||
if (require.main === module) { | ||
let rawPullRequestCreate = githubPullRequestsCreate; | ||
if (process.env.PR === 'mock' || args.pr === 'mock') { | ||
console.log('* Using pull request mock...'); | ||
rawPullRequestCreate = () => (args, cb) => { | ||
console.log(' *', require('chalk').green('MOCK CREATE PR'), args); | ||
cb(null); | ||
} | ||
} | ||
// Provide a mechanism to throttle the time between handling webhooks | ||
const throttleBatch = process.env.THROTTLE ? parseInt(process.env.THROTTLE, 10) : 0; | ||
// Called once the process finishes. | ||
@@ -109,3 +141,3 @@ function final() { | ||
// Kick off the batch! | ||
function go() { | ||
function go(done) { | ||
processBatch( | ||
@@ -116,12 +148,13 @@ WebhookQueue, | ||
getForksForRepo, | ||
{default: createPullRequest, mock: mockCreatePullRequest}[args.pr || 'default'], | ||
createPullRequest, | ||
didRepoOptOut, | ||
githubPullRequestsCreate | ||
rawPullRequestCreate, | ||
throttleBatch, | ||
checkRateLimit | ||
).then(() => { | ||
console.log('* Success!'); | ||
final(); | ||
done(); | ||
}).catch(err => { | ||
console.error('Error:'); | ||
console.error(err.stack); | ||
final(); | ||
console.error(err.stack ? err.stack : err); | ||
done(); | ||
}); | ||
@@ -131,6 +164,12 @@ } | ||
if (args.once) { | ||
go(); | ||
go(final); | ||
} else { | ||
setInterval(go, process.env.WORKER_POLL_INTERVAL || 5000); | ||
// Run the webhook worker, then once it's complete, wait 5 seconds and run it again, ad | ||
// infinitum | ||
const interval = process.env.WORKER_POLL_INTERVAL || 5000; | ||
function iteration() { | ||
go(() => setTimeout(iteration, interval)); | ||
} | ||
setTimeout(iteration, interval); | ||
} | ||
} |
const paginateRequest = require('./helpers').paginateRequest; | ||
const hostname = require('os').hostname; | ||
const crypto = require('crypto'); | ||
// An async fnction that returns a promise that resolves once there is at least one call left in the | ||
// token rate limit. | ||
async function didNotGoOverRateLimit(debug, checkRateLimit) { | ||
// Verify that we have api calls available to process items | ||
if (checkRateLimit) { | ||
while (true) { | ||
const rateLimit = await checkRateLimit(); | ||
if (rateLimit === 0) { | ||
debug('Waiting for token rate limit to reset...'); | ||
await (new Promise(resolve => setTimeout(resolve, 1000))); | ||
} else { | ||
debug('Token rate limit not exhausted - rate limit at', rateLimit); | ||
break; | ||
} | ||
} | ||
} | ||
} | ||
const OK = 'OK', | ||
@@ -14,4 +34,11 @@ RUNNING = 'RUNNING', | ||
didRepoOptOut, | ||
githubPullRequestsCreate | ||
githubPullRequestsCreate, | ||
throttleBatch=0, | ||
checkRateLimit=false | ||
) { | ||
// Provide a mechanism to throttle queue operations so that rate limits won't expire. | ||
if (throttleBatch > 0) { | ||
await (new Promise(resolve => setTimeout(resolve, throttleBatch))); | ||
} | ||
// if disabled, or upstream/fork is null, return so | ||
@@ -27,2 +54,6 @@ if (!link.enabled) { | ||
debug('Webhook is on the fork. Making a pull request to the single fork repository.'); | ||
// Ensure we didn't go over the token rate limit prior to making the pull request. | ||
await didNotGoOverRateLimit(debug, checkRateLimit); | ||
const response = await createPullRequest( | ||
@@ -55,4 +86,7 @@ user, | ||
debug('Found %d forks of the upstream.', forks.length); | ||
debug(`Found ${forks.length} forks of the upstream.`); | ||
const all = forks.map(async fork => { | ||
// Ensure we didn't go over the token rate limit prior to making the pull request. | ||
await didNotGoOverRateLimit(debug, checkRateLimit); | ||
return createPullRequest( | ||
@@ -92,5 +126,10 @@ user, | ||
didRepoOptOut, | ||
githubPullRequestsCreate | ||
githubPullRequestsCreate, | ||
throttleBatch=0, | ||
checkRateLimit=false | ||
) { | ||
while (true) { | ||
// Ensure we didn't go over the token rate limit prior to handling another link. | ||
await didNotGoOverRateLimit(debug, checkRateLimit); | ||
// Fetch a new webhook event. | ||
@@ -130,3 +169,5 @@ const webhook = await WebhookQueue.pop(); | ||
didRepoOptOut, | ||
githubPullRequestsCreate | ||
githubPullRequestsCreate, | ||
throttleBatch, | ||
checkRateLimit | ||
); | ||
@@ -141,2 +182,4 @@ debug('Result:', output); | ||
output, | ||
link, | ||
handledBy: crypto.createHash('sha256').update(hostname()).digest('base64'), | ||
}); | ||
@@ -150,2 +193,3 @@ } catch (error) { | ||
output: {error: error.message, stack: error.stack}, | ||
link: Object.assign({}, link, {user: undefined}), | ||
}); | ||
@@ -152,0 +196,0 @@ } |
@@ -413,3 +413,3 @@ const sinon = require('sinon'); | ||
assert.equal(response.status, 'ERROR'); | ||
assert.equal(response.output.error, `Unknown Error!`); | ||
assert.equal(response.output.error, `Couldn't create pull request on repository rgaus/biome: Unknown Error!`); | ||
}); | ||
@@ -560,2 +560,72 @@ it('should make a PR to a single fork of an upstream, but the link is disabled', async () => { | ||
}); | ||
it(`should make a PR to a single fork of an upstream, but fork is a repository that doesn't exist`, async () => { | ||
const createPullRequest = sinon.stub().yields([null]); | ||
const getForksForRepo = sinon.stub().resolves([{ | ||
owner: {login: 'foo'}, | ||
name: 'bar', | ||
}]); | ||
const didRepoOptOut = sinon.stub().rejects(new Error(`Repository foo/bar doesn't exist!`)); | ||
const enqueuedAs = await MockWebhookQueue.push({ | ||
type: 'MANUAL', | ||
user: { | ||
id: 1, | ||
username: '1egoman', | ||
email: null, | ||
githubId: '1704236', | ||
accessToken: 'ACCESS TOKEN', | ||
publicScope: false, | ||
createdAt: '2017-08-09T12:00:36.000Z', | ||
lastLoggedInAt: '2017-08-16T12:50:40.203Z', | ||
updatedAt: '2017-08-16T12:50:40.204Z', | ||
}, | ||
link: { | ||
id: 8, | ||
name: 'My Link', | ||
enabled: true, | ||
webhookId: '37948270678a440a97db01ebe71ddda2', | ||
lastSyncedAt: '2017-08-17T11:37:22.999Z', | ||
upstreamType: 'repo', | ||
upstreamOwner: '1egoman', | ||
upstreamRepo: 'backstroke', | ||
upstreamIsFork: null, | ||
upstreamBranches: '["inject","master"]', | ||
upstreamBranch: 'master', | ||
forkType: 'repo', | ||
forkOwner: 'rgaus', | ||
forkRepo: 'backstroke', | ||
forkBranches: '["master"]', | ||
forkBranch: 'master', | ||
createdAt: '2017-08-11T10:17:09.614Z', | ||
updatedAt: '2017-08-17T11:37:23.001Z', | ||
ownerId: 1, | ||
owner: { | ||
id: 1, | ||
username: '1egoman', | ||
email: null, | ||
githubId: '1704236', | ||
accessToken: 'ACCESS TOKEN', | ||
publicScope: false, | ||
createdAt: '2017-08-09T12:00:36.000Z', | ||
lastLoggedInAt: '2017-08-16T12:50:40.203Z', | ||
updatedAt: '2017-08-16T12:50:40.204Z', | ||
} | ||
}, | ||
}); | ||
// Run the worker that eats off the queue. | ||
await processBatch( | ||
MockWebhookQueue, | ||
MockWebhookStatusStore, | ||
() => null, // console.log.bind(console, '* '), | ||
getForksForRepo, | ||
require('./helpers').createPullRequest, | ||
didRepoOptOut | ||
); | ||
// Make sure that it worked | ||
const response = MockWebhookStatusStore.keys[enqueuedAs].status; | ||
assert.equal(response.status, 'ERROR'); | ||
assert.equal(response.output.error, `Repository foo/bar doesn't exist!`); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 2 instances in 1 package
141351
11
1050
43
3
12