Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

webbt

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

webbt - npm Package Compare versions

Comparing version 0.0.2 to 0.0.3

lib/torrent-piece.js

524

examples/test-torrent/main.js

@@ -8,23 +8,16 @@ const fs = require('fs');

// save torrent share info to file
function saveTorrentShareInfoToFile(torrentShareInfo, savePath, cb) {
const data = JSON.stringify(torrentShareInfo, null, 2);
fs.writeFile(savePath, data, cb);
}
const MAX_TIME = 60;
let torrentManager = null;
let torrent = null;
let downloadDir = '/tmp/';
let stopUpdateChart = false;
$(window.document).ready(function () {
const $peerId = $('#peer_id')
, $error = $('#error')
, $torrentShareInfo = $('#torrent-share-info')
, $fileInput = $('#file-input')
, $shareBtn = $('#share-btn')
, $torrentShareInfoInput = $('#torrent-info-input')
, $downloadBtn = $('#download-btn');
, $msg = $('#message')
, $shareDialogue = $('.share-dialogue')
, $downloadDialogue = $('.download-dialogue');
console.log('获取websocket token');
console.log('获取websocket token...');
innerServices.webbtConfigerAPI.get({}, res => {

@@ -35,11 +28,18 @@ torrentManager = new TorrentManager(res.data);

$peerId.text(peerId);
$msg.text('');
$shareDialogue.removeAttr('hidden');
$downloadDialogue.removeAttr('hidden');
});
torrentManager.on('error', err => {
$error.text(err);
$msg.text(err);
});
});
// 按钮事件-分享
// 选择一个本地文件分享
const $shareBtn = $shareDialogue.find('.share-btn');
$shareBtn.on('click', () => {
const files = $fileInput[0].files;
const $fileInput = $shareDialogue.find('.file-input')
, $torrentShareInfo = $shareDialogue.find('.torrent-share-info')
, files = $fileInput[0].files;
if (files.length === 0) {

@@ -49,3 +49,8 @@ alert('请选择一个有效的文件!');

}
let shareFile = files[0];
$downloadDialogue.remove();
$shareBtn.remove();
$fileInput.attr('disabled', true);
$msg.text('计算文件的 torrentShareInfo...');
TorrentShareInfo.generate(shareFile.path, (err, torrentShareInfo) => {

@@ -57,5 +62,2 @@ if (err) {

$torrentShareInfo.html(syntaxHighlight(torrentShareInfo));
$fileInput.attr('disabled', true);
$shareBtn.text('分享中...').attr('disabled', true);
const torrentFilePath = `/tmp/torrent-share-infos/${shareFile.name}.torrent-share-info`;

@@ -65,27 +67,17 @@ const filePath = shareFile.path;

if (err) {
$error.text(err.toString);
$msg.text(err.toString);
throw err;
}
try {
torrent = new Torrent(torrentFilePath, filePath);
} catch (err) {
$error.text(err.toString);
throw err;
}
torrent.once('error', err => {
$error.text(err.toString);
throw err;
});
torrent.once('done', () => {
$error.text('已完成,开始分享...');
});
torrentManager.addTorrent(torrent);
createTorrentDialogue(torrentFilePath, filePath);
});
});
});
// 按钮事件-下载
// 通过torrentShareInfo下载
const downloadDir = '/tmp/';
$downloadDialogue.find('.download-dir').text(downloadDir);
const $downloadBtn = $downloadDialogue.find('.download-btn');
$downloadBtn.on('click', () => {
const data = $torrentShareInfoInput.val();
const $torrentShareInfoInput = $downloadDialogue.find('.torrent-share-info')
, data = $torrentShareInfoInput.val();
let torrentShareInfo = null;

@@ -103,20 +95,466 @@ if (!data) {

filePath = path.join(downloadDir, torrentShareInfo.info.name);
$shareDialogue.remove();
$torrentShareInfoInput.attr('disabled', true);
$downloadBtn.text(`下载中(${filePath})...`).attr('disabled', true);
$downloadBtn.remove();
createTorrentDialogue(torrentShareInfo, filePath);
});
/**
* 保存 torrent share info 到文件中
*/
function saveTorrentShareInfoToFile(torrentShareInfo, savePath, cb) {
const data = JSON.stringify(torrentShareInfo, null, 2);
fs.writeFile(savePath, data, cb);
}
/**
* Torrent
*/
function createTorrentDialogue(torrentShareInfo, filePath) {
const $torrentDialogue = $('.torrent-dialogue')
, $pauseBtn = $torrentDialogue.find('.pause-btn')
, $stopBtn = $torrentDialogue.find('.stop-update-chart-btn')
try {
torrent = new Torrent(torrentShareInfo, filePath);
} catch (err) {
$error.text(err.toString);
$msg.text(err.toString);
throw err;
}
torrent.once('ready', () => {
drawUpAndDownloadChart(torrent);
drawpeersChart(torrent);
drawBitFieldChart(torrent);
$torrentDialogue.removeAttr('hidden');
});
torrent.once('error', err => {
$error.text(err.toString);
$msg.text(err.toString);
throw err;
});
torrent.once('done', () => {
$error.text('下载完成,开始分享...');
$msg.text('下载完成,开始分享...');
});
torrentManager.addTorrent(torrent);
// 按钮 暂停/开始 下载
$pauseBtn.on('click', () => {
if (torrent.isPaused) {
torrent.resume();
$pauseBtn.text('暂停下载');
} else {
torrent.pause();
$pauseBtn.text('继续下载');
}
});
// 按钮 停止/自动更新 chart
$stopBtn.on('click', () => {
if (stopUpdateChart) {
$stopBtn.text('停止刷新');
} else {
$stopBtn.text('自动刷新');
}
stopUpdateChart = !stopUpdateChart;
});
}
});
/**
* 绘制下载上传统计图
*/
function drawUpAndDownloadChart(torrent) {
const upAndDownloadChart = echarts.init(document.getElementById('up-and-down-chart'));
let downSpeeds = []
, uploadSpeeds = []
const option = {
title: {
text: '下载与上传线性图',
subtext: '',
x: 'center'
},
tooltip: {
trigger: 'item',
formatter: function (params, ticket, callback) {
const seriesName = params.seriesName;
return `${seriesName}: ${add_suffix(params.data)}/s`
}
},
legend: {
data: ['下载速度', '上传速度'],
x: 'right'
},
grid: {
left: '3%',
right: '4%',
bottom: '3%',
containLabel: true
},
toolbox: {
feature: {
saveAsImage: {}
}
},
xAxis: {
show: false,
type: 'category',
boundaryGap: false,
data: []
},
yAxis: {
name: 'byte/s',
type: 'value'
},
series: [{
name: '下载速度',
type: 'line',
stack: '总量',
itemStyle: {
normal: {
color: `#00FF00`,
lineStyle: {
color: `#00FF00`
}
}
},
data: []
},
{
name: '上传速度',
type: 'line',
stack: '总量',
itemStyle: {
normal: {
color: `#FFCC00`,
lineStyle: {
color: `#FFCC00`,
type: 'dotted'
}
}
},
data: []
}
]
};
upAndDownloadChart.setOption(option);
let interval = setInterval(() => {
const stats = torrent.getStats();
downSpeeds.push(stats.downloadSpeed);
uploadSpeeds.push(stats.uploadSpeed);
if (downSpeeds.length > MAX_TIME || uploadSpeeds.length > MAX_TIME) {
downSpeeds = downSpeeds.slice(downSpeeds.length - MAX_TIME);
uploadSpeeds = uploadSpeeds.slice(uploadSpeeds.length - MAX_TIME);
}
if (stopUpdateChart) return;
upAndDownloadChart.setOption({
title: {
subtext: `下载: ${add_suffix(stats.downloaded)}(${add_suffix(stats.downloadSpeed)}/s) 上传: ${add_suffix(stats.uploaded)}(${add_suffix(stats.uploadSpeed)}/s)`
},
series: [{
data: downSpeeds
}, {
data: uploadSpeeds
}
]
});
}, 1000);
torrent.once('close', () => {
clearInterval(interval);
});
});
torrent.once('ready', () => {
clearInterval(interval);
});
}
/**
* 绘制peers
*/
function drawpeersChart(torrent) {
let peersChart = echarts.init(document.getElementById('peers-chart'))
, peerStatsDict = {}
, timeStamps = []
const option = {
title: [{
left: 'center',
text: '远端节点'
}],
tooltip: {
trigger: 'item',
formatter: function (params, ticket, callback) {
const data = params.data,
seriesName = params.seriesName,
peerIdShort = seriesName.substr(0, 8) + '...' + seriesName.substr(seriesName.length - 4),
amChoking = data[2],
amInterested = data[3],
peerChoking = data[4],
peerInterested = data[5],
requestsCount = data[6],
peerRequestsCount = data[7],
uploaded = data[8],
downloaded = data[9],
uploadSpeed = data[10],
downloadSpeed = data[11]
const htmlStr = `<div>
<header>${seriesName}</header>
<div style="padding: 5px 10px">
<span>下载: ${add_suffix(downloaded)} 上传: ${add_suffix(uploaded)}</span><br>
<span>下载速度: ${add_suffix(downloadSpeed)}/s 上传速度: ${add_suffix(uploadSpeed)}/s</span><br>
<span>状态: ${amChoking ? 'C' : ' '} ${amInterested ? 'I' : ' '} ${peerChoking ? 'c' : ' '} ${peerInterested ? 'i': ' '}</span><br>
<span>本地请求缓存: ${requestsCount} 远端请求缓存: ${peerRequestsCount}</span><br>
</div>
</div>`;
return htmlStr;
}
},
xAxis: [{
type: 'time',
show: false,
data: [],
}, {
type: 'time',
show: false,
data: [],
gridIndex: 1
}],
yAxis: [{
name: '(从远端下载)byte/s',
}, {
name: '(上传到远端)byte/s',
inverse: true,
gridIndex: 1
}],
grid: [{
left: '3%',
right: '4%',
containLabel: true,
bottom: '50%'
}, {
left: '3%',
right: '4%',
containLabel: true,
top: '50%'
}],
series: []
};
peersChart.setOption(option);
let interval = setInterval(() => {
let curPeers = torrent._wires
, now = Date.now()
, minTimeStamp = null;
timeStamps.push(now);
if (timeStamps.length > MAX_TIME) {
timeStamps = timeStamps.slice(timeStamps.length - MAX_TIME);
}
minTimeStamp = timeStamps[0];
for (let peer of curPeers) {
const peerId = peer.peerId;
if (!(peerId in peerStatsDict)) peerStatsDict[peerId] = [];
peerStatsDict[peerId].push({
timeStamp: now,
amChoking: peer.amChoking,
amInterested: peer.amInterested,
peerChoking: peer.peerChoking,
peerInterested: peer.peerInterested,
requestsCount: peer.requests.length,
peerRequestsCount: peer.peerRequests.length,
uploaded: peer.uploaded,
downloaded: peer.downloaded,
uploadSpeed: peer.uploadSpeed(),
downloadSpeed: peer.downloadSpeed(),
});
}
const series = [];
for (let peerId in peerStatsDict) {
let peerStats = peerStatsDict[peerId]
, itemDown = {}
, itemUp = {}
, color = peerId.slice(34);
itemDown.name = itemUp.name = peerId;
itemDown.type = itemUp.type = 'line';
itemDown.itemStyle = {
normal: {
color: `#${color}`,
lineStyle: {
color: `#${color}`
}
}
};
itemUp.itemStyle = {
normal: {
color: `#${color}`,
lineStyle: {
color: `#${color}`,
type: 'dotted'
}
}
};
// itemDown.showSymbol = itemUp.showSymbol = false;
itemUp.xAxisIndex = 1;
itemUp.yAxisIndex = 1;
itemDown.data = [];
itemUp.data = [];
let index = -1;
for (let i = 0; i < peerStats.length; i ++) {
const d = peerStats[i];
if (d.timeStamp >= minTimeStamp) {
break;
}
index = i;
}
if (index !== -1) {
peerStats = peerStats.slice(index + 1);
peerStatsDict[peerId] = peerStats;
}
for (let i = 0; i < peerStats.length; i ++) {
const d = peerStats[i];
itemDown.data.push([d.timeStamp, d.downloadSpeed, d.amChoking, d.amInterested, d.peerChoking, d.peerInterested, d.requestsCount, d.peerRequestsCount, d.uploaded, d.downloaded, d.uploadSpeed, d.downloadSpeed]);
itemUp.data.push([d.timeStamp, d.uploadSpeed, d.amChoking, d.amInterested, d.peerChoking, d.peerInterested, d.requestsCount, d.peerRequestsCount, d.uploaded, d.downloaded, d.uploadSpeed, d.downloadSpeed])
}
series.push(itemDown);
series.push(itemUp);
}
if (stopUpdateChart) return;
peersChart.setOption({
title: {
subtext: `连接数: ${torrent._wires.length} 队列: ${torrent._queue.length}`
},
xAxis: [{ data: timeStamps }, { data: timeStamps, gridIndex: 1 }],
series: series
});
}, 1000);
torrent.once('close', () => {
clearInterval(interval);
});
torrent.once('ready', () => {
clearInterval(interval);
});
}
/**
* 绘制BitField 分段图
*/
function drawBitFieldChart(torrent) {
const bitfieldChart = echarts.init(document.getElementById('bitfield-chart'))
, data = bitfieldTo3dData(torrent.bitField, torrent.filePieceCount)
const option = {
title: {
text: 'BitField 分段图',
subtext: `共${torrent.filePieceCount}片 (每片${add_suffix(torrent.filePieceSize)}包含${torrent.filePieceSize/16384}块,每块16k)`,
x: 'center'
},
animation: false,
grid: {
//height: '50%',
y: '55',
y2: '40',
x: '20',
x2: '20'
},
xAxis: {
type: 'category',
data: [],
show: false,
},
yAxis: {
show: false,
inverse: true,
type: 'category',
data: [],
show: false,
},
visualMap: {
type: 'piecewise',
splitNumber: 4,
pieces: [{
gte: 0,
lte: 0,
color: 'rgba(211,211,211,0.7)'
},
{
gte: 1,
lte: 1,
color: 'rgba(30,144,255,0.7)',
},
{
gte: 2,
lte: 2,
color: 'rgba( 255,165,0,0.7)',
}
],
dimension: 2,
align: 'left',
show: true,
//calculable: true,
orient: 'horizontal', //'vertical' horizontal
left: 'right', //center
top: 'top',
bottom: '15%'
},
series: [{
name: 'channel',
type: 'heatmap',
data: data,
itemStyle: {
emphasis: {
shadowBlur: 10,
shadowColor: 'rgba(0, 0, 0, 0.5)'
}
}
}]
};
bitfieldChart.setOption(option);
torrent.on('piece-selected', pieceIndex => {
data[pieceIndex][2] = 2;
});
torrent.on('piece-finished', pieceIndex => {
data[pieceIndex][2] = 1;
});
let interval = setInterval(() => {
if (stopUpdateChart) return;
bitfieldChart.setOption({ series: [{ data: data }] });
}, 1000);
function bitfieldTo3dData(bitField, pieceCount) {
const colCount = rowCount = Math.ceil(Math.sqrt(pieceCount)),
data = [];
for (let row = 0; row < rowCount; row++) {
for (let col = 0; col < colCount; col++) {
const index = row * rowCount + col,
value = index < pieceCount ? (bitField.get(index) ? 1 : 0) : -1;
data.push([col, row, value, index]);
}
}
return data;
}
}
function add_suffix(val) {
const prefix = ['B', 'KB', 'MB', 'GB', 'TB'];
for (let i = 0; i < prefix.length; i++) {
if (Math.abs(val) < 1000) {
return val.toFixed(2) + prefix[i]
}
val = parseInt(val / 1000)
}
return val.toFixed(2) + 'PB'
}

@@ -38,2 +38,3 @@ 'use strict'

static generate(filePath, cb) {
logger.debug('generate torrent share info...');
const ShareInfo = {

@@ -62,2 +63,3 @@ info: {},

ShareInfo.infoHash = calculateInfoHash(ShareInfo.info);
logger.debug('finish.');
cb(null, ShareInfo);

@@ -64,0 +66,0 @@ });

@@ -14,2 +14,3 @@ 'use strict'

const TorrentShareInfo = require('./torrent-share-info').TorrentShareInfo;
const Piece = require('./torrent-piece').Piece;

@@ -19,2 +20,3 @@

const RECHOKE_INTERVAL = 10000; // re choke 时间间隔 10s
const CHOKE_TIMEOUT = 5000; // choke

@@ -26,4 +28,8 @@ const MAX_CONNECTS = 55; // Max number of connections per torrent

const RECHOKE_OPTIMISTIC_DURATION = 2; // 客户端每隔一定的时间重新选择优化非阻塞peer 如30s
const SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH; // 下载速度阈值(大于该下载速度时 表示远端节点上传状态非常好)
const PIPELINE_MIN_DURATION = 0.5
const PIPELINE_MAX_DURATION = 1
/**

@@ -34,2 +40,6 @@ * Torrent

* 'error': 出现错误时触发,附加参数 'error'
* 'upload': 向远端节点上传数据时触发, 附加参数 'length'
* 'download': 从远端节点下载数据时触发, 附加参数 'length'
* 'piece-selected': 某piece 加入下载队列, 附加参数 'pieceIndex'
* 'piece-finished': 某piece 下载完成, 附加参数 'pieceIndex'
* 'done': 下载完成后触发

@@ -52,4 +62,4 @@ * 'pause': 暂停后触发

this.fileSize = 0; // 文件大小
this.filePieceSize = 0; // 每块的大
this.filePieceCount = 0; // 文件可以分成的块数小
this.filePieceSize = 0; // 每片的大
this.filePieceCount = 0; // 文件可以分成的片数

@@ -61,2 +71,4 @@ this.infoHash = '';

this.bitField = null;
this.pieces = [];
this._torrentManager = null;

@@ -73,2 +85,3 @@

this.isClosed = false;
this.inCompleting = false;

@@ -81,4 +94,4 @@ // stats

this._wires = []; // 已建立链接
this._queue = []; // 因为超过最大允许的链接数,而不能链接的远端节点
this._wires = []; // 已建立链接
this._queue = []; // 因为超出最大链接数 暂时没有建立链接的节点队列

@@ -204,3 +217,3 @@ this._closeOnErrHappen();

const buffer = Buffer.allocUnsafe(bitFieldsSize);
// 读取 bitfield
// 读取 bitfield 初始化pieces
fs.read(this._fileFd, buffer, 0, bitFieldsSize, this.fileSize, err => {

@@ -212,3 +225,11 @@ if (err) {

}
// bitField
this.bitField = new BitField(buffer);
// pieces
for (let i = 0; i < this.filePieceCount - 1; i++) {
this.pieces.push(new Piece(this.filePieceSize));
}
this.pieces.push(new Piece(this.fileSize % this.filePieceSize));
this.emit('loadFinished');

@@ -220,9 +241,10 @@ });

_checkBitField() {
if (this.isDone) return;
_checkDone() {
if (this.isDone) return true;
for (let i = 0; i < this.filePieceCount; i++) {
if (!this.bitField.get(i)) {
return;
return false;
}
}
return true;
}

@@ -238,5 +260,13 @@

fs.read(this._fileFd, buffer, 0, length, position, (err, bytesRead, buffer) => {
cb(err, buffer);
});
if (this.inCompleting) {
this.once('done', () => {
fs.read(this._fileFd, buffer, 0, length, position, (err, bytesRead, buffer) => {
cb(err, buffer);
});
});
} else {
fs.read(this._fileFd, buffer, 0, length, position, (err, bytesRead, buffer) => {
cb(err, buffer);
});
}
}

@@ -248,3 +278,3 @@

}
if (this.isDone) {
if (this.inCompleting || this.isDone) {
return cb(null, buffer);

@@ -257,5 +287,54 @@ }

cb(err, buffer);
})
});
}
_saveBitFields() {
if (!this.isReady) return;
if (this.inCompleting || this.isDone) return;
fs.write(this._fileFd, this.bitField.buffer, 0, this.bitField.buffer.length, this.fileSize, err => {
if (err) {
logger.warn(`wirte ${this._tempFilePath} bit fields fail: ${err.toString()}`);
}
});
}
_completFile() {
if (!this.isReady) return;
if (this.isDone) return;
if (this.inCompleting) return;
this.inCompleting = true;
fs.ftruncate(this._fileFd, this.fileSize, err => {
if (err) {
this.emit('error', err);
}
fs.close(this._fileFd, err => {
if (err) {
this.emit('error', err);
}
fs.rename(this._tempFilePath, this._filePath, err => {
if (err) {
this.emit('error', err);
}
fs.open(this._filePath, 'r', (err, fd) => {
if (err) {
this.emit('error', new Error(`open ${this._filePath} fail: ${err.toString()}.`));
}
this._fileFd = fd;
this.pieces = null;
this.inCompleting = false;
this.isDone = true;
this.emit('done');
});
})
})
});
}
/**

@@ -277,3 +356,3 @@ * 设置 TorrentManager

if (self._checkBitField()) {
if (self._checkDone()) {
self.isDone = true;

@@ -303,6 +382,24 @@ self.emit('done');

_addWire(wire) {
const peerId = wire.peerId;
this._appendExt(wire);
this._wires.push(wire);
/**
* 添加远端节点
* @param {*} wire
* @memberof Torrent
*/
addWire(wire) {
if (!this.isReady) {
throw new Error('not ready!');
}
if (wire.isClosed) {
logger.debug('add wire fail: wire is closed aready, ignore it.');
return;
}
logger.debug('add wire:', wire);
const peerId = wire.peerId,
oldWire = this._getWire(peerId);
if (oldWire) {
// 每个torrent 同一个远端只保存一个链接
logger.debug(`${peerId} has aready connected, close it.`);
wire.close();
}
const index = this._queue.indexOf(peerId);

@@ -312,2 +409,7 @@ if (index !== -1) {

}
this._addWire(wire);
}
_addWire(wire) {
this._wires.push(wire);
this._strategy(wire);

@@ -318,4 +420,7 @@ }

const peerId = wire.peerId;
let handshakeTimer = null;
let handshakeTimer = null
, chokeTimer = null;
this._appendExt(wire);
/**

@@ -326,2 +431,3 @@ * 启动握手超时监控, 超时后自动关闭

handshakeTimer = setTimeout(() => {
if (!this.isReady) return;
logger.debug('handshake timeout');

@@ -341,3 +447,30 @@ wire.close();

/**
* choke 超时
*/
const chokeTimeOut = () => {
if (!this.isReady) return;
if (wire.amInterested && this._queue.length > MAX_CONNECTS / 2) {
wire.close();
} else {
chokeTimer = setTimeout(() => {
chokeTimeOut();
}, CHOKE_TIMEOUT);
}
}
/**
* 检查远端是否是已经完成下载
*/
const checkBitField = () => {
for (let i = 0; i < this.filePieceCount; i ++) {
if (!wire.peerBitField.get(i)) return false;
}
wire.isSeeder = true;
wire.choke();
return true;
}
// ---------------------- Wire Handels ----------------------
// ready
const onWireReady = () => {

@@ -351,5 +484,7 @@ if (!wire.hasSendHandShake) {

wire.bitfield(this.bitField);
wire.uninterested();
}
};
// handshke
const onHandshake = (infoHash, peerId) => {

@@ -362,7 +497,28 @@ stopHandShakeTimer();

wire.bitfield(this.bitField);
wire.uninterested();
};
// choke
const onChoke = () => {
clearTimeout(chokeTimer);
chokeTimer = setTimeout(() => {
chokeTimeOut();
}, CHOKE_TIMEOUT);
}
const onUnchoke = () => {
if (chokeTimer) {
clearTimeout(chokeTimer);
chokeTimer = null;
}
this._update();
}
const onInterested = () => {
wire.unchoke();
}
const onBitfield = () => {
const peerBitField = wire.peerBitField;
checkBitField();
this._update();
};

