fabric-shim
Advanced tools
Comparing version 1.0.0-snapshot.10 to 1.0.0-snapshot.11
@@ -43,3 +43,3 @@ /* | ||
var start = function(chaincode) { | ||
let start = function(chaincode) { | ||
if (typeof chaincode !== 'object' || chaincode === null) | ||
@@ -77,3 +77,3 @@ throw new Error('Missing required argument: chaincode'); | ||
var success = function(payload) { | ||
let success = function(payload) { | ||
let ret = new _responseProto.Response(); | ||
@@ -86,3 +86,3 @@ ret.status = Stub.RESPONSE_CODE.OK; | ||
var error = function(msg) { | ||
let error = function(msg) { | ||
let ret = new _responseProto.Response(); | ||
@@ -89,0 +89,0 @@ ret.status = Stub.RESPONSE_CODE.ERROR; |
@@ -20,2 +20,3 @@ /* | ||
const Stub = require('./stub.js'); | ||
const shim = require('./chaincode.js'); | ||
@@ -54,2 +55,150 @@ const _serviceProto = grpc.load({ | ||
/* | ||
* Simple class to represent a message to be queued with the associated | ||
* promise methods to be driven around this message | ||
*/ | ||
class QMsg { | ||
constructor(msg, method, resolve, reject) { | ||
this.msg = msg; | ||
this.method = method; | ||
this.resolve = resolve; | ||
this.reject = reject; | ||
} | ||
getMsg() { | ||
return this.msg; | ||
} | ||
getMsgTxid() { | ||
return this.msg.txid; | ||
} | ||
getMethod() { | ||
return this.method; | ||
} | ||
success(response) { | ||
this.resolve(response); | ||
} | ||
fail(err) { | ||
this.reject(err); | ||
} | ||
} | ||
/* | ||
* This class handles queuing messages to be sent to the peer based on transaction id | ||
* The peer can access requests coming from different transactions concurrently but | ||
* cannot handle concurrent requests for the same transaction. Given the nature of asynchronouse | ||
* programming this could present a problem so this implementation provides a way to allow | ||
* code to perform concurrent request by serialising the calls to the peer. | ||
*/ | ||
class MsgQueueHandler { | ||
constructor(handler) { | ||
this.handler = handler; | ||
this.stream = handler._stream; | ||
this.txQueues = {}; | ||
} | ||
/* | ||
* Queue a message to be sent to the peer. If it is the first | ||
* message on the queue then send the message to the peer | ||
* | ||
* @param {QMsg} qMsg the message to queue | ||
*/ | ||
queueMsg(qMsg) { | ||
let txId = qMsg.getMsgTxid(); | ||
let msgQueue = this.txQueues[txId]; | ||
if (!msgQueue) { | ||
msgQueue = this.txQueues[txId] = []; | ||
} | ||
msgQueue.push(qMsg); | ||
if (msgQueue.length === 1) { | ||
this._sendMsg(txId); | ||
} | ||
} | ||
/** | ||
* Handle a response to a message. this looks at the top of | ||
* the queue for the specific txn id to get the message this | ||
* response is associated with so it can drive the promise waiting | ||
* on this message response. it then removes that message from the | ||
* queue and sends the next message on the queue if there is one. | ||
* | ||
* @param {any} response the received response | ||
*/ | ||
handleMsgResponse(response) { | ||
let txId = response.txid; | ||
let qMsg = this._getCurrentMsg(txId); | ||
if (qMsg) { | ||
try { | ||
let parsedResponse = parseResponse(this.handler, response, qMsg.getMethod()); | ||
qMsg.success(parsedResponse); | ||
} | ||
catch(err) { | ||
qMsg.fail(err); | ||
} | ||
this._removeCurrentAndSendNextMsg(txId); | ||
} | ||
} | ||
/** | ||
* Get the current message. | ||
* this returns the message at the top of the queue for the particular transaction. | ||
* | ||
* @param {string} txId | ||
* @returns {QMsg} the message at the top of the queue | ||
*/ | ||
_getCurrentMsg(txId) { | ||
let msgQueue = this.txQueues[txId]; | ||
if (msgQueue) { | ||
return msgQueue[0]; | ||
} | ||
let errMsg = util.format('Failed to find a message for transaction %s', txId); | ||
logger.error(errMsg); | ||
//Throwing an error here will terminate the container and potentially lose logs | ||
//This may be an error, but I don't know if this should abend the container or | ||
//should just keep going. | ||
//throw new Error(errMsg); | ||
} | ||
/** | ||
* Remove the current message and send the next message in the queue if there is one. | ||
* delete the queue if there are no more messages. | ||
* | ||
* @param {any} txId the transaction id | ||
*/ | ||
_removeCurrentAndSendNextMsg(txId) { | ||
let msgQueue = this.txQueues[txId]; | ||
if (msgQueue && msgQueue.length > 0) { | ||
msgQueue.shift(); | ||
if (msgQueue.length === 0) { | ||
delete this.txQueues[txId]; | ||
} else { | ||
this._sendMsg(txId); | ||
} | ||
} | ||
} | ||
/** | ||
* send the current message to the peer. | ||
* | ||
* @param {any} txId the transaction id | ||
*/ | ||
_sendMsg(txId) { | ||
let qMsg = this._getCurrentMsg(txId); | ||
if (qMsg) { | ||
try { | ||
this.stream.write(qMsg.getMsg()); | ||
} | ||
catch(err) { | ||
qMsg.fail(err); | ||
} | ||
} | ||
} | ||
} | ||
/** | ||
@@ -60,3 +209,3 @@ * The ChaincodeSupportClient class represents a the base class for all remote nodes, Peer, Orderer , and MemberServicespeer. | ||
*/ | ||
let ChaincodeSupportClient = class { | ||
class ChaincodeSupportClient { | ||
@@ -113,3 +262,2 @@ /** | ||
this._client = new _serviceProto.ChaincodeSupport(this._endpoint.addr, this._endpoint.creds, this._options); | ||
this._peerListeners = {}; | ||
} | ||
@@ -126,2 +274,3 @@ | ||
this._stream = this._client.register(); | ||
this.msgQueueHandler = new MsgQueueHandler(this); | ||
@@ -144,15 +293,5 @@ let stream = this._stream; | ||
if (type === MSG_TYPE.RESPONSE) { | ||
let cb = self._peerListeners[msg.txid]; | ||
if (cb) { | ||
cb(msg); | ||
} else { | ||
let errMsg = util.format('Failed to find a listener for the peer response with transaction Id %s', msg.txid); | ||
logger.error(errMsg); | ||
throw new Error(errMsg); | ||
} | ||
} else if (type === MSG_TYPE.ERROR) { | ||
// TODO: peer has sent error response to a request from the shim | ||
// use the txId of the message to call the corresponding callback | ||
if (type === MSG_TYPE.RESPONSE || type === MSG_TYPE.ERROR) { | ||
logger.debug('[%s]Received %s, handling good or error response', shortTxid(msg.txid), msg.type); | ||
self.msgQueueHandler.handleMsgResponse(msg); | ||
} else if (type === MSG_TYPE.INIT) { | ||
@@ -166,2 +305,3 @@ logger.debug('[%s]Received %s, initializing chaincode', shortTxid(msg.txid), msg.type); | ||
logger.error('Received unknown message from the peer. Exiting.'); | ||
//TODO: Should we really do this ? | ||
process.exit(1); | ||
@@ -349,25 +489,19 @@ } | ||
registerPeerListener(txId, cb) { | ||
this._peerListeners[txId] = cb; | ||
} | ||
removePeerListener(txId) { | ||
if (this._peerListeners[txId]) { | ||
delete this._peerListeners[txId]; | ||
} | ||
} | ||
/** | ||
* send a message to the peer which returns a promise of the | ||
* response. | ||
* | ||
* @param {string} msg the message to send to the peer | ||
* @param {string} method the name of the method being called | ||
* @returns {promise} returns a promise which is resolved with the response | ||
* or is rejected otherwise | ||
*/ | ||
_askPeerAndListen(msg, method) { | ||
let self = this; | ||
return new Promise((resolve, reject) => { | ||
self.registerPeerListener(msg.txid, (res) => { | ||
self.removePeerListener(msg.txid); | ||
peerResponded(self, res, method, resolve, reject); | ||
}); | ||
self._stream.write(msg); | ||
let qMsg = new QMsg(msg, method, resolve, reject); | ||
this.msgQueueHandler.queueMsg(qMsg); | ||
}); | ||
} | ||
/** | ||
@@ -399,3 +533,3 @@ * return a printable representation of this object | ||
try { | ||
stub = new Stub(client, msg.txid, input, msg.proposal); | ||
stub = createStub(client, msg.txid, input, msg.proposal); | ||
} catch(err) { | ||
@@ -421,6 +555,10 @@ logger.error(util.format('Failed to construct a chaincode stub instance from the INIT message: %s', err)); | ||
} | ||
// check that a response object has been returned otherwise assume an error. | ||
if (!resp || !resp.status) { | ||
let errMsg = util.format('[%s]Calling chaincode %s() has not called success or error.', | ||
shortTxid(msg.txid), method); | ||
logger.error(errMsg); | ||
resp = shim.error(errMsg); | ||
} | ||
//TODO: We should validate that a promise is returned, also that the resp has fields | ||
//in it such as status, eg don't return shim.success() or shim.error() will cause | ||
//unhandledPromiseRecection. | ||
logger.debug(util.format( | ||
@@ -461,2 +599,15 @@ '[%s]Calling chaincode %s(), response status: %s', | ||
/** | ||
* function to create a new Stub, this is done to facilitate unit testing | ||
* | ||
* @param {Handler} client an instance of the Handler class | ||
* @param {string} txid transaction id | ||
* @param {any} input decoded message from peer | ||
* @param {any} proposal the proposal | ||
* @returns a new Stub instance | ||
*/ | ||
function createStub(client, txid, input, proposal) { | ||
return new Stub(client, txid, input, proposal); | ||
} | ||
function newErrorMsg(msg, state) { | ||
@@ -480,3 +631,3 @@ let errStr = util.format('[%s]Chaincode handler FSM cannot handle message (%s) with payload size (%d) while in state: %s', | ||
function peerResponded(handler, res, method, resolve, reject) { | ||
function parseResponse(handler, res, method) { | ||
if (res.type === MSG_TYPE.RESPONSE) { | ||
@@ -490,16 +641,16 @@ logger.debug(util.format('[%s]Received %s() successful response', shortTxid(res.txid), method)); | ||
case 'GetQueryResult': | ||
return resolve(new StateQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload))); | ||
return new StateQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload)); | ||
case 'GetHistoryForKey': | ||
return resolve (new HistoryQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload))); | ||
return new HistoryQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload)); | ||
case 'QueryStateNext': | ||
case 'QueryStateClose': | ||
return resolve(_serviceProto.QueryResponse.decode(res.payload)); | ||
return _serviceProto.QueryResponse.decode(res.payload); | ||
case 'InvokeChaincode': | ||
return resolve(_serviceProto.ChaincodeMessage.decode(res.payload)); | ||
return _serviceProto.ChaincodeMessage.decode(res.payload); | ||
} | ||
return resolve(res.payload); | ||
return res.payload; | ||
} else if (res.type === MSG_TYPE.ERROR) { | ||
logger.debug(util.format('[%s]Received %s() error response', shortTxid(res.txid), method)); | ||
return reject(new Error(res.payload.toString())); | ||
throw new Error(res.payload.toString()); | ||
} else { | ||
@@ -510,3 +661,3 @@ let errMsg = util.format( | ||
logger.debug(errMsg); | ||
return reject(new Error(errMsg)); | ||
throw new Error(errMsg); | ||
} | ||
@@ -520,5 +671,5 @@ } | ||
// | ||
let Endpoint = class { | ||
class Endpoint { | ||
constructor(url /*string*/, opts ) { | ||
var fs = require('fs'), | ||
let fs = require('fs'), | ||
path = require('path'); | ||
@@ -553,2 +704,2 @@ | ||
} | ||
}; | ||
} |
@@ -65,5 +65,2 @@ 'use strict'; | ||
} | ||
if (queryResult.done && this.listenerCount('end') > 0) { | ||
this.emit('end', this); | ||
} | ||
return queryResult; | ||
@@ -70,0 +67,0 @@ } |
{ | ||
"name": "fabric-shim", | ||
"version": "1.0.0-snapshot.10", | ||
"version": "1.0.0-snapshot.11", | ||
"description": "A node.js implementation of Hyperledger Fabric chaincode shim, to allow endorsing peers and user-provided chaincodes to communicate with each other", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
77626
1087