mongodb-snapshot
Advanced tools
Comparing version 1.4.0 to 1.4.1
@@ -140,2 +140,3 @@ "use strict"; | ||
*/ | ||
// when the data stream is bigger than the metadata size of the collection | ||
const trimmed$ = getTrimmedChunk$({ | ||
@@ -146,5 +147,10 @@ chunk$, | ||
const content$ = trimmed$.pipe((0, operators_1.map)(({ chunk }) => chunk)); | ||
const remain$ = trimmed$.pipe((0, operators_1.takeLast)(1), (0, operators_1.switchMap)(({ totalBytes }) => getEmptyChunk$({ | ||
// handling when the data stream is smaller than the metadata size of the collection | ||
const remain$ = (0, rxjs_1.concat)(trimmed$.pipe((0, operators_1.takeLast)(1), (0, operators_1.map)(({ accumulatedTotalBytes: totalBytes }) => metadata.size - totalBytes)), | ||
// this is necessary to handle a case when the data stream is empty | ||
(0, rxjs_1.of)(metadata.size)).pipe( | ||
// always emits one value, either the remaining bytes by the actual size of the chunk stream or metadata size | ||
(0, operators_1.take)(1), (0, operators_1.switchMap)((remainBytes) => getEmptyChunk$({ | ||
chunkSize: fillEmptyChunkSize, | ||
totalBytes: metadata.size - totalBytes | ||
totalBytes: remainBytes | ||
}))); | ||
@@ -191,3 +197,3 @@ const write$ = (0, rxjs_1.concat)(content$, remain$); | ||
(0, operators_1.scan)((acc, chunk) => { | ||
const remainingBytes = opts.totalBytes - acc.totalBytes; | ||
const remainingBytes = opts.totalBytes - acc.accumulatedTotalBytes; | ||
if (remainingBytes > 0) { | ||
@@ -197,3 +203,3 @@ if (chunk.length < remainingBytes) { | ||
chunk, | ||
totalBytes: acc.totalBytes + chunk.length | ||
accumulatedTotalBytes: acc.accumulatedTotalBytes + chunk.length | ||
}; | ||
@@ -204,3 +210,3 @@ } | ||
chunk: chunk.slice(0, remainingBytes), | ||
totalBytes: opts.totalBytes | ||
accumulatedTotalBytes: opts.totalBytes | ||
}; | ||
@@ -211,8 +217,8 @@ } | ||
return { | ||
totalBytes: opts.totalBytes | ||
accumulatedTotalBytes: opts.totalBytes | ||
}; | ||
} | ||
}, { | ||
totalBytes: 0, | ||
}), (0, operators_1.filter)(({ totalBytes, chunk }) => { | ||
accumulatedTotalBytes: 0, | ||
}), (0, operators_1.filter)(({ accumulatedTotalBytes: totalBytes, chunk }) => { | ||
return totalBytes > 0 && chunk !== undefined; | ||
@@ -219,0 +225,0 @@ })); |
{ | ||
"name": "mongodb-snapshot", | ||
"version": "1.4.0", | ||
"version": "1.4.1", | ||
"description": "Manages MongoDB snapshots, provides functionality to deliver MongoDB content between data sources for backup and restore", | ||
@@ -5,0 +5,0 @@ "main": "./dist/index.js", |
import { TargetConnector, SourceConnector, CollectionData } from "../Connector"; | ||
import { Readable, Writable } from "stream"; | ||
import { GzipOpts, CollectionMetadata } from "../../contracts"; | ||
import { Observable, Observer, concat, ReplaySubject, defer, fromEvent, throwError } from "rxjs"; | ||
import { Observable, Observer, concat, ReplaySubject, defer, fromEvent, throwError, of } from "rxjs"; | ||
import { groupBy, concatMap, toArray, map, take, catchError, scan, filter, share, takeLast, switchMap } from 'rxjs/operators'; | ||
@@ -237,2 +237,3 @@ import * as tar from "tar-stream"; | ||
*/ | ||
// when the data stream is bigger than the metadata size of the collection | ||
const trimmed$ = getTrimmedChunk$({ | ||
@@ -246,13 +247,19 @@ chunk$, | ||
const content$ = trimmed$.pipe(map(({ chunk }) => chunk)) as Observable<Buffer>; | ||
const remain$ = trimmed$.pipe( | ||
takeLast(1), | ||
switchMap(({ | ||
totalBytes | ||
}) => | ||
// handling when the data stream is smaller than the metadata size of the collection | ||
const remain$ = concat( | ||
trimmed$.pipe(takeLast(1), map(({accumulatedTotalBytes: totalBytes}) => metadata.size - totalBytes)), | ||
// this is necessary to handle a case when the data stream is empty | ||
of(metadata.size), | ||
).pipe( | ||
// always emits one value, either the remaining bytes by the actual size of the chunk stream or metadata size | ||
take(1), | ||
switchMap((remainBytes) => | ||
getEmptyChunk$({ | ||
chunkSize: fillEmptyChunkSize, | ||
totalBytes: metadata.size - totalBytes | ||
totalBytes: remainBytes | ||
}) | ||
) | ||
) | ||
); | ||
const write$ = concat(content$, remain$); | ||
@@ -306,5 +313,5 @@ | ||
// cutting extra data from the stream | ||
scan<Buffer, { totalBytes: number, chunk?: Buffer }>( | ||
scan<Buffer, { accumulatedTotalBytes: number, chunk?: Buffer }>( | ||
(acc, chunk) => { | ||
const remainingBytes = opts.totalBytes - acc.totalBytes; | ||
const remainingBytes = opts.totalBytes - acc.accumulatedTotalBytes; | ||
@@ -315,3 +322,3 @@ if (remainingBytes > 0) { | ||
chunk, | ||
totalBytes: acc.totalBytes + chunk.length | ||
accumulatedTotalBytes: acc.accumulatedTotalBytes + chunk.length | ||
} | ||
@@ -321,3 +328,3 @@ } else { | ||
chunk: chunk.slice(0, remainingBytes), | ||
totalBytes: opts.totalBytes | ||
accumulatedTotalBytes: opts.totalBytes | ||
} | ||
@@ -327,3 +334,3 @@ } | ||
return { | ||
totalBytes: opts.totalBytes | ||
accumulatedTotalBytes: opts.totalBytes | ||
} | ||
@@ -334,7 +341,7 @@ } | ||
{ | ||
totalBytes: 0, | ||
accumulatedTotalBytes: 0, | ||
} | ||
), | ||
filter(({ | ||
totalBytes, | ||
accumulatedTotalBytes: totalBytes, | ||
chunk | ||
@@ -341,0 +348,0 @@ }) => { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
146484
2502