react-streaming
Advanced tools
Comparing version 0.3.34 to 0.3.35
@@ -55,3 +55,3 @@ "use strict"; | ||
element = react_1.default.createElement(useSuspenseData_1.SuspenseData, null, element); | ||
let injectToStream = (chunk) => { | ||
let injectToStream = async (chunk) => { | ||
buffer.push(chunk); | ||
@@ -58,0 +58,0 @@ }; |
@@ -8,4 +8,4 @@ export { createBuffer }; | ||
}; | ||
declare type Chunk = string; | ||
declare type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => void; | ||
declare type Chunk = string | Promise<string>; | ||
declare type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => Promise<void>; | ||
declare type StreamOperations = { | ||
@@ -19,6 +19,5 @@ operations: null | { | ||
injectToStream: InjectToStream; | ||
onReactWriteBefore: (chunk: unknown) => void; | ||
onReactWriteAfter: () => void; | ||
onBeforeEnd: () => void; | ||
onReactWrite: (chunk: unknown) => Promise<void>; | ||
onBeforeEnd: () => Promise<void>; | ||
hasStreamEnded: () => boolean; | ||
}; |
@@ -14,4 +14,4 @@ "use strict"; | ||
let writePermission = false; | ||
return { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded }; | ||
function injectToStream(chunk, options) { | ||
return { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded }; | ||
async function injectToStream(chunk, options) { | ||
if (debug.isEnabled) { | ||
@@ -24,5 +24,5 @@ debug('injectToStream()', getChunkAsString(chunk)); | ||
buffer.push({ chunk, flush: options === null || options === void 0 ? void 0 : options.flush }); | ||
flushBuffer(); | ||
await flushBuffer(); | ||
} | ||
function flushBuffer() { | ||
async function flushBuffer() { | ||
if (!writePermission) { | ||
@@ -39,10 +39,11 @@ return; | ||
let flushStream = false; | ||
buffer.forEach((bufferEntry) => { | ||
for (let { chunk, flush } of buffer) { | ||
(0, utils_1.assert)(streamOperations.operations); | ||
const { writeChunk } = streamOperations.operations; | ||
writeChunk(bufferEntry.chunk); | ||
if (bufferEntry.flush) { | ||
if ((0, utils_1.isPromise)(chunk)) | ||
chunk = await chunk; | ||
writeChunk(chunk); | ||
if (flush) | ||
flushStream = true; | ||
} | ||
}); | ||
} | ||
buffer.length = 0; | ||
@@ -55,9 +56,3 @@ (0, utils_1.assert)(streamOperations.operations); | ||
} | ||
function onReactWriteAfter() { | ||
const writeWasBlocked = !writePermission; | ||
writePermission = true; | ||
if (writeWasBlocked) | ||
flushBuffer(); | ||
} | ||
function onReactWriteBefore(chunk) { | ||
async function onReactWrite(chunk) { | ||
state === 'UNSTARTED' && debug('>>> START'); | ||
@@ -68,7 +63,15 @@ if (debug.isEnabled) { | ||
state = 'STREAMING'; | ||
flushBuffer(); | ||
const bufferReactEntry = { chunk: chunk, flush: true }; | ||
if (!writePermission) { | ||
buffer.unshift(bufferReactEntry); | ||
} | ||
else { | ||
buffer.push(bufferReactEntry); | ||
} | ||
writePermission = true; | ||
await flushBuffer(); | ||
} | ||
function onBeforeEnd() { | ||
async function onBeforeEnd() { | ||
writePermission = true; // in case React didn't write anything | ||
flushBuffer(); | ||
await flushBuffer(); | ||
(0, utils_1.assert)(buffer.length === 0); | ||
@@ -75,0 +78,0 @@ state = 'ENDED'; |
@@ -13,3 +13,3 @@ "use strict"; | ||
}; | ||
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } = (0, createBuffer_1.createBuffer)(streamOperations); | ||
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = (0, createBuffer_1.createBuffer)(streamOperations); | ||
return { pipeForUser, streamEnd, injectToStream, hasStreamEnded }; | ||
@@ -24,18 +24,18 @@ function createPipeForUser() { | ||
const writableForReact = new stream_1.Writable({ | ||
write(chunk, encoding, callback) { | ||
async write(chunk, encoding, callback) { | ||
debug('write'); | ||
onReactWriteBefore(chunk); | ||
if (!writableFromUser.destroyed) { | ||
writableFromUser.write(chunk, encoding, callback); | ||
onReactWriteAfter(); | ||
await onReactWrite(chunk); | ||
} | ||
else { | ||
// Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163 | ||
// - E.g. when the server closes the connection. | ||
// - Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163 | ||
writableForReact.destroy(); | ||
} | ||
callback(); | ||
}, | ||
final(callback) { | ||
async final(callback) { | ||
debug('final'); | ||
stopTimeout === null || stopTimeout === void 0 ? void 0 : stopTimeout(); | ||
onBeforeEnd(); | ||
await onBeforeEnd(); | ||
writableFromUser.end(); | ||
@@ -42,0 +42,0 @@ onEnded(); |
@@ -23,3 +23,3 @@ "use strict"; | ||
}); | ||
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } = (0, createBuffer_1.createBuffer)(streamOperations); | ||
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = (0, createBuffer_1.createBuffer)(streamOperations); | ||
return { readableForUser, streamEnd, injectToStream, hasStreamEnded }; | ||
@@ -47,5 +47,3 @@ async function onReady(onEnded) { | ||
} | ||
onReactWriteBefore(value); | ||
streamOperations.operations.writeChunk(value); | ||
onReactWriteAfter(); | ||
await onReactWrite(value); | ||
} | ||
@@ -56,4 +54,4 @@ stopTimeout === null || stopTimeout === void 0 ? void 0 : stopTimeout(); | ||
// We should probably remove this workaround once we have a proper solution. | ||
setTimeout(() => { | ||
onBeforeEnd(); | ||
setTimeout(async () => { | ||
await onBeforeEnd(); | ||
controllerOfUserStream.close(); | ||
@@ -60,0 +58,0 @@ onEnded(); |
@@ -5,3 +5,3 @@ "use strict"; | ||
const getGlobalObject_1 = require("./getGlobalObject"); | ||
const PROJECT_VERSION = '0.3.34'; | ||
const PROJECT_VERSION = '0.3.35'; | ||
const projectInfo = { | ||
@@ -8,0 +8,0 @@ projectName: 'react-streaming', |
@@ -29,3 +29,3 @@ export { renderToStream }; | ||
element = React.createElement(SuspenseData, null, element); | ||
let injectToStream = (chunk) => { | ||
let injectToStream = async (chunk) => { | ||
buffer.push(chunk); | ||
@@ -32,0 +32,0 @@ }; |
@@ -8,4 +8,4 @@ export { createBuffer }; | ||
}; | ||
declare type Chunk = string; | ||
declare type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => void; | ||
declare type Chunk = string | Promise<string>; | ||
declare type InjectToStream = (chunk: Chunk, options?: InjectToStreamOptions) => Promise<void>; | ||
declare type StreamOperations = { | ||
@@ -19,6 +19,5 @@ operations: null | { | ||
injectToStream: InjectToStream; | ||
onReactWriteBefore: (chunk: unknown) => void; | ||
onReactWriteAfter: () => void; | ||
onBeforeEnd: () => void; | ||
onReactWrite: (chunk: unknown) => Promise<void>; | ||
onBeforeEnd: () => Promise<void>; | ||
hasStreamEnded: () => boolean; | ||
}; |
export { createBuffer }; | ||
import { assert, assertUsage, createDebugger } from '../utils'; | ||
import { assert, assertUsage, createDebugger, isPromise } from '../utils'; | ||
const debug = createDebugger('react-streaming:buffer'); | ||
@@ -12,4 +12,4 @@ function createBuffer(streamOperations) { | ||
let writePermission = false; | ||
return { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded }; | ||
function injectToStream(chunk, options) { | ||
return { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded }; | ||
async function injectToStream(chunk, options) { | ||
if (debug.isEnabled) { | ||
@@ -22,5 +22,5 @@ debug('injectToStream()', getChunkAsString(chunk)); | ||
buffer.push({ chunk, flush: options === null || options === void 0 ? void 0 : options.flush }); | ||
flushBuffer(); | ||
await flushBuffer(); | ||
} | ||
function flushBuffer() { | ||
async function flushBuffer() { | ||
if (!writePermission) { | ||
@@ -37,10 +37,11 @@ return; | ||
let flushStream = false; | ||
buffer.forEach((bufferEntry) => { | ||
for (let { chunk, flush } of buffer) { | ||
assert(streamOperations.operations); | ||
const { writeChunk } = streamOperations.operations; | ||
writeChunk(bufferEntry.chunk); | ||
if (bufferEntry.flush) { | ||
if (isPromise(chunk)) | ||
chunk = await chunk; | ||
writeChunk(chunk); | ||
if (flush) | ||
flushStream = true; | ||
} | ||
}); | ||
} | ||
buffer.length = 0; | ||
@@ -53,9 +54,3 @@ assert(streamOperations.operations); | ||
} | ||
function onReactWriteAfter() { | ||
const writeWasBlocked = !writePermission; | ||
writePermission = true; | ||
if (writeWasBlocked) | ||
flushBuffer(); | ||
} | ||
function onReactWriteBefore(chunk) { | ||
async function onReactWrite(chunk) { | ||
state === 'UNSTARTED' && debug('>>> START'); | ||
@@ -66,7 +61,15 @@ if (debug.isEnabled) { | ||
state = 'STREAMING'; | ||
flushBuffer(); | ||
const bufferReactEntry = { chunk: chunk, flush: true }; | ||
if (!writePermission) { | ||
buffer.unshift(bufferReactEntry); | ||
} | ||
else { | ||
buffer.push(bufferReactEntry); | ||
} | ||
writePermission = true; | ||
await flushBuffer(); | ||
} | ||
function onBeforeEnd() { | ||
async function onBeforeEnd() { | ||
writePermission = true; // in case React didn't write anything | ||
flushBuffer(); | ||
await flushBuffer(); | ||
assert(buffer.length === 0); | ||
@@ -73,0 +76,0 @@ state = 'ENDED'; |
@@ -11,3 +11,3 @@ export { createPipeWrapper }; | ||
}; | ||
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations); | ||
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations); | ||
return { pipeForUser, streamEnd, injectToStream, hasStreamEnded }; | ||
@@ -22,18 +22,18 @@ function createPipeForUser() { | ||
const writableForReact = new Writable({ | ||
write(chunk, encoding, callback) { | ||
async write(chunk, encoding, callback) { | ||
debug('write'); | ||
onReactWriteBefore(chunk); | ||
if (!writableFromUser.destroyed) { | ||
writableFromUser.write(chunk, encoding, callback); | ||
onReactWriteAfter(); | ||
await onReactWrite(chunk); | ||
} | ||
else { | ||
// Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163 | ||
// - E.g. when the server closes the connection. | ||
// - Destroying twice is fine: https://github.com/brillout/react-streaming/pull/21#issuecomment-1554517163 | ||
writableForReact.destroy(); | ||
} | ||
callback(); | ||
}, | ||
final(callback) { | ||
async final(callback) { | ||
debug('final'); | ||
stopTimeout === null || stopTimeout === void 0 ? void 0 : stopTimeout(); | ||
onBeforeEnd(); | ||
await onBeforeEnd(); | ||
writableFromUser.end(); | ||
@@ -40,0 +40,0 @@ onEnded(); |
@@ -21,3 +21,3 @@ export { createReadableWrapper }; | ||
}); | ||
const { injectToStream, onReactWriteBefore, onReactWriteAfter, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations); | ||
const { injectToStream, onReactWrite, onBeforeEnd, hasStreamEnded } = createBuffer(streamOperations); | ||
return { readableForUser, streamEnd, injectToStream, hasStreamEnded }; | ||
@@ -45,5 +45,3 @@ async function onReady(onEnded) { | ||
} | ||
onReactWriteBefore(value); | ||
streamOperations.operations.writeChunk(value); | ||
onReactWriteAfter(); | ||
await onReactWrite(value); | ||
} | ||
@@ -54,4 +52,4 @@ stopTimeout === null || stopTimeout === void 0 ? void 0 : stopTimeout(); | ||
// We should probably remove this workaround once we have a proper solution. | ||
setTimeout(() => { | ||
onBeforeEnd(); | ||
setTimeout(async () => { | ||
await onBeforeEnd(); | ||
controllerOfUserStream.close(); | ||
@@ -58,0 +56,0 @@ onEnded(); |
export { projectInfo }; | ||
import { getGlobalObject } from './getGlobalObject'; | ||
const PROJECT_VERSION = '0.3.34'; | ||
const PROJECT_VERSION = '0.3.35'; | ||
const projectInfo = { | ||
@@ -5,0 +5,0 @@ projectName: 'react-streaming', |
{ | ||
"name": "react-streaming", | ||
"description": "React 18 Streaming. Full-fledged & Easy.", | ||
"version": "0.3.34", | ||
"version": "0.3.35", | ||
"peerDependencies": { | ||
@@ -6,0 +6,0 @@ "react": ">=18", |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
145562