@@ -375,9 +531,30 @@

}
if (this.bitField.get(index)) return;
if (!this.bitField.get(index)) return;
this._readFileBlock(index, offset, length, cb);
};
const onUpload = length => {
if (!this.isReady) return;
this._uploadSpeed(length);
this._uploaded += length;
this.emit('upload', length);
}
const onDownload = length => {
if (!this.isReady) return;
this._downloadSpeed(length);
this._downloaded += length;
this.emit('download', length);
}
const onTimeout = () => {
logger.debug('timeout');
wire.close();
};
const onError = err => {
logger.warn(err);
onClose();
wire.close();
};

@@ -387,4 +564,13 @@

stopHandShakeTimer();
console.log('close');
logger.debug('close');
this._removeWire(peerId);
if (!this.isReady || this.isDone) return;
if (this._wires.length < MAX_CONNECTS && !this.isPaused && this._queue.length) {
const peerId = this._queue.shift()
, peerConn = this._torrentManager.createPeerConn(peerId)
, wire = new Wire(peerConn);
this._addWire(wire);
}
};

@@ -397,5 +583,13 @@

wire.once('handshake', onHandshake);
wire.on('bitfield', onBitfield);
wire.on('have', onBitfield);
wire.on('choke', onChoke);
wire.on('unchoke', onUnchoke);
wire.on('interested', onInterested);
if (!this.isDone) {
wire.on('bitfield', onBitfield);
wire.on('have', onBitfield);
}
wire.on('request', onRequest);
wire.on('upload', onUpload);
wire.on('download', onDownload);
wire.once('timeout', onTimeout);
wire.once('error', onError);

