diff --git a/fixtures/flight/src/index.js b/fixtures/flight/src/index.js index 3fd921a1bb5b4..7e4e8c801ef95 100644 --- a/fixtures/flight/src/index.js +++ b/fixtures/flight/src/index.js @@ -16,18 +16,49 @@ function findSourceMapURL(fileName) { let updateRoot; async function callServer(id, args) { - const response = fetch('/', { - method: 'POST', - headers: { - Accept: 'text/x-component', - 'rsc-action': id, - }, - body: await encodeReply(args), - }); - const {returnValue, root} = await createFromFetch(response, { - callServer, - findSourceMapURL, - }); + let response; + if ( + process.env.NODE_ENV === 'development' && + typeof WebSocketStream === 'function' + ) { + const requestId = crypto.randomUUID(); + const wss = new WebSocketStream( + 'ws://localhost:3001/debug-channel?' + requestId + ); + const debugChannel = await wss.opened; + response = createFromFetch( + fetch('/', { + method: 'POST', + headers: { + Accept: 'text/x-component', + 'rsc-action': id, + 'rsc-request-id': requestId, + }, + body: await encodeReply(args), + }), + { + callServer, + debugChannel, + findSourceMapURL, + } + ); + } else { + response = createFromFetch( + fetch('/', { + method: 'POST', + headers: { + Accept: 'text/x-component', + 'rsc-action': id, + }, + body: await encodeReply(args), + }), + { + callServer, + findSourceMapURL, + } + ); + } + const {returnValue, root} = await response; // Refresh the tree with the new RSC payload. startTransition(() => { updateRoot(root); diff --git a/packages/react-client/src/ReactFlightClient.js b/packages/react-client/src/ReactFlightClient.js index d538e077abb1c..1faf6d07a832c 100644 --- a/packages/react-client/src/ReactFlightClient.js +++ b/packages/react-client/src/ReactFlightClient.js @@ -332,7 +332,7 @@ export type FindSourceMapURLCallback = ( export type DebugChannelCallback = (message: string) => void; -export type Response = { +type Response = { _bundlerConfig: ServerConsumerModuleMap, _serverReferenceConfig: null | ServerManifest, _moduleLoading: ModuleLoading, @@ -351,6 +351,8 @@ export type Response = { _closedReason: mixed, _tempRefs: void | TemporaryReferenceSet, // the set temporary references can be resolved from _timeOrigin: number, // Profiling-only + _pendingChunks: number, // DEV-only + _weakResponse: WeakResponse, // DEV-only _debugRootOwner?: null | ReactComponentInfo, // DEV-only _debugRootStack?: null | Error, // DEV-only _debugRootTask?: null | ConsoleTask, // DEV-only @@ -360,6 +362,54 @@ export type Response = { _rootEnvironmentName: string, // DEV-only, the requested environment name. }; +// This indirection exists only to clean up DebugChannel when all Lazy References are GC:ed. +// Therefore we only use the indirection in DEV. +type WeakResponse = { + weak: WeakRef, + response: null | Response, // This is null when there are no pending chunks. +}; + +export type {WeakResponse as Response}; + +function hasGCedResponse(weakResponse: WeakResponse): boolean { + return __DEV__ && weakResponse.weak.deref() === undefined; +} + +function unwrapWeakResponse(weakResponse: WeakResponse): Response { + if (__DEV__) { + const response = weakResponse.weak.deref(); + if (response === undefined) { + // eslint-disable-next-line react-internal/prod-error-codes + throw new Error( + 'We did not expect to receive new data after GC:ing the response.', + ); + } + return response; + } else { + return (weakResponse: any); // In prod we just use the real Response directly. + } +} + +function getWeakResponse(response: Response): WeakResponse { + if (__DEV__) { + return response._weakResponse; + } else { + return (response: any); // In prod we just use the real Response directly. + } +} + +function cleanupDebugChannel(debugChannel: DebugChannelCallback): void { + // When a Response gets GC:ed because nobody is referring to any of the objects that lazily + // loads from the Response anymore, then we can close the debug channel. + debugChannel(''); +} + +// If FinalizationRegistry doesn't exist, we cannot use the debugChannel. +const debugChannelRegistry = + __DEV__ && typeof FinalizationRegistry === 'function' + ? new FinalizationRegistry(cleanupDebugChannel) + : null; + function readChunk(chunk: SomeChunk): T { // If we have resolved content, we try to initialize it first which // might put us back into one of the other states. @@ -385,16 +435,32 @@ function readChunk(chunk: SomeChunk): T { } } -export function getRoot(response: Response): Thenable { +export function getRoot(weakResponse: WeakResponse): Thenable { + const response = unwrapWeakResponse(weakResponse); const chunk = getChunk(response, 0); return (chunk: any); } function createPendingChunk(response: Response): PendingChunk { + if (__DEV__) { + // Retain a strong reference to the Response while we wait for the result. + response._pendingChunks++; + response._weakResponse.response = response; + } // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors return new ReactPromise(PENDING, null, null); } +function releasePendingChunk(response: Response, chunk: SomeChunk): void { + if (__DEV__ && chunk.status === PENDING) { + if (--response._pendingChunks === 0) { + // We're no longer waiting for any more chunks. We can release the strong reference + // to the response. We'll regain it if we ask for any more data later on. + response._weakResponse.response = null; + } + } +} + function createBlockedChunk(response: Response): BlockedChunk { // $FlowFixMe[invalid-constructor] Flow doesn't support functions as constructors return new ReactPromise(BLOCKED, null, null); @@ -525,7 +591,11 @@ function wakeChunkIfInitialized( } } -function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { +function triggerErrorOnChunk( + response: Response, + chunk: SomeChunk, + error: mixed, +): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { // If we get more data to an already resolved ID, we assume that it's // a stream chunk since any other row shouldn't have more than one entry. @@ -535,6 +605,7 @@ function triggerErrorOnChunk(chunk: SomeChunk, error: mixed): void { controller.error(error); return; } + releasePendingChunk(response, chunk); const listeners = chunk.reason; const erroredChunk: ErroredChunk = (chunk: any); erroredChunk.status = ERRORED; @@ -635,6 +706,7 @@ function resolveModelChunk( controller.enqueueModel(value); return; } + releasePendingChunk(response, chunk); const resolveListeners = chunk.value; const rejectListeners = chunk.reason; const resolvedChunk: ResolvedModelChunk = (chunk: any); @@ -652,6 +724,7 @@ function resolveModelChunk( } function resolveModuleChunk( + response: Response, chunk: SomeChunk, value: ClientReference, ): void { @@ -659,6 +732,7 @@ function resolveModuleChunk( // We already resolved. We didn't expect to see this. return; } + releasePendingChunk(response, chunk); const resolveListeners = chunk.value; const rejectListeners = chunk.reason; const resolvedChunk: ResolvedModuleChunk = (chunk: any); @@ -766,7 +840,15 @@ function initializeModuleChunk(chunk: ResolvedModuleChunk): void { // Report that any missing chunks in the model is now going to throw this // error upon read. Also notify any pending promises. -export function reportGlobalError(response: Response, error: Error): void { +export function reportGlobalError( + weakResponse: WeakResponse, + error: Error, +): void { + if (hasGCedResponse(weakResponse)) { + // Ignore close signal if we are not awaiting any more pending chunks. + return; + } + const response = unwrapWeakResponse(weakResponse); response._closed = true; response._closedReason = error; response._chunks.forEach(chunk => { @@ -774,7 +856,7 @@ export function reportGlobalError(response: Response, error: Error): void { // trigger an error but if it wasn't then we need to // because we won't be getting any new data to resolve it. if (chunk.status === PENDING) { - triggerErrorOnChunk(chunk, error); + triggerErrorOnChunk(response, chunk, error); } }); if (__DEV__) { @@ -1218,7 +1300,7 @@ function rejectReference( reference: InitializationReference, error: mixed, ): void { - const {handler} = reference; + const {handler, response} = reference; if (handler.errored) { // We've already errored. We could instead build up an AggregateError @@ -1263,7 +1345,7 @@ function rejectReference( } } - triggerErrorOnChunk(chunk, error); + triggerErrorOnChunk(response, chunk, error); } function waitForReference( @@ -1482,7 +1564,7 @@ function loadServerReference, T>( } } - triggerErrorOnChunk(chunk, error); + triggerErrorOnChunk(response, chunk, error); } promise.then(fulfill, reject); @@ -2025,6 +2107,11 @@ function ResponseInstance( this._timeOrigin = 0; } if (__DEV__) { + this._pendingChunks = 0; + this._weakResponse = { + weak: new WeakRef(this), + response: this, + }; // TODO: The Flight Client can be used in a Client Environment too and we should really support // getting the owner there as well, but currently the owner of ReactComponentInfo is typed as only // supporting other ReactComponentInfo as owners (and not Fiber or Fizz's ComponentStackNode). @@ -2059,6 +2146,15 @@ function ResponseInstance( this._debugChannel = debugChannel; this._replayConsole = replayConsole; this._rootEnvironmentName = rootEnv; + if (debugChannel) { + if (debugChannelRegistry === null) { + // We can't safely clean things up later, so we immediately close the debug channel. + debugChannel(''); + this._debugChannel = undefined; + } else { + debugChannelRegistry.register(this, debugChannel); + } + } } if (enableProfilerTimer && enableComponentPerformanceTrack) { // Since we don't know when recording of profiles will start and stop, we have to @@ -2084,20 +2180,22 @@ export function createResponse( replayConsole: boolean, // DEV-only environmentName: void | string, // DEV-only debugChannel: void | DebugChannelCallback, // DEV-only -): Response { - // $FlowFixMe[invalid-constructor]: the shapes are exact here but Flow doesn't like constructors - return new ResponseInstance( - bundlerConfig, - serverReferenceConfig, - moduleLoading, - callServer, - encodeFormAction, - nonce, - temporaryReferences, - findSourceMapURL, - replayConsole, - environmentName, - debugChannel, +): WeakResponse { + return getWeakResponse( + // $FlowFixMe[invalid-constructor]: the shapes are exact here but Flow doesn't like constructors + new ResponseInstance( + bundlerConfig, + serverReferenceConfig, + moduleLoading, + callServer, + encodeFormAction, + nonce, + temporaryReferences, + findSourceMapURL, + replayConsole, + environmentName, + debugChannel, + ), ); } @@ -2111,6 +2209,7 @@ function resolveDebugHalt(response: Response, id: number): void { if (chunk.status !== PENDING && chunk.status !== BLOCKED) { return; } + releasePendingChunk(response, chunk); const haltedChunk: HaltedChunk = (chunk: any); haltedChunk.status = HALTED; haltedChunk.value = null; @@ -2142,6 +2241,9 @@ function resolveText(response: Response, id: number, text: string): void { controller.enqueueValue(text); return; } + if (chunk) { + releasePendingChunk(response, chunk); + } chunks.set(id, createInitializedTextChunk(response, text)); } @@ -2160,6 +2262,9 @@ function resolveBuffer( controller.enqueueValue(buffer); return; } + if (chunk) { + releasePendingChunk(response, chunk); + } chunks.set(id, createInitializedBufferChunk(response, buffer)); } @@ -2197,14 +2302,15 @@ function resolveModule( blockedChunk = createBlockedChunk(response); chunks.set(id, blockedChunk); } else { + releasePendingChunk(response, chunk); // This can't actually happen because we don't have any forward // references to modules. blockedChunk = (chunk: any); blockedChunk.status = BLOCKED; } promise.then( - () => resolveModuleChunk(blockedChunk, clientReference), - error => triggerErrorOnChunk(blockedChunk, error), + () => resolveModuleChunk(response, blockedChunk, clientReference), + error => triggerErrorOnChunk(response, blockedChunk, error), ); } else { if (!chunk) { @@ -2212,7 +2318,7 @@ function resolveModule( } else { // This can't actually happen because we don't have any forward // references to modules. - resolveModuleChunk(chunk, clientReference); + resolveModuleChunk(response, chunk, clientReference); } } } @@ -2233,6 +2339,7 @@ function resolveStream>( // We already resolved. We didn't expect to see this. return; } + releasePendingChunk(response, chunk); const resolveListeners = chunk.value; const resolvedChunk: InitializedStreamChunk = (chunk: any); resolvedChunk.status = INITIALIZED; @@ -2432,7 +2539,7 @@ function startAsyncIterable( createPendingChunk>(response); } while (nextWriteIndex < buffer.length) { - triggerErrorOnChunk(buffer[nextWriteIndex++], error); + triggerErrorOnChunk(response, buffer[nextWriteIndex++], error); } }, }; @@ -2569,7 +2676,7 @@ function resolvePostponeProd(response: Response, id: number): void { if (!chunk) { chunks.set(id, createErrorChunk(response, postponeInstance)); } else { - triggerErrorOnChunk(chunk, postponeInstance); + triggerErrorOnChunk(response, chunk, postponeInstance); } } @@ -2608,7 +2715,7 @@ function resolvePostponeDev( if (!chunk) { chunks.set(id, createErrorChunk(response, postponeInstance)); } else { - triggerErrorOnChunk(chunk, postponeInstance); + triggerErrorOnChunk(response, chunk, postponeInstance); } } @@ -3690,7 +3797,7 @@ function processFullStringRow( if (!chunk) { chunks.set(id, createErrorChunk(response, errorWithDigest)); } else { - triggerErrorOnChunk(chunk, errorWithDigest); + triggerErrorOnChunk(response, chunk, errorWithDigest); } return; } @@ -3819,9 +3926,14 @@ function processFullStringRow( } export function processBinaryChunk( - response: Response, + weakResponse: WeakResponse, chunk: Uint8Array, ): void { + if (hasGCedResponse(weakResponse)) { + // Ignore more chunks if we've already GC:ed all listeners. + return; + } + const response = unwrapWeakResponse(weakResponse); let i = 0; let rowState = response._rowState; let rowID = response._rowID; @@ -3938,7 +4050,15 @@ export function processBinaryChunk( response._rowLength = rowLength; } -export function processStringChunk(response: Response, chunk: string): void { +export function processStringChunk( + weakResponse: WeakResponse, + chunk: string, +): void { + if (hasGCedResponse(weakResponse)) { + // Ignore more chunks if we've already GC:ed all listeners. + return; + } + const response = unwrapWeakResponse(weakResponse); // This is a fork of processBinaryChunk that takes a string as input. // This can't be just any binary chunk coverted to a string. It needs to be // in the same offsets given from the Flight Server. E.g. if it's shifted by @@ -4100,12 +4220,12 @@ function createFromJSONCallback(response: Response) { }; } -export function close(response: Response): void { +export function close(weakResponse: WeakResponse): void { // In case there are any remaining unresolved chunks, they won't // be resolved now. So we need to issue an error to those. // Ideally we should be able to early bail out if we kept a // ref count of pending chunks. - reportGlobalError(response, new Error('Connection closed.')); + reportGlobalError(weakResponse, new Error('Connection closed.')); } function getCurrentOwnerInDEV(): null | ReactComponentInfo { diff --git a/packages/react-client/src/__tests__/ReactFlight-test.js b/packages/react-client/src/__tests__/ReactFlight-test.js index d79977beec0d5..abd690ab725f9 100644 --- a/packages/react-client/src/__tests__/ReactFlight-test.js +++ b/packages/react-client/src/__tests__/ReactFlight-test.js @@ -92,21 +92,27 @@ function getDebugInfo(obj) { return debugInfo; } -const heldValues = []; -let finalizationCallback; +const finalizationRegistries = []; function FinalizationRegistryMock(callback) { - finalizationCallback = callback; + this._heldValues = []; + this._callback = callback; + finalizationRegistries.push(this); } FinalizationRegistryMock.prototype.register = function (target, heldValue) { - heldValues.push(heldValue); + this._heldValues.push(heldValue); }; global.FinalizationRegistry = FinalizationRegistryMock; function gc() { - for (let i = 0; i < heldValues.length; i++) { - finalizationCallback(heldValues[i]); + for (let i = 0; i < finalizationRegistries.length; i++) { + const registry = finalizationRegistries[i]; + const callback = registry._callback; + const heldValues = registry._heldValues; + for (let j = 0; j < heldValues.length; j++) { + callback(heldValues[j]); + } + heldValues.length = 0; } - heldValues.length = 0; } let act;