Socket
Socket
Sign inDemoInstall

http2

Package Overview
Dependencies
0
Maintainers
1
Versions
44
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.6 to 0.1.0

example/localhost.crt

25

example/client.js
var parse_url = require('url').parse;
var net = require('net');
var http2 = require('../lib/index');
var Endpoint = http2.endpoint.Endpoint;
var path = require('path');
var http2 = require('..');

@@ -12,13 +11,15 @@ var settings = {

var url = parse_url(process.argv.pop());
var server = { host: url.hostname, port: url.port };
var socket = net.connect(server, function() {
var client_endpoint = new Endpoint('CLIENT', settings);
client_endpoint.pipe(socket).pipe(client_endpoint);
var request = http2.request({
method: 'get',
host: url.hostname,
port: url.port,
url: url.path,
rejectUnauthorized: false
});
request.end();
var stream = client_endpoint._connection.createStream();
stream.open({ ':path': url.path });
stream.end();
stream.pipe(process.stderr);
stream.on('end', process.exit);
request.on('response', function(response) {
response.pipe(process.stderr);
response.on('end', process.exit);
});
var fs = require('fs');
var net = require('net');
var path = require('path');
var http2 = require('../lib/index');
var Endpoint = http2.endpoint.Endpoint;
var http2 = require('..');
var settings = {
SETTINGS_MAX_CONCURRENT_STREAMS: 1,
SETTINGS_INITIAL_WINDOW_SIZE: 100000
};
var server = http2.createServer({
key: fs.readFileSync(path.join(__dirname, './localhost.key')),
cert: fs.readFileSync(path.join(__dirname, '/localhost.crt'))
}, function(request, response) {
var filename = path.join(__dirname, request.url);
console.error('Incoming request:', request.url, '(' + filename + ')');
var server = net.createServer(function(socket) {
console.error('Incoming connection.');
if (fs.existsSync(filename) && fs.statSync(filename).isFile()) {
console.error('Reading file from disk.');
var filestream = fs.createReadStream(filename);
response.writeHead('200');
filestream.pipe(response);
var server_endpoint = new Endpoint('SERVER', settings);
server_endpoint.pipe(socket).pipe(server_endpoint);
server_endpoint._connection.on('incoming_stream', function(stream) {
console.error('Incoming stream.');
stream.on('headers', function(headers) {
var filename = path.join(__dirname, headers[':path']);
console.error('Incoming request:', headers[':path'], '(' + filename + ')');
if (fs.existsSync(filename)) {
console.error('Reading file from disk.');
stream.open({
':status': '200'
});
var filestream = fs.createReadStream(filename);
filestream.pipe(stream);
} else {
console.error('File not found.');
stream.open({
':status': '404'
});
stream.end();
}
});
});
} else {
console.error('File not found.');
response.writeHead('404');
response.end();
}
});
server.listen(8080);
console.error('Listening on localhost:8080, serving up files from', __dirname);
var port = 8080;
if ('HTTP2_PORT' in process.env) {
port = parseInt(process.env.HTTP2_PORT);
}
server.listen(port);
console.error('Listening on localhost:' + port + ', serving up files from', __dirname);
Version history
===============
### 0.1.0 (2013-08-06) ###
* First release with public API (similar to the standard node HTTPS module)
* Support for NPN negotiation (no ALPN or Upgrade yet)
* Stream number limitation is in place
* Push streams works but not exposed yet in the public API
* [Blog post](http://gabor.molnar.es/blog/2013/08/05/gsoc-week-number-6-and-number-7/)
* [Tarball](https://github.com/molnarg/node-http2/archive/node-http2-0.1.0.tar.gz)
### 0.0.6 (2013-07-19) ###

@@ -5,0 +14,0 @@

@@ -165,9 +165,9 @@ // HTTP/2 compression is implemented by two [Transform Stream][1] subclasses that operate in

var full_match = this._table.filter(equal);
if (full_match.length !== 0) {
var full_index = this._table.indexOf(full_match[0]);
if (!this._isShadowed(full_index)) {
var fullMatch = this._table.filter(equal);
if (fullMatch.length !== 0) {
var fullIndex = this._table.indexOf(fullMatch[0]);
if (!this._isShadowed(fullIndex)) {
return {
name: full_index,
value: full_index,
name: fullIndex,
value: fullIndex,
index: -1

@@ -179,12 +179,12 @@ };

var name = pair[0].toLowerCase();
var name_match = this._table.filter(function(entry) {
var nameMatch = this._table.filter(function(entry) {
return entry[0].toLowerCase() === name;
});
if (name_match.length !== 0) {
var name_index = this._table.indexOf(name_match[0]);
if (!this._isShadowed(name_index)) {
if (nameMatch.length !== 0) {
var nameIndex = this._table.indexOf(nameMatch[0]);
if (!this._isShadowed(nameIndex)) {
return {
name: name_index,
name: nameIndex,
value: pair[1],
index: name_index
index: nameIndex
};

@@ -252,5 +252,5 @@ }

var initial_table = (type === 'REQUEST') ? CompressionContext.initialRequestTable
: CompressionContext.initialResponseTable;
this._context = new CompressionContext(initial_table);
var initialTable = (type === 'REQUEST') ? CompressionContext.initialRequestTable
: CompressionContext.initialResponseTable;
this._context = new CompressionContext(initialTable);

@@ -314,5 +314,5 @@ this._initializeStream();

var initial_table = (type === 'REQUEST') ? CompressionContext.initialRequestTable
: CompressionContext.initialResponseTable;
this._context = new CompressionContext(initial_table);
var initialTable = (type === 'REQUEST') ? CompressionContext.initialRequestTable
: CompressionContext.initialResponseTable;
this._context = new CompressionContext(initialTable);

@@ -469,5 +469,5 @@ this._initializeStream();

Compressor.string = function writeString(str) {
var encoded_string = new Buffer(str, 'utf8');
var encoded_length = Compressor.integer(encoded_string.length, 0);
return encoded_length.concat(encoded_string);
var encodedString = new Buffer(str, 'utf8');
var encodedLength = Compressor.integer(encodedString.length, 0);
return encodedLength.concat(encodedString);
};

@@ -524,4 +524,4 @@

literal : { prefix: 5, pattern: 0x60 },
literal_incremental : { prefix: 5, pattern: 0x40 },
literal_substitution: { prefix: 6, pattern: 0x00 }
literalIncremental : { prefix: 5, pattern: 0x40 },
literalSubstitution : { prefix: 6, pattern: 0x00 }
};

@@ -537,5 +537,5 @@

} else if (header.index === Infinity) {
representation = representations.literal_incremental;
representation = representations.literalIncremental;
} else {
representation = representations.literal_substitution;
representation = representations.literalSubstitution;
}

@@ -554,3 +554,3 @@

if (representation === representations.literal_substitution) {
if (representation === representations.literalSubstitution) {
buffers.push(Compressor.integer(header.index, 0));

@@ -570,13 +570,13 @@ }

var first_byte = buffer[buffer.cursor];
if (first_byte & 0x80) {
var firstByte = buffer[buffer.cursor];
if (firstByte & 0x80) {
representation = representations.indexed;
} else if (first_byte & 0x40) {
if (first_byte & 0x20) {
} else if (firstByte & 0x40) {
if (firstByte & 0x20) {
representation = representations.literal;
} else {
representation = representations.literal_incremental;
representation = representations.literalIncremental;
}
} else {
representation = representations.literal_substitution;
representation = representations.literalSubstitution;
}

@@ -594,5 +594,5 @@

if (representation === representations.literal_substitution) {
if (representation === representations.literalSubstitution) {
header.index = Decompressor.integer(buffer, 0);
} else if (representation === representations.literal_incremental) {
} else if (representation === representations.literalIncremental) {
header.index = Infinity;

@@ -647,7 +647,7 @@ } else {

// * for each `chunk`, it pushes out a `chunk_frame` that is identical to the original, except
// * for each chunk, it pushes out a chunk frame that is identical to the original, except
// the `data` property which holds the given chunk, the END_HEADERS/END_PUSH_STREAM flag that
// marks the last frame and the END_STREAM flag which is always false before the end
for (var i = 0; i < chunks.length; i++) {
var flags = utils.shallow_copy(frame.flags);
var flags = utils.shallowCopy(frame.flags);
if (i === chunks.length - 1) {

@@ -681,3 +681,3 @@ flags['END_' + frame.type] = true;

//
// If there's a frame in progress, `this._in_progress` is `true`. The frames are collected in
// If there's a frame in progress, `this._inProgress` is `true`. The frames are collected in
// `this._frames`, and the type of the frame and the stream identifier is stored in `this._type`

@@ -687,3 +687,3 @@ // and `this._stream` respectively.

Transform.call(this, { objectMode: true });
this._in_progress = false;
this._inProgress = false;
this._type = undefined;

@@ -696,6 +696,6 @@ this._stream = undefined;

Decompressor.prototype._transform = function _transform(frame, encoding, done) {
// * and the collection process is already `_in_progress`, the frame is simply stored, except if
// * and the collection process is already `_inProgress`, the frame is simply stored, except if
// it's an illegal frame
if (this._in_progress) {
if (frame.type !== this._type || frame.stream !== this._stream) {
if (this._inProgress) {
if ((frame.type !== this._type) || (frame.stream !== this._stream)) {
this.emit('error', 'A series of header frames must not be interleaved with other frames!');

@@ -706,6 +706,6 @@ }

// * and the collection process is not `_in_progress`, but the new frame's type is HEADERS or
// * and the collection process is not `_inProgress`, but the new frame's type is HEADERS or
// PUSH_PROMISE, a new collection process begins
else if (frame.type === 'HEADERS' || frame.type === 'PUSH_PROMISE') {
this._in_progress = true;
else if ((frame.type === 'HEADERS') || (frame.type === 'PUSH_PROMISE')) {
this._inProgress = true;
this._type = frame.type;

@@ -724,3 +724,3 @@ this._stream = frame.stream;

// decompressed headers.
if (this._in_progress && (frame.flags.END_HEADERS || frame.flags.END_PUSH_PROMISE)) {
if (this._inProgress && (frame.flags.END_HEADERS || frame.flags.END_PUSH_PROMISE)) {
var buffer = utils.concat(this._frames.map(function(frame) {

@@ -737,3 +737,3 @@ return frame.data;

});
this._in_progress = false;
this._inProgress = false;
}

@@ -740,0 +740,0 @@

@@ -0,6 +1,41 @@

var assert = require('assert');
var utils = require('./utils');
var logging = require('./logging');
var Stream = require('./stream').Stream;
// Connection
// ----------
// The Connection class manages HTTP/2 connections. Each instance corresponds to one transport
// stream (TCP stream). It operates by sending and receiving frames and is implemented as an
// [object mode][1] [Duplex stream][2].
//
// [1]: http://nodejs.org/api/stream.html#stream_new_stream_readable_options
// [2]: http://nodejs.org/api/stream.html#stream_class_stream_duplex
var Duplex = require('stream').Duplex;
exports.Connection = Connection;
// The main aspects of managing the connection are:
function Connection(firstStreamId, settings, log) {
// * handling IO, particularly multiplexing/demultiplexing incoming and outgoing frames
Duplex.call(this, { objectMode: true });
// * logging: every method uses the common logger object
this._log = (log || logging.root).child({ component: 'connection' });
// * stream management
this._initializeStreamManagement(firstStreamId);
// * settings management
this._initializeSettingsManagement(settings);
// * lifecycle management
this._initializeLifecycleManagement();
// * flow control
this._initializeFlowControl();
}
Connection.prototype = Object.create(Duplex.prototype, { constructor: { value: Connection } });
// Overview

@@ -37,57 +72,138 @@ // --------

// Connection
// ----------
// Stream management
// -----------------
exports.Connection = Connection;
var Stream = require('./stream').Stream;
// The main aspects of managing the connection are:
function Connection(firstStreamId, settings, log) {
// * handling IO, particularly multiplexing/demultiplexing incoming and outgoing frames
Duplex.call(this, { objectMode: true });
// Initialization:
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
// * streams are stored in two data structures:
// * `_streamsIds` is the primary registry of streams. It's a sparse array that serves as an
// id -> stream map.
//
// * `_streamPriorities` is an ordered set of streams that are allowed to send data. The order
// is determined by stream priorities. (currently, it's order of creation)
this._streamsIds = [];
// * logging: every method uses the common logger object
this._log = (log || logging.root).child({ component: 'connection' });
this._streamPriorities = [];
// * stream management
this._initializeStreamManagement(firstStreamId);
// * The next outbound stream ID is stored in `this._nextStreamId`
this._nextStreamId = firstStreamId;
// * settings management
this._initializeSettingsManagement(settings);
// * Creating the `_control` stream that corresponds to stream ID 0 (connection level frames).
this._control = new Duplex({ objectMode: true });
this._control._write = this._writeControlFrame.bind(this);
this._control._read = utils.noop;
this._control.on('readable', this.emit.bind(this, 'stream_readable'));
this._streamsIds[0] = this._streamPriorities[0] = { upstream: this._control, priority: -1 };
// * lifecycle management
this._initializeLifecycleManagement();
// * By default, the number of concurrent outbound streams is not limited. The `_streamLimit` can
// be set by the SETTINGS_MAX_CONCURRENT_STREAMS setting.
this._streamCount = 0;
this._streamLimit = Infinity;
this._control.on('SETTINGS_MAX_CONCURRENT_STREAMS', this._updateStreamLimit.bind(this));
};
// * flow control
this._initializeFlowControl();
}
Connection.prototype = Object.create(Duplex.prototype, { constructor: { value: Connection } });
Connection.prototype.getIdOf = function getIdOf(stream) {
return this._streamsIds.indexOf(stream);
};
// Stream management
// -----------------
// `_writeControlFrame` is called when there's an incoming frame in the `_control` stream. It
// broadcasts the message by creating an event on it.
Connection.prototype._writeControlFrame = function _writeControlFrame(frame, encoding, done) {
this._control.emit(frame.type, frame);
done();
};
Connection.prototype._initializeStreamManagement = function _initializeStreamManagement(firstStreamId) {
this._control = new Duplex({ objectMode: true });
this._control._write = function(frame, encoding, done) {
this.emit(frame.type, frame);
done();
};
this._control._read = utils.noop;
this._control.on('readable', this.emit.bind(this, 'stream_readable'));
// Changing the stream count limit
Connection.prototype._updateStreamLimit = function _updateStreamLimit(newStreamLimit) {
this._streamLimit = newStreamLimit;
this.emit('stream_slot_change');
};
this.streams = [{ upstream: this._control }];
this._next_stream_id = firstStreamId;
// Freeing a slot in the stream pool
Connection.prototype._decreaseStreamCount = function _decreaseStreamCount() {
this._streamCount -= 1;
this.emit('stream_slot_change');
};
// Creating a new *inbound or outbound* stream with the given `id` consists of two steps:
//
// 1. `var newstream = this._newStream(id);`
// * creates the new stream and registers it in `this._streamsIds`
// 2. `this._activateStream(newstream);`
// * adds it to `_streamPriorities` (in the appropriate position)
// * transforms 'readable' events on the stream to 'stream_readable' events on the connection
Connection.prototype._newStream = function _newStream(id) {
this._log.trace({ id: id }, 'Adding new stream.');
var stream = new Stream(this._log.child({ stream: id }));
this._log.trace({ id: id }, 'Adding new stream.');
this.streams[id] = stream;
this._streamsIds[id] = stream;
return stream;
};
Connection.prototype._activateStream = function _activateStream(stream) {
this._log.trace({ id: this.getIdOf(stream) }, 'Activating stream.');
this._streamPriorities.push(stream);
stream.upstream.on('readable', this.emit.bind(this, 'stream_readable'));
};
// Creating an *inbound* stream with the given ID. It is called when there's an incoming frame to
// a previously nonexistent stream.
//
// * creating and activating the stream
// * emitting 'stream' event with the new stream
Connection.prototype._incomingStream = function _incomingStream(id) {
this._log.debug({ id: id }, 'New incoming stream.');
var stream = this._newStream(id);
this._activateStream(stream);
this.emit('stream', stream, id);
return stream;
};
// Creating an *outbound* stream with the next available ID
Connection.prototype.createStream = function createStream() {
var id = this._next_stream_id;
this._next_stream_id += 2;
return this._newStream(id);
// * Allocating a new ID with the appropriate parity.
var id = this._nextStreamId;
this._nextStreamId += 2;
this._log.trace({ id: id }, 'Creating new outbound stream.');
// * Creating a new Stream.
var stream = this._newStream(id);
// * Activating the created stream is only possible when there's enough space in the stream pool.
// `tryToActivate` tries to activate the stream until it finally succeeds.
var self = this;
function tryToActivate() {
if (self._streamCount >= self._streamLimit) {
self.once('stream_slot_change', tryToActivate);
} else {
self._activateStream(stream);
}
}
// * Starting activation process when
// * it becomes 'active' (tries to send a frame)
// * and if it is a promised stream, the PUSH_PROMISE is sent
var promisePending = false;
stream.once('promise_initiated', function() {
promisePending = true;
stream.once('promise_sent', function() {
promisePending = false;
});
});
stream.once('active', function() {
if (promisePending) {
stream.once('promise_sent', tryToActivate);
} else {
tryToActivate();
}
});
// * When the stream becomes inactive, decreasing the `_streamCount`
stream.once('inactive', this._decreaseStreamCount.bind(this));
return stream;
};

@@ -98,17 +214,26 @@

Connection.prototype._read = function _read() { // TODO: prioritization
// The `_read` method is a [virtual method of the Duplex class][1] that has to be implemented by
// child classes. It reads frames from streams and pushes them to the output buffer.
// [1]: http://nodejs.org/api/stream.html#stream_readable_read_size
Connection.prototype._read = function _read() {
this._log.trace('Starting forwarding frames from streams.');
var more_needed = true, stream, frame;
for (var id = 0; id < this.streams.length && more_needed; id++) {
stream = this.streams[id];
if (stream) {
while (frame = stream.upstream.read()) {
frame.stream = id;
more_needed = this._send(frame);
// * Looping through the active streams in priority order, forwarding until:
var moreNeeded = true, stream, id, frame;
for (var i = 0; i < this._streamPriorities.length && moreNeeded; i++) {
stream = this._streamPriorities[i];
id = this.getIdOf(stream);
while (frame = stream.upstream.read()) {
frame.stream = id;
if (frame.type === 'PUSH_PROMISE') {
frame.promised_stream.emit('promise_sent');
frame.promised_stream = this.getIdOf(frame.promised_stream);
}
moreNeeded = this._send(frame);
}
}
if (more_needed === true) {
// * there are no more frames in the buffers of the streams, but more would be needed
// * coming back once a stream becomes readable again
if (i === this._streamPriorities.length) {
this._log.trace('More chunk is needed, but we could not provide more.');

@@ -118,8 +243,12 @@ this.once('stream_readable', this._read.bind(this));

else if (more_needed === null) {
this._log.trace('We could not send more because of insufficient flow control window.'); // TODO: push back frame
// * it's not possible to send more because of flow control
// * coming back once flow control window is updated
else if (moreNeeded === null) {
this._log.trace('We could not send more because of insufficient flow control window.');
this.once('window_update', this._read.bind(this));
}
else {
// * no more chunk needed
// * coming back only when `_read` is called again by Duplex
else if (moreNeeded === false) {
this._log.trace('No more chunk needed, stopping forwarding.');

@@ -129,13 +258,23 @@ }

// The `_write` method is another [virtual method of the Duplex class][1] that has to be implemented
// by child classes. It forwards the given frame to the appropriate stream:
// [1]: http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback
Connection.prototype._write = function write(frame, encoding, done) {
var stream = this.streams[frame.stream];
// * gets the appropriate stream from the stream registry
var stream = this._streamsIds[frame.stream];
// * or creates one if it's not in `this.streams`
if (!stream) {
stream = this._newStream(frame.stream);
this.emit('incoming_stream', stream);
this._log.debug({ id: frame.stream }, 'New incoming stream.');
stream = this._incomingStream(frame.stream);
}
// * in case of PUSH_PROMISE, replaces the promised stream id with a new incoming stream
if (frame.type === 'PUSH_PROMISE') {
frame.promised_stream = this._incomingStream(frame.promised_stream);
}
// * tells the world that there's an incoming frame
this.emit('receiving', frame);
// * and writes it to the `stream`'s `upstream`
stream.upstream.write(frame);

@@ -149,28 +288,49 @@

// Settings management initialization:
Connection.prototype._initializeSettingsManagement = function _initializeSettingsManagement(settings) {
this._settings = settings;
// * Sending the initial settings.
this._log.info('Sending the first SETTINGS frame as part of the connection header.');
this._control.push({
type: 'SETTINGS',
settings: this._settings
});
assert('SETTINGS_MAX_CONCURRENT_STREAMS' in settings);
assert('SETTINGS_INITIAL_WINDOW_SIZE' in settings);
this.set(settings);
// * Checking that the first frame the other endpoint sends is SETTINGS
this.once('receiving', function(frame) {
if (frame.stream === 0 && frame.type === 'SETTINGS') {
if ((frame.stream === 0) && (frame.type === 'SETTINGS')) {
this._log.info('Receiving the first SETTINGS frame as part of the connection header.');
} else {
this.reset();
this._log.fatal({ frame: frame }, 'Invalid connection header: first frame is not SETTINGS.');
this.emit('error');
}
});
// * Forwarding SETTINGS frames to the `_receiveSettings` method
this._control.on('SETTINGS', this._receiveSettings.bind(this));
};
// Handling of incoming SETTINGS frames.
Connection.prototype._receiveSettings = function _receiveSettings(frame) {
for (var name in frame.settings) {
this._control.emit(name, frame.settings[name]);
}
};
// Changing one or more settings value and sending out a SETTINGS frame
Connection.prototype.set = function set(settings) {
this._control.push({
type: 'SETTINGS',
settings: settings
});
};
// Lifecycle management
// --------------------
// The main responsibilities of lifecycle management code:
//
// * keeping the connection alive by
// * sending PINGs when the connection is idle
// * answering PINGs
// * ending the connection
Connection.prototype._initializeLifecycleManagement = function _initializeLifecycleManagement() {

@@ -180,4 +340,9 @@ this._pings = {};

this._control.on('GOAWAY', this._receiveGoaway.bind(this));
this._lastIncomingStream = 0;
this.on('stream', function(stream, id) {
this._lastIncomingStream = id;
}.bind(this));
};
// Generating a string of length 16 with random hexadecimal digits
Connection.prototype._generatePingId = function _generatePingId() {

@@ -189,6 +354,7 @@ do {

}
} while(!(id in this._pings));
} while(id in this._pings);
return id;
};
// Sending a ping and calling `callback` when the answer arrives
Connection.prototype.ping = function ping(callback) {

@@ -199,3 +365,3 @@ var id = this._generatePingId();

this._log.debug({ data: data }, 'Sending PING.')
this._log.debug({ data: data }, 'Sending PING.');
this._control.push({

@@ -210,2 +376,3 @@ type: 'PING',

// Answering pings
Connection.prototype._receivePing = function _receivePing(frame) {

@@ -219,7 +386,7 @@ if (frame.flags.PONG) {

} else {
this._log.warning({ data: frame.data }, 'Unsolicited PING answer.');
this._log.warn({ data: frame.data }, 'Unsolicited PING answer.');
}
} else {
this._log.debug({ data: frame.data }, 'Answering PING.')
this._log.debug({ data: frame.data }, 'Answering PING.');
this._control.push({

@@ -235,6 +402,14 @@ type: 'PING',

Connection.prototype.reset = function reset() {
// Terminating the connection
Connection.prototype.close = function close(error) {
this.push({
type: 'GOAWAY',
last_stream: this._lastIncomingStream,
error: error || 'NO_ERROR'
});
this.push(null);
};
Connection.prototype._receiveGoaway = function _receiveGoaway(frame) {
this.push(null);
};

@@ -246,3 +421,3 @@

Connection.prototype._initializeFlowControl = function _initializeFlowControl() {
// Turning off flow control for incoming frames (not yet supported):
// Turning off flow control for incoming frames (not yet supported).
this._control.push({

@@ -299,3 +474,3 @@ type: 'WINDOW_UPDATE',

Connection.prototype._send = function _send(frame) {
if (frame && frame.type === 'DATA') {
if (frame && (frame.type === 'DATA')) {
if (frame.data.length > this._window) {

@@ -309,1 +484,19 @@ return null;

};
// TODO list
// ---------
//
// * Stream management
// * check if the stream initiated by the peer has a stream id with appropriate parity
// * check for invalid frame types on the control stream
// * _activateStream:
// * respect priority when inserting
// * Multiplexing
// * prioritization
// * if we are on the flow control limit, it's still possible to send non-DATA frames
// * Settings management
// * storing and broadcasting the incoming settings
// * Lifecycle management
// * implementing connection tear down procedure
// * Flow control
// * setting the initial window size of streams (based on SETTINGS_INITIAL_WINDOW_SIZE)

@@ -36,2 +36,5 @@ var logging = require('./logging');

// * Initialization of management code.
this._initializeManagement();
// * Initializing error handling.

@@ -140,8 +143,8 @@ this._initializeErrorHandling();

Endpoint.prototype._read = function _read(size) {
var more_needed = true, chunk;
while (more_needed && (chunk = this._serializer.read(size))) {
more_needed = this.push(chunk);
var moreNeeded = true, chunk;
while (moreNeeded && (chunk = this._serializer.read(size))) {
moreNeeded = this.push(chunk);
}
if (more_needed) {
if (moreNeeded) {
this._serializer.once('readable', this._read.bind(this));

@@ -155,2 +158,13 @@ }

// Management
// --------------
Endpoint.prototype._initializeManagement = function _initializeManagement() {
this._connection.on('stream', this.emit.bind(this, 'stream'));
};
Endpoint.prototype.createStream = function createStream() {
return this._connection.createStream();
};
// Error handling

@@ -167,6 +181,9 @@ // --------------

Endpoint.prototype._error = function _error(component, message) {
this._log.fatal({ component: component, message: message }, 'Fatal error, closing connection');
this.push(null);
this.emit('error', component, message);
Endpoint.prototype._error = function _error(component, error) {
this._log.fatal({ component: component, message: error }, 'Fatal error, closing connection');
this.close(error);
};
Endpoint.prototype.close = function close(error) {
this._connection.close(error);
};

@@ -5,2 +5,3 @@ // The framer consists of two [Transform Stream][1] subclasses that operate in [object mode][2]:

// [2]: http://nodejs.org/api/stream.html#stream_new_stream_readable_options
var assert = process.env.HTTP2_ASSERT ? require('assert') : function noop() {};
var logging = require('./logging');

@@ -37,5 +38,3 @@

if (!(frame.type in Serializer)) {
throw new Error('Unknown frame type: ' + frame.type);
}
assert(frame.type in Serializer, 'Unknown frame type: ' + frame.type);

@@ -73,3 +72,3 @@ var buffers = [];

// The Deserializer is stateful, and it's two main alternating states are: *waiting for header* and
// *waiting for payload*. The state is stored in the boolean property `_waiting_for_header`.
// *waiting for payload*. The state is stored in the boolean property `_waitingForHeader`.
//

@@ -81,4 +80,4 @@ // When entering a new state, a `_buffer` is created that will hold the accumulated data (header or

this._buffer = new Buffer(size);
this._waiting_for_header = !this._waiting_for_header;
if (this._waiting_for_header) {
this._waitingForHeader = !this._waitingForHeader;
if (this._waitingForHeader) {
this._frame = {};

@@ -98,6 +97,6 @@ }

// chunk, then only a part of it is copied.
var to_copy = Math.min(chunk.length - cursor, this._buffer.length - this._cursor);
chunk.copy(this._buffer, this._cursor, cursor, cursor + to_copy);
this._cursor += to_copy;
cursor += to_copy;
var toCopy = Math.min(chunk.length - cursor, this._buffer.length - this._cursor);
chunk.copy(this._buffer, this._cursor, cursor, cursor + toCopy);
this._cursor += toCopy;
cursor += toCopy;

@@ -109,3 +108,3 @@ // When `_buffer` is full, it's content gets parsed either as header or payload depending on

// deserializer waits for the specified length payload.
if (this._cursor === this._buffer.length && this._waiting_for_header) {
if ((this._cursor === this._buffer.length) && this._waitingForHeader) {
Deserializer.commonHeader(this._buffer, this._frame);

@@ -120,3 +119,3 @@ this._next(this._frame.length);

// will also run.
if (this._cursor === this._buffer.length && !this._waiting_for_header) {
if ((this._cursor === this._buffer.length) && !this._waitingForHeader) {
if (this._frame.type) {

@@ -189,12 +188,12 @@ try {

var frame_types = [];
var frameTypes = [];
var frame_flags = {};
var frameFlags = {};
var generic_attributes = ['length', 'type', 'flags', 'stream'];
var genericAttributes = ['length', 'type', 'flags', 'stream'];
var type_specific_attributes = {};
var typeSpecificAttributes = {};
Serializer.commonHeader = function writeCommonHeader(frame, buffers) {
var header_buffer = new Buffer(COMMON_HEADER_SIZE);
var headerBuffer = new Buffer(COMMON_HEADER_SIZE);

@@ -205,28 +204,22 @@ var size = 0;

}
if (size > MAX_PAYLOAD_SIZE) {
throw new Error('Too large frame: ' + size + ' bytes');
}
header_buffer.writeUInt16BE(size, 0);
assert(size < MAX_PAYLOAD_SIZE, 'Too large frame: ' + size + ' bytes');
headerBuffer.writeUInt16BE(size, 0);
var type_id = frame_types.indexOf(frame.type); // If we are here then the type is valid for sure
header_buffer.writeUInt8(type_id, 2);
var typeId = frameTypes.indexOf(frame.type); // If we are here then the type is valid for sure
headerBuffer.writeUInt8(typeId, 2);
var flag_byte = 0;
var flagByte = 0;
for (var flag in frame.flags) {
var position = frame_flags[frame.type].indexOf(flag);
if (position === -1) {
throw new Error('Unknown flag for frame type ' + frame.type + ': ' + flag);
}
var position = frameFlags[frame.type].indexOf(flag);
assert(position !== -1, 'Unknown flag for frame type ' + frame.type + ': ' + flag);
if (frame.flags[flag]) {
flag_byte |= (1 << position);
flagByte |= (1 << position);
}
}
header_buffer.writeUInt8(flag_byte, 3);
headerBuffer.writeUInt8(flagByte, 3);
if (frame.stream > 0x7fffffff) {
throw new Error('Too large stream ID: ' + frame.stream);
}
header_buffer.writeUInt32BE(frame.stream || 0, 4);
assert(frame.stream < 0x7fffffff, 'Too large stream ID: ' + frame.stream);
headerBuffer.writeUInt32BE(frame.stream || 0, 4);
buffers.unshift(header_buffer);
buffers.unshift(headerBuffer);
};

@@ -237,9 +230,9 @@

frame.type = frame_types[buffer.readUInt8(2)];
frame.type = frameTypes[buffer.readUInt8(2)];
frame.flags = {};
var flag_byte = buffer.readUInt8(3);
var defined_flags = frame_flags[frame.type];
for (var i = 0; i < defined_flags.length; i++) {
frame.flags[defined_flags[i]] = Boolean(flag_byte & (1 << i));
var flagByte = buffer.readUInt8(3);
var definedFlags = frameFlags[frame.type];
for (var i = 0; i < definedFlags.length; i++) {
frame.flags[definedFlags[i]] = Boolean(flagByte & (1 << i));
}

@@ -255,5 +248,5 @@

//
// * `frame_types`: a register of frame type codes (used by `commonHeader()`)
// * `frame_flags`: a register of valid flags for frame types (used by `commonHeader()`)
// * `type_specific_attributes`: a register of frame specific frame object attributes (used by
// * `frameTypes`: a register of frame type codes (used by `commonHeader()`)
// * `frameFlags`: a register of valid flags for frame types (used by `commonHeader()`)
// * `typeSpecificAttributes`: a register of frame specific frame object attributes (used by
// logging code and also serves as documentation for frame objects)

@@ -275,7 +268,7 @@

frame_types[0x0] = 'DATA';
frameTypes[0x0] = 'DATA';
frame_flags.DATA = ['END_STREAM', 'RESERVED'];
frameFlags.DATA = ['END_STREAM', 'RESERVED'];
type_specific_attributes.DATA = ['data'];
typeSpecificAttributes.DATA = ['data'];

@@ -309,7 +302,7 @@ Serializer.DATA = function writeData(frame, buffers) {

frame_types[0x1] = 'HEADERS';
frameTypes[0x1] = 'HEADERS';
frame_flags.HEADERS = ['END_STREAM', 'RESERVED', 'END_HEADERS', 'PRIORITY'];
frameFlags.HEADERS = ['END_STREAM', 'RESERVED', 'END_HEADERS', 'PRIORITY'];
type_specific_attributes.HEADERS = ['priority', 'headers', 'data'];
typeSpecificAttributes.HEADERS = ['priority', 'headers', 'data'];

@@ -351,7 +344,7 @@ // 0 1 2 3

frame_types[0x2] = 'PRIORITY';
frameTypes[0x2] = 'PRIORITY';
frame_flags.PRIORITY = [];
frameFlags.PRIORITY = [];
type_specific_attributes.PRIORITY = ['priority'];
typeSpecificAttributes.PRIORITY = ['priority'];

@@ -383,7 +376,7 @@ // 0 1 2 3

frame_types[0x3] = 'RST_STREAM';
frameTypes[0x3] = 'RST_STREAM';
frame_flags.RST_STREAM = [];
frameFlags.RST_STREAM = [];
type_specific_attributes.RST_STREAM = ['error'];
typeSpecificAttributes.RST_STREAM = ['error'];

@@ -401,3 +394,3 @@ // 0 1 2 3

var buffer = new Buffer(4);
buffer.writeUInt32BE(error_codes.indexOf(frame.error), 0);
buffer.writeUInt32BE(errorCodes.indexOf(frame.error), 0);
buffers.push(buffer);

@@ -407,3 +400,3 @@ };

Deserializer.RST_STREAM = function readRstStream(buffer, frame) {
frame.error = error_codes[buffer.readUInt32BE(0)];
frame.error = errorCodes[buffer.readUInt32BE(0)];
};

@@ -419,7 +412,7 @@

frame_types[0x4] = 'SETTINGS';
frameTypes[0x4] = 'SETTINGS';
frame_flags.SETTINGS = [];
frameFlags.SETTINGS = [];
type_specific_attributes.SETTINGS = ['settings'];
typeSpecificAttributes.SETTINGS = ['settings'];

@@ -445,6 +438,6 @@ // The payload of a SETTINGS frame consists of zero or more settings. Each setting consists of an

Serializer.SETTINGS = function writeSettings(frame, buffers) {
var settings = [], settings_left = Object.keys(frame.settings);
defined_settings.forEach(function(setting, id) {
var settings = [], settingsLeft = Object.keys(frame.settings);
definedSettings.forEach(function(setting, id) {
if (setting.name in frame.settings) {
settings_left.splice(settings_left.indexOf(setting.name), 1);
settingsLeft.splice(settingsLeft.indexOf(setting.name), 1);
var value = frame.settings[setting.name];

@@ -454,5 +447,3 @@ settings.push({ id: id, value: setting.flag ? Boolean(value) : value });

});
if (settings_left.length !== 0) {
throw new Error('Unknown settings: ' + settings_left.join(', '));
}
assert(settingsLeft.length === 0, 'Unknown settings: ' + settingsLeft.join(', '));

@@ -476,5 +467,5 @@ var buffer = new Buffer(settings.length * 8);

var id = buffer.readUInt32BE(i*8) & 0xffffff;
var setting = defined_settings[id];
var setting = definedSettings[id];
var value = buffer.readUInt32BE(i*8 + 4);
if (!setting || setting.name in frame.settings) {
if (!setting || (setting.name in frame.settings)) {
continue;

@@ -484,16 +475,14 @@ }

}
return frame;
};
// The following settings are defined:
var defined_settings = [];
var definedSettings = [];
// * SETTINGS_MAX_CONCURRENT_STREAMS (4):
// indicates the maximum number of concurrent streams that the sender will allow.
defined_settings[4] = { name: 'SETTINGS_MAX_CONCURRENT_STREAMS', flag: false };
definedSettings[4] = { name: 'SETTINGS_MAX_CONCURRENT_STREAMS', flag: false };
// * SETTINGS_INITIAL_WINDOW_SIZE (7):
// indicates the sender's initial stream window size (in bytes) for new streams.
defined_settings[7] = { name: 'SETTINGS_INITIAL_WINDOW_SIZE', flag: false };
definedSettings[7] = { name: 'SETTINGS_INITIAL_WINDOW_SIZE', flag: false };

@@ -504,3 +493,3 @@ // * SETTINGS_FLOW_CONTROL_OPTIONS (10):

// bits are reserved.
defined_settings[10] = { name: 'SETTINGS_FLOW_CONTROL_OPTIONS', flag: true };
definedSettings[10] = { name: 'SETTINGS_FLOW_CONTROL_OPTIONS', flag: true };

@@ -519,7 +508,7 @@ // [PUSH_PROMISE](http://http2.github.io/http2-spec/#PUSH_PROMISE)

frame_types[0x5] = 'PUSH_PROMISE';
frameTypes[0x5] = 'PUSH_PROMISE';
frame_flags.PUSH_PROMISE = ['END_PUSH_PROMISE'];
frameFlags.PUSH_PROMISE = ['END_PUSH_PROMISE'];
type_specific_attributes.PUSH_PROMISE = ['promised_stream', 'headers', 'data'];
typeSpecificAttributes.PUSH_PROMISE = ['promised_stream', 'headers', 'data'];

@@ -561,7 +550,7 @@ // 0 1 2 3

frame_types[0x6] = 'PING';
frameTypes[0x6] = 'PING';
frame_flags.PING = ['PONG'];
frameFlags.PING = ['PONG'];
type_specific_attributes.PING = ['data'];
typeSpecificAttributes.PING = ['data'];

@@ -571,5 +560,3 @@ // In addition to the frame header, PING frames MUST contain 8 additional octets of opaque data.

Serializer.PING = function writePing(frame, buffers) {
if (!frame.data || frame.data.length !== 8) {
throw new Error('PING frames must carry an 8 byte payload.');
}
assert(('data' in frame) && (frame.data.length === 8), 'PING frames must carry an 8 byte payload.');
buffers.push(frame.data);

@@ -592,7 +579,7 @@ };

frame_types[0x7] = 'GOAWAY';
frameTypes[0x7] = 'GOAWAY';
frame_flags.GOAWAY = [];
frameFlags.GOAWAY = [];
type_specific_attributes.GOAWAY = ['last_stream', 'error'];
typeSpecificAttributes.GOAWAY = ['last_stream', 'error'];

@@ -617,3 +604,3 @@ // 0 1 2 3

buffer.writeUInt32BE(frame.last_stream & 0x7fffffff, 0);
buffer.writeUInt32BE(error_codes.indexOf(frame.error), 4);
buffer.writeUInt32BE(errorCodes.indexOf(frame.error), 4);
buffers.push(buffer);

@@ -624,3 +611,3 @@ };

frame.last_stream = buffer.readUInt32BE(0) & 0x7fffffff;
frame.error = error_codes[buffer.readUInt32BE(4)];
frame.error = errorCodes[buffer.readUInt32BE(4)];
};

@@ -639,7 +626,7 @@

frame_types[0x9] = 'WINDOW_UPDATE';
frameTypes[0x9] = 'WINDOW_UPDATE';
frame_flags.WINDOW_UPDATE = ['END_FLOW_CONTROL'];
frameFlags.WINDOW_UPDATE = ['END_FLOW_CONTROL'];
type_specific_attributes.WINDOW_UPDATE = ['window_size'];
typeSpecificAttributes.WINDOW_UPDATE = ['window_size'];

@@ -664,3 +651,3 @@ // The payload of a WINDOW_UPDATE frame is a 32-bit value indicating the additional number of bytes

var error_codes = [
var errorCodes = [
'NO_ERROR',

@@ -687,16 +674,16 @@ 'PROTOCOL_ERROR',

logging.serializers.frame = function(frame) {
var log_entry = {};
generic_attributes.concat(type_specific_attributes[frame.type]).forEach(function(name) {
log_entry[name] = frame[name];
var logEntry = {};
genericAttributes.concat(typeSpecificAttributes[frame.type]).forEach(function(name) {
logEntry[name] = frame[name];
});
if (frame.data instanceof Buffer) {
log_entry.data = frame.data.toString('hex');
logEntry.data = frame.data.toString('hex');
}
log_entry.flags = Object.keys(frame.flags || {}).filter(function(name) {
logEntry.flags = Object.keys(frame.flags || {}).filter(function(name) {
return frame.flags[name] === true;
});
return log_entry;
return logEntry;
};

@@ -703,0 +690,0 @@

@@ -1,4 +0,13 @@

// This is the main API that can be used to create HTTP/2 server that does not run on top of TLS.
var tls = require('tls');
var net = require('net');
var EventEmitter = require('events').EventEmitter;
var PassThrough = require('stream').PassThrough;
var Endpoint = require('./endpoint').Endpoint;
// This is the main API that can be used to create HTTP/2 servers.
var http2 = exports;
// The implemented draft is [http2-04](http://tools.ietf.org/html/draft-ietf-httpbis-http2-04).
var implementedVersion = 'HTTP-draft-04/2.0';
// The main governing power behind the http2 API design is that it should look very similar to the

@@ -9,2 +18,3 @@ // existing node.js [HTTP](http://nodejs.org/api/http.html)/[HTTPS](http://nodejs.org/api/https.html)

var http = require('http');
var https = require('https');

@@ -15,3 +25,6 @@ http2.STATUS_CODES = http.STATUS_CODES;

// using the options configuration object in client and server APIs.
var default_settings = {};
var default_settings = {
SETTINGS_MAX_CONCURRENT_STREAMS: 100,
SETTINGS_INITIAL_WINDOW_SIZE: 100000
};

@@ -23,25 +36,134 @@ // Server

// override the default settings.
http2.createServer = function createServer(options, requestListener) {
http2.createServer = createServer;
http2.Server = Server;
function createServer(options, requestListener) {
if (typeof options === 'function') {
requestListener = options;
options = undefined;
}
var server = new Server(options);
if (requestListener) {
server.on('request', requestListener);
}
return server;
}
function Server(options) {
options = options || {};
this._settings = options.settings;
// HTTP2 over TLS (using NPN instean of ALPN)
if ((options.key && options.cert) || options.pfx) {
options.NPNProtocols = [implementedVersion, 'http/1.1', 'http/1.0'];
this._server = https.createServer(options);
this._originalSocketListeners = this._server.listeners('secureConnection');
this._server.removeAllListeners('secureConnection');
this._server.on('secureConnection', this._onSecureConnection.bind(this));
this._server.on('request', this.emit.bind(this, 'request'));
}
// HTTP2 over plain TCP
else if (options.plain) {
this._server = net.createServer(this._start.bind(this));
}
// HTTP/2 with HTTP/1.1 upgrade
else {
throw new Error('HTTP1.1 -> HTTP2 upgrade is not yet supported. Please provide TLS keys.');
}
}
Server.prototype = Object.create(EventEmitter.prototype, { constructor: { value: Server } });
Server.prototype._onSecureConnection = function _onSecureConnection(socket) {
// Upgrading only if the NPN negotiation was successful
if (socket.npnProtocol === implementedVersion) {
this._start(socket);
}
// Fallback to https
else {
for (var i = 0; i < this._originalSocketListeners.length; i++) {
this._originalSocketListeners[i].call(this._server, socket);
}
}
};
// Starting HTTP/2
Server.prototype._start = function _start(socket) {
var endpoint = new Endpoint('SERVER', this._settings || default_settings);
endpoint.pipe(socket).pipe(endpoint);
endpoint.on('stream', this._onStream.bind(this));
};
Server.prototype._onStream = function _onStream(stream) {
var request = new IncomingMessage(stream);
var response = new ServerResponse(stream);
request.once('ready', this.emit.bind(this, 'request', request, response));
};
Server.prototype.listen = function listen(port) {
this._server.listen(port);
};
Server.prototype.close = function close() {
this._server.close();
};
// Client
// ------
// Implementation hints:
//
// Settings encoding:
// ```
// var buffers = [], Serializer = require('./framer').Serializer;
// Serializer.SETTINGS({ settings: { k1: v1, k2: v2 } }, buffers);
// var result = buffers[0];
// ```
//
// Once handshake is complete and switching to HTTP2 mode
//
// * create a `require('./connection').Connection` object, with
// * `'CLIENT'` as `role`
// * the used TCP stream as `socket`
// * the used settings as `settings`
// * the initial `req` and `res` object as `initialRequest` and `initialResponse`
http2.request = function request(options, callback) {
var request = new ClientRequest();
if (callback) {
request.on('response', callback);
}
var tls_options = {
host: options.hostname || options.host,
port: options.port || 80,
NPNProtocols: [implementedVersion, 'http/1.1', 'http/1.0']
};
var options_to_forward = [
'pfx',
'key',
'passphrase',
'cert',
'ca',
'ciphers',
'rejectUnauthorized',
'secureProtocol'
];
for (var i = 0; i < options_to_forward.length; i++) {
var key = options_to_forward[i];
if (key in options) {
tls_options[key] = options[key];
}
}
var socket = tls.connect(tls_options, function() {
// HTTP2 is supported!
if (socket.npnProtocol === implementedVersion) {
var endpoint = new Endpoint('CLIENT', options._settings || default_settings);
endpoint.pipe(socket).pipe(endpoint);
request._start(endpoint.createStream(), options);
}
// Fallback
else {
socket.end();
request._fallback(https.request(options));
}
});
return request;
};

@@ -52,2 +174,94 @@

// Common IncomingMessage class
// ----------------------------
function IncomingMessage(stream) {
PassThrough.call(this);
this._stream = stream;
this.httpVersion = '2.0';
this.httpVersionMajor = 2;
this.httpVersionMinor = 0;
stream.pipe(this);
stream.once('headers', this._onHeaders.bind(this));
}
IncomingMessage.prototype = Object.create(PassThrough.prototype, { constructor: { value: IncomingMessage } });
IncomingMessage.prototype._onHeaders = function _onHeaders(headers) {
this.statusCode = headers[':status'];
this.method = headers[':method'];
this.url = headers[':path'];
this.headers = headers;
headers.host = headers[':host'];
delete headers[':scheme'];
delete headers[':method'];
delete headers[':host'];
delete headers[':path'];
this.emit('ready');
};
// ServerResponse
// --------------
function ServerResponse(stream) {
PassThrough.call(this);
this._stream = stream;
this.pipe(stream);
}
ServerResponse.prototype = Object.create(PassThrough.prototype, { constructor: { value: ServerResponse } });
ServerResponse.prototype.writeHead = function writeHead(statusCode, reasonPhrase, headers) {
if (!headers) {
headers = reasonPhrase;
}
headers = headers || {};
headers[':status'] = statusCode;
this._stream.headers(headers);
};
// ClientRequest
// -------------
function ClientRequest() {
PassThrough.call(this);
this._stream = undefined;
this._request = undefined;
}
ClientRequest.prototype = Object.create(PassThrough.prototype, { constructor: { value: ClientRequest } });
ClientRequest.prototype._start = function _start(stream, options) {
var headers = {};
for (var key in options.headers) {
headers[key] = options.headers[key];
}
delete headers.host;
headers[':scheme'] = 'https';
headers[':method'] = options.method;
headers[':host'] = options.hostname || options.host;
headers[':path'] = options.url;
this._stream = stream;
stream.headers(headers);
this.pipe(stream);
var response = new IncomingMessage(stream);
response.once('ready', this.emit.bind(this, 'response', response));
};
ClientRequest.prototype._fallback = function _fallback(request) {
this._request = request;
this.pipe(request);
};
// Agent

@@ -54,0 +268,0 @@ // -----

// [node-http2](https://github.com/molnarg/node-http2) consists of the following components:
var http2 = exports;
// * [http.js](http.html): public node-http2 API
var http2 = require('./http');
module.exports = http2;
// * [utils.js](utils.html): common utility functions, like concatenating buffers

@@ -27,7 +30,1 @@ http2.utils = require('./utils');

http2.endpoint = require('./endpoint');
// * [http.js](http.html): public node-http2 API for unencrypted connections
http2.http = require('./http');
// * [https.js](https.html): public node-http2 API for SSL/TLS encrypted connections
http2.https = require('./https');

@@ -0,1 +1,2 @@

var assert = require('assert');
var utils = require('./utils');

@@ -45,3 +46,3 @@ var logging = require('./logging');

if (frame.type === 'PUSH_PROMISE') {
this.emit('promise', frame.headers);
this.emit('promise', frame.headers, frame.promised_stream);
} else if (frame.type === 'HEADERS') {

@@ -60,5 +61,7 @@ this.priority = frame.priority;

// state transitions here.
Stream.prototype.promise = function promise(headers) {
Stream.prototype.promise = function promise(stream, headers) {
stream.emit('promise_initiated');
this._send({
type: 'PUSH_PROMISE',
promised_stream: stream,
headers: headers

@@ -68,3 +71,3 @@ });

Stream.prototype.open = function open(headers, priority) {
Stream.prototype.headers = function headers(headers, priority) {
this._send({

@@ -93,3 +96,3 @@ type: 'HEADERS',

Stream.prototype._initializeUpstream = function _initializeUpstream() {
this._flush_timer = undefined;
this._flushTimer = undefined;
this.on('finish', this._finishing.bind(this));

@@ -115,4 +118,4 @@

this.upstream._queue.push(frame);
if (!this._flush_timer) {
this._flush_timer = setImmediate(this._flush.bind(this));
if (!this._flushTimer) {
this._flushTimer = setImmediate(this._flush.bind(this));
}

@@ -128,3 +131,3 @@ };

}
this._flush_timer = undefined;
this._flushTimer = undefined;
};

@@ -137,3 +140,3 @@

// all the time, but putting the flag on an existing frame is a nice optimization.
var empty_buffer = new Buffer(0);
var emptyBuffer = new Buffer(0);
Stream.prototype._finishing = function _finishing() {

@@ -145,7 +148,7 @@ var length = this.upstream._queue.length;

flags: { END_STREAM: true },
data: empty_buffer
data: emptyBuffer
});
} else {
var last_frame = this.upstream._queue[length - 1];
last_frame.flags.END_STREAM = true;
var lastFrame = this.upstream._queue[length - 1];
lastFrame.flags.END_STREAM = true;
}

@@ -191,8 +194,19 @@ };

// Only `_setState` should change `this.state` directly. It also logs the state change and notifies
// interested parties using the 'state' event.
// interested parties using the 'state', 'active' and 'inactive' event.
var ACTIVE_STATES = ['HALF_CLOSED_LOCAL', 'HALF_CLOSED_REMOTE', 'OPEN'];
Stream.prototype._setState = function transition(state) {
if (this.state !== state) {
this._log.debug({ from: this.state, to: state }, 'State transition');
var wasActive = (ACTIVE_STATES.indexOf(this.state) !== -1);
var isActive = (ACTIVE_STATES.indexOf(state) !== -1);
this.state = state;
this.emit('state', state);
this.state = state;
if (!wasActive && isActive) {
this.emit('active');
} else if (wasActive && !isActive) {
this.emit('inactive');
}
}

@@ -212,6 +226,2 @@ };

// * Sending or receiving a HEADERS frame causes the stream to become "open".
// * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
// for the reserved stream transitions to "reserved (local)".
// * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
// The state of the stream becomes "reserved (remote)".
//

@@ -225,4 +235,2 @@ // When the HEADERS frame contains the END_STREAM flags, then two state transitions happen.

}
} else if (frame.type === 'PUSH_PROMISE') {
this._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
} else { // TODO: Not well defined. https://github.com/http2/http2-spec/issues/165

@@ -242,5 +250,5 @@ error = 'PROTOCOL_ERROR';

case 'RESERVED_LOCAL':
if (sending && frame.type === 'HEADERS') {
if (sending && (frame.type === 'HEADERS')) {
this._setState('HALF_CLOSED_REMOTE');
} else if (sending && frame.type === 'RST_STREAM') {
} else if (sending && (frame.type === 'RST_STREAM')) {
this._setState('CLOSED');

@@ -261,3 +269,3 @@ } else { // TODO: Not well defined. https://github.com/http2/http2-spec/issues/165

this._setState('CLOSED');
} else if (receiving && frame.type === 'HEADERS') {
} else if (receiving && (frame.type === 'HEADERS')) {
this._setState('HALF_CLOSED_LOCAL');

@@ -291,3 +299,3 @@ } else {

case 'HALF_CLOSED_LOCAL':
if (frame.type === 'RST_STREAM' || (receiving && frame.flags.END_STREAM)) {
if ((frame.type === 'RST_STREAM') || (receiving && frame.flags.END_STREAM)) {
this._setState('CLOSED');

@@ -308,3 +316,3 @@ } else if (sending) {

case 'HALF_CLOSED_REMOTE':
if (frame.type === 'RST_STREAM' || (sending && frame.flags.END_STREAM)) {
if ((frame.type === 'RST_STREAM') || (sending && frame.flags.END_STREAM)) {
this._setState('CLOSED');

@@ -331,5 +339,5 @@ } else if (receiving) {

case 'CLOSED':
if (receiving && frame.type === 'PUSH_PROMISE') {
if (receiving && (frame.type === 'PUSH_PROMISE')) {
this._setState('RESERVED_REMOTE');
} else if (!(sending && frame.type === 'RST_STREAM')) {
} else if (!(sending && (frame.type === 'RST_STREAM'))) {
error = 'PROTOCOL_ERROR';

@@ -340,2 +348,13 @@ } // TODO: act based on the reason for termination.

// Sending/receiving a PUSH_PROMISE
//
// * Sending a PUSH_PROMISE frame marks the associated stream for later use. The stream state
// for the reserved stream transitions to "reserved (local)".
// * Receiving a PUSH_PROMISE frame marks the associated stream as reserved by the remote peer.
// The state of the stream becomes "reserved (remote)".
if (!error && (frame.type === 'PUSH_PROMISE')) {
assert(frame.promised_stream.state === 'IDLE');
frame.promised_stream._setState(sending ? 'RESERVED_LOCAL' : 'RESERVED_REMOTE');
}
// Common error handling.

@@ -423,3 +442,3 @@ if (error) {

// `window` is not enough to send a chunk
while (chunks.length > 0 && chunks[0].length <= this._window) {
while ((chunks.length > 0) && (chunks[0].length <= this._window)) {
var chunk = chunks.shift();

@@ -426,0 +445,0 @@ sent += chunk.length;

@@ -21,5 +21,5 @@ // Concatenate an array of buffers into a new buffer

do {
var chunk_size = Math.min(size, buffer.length - cursor);
chunks.push(buffer.slice(cursor, cursor + chunk_size));
cursor += chunk_size;
var chunkSize = Math.min(size, buffer.length - cursor);
chunks.push(buffer.slice(cursor, cursor + chunkSize));
cursor += chunkSize;
} while(cursor < buffer.length);

@@ -30,3 +30,3 @@ return chunks;

// Shallow copy inspired by underscore's [clone](http://underscorejs.org/#clone)
exports.shallow_copy = function shallow_copy(object) {
exports.shallowCopy = function shallowCopy(object) {
var clone = {};

@@ -33,0 +33,0 @@ for (var key in object) {

{
"name": "http2",
"version": "0.0.6",
"version": "0.1.0",
"description": "An HTTP/2 server implementation",

@@ -5,0 +5,0 @@ "main": "lib/index.js",

@@ -11,4 +11,5 @@ node-http2

I post weekly status updates [on my blog][2]. Short version: an example server and client can be
run. A more node-like API, more documentation and tests are coming soon.
I post weekly status updates [on my blog][2]. Short version: the first version of the public API is
in place. NPN negotiation works (no ALPN or Upgrade mechanism yet). Main missing items will be
tracked in the issue tracker.

@@ -29,3 +30,6 @@ [2]: http://gabor.molnar.es/blog/categories/google-summer-of-code/

API documentation is coming later, when the public API becomes usable.
The API is very similar to the [standard node.js HTTPS API](http://nodejs.org/api/https.html). The
goal is the perfect API compatibility, with additional HTTP2 related extensions (like server push).
Currently, basic operations work, server push is not yet exposed to the public API. See the examples
for more info.

@@ -35,7 +39,39 @@ Examples

Using as a server:
```javascript
var http2 = require('http2');
var options = {
key: fs.readFileSync('./example/localhost.key'),
cert: fs.readFileSync('./example/localhost.crt')
};
http2.http.createServer(options, function(request, response) {
response.end('Hello world!');
}).listen(8080);
```
Using as a client:
```javascript
var http2 = require('http2');
var request = http2.request({
method: 'get',
host: 'gabor.molnar.es',
port: 8080,
url: '/',
rejectUnauthorized: false
});
request.end();
request.on('response', function(response) {
response.pipe(process.stdout);
});
```
An example server (serving up static files from its own directory) and client are available in the
example directory.
example directory. Running the server:
Running the server:
```bash

@@ -46,4 +82,4 @@ $ node ./example/server.js

Downloading the server's source code from the server (the downloaded content gets pumped out to the
standard error output):
An example client is also available. Downloading the server's source code from the server (the
downloaded content gets pumped out to the standard error output):

@@ -81,2 +117,3 @@ ```bash

before releasing a new version. To regenerate them manually, run `npm run-script prepublish`.
There's a hosted version which is located [here](http://molnarg.github.io/node-http2/doc/).

@@ -88,2 +125,4 @@ ### Running the tests ###

### Test coverage ###
To generate a code coverage report, run `npm test --coverage`. Code coverage summary as of version

@@ -98,2 +137,6 @@ 0.0.6:

There's a hosted version of the detailed (line-by-line) coverage report [here][8].
[8]: http://molnarg.github.io/node-http2/coverage/lcov-report/lib/
### Logging ###

@@ -108,5 +151,11 @@

```
HTTP2_LOG=debug node ./example/client.js 'http://localhost:8080/server.js' 2>/tmp/server.js | bunyan -o short
HTTP2_LOG=debug node ./example/client.js 'http://localhost:8080/server.js' 2>/tmp/server.js | bunyan
```
Contributors
------------
* Nick Hurley
* Mike Belshe
License

@@ -113,0 +162,0 @@ -------

@@ -6,2 +6,7 @@ var expect = require('chai').expect;

var settings = {
SETTINGS_MAX_CONCURRENT_STREAMS: 100,
SETTINGS_INITIAL_WINDOW_SIZE: 100000
};
describe('connection.js', function() {

@@ -11,4 +16,4 @@ describe('scenario', function() {

it('should work as expected', function(done) {
var c = new Connection(1, {});
var s = new Connection(2, {});
var c = new Connection(1, settings);
var s = new Connection(2, settings);

@@ -25,4 +30,4 @@ c.pipe(s).pipe(c);

it('should work as expected', function(done) {
var c = new Connection(1, {}, log_root.child({ role: 'client' }));
var s = new Connection(2, {}, log_root.child({ role: 'server' }));
var c = new Connection(1, settings, log_root.child({ role: 'client' }));
var s = new Connection(2, settings, log_root.child({ role: 'server' }));

@@ -43,6 +48,6 @@ c.pipe(s).pipe(c);

// Setting up server
s.on('incoming_stream', function(server_stream) {
s.on('stream', function(server_stream) {
server_stream.on('headers', function(headers) {
expect(headers).to.deep.equal(request_headers);
server_stream.open(response_headers);
server_stream.headers(response_headers);
server_stream.end(response_data);

@@ -54,3 +59,3 @@ });

var client_stream = c.createStream();
client_stream.open(request_headers);
client_stream.headers(request_headers);
client_stream.end(request_data);

@@ -76,3 +81,23 @@

});
describe('ping test', function() {
it('client ping', function(done) {
var c = new Connection(1, settings, log_root.child({ role: 'client' }));
var s = new Connection(2, settings, log_root.child({ role: 'server' }));
c.pipe(s).pipe(c);
c.ping(function(id) {
done();
});
});
it('server ping', function(done) {
var c = new Connection(1, settings, log_root.child({ role: 'client' }));
var s = new Connection(2, settings, log_root.child({ role: 'server' }));
c.pipe(s).pipe(c);
s.ping(function(id) {
done();
});
});
});
});
});

@@ -7,2 +7,7 @@ var expect = require('chai').expect;

var settings = {
SETTINGS_MAX_CONCURRENT_STREAMS: 100,
SETTINGS_INITIAL_WINDOW_SIZE: 100000
};
describe('endpoint.js', function() {

@@ -12,4 +17,4 @@ describe('scenario', function() {

it('should work as expected', function(done) {
var c = new Endpoint('CLIENT', {}, log_root.child({ role: 'client' }));
var s = new Endpoint('SERVER', {}, log_root.child({ role: 'server' }));
var c = new Endpoint('CLIENT', settings, log_root.child({ role: 'client' }));
var s = new Endpoint('SERVER', settings, log_root.child({ role: 'server' }));

@@ -16,0 +21,0 @@ log_root.debug('Test initialization over, starting piping.');

@@ -5,5 +5,20 @@ var expect = require('chai').expect;

function callNTimes(limit, done) {
var i = 0;
return function() {
i += 1;
if (i === limit) {
done();
}
}
}
// Execute a list of commands and assertions
function execute_sequence(sequence, done) {
var stream = new Stream();
var recorded_events = ['state', 'error', 'window_update', 'headers', 'promise']
function execute_sequence(stream, sequence, done) {
if (!done) {
done = sequence;
sequence = stream;
stream = new Stream();
}

@@ -15,3 +30,3 @@ var outgoing_frames = [];

stream.emit = function(name, data) {
if (name === 'state' || name === 'error' || name === 'window_update') {
if (recorded_events.indexOf(name) !== -1) {
events.push({ name: name, data: data });

@@ -66,3 +81,3 @@ }

execute(check);
setImmediate(execute.bind(null, check));
}

@@ -74,3 +89,4 @@

{ type: 'PRIORITY', flags: {}, priority: 1 },
{ type: 'WINDOW_UPDATE', flags: {}, settings: {} }
{ type: 'WINDOW_UPDATE', flags: {}, settings: {} },
{ type: 'PUSH_PROMISE', flags: {}, headers: {} }
],

@@ -135,7 +151,10 @@ RESERVED_LOCAL: [

execute_sequence([
{ method : { name: 'open', arguments: [{ ':path': '/' }] } },
{ method : { name: 'headers', arguments: [{ ':path': '/' }] } },
{ outgoing: { type: 'HEADERS', flags: { }, headers: { ':path': '/' }, priority: undefined } },
{ event : { name: 'state', data: 'OPEN' } },
{ wait : 5 },
{ method : { name: 'end', arguments: [] } },
{ outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' }, priority: undefined } },
{ event : { name: 'state', data: 'OPEN' } },
{ event : { name: 'state', data: 'HALF_CLOSED_LOCAL' } },
{ outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(0) } },

@@ -145,2 +164,3 @@ { wait : 10 },

{ incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: new Buffer(5) } },
{ event : { name: 'headers', data: { ':status': 200 } } },
{ event : { name: 'state', data: 'CLOSED' } }

@@ -155,2 +175,3 @@ ], done);

{ incoming: { type: 'HEADERS', flags: { }, headers: { ':path': '/' } } },
{ event : { name: 'headers', data: { ':path': '/' } } },
{ event : { name: 'state', data: 'OPEN' } },

@@ -164,3 +185,3 @@

{ wait : 5 },
{ method : { name: 'open', arguments: [{ ':status': 200 }] } },
{ method : { name: 'headers', arguments: [{ ':status': 200 }] } },
{ outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': 200 }, priority: undefined } },

@@ -178,11 +199,40 @@

var payload = new Buffer(5);
execute_sequence([
{ method : { name: 'promise', arguments: [{ ':path': '/' }] } },
{ outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' } } },
var original_stream = new Stream();
var promised_stream = new Stream();
done = callNTimes(2, done);
execute_sequence(original_stream, [
// receiving request
{ incoming: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' } } },
{ event : { name: 'headers', data: { ':path': '/' } } },
{ event : { name: 'state', data: 'OPEN' } },
{ event : { name: 'state', data: 'HALF_CLOSED_REMOTE' } },
// sending response headers
{ wait : 5 },
{ method : { name: 'headers', arguments: [{ ':status': '200' }] } },
{ outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' }, priority: undefined } },
// sending push promise
{ method : { name: 'promise', arguments: [promised_stream, { ':path': '/' }] } },
{ outgoing: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/' }, promised_stream: promised_stream } },
// sending response data
{ method : { name: 'end', arguments: [payload] } },
{ outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
{ event : { name: 'state', data: 'CLOSED' } }
], done);
execute_sequence(promised_stream, [
// initial state of the promised stream
{ event : { name: 'state', data: 'RESERVED_LOCAL' } },
{ method : { name: 'open', arguments: [{ ':status': '200' }] } },
// push headers
{ wait : 5 },
{ method : { name: 'headers', arguments: [{ ':status': '200' }] } },
{ outgoing: { type: 'HEADERS', flags: { }, headers: { ':status': '200' }, priority: undefined } },
{ event : { name: 'state', data: 'HALF_CLOSED_REMOTE' } },
// push data
{ method : { name: 'end', arguments: [payload] } },

@@ -196,12 +246,42 @@ { outgoing: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },

it('should trigger the appropriate state transitions and outgoing frames', function(done) {
execute_sequence([
{ incoming: { type: 'PUSH_PROMISE', flags: { END_STREAM: false }, headers: { ':path': '/' } } },
var payload = new Buffer(5);
var original_stream = new Stream();
var promised_stream = new Stream();
done = callNTimes(2, done);
execute_sequence(original_stream, [
// sending request headers
{ method : { name: 'headers', arguments: [{ ':path': '/' }] } },
{ method : { name: 'end', arguments: [] } },
{ outgoing: { type: 'HEADERS', flags: { END_STREAM: true }, headers: { ':path': '/' }, priority: undefined } },
{ event : { name: 'state', data: 'OPEN' } },
{ event : { name: 'state', data: 'HALF_CLOSED_LOCAL' } },
// receiving response headers
{ wait : 10 },
{ incoming: { type: 'HEADERS', flags: { }, headers: { ':status': 200 } } },
{ event : { name: 'headers', data: { ':status': 200 } } },
// receiving push promise
{ incoming: { type: 'PUSH_PROMISE', flags: { }, headers: { ':path': '/2.html' }, promised_stream: promised_stream } },
{ event : { name: 'promise', data: { ':path': '/2.html' } } },
// receiving response data
{ incoming: { type: 'DATA' , flags: { END_STREAM: true }, data: payload } },
{ event : { name: 'state', data: 'CLOSED' } }
], done);
execute_sequence(promised_stream, [
// initial state of the promised stream
{ event : { name: 'state', data: 'RESERVED_REMOTE' } },
// push headers
{ wait : 10 },
{ incoming: { type: 'HEADERS', flags: { END_STREAM: false }, headers: { ':status': 200 } } },
{ event : { name: 'headers', data: { ':status': 200 } } },
{ event : { name: 'state', data: 'HALF_CLOSED_LOCAL' } },
{ wait : 10 },
{ incoming: { type: 'DATA', flags: { END_STREAM: true }, data: new Buffer(5) } },
// push data
{ incoming: { type: 'DATA', flags: { END_STREAM: true }, data: payload } },
{ event : { name: 'state', data: 'CLOSED' } }

@@ -208,0 +288,0 @@ ], done);

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Packages

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc