Comparing version 0.0.1 to 0.0.3
225
index.js
@@ -11,10 +11,21 @@ var dgram = require('dgram'); | ||
var UINT16 = 0xffff; | ||
var ID_MASK = 0xf << 4; | ||
var ST_MASK = 0xf << 4; | ||
var ST_DATA = 0 << 4; | ||
var ST_FIN = 1 << 4; | ||
var ST_STATE = 2 << 4; | ||
var ST_RESET = 3 << 4; | ||
var ST_SYN = 4 << 4; | ||
var PACKET_DATA = 0 << 4; | ||
var PACKET_FIN = 1 << 4; | ||
var PACKET_STATE = 2 << 4; | ||
var PACKET_RESET = 3 << 4; | ||
var PACKET_SYN = 4 << 4; | ||
var CONNECTING = 1; | ||
var CONNECTED = 2; | ||
var HALF_OPEN = 3; | ||
var CLOSED = 4; | ||
var MIN_PACKET_SIZE = 20; | ||
var DEFAULT_WINDOW_SIZE = 1 << 18; | ||
var CLOSE_GRACE = 5000; | ||
var BUFFER_MAX = 512; | ||
var uint32 = function(n) { | ||
@@ -24,2 +35,6 @@ return n >>> 0; | ||
var uint16 = function(n) { | ||
return n & UINT16; | ||
}; | ||
var timestamp = function() { | ||
@@ -37,5 +52,5 @@ var offset = process.hrtime(); | ||
var packet = {}; | ||
packet.id = buffer[0] & ST_MASK; | ||
packet.id = buffer[0] & ID_MASK; | ||
packet.connection = buffer.readUInt16BE(2); | ||
packet.time = buffer.readUInt32BE(4); | ||
packet.timestamp = buffer.readUInt32BE(4); | ||
packet.timediff = buffer.readUInt32BE(8); | ||
@@ -54,3 +69,3 @@ packet.window = buffer.readUInt32BE(12); | ||
buffer.writeUInt16BE(packet.connection, 2); | ||
buffer.writeUInt32BE(timestamp(), 4); | ||
buffer.writeUInt32BE(packet.timestamp, 4); | ||
buffer.writeUInt32BE(packet.timediff, 8); | ||
@@ -72,5 +87,5 @@ buffer.writeUInt32BE(packet.window, 12); | ||
this._outgoing = cyclist(64); | ||
this._incoming = cyclist(64); | ||
this._inflight = 0; | ||
this._outgoing = cyclist(BUFFER_MAX); // TODO: set to 2 and and grow if needed until BUFFER_MAX | ||
this._incoming = cyclist(BUFFER_MAX); | ||
this._inflightPackets = 0; | ||
this._inflightTimeout = 500000; | ||
@@ -82,7 +97,6 @@ this._stack = []; | ||
this.ack = syn.seq; | ||
this.recvId = syn.connection + 1; | ||
this.recvId = uint16(syn.connection + 1); | ||
this.sendId = syn.connection; | ||
this.readyState = CONNECTED; | ||
this._write = this._writeData; | ||
this._recvPacket = this._onconnected; | ||
this._sendAck(); | ||
@@ -94,15 +108,34 @@ } else { | ||
this.sendId = 0; // tmp value for v8 | ||
this.readyState = CONNECTING; | ||
this._write = this._writeBuffer; | ||
this._recvPacket = this._onsynsent; | ||
socket.bind(); // we are iniating this connection so we own the socket | ||
socket.bind(); // we are iniating this connection since we own the socket | ||
socket.on('listening', function() { | ||
self.recvId = socket.address().port; // using the port gives us system wide clash protection | ||
self.sendId = self.recvId + 1; | ||
self._sendPacket(ST_SYN, self.recvId, null); | ||
self.sendId = uint16(self.recvId + 1); | ||
self._sendPacket(PACKET_SYN, self.recvId, null); | ||
}); | ||
socket.on('error', function(err) { | ||
self.emit('error', err); | ||
}); | ||
} | ||
setInterval(this._checkTimeout.bind(this), 500); | ||
var resend = setInterval(this._checkTimeout.bind(this), 500); | ||
var tick = 0; | ||
var closed = function() { | ||
if (++tick !== 2) return; | ||
if (!syn) setTimeout(socket.close.bind(socket), CLOSE_GRACE); | ||
clearInterval(resend); | ||
self.readyState = CLOSED; | ||
self.emit('close'); | ||
}; | ||
this.on('finish', function() { | ||
self._sendFin(function() { | ||
process.nextTick(closed); | ||
}); | ||
}); | ||
this.on('end', function() { | ||
process.nextTick(closed); | ||
}); | ||
}; | ||
@@ -112,29 +145,24 @@ | ||
// stream interface | ||
Connection.prototype._read = noop; | ||
Connection.prototype._writeBuffer = function(data, enc, callback) { | ||
this._stack.push(arguments); | ||
Connection.prototype.destroy = function() { | ||
this.end(); | ||
}; | ||
Connection.prototype._writeData = function(data, enc, callback) { // TODO: check size against MTU | ||
this._sendPacket(ST_DATA, this.sendId, data, callback); | ||
Connection.prototype.address = function() { | ||
return {port:this.port, address:this.host}; | ||
}; | ||
// utp stuff | ||
Connection.prototype._read = noop; | ||
Connection.prototype._recvAck = function(seq) { // when we receive an ack | ||
var prevAcked = this.seq - this._inflight - 1; // last packet that was acked | ||
var acks = seq - prevAcked; | ||
Connection.prototype._write = function(data, enc, callback) { // TODO: check size against MTU | ||
if (this.readyState === CONNECTING) return this._stack.push(this._write.bind(this, data, enc, callback)); | ||
this._sendPacket(PACKET_DATA, this.sendId, data, callback); | ||
}; | ||
for (var i = 0; i < acks; i++) { | ||
this._inflight--; | ||
var packet = this._outgoing.del(prevAcked+i+1); | ||
if (packet && packet.callback) packet.callback(); | ||
} | ||
Connection.prototype._sendFin = function(callback) { | ||
if (this.readyState === CONNECTING) return this._stack.push(this._sendFin.bind(this, callback)); | ||
this._sendPacket(PACKET_FIN, this.sendId, null, callback); | ||
}; | ||
Connection.prototype._sendAck = function() { | ||
this._send(this._packet(ST_STATE, this.sendId, null, null)); | ||
this._send(this._packet(PACKET_STATE, this.sendId, null, null)); | ||
}; | ||
@@ -150,3 +178,3 @@ | ||
var message = packetToBuffer(packet); | ||
this._inflight++; | ||
this._inflightPackets++; | ||
this.socket.send(message, 0, message.length, this.port, this.host); | ||
@@ -156,12 +184,19 @@ }; | ||
Connection.prototype._checkTimeout = function() { | ||
for (var i = 0; i < this._inflight; i++) { | ||
for (var i = 0; i < this._inflightPackets; i++) { | ||
var packet = this._outgoing.get(this.seq - i - 1); | ||
if (!packet) continue; | ||
var now = timestamp(); | ||
if (uint32(now - packet.timesent) < this._inflightTimeout) continue; | ||
packet.timesent = now; | ||
if (uint32(now - packet.sent) < this._inflightTimeout) continue; | ||
packet.sent = now; | ||
this._send(packet); | ||
this.emit('packetresend', packet); | ||
} | ||
}; | ||
console.error('[DEBUG] resend '+packet.seq+' '+packet.id); | ||
Connection.prototype._duplicate = function(packet) { | ||
if (uint16(packet.seq - this.ack) >= BUFFER_MAX) { | ||
this._sendAck(); // this is an duplicate packet - lets just ack it | ||
return true; | ||
} | ||
return false; | ||
}; | ||
@@ -171,12 +206,14 @@ | ||
var now = timestamp(); | ||
var seq = this.seq; | ||
this.seq = uint16(this.seq+1); | ||
return { | ||
id: id, | ||
connection: connection, | ||
timestamp: now, | ||
timediff: 0, | ||
timestamp: now, | ||
timesent: now, | ||
window: 242424, | ||
seq: this.seq++, | ||
window: DEFAULT_WINDOW_SIZE, | ||
seq: seq, | ||
ack: this.ack, | ||
data: data, | ||
sent: now, | ||
callback: callback | ||
@@ -186,29 +223,56 @@ }; | ||
// connection state handlers | ||
Connection.prototype._recvAck = function(seq) { // when we receive an ack | ||
var prevAcked = uint16(this.seq - this._inflightPackets - 1); // last packet that was acked | ||
var acks = uint16(seq - prevAcked); // amount of acks we just recv | ||
if (acks >= BUFFER_MAX) return; // sanity check | ||
Connection.prototype._onsynsent = function(packet) { // when we receive a packet | ||
if (packet.id !== ST_STATE) return this._incoming.put(packet.seq, packet); | ||
this.ack = packet.seq; | ||
this._seqAcked = packet.ack-1; | ||
this._write = this._writeData; | ||
this._recvPacket = this._onconnected; | ||
this._recvAck(packet.ack); | ||
this.emit('connect'); | ||
while (this._stack.length) this._writeData.apply(this, this._stack.shift()); | ||
packet = this._incoming.del(this.ack+1); | ||
if (packet) this._recvPacket(packet); | ||
for (var i = 0; i < acks; i++) { | ||
this._inflightPackets--; | ||
var packet = this._outgoing.del(prevAcked+i+1); | ||
if (packet && packet.callback) packet.callback(); | ||
} | ||
}; | ||
Connection.prototype._onconnected = function(packet) { | ||
if (packet.seq <= this.ack) return this._sendAck(); | ||
Connection.prototype._recvPacket = function(packet) { // when we receive a packet | ||
if (this.readyState === CLOSED) return; | ||
if (this.readyState === CONNECTING && packet.id === PACKET_STATE) { // first ack -> we are connected | ||
this.emit('connect'); | ||
this.ack = packet.seq; | ||
this.readyState = CONNECTED; | ||
this._recvAck(packet.ack); | ||
while (this._stack.length) this._stack.shift()(); | ||
packet = this._incoming.del(this.ack+1); | ||
if (!packet) return; | ||
} | ||
// reorder this packet. TODO: move all this into a ._reorder handler to avoid state check spamming | ||
if (this.readyState !== CONNECTING && this._duplicate(packet)) return; | ||
this._incoming.put(packet.seq, packet); | ||
if (this.readyState === CONNECTING) return; // still waiting for the ack | ||
var shouldAck = false; | ||
while (packet = this._incoming.del(this.ack+1)) { | ||
this.ack++; | ||
this.ack = uint16(this.ack+1); | ||
if (packet.id === ST_DATA) { | ||
shouldAck = true; | ||
if (this.readyState !== CONNECTED) { // not connected -> handle everything as PACKET_STATE packets | ||
this._recvAck(packet.ack); | ||
continue; | ||
} | ||
if (packet.id === PACKET_DATA) { | ||
this.push(packet.data); | ||
} | ||
if (packet.id === PACKET_FIN) { | ||
this.readyState = HALF_OPEN; | ||
this.push(null); | ||
} | ||
if (packet.id === PACKET_RESET) { | ||
this.readyState = CLOSED; | ||
this.push(null); | ||
this.end(); | ||
} | ||
shouldAck = shouldAck || packet.id !== PACKET_STATE; | ||
this._recvAck(packet.ack); | ||
@@ -235,4 +299,4 @@ } | ||
socket = dgram.createSocket('udp4'); | ||
socket.bind(port) | ||
return this.listen(socket); | ||
socket.bind(port); | ||
return this.listen(socket, onlistening); | ||
} | ||
@@ -248,19 +312,25 @@ | ||
}); | ||
socket.on('error', function(err) { | ||
self.emit('error', err); | ||
}); | ||
socket.on('message', function(message, rinfo) { | ||
if (message.length < MIN_PACKET_SIZE) return; | ||
var packet = bufferToPacket(message); | ||
var prefix = rinfo.address+':'; | ||
var conn = connections[prefix+packet.connection]; | ||
var connection = connections[rinfo.address+':'+packet.connection]; | ||
if (conn) { | ||
conn.port = rinfo.port; // do know if port can change when behind routers - investigate | ||
conn._recvPacket(packet); | ||
if (connection) { | ||
connection.port = rinfo.port; // do know if port can change when behind routers - investigate | ||
connection._recvPacket(packet); | ||
return; | ||
} | ||
if (packet.id !== ST_SYN) return; | ||
if (packet.id !== PACKET_SYN) return; | ||
conn = new Connection(rinfo.port, rinfo.address, socket, packet); | ||
connections[prefix+conn.recvId] = conn; | ||
self.emit('connection', conn); | ||
connection = new Connection(rinfo.port, rinfo.address, socket, packet); | ||
connections[rinfo.address+':'+connection.recvId] = connection; | ||
connection.on('close', function() { | ||
delete connections[rinfo.address+':'+connection.recvId]; | ||
}); | ||
self.emit('connection', connection); | ||
}); | ||
@@ -282,2 +352,3 @@ | ||
socket.on('message', function(message) { | ||
if (message.length < MIN_PACKET_SIZE) return; | ||
conn._recvPacket(bufferToPacket(message)); | ||
@@ -284,0 +355,0 @@ }); |
{ | ||
"name": "utp", | ||
"version": "0.0.1", | ||
"version": "0.0.3", | ||
"repository": "git://github.com/mafintosh/utp", | ||
"description": "utp (micro transport protocol) implementation in node", | ||
"dependencies": { | ||
"circular": "~0.1.0" | ||
"cyclist": "~0.1.0" | ||
}, | ||
@@ -18,3 +18,6 @@ "keywords": [ | ||
], | ||
"author": "Mathias Buus Madsen <mathiasbuus@gmail.com>" | ||
"author": "Mathias Buus Madsen <mathiasbuus@gmail.com>", | ||
"scripts": { | ||
"test": "node tests" | ||
} | ||
} |
@@ -17,4 +17,3 @@ # utp | ||
*This module is far from finished!! So beware of dragons* | ||
*Things like closing a socket, backpressuare are not implemented yet!* | ||
*This module is a work in progress! So beware of dragons!* | ||
@@ -21,0 +20,0 @@ ## Usage |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
No tests
QualityPackage does not have any tests. This is a strong signal of a poorly maintained or low quality package.
Found 1 instance in 1 package
14534
12
408
1
45
1
3
+ Addedcyclist@~0.1.0
+ Addedcyclist@0.1.1(transitive)
- Removedcircular@~0.1.0