Socket
Socket
Sign inDemoInstall

utp

Package Overview
Dependencies
1
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 0.0.1 to 0.0.3

.npmignore

225

index.js

@@ -11,10 +11,21 @@ var dgram = require('dgram');

var UINT16 = 0xffff;
var ID_MASK = 0xf << 4;
var ST_MASK = 0xf << 4;
var ST_DATA = 0 << 4;
var ST_FIN = 1 << 4;
var ST_STATE = 2 << 4;
var ST_RESET = 3 << 4;
var ST_SYN = 4 << 4;
var PACKET_DATA = 0 << 4;
var PACKET_FIN = 1 << 4;
var PACKET_STATE = 2 << 4;
var PACKET_RESET = 3 << 4;
var PACKET_SYN = 4 << 4;
var CONNECTING = 1;
var CONNECTED = 2;
var HALF_OPEN = 3;
var CLOSED = 4;
var MIN_PACKET_SIZE = 20;
var DEFAULT_WINDOW_SIZE = 1 << 18;
var CLOSE_GRACE = 5000;
var BUFFER_MAX = 512;
var uint32 = function(n) {

@@ -24,2 +35,6 @@ return n >>> 0;

var uint16 = function(n) {
return n & UINT16;
};
var timestamp = function() {

@@ -37,5 +52,5 @@ var offset = process.hrtime();

var packet = {};
packet.id = buffer[0] & ST_MASK;
packet.id = buffer[0] & ID_MASK;
packet.connection = buffer.readUInt16BE(2);
packet.time = buffer.readUInt32BE(4);
packet.timestamp = buffer.readUInt32BE(4);
packet.timediff = buffer.readUInt32BE(8);

@@ -54,3 +69,3 @@ packet.window = buffer.readUInt32BE(12);

buffer.writeUInt16BE(packet.connection, 2);
buffer.writeUInt32BE(timestamp(), 4);
buffer.writeUInt32BE(packet.timestamp, 4);
buffer.writeUInt32BE(packet.timediff, 8);

@@ -72,5 +87,5 @@ buffer.writeUInt32BE(packet.window, 12);

this._outgoing = cyclist(64);
this._incoming = cyclist(64);
this._inflight = 0;
this._outgoing = cyclist(BUFFER_MAX); // TODO: set to 2 and and grow if needed until BUFFER_MAX
this._incoming = cyclist(BUFFER_MAX);
this._inflightPackets = 0;
this._inflightTimeout = 500000;

@@ -82,7 +97,6 @@ this._stack = [];

this.ack = syn.seq;
this.recvId = syn.connection + 1;
this.recvId = uint16(syn.connection + 1);
this.sendId = syn.connection;
this.readyState = CONNECTED;
this._write = this._writeData;
this._recvPacket = this._onconnected;
this._sendAck();

@@ -94,15 +108,34 @@ } else {

this.sendId = 0; // tmp value for v8
this.readyState = CONNECTING;
this._write = this._writeBuffer;
this._recvPacket = this._onsynsent;
socket.bind(); // we are iniating this connection so we own the socket
socket.bind(); // we are iniating this connection since we own the socket
socket.on('listening', function() {
self.recvId = socket.address().port; // using the port gives us system wide clash protection
self.sendId = self.recvId + 1;
self._sendPacket(ST_SYN, self.recvId, null);
self.sendId = uint16(self.recvId + 1);
self._sendPacket(PACKET_SYN, self.recvId, null);
});
socket.on('error', function(err) {
self.emit('error', err);
});
}
setInterval(this._checkTimeout.bind(this), 500);
var resend = setInterval(this._checkTimeout.bind(this), 500);
var tick = 0;
var closed = function() {
if (++tick !== 2) return;
if (!syn) setTimeout(socket.close.bind(socket), CLOSE_GRACE);
clearInterval(resend);
self.readyState = CLOSED;
self.emit('close');
};
this.on('finish', function() {
self._sendFin(function() {
process.nextTick(closed);
});
});
this.on('end', function() {
process.nextTick(closed);
});
};

@@ -112,29 +145,24 @@