@@ -426,2 +620,206 @@ wire.once('close', onClose);

_getEagerPieceIndexs(peerBitField, maxPieces=500) {
let indexs = []
, count = 0;
for (let i = 0; i < this.filePieceCount; i ++) {
const piece = this.pieces[i];
if (!piece) continue;
if (!this.bitField.get(i) && peerBitField.get(i) && !piece.peepNextReseve() !== -1) {
indexs.push(i);
count ++;
if (count >= maxPieces) {
break;
}
}
}
if (count === 0) return null;
return indexs;
}
_update() {
if (!this.isReady || this.isDone || this.inCompleting || this.isPaused) return;
logger.debug('update...');
const ite = util.randomIterate(this._wires);
let wire = null;
while (wire = ite()) {
this._updateWire(wire);
}
}
_updateWire(wire) {
if (wire.peerChoking) return;
logger.debug(`update wire ${wire.peerId}...`);
// 尝试通过从未下载过资源的节点下载内容
if (!wire.downloaded && !wire.requests.length) {
const pieceIndexs = this._getEagerPieceIndexs(wire.peerBitField);
if (!pieceIndexs) {
if (wire.amInterested) wire.uninterested();
return;
}
const pieceIndex = pieceIndexs[util.randomInt(pieceIndexs.length)]
, piece = this.pieces[pieceIndex];
while(piece.peepNextReseve() !== -1) {
if (!this._request(wire, pieceIndex)) return;
}
}
// 根据以往从节点下载资源的速度 为每个节点设定请求队列大小
const minOutstandingRequests = this.getBlockPipelineLength(wire, PIPELINE_MIN_DURATION)
, maxOutstandingRequests = this.getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
if (wire.requests.length >= minOutstandingRequests) return;
const pieceIndexs = this._getEagerPieceIndexs(wire.peerBitField);
if (!pieceIndexs) {
if (wire.amInterested && !wire.requests.length) wire.uninterested();
return;
}
/**
* 尽可能从下载快的节点下载更多的资源
*/
const speedRanker = () => {
const speed = wire.downloadSpeed() || 1;
if (speed > SPEED_THRESHOLD) return () => { return true; };
return pieceIndex => {
const piece = this.pieces[pieceIndex];
for (let i = 0; i < this._wires.length; i ++) {
if (wire === this._wires[i]) continue;
const otherWire = this._wires[i]
, otherSpeed = otherWire.downloadSpeed()
, otherMinOutstandingRequests = this.getBlockPipelineLength(otherWire, PIPELINE_MIN_DURATION);
if (otherSpeed < SPEED_THRESHOLD) continue;
if (!otherWire.peerBitField.get(pieceIndex)) continue;
if (otherWire.requests.length > otherMinOutstandingRequests) continue;
while (piece.peepNextReseve() !== -1) {
if (!this._request(otherWire, pieceIndex)) break;
}
if (piece.peepNextReseve() !== -1) continue;
return false;
}
return true;
}
}
const ranker = speedRanker()
, ite = util.randomIterate(pieceIndexs);
let pieceIndex = null;
while((pieceIndex = ite()) !== null) {
if (ranker(pieceIndex)) {
const piece = this.pieces[pieceIndex];
while (piece.peepNextReseve() !== -1) {
if (!this._request(wire, pieceIndex)) break;
}
if (wire.requests.length >= maxOutstandingRequests) return;
}
}
}
getBlockPipelineLength(wire, duration) {
return 2 + Math.ceil(duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH)
}
_request(wire, pieceIndex) {
if (this.bitField.get(pieceIndex)) return false;
logger.debug(`request ${pieceIndex} from ${wire.peerId}.`);
const maxOutstandingRequests = this.getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
if (wire.requests.length > maxOutstandingRequests) return false;
const piece = this.pieces[pieceIndex]
, reservation = piece.reserve();
if (reservation === -1) return false;
if (reservation === 0) {
this.emit('piece-selected', pieceIndex);
}
const onUpdateTick = () => {
process.nextTick(() => { logger.debug('i call it'); this._update(); });
};
const writePieceToFile = (piece, cb) => {
const buffer = piece.flush();
this.pieces[pieceIndex] = null;
// ==== check buffer hash ====
// this.pieces[pieceIndex] = new Piece(piece.length);
if (!this.isReady) return cb(new Error('torrent is closed!'));
if (this.bitField.get(pieceIndex)) return cb(new Error(`piece: ${pieceIndex} has aready write, ignore.`));
this._writeFileBlock(pieceIndex, 0, buffer.length, buffer, (err, buffer) => {
if (err) {
this.emit('error', err);
return cb(err);
}
this.bitField.set(pieceIndex);
cb(null);
});
};
const chunkOffset = piece.chunkOffset(reservation)
, chunkLength = piece.chunkLength(reservation);
if (!wire.amInterested) wire.interested();
wire.request(pieceIndex, chunkOffset, chunkLength, (err, chunkBuffer) => {
if (!this.isReady) return;
const piece = this.pieces[pieceIndex];
if (piece === null) return onUpdateTick();
if (err) {
logger.debug(`error getting piece ${pieceIndex} (offset: ${chunkOffset} length: ${chunkLength}) from ${wire.peerId}: ${err.toString()}.`);
piece.cancel(reservation);
onUpdateTick();
return;
}
logger.debug(`got piece ${pieceIndex} (offset: ${chunkOffset} length: ${chunkLength}) from ${wire.peerId}.`);
piece.set(reservation, chunkBuffer);
if (!piece.finished()) return onUpdateTick();
writePieceToFile(piece, err => {
if (err) {
logger.debug(`error: write file: ${this.filePath} block fail: ${err.toString()}.`);
return;
}
// 完成下载后
if (this._checkDone()) {
this.once('done', () => {
for (let i = 0; i < this._wires.length; i++) {
const wire = this._wires[i];
if (wire.isSeeder) {
wire.close();
} else {
wire.have(pieceIndex);
}
}
});
this._completFile();
} else {
this.pieces[pieceIndex] = null;
for (let i = 0; i < this._wires.length; i++) {
const wire = this._wires[i];
wire.have(pieceIndex);
}
this._saveBitFields();
onUpdateTick();
}
this.emit('piece-finished', pieceIndex)
});
});
return true;
}
/**

@@ -441,2 +839,4 @@ * 为了防止有的peer只下载不上传, BitTorrent协议建议, 客户端只给那些向它提供最快下载速度的4个peer上传数据。

logger.debug('rechoke...');
if (this._rechokeOptimisticTimes > 0) this._rechokeOptimisticTimes -= 1;

@@ -446,3 +846,3 @@ else this._rechokeOptimisticWire = null;

const peers = [];
for (let i; i < this._wires.length; i ++) {
for (let i = 0; i < this._wires.length; i ++) {
const wire = this._wires[i];

@@ -481,5 +881,8 @@ if (wire.isSeeder || wire === this._rechokeOptimisticWire) continue;

peers.forEach(function (peer) {
peers.forEach(peer => {
logger.debug(`${peer.wire.peerId} amChoke: "${peer.wire.amChoking}" -> "${peer.isChoked}"`);
if (peer.wire.amChoking !== peer.isChoked) {
if (peer.isChoked) peer.wire.choke();
if (peer.isChoked) {
peer.wire.choke();
}
else peer.wire.unchoke();

@@ -522,3 +925,5 @@ }

this._discover.on('join', peerId => {
if (this.isReady && !this.isPaused && this._wires.length < MAX_CONNECTS) {
if (!this.isReady || this.isDone) return;
if (!this.isPaused && this._wires.length < MAX_CONNECTS) {
if (this._getWire(peerId)) {

@@ -551,27 +956,2 @@ // 每个torrent 同一个远端只保存一个链接

/**
* 添加远端节点
* @param {*} wire
* @memberof Torrent
*/
addWire(wire) {
if (!this.isReady) {
throw new Error('not ready!');
}
if (wire.isClosed) {
logger.debug('add wire fail: wire is closed aready, ignore it.');
return;
}
logger.debug('add wire:', wire);
const oldWire = this._getWire(wire.peerId);
if (oldWire) {
// 每个torrent 同一个远端只保存一个链接
wire.close();
}
if (this._wires.length >= MAX_CONNECTS) {
wire.choke();
}
this._addWire(wire);
}
/**
* 获取状态

@@ -578,0 +958,0 @@ */

@@ -43,2 +43,4 @@ 'use strict'

_getOnlinePeers() {
if (!this.isReady) return;
this._channel.presence().then(context => {

@@ -171,2 +173,5 @@ this._peerStats = {};

this._announce();
setTimeout(() => {
this._getOnlinePeers();
}, 5000);
}, ANNOUNCE_INTERVAL);

@@ -173,0 +178,0 @@ }

@@ -44,7 +44,27 @@ 'use strict'

randomInt(high) {
// 随机选取 0-high 之间的一个整数
randomInt: function (high) {
return Math.random() * high | 0;
}
},
randomIterate: function (list) {
let offset = 0;
return function () {
if (offset == list.length) return null;
const len = list.length - offset
, i = util.randomInt(len)
, el = list[offset + i]
, tmp = list[offset];
list[offset] = el;
list[offset + i] = tmp;
offset ++;
return el;
}
},
}
module.exports = util;
module.exports = util;

13

lib/wire.js

@@ -12,6 +12,8 @@ 'use strict'

const BITFIELD_GROW = 65536; // 最大文件尺寸 BITFIELD_GROW * PIECE_SIZE(256k) = 16G
const BITFIELD_GROW = 262144; // 最大文件尺寸 BITFIELD_GROW * PIECE_SIZE(256k) = 64G
const KEEP_ALIVE_TIMER = 55000;
const BITTORRENT_PROTOCOL = 'BitTorrent protocol';
const REQUIRE_TIMEOUT = 20000; // 请求超时时间
class Request {

@@ -41,3 +43,3 @@ constructor(piece, offset, length, callback) {

* 'request': 收到远端节点数据块请求后触发, 附加参数('index', 'offset', 'length', 'respond')
* 'upload': 向远端节点上传数据时触发, 附加参数 'uploadLength'
* 'upload': 向远端节点上传数据时触发, 附加参数 'length'
* 'piece': 从远端节点下载数据时触发, 附加参数('index', 'offset', 'buffer')

@@ -62,3 +64,3 @@ * 'download': 从远端节点下载数据时触发, 附加参数 'length'

this._timer = null;
this._timeoutTime = 0;
this._timeoutTime = REQUIRE_TIMEOUT;

@@ -104,3 +106,4 @@ this.amChoking = true; // are we choking the peer?

this.peerConn.on('data', data => {
window.myData = data;
if (this.isClosed) return;
// window.myData = data;

@@ -339,3 +342,3 @@ let dataLength;

_onChoke() {
this.peerChoking = ture;
this.peerChoking = true;
logger.debug('got choke');

@@ -342,0 +345,0 @@ this.emit('choke');

{
"name": "webbt",
"version": "0.0.2",
"version": "0.0.3",
"description": "web bt client for electron",

@@ -5,0 +5,0 @@ "author": "Simeon",

@@ -10,2 +10,5 @@ # webbt

## 最大可传单文件大小
64G (BITFIELD_GROW)
## 依赖的第三方服务:

@@ -12,0 +15,0 @@ * STUN(必须 nat)

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc