Socket
Socket
Sign inDemoInstall

fabric-shim

Package Overview
Dependencies
Maintainers
2
Versions
304
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

fabric-shim - npm Package Compare versions

Comparing version 1.0.0-snapshot.10 to 1.0.0-snapshot.11

6

lib/chaincode.js

@@ -43,3 +43,3 @@ /*

var start = function(chaincode) {
let start = function(chaincode) {
if (typeof chaincode !== 'object' || chaincode === null)

@@ -77,3 +77,3 @@ throw new Error('Missing required argument: chaincode');

var success = function(payload) {
let success = function(payload) {
let ret = new _responseProto.Response();

@@ -86,3 +86,3 @@ ret.status = Stub.RESPONSE_CODE.OK;

var error = function(msg) {
let error = function(msg) {
let ret = new _responseProto.Response();

@@ -89,0 +89,0 @@ ret.status = Stub.RESPONSE_CODE.ERROR;

@@ -20,2 +20,3 @@ /*

const Stub = require('./stub.js');
const shim = require('./chaincode.js');

@@ -54,2 +55,150 @@ const _serviceProto = grpc.load({

/*
* Simple class to represent a message to be queued with the associated
* promise methods to be driven around this message
*/
class QMsg {
constructor(msg, method, resolve, reject) {
this.msg = msg;
this.method = method;
this.resolve = resolve;
this.reject = reject;
}
getMsg() {
return this.msg;
}
getMsgTxid() {
return this.msg.txid;
}
getMethod() {
return this.method;
}
success(response) {
this.resolve(response);
}
fail(err) {
this.reject(err);
}
}
/*
* This class handles queuing messages to be sent to the peer based on transaction id
* The peer can access requests coming from different transactions concurrently but
* cannot handle concurrent requests for the same transaction. Given the nature of asynchronouse
* programming this could present a problem so this implementation provides a way to allow
* code to perform concurrent request by serialising the calls to the peer.
*/
class MsgQueueHandler {
constructor(handler) {
this.handler = handler;
this.stream = handler._stream;
this.txQueues = {};
}
/*
* Queue a message to be sent to the peer. If it is the first
* message on the queue then send the message to the peer
*
* @param {QMsg} qMsg the message to queue
*/
queueMsg(qMsg) {
let txId = qMsg.getMsgTxid();
let msgQueue = this.txQueues[txId];
if (!msgQueue) {
msgQueue = this.txQueues[txId] = [];
}
msgQueue.push(qMsg);
if (msgQueue.length === 1) {
this._sendMsg(txId);
}
}
/**
* Handle a response to a message. this looks at the top of
* the queue for the specific txn id to get the message this
* response is associated with so it can drive the promise waiting
* on this message response. it then removes that message from the
* queue and sends the next message on the queue if there is one.
*
* @param {any} response the received response
*/
handleMsgResponse(response) {
let txId = response.txid;
let qMsg = this._getCurrentMsg(txId);
if (qMsg) {
try {
let parsedResponse = parseResponse(this.handler, response, qMsg.getMethod());
qMsg.success(parsedResponse);
}
catch(err) {
qMsg.fail(err);
}
this._removeCurrentAndSendNextMsg(txId);
}
}
/**
* Get the current message.
* this returns the message at the top of the queue for the particular transaction.
*
* @param {string} txId
* @returns {QMsg} the message at the top of the queue
*/
_getCurrentMsg(txId) {
let msgQueue = this.txQueues[txId];
if (msgQueue) {
return msgQueue[0];
}
let errMsg = util.format('Failed to find a message for transaction %s', txId);
logger.error(errMsg);
//Throwing an error here will terminate the container and potentially lose logs
//This may be an error, but I don't know if this should abend the container or
//should just keep going.
//throw new Error(errMsg);
}
/**
* Remove the current message and send the next message in the queue if there is one.
* delete the queue if there are no more messages.
*
* @param {any} txId the transaction id
*/
_removeCurrentAndSendNextMsg(txId) {
let msgQueue = this.txQueues[txId];
if (msgQueue && msgQueue.length > 0) {
msgQueue.shift();
if (msgQueue.length === 0) {
delete this.txQueues[txId];
} else {
this._sendMsg(txId);
}
}
}
/**
* send the current message to the peer.
*
* @param {any} txId the transaction id
*/
_sendMsg(txId) {
let qMsg = this._getCurrentMsg(txId);
if (qMsg) {
try {
this.stream.write(qMsg.getMsg());
}
catch(err) {
qMsg.fail(err);
}
}
}
}
/**

@@ -60,3 +209,3 @@ * The ChaincodeSupportClient class represents a the base class for all remote nodes, Peer, Orderer , and MemberServicespeer.

*/
let ChaincodeSupportClient = class {
class ChaincodeSupportClient {

@@ -113,3 +262,2 @@ /**

this._client = new _serviceProto.ChaincodeSupport(this._endpoint.addr, this._endpoint.creds, this._options);
this._peerListeners = {};
}

@@ -126,2 +274,3 @@

this._stream = this._client.register();
this.msgQueueHandler = new MsgQueueHandler(this);

@@ -144,15 +293,5 @@ let stream = this._stream;

if (type === MSG_TYPE.RESPONSE) {
let cb = self._peerListeners[msg.txid];
if (cb) {
cb(msg);
} else {
let errMsg = util.format('Failed to find a listener for the peer response with transaction Id %s', msg.txid);
logger.error(errMsg);
throw new Error(errMsg);
}
} else if (type === MSG_TYPE.ERROR) {
// TODO: peer has sent error response to a request from the shim
// use the txId of the message to call the corresponding callback
if (type === MSG_TYPE.RESPONSE || type === MSG_TYPE.ERROR) {
logger.debug('[%s]Received %s, handling good or error response', shortTxid(msg.txid), msg.type);
self.msgQueueHandler.handleMsgResponse(msg);
} else if (type === MSG_TYPE.INIT) {

@@ -166,2 +305,3 @@ logger.debug('[%s]Received %s, initializing chaincode', shortTxid(msg.txid), msg.type);

logger.error('Received unknown message from the peer. Exiting.');
//TODO: Should we really do this ?
process.exit(1);

@@ -349,25 +489,19 @@ }

registerPeerListener(txId, cb) {
this._peerListeners[txId] = cb;
}
removePeerListener(txId) {
if (this._peerListeners[txId]) {
delete this._peerListeners[txId];
}
}
/**
* send a message to the peer which returns a promise of the
* response.
*
* @param {string} msg the message to send to the peer
* @param {string} method the name of the method being called
* @returns {promise} returns a promise which is resolved with the response
* or is rejected otherwise
*/
_askPeerAndListen(msg, method) {
let self = this;
return new Promise((resolve, reject) => {
self.registerPeerListener(msg.txid, (res) => {
self.removePeerListener(msg.txid);
peerResponded(self, res, method, resolve, reject);
});
self._stream.write(msg);
let qMsg = new QMsg(msg, method, resolve, reject);
this.msgQueueHandler.queueMsg(qMsg);
});
}
/**

@@ -399,3 +533,3 @@ * return a printable representation of this object

try {
stub = new Stub(client, msg.txid, input, msg.proposal);
stub = createStub(client, msg.txid, input, msg.proposal);
} catch(err) {

@@ -421,6 +555,10 @@ logger.error(util.format('Failed to construct a chaincode stub instance from the INIT message: %s', err));

}
// check that a response object has been returned otherwise assume an error.
if (!resp || !resp.status) {
let errMsg = util.format('[%s]Calling chaincode %s() has not called success or error.',
shortTxid(msg.txid), method);
logger.error(errMsg);
resp = shim.error(errMsg);
}
//TODO: We should validate that a promise is returned, also that the resp has fields
//in it such as status, eg don't return shim.success() or shim.error() will cause
//unhandledPromiseRecection.
logger.debug(util.format(

@@ -461,2 +599,15 @@ '[%s]Calling chaincode %s(), response status: %s',

/**
* function to create a new Stub, this is done to facilitate unit testing
*
* @param {Handler} client an instance of the Handler class
* @param {string} txid transaction id
* @param {any} input decoded message from peer
* @param {any} proposal the proposal
* @returns a new Stub instance
*/
function createStub(client, txid, input, proposal) {
return new Stub(client, txid, input, proposal);
}
function newErrorMsg(msg, state) {

@@ -480,3 +631,3 @@ let errStr = util.format('[%s]Chaincode handler FSM cannot handle message (%s) with payload size (%d) while in state: %s',

function peerResponded(handler, res, method, resolve, reject) {
function parseResponse(handler, res, method) {
if (res.type === MSG_TYPE.RESPONSE) {

@@ -490,16 +641,16 @@ logger.debug(util.format('[%s]Received %s() successful response', shortTxid(res.txid), method));

case 'GetQueryResult':
return resolve(new StateQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload)));
return new StateQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload));
case 'GetHistoryForKey':
return resolve (new HistoryQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload)));
return new HistoryQueryIterator(handler, res.txid, _serviceProto.QueryResponse.decode(res.payload));
case 'QueryStateNext':
case 'QueryStateClose':
return resolve(_serviceProto.QueryResponse.decode(res.payload));
return _serviceProto.QueryResponse.decode(res.payload);
case 'InvokeChaincode':
return resolve(_serviceProto.ChaincodeMessage.decode(res.payload));
return _serviceProto.ChaincodeMessage.decode(res.payload);
}
return resolve(res.payload);
return res.payload;
} else if (res.type === MSG_TYPE.ERROR) {
logger.debug(util.format('[%s]Received %s() error response', shortTxid(res.txid), method));
return reject(new Error(res.payload.toString()));
throw new Error(res.payload.toString());
} else {

@@ -510,3 +661,3 @@ let errMsg = util.format(

logger.debug(errMsg);
return reject(new Error(errMsg));
throw new Error(errMsg);
}

@@ -520,5 +671,5 @@ }

//
let Endpoint = class {
class Endpoint {
constructor(url /*string*/, opts ) {
var fs = require('fs'),
let fs = require('fs'),
path = require('path');

@@ -553,2 +704,2 @@

}
};
}

@@ -65,5 +65,2 @@ 'use strict';

}
if (queryResult.done && this.listenerCount('end') > 0) {
this.emit('end', this);
}
return queryResult;

@@ -70,0 +67,0 @@ }

{
"name": "fabric-shim",
"version": "1.0.0-snapshot.10",
"version": "1.0.0-snapshot.11",
"description": "A node.js implementation of Hyperledger Fabric chaincode shim, to allow endorsing peers and user-provided chaincodes to communicate with each other",

@@ -5,0 +5,0 @@ "main": "index.js",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc