Comparing version 0.1.16 to 0.1.17
@@ -8,2 +8,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
const URL = require('url');
const TraversalPolicy = require('./traversalPolicy');
@@ -142,3 +143,3 @@ class Crawler {
request.context = request.context || {};
request.promises = [];
this._expandPolicy(request);
return request;
@@ -149,2 +150,12 @@ })
_expandPolicy(request) {
if (typeof request.policy === 'string') {
const policy = TraversalPolicy.getPolicy(request.policy);
if (!policy) {
return this._queueDead(request, `Unknown request policy: ${request.policy}`);
}
request.policy = policy;
}
}
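The new _expandPolicy step means a request can now be queued with its policy given as a plain string name; the crawler resolves it to a real policy object before processing and dead-letters requests whose policy name is unknown. A minimal caller-side sketch of that behavior (the require paths, the org URL, and the 'refresh' name are assumptions, not taken from the diff):

const Request = require('./lib/request');                 // path assumed
const TraversalPolicy = require('./lib/traversalPolicy'); // path assumed

const request = new Request('org', 'https://api.github.com/orgs/contoso');  // illustrative URL
request.policy = 'refresh';                 // a policy *name*, assumed to be known to getPolicy

// Inside the crawler, _expandPolicy(request) then does roughly:
if (typeof request.policy === 'string') {
  const resolved = TraversalPolicy.getPolicy(request.policy);
  request.policy = resolved || request.policy;   // the real code dead-letters unknown names instead
}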
_acquireLock(request) {
@@ -222,7 +233,9 @@ if (!request.url || !this.locker) {
}
const completeWork = Q.all(request.promises).then(
const completeWork = Q.all(request.getTrackedPromises()).then(
() => self._releaseLock(request).then(
() => self._deleteFromQueue(request),
error => self._abandonInQueue(request)),
error => self._completeRequest(request, true));
error =>
self._abandonInQueue(request)),
error =>
self._completeRequest(request, true));
return completeWork.then(() => request);
@@ -261,6 +274,2 @@ }
}
// TODO temporary hack to skip malformed requests already in the queue
if (request.type.endsWith('Event') && !request.payload) {
return request.markSkip('no event payload');
}
if (request.payload) {
@@ -267,0 +276,0 @@ // The request already has the document, so no need to fetch. Setup the request as if it was actually fetched.
@@ -16,3 +16,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
this.options._config.on('changed', this._reconfigure.bind(this));
this.getQueue = async.queue(this._callGitHubTask.bind(this), options.concurrency || 5);
this.fetchQueue = async.queue(this._callGitHubTask.bind(this), options.concurrency || 5);
}
@@ -22,3 +22,3 @@
if (changes.some(patch => patch.path === '/concurrency')) {
this.getQueue.concurrency = this.options.concurrency;
this.fetchQueue.concurrency = this.options.concurrency;
}
@@ -86,3 +86,3 @@ return Q();
const deferred = Q.defer();
this.getQueue.push({ url: request.url, options: options }, (error, response) => {
this.fetchQueue.push({ url: request.url, options: options }, (error, response) => {
if (error) {
@@ -104,2 +104,3 @@ return deferred.reject(error);
try {
this._incrementMetric('fetch');
this.requestor.get(spec.url, spec.options).then(
@@ -194,4 +195,11 @@ response => callback(null, response),
const traits = this._getTypeDetails(request.type).tokenTraits || [];
const repoType = request.context.repoType;
return this.tokenFactory.getToken(traits.concat(repoType ? [repoType] : []));
const additionalTraits = [];
if (request.context.repoType) {
additionalTraits.push(request.context.repoType);
}
if (request.attemptCount) {
// if this is a retry, elevate the token to avoid any permissions issues
additionalTraits.push('private', 'admin');
}
return this.tokenFactory.getToken(traits.concat(additionalTraits));
}
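The practical effect of the new attemptCount check is that a retried request asks the token factory for a more privileged token. A small runnable sketch of the trait-selection logic above, with purely illustrative inputs (the 'public' trait and the 'private' repoType value are assumptions; 'private' and 'admin' come from the diff):

// Mirrors the trait-selection logic above; inputs are illustrative only.
function selectTraits(typeTraits, repoType, attemptCount) {
  const additional = [];
  if (repoType) {
    additional.push(repoType);
  }
  if (attemptCount) {
    additional.push('private', 'admin');   // retries get an elevated token
  }
  return typeTraits.concat(additional);
}

console.log(selectTraits(['public'], null, 0));        // [ 'public' ]
console.log(selectTraits(['public'], 'private', 2));   // [ 'public', 'private', 'private', 'admin' ]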
@@ -204,2 +212,9 @@
_incrementMetric(operation) {
const metrics = this.logger.metrics;
if (metrics && metrics[operation]) {
metrics[operation].incr();
}
}
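_incrementMetric only counts an operation when the logger exposes a metrics bag containing a counter for that name, so a logger without metrics is silently a no-op. A minimal sketch of a logger shape that would satisfy it (the shape is an assumption, not spelled out in the diff):

// Hypothetical logger whose metrics object works with _incrementMetric('fetch').
const logger = {
  metrics: {
    fetch: { incr: () => console.log('fetch +1') }
  }
};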
_getTypeDetails(type) {
@@ -206,0 +221,0 @@ const result = {
@@ -15,3 +15,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
this.store = store;
this.version = 5;
this.version = 6;
}
@@ -26,5 +26,13 @@
const oldVersion = request.document._metadata.version;
if (!request.policy.shouldProcess(request, this.version)) {
request.markSkip('Excluded', `Traversal policy excluded this resource`);
return request.document;
// We are not going to process but may still need to traverse the doc to get to referenced docs that
// do need processing. If so, mark the request for no saving (already have good content) and carry on.
// Otherwise, skip the doc altogether.
if (request.policy.shouldTraverse(request)) {
request.markNoSave();
} else {
request.markSkip('Excluded', `Traversal policy excluded this resource`);
return request.document;
}
}
@@ -36,3 +44,9 @@
result._metadata.processedAt = moment.utc().toISOString();
if (result._metadata.version !== oldVersion) {
request.markSave();
}
}
if (!request.shouldSave()) {
request.outcome = request.outcome || 'Traversed';
}
return result;
@@ -81,4 +95,5 @@ }
// TODO if there is no elementType on a collection then assume it is events. Need to fix this up and
// formalize the model of collections where the request carries the payload.a
const newRequest = new Request(item.type, `${request.url}/${item.id}`);
// formalize the model of collections where the request carries the payload.
const baseUrl = request.url.split("?")[0];
const newRequest = new Request(item.type, `${baseUrl}/${item.id}`);
newRequest.payload = { etag: 1, body: item };
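Splitting the query string off request.url matters because collection pages carry paging parameters, and the old template embedded them in every child URL. For example (the page URL is illustrative):

const pageUrl = 'https://api.github.com/repos/contoso/widgets/events?page=2&per_page=100';
const baseUrl = pageUrl.split('?')[0];
// old child URL: `${pageUrl}/123456789`  -> '.../events?page=2&per_page=100/123456789'
// new child URL: `${baseUrl}/123456789`  -> '.../events/123456789'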
@@ -99,6 +114,8 @@ request.queue(newRequest);
this._addRoot(request, 'user', 'user', document.url.replace('/orgs/', '/users/'), `urn:user:${document.id}`);
this._addCollection(request, 'repos', "repo", null, `urn:user:${document.id}:repos`);
this._addCollection(request, 'repos', 'repo', null, `urn:user:${document.id}:repos`);
if (document.members_url) {
this._addRelation(request, 'members', "user", document.members_url.replace('{/member}', ''), `${request.getQualifier()}:org_members`);
this._addRelation(request, 'members', 'user', document.members_url.replace('{/member}', ''), `${request.getQualifier()}:org_members`);
}
const url = `${document._metadata.url}/teams`;
this._addRelation(request, 'teams', 'team', url, `${request.getQualifier()}:org_teams`);
@@ -363,3 +380,4 @@ return document;
// Events are immutable (and we can't fetch them later) so set the etag to a constant
const newRequest = new Request(event.type, `${request.url}/${event.id}`);
const baseUrl = request.url.split("?")[0];
const newRequest = new Request(event.type, `${baseUrl}/${event.id}`);
newRequest.payload = { etag: 1, body: event };
@@ -560,3 +578,4 @@ return newRequest;
const page = parsed.query.page;
if (page) {
// TODO / check is a temporary measure to work around a queuing bug. Remove once queue is cleared
if (page && !parsed.query.per_page.includes('/')) {
return this.page.bind(this, page);
@@ -563,0 +582,0 @@ }
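The new per_page guard appears to filter out exactly the malformed page URLs that the baseUrl change above now prevents from being created: an item id fused onto the end of the query string. A rough sketch of what the parsed query looks like in each case, assuming parsed comes from Node's url.parse with query parsing enabled:

const URL = require('url');

// Well-formed page URL:
URL.parse('https://api.github.com/repos/contoso/widgets/events?page=2&per_page=100', true).query;
// -> { page: '2', per_page: '100' }

// Malformed URL already sitting in the queue (id appended after the query):
URL.parse('https://api.github.com/repos/contoso/widgets/events?page=2&per_page=100/123456789', true).query;
// -> { page: '2', per_page: '100/123456789' }   // the '/' check skips these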
@@ -16,3 +16,2 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
this.context = context || {};
this.promises = [];
}
@@ -24,2 +23,3 @@
}
object.policy = Request._getExpandedPolicy(object.policy);
if (object.policy && object.policy.__proto__ !== Policy.prototype) {
@@ -31,2 +31,10 @@ object.policy.__proto__ = Policy.prototype;
static _getExpandedPolicy(policyOrSpec) {
return typeof policyOrSpec === 'string' ? Policy.getPolicy(policyOrSpec) : policyOrSpec;
}
getTrackedPromises() {
return this.promises || [];
}
track(promises) {
@@ -36,2 +44,3 @@ if (!promises) {
}
this.promises = this.promises || [];
if (Array.isArray(promises)) {
@@ -184,2 +193,6 @@ Array.prototype.push.apply(this.promises, promises);
markSave() {
this.save = true;
}
markNoSave() {
@@ -190,3 +203,3 @@ this.save = false;
shouldSave() {
return (this.save !== false) && this.document && this.contentOrigin !== 'cacheOfOrigin';
return this.document && (this.save === true || (this.save !== false && this.contentOrigin !== 'cacheOfOrigin'));
}
@@ -221,3 +234,3 @@
toUniqueString() {
return `${this.type}@${this.url}:${this.policy.getShortForm()}`;
return `${this.type}@${this.url}:${Request._getExpandedPolicy(this.policy).getShortForm()}`;
}
@@ -224,0 +237,0 @@
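With promises now created lazily and markSave added, the save decision has three outcomes: an explicit save wins, an explicit no-save wins, and otherwise cached-origin content is still skipped. A small sketch using the Request shape defined above (the require path and repo URL are assumptions):

const Request = require('./lib/request');   // path assumed

const request = new Request('repo', 'https://api.github.com/repos/contoso/widgets');
request.document = { id: 1 };
request.contentOrigin = 'cacheOfOrigin';

request.shouldSave();          // false: cached-origin content with no explicit decision
request.markSave();
request.shouldSave();          // true: an explicit markSave wins even for cached content

request.getTrackedPromises();  // []: the promises array is only allocated once track() is called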
@@ -22,10 +22,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
* documentAndChildren - generate links etc and queue referenced child resources (i.e., not roots) for further processing
* documentOnly - generate links
* documentOnly - generate links but do not queue any referenced resources
Transitivity -- How related resources should be queued.
* shallow - Queue all non-roots and roots with the current Freshness policy
* deepShallow - Queue NON-roots as always and roots with the current Freshness policy, Roots will have transitivity set to shallow.
* deepDeep - Queue all related resources as always. Roots will have transitivity set to deepShallow
* deepShallow - Queue NON-roots as deep and roots as shallow. Use the current Freshness policy.
* deepDeep - Queue all related resources with current Freshness policy. Roots will have transitivity set to deepShallow
Basically, once you are doind deep traversal, carry that through for all children, but still allow transivity
Basically, once you are doing deep traversal, carry that through for all children, but still allow transitivity
control when traversing to a root. A deepDeep traversal to a root will queue that root as deepShallow. Similarly,
@@ -96,4 +96,4 @@ when traversing with deepShallow, queued roots end up as shallow. This approach gives you the ability to push deep
static update() {
return new TraversalPolicy('originStorage', 'always', 'documentAndRelated', 'deepDeep');
static refresh() {
return new TraversalPolicy('originStorage', 'match', 'documentAndRelated', 'deepShallow');
}
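The always/deepDeep update() factory is gone and refresh() takes its place: match freshness against origin and, per the deepShallow description above, walk non-roots deep while queuing roots shallow. Reading the constructor calls against the documented dimensions, the argument order appears to be fetch, freshness, processing, transitivity (inferred from the values, not stated in the diff):

const TraversalPolicy = require('./lib/traversalPolicy');   // path assumed

// fetch,            freshness,  processing,           transitivity
new TraversalPolicy('originStorage', 'match', 'documentAndRelated', 'deepShallow');  // refresh()
new TraversalPolicy('originStorage', 'always', 'documentAndRelated', 'deepDeep');    // the removed update()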
@@ -153,15 +153,18 @@
/**
* Given a request for which the requisite content has been fetched, determine whether or not processing
* should happen.
* Given a request for which the requisite content has been fetched, determine whether or not it needs to be
* processed.
*/
shouldProcess(request, version) {
// Note: if the freshness is match, we only got here if the content should be processed.
if (this.freshness === 'always' || this.freshness === 'match') {
if (this.freshness === 'always') {
return true;
}
if (this.freshness === 'match') {
// process if the content we got did NOT come from the cache (i.e., it is either new or never seen)
return request.origin !== 'cacheOfOrigin';
}
if (typeof this.freshness === 'number') {
return this._testTransitivity(request, moment.diff(request.document._metadata.processedAt, 'hours') > this.freshness * 24);
return moment.diff(request.document._metadata.processedAt, 'hours') > this.freshness * 24;
}
if (this.freshness === 'version' || this.freshness === 'matchOrVersion') {
return this._testTransitivity(request, !request.document._metadata.version || (request.document._metadata.version < version));
return !request.document._metadata.version || (request.document._metadata.version < version);
}
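shouldProcess now answers purely from the freshness dimension; the transitivity side effects that used to live in _testTransitivity have moved to shouldTraverse. For instance, under the new refresh() policy (freshness 'match'), only content that actually came back from origin is processed (the request objects below are minimal stand-ins, since this branch only reads request.origin):

const TraversalPolicy = require('./lib/traversalPolicy');   // path assumed

const policy = TraversalPolicy.refresh();
policy.shouldProcess({ origin: 'origin' }, 6);           // true: fresh content from origin
policy.shouldProcess({ origin: 'cacheOfOrigin' }, 6);    // false: nothing new to process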
@@ -171,12 +174,7 @@ throw new Error('Invalid freshness in traversal policy');
_testTransitivity(request, freshnessResult) {
if (freshnessResult) {
return true;
}
// The freshness policy indicates that we should not process this request. If the traversal policy
// indicates deep traversal, mark this request for no saving (already have good content) but allow the
// the request to be processed so referenced entities are walked.
if (this.transitivity !== 'shallow') {
request.markNoSave();
}
/**
* Given a request that would not otherwise be processed, answer whether or not its document should be
* traversed to discover additional resources to process.
*/
shouldTraverse(request) {
return this.transitivity !== 'shallow';
@@ -186,8 +184,10 @@ }
/**
* Given a request for which we have found existing content in our doc store, say whether or not that existing
* content should be fetched. The only case where we do NOT fetch is match. All others require the actual cached
* content to determine whether or not processing is required.
* Return whether or not existing content in our store should be fetched. This is called when we have detected
* existing content in our doc store that matches the content at origin (e.g., 304 was returned).
* In the case of "match" freshness, we've already determined that origin and storage match. Only fetch
* if we are traversing deep so there is some use in discovering references. All other cases require
* the actual cached content to determine whether or not processing is required (e.g. to compare doc versions).
*/
shouldFetchExisting(request) {
return this.freshness !== 'match';
return this.freshness !== 'match' && this.transitivity !== 'shallow';
}
@@ -194,0 +194,0 @@
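The net effect of the stricter shouldFetchExisting is that a 304-style match only pulls the stored document when the policy still intends to traverse it; under the stock refresh() policy the 'match' freshness alone already short-circuits the fetch. For example (the request argument is unused by the logic shown, so an empty object suffices; the constructed policies are hypothetical):

const TraversalPolicy = require('./lib/traversalPolicy');   // path assumed

TraversalPolicy.refresh().shouldFetchExisting({});            // false: origin and storage already match

const deep = new TraversalPolicy('originStorage', 'version', 'documentAndRelated', 'deepShallow');
deep.shouldFetchExisting({});      // true: need the stored doc to compare versions and to traverse

const shallow = new TraversalPolicy('originStorage', 'version', 'documentAndRelated', 'shallow');
shallow.shouldFetchExisting({});   // false: no traversal planned, so nothing to gain from fetching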
{
"name": "ghcrawler",
"version": "0.1.16",
"version": "0.1.17",
"description": "A robust GitHub API crawler that walks a queue of GitHub entities retrieving and storing their contents.",
@@ -5,0 +5,0 @@ "main": "./index.js",
@@ -276,3 +276,3 @@ // Copyright (c) Microsoft Corporation. All rights reserved.
crawler.queue(request);
expect(request.promises.length).to.be.equal(0);
expect(request.promises).to.be.undefined;
queue = [].concat.apply([], queue);
@@ -279,0 +279,0 @@ expect(queue.length).to.be.equal(0);