// stream interface
Connection.prototype._read = noop;
Connection.prototype._writeBuffer = function(data, enc, callback) {
this._stack.push(arguments);
Connection.prototype.destroy = function() {
this.end();
};
Connection.prototype._writeData = function(data, enc, callback) { // TODO: check size against MTU
this._sendPacket(ST_DATA, this.sendId, data, callback);
Connection.prototype.address = function() {
return {port:this.port, address:this.host};
};
// utp stuff
Connection.prototype._read = noop;
Connection.prototype._recvAck = function(seq) { // when we receive an ack
var prevAcked = this.seq - this._inflight - 1; // last packet that was acked
var acks = seq - prevAcked;
Connection.prototype._write = function(data, enc, callback) { // TODO: check size against MTU
if (this.readyState === CONNECTING) return this._stack.push(this._write.bind(this, data, enc, callback));
this._sendPacket(PACKET_DATA, this.sendId, data, callback);
};
for (var i = 0; i < acks; i++) {
this._inflight--;
var packet = this._outgoing.del(prevAcked+i+1);
if (packet && packet.callback) packet.callback();
}
Connection.prototype._sendFin = function(callback) {
if (this.readyState === CONNECTING) return this._stack.push(this._sendFin.bind(this, callback));
this._sendPacket(PACKET_FIN, this.sendId, null, callback);
};
Connection.prototype._sendAck = function() {
this._send(this._packet(ST_STATE, this.sendId, null, null));
this._send(this._packet(PACKET_STATE, this.sendId, null, null));
};

@@ -150,3 +178,3 @@

var message = packetToBuffer(packet);
this._inflight++;
this._inflightPackets++;
this.socket.send(message, 0, message.length, this.port, this.host);

@@ -156,12 +184,19 @@ };

Connection.prototype._checkTimeout = function() {
for (var i = 0; i < this._inflight; i++) {
for (var i = 0; i < this._inflightPackets; i++) {
var packet = this._outgoing.get(this.seq - i - 1);
if (!packet) continue;
var now = timestamp();
if (uint32(now - packet.timesent) < this._inflightTimeout) continue;
packet.timesent = now;
if (uint32(now - packet.sent) < this._inflightTimeout) continue;
packet.sent = now;
this._send(packet);
this.emit('packetresend', packet);
}
};
console.error('[DEBUG] resend '+packet.seq+' '+packet.id);
Connection.prototype._duplicate = function(packet) {
if (uint16(packet.seq - this.ack) >= BUFFER_MAX) {
this._sendAck(); // this is an duplicate packet - lets just ack it
return true;
}
return false;
};

@@ -171,12 +206,14 @@

var now = timestamp();
var seq = this.seq;
this.seq = uint16(this.seq+1);
return {
id: id,
connection: connection,
timestamp: now,
timediff: 0,
timestamp: now,
timesent: now,
window: 242424,
seq: this.seq++,
window: DEFAULT_WINDOW_SIZE,
seq: seq,
ack: this.ack,
data: data,
sent: now,
callback: callback

@@ -186,29 +223,56 @@ };

// connection state handlers
Connection.prototype._recvAck = function(seq) { // when we receive an ack
var prevAcked = uint16(this.seq - this._inflightPackets - 1); // last packet that was acked
var acks = uint16(seq - prevAcked); // amount of acks we just recv
if (acks >= BUFFER_MAX) return; // sanity check
Connection.prototype._onsynsent = function(packet) { // when we receive a packet
if (packet.id !== ST_STATE) return this._incoming.put(packet.seq, packet);
this.ack = packet.seq;
this._seqAcked = packet.ack-1;
this._write = this._writeData;
this._recvPacket = this._onconnected;
this._recvAck(packet.ack);
this.emit('connect');
while (this._stack.length) this._writeData.apply(this, this._stack.shift());
packet = this._incoming.del(this.ack+1);
if (packet) this._recvPacket(packet);
for (var i = 0; i < acks; i++) {
this._inflightPackets--;
var packet = this._outgoing.del(prevAcked+i+1);
if (packet && packet.callback) packet.callback();
}
};
Connection.prototype._onconnected = function(packet) {
if (packet.seq <= this.ack) return this._sendAck();
Connection.prototype._recvPacket = function(packet) { // when we receive a packet
if (this.readyState === CLOSED) return;
if (this.readyState === CONNECTING && packet.id === PACKET_STATE) { // first ack -> we are connected
this.emit('connect');
this.ack = packet.seq;
this.readyState = CONNECTED;
this._recvAck(packet.ack);
while (this._stack.length) this._stack.shift()();
packet = this._incoming.del(this.ack+1);
if (!packet) return;
}
// reorder this packet. TODO: move all this into a ._reorder handler to avoid state check spamming
if (this.readyState !== CONNECTING && this._duplicate(packet)) return;
this._incoming.put(packet.seq, packet);
if (this.readyState === CONNECTING) return; // still waiting for the ack
var shouldAck = false;
while (packet = this._incoming.del(this.ack+1)) {
this.ack++;
this.ack = uint16(this.ack+1);
if (packet.id === ST_DATA) {
shouldAck = true;
if (this.readyState !== CONNECTED) { // not connected -> handle everything as PACKET_STATE packets
this._recvAck(packet.ack);
continue;
}
if (packet.id === PACKET_DATA) {
this.push(packet.data);
}
if (packet.id === PACKET_FIN) {
this.readyState = HALF_OPEN;
this.push(null);
}
if (packet.id === PACKET_RESET) {
this.readyState = CLOSED;
this.push(null);
this.end();
}
shouldAck = shouldAck || packet.id !== PACKET_STATE;
this._recvAck(packet.ack);

@@ -235,4 +299,4 @@ }

socket = dgram.createSocket('udp4');
socket.bind(port)
return this.listen(socket);
socket.bind(port);
return this.listen(socket, onlistening);
}

@@ -248,19 +312,25 @@

});
socket.on('error', function(err) {
self.emit('error', err);
});
socket.on('message', function(message, rinfo) {
if (message.length < MIN_PACKET_SIZE) return;
var packet = bufferToPacket(message);
var prefix = rinfo.address+':';
var conn = connections[prefix+packet.connection];
var connection = connections[rinfo.address+':'+packet.connection];
if (conn) {
conn.port = rinfo.port; // do know if port can change when behind routers - investigate
conn._recvPacket(packet);
if (connection) {
connection.port = rinfo.port; // do know if port can change when behind routers - investigate
connection._recvPacket(packet);
return;
}
if (packet.id !== ST_SYN) return;
if (packet.id !== PACKET_SYN) return;
conn = new Connection(rinfo.port, rinfo.address, socket, packet);
connections[prefix+conn.recvId] = conn;
self.emit('connection', conn);
connection = new Connection(rinfo.port, rinfo.address, socket, packet);
connections[rinfo.address+':'+connection.recvId] = connection;
connection.on('close', function() {
delete connections[rinfo.address+':'+connection.recvId];
});
self.emit('connection', connection);
});

@@ -282,2 +352,3 @@

socket.on('message', function(message) {
if (message.length < MIN_PACKET_SIZE) return;
conn._recvPacket(bufferToPacket(message));

@@ -284,0 +355,0 @@ });

{
"name": "utp",
"version": "0.0.1",
"version": "0.0.3",
"repository": "git://github.com/mafintosh/utp",
"description": "utp (micro transport protocol) implementation in node",
"dependencies": {
"circular": "~0.1.0"
"cyclist": "~0.1.0"
},

@@ -18,3 +18,6 @@ "keywords": [

],
"author": "Mathias Buus Madsen <mathiasbuus@gmail.com>"
"author": "Mathias Buus Madsen <mathiasbuus@gmail.com>",
"scripts": {
"test": "node tests"
}
}

@@ -17,4 +17,3 @@ # utp

*This module is far from finished!! So beware of dragons*
*Things like closing a socket, backpressuare are not implemented yet!*
*This module is a work in progress! So beware of dragons!*

@@ -21,0 +20,0 @@ ## Usage

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc