Socket
Socket
Sign inDemoInstall

m3u8stream

Package Overview
Dependencies
Maintainers
1
Versions
27
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

m3u8stream - npm Package Compare versions

Comparing version 0.2.2 to 0.3.0

lib/parse-time.js

102

lib/index.js

@@ -6,2 +6,3 @@ const PassThrough = require('stream').PassThrough;

const Queue = require('./queue');
const parseTime = require('./parse-time');

@@ -15,10 +16,14 @@

module.exports = (playlistURL, options) => {
var stream = new PassThrough();
const stream = new PassThrough();
options = options || {};
var chunkReadahead = options.chunkReadahead || 3;
var refreshInterval = options.refreshInterval || 600000; // 10 minutes
var requestOptions = options.requestOptions;
const chunkReadahead = options.chunkReadahead || 3;
const liveBuffer = options.liveBuffer || 20000; // 20 seconds
const requestOptions = options.requestOptions;
let relativeBegin = typeof options.begin === 'string';
let begin = relativeBegin ?
parseTime(options.begin) :
Math.max(options.begin - liveBuffer, 0) || 0;
var currSegment;
var streamQueue = new Queue((segment, callback) => {
let currSegment;
const streamQueue = new Queue((segment, callback) => {
currSegment = segment;

@@ -29,10 +34,7 @@ segment.pipe(stream, { end: false });

var requestQueue = new Queue((segmentURL, callback) => {
var segment = miniget(urlResolve(playlistURL, segmentURL), requestOptions);
const requestQueue = new Queue((segmentURL, callback) => {
let segment = miniget(urlResolve(playlistURL, segmentURL), requestOptions);
segment.on('error', callback);
streamQueue.push(segment, callback);
}, {
concurrency: chunkReadahead,
unique: (segmentURL) => segmentURL,
});
}, { concurrency: chunkReadahead });

@@ -47,6 +49,8 @@ function onError(err) {

// When to look for items again.
var refreshThreshold;
var fetchingPlaylist = false;
var destroyed = false;
var ended = false;
let refreshThreshold;
let fetchingPlaylist = false;
let destroyed = false;
let ended = false;
let lastPlaylistItems = new Set();
function onQueuedEnd(err) {

@@ -64,26 +68,65 @@ currSegment = null;

var tid, currPlaylist;
let currPlaylist;
function refreshPlaylist() {
clearTimeout(tid);
fetchingPlaylist = true;
currPlaylist = miniget(playlistURL, requestOptions);
currPlaylist.on('error', onError);
var parser = currPlaylist.pipe(new m3u8());
parser.on('tag', (tagName) => {
if (tagName === 'EXT-X-ENDLIST') {
ended = true;
currPlaylist.unpipe();
clearTimeout(tid);
const parser = currPlaylist.pipe(new m3u8());
let currTime, nextItemDuration;
parser.on('tag', (tagName, value) => {
switch (tagName) {
case 'EXT-X-PROGRAM-DATE-TIME':
currTime = new Date(value).getTime();
if (relativeBegin && begin >= 0) {
begin += currTime;
}
break;
case 'EXTINF':
nextItemDuration = Math.round(parseFloat(value.split(',')[0], 10) * 1000);
break;
case 'EXT-X-ENDLIST':
ended = true;
break;
}
});
var totalItems = 0;
let currPlaylistItems = new Set();
function addItem(time, item) {
if (lastPlaylistItems.has(item)) { return; }
begin = time;
currPlaylistItems.add(item);
requestQueue.push(item, onQueuedEnd);
}
let tailedItems = [], tailedItemsDuration = 0;
parser.on('item', (item) => {
totalItems++;
requestQueue.push(item, onQueuedEnd);
if (!currTime || begin <= currTime) {
addItem(currTime, item);
} else {
tailedItems.push([nextItemDuration, currTime, item]);
tailedItemsDuration += nextItemDuration;
// Only keep the last `liveBuffer` of items.
while (tailedItems.length > 1 &&
tailedItemsDuration - tailedItems[0][0] > liveBuffer) {
tailedItemsDuration -= tailedItems.shift()[0];
}
}
currTime += nextItemDuration;
});
parser.on('end', () => {
currPlaylist = null;
refreshThreshold = Math.ceil(totalItems * 0.01);
tid = setTimeout(refreshPlaylist, refreshInterval);
// If stream is behind by a bit, make sure to get the latest available
// items with a small buffer.
if (!currPlaylistItems.size && tailedItems.length) {
tailedItems.forEach((item) => {
addItem(item[1], item[2]);
});
}
// Refresh the playlist when remaining segments get low.
refreshThreshold = Math.max(1, Math.ceil(currPlaylistItems.size * 0.01));
fetchingPlaylist = false;
lastPlaylistItems = currPlaylistItems;
});

@@ -97,3 +140,2 @@ }

requestQueue.die();
clearTimeout(tid);
if (currPlaylist) {

@@ -100,0 +142,0 @@ currPlaylist.unpipe();

@@ -1,3 +0,1 @@

'use strict';
const Writable = require('stream').Writable;

@@ -23,3 +21,3 @@

_parseLine(line) {
var tag = line.match(/^#(EXT[A-Z0-9-]+)(?::(.*))?/);
let tag = line.match(/^#(EXT[A-Z0-9-]+)(?::(.*))?/);
if (tag) {

@@ -36,3 +34,3 @@ // This is a tag.

_write(chunk, encoding, callback) {
var lines = chunk.toString('utf8').split('\n');
let lines = chunk.toString('utf8').split('\n');
if (this._lastLine) { lines[0] = this._lastLine + lines[0]; }

@@ -39,0 +37,0 @@ lines.forEach((line, i) => {

@@ -1,3 +0,1 @@

'use strict';
module.exports = class Queue {

@@ -15,4 +13,2 @@ /**

this._concurrency = options.concurrency || 1;
this._unique = options.unique;
this._tasksMap = {};
this.tasks = [];

@@ -29,8 +25,3 @@ this.active = 0;

*/
push(item) {
if (this._unique) {
var key = this._unique(item);
if (this._tasksMap[key] === true) { return; }
this._tasksMap[key] = true;
}
push() {
this.tasks.push(arguments);

@@ -46,10 +37,9 @@ this._next();

if (this.active >= this._concurrency || !this.tasks.length) { return; }
var task = this.tasks.shift();
var item = task[0];
var callback = task[1];
var callbackCalled = false;
let task = this.tasks.shift();
let item = task[0];
let callback = task[1];
let callbackCalled = false;
this.active++;
this._worker(item, (err) => {
if (callbackCalled) { return; }
if (this._unique) { delete this._tasksMap[this._unique(item)]; }
this.active--;

@@ -68,4 +58,3 @@ callbackCalled = true;

this.tasks = [];
this._tasksMap = {};
}
};

@@ -9,3 +9,3 @@ {

],
"version": "0.2.2",
"version": "0.3.0",
"repository": {

@@ -28,5 +28,5 @@ "type": "git",

"istanbul": "^0.4.5",
"lolex": "^2.7.1",
"mocha": "^5.0.0",
"nock": "^9.0.9",
"sinon": "^5.0.0"
"nock": "^9.3.2"
},

@@ -33,0 +33,0 @@ "engines": {

@@ -27,6 +27,7 @@ # node-m3u8stream

* `begin` - Where to begin playing the video. Accepts an absolute unix timestamp or date, and a relative time in the formats `1:23:45.123` and `1m2s`.
* `liveBuffer` - How much buffer in milliseconds to have for live streams. Default is `20000`.
* `chunkReadahead` - How many chunks to preload ahead. Default is `3`.
* `highWaterMark` - How much of the download to buffer into the stream. See [node's docs](https://nodejs.org/api/stream.html#stream_constructor_new_stream_writable_options) for more. Note that the actual amount buffered can be higher since each chunk request maintains its own buffer.
* `requestOptions` - Any options you want to pass to [miniget](https://github.com/fent/node-miniget), such as `headers`.
* `refreshInterval` - How often to refresh the playlist. If end of segment list is approached before the next interval, then it will refresh sooner.

@@ -33,0 +34,0 @@ Stream has an `.end()` method, that if called, stops requesting segments, and refreshing the playlist.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc