From 6bdbe04c29e7f3b72c4dbab9123692fdfc2b3564 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Sat, 2 Sep 2023 17:13:41 +0200 Subject: [PATCH 01/27] feat: support for streaming --- packages/open-next/src/adapters/http/index.ts | 3 + .../src/adapters/{ => http}/request.ts | 0 .../src/adapters/{ => http}/response.ts | 17 +- .../src/adapters/http/responseStreaming.ts | 148 ++++++++++++++++++ packages/open-next/src/adapters/http/util.ts | 17 ++ packages/open-next/src/adapters/next-types.ts | 4 +- .../src/adapters/plugins/lambdaHandler.ts | 143 +++++++++++++++++ .../plugins/routing/default.replacement.ts | 29 ++-- .../src/adapters/plugins/routing/default.ts | 15 +- .../src/adapters/plugins/routing/types.ts | 19 --- .../src/adapters/plugins/routing/util.ts | 4 +- .../plugins/serverHandler.replacement.ts | 4 +- .../src/adapters/plugins/serverHandler.ts | 4 +- .../adapters/plugins/streaming.replacement.ts | 99 ++++++++++++ .../src/adapters/routing/middleware.ts | 4 +- .../open-next/src/adapters/routing/util.ts | 2 +- .../open-next/src/adapters/server-adapter.ts | 141 +---------------- .../src/adapters/types/aws-lambda.ts | 27 ++++ .../open-next/src/adapters/types/plugin.ts | 29 ++++ packages/open-next/src/build.ts | 62 +++++--- packages/open-next/src/index.ts | 1 + 21 files changed, 547 insertions(+), 225 deletions(-) create mode 100644 packages/open-next/src/adapters/http/index.ts rename packages/open-next/src/adapters/{ => http}/request.ts (100%) rename packages/open-next/src/adapters/{ => http}/response.ts (83%) create mode 100644 packages/open-next/src/adapters/http/responseStreaming.ts create mode 100644 packages/open-next/src/adapters/http/util.ts create mode 100644 packages/open-next/src/adapters/plugins/lambdaHandler.ts create mode 100644 packages/open-next/src/adapters/plugins/streaming.replacement.ts create mode 100644 packages/open-next/src/adapters/types/aws-lambda.ts create mode 100644 packages/open-next/src/adapters/types/plugin.ts diff --git a/packages/open-next/src/adapters/http/index.ts b/packages/open-next/src/adapters/http/index.ts new file mode 100644 index 000000000..82e9e9411 --- /dev/null +++ b/packages/open-next/src/adapters/http/index.ts @@ -0,0 +1,3 @@ +export * from "./request.js"; +export * from "./response.js"; +export * from "./responseStreaming.js"; diff --git a/packages/open-next/src/adapters/request.ts b/packages/open-next/src/adapters/http/request.ts similarity index 100% rename from packages/open-next/src/adapters/request.ts rename to packages/open-next/src/adapters/http/request.ts diff --git a/packages/open-next/src/adapters/response.ts b/packages/open-next/src/adapters/http/response.ts similarity index 83% rename from packages/open-next/src/adapters/response.ts rename to packages/open-next/src/adapters/http/response.ts index 51e6a07c8..e717b3e4d 100644 --- a/packages/open-next/src/adapters/response.ts +++ b/packages/open-next/src/adapters/http/response.ts @@ -5,26 +5,11 @@ // @ts-nocheck import http from "node:http"; -const headerEnd = "\r\n\r\n"; +import { getString, headerEnd } from "./util.js"; const BODY = Symbol(); const HEADERS = Symbol(); -function getString(data) { - // Note: use `ArrayBuffer.isView()` to check for Uint8Array. Using - // `instanceof Uint8Array` returns false in some cases. For example, - // when the buffer is created in middleware and passed to NextServer. - if (Buffer.isBuffer(data)) { - return data.toString("utf8"); - } else if (ArrayBuffer.isView(data)) { - return Buffer.from(data).toString("utf8"); - } else if (typeof data === "string") { - return data; - } else { - throw new Error(`response.getString() of unexpected type: ${typeof data}`); - } -} - function addData(stream, data) { if ( Buffer.isBuffer(data) || diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts new file mode 100644 index 000000000..ef6ced9eb --- /dev/null +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -0,0 +1,148 @@ +import http from "node:http"; + +import { debug, error } from "../logger.js"; +import type { ResponseStream } from "../types/aws-lambda.js"; + +const HEADERS = Symbol(); + +export class StreamingServerResponse extends http.ServerResponse { + [HEADERS]: Record = {}; + responseStream: ResponseStream; + fixHeaders: (headers: Record) => void; + private _wroteHeader = false; + private _hasWritten = false; + + get headers() { + return this[HEADERS]; + } + + setHeader(name: string, value: string | number | readonly string[]): this { + // @ts-ignore + this[HEADERS][name.toLowerCase()] = value; + return this; + } + + writeHead( + statusCode: number, + statusMessage?: string | undefined, + headers?: http.OutgoingHttpHeaders | http.OutgoingHttpHeader[] | undefined, + ): this; + writeHead( + statusCode: number, + headers?: http.OutgoingHttpHeaders | http.OutgoingHttpHeader[] | undefined, + ): this; + writeHead( + statusCode: unknown, + statusMessage?: unknown, + headers?: unknown, + ): this { + if (this._wroteHeader) { + return this; + } + try { + this.fixHeaders(this[HEADERS]); + this.responseStream = awslambda.HttpResponseStream.from( + this.responseStream, + { + statusCode: statusCode as number, + headers: this[HEADERS], + }, + ); + + this._wroteHeader = true; + debug("writeHead", this[HEADERS]); + } catch (e) { + this.responseStream.end(); + error(e); + } + + return this; + } + + end(cb?: (() => void) | undefined): this; + end(chunk: any, cb?: (() => void) | undefined): this; + end( + chunk: any, + encoding: BufferEncoding, + cb?: (() => void) | undefined, + ): this; + end(chunk?: unknown, encoding?: unknown, cb?: unknown): this { + if (!this._wroteHeader) { + // When next directly returns with end, the writeHead is not called, + // so we need to call it here + this.writeHead(this.statusCode ?? 200); + } + if (chunk && typeof chunk !== "function") { + this.internalWrite(chunk); + } + + setImmediate(() => { + if (!this._hasWritten) { + // We need to send data here, otherwise the stream will not end at all + this.internalWrite(new Uint8Array(8)); + } + this.responseStream.end(); + }); + debug("stream end", chunk); + return this; + } + + private internalWrite(chunk: any) { + setImmediate(() => { + this.responseStream.write(chunk); + this._hasWritten = true; + }); + } + + constructor( + { method, headers }: { method?: string; headers?: Record }, + responseStream: ResponseStream, + fixHeaders: (headers: Record) => void, + ) { + //@ts-ignore + super({ method }); + + this[HEADERS] = headers || {}; + + this.fixHeaders = fixHeaders; + this.responseStream = responseStream; + + this.useChunkedEncodingByDefault = false; + this.chunkedEncoding = false; + + this.assignSocket({ + _writableState: {}, + writable: true, + // @ts-ignore + on: Function.prototype, + // @ts-ignore + removeListener: Function.prototype, + // @ts-ignore + destroy: Function.prototype, + // @ts-ignore + cork: Function.prototype, + // @ts-ignore + uncork: Function.prototype, + // @ts-ignore + write: (data, encoding, cb) => { + if (typeof encoding === "function") { + cb = encoding; + encoding = undefined; + } + + this.internalWrite(data); + + if (typeof cb === "function") { + cb(); + } + return true; + }, + }); + + this.responseStream.on("error", (err) => { + this.emit("error", err); + this.responseStream.end(); + error("error", err); + }); + } +} diff --git a/packages/open-next/src/adapters/http/util.ts b/packages/open-next/src/adapters/http/util.ts new file mode 100644 index 000000000..605c2f453 --- /dev/null +++ b/packages/open-next/src/adapters/http/util.ts @@ -0,0 +1,17 @@ +export function getString(data: any) { + // Note: use `ArrayBuffer.isView()` to check for Uint8Array. Using + // `instanceof Uint8Array` returns false in some cases. For example, + // when the buffer is created in middleware and passed to NextServer. + if (Buffer.isBuffer(data)) { + return data.toString("utf8"); + } else if (ArrayBuffer.isView(data)) { + //@ts-ignore + return Buffer.from(data).toString("utf8"); + } else if (typeof data === "string") { + return data; + } else { + throw new Error(`response.getString() of unexpected type: ${typeof data}`); + } +} + +export const headerEnd = "\r\n\r\n"; diff --git a/packages/open-next/src/adapters/next-types.ts b/packages/open-next/src/adapters/next-types.ts index b2e26affc..372fbc1a5 100644 --- a/packages/open-next/src/adapters/next-types.ts +++ b/packages/open-next/src/adapters/next-types.ts @@ -1,8 +1,8 @@ // NOTE: add more next config typings as they become relevant import { InternalEvent } from "./event-mapper.js"; -import { IncomingMessage } from "./request.js"; -import { ServerResponse } from "./response.js"; +import { IncomingMessage } from "./http/request.js"; +import { ServerResponse } from "./http/response.js"; type RemotePattern = { protocol?: "http" | "https"; diff --git a/packages/open-next/src/adapters/plugins/lambdaHandler.ts b/packages/open-next/src/adapters/plugins/lambdaHandler.ts new file mode 100644 index 000000000..5b1234699 --- /dev/null +++ b/packages/open-next/src/adapters/plugins/lambdaHandler.ts @@ -0,0 +1,143 @@ +import { + APIGatewayProxyEvent, + APIGatewayProxyEventV2, + CloudFrontRequestEvent, +} from "aws-lambda"; +import path from "path"; + +import { convertFrom, convertTo, InternalEvent } from "../event-mapper"; +import { type IncomingMessage, ServerResponse } from "../http"; +import { debug, error } from "../logger"; +import { OPEN_NEXT_DIR } from "../server-adapter"; +import { CreateResponse } from "../types/plugin"; +import { generateUniqueId, loadBuildId, loadPublicAssets } from "../util"; +import { WarmerEvent, WarmerResponse } from "../warmer-function"; +//#override imports +import { + postProcessResponse, + processInternalEvent, +} from "./routing/default.js"; +//#endOverride +import { handler as serverHandler } from "./serverHandler"; + +export const NEXT_DIR = path.join(__dirname, ".next"); +const buildId = loadBuildId(NEXT_DIR); +const publicAssets = loadPublicAssets(OPEN_NEXT_DIR); + +const serverId = `server-${generateUniqueId()}`; + +//#override lambdaHandler +export async function lambdaHandler( + event: + | APIGatewayProxyEventV2 + | CloudFrontRequestEvent + | APIGatewayProxyEvent + | WarmerEvent, +) { + debug("event", event); + // Handler warmer + if ("type" in event) { + return formatWarmerResponse(event); + } + + // Parse Lambda event and create Next.js request + const internalEvent = convertFrom(event); + + // WORKAROUND: Set `x-forwarded-host` header (AWS specific) — https://github.com/serverless-stack/open-next#workaround-set-x-forwarded-host-header-aws-specific + if (internalEvent.headers["x-forwarded-host"]) { + internalEvent.headers.host = internalEvent.headers["x-forwarded-host"]; + } + + //TODO: uncomment this + // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific + // TODO: This is no longer required if each top-level file and folder in "/public" + // is handled by a separate cache behavior. Leaving here for backward compatibility. + // Remove this on next major release. + if (publicAssets.files.includes(internalEvent.rawPath)) { + return internalEvent.type === "cf" + ? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent) + : formatAPIGatewayFailoverResponse(); + } + + const createServerResponse: CreateResponse = (method, headers) => + new ServerResponse({ method, headers }); + + const preprocessResult = await processInternalEvent( + internalEvent, + createServerResponse, + ); + if ("type" in preprocessResult) { + return convertTo(preprocessResult); + } else { + const { + req, + res, + isExternalRewrite, + internalEvent: overwrittenInternalEvent, + } = preprocessResult; + + await processRequest(req, res, overwrittenInternalEvent, isExternalRewrite); + + const internalResult = await postProcessResponse({ + internalEvent: overwrittenInternalEvent, + req, + res, + isExternalRewrite, + }); + + return convertTo(internalResult); + } +} +//#endOverride + +async function processRequest( + req: IncomingMessage, + res: ServerResponse, + internalEvent: InternalEvent, + isExternalRewrite?: boolean, +) { + // @ts-ignore + // Next.js doesn't parse body if the property exists + // https://github.com/dougmoscrop/serverless-http/issues/227 + delete req.body; + + try { + // `serverHandler` is replaced at build time depending on user's + // nextjs version to patch Nextjs 13.4.x and future breaking changes. + await serverHandler(req, res, { + internalEvent, + buildId, + isExternalRewrite, + }); + } catch (e: any) { + error("NextJS request failed.", e); + + res.setHeader("Content-Type", "application/json"); + res.end( + JSON.stringify( + { + message: "Server failed to respond.", + details: e, + }, + null, + 2, + ), + ); + } +} + +function formatAPIGatewayFailoverResponse() { + return { statusCode: 503 }; +} + +function formatCloudFrontFailoverResponse(event: CloudFrontRequestEvent) { + return event.Records[0].cf.request; +} + +function formatWarmerResponse(event: WarmerEvent) { + return new Promise((resolve) => { + setTimeout(() => { + resolve({ serverId } satisfies WarmerResponse); + }, event.delay); + }); +} diff --git a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts index 5d4f7ff79..1498f386d 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts @@ -1,12 +1,14 @@ /* eslint-disable simple-import-sort/imports */ -import type { PostProcessOptions, ProcessInternalEventResult } from "./types"; -import type { InternalEvent, InternalResult } from "../../event-mapper"; +import type { + PostProcessOptions, + ProcessInternalEvent, +} from "../../types/plugin"; +import type { InternalResult } from "../../event-mapper"; //#override imports import path from "node:path"; import { debug } from "../../logger"; -import { IncomingMessage } from "../../request"; -import { ServerResponse } from "../../response"; +import { IncomingMessage } from "../../http/request"; import { addNextConfigHeaders, fixDataPage, @@ -30,9 +32,10 @@ const configHeaders = loadConfigHeaders(NEXT_DIR); //#endOverride //#override processInternalEvent -export async function processInternalEvent( - event: InternalEvent, -): Promise { +export const processInternalEvent: ProcessInternalEvent = async ( + event, + createResponse, +) => { const nextHeaders = addNextConfigHeaders(event, configHeaders) ?? {}; debug("nextHeaders", nextHeaders); @@ -102,17 +105,13 @@ export async function processInternalEvent( }; debug("IncomingMessage constructor props", reqProps); const req = new IncomingMessage(reqProps); - const res = new ServerResponse({ - method: reqProps.method, - // Next headers should be added first in case middleware modifies headers - headers: { - ...nextHeaders, - ...middlewareResponseHeaders, - }, + const res = createResponse(reqProps.method, { + ...nextHeaders, + ...middlewareResponseHeaders, }); return { internalEvent: internalEvent, req, res, isExternalRewrite }; -} +}; //#endOverride //#override postProcessResponse diff --git a/packages/open-next/src/adapters/plugins/routing/default.ts b/packages/open-next/src/adapters/plugins/routing/default.ts index 6c3998181..1a04cca15 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.ts @@ -1,10 +1,13 @@ /* eslint-disable simple-import-sort/imports */ -import type { PostProcessOptions, ProcessInternalEventResult } from "./types"; +import type { + PostProcessOptions, + ProcessInternalEventResult, +} from "../../types/plugin"; import type { InternalEvent, InternalResult } from "../../event-mapper"; //#override imports import { debug } from "../../logger"; -import { IncomingMessage } from "../../request"; -import { ServerResponse } from "../../response"; +import { IncomingMessage } from "../../http/request"; +import { ServerResponse } from "../../http/response"; import { addOpenNextHeader, fixCacheHeaderForHtmlPages, @@ -17,6 +20,10 @@ import { convertRes } from "../../routing/util"; //#override processInternalEvent export async function processInternalEvent( internalEvent: InternalEvent, + createResponse: ( + method: string, + headers: Record, + ) => ServerResponse, ): Promise { const reqProps = { method: internalEvent.method, @@ -31,7 +38,7 @@ export async function processInternalEvent( remoteAddress: internalEvent.remoteAddress, }; const req = new IncomingMessage(reqProps); - const res = new ServerResponse({ method: reqProps.method, headers: {} }); + const res = createResponse(reqProps.method, {}); return { internalEvent, req, res, isExternalRewrite: false }; } //#endOverride diff --git a/packages/open-next/src/adapters/plugins/routing/types.ts b/packages/open-next/src/adapters/plugins/routing/types.ts index 194272d1e..e69de29bb 100644 --- a/packages/open-next/src/adapters/plugins/routing/types.ts +++ b/packages/open-next/src/adapters/plugins/routing/types.ts @@ -1,19 +0,0 @@ -import type { InternalEvent, InternalResult } from "../../event-mapper"; -import { IncomingMessage } from "../../request"; -import { ServerResponse } from "../../response"; - -export type ProcessInternalEventResult = - | { - internalEvent: InternalEvent; - req: IncomingMessage; - res: ServerResponse; - isExternalRewrite: boolean; - } - | InternalResult; - -export interface PostProcessOptions { - internalEvent: InternalEvent; - req: IncomingMessage; - res: ServerResponse; - isExternalRewrite?: boolean; -} diff --git a/packages/open-next/src/adapters/plugins/routing/util.ts b/packages/open-next/src/adapters/plugins/routing/util.ts index abd6cab9b..99496ef24 100644 --- a/packages/open-next/src/adapters/plugins/routing/util.ts +++ b/packages/open-next/src/adapters/plugins/routing/util.ts @@ -2,9 +2,9 @@ import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs"; import crypto from "crypto"; import path from "path"; +import { IncomingMessage } from "../../http/request.js"; +import { ServerResponse } from "../../http/response.js"; import { awsLogger, debug } from "../../logger.js"; -import { IncomingMessage } from "../../request.js"; -import { ServerResponse } from "../../response.js"; import { loadBuildId, loadHtmlPages } from "../../util.js"; enum CommonHeaders { diff --git a/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts b/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts index 3cb0648d7..d1c7f685d 100644 --- a/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts +++ b/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts @@ -1,9 +1,9 @@ /*eslint-disable simple-import-sort/imports */ import type { Options, PluginHandler } from "../next-types.js"; +import type { IncomingMessage } from "../http/request.js"; +import type { ServerResponse } from "../http/response.js"; //#override imports -import { IncomingMessage } from "../request.js"; -import { ServerResponse } from "../response.js"; import { proxyRequest } from "./routing/util.js"; import { requestHandler, setNextjsPrebundledReact } from "./util.js"; //#endOverride diff --git a/packages/open-next/src/adapters/plugins/serverHandler.ts b/packages/open-next/src/adapters/plugins/serverHandler.ts index 2bd8670be..429c85b83 100644 --- a/packages/open-next/src/adapters/plugins/serverHandler.ts +++ b/packages/open-next/src/adapters/plugins/serverHandler.ts @@ -1,6 +1,6 @@ +import type { IncomingMessage } from "../http/request.js"; +import { ServerResponse } from "../http/response.js"; import type { Options, PluginHandler } from "../next-types.js"; -import type { IncomingMessage } from "../request.js"; -import type { ServerResponse } from "../response.js"; //#override imports import { requestHandler, setNextjsPrebundledReact } from "./util.js"; //#endOverride diff --git a/packages/open-next/src/adapters/plugins/streaming.replacement.ts b/packages/open-next/src/adapters/plugins/streaming.replacement.ts new file mode 100644 index 000000000..b579ea6fc --- /dev/null +++ b/packages/open-next/src/adapters/plugins/streaming.replacement.ts @@ -0,0 +1,99 @@ +/*eslint-disable simple-import-sort/imports */ +import type { + APIGatewayProxyEvent, + APIGatewayProxyEventV2, + CloudFrontRequestEvent, +} from "aws-lambda"; + +import { convertFrom } from "../event-mapper"; +import { debug } from "../logger"; +import type { ResponseStream } from "../types/aws-lambda"; +import type { WarmerEvent } from "../warmer-function"; +//#override imports +import { StreamingServerResponse } from "../http/responseStreaming"; +import { processInternalEvent } from "./routing/default.js"; +import { + addOpenNextHeader, + fixCacheHeaderForHtmlPages, + fixSWRCacheHeader, + revalidateIfRequired, +} from "./routing/util"; +//#endOverride + +//#override lambdaHandler +export const lambdaHandler = awslambda.streamifyResponse(async function ( + event: + | APIGatewayProxyEventV2 + | CloudFrontRequestEvent + | APIGatewayProxyEvent + | WarmerEvent, + responseStream: ResponseStream, +) { + debug("event", event); + + // Handler warmer + if ("type" in event) { + throw new Error("Warmer function are not supported with streaming"); + } + // Parse Lambda event and create Next.js request + const internalEvent = convertFrom(event); + + // WORKAROUND: Set `x-forwarded-host` header (AWS specific) — https://github.com/serverless-stack/open-next#workaround-set-x-forwarded-host-header-aws-specific + if (internalEvent.headers["x-forwarded-host"]) { + internalEvent.headers.host = internalEvent.headers["x-forwarded-host"]; + } + + const createServerResponse = ( + method: string, + headers: Record, + ) => + new StreamingServerResponse( + { method, headers }, + responseStream, + // We need to fix the cache header before sending any response + async (headers) => { + fixCacheHeaderForHtmlPages(internalEvent.rawPath, headers); + fixSWRCacheHeader(headers); + addOpenNextHeader(headers); + }, + ); + + const preprocessResult = await processInternalEvent( + internalEvent, + createServerResponse, + ); + if ("type" in preprocessResult) { + //TODO: replace this line + const headers = preprocessResult.headers as Record; + console.log("headers", headers); + const res = createServerResponse("GET", headers); + // setImmediate(() => { + // console.log("preprocessResult.headers", headers); + // res.writeHead(preprocessResult.statusCode, headers); + // }); + setImmediate(() => { + res.writeHead(preprocessResult.statusCode, headers); + res.write(preprocessResult.body); + res.end(); + }); + // res.statusCode = preprocessResult.statusCode; + } else { + const { + req, + res, + isExternalRewrite, + internalEvent: overwrittenInternalEvent, + } = preprocessResult; + + //@ts-expect-error - processRequest is already defined in serverHandler.ts + await processRequest(req, res, overwrittenInternalEvent, isExternalRewrite); + + await revalidateIfRequired( + internalEvent.headers.host, + internalEvent.rawPath, + res.headers, + req, + ); + } +}); +//#endOverride diff --git a/packages/open-next/src/adapters/routing/middleware.ts b/packages/open-next/src/adapters/routing/middleware.ts index 3e94bcfc5..2c7b0e9c7 100644 --- a/packages/open-next/src/adapters/routing/middleware.ts +++ b/packages/open-next/src/adapters/routing/middleware.ts @@ -1,8 +1,8 @@ import path from "node:path"; import { InternalEvent, InternalResult } from "../event-mapper.js"; -import { IncomingMessage } from "../request.js"; -import { ServerResponse } from "../response.js"; +import { IncomingMessage } from "../http/request.js"; +import { ServerResponse } from "../http/response.js"; import { loadConfig } from "../util.js"; import { convertRes, diff --git a/packages/open-next/src/adapters/routing/util.ts b/packages/open-next/src/adapters/routing/util.ts index de5d72337..47fe610de 100644 --- a/packages/open-next/src/adapters/routing/util.ts +++ b/packages/open-next/src/adapters/routing/util.ts @@ -2,8 +2,8 @@ import fs from "node:fs"; import path from "node:path"; import { isBinaryContentType } from "../binary"; +import { ServerResponse } from "../http/response"; import { MiddlewareManifest } from "../next-types"; -import { ServerResponse } from "../response"; export function isExternal(url?: string) { if (!url) return false; diff --git a/packages/open-next/src/adapters/server-adapter.ts b/packages/open-next/src/adapters/server-adapter.ts index feb21a113..1d0a8deae 100644 --- a/packages/open-next/src/adapters/server-adapter.ts +++ b/packages/open-next/src/adapters/server-adapter.ts @@ -1,28 +1,8 @@ import path from "node:path"; -import type { - APIGatewayProxyEvent, - APIGatewayProxyEventV2, - CloudFrontRequestEvent, -} from "aws-lambda"; - -import { convertFrom, convertTo, InternalEvent } from "./event-mapper.js"; -import { debug, error } from "./logger.js"; -import { - postProcessResponse, - processInternalEvent, -} from "./plugins/routing/default.js"; -import { handler as serverHandler } from "./plugins/serverHandler.js"; -import { IncomingMessage } from "./request.js"; -import { ServerResponse } from "./response.js"; -import { - generateUniqueId, - loadBuildId, - loadConfig, - loadPublicAssets, - setNodeEnv, -} from "./util.js"; -import type { WarmerEvent, WarmerResponse } from "./warmer-function.js"; +import { debug } from "./logger.js"; +import { lambdaHandler } from "./plugins/lambdaHandler.js"; +import { loadBuildId, loadConfig, setNodeEnv } from "./util.js"; export const NEXT_DIR = path.join(__dirname, ".next"); export const OPEN_NEXT_DIR = path.join(__dirname, ".open-next"); @@ -35,72 +15,11 @@ setNodeEnv(); setBuildIdEnv(); setNextjsServerWorkingDirectory(); -const publicAssets = loadPublicAssets(OPEN_NEXT_DIR); -// Generate a 6 letter unique server ID -const serverId = `server-${generateUniqueId()}`; - ///////////// // Handler // ///////////// -export async function handler( - event: - | APIGatewayProxyEventV2 - | CloudFrontRequestEvent - | APIGatewayProxyEvent - | WarmerEvent, -) { - debug("event", event); - // Handler warmer - if ("type" in event) { - return formatWarmerResponse(event); - } - - // Parse Lambda event and create Next.js request - const internalEvent = convertFrom(event); - - process.env.OPEN_NEXT_ISR = internalEvent.headers["x-isr"] - ? "true" - : undefined; - - // WORKAROUND: Set `x-forwarded-host` header (AWS specific) — https://github.com/serverless-stack/open-next#workaround-set-x-forwarded-host-header-aws-specific - if (internalEvent.headers["x-forwarded-host"]) { - internalEvent.headers.host = internalEvent.headers["x-forwarded-host"]; - } - - // WORKAROUND: public/ static files served by the server function (AWS specific) — https://github.com/serverless-stack/open-next#workaround-public-static-files-served-by-the-server-function-aws-specific - // TODO: This is no longer required if each top-level file and folder in "/public" - // is handled by a separate cache behavior. Leaving here for backward compatibility. - // Remove this on next major release. - if (publicAssets.files.includes(internalEvent.rawPath)) { - return internalEvent.type === "cf" - ? formatCloudFrontFailoverResponse(event as CloudFrontRequestEvent) - : formatAPIGatewayFailoverResponse(); - } - - const preprocessResult = await processInternalEvent(internalEvent); - if ("type" in preprocessResult) { - return convertTo(preprocessResult); - } else { - const { - req, - res, - isExternalRewrite, - internalEvent: overwrittenInternalEvent, - } = preprocessResult; - - await processRequest(req, res, overwrittenInternalEvent, isExternalRewrite); - - const internalResult = await postProcessResponse({ - internalEvent: overwrittenInternalEvent, - req, - res, - isExternalRewrite, - }); - - return convertTo(internalResult); - } -} +export const handler = lambdaHandler; ////////////////////// // Helper functions // @@ -116,55 +35,3 @@ function setBuildIdEnv() { // invalidations. ie. `/_next/data/${process.env.NEXT_BUILD_ID}/foo.json` process.env.NEXT_BUILD_ID = buildId; } - -async function processRequest( - req: IncomingMessage, - res: ServerResponse, - internalEvent: InternalEvent, - isExternalRewrite?: boolean, -) { - // @ts-ignore - // Next.js doesn't parse body if the property exists - // https://github.com/dougmoscrop/serverless-http/issues/227 - delete req.body; - - try { - // `serverHandler` is replaced at build time depending on user's - // nextjs version to patch Nextjs 13.4.x and future breaking changes. - await serverHandler(req, res, { - internalEvent, - buildId, - isExternalRewrite, - }); - } catch (e: any) { - error("NextJS request failed.", e); - - res.setHeader("Content-Type", "application/json"); - res.end( - JSON.stringify( - { - message: "Server failed to respond.", - details: e, - }, - null, - 2, - ), - ); - } -} - -function formatAPIGatewayFailoverResponse() { - return { statusCode: 503 }; -} - -function formatCloudFrontFailoverResponse(event: CloudFrontRequestEvent) { - return event.Records[0].cf.request; -} - -function formatWarmerResponse(event: WarmerEvent) { - return new Promise((resolve) => { - setTimeout(() => { - resolve({ serverId } satisfies WarmerResponse); - }, event.delay); - }); -} diff --git a/packages/open-next/src/adapters/types/aws-lambda.ts b/packages/open-next/src/adapters/types/aws-lambda.ts new file mode 100644 index 000000000..22d720587 --- /dev/null +++ b/packages/open-next/src/adapters/types/aws-lambda.ts @@ -0,0 +1,27 @@ +import { APIGatewayProxyEventV2, Context } from "aws-lambda"; +import { Writable } from "stream"; + +export interface ResponseStream extends Writable { + getBufferedData(): Buffer; + setContentType(contentType: string): void; +} + +type Handler = ( + event: APIGatewayProxyEventV2, + responseStream: ResponseStream, + context?: Context, +) => Promise; + +interface Metadata { + statusCode: number; + headers: Record; +} + +declare global { + namespace awslambda { + function streamifyResponse(handler: Handler): Handler; + module HttpResponseStream { + function from(res: Writable, metadata: Metadata): ResponseStream; + } + } +} diff --git a/packages/open-next/src/adapters/types/plugin.ts b/packages/open-next/src/adapters/types/plugin.ts new file mode 100644 index 000000000..1fd08f9f1 --- /dev/null +++ b/packages/open-next/src/adapters/types/plugin.ts @@ -0,0 +1,29 @@ +import type { InternalEvent, InternalResult } from "../event-mapper"; +import type { IncomingMessage } from "../http/request"; +import type { ServerResponse } from "../http/response"; + +export type ProcessInternalEventResult = + | { + internalEvent: InternalEvent; + req: IncomingMessage; + res: ServerResponse; + isExternalRewrite: boolean; + } + | InternalResult; + +export type ProcessInternalEvent = ( + internalEvent: InternalEvent, + createResponse: CreateResponse, +) => Promise; + +export interface PostProcessOptions { + internalEvent: InternalEvent; + req: IncomingMessage; + res: ServerResponse; + isExternalRewrite?: boolean; +} + +export type CreateResponse = ( + method: string, + headers: Record, +) => ServerResponse; diff --git a/packages/open-next/src/build.ts b/packages/open-next/src/build.ts index 68f622636..57796435f 100644 --- a/packages/open-next/src/build.ts +++ b/packages/open-next/src/build.ts @@ -24,6 +24,11 @@ interface BuildOptions { * @default false */ debug?: boolean; + /** + * Enable streaming mode. + * @default false + */ + streaming?: boolean; /** * The command to build the Next.js app. * @default `npm run build`, `yarn build`, or `pnpm build` based on the lock file found in the app's directory or any of its parent directories. @@ -78,7 +83,7 @@ export async function build(opts: BuildOptions = {}) { initOutputDir(); createStaticAssets(); createCacheAssets(monorepoRoot); - await createServerBundle(monorepoRoot); + await createServerBundle(monorepoRoot, opts.streaming); createRevalidationBundle(); createImageOptimizationBundle(); createWarmerBundle(); @@ -481,7 +486,7 @@ function createCacheAssets(monorepoRoot: string) { /* Server Helper Functions */ /***************************/ -async function createServerBundle(monorepoRoot: string) { +async function createServerBundle(monorepoRoot: string, streaming = false) { console.info(`Bundling server function...`); const { appPath, appBuildOutputPath, outputDir } = options; @@ -514,25 +519,26 @@ async function createServerBundle(monorepoRoot: string) { // note: bundle in OpenNext package b/c the adapter relies on the // "serverless-http" package which is not a dependency in user's // Next.js app. + let plugins = compareSemver(options.nextVersion, "13.4.13") >= 0 ? [ - openNextPlugin({ - name: "opennext-13.4.13-serverHandler", - target: /plugins\/serverHandler\.js/g, - replacements: ["./serverHandler.replacement.js"], - }), - openNextPlugin({ - name: "opennext-13.4.13-util", - target: /plugins\/util\.js/g, - replacements: ["./util.replacement.js"], - }), - openNextPlugin({ - name: "opennext-13.4.13-default", - target: /plugins\/routing\/default\.js/g, - replacements: ["./default.replacement.js"], - }), - ] + openNextPlugin({ + name: "opennext-13.4.13-serverHandler", + target: /plugins\/serverHandler\.js/g, + replacements: ["./serverHandler.replacement.js"], + }), + openNextPlugin({ + name: "opennext-13.4.13-util", + target: /plugins\/util\.js/g, + replacements: ["./util.replacement.js"], + }), + openNextPlugin({ + name: "opennext-13.4.13-default", + target: /plugins\/routing\/default\.js/g, + replacements: ["./default.replacement.js"], + }), + ] : undefined; if (compareSemver(options.nextVersion, "13.5.1") >= 0) { @@ -555,7 +561,19 @@ async function createServerBundle(monorepoRoot: string) { ]; } - if (plugins) { + if (streaming) { + const streamingPlugin = openNextPlugin({ + target: /plugins\/lambdaHandler\.js/g, + replacements: ["./streaming.replacement.js"], + }); + if (plugins) { + plugins.push(streamingPlugin); + } else { + plugins = [streamingPlugin]; + } + } + + if (plugins && plugins.length > 0) { console.log( `Applying plugins:: [${plugins .map(({ name }) => name) @@ -727,8 +745,7 @@ function esbuildSync(esbuildOptions: ESBuildOptions) { if (result.errors.length > 0) { result.errors.forEach((error) => console.error(error)); throw new Error( - `There was a problem bundling ${ - (esbuildOptions.entryPoints as string[])[0] + `There was a problem bundling ${(esbuildOptions.entryPoints as string[])[0] }.`, ); } @@ -757,8 +774,7 @@ async function esbuildAsync(esbuildOptions: ESBuildOptions) { if (result.errors.length > 0) { result.errors.forEach((error) => console.error(error)); throw new Error( - `There was a problem bundling ${ - (esbuildOptions.entryPoints as string[])[0] + `There was a problem bundling ${(esbuildOptions.entryPoints as string[])[0] }.`, ); } diff --git a/packages/open-next/src/index.ts b/packages/open-next/src/index.ts index 714a03f85..e52cd9bdc 100644 --- a/packages/open-next/src/index.ts +++ b/packages/open-next/src/index.ts @@ -13,6 +13,7 @@ build({ buildOutputPath: args["--build-output-path"], appPath: args["--app-path"], minify: Object.keys(args).includes("--minify"), + streaming: Object.keys(args).includes("--streaming"), }); function parseArgs() { From 31f74b6b74fef0e8804d5c0f4dcc759b551c8b8a Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Sun, 3 Sep 2023 00:18:27 +0200 Subject: [PATCH 02/27] fix streaming not working after a while --- .../src/adapters/http/responseStreaming.ts | 29 +++++++++++-------- .../adapters/plugins/streaming.replacement.ts | 1 - 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index ef6ced9eb..3fbd112a5 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -41,6 +41,7 @@ export class StreamingServerResponse extends http.ServerResponse { } try { this.fixHeaders(this[HEADERS]); + this._wroteHeader = true; this.responseStream = awslambda.HttpResponseStream.from( this.responseStream, { @@ -49,7 +50,6 @@ export class StreamingServerResponse extends http.ServerResponse { }, ); - this._wroteHeader = true; debug("writeHead", this[HEADERS]); } catch (e) { this.responseStream.end(); @@ -76,12 +76,15 @@ export class StreamingServerResponse extends http.ServerResponse { this.internalWrite(chunk); } + if (!this._hasWritten) { + // We need to send data here, otherwise the stream will not end at all + this.internalWrite(new Uint8Array(8)); + } + setImmediate(() => { - if (!this._hasWritten) { - // We need to send data here, otherwise the stream will not end at all - this.internalWrite(new Uint8Array(8)); - } - this.responseStream.end(); + this.responseStream.end(() => { + debug("stream end", chunk); + }); }); debug("stream end", chunk); return this; @@ -114,15 +117,17 @@ export class StreamingServerResponse extends http.ServerResponse { _writableState: {}, writable: true, // @ts-ignore - on: Function.prototype, + on: this.responseStream.on.bind(this.responseStream), // @ts-ignore - removeListener: Function.prototype, + removeListener: this.responseStream.removeListener.bind( + this.responseStream, + ), // @ts-ignore - destroy: Function.prototype, + destroy: this.responseStream.destroy.bind(this.responseStream), // @ts-ignore - cork: Function.prototype, + cork: this.responseStream.cork.bind(this.responseStream), // @ts-ignore - uncork: Function.prototype, + uncork: this.responseStream.uncork.bind(this.responseStream), // @ts-ignore write: (data, encoding, cb) => { if (typeof encoding === "function") { @@ -141,8 +146,8 @@ export class StreamingServerResponse extends http.ServerResponse { this.responseStream.on("error", (err) => { this.emit("error", err); - this.responseStream.end(); error("error", err); + this.responseStream.end(); }); } } diff --git a/packages/open-next/src/adapters/plugins/streaming.replacement.ts b/packages/open-next/src/adapters/plugins/streaming.replacement.ts index b579ea6fc..d9202e53f 100644 --- a/packages/open-next/src/adapters/plugins/streaming.replacement.ts +++ b/packages/open-next/src/adapters/plugins/streaming.replacement.ts @@ -65,7 +65,6 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( if ("type" in preprocessResult) { //TODO: replace this line const headers = preprocessResult.headers as Record; - console.log("headers", headers); const res = createServerResponse("GET", headers); // setImmediate(() => { // console.log("preprocessResult.headers", headers); From 65f0debb6d0fbe95d7f660eaed6095bc04835da2 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Sun, 3 Sep 2023 10:42:36 +0200 Subject: [PATCH 03/27] fix page static props and streaming not always starting --- .../src/adapters/http/responseStreaming.ts | 37 +++++++++++++------ 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index 3fbd112a5..2dd6b47c5 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -42,13 +42,27 @@ export class StreamingServerResponse extends http.ServerResponse { try { this.fixHeaders(this[HEADERS]); this._wroteHeader = true; - this.responseStream = awslambda.HttpResponseStream.from( - this.responseStream, - { - statusCode: statusCode as number, - headers: this[HEADERS], - }, + // FIXME: This is extracted from the docker lambda node 18 runtime + // https://gist.github.com/conico974/13afd708af20711b97df439b910ceb53#file-index-mjs-L921-L932 + // We replace their write with ours which are inside a process.nextTick + // This way it seems to work all the time + // I think we can't ship this code as it is, it could break at anytime if they decide to change the runtime and they already did it in the past + this.responseStream.setContentType( + "application/vnd.awslambda.http-integration-response", ); + const prelude = JSON.stringify({ + statusCode: statusCode as number, + headers: this[HEADERS], + }); + this.internalWrite(prelude); + this.internalWrite(new Uint8Array(8)); + // this.responseStream = awslambda.HttpResponseStream.from( + // this.responseStream, + // { + // statusCode: statusCode as number, + // headers: this[HEADERS], + // }, + // ); debug("writeHead", this[HEADERS]); } catch (e) { @@ -76,22 +90,23 @@ export class StreamingServerResponse extends http.ServerResponse { this.internalWrite(chunk); } - if (!this._hasWritten) { - // We need to send data here, otherwise the stream will not end at all + if (!this._hasWritten && !chunk) { + // We need to send data here if there is none, otherwise the stream will not end at all this.internalWrite(new Uint8Array(8)); } - setImmediate(() => { + process.nextTick(() => { this.responseStream.end(() => { + // The callback seems necessary here debug("stream end", chunk); }); }); - debug("stream end", chunk); + // debug("stream end", chunk); return this; } private internalWrite(chunk: any) { - setImmediate(() => { + process.nextTick(() => { this.responseStream.write(chunk); this._hasWritten = true; }); From 9e3b98fd38470fe872ccbaaac6ee58fff5cf1cd3 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Sun, 3 Sep 2023 11:31:09 +0200 Subject: [PATCH 04/27] fix ISR not working reliably --- .../src/adapters/http/responseStreaming.ts | 14 +- .../plugins/routing/default.replacement.ts | 2 + .../src/adapters/plugins/routing/default.ts | 2 + .../src/adapters/plugins/routing/util.ts | 165 +++++++++--------- .../adapters/plugins/streaming.replacement.ts | 19 +- 5 files changed, 112 insertions(+), 90 deletions(-) diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index 2dd6b47c5..c4dc8361e 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -9,6 +9,7 @@ export class StreamingServerResponse extends http.ServerResponse { [HEADERS]: Record = {}; responseStream: ResponseStream; fixHeaders: (headers: Record) => void; + onEnd: (headers: Record) => Promise; private _wroteHeader = false; private _hasWritten = false; @@ -54,8 +55,12 @@ export class StreamingServerResponse extends http.ServerResponse { statusCode: statusCode as number, headers: this[HEADERS], }); - this.internalWrite(prelude); - this.internalWrite(new Uint8Array(8)); + process.nextTick(() => { + this.responseStream.write(prelude); + }); + process.nextTick(() => { + this.responseStream.write(new Uint8Array(8)); + }); // this.responseStream = awslambda.HttpResponseStream.from( // this.responseStream, // { @@ -96,9 +101,10 @@ export class StreamingServerResponse extends http.ServerResponse { } process.nextTick(() => { - this.responseStream.end(() => { + this.responseStream.end(async () => { // The callback seems necessary here debug("stream end", chunk); + await this.onEnd(this[HEADERS]); }); }); // debug("stream end", chunk); @@ -116,6 +122,7 @@ export class StreamingServerResponse extends http.ServerResponse { { method, headers }: { method?: string; headers?: Record }, responseStream: ResponseStream, fixHeaders: (headers: Record) => void, + onEnd: (headers: Record) => Promise, ) { //@ts-ignore super({ method }); @@ -123,6 +130,7 @@ export class StreamingServerResponse extends http.ServerResponse { this[HEADERS] = headers || {}; this.fixHeaders = fixHeaders; + this.onEnd = onEnd; this.responseStream = responseStream; this.useChunkedEncodingByDefault = false; diff --git a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts index 1498f386d..dea088b8e 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts @@ -19,6 +19,7 @@ import { loadBuildId, loadConfigHeaders, loadRoutesManifest } from "../../util"; import { addOpenNextHeader, fixCacheHeaderForHtmlPages, + fixISRHeaders, fixSWRCacheHeader, revalidateIfRequired, } from "./util"; @@ -129,6 +130,7 @@ export async function postProcessResponse({ fixCacheHeaderForHtmlPages(internalEvent.rawPath, headers); fixSWRCacheHeader(headers); addOpenNextHeader(headers); + fixISRHeaders(headers); await revalidateIfRequired( internalEvent.headers.host, diff --git a/packages/open-next/src/adapters/plugins/routing/default.ts b/packages/open-next/src/adapters/plugins/routing/default.ts index 1a04cca15..76545aab2 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.ts @@ -11,6 +11,7 @@ import { ServerResponse } from "../../http/response"; import { addOpenNextHeader, fixCacheHeaderForHtmlPages, + fixISRHeaders, fixSWRCacheHeader, revalidateIfRequired, } from "./util"; @@ -58,6 +59,7 @@ export async function postProcessResponse({ fixCacheHeaderForHtmlPages(internalEvent.rawPath, headers); fixSWRCacheHeader(headers); addOpenNextHeader(headers); + fixISRHeaders(headers); await revalidateIfRequired( internalEvent.headers.host, diff --git a/packages/open-next/src/adapters/plugins/routing/util.ts b/packages/open-next/src/adapters/plugins/routing/util.ts index 99496ef24..0fd2e83e3 100644 --- a/packages/open-next/src/adapters/plugins/routing/util.ts +++ b/packages/open-next/src/adapters/plugins/routing/util.ts @@ -98,86 +98,93 @@ export async function revalidateIfRequired( headers[CommonHeaders.CACHE_CONTROL] = "s-maxage=2, stale-while-revalidate=2592000"; - // If the URL is rewritten, revalidation needs to be done on the rewritten URL. - // - Link to Next.js doc: https://nextjs.org/docs/pages/building-your-application/data-fetching/incremental-static-regeneration#on-demand-revalidation - // - Link to NextInternalRequestMeta: https://github.com/vercel/next.js/blob/57ab2818b93627e91c937a130fb56a36c41629c3/packages/next/src/server/request-meta.ts#L11 - // @ts-ignore - const internalMeta = req[Symbol.for("NextInternalRequestMeta")]; - - // When using Pages Router, two requests will be received: - // 1. one for the page: /foo - // 2. one for the json data: /_next/data/BUILD_ID/foo.json - // The rewritten url is correct for 1, but that for the second request - // does not include the "/_next/data/" prefix. Need to add it. - const revalidateUrl = internalMeta?._nextDidRewrite - ? rawPath.startsWith("/_next/data/") - ? `/_next/data/${buildId}${internalMeta?._nextRewroteUrl}.json` - : internalMeta?._nextRewroteUrl - : rawPath; - - // We need to pass etag to the revalidation queue to try to bypass the default 5 min deduplication window. - // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html - // If you need to have a revalidation happen more frequently than 5 minutes, - // your page will need to have a different etag to bypass the deduplication window. - // If data has the same etag during these 5 min dedup window, it will be deduplicated and not revalidated. - try { - const hash = (str: string) => - crypto.createHash("md5").update(str).digest("hex"); - - await sqsClient.send( - new SendMessageCommand({ - QueueUrl: REVALIDATION_QUEUE_URL, - MessageDeduplicationId: hash(`${rawPath}-${headers.etag}`), - MessageBody: JSON.stringify({ host, url: revalidateUrl }), - MessageGroupId: generateMessageGroupId(rawPath), - }), - ); - } catch (e) { - debug(`Failed to revalidate stale page ${rawPath}`); - debug(e); + export async function revalidateIfRequired( + host: string, + rawPath: string, + headers: Record, + req?: IncomingMessage, + ) { + if (headers["x-nextjs-cache"] !== "STALE") return; + // If the URL is rewritten, revalidation needs to be done on the rewritten URL. + // - Link to Next.js doc: https://nextjs.org/docs/pages/building-your-application/data-fetching/incremental-static-regeneration#on-demand-revalidation + // - Link to NextInternalRequestMeta: https://github.com/vercel/next.js/blob/57ab2818b93627e91c937a130fb56a36c41629c3/packages/next/src/server/request-meta.ts#L11 + // @ts-ignore + const internalMeta = req?.[Symbol.for("NextInternalRequestMeta")]; + + // When using Pages Router, two requests will be received: + // 1. one for the page: /foo + // 2. one for the json data: /_next/data/BUILD_ID/foo.json + // The rewritten url is correct for 1, but that for the second request + // does not include the "/_next/data/" prefix. Need to add it. + const revalidateUrl = internalMeta?._nextDidRewrite + ? rawPath.startsWith("/_next/data/") + ? `/_next/data/${buildId}${internalMeta?._nextRewroteUrl}.json` + : internalMeta?._nextRewroteUrl + : rawPath; + + // We need to pass etag to the revalidation queue to try to bypass the default 5 min deduplication window. + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html + // If you need to have a revalidation happen more frequently than 5 minutes, + // your page will need to have a different etag to bypass the deduplication window. + // If data has the same etag during these 5 min dedup window, it will be deduplicated and not revalidated. + try { + const hash = (str: string) => + crypto.createHash("md5").update(str).digest("hex"); + + await sqsClient.send( + new SendMessageCommand({ + QueueUrl: REVALIDATION_QUEUE_URL, + MessageDeduplicationId: hash(`${rawPath}-${headers.etag}`), + MessageBody: JSON.stringify({ host, url: revalidateUrl }), + MessageGroupId: generateMessageGroupId(rawPath), + }), + ); + } catch (e) { + debug(`Failed to revalidate stale page ${rawPath}`); + debug(e); + } } -} -// Since we're using a FIFO queue, every messageGroupId is treated sequentially -// This could cause a backlog of messages in the queue if there is too much page to -// revalidate at once. To avoid this, we generate a random messageGroupId for each -// revalidation request. -// We can't just use a random string because we need to ensure that the same rawPath -// will always have the same messageGroupId. -// https://stackoverflow.com/questions/521295/seeding-the-random-number-generator-in-javascript#answer-47593316 -function generateMessageGroupId(rawPath: string) { - let a = cyrb128(rawPath); - // We use mulberry32 to generate a random int between 0 and MAX_REVALIDATE_CONCURRENCY - var t = (a += 0x6d2b79f5); - t = Math.imul(t ^ (t >>> 15), t | 1); - t ^= t + Math.imul(t ^ (t >>> 7), t | 61); - const randomFloat = ((t ^ (t >>> 14)) >>> 0) / 4294967296; - // This will generate a random int between 0 and MAX_REVALIDATE_CONCURRENCY - // This means that we could have 1000 revalidate request at the same time - const maxConcurrency = parseInt( - process.env.MAX_REVALIDATE_CONCURRENCY ?? "10", - ); - const randomInt = Math.floor(randomFloat * maxConcurrency); - return `revalidate-${randomInt}`; -} + // Since we're using a FIFO queue, every messageGroupId is treated sequentially + // This could cause a backlog of messages in the queue if there is too much page to + // revalidate at once. To avoid this, we generate a random messageGroupId for each + // revalidation request. + // We can't just use a random string because we need to ensure that the same rawPath + // will always have the same messageGroupId. + // https://stackoverflow.com/questions/521295/seeding-the-random-number-generator-in-javascript#answer-47593316 + function generateMessageGroupId(rawPath: string) { + let a = cyrb128(rawPath); + // We use mulberry32 to generate a random int between 0 and MAX_REVALIDATE_CONCURRENCY + var t = (a += 0x6d2b79f5); + t = Math.imul(t ^ (t >>> 15), t | 1); + t ^= t + Math.imul(t ^ (t >>> 7), t | 61); + const randomFloat = ((t ^ (t >>> 14)) >>> 0) / 4294967296; + // This will generate a random int between 0 and MAX_REVALIDATE_CONCURRENCY + // This means that we could have 1000 revalidate request at the same time + const maxConcurrency = parseInt( + process.env.MAX_REVALIDATE_CONCURRENCY ?? "10", + ); + const randomInt = Math.floor(randomFloat * maxConcurrency); + return `revalidate-${randomInt}`; + } -// Used to generate a hash int from a string -function cyrb128(str: string) { - let h1 = 1779033703, - h2 = 3144134277, - h3 = 1013904242, - h4 = 2773480762; - for (let i = 0, k; i < str.length; i++) { - k = str.charCodeAt(i); - h1 = h2 ^ Math.imul(h1 ^ k, 597399067); - h2 = h3 ^ Math.imul(h2 ^ k, 2869860233); - h3 = h4 ^ Math.imul(h3 ^ k, 951274213); - h4 = h1 ^ Math.imul(h4 ^ k, 2716044179); + // Used to generate a hash int from a string + function cyrb128(str: string) { + let h1 = 1779033703, + h2 = 3144134277, + h3 = 1013904242, + h4 = 2773480762; + for (let i = 0, k; i < str.length; i++) { + k = str.charCodeAt(i); + h1 = h2 ^ Math.imul(h1 ^ k, 597399067); + h2 = h3 ^ Math.imul(h2 ^ k, 2869860233); + h3 = h4 ^ Math.imul(h3 ^ k, 951274213); + h4 = h1 ^ Math.imul(h4 ^ k, 2716044179); + } + h1 = Math.imul(h3 ^ (h1 >>> 18), 597399067); + h2 = Math.imul(h4 ^ (h2 >>> 22), 2869860233); + h3 = Math.imul(h1 ^ (h3 >>> 17), 951274213); + h4 = Math.imul(h2 ^ (h4 >>> 19), 2716044179); + (h1 ^= h2 ^ h3 ^ h4), (h2 ^= h1), (h3 ^= h1), (h4 ^= h1); + return h1 >>> 0; } - h1 = Math.imul(h3 ^ (h1 >>> 18), 597399067); - h2 = Math.imul(h4 ^ (h2 >>> 22), 2869860233); - h3 = Math.imul(h1 ^ (h3 >>> 17), 951274213); - h4 = Math.imul(h2 ^ (h4 >>> 19), 2716044179); - (h1 ^= h2 ^ h3 ^ h4), (h2 ^= h1), (h3 ^= h1), (h4 ^= h1); - return h1 >>> 0; -} diff --git a/packages/open-next/src/adapters/plugins/streaming.replacement.ts b/packages/open-next/src/adapters/plugins/streaming.replacement.ts index d9202e53f..22829a57e 100644 --- a/packages/open-next/src/adapters/plugins/streaming.replacement.ts +++ b/packages/open-next/src/adapters/plugins/streaming.replacement.ts @@ -15,6 +15,7 @@ import { processInternalEvent } from "./routing/default.js"; import { addOpenNextHeader, fixCacheHeaderForHtmlPages, + fixISRHeaders, fixSWRCacheHeader, revalidateIfRequired, } from "./routing/util"; @@ -51,10 +52,19 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( { method, headers }, responseStream, // We need to fix the cache header before sending any response - async (headers) => { + (headers) => { fixCacheHeaderForHtmlPages(internalEvent.rawPath, headers); fixSWRCacheHeader(headers); addOpenNextHeader(headers); + fixISRHeaders(headers); + }, + // This run in the callback of the response stream end + async (headers) => { + await revalidateIfRequired( + internalEvent.headers.host, + internalEvent.rawPath, + headers, + ); }, ); @@ -86,13 +96,6 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( //@ts-expect-error - processRequest is already defined in serverHandler.ts await processRequest(req, res, overwrittenInternalEvent, isExternalRewrite); - - await revalidateIfRequired( - internalEvent.headers.host, - internalEvent.rawPath, - res.headers, - req, - ); } }); //#endOverride From 171dd3c321c57365ea874e4f7e35722bd5b268a9 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 6 Sep 2023 11:13:07 +0200 Subject: [PATCH 05/27] better handling of errors --- .../src/adapters/http/responseStreaming.ts | 113 ++++++++++-------- packages/open-next/src/adapters/http/util.ts | 33 +++++ 2 files changed, 99 insertions(+), 47 deletions(-) diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index c4dc8361e..0ae8a36d4 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -1,7 +1,9 @@ import http from "node:http"; +import { Socket } from "node:net"; import { debug, error } from "../logger.js"; import type { ResponseStream } from "../types/aws-lambda.js"; +import { convertHeader, NO_OP, parseHeaders } from "./util.js"; const HEADERS = Symbol(); @@ -17,30 +19,33 @@ export class StreamingServerResponse extends http.ServerResponse { return this[HEADERS]; } - setHeader(name: string, value: string | number | readonly string[]): this { - // @ts-ignore - this[HEADERS][name.toLowerCase()] = value; + setHeader(name: string, value: string | number | string[]): this { + this[HEADERS][name.toLowerCase()] = convertHeader(value); return this; } writeHead( statusCode: number, - statusMessage?: string | undefined, - headers?: http.OutgoingHttpHeaders | http.OutgoingHttpHeader[] | undefined, - ): this; - writeHead( - statusCode: number, - headers?: http.OutgoingHttpHeaders | http.OutgoingHttpHeader[] | undefined, - ): this; - writeHead( - statusCode: unknown, - statusMessage?: unknown, - headers?: unknown, + _statusMessage?: + | string + | http.OutgoingHttpHeaders + | http.OutgoingHttpHeader[], + _headers?: http.OutgoingHttpHeaders | http.OutgoingHttpHeader[], ): this { + const headers = + typeof _statusMessage === "string" ? _headers : _statusMessage; + const statusMessage = + typeof _statusMessage === "string" ? _statusMessage : undefined; if (this._wroteHeader) { return this; } try { + debug("writeHead", statusCode, statusMessage, headers); + const parsedHeaders = parseHeaders(headers); + this[HEADERS] = { + ...this[HEADERS], + ...parsedHeaders, + }; this.fixHeaders(this[HEADERS]); this._wroteHeader = true; // FIXME: This is extracted from the docker lambda node 18 runtime @@ -61,6 +66,7 @@ export class StreamingServerResponse extends http.ServerResponse { process.nextTick(() => { this.responseStream.write(new Uint8Array(8)); }); + // This is the way we should do it but it doesn't work everytime for some reasons // this.responseStream = awslambda.HttpResponseStream.from( // this.responseStream, // { @@ -78,14 +84,13 @@ export class StreamingServerResponse extends http.ServerResponse { return this; } - end(cb?: (() => void) | undefined): this; - end(chunk: any, cb?: (() => void) | undefined): this; end( - chunk: any, - encoding: BufferEncoding, - cb?: (() => void) | undefined, - ): this; - end(chunk?: unknown, encoding?: unknown, cb?: unknown): this { + _chunk?: Uint8Array | string | (() => void), + _encoding?: BufferEncoding | (() => void), + _cb?: (() => void) | undefined, + ): this { + const chunk = typeof _chunk === "function" ? undefined : _chunk; + const cb = typeof _cb === "function" ? _cb : undefined; if (!this._wroteHeader) { // When next directly returns with end, the writeHead is not called, // so we need to call it here @@ -94,7 +99,6 @@ export class StreamingServerResponse extends http.ServerResponse { if (chunk && typeof chunk !== "function") { this.internalWrite(chunk); } - if (!this._hasWritten && !chunk) { // We need to send data here if there is none, otherwise the stream will not end at all this.internalWrite(new Uint8Array(8)); @@ -102,22 +106,36 @@ export class StreamingServerResponse extends http.ServerResponse { process.nextTick(() => { this.responseStream.end(async () => { - // The callback seems necessary here debug("stream end", chunk); await this.onEnd(this[HEADERS]); + cb?.(); }); }); - // debug("stream end", chunk); return this; } private internalWrite(chunk: any) { + this._hasWritten = true; process.nextTick(() => { - this.responseStream.write(chunk); - this._hasWritten = true; + if (this.responseStream.writableNeedDrain) { + this.responseStream.once("drain", () => { + this.internalWrite(chunk); + }); + } else { + this.responseStream.write(chunk); + } }); } + cancel(error?: Error) { + this.responseStream.off("close", this.cancel.bind(this)); + this.responseStream.off("error", this.cancel.bind(this)); + + if (error) { + this.responseStream.destroy(error); + } + } + constructor( { method, headers }: { method?: string; headers?: Record }, responseStream: ResponseStream, @@ -136,23 +154,19 @@ export class StreamingServerResponse extends http.ServerResponse { this.useChunkedEncodingByDefault = false; this.chunkedEncoding = false; - this.assignSocket({ + const socket: Partial & { _writableState: any } = { _writableState: {}, writable: true, - // @ts-ignore - on: this.responseStream.on.bind(this.responseStream), - // @ts-ignore - removeListener: this.responseStream.removeListener.bind( - this.responseStream, - ), - // @ts-ignore - destroy: this.responseStream.destroy.bind(this.responseStream), - // @ts-ignore - cork: this.responseStream.cork.bind(this.responseStream), - // @ts-ignore - uncork: this.responseStream.uncork.bind(this.responseStream), - // @ts-ignore - write: (data, encoding, cb) => { + on: NO_OP, + removeListener: NO_OP, + destroy: NO_OP, + cork: NO_OP, + uncork: NO_OP, + write: ( + data: Uint8Array | string, + encoding?: string | null | (() => void), + cb?: () => void, + ) => { if (typeof encoding === "function") { cb = encoding; encoding = undefined; @@ -163,14 +177,19 @@ export class StreamingServerResponse extends http.ServerResponse { if (typeof cb === "function") { cb(); } - return true; + return this.responseStream.writableNeedDrain; }, - }); + }; - this.responseStream.on("error", (err) => { - this.emit("error", err); - error("error", err); - this.responseStream.end(); + this.assignSocket(socket as Socket); + + this.responseStream.on("close", this.cancel.bind(this)); + this.responseStream.on("error", this.cancel.bind(this)); + + this.on("close", this.cancel.bind(this)); + this.on("error", this.cancel.bind(this)); + this.once("finish", () => { + this.emit("close"); }); } } diff --git a/packages/open-next/src/adapters/http/util.ts b/packages/open-next/src/adapters/http/util.ts index 605c2f453..132f6798b 100644 --- a/packages/open-next/src/adapters/http/util.ts +++ b/packages/open-next/src/adapters/http/util.ts @@ -1,3 +1,5 @@ +import http from "node:http"; + export function getString(data: any) { // Note: use `ArrayBuffer.isView()` to check for Uint8Array. Using // `instanceof Uint8Array` returns false in some cases. For example, @@ -15,3 +17,34 @@ export function getString(data: any) { } export const headerEnd = "\r\n\r\n"; + +export const NO_OP: (...args: any[]) => any = () => void 0; + +export const parseHeaders = ( + headers?: http.OutgoingHttpHeader[] | http.OutgoingHttpHeaders, +) => { + const result: Record = {}; + if (!headers) { + return result; + } + + for (const [key, value] of Object.entries(headers)) { + if (value === undefined) { + continue; + } else { + result[key] = convertHeader(value); + } + } + + return result; +}; + +export const convertHeader = (header: http.OutgoingHttpHeader) => { + if (typeof header === "string") { + return header; + } else if (Array.isArray(header)) { + return header.join(","); + } else { + return String(header); + } +}; From 15e7288f9ad2531adfc8d17003bcdcf51b83594b Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 6 Sep 2023 13:42:43 +0200 Subject: [PATCH 06/27] fix for node runtime v12 --- .../src/adapters/http/responseStreaming.ts | 133 ++++++++++-------- .../adapters/plugins/streaming.replacement.ts | 11 +- 2 files changed, 77 insertions(+), 67 deletions(-) diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index 0ae8a36d4..e9e185849 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -7,6 +7,13 @@ import { convertHeader, NO_OP, parseHeaders } from "./util.js"; const HEADERS = Symbol(); +export interface StreamingServerResponseProps { + method?: string; + headers?: Record; + responseStream: ResponseStream; + fixHeaders: (headers: Record) => void; + onEnd: (headers: Record) => Promise; +} export class StreamingServerResponse extends http.ServerResponse { [HEADERS]: Record = {}; responseStream: ResponseStream; @@ -15,6 +22,63 @@ export class StreamingServerResponse extends http.ServerResponse { private _wroteHeader = false; private _hasWritten = false; + constructor({ + method, + headers, + responseStream, + fixHeaders, + onEnd, + }: StreamingServerResponseProps) { + super({ method } as any); + + this[HEADERS] = headers || {}; + + this.fixHeaders = fixHeaders; + this.onEnd = onEnd; + this.responseStream = responseStream; + + this.useChunkedEncodingByDefault = false; + this.chunkedEncoding = false; + + const socket: Partial & { _writableState: any } = { + _writableState: {}, + writable: true, + on: NO_OP, + removeListener: NO_OP, + destroy: NO_OP, + cork: NO_OP, + uncork: NO_OP, + write: ( + data: Uint8Array | string, + encoding?: string | null | (() => void), + cb?: () => void, + ) => { + if (typeof encoding === "function") { + cb = encoding; + encoding = undefined; + } + + this.internalWrite(data); + + if (typeof cb === "function") { + cb(); + } + return true; + }, + }; + + this.assignSocket(socket as Socket); + + this.responseStream.on("close", this.cancel.bind(this)); + this.responseStream.on("error", this.cancel.bind(this)); + + this.on("close", this.cancel.bind(this)); + this.on("error", this.cancel.bind(this)); + this.once("finish", () => { + this.emit("close"); + }); + } + get headers() { return this[HEADERS]; } @@ -50,7 +114,7 @@ export class StreamingServerResponse extends http.ServerResponse { this._wroteHeader = true; // FIXME: This is extracted from the docker lambda node 18 runtime // https://gist.github.com/conico974/13afd708af20711b97df439b910ceb53#file-index-mjs-L921-L932 - // We replace their write with ours which are inside a process.nextTick + // We replace their write with ours which are inside a setImmediate // This way it seems to work all the time // I think we can't ship this code as it is, it could break at anytime if they decide to change the runtime and they already did it in the past this.responseStream.setContentType( @@ -60,10 +124,10 @@ export class StreamingServerResponse extends http.ServerResponse { statusCode: statusCode as number, headers: this[HEADERS], }); - process.nextTick(() => { + setImmediate(() => { this.responseStream.write(prelude); }); - process.nextTick(() => { + setImmediate(() => { this.responseStream.write(new Uint8Array(8)); }); // This is the way we should do it but it doesn't work everytime for some reasons @@ -104,7 +168,7 @@ export class StreamingServerResponse extends http.ServerResponse { this.internalWrite(new Uint8Array(8)); } - process.nextTick(() => { + setImmediate(() => { this.responseStream.end(async () => { debug("stream end", chunk); await this.onEnd(this[HEADERS]); @@ -116,8 +180,9 @@ export class StreamingServerResponse extends http.ServerResponse { private internalWrite(chunk: any) { this._hasWritten = true; - process.nextTick(() => { + setImmediate(() => { if (this.responseStream.writableNeedDrain) { + debug("drain"); this.responseStream.once("drain", () => { this.internalWrite(chunk); }); @@ -128,6 +193,7 @@ export class StreamingServerResponse extends http.ServerResponse { } cancel(error?: Error) { + debug("cancel", error); this.responseStream.off("close", this.cancel.bind(this)); this.responseStream.off("error", this.cancel.bind(this)); @@ -135,61 +201,4 @@ export class StreamingServerResponse extends http.ServerResponse { this.responseStream.destroy(error); } } - - constructor( - { method, headers }: { method?: string; headers?: Record }, - responseStream: ResponseStream, - fixHeaders: (headers: Record) => void, - onEnd: (headers: Record) => Promise, - ) { - //@ts-ignore - super({ method }); - - this[HEADERS] = headers || {}; - - this.fixHeaders = fixHeaders; - this.onEnd = onEnd; - this.responseStream = responseStream; - - this.useChunkedEncodingByDefault = false; - this.chunkedEncoding = false; - - const socket: Partial & { _writableState: any } = { - _writableState: {}, - writable: true, - on: NO_OP, - removeListener: NO_OP, - destroy: NO_OP, - cork: NO_OP, - uncork: NO_OP, - write: ( - data: Uint8Array | string, - encoding?: string | null | (() => void), - cb?: () => void, - ) => { - if (typeof encoding === "function") { - cb = encoding; - encoding = undefined; - } - - this.internalWrite(data); - - if (typeof cb === "function") { - cb(); - } - return this.responseStream.writableNeedDrain; - }, - }; - - this.assignSocket(socket as Socket); - - this.responseStream.on("close", this.cancel.bind(this)); - this.responseStream.on("error", this.cancel.bind(this)); - - this.on("close", this.cancel.bind(this)); - this.on("error", this.cancel.bind(this)); - this.once("finish", () => { - this.emit("close"); - }); - } } diff --git a/packages/open-next/src/adapters/plugins/streaming.replacement.ts b/packages/open-next/src/adapters/plugins/streaming.replacement.ts index 22829a57e..189a7fb90 100644 --- a/packages/open-next/src/adapters/plugins/streaming.replacement.ts +++ b/packages/open-next/src/adapters/plugins/streaming.replacement.ts @@ -48,25 +48,26 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( method: string, headers: Record, ) => - new StreamingServerResponse( - { method, headers }, + new StreamingServerResponse({ + method, + headers, responseStream, // We need to fix the cache header before sending any response - (headers) => { + fixHeaders: (headers) => { fixCacheHeaderForHtmlPages(internalEvent.rawPath, headers); fixSWRCacheHeader(headers); addOpenNextHeader(headers); fixISRHeaders(headers); }, // This run in the callback of the response stream end - async (headers) => { + onEnd: async (headers) => { await revalidateIfRequired( internalEvent.headers.host, internalEvent.rawPath, headers, ); }, - ); + }); const preprocessResult = await processInternalEvent( internalEvent, From 3d2b1c46b681a7079bc4e4b6bf38422f244e0fe7 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 6 Sep 2023 14:16:33 +0200 Subject: [PATCH 07/27] cleanup --- .../open-next/src/adapters/http/response.ts | 144 ++++++++++-------- .../src/adapters/http/responseStreaming.ts | 4 +- packages/open-next/src/adapters/next-types.ts | 6 +- .../src/adapters/plugins/lambdaHandler.ts | 10 +- .../plugins/routing/default.replacement.ts | 5 +- .../src/adapters/plugins/routing/default.ts | 17 ++- .../src/adapters/plugins/routing/types.ts | 0 .../src/adapters/plugins/routing/util.ts | 7 +- .../plugins/serverHandler.replacement.ts | 4 +- .../src/adapters/plugins/serverHandler.ts | 4 +- .../adapters/plugins/streaming.replacement.ts | 14 +- .../src/adapters/routing/middleware.ts | 7 +- .../open-next/src/adapters/routing/util.ts | 8 +- .../open-next/src/adapters/types/plugin.ts | 27 ++-- 14 files changed, 144 insertions(+), 113 deletions(-) delete mode 100644 packages/open-next/src/adapters/plugins/routing/types.ts diff --git a/packages/open-next/src/adapters/http/response.ts b/packages/open-next/src/adapters/http/response.ts index e717b3e4d..90ae47f97 100644 --- a/packages/open-next/src/adapters/http/response.ts +++ b/packages/open-next/src/adapters/http/response.ts @@ -2,15 +2,21 @@ // https://github.com/dougmoscrop/serverless-http/blob/master/lib/response.js // Licensed under the MIT License -// @ts-nocheck import http from "node:http"; +import { Socket } from "node:net"; -import { getString, headerEnd } from "./util.js"; +import { + convertHeader, + getString, + headerEnd, + NO_OP, + parseHeaders, +} from "./util.js"; const BODY = Symbol(); const HEADERS = Symbol(); -function addData(stream, data) { +function addData(stream: ServerlessResponse, data: Uint8Array | string) { if ( Buffer.isBuffer(data) || ArrayBuffer.isView(data) || @@ -22,77 +28,40 @@ function addData(stream, data) { } } -export class ServerResponse extends http.ServerResponse { - static from(res) { - const response = new ServerResponse(res); - - response.statusCode = res.statusCode; - response[HEADERS] = res.headers; - response[BODY] = [Buffer.from(res.body)]; - response.end(); - - return response; - } - - static body(res) { - return Buffer.concat(res[BODY]); - } - - static headers(res) { - const headers = - typeof res.getHeaders === "function" ? res.getHeaders() : res._headers; - - return Object.assign(headers, res[HEADERS]); - } - - get headers() { - return this[HEADERS]; - } - - setHeader(key, value) { - if (this._wroteHeader) { - this[HEADERS][key] = value; - } else { - super.setHeader(key, value); - } - return this; - } - - writeHead(statusCode, reason, obj) { - const headers = typeof reason === "string" ? obj : reason; - - for (const name in headers) { - this.setHeader(name, headers[name]); - - if (!this._wroteHeader) { - // we only need to initiate super.headers once - // writeHead will add the other headers itself - break; - } - } +export interface ServerlessResponseProps { + method: string; + headers: Record; +} - return super.writeHead(statusCode, reason, obj); - } +export class ServerlessResponse extends http.ServerResponse { + [BODY]: Buffer[]; + [HEADERS]: Record; + private _wroteHeader = false; + private _header = ""; - constructor({ method, headers }) { - super({ method, headers }); + constructor({ method, headers }: ServerlessResponseProps) { + super({ method, headers } as any); this[BODY] = []; - this[HEADERS] = headers || {}; + this[HEADERS] = parseHeaders(headers) || {}; this.useChunkedEncodingByDefault = false; this.chunkedEncoding = false; this._header = ""; - this.assignSocket({ + const socket: Partial & { _writableState: any } = { _writableState: {}, writable: true, - on: Function.prototype, - removeListener: Function.prototype, - destroy: Function.prototype, - cork: Function.prototype, - uncork: Function.prototype, - write: (data, encoding, cb) => { + on: NO_OP, + removeListener: NO_OP, + destroy: NO_OP, + cork: NO_OP, + uncork: NO_OP, + write: ( + data: Uint8Array | string, + encoding?: string | null | (() => void), + cb?: () => void, + ) => { if (typeof encoding === "function") { cb = encoding; encoding = null; @@ -118,11 +87,58 @@ export class ServerResponse extends http.ServerResponse { if (typeof cb === "function") { cb(); } + return true; }, - }); + }; + + this.assignSocket(socket as Socket); this.once("finish", () => { this.emit("close"); }); } + + static body(res: ServerlessResponse) { + return Buffer.concat(res[BODY]); + } + + static headers(res: ServerlessResponse) { + const headers = + typeof res.getHeaders === "function" ? res.getHeaders() : res[HEADERS]; + + return Object.assign(headers, res[HEADERS]); + } + + get headers() { + return this[HEADERS]; + } + + setHeader(key: string, value: string | number | string[]): this { + if (this._wroteHeader) { + this[HEADERS][key] = convertHeader(value); + } else { + super.setHeader(key, value); + } + return this; + } + + writeHead( + statusCode: number, + reason?: string | any | any[], + obj?: any | any[], + ) { + const headers = typeof reason === "string" ? obj : reason; + + for (const name in headers) { + this.setHeader(name, headers[name]); + + if (!this._wroteHeader) { + // we only need to initiate super.headers once + // writeHead will add the other headers itself + break; + } + } + + return super.writeHead(statusCode, reason, obj); + } } diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index e9e185849..711ae3d86 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -9,7 +9,7 @@ const HEADERS = Symbol(); export interface StreamingServerResponseProps { method?: string; - headers?: Record; + headers?: Record; responseStream: ResponseStream; fixHeaders: (headers: Record) => void; onEnd: (headers: Record) => Promise; @@ -31,7 +31,7 @@ export class StreamingServerResponse extends http.ServerResponse { }: StreamingServerResponseProps) { super({ method } as any); - this[HEADERS] = headers || {}; + this[HEADERS] = parseHeaders(headers) || {}; this.fixHeaders = fixHeaders; this.onEnd = onEnd; diff --git a/packages/open-next/src/adapters/next-types.ts b/packages/open-next/src/adapters/next-types.ts index 372fbc1a5..c739765a5 100644 --- a/packages/open-next/src/adapters/next-types.ts +++ b/packages/open-next/src/adapters/next-types.ts @@ -2,7 +2,7 @@ import { InternalEvent } from "./event-mapper.js"; import { IncomingMessage } from "./http/request.js"; -import { ServerResponse } from "./http/response.js"; +import { ServerlessResponse } from "./http/response.js"; type RemotePattern = { protocol?: "http" | "https"; @@ -138,7 +138,7 @@ export type Options = { export interface PluginHandler { ( req: IncomingMessage, - res: ServerResponse, + res: ServerlessResponse, options: Options, - ): Promise; + ): Promise; } diff --git a/packages/open-next/src/adapters/plugins/lambdaHandler.ts b/packages/open-next/src/adapters/plugins/lambdaHandler.ts index 5b1234699..a13b40115 100644 --- a/packages/open-next/src/adapters/plugins/lambdaHandler.ts +++ b/packages/open-next/src/adapters/plugins/lambdaHandler.ts @@ -6,7 +6,7 @@ import { import path from "path"; import { convertFrom, convertTo, InternalEvent } from "../event-mapper"; -import { type IncomingMessage, ServerResponse } from "../http"; +import { type IncomingMessage, ServerlessResponse } from "../http"; import { debug, error } from "../logger"; import { OPEN_NEXT_DIR } from "../server-adapter"; import { CreateResponse } from "../types/plugin"; @@ -59,8 +59,10 @@ export async function lambdaHandler( : formatAPIGatewayFailoverResponse(); } - const createServerResponse: CreateResponse = (method, headers) => - new ServerResponse({ method, headers }); + const createServerResponse: CreateResponse = ( + method, + headers, + ) => new ServerlessResponse({ method, headers }); const preprocessResult = await processInternalEvent( internalEvent, @@ -92,7 +94,7 @@ export async function lambdaHandler( async function processRequest( req: IncomingMessage, - res: ServerResponse, + res: ServerlessResponse, internalEvent: InternalEvent, isExternalRewrite?: boolean, ) { diff --git a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts index dea088b8e..f5b394a73 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.replacement.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.replacement.ts @@ -25,6 +25,7 @@ import { } from "./util"; import { convertRes } from "../../routing/util"; import { handleMiddleware } from "../../routing/middleware"; +import { ServerlessResponse } from "../../http"; const NEXT_DIR = path.join(__dirname, ".next"); const buildId = loadBuildId(NEXT_DIR); @@ -122,7 +123,9 @@ export async function postProcessResponse({ res, isExternalRewrite, }: PostProcessOptions): Promise { - const { statusCode, headers, isBase64Encoded, body } = convertRes(res); + const { statusCode, headers, isBase64Encoded, body } = convertRes( + res as ServerlessResponse, + ); debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body }); diff --git a/packages/open-next/src/adapters/plugins/routing/default.ts b/packages/open-next/src/adapters/plugins/routing/default.ts index 76545aab2..5f2694dad 100644 --- a/packages/open-next/src/adapters/plugins/routing/default.ts +++ b/packages/open-next/src/adapters/plugins/routing/default.ts @@ -1,5 +1,6 @@ /* eslint-disable simple-import-sort/imports */ import type { + CreateResponse, PostProcessOptions, ProcessInternalEventResult, } from "../../types/plugin"; @@ -7,7 +8,6 @@ import type { InternalEvent, InternalResult } from "../../event-mapper"; //#override imports import { debug } from "../../logger"; import { IncomingMessage } from "../../http/request"; -import { ServerResponse } from "../../http/response"; import { addOpenNextHeader, fixCacheHeaderForHtmlPages, @@ -16,16 +16,15 @@ import { revalidateIfRequired, } from "./util"; import { convertRes } from "../../routing/util"; +import { ServerlessResponse } from "../../http"; +import { ServerResponse } from "http"; //#endOverride //#override processInternalEvent -export async function processInternalEvent( +export async function processInternalEvent( internalEvent: InternalEvent, - createResponse: ( - method: string, - headers: Record, - ) => ServerResponse, -): Promise { + createResponse: CreateResponse, +): Promise> { const reqProps = { method: internalEvent.method, url: internalEvent.url, @@ -51,7 +50,9 @@ export async function postProcessResponse({ res, isExternalRewrite, }: PostProcessOptions): Promise { - const { statusCode, headers, isBase64Encoded, body } = convertRes(res); + const { statusCode, headers, isBase64Encoded, body } = convertRes( + res as ServerlessResponse, + ); debug("ServerResponse data", { statusCode, headers, isBase64Encoded, body }); diff --git a/packages/open-next/src/adapters/plugins/routing/types.ts b/packages/open-next/src/adapters/plugins/routing/types.ts deleted file mode 100644 index e69de29bb..000000000 diff --git a/packages/open-next/src/adapters/plugins/routing/util.ts b/packages/open-next/src/adapters/plugins/routing/util.ts index 0fd2e83e3..e661416fd 100644 --- a/packages/open-next/src/adapters/plugins/routing/util.ts +++ b/packages/open-next/src/adapters/plugins/routing/util.ts @@ -3,7 +3,7 @@ import crypto from "crypto"; import path from "path"; import { IncomingMessage } from "../../http/request.js"; -import { ServerResponse } from "../../http/response.js"; +import { ServerlessResponse } from "../../http/response.js"; import { awsLogger, debug } from "../../logger.js"; import { loadBuildId, loadHtmlPages } from "../../util.js"; @@ -22,7 +22,10 @@ const sqsClient = new SQSClient({ logger: awsLogger, }); -export async function proxyRequest(req: IncomingMessage, res: ServerResponse) { +export async function proxyRequest( + req: IncomingMessage, + res: ServerlessResponse, +) { const HttpProxy = require("next/dist/compiled/http-proxy") as any; const proxy = new HttpProxy({ diff --git a/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts b/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts index d1c7f685d..d9b48f7bf 100644 --- a/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts +++ b/packages/open-next/src/adapters/plugins/serverHandler.replacement.ts @@ -1,7 +1,7 @@ /*eslint-disable simple-import-sort/imports */ import type { Options, PluginHandler } from "../next-types.js"; import type { IncomingMessage } from "../http/request.js"; -import type { ServerResponse } from "../http/response.js"; +import type { ServerlessResponse } from "../http/response.js"; //#override imports import { proxyRequest } from "./routing/util.js"; @@ -11,7 +11,7 @@ import { requestHandler, setNextjsPrebundledReact } from "./util.js"; //#override handler export const handler: PluginHandler = async ( req: IncomingMessage, - res: ServerResponse, + res: ServerlessResponse, options: Options, ) => { let { internalEvent } = options; diff --git a/packages/open-next/src/adapters/plugins/serverHandler.ts b/packages/open-next/src/adapters/plugins/serverHandler.ts index 429c85b83..28a67a39c 100644 --- a/packages/open-next/src/adapters/plugins/serverHandler.ts +++ b/packages/open-next/src/adapters/plugins/serverHandler.ts @@ -1,5 +1,5 @@ import type { IncomingMessage } from "../http/request.js"; -import { ServerResponse } from "../http/response.js"; +import { ServerlessResponse } from "../http/response.js"; import type { Options, PluginHandler } from "../next-types.js"; //#override imports import { requestHandler, setNextjsPrebundledReact } from "./util.js"; @@ -8,7 +8,7 @@ import { requestHandler, setNextjsPrebundledReact } from "./util.js"; //#override handler export const handler: PluginHandler = async ( req: IncomingMessage, - res: ServerResponse, + res: ServerlessResponse, options: Options, ) => { setNextjsPrebundledReact(options.internalEvent.rawPath); diff --git a/packages/open-next/src/adapters/plugins/streaming.replacement.ts b/packages/open-next/src/adapters/plugins/streaming.replacement.ts index 189a7fb90..1f940e1db 100644 --- a/packages/open-next/src/adapters/plugins/streaming.replacement.ts +++ b/packages/open-next/src/adapters/plugins/streaming.replacement.ts @@ -19,6 +19,7 @@ import { fixSWRCacheHeader, revalidateIfRequired, } from "./routing/util"; +import { CreateResponse } from "../types/plugin"; //#endOverride //#override lambdaHandler @@ -44,9 +45,9 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( internalEvent.headers.host = internalEvent.headers["x-forwarded-host"]; } - const createServerResponse = ( + const createServerResponse: CreateResponse = ( method: string, - headers: Record, + headers: Record, ) => new StreamingServerResponse({ method, @@ -74,19 +75,14 @@ export const lambdaHandler = awslambda.streamifyResponse(async function ( createServerResponse, ); if ("type" in preprocessResult) { - //TODO: replace this line - const headers = preprocessResult.headers as Record; + const headers = preprocessResult.headers; const res = createServerResponse("GET", headers); - // setImmediate(() => { - // console.log("preprocessResult.headers", headers); - // res.writeHead(preprocessResult.statusCode, headers); - // }); + setImmediate(() => { res.writeHead(preprocessResult.statusCode, headers); res.write(preprocessResult.body); res.end(); }); - // res.statusCode = preprocessResult.statusCode; } else { const { req, diff --git a/packages/open-next/src/adapters/routing/middleware.ts b/packages/open-next/src/adapters/routing/middleware.ts index 2c7b0e9c7..40a4f99e4 100644 --- a/packages/open-next/src/adapters/routing/middleware.ts +++ b/packages/open-next/src/adapters/routing/middleware.ts @@ -2,7 +2,7 @@ import path from "node:path"; import { InternalEvent, InternalResult } from "../event-mapper.js"; import { IncomingMessage } from "../http/request.js"; -import { ServerResponse } from "../http/response.js"; +import { ServerlessResponse } from "../http/response.js"; import { loadConfig } from "../util.js"; import { convertRes, @@ -43,7 +43,10 @@ export async function handleMiddleware( if (!hasMatch) return internalEvent; const req = new IncomingMessage(internalEvent); - const res = new ServerResponse({ method: req.method, headers: {} }); + const res = new ServerlessResponse({ + method: req.method ?? "GET", + headers: {}, + }); // NOTE: Next middleware was originally developed to support nested middlewares // but that was discarded for simplicity. The MiddlewareInfo type still has the original diff --git a/packages/open-next/src/adapters/routing/util.ts b/packages/open-next/src/adapters/routing/util.ts index 47fe610de..f78046ed5 100644 --- a/packages/open-next/src/adapters/routing/util.ts +++ b/packages/open-next/src/adapters/routing/util.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import path from "node:path"; import { isBinaryContentType } from "../binary"; -import { ServerResponse } from "../http/response"; +import { ServerlessResponse } from "../http/response"; import { MiddlewareManifest } from "../next-types"; export function isExternal(url?: string) { @@ -27,17 +27,17 @@ export function getUrlParts(url: string, isExternal: boolean) { }; } -export function convertRes(res: ServerResponse) { +export function convertRes(res: ServerlessResponse) { // Format Next.js response to Lambda response const statusCode = res.statusCode || 200; - const headers = ServerResponse.headers(res); + const headers = ServerlessResponse.headers(res); const isBase64Encoded = isBinaryContentType( Array.isArray(headers["content-type"]) ? headers["content-type"][0] : headers["content-type"], ); const encoding = isBase64Encoded ? "base64" : "utf8"; - const body = ServerResponse.body(res).toString(encoding); + const body = ServerlessResponse.body(res).toString(encoding); return { statusCode, headers, diff --git a/packages/open-next/src/adapters/types/plugin.ts b/packages/open-next/src/adapters/types/plugin.ts index 1fd08f9f1..47c7ed3a0 100644 --- a/packages/open-next/src/adapters/types/plugin.ts +++ b/packages/open-next/src/adapters/types/plugin.ts @@ -1,29 +1,36 @@ +import type { ServerResponse } from "http"; + import type { InternalEvent, InternalResult } from "../event-mapper"; import type { IncomingMessage } from "../http/request"; -import type { ServerResponse } from "../http/response"; -export type ProcessInternalEventResult = +export type ProcessInternalEventResult< + Response extends ServerResponse = ServerResponse, +> = | { internalEvent: InternalEvent; req: IncomingMessage; - res: ServerResponse; + res: Response; isExternalRewrite: boolean; } | InternalResult; -export type ProcessInternalEvent = ( +export type ProcessInternalEvent< + Response extends ServerResponse = ServerResponse, +> = ( internalEvent: InternalEvent, - createResponse: CreateResponse, -) => Promise; + createResponse: CreateResponse, +) => Promise>; -export interface PostProcessOptions { +export interface PostProcessOptions< + Response extends ServerResponse = ServerResponse, +> { internalEvent: InternalEvent; req: IncomingMessage; - res: ServerResponse; + res: Response; isExternalRewrite?: boolean; } -export type CreateResponse = ( +export type CreateResponse = ( method: string, headers: Record, -) => ServerResponse; +) => Response; From 3ecb233f288c2eddbca9c5acf0abbd843575ac04 Mon Sep 17 00:00:00 2001 From: Dorseuil Nicolas Date: Wed, 6 Sep 2023 18:22:46 +0200 Subject: [PATCH 08/27] fix headers and middleware response headers being overwritten --- packages/open-next/src/adapters/http/response.ts | 8 +++++++- .../open-next/src/adapters/http/responseStreaming.ts | 9 +++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/packages/open-next/src/adapters/http/response.ts b/packages/open-next/src/adapters/http/response.ts index 90ae47f97..9731f55d5 100644 --- a/packages/open-next/src/adapters/http/response.ts +++ b/packages/open-next/src/adapters/http/response.ts @@ -38,12 +38,14 @@ export class ServerlessResponse extends http.ServerResponse { [HEADERS]: Record; private _wroteHeader = false; private _header = ""; + private _initialHeaders: Record = {}; constructor({ method, headers }: ServerlessResponseProps) { super({ method, headers } as any); this[BODY] = []; this[HEADERS] = parseHeaders(headers) || {}; + this._initialHeaders = this[HEADERS]; this.useChunkedEncodingByDefault = false; this.chunkedEncoding = false; @@ -106,7 +108,11 @@ export class ServerlessResponse extends http.ServerResponse { const headers = typeof res.getHeaders === "function" ? res.getHeaders() : res[HEADERS]; - return Object.assign(headers, res[HEADERS]); + return { + ...parseHeaders(headers), + ...res[HEADERS], + ...res._initialHeaders, + }; } get headers() { diff --git a/packages/open-next/src/adapters/http/responseStreaming.ts b/packages/open-next/src/adapters/http/responseStreaming.ts index 711ae3d86..1ebd66764 100644 --- a/packages/open-next/src/adapters/http/responseStreaming.ts +++ b/packages/open-next/src/adapters/http/responseStreaming.ts @@ -21,6 +21,7 @@ export class StreamingServerResponse extends http.ServerResponse { onEnd: (headers: Record) => Promise; private _wroteHeader = false; private _hasWritten = false; + private _initialHeaders: Record = {}; constructor({ method, @@ -32,6 +33,7 @@ export class StreamingServerResponse extends http.ServerResponse { super({ method } as any); this[HEADERS] = parseHeaders(headers) || {}; + this._initialHeaders = { ...this[HEADERS] }; this.fixHeaders = fixHeaders; this.onEnd = onEnd; @@ -111,6 +113,13 @@ export class StreamingServerResponse extends http.ServerResponse { ...parsedHeaders, }; this.fixHeaders(this[HEADERS]); + this[HEADERS] = { + ...this[HEADERS], + ...this._initialHeaders, + }; + + debug("writeHead", this[HEADERS]); + this._wroteHeader = true; // FIXME: This is extracted from the docker lambda node 18 runtime // https://gist.github.com/conico974/13afd708af20711b97df439b910ceb53#file-index-mjs-L921-L932 From 51b35aae0af5843c6c6d6e70642bb87f5226c640 Mon Sep 17 00:00:00 2001 From: Khue Nguyen Date: Thu, 7 Sep 2023 15:29:08 -0700 Subject: [PATCH 09/27] add padding to guarantee flush --- examples/app-pages-router/app/isr/page.tsx | 2 +- examples/app-pages-router/app/layout.tsx | 5 +- examples/app-pages-router/app/ssr/layout.tsx | 10 ++ examples/app-pages-router/app/ssr/page.tsx | 21 ++++- examples/app-pages-router/next.config.js | 1 + .../pages/pages_isr/index.tsx | 2 +- .../pages/pages_ssr/index.tsx | 2 +- examples/app-router/app/api/sse/route.ts | 47 ++++++++++ examples/app-router/app/isr/page.tsx | 2 +- examples/app-router/app/page.tsx | 6 +- examples/app-router/app/sse/page.tsx | 36 +++++++ examples/app-router/app/ssr/layout.tsx | 10 ++ examples/app-router/app/ssr/page.tsx | 21 ++++- examples/app-router/middleware.ts | 2 + examples/app-router/next.config.js | 1 + examples/app-router/package.json | 2 +- examples/pages-router/next.config.js | 1 + examples/pages-router/src/pages/isr/index.tsx | 2 +- examples/pages-router/src/pages/ssr/index.tsx | 7 +- examples/sst/README.md | 14 ++- examples/sst/package.json | 2 +- examples/sst/stacks/AppPagesRouter.ts | 1 + examples/sst/stacks/AppRouter.ts | 3 +- examples/sst/stacks/NextjsSite.ts | 7 ++ .../src/adapters/http/responseStreaming.ts | 25 ++++- .../tests/appPagesRouter/isr.test.ts | 8 +- .../tests/appPagesRouter/pages_isr.test.ts | 8 +- .../tests/appPagesRouter/pages_ssr.test.ts | 10 +- .../tests/appPagesRouter/ssr.test.ts | 66 ++++++------- .../tests-e2e/tests/appRouter/isr.test.ts | 10 +- .../tests-e2e/tests/appRouter/sse.test.ts | 46 +++++++++ .../tests-e2e/tests/appRouter/ssr.test.ts | 93 +++++++++++-------- .../tests-e2e/tests/pagesRouter/isr.test.ts | 10 +- .../tests-e2e/tests/pagesRouter/ssr.test.ts | 13 +-- 34 files changed, 360 insertions(+), 136 deletions(-) create mode 100644 examples/app-pages-router/app/ssr/layout.tsx create mode 100644 examples/app-router/app/api/sse/route.ts create mode 100644 examples/app-router/app/sse/page.tsx create mode 100644 examples/app-router/app/ssr/layout.tsx create mode 100644 examples/sst/stacks/NextjsSite.ts create mode 100644 packages/tests-e2e/tests/appRouter/sse.test.ts diff --git a/examples/app-pages-router/app/isr/page.tsx b/examples/app-pages-router/app/isr/page.tsx index e88f677c8..3eadac36d 100644 --- a/examples/app-pages-router/app/isr/page.tsx +++ b/examples/app-pages-router/app/isr/page.tsx @@ -5,5 +5,5 @@ async function getTime() { export const revalidate = 10; export default async function ISR() { const time = getTime(); - return
ISR: {time}
; + return
Time: {time}
; } diff --git a/examples/app-pages-router/app/layout.tsx b/examples/app-pages-router/app/layout.tsx index 85e1c420b..e49706e15 100644 --- a/examples/app-pages-router/app/layout.tsx +++ b/examples/app-pages-router/app/layout.tsx @@ -17,7 +17,10 @@ export default function RootLayout({ }) { return ( - {children} + +
Header
+ {children} + ); } diff --git a/examples/app-pages-router/app/ssr/layout.tsx b/examples/app-pages-router/app/ssr/layout.tsx new file mode 100644 index 000000000..3a8338f50 --- /dev/null +++ b/examples/app-pages-router/app/ssr/layout.tsx @@ -0,0 +1,10 @@ +import { PropsWithChildren } from "react"; + +export default function Layout({ children }: PropsWithChildren) { + return ( +
+

SSR

+ {children} +
+ ); +} diff --git a/examples/app-pages-router/app/ssr/page.tsx b/examples/app-pages-router/app/ssr/page.tsx index 394f6e148..ed3c0b22e 100644 --- a/examples/app-pages-router/app/ssr/page.tsx +++ b/examples/app-pages-router/app/ssr/page.tsx @@ -1,12 +1,23 @@ -import { wait } from "@open-next/utils"; +import React from "react"; + +import { headers } from "next/headers"; + +async function getTime() { + const res = await new Promise((resolve) => { + setTimeout(() => { + resolve(new Date().toISOString()); + }, 1500); + }); + return res; +} -export const revalidate = 0; export default async function SSR() { - await wait(2000); - const time = new Date().toISOString(); + const time = await getTime(); + const headerList = headers(); return (
-

SSR {time}

+

Time: {time}

+
{headerList.get("host")}
); } diff --git a/examples/app-pages-router/next.config.js b/examples/app-pages-router/next.config.js index 6e0d91116..29f248fa7 100644 --- a/examples/app-pages-router/next.config.js +++ b/examples/app-pages-router/next.config.js @@ -1,6 +1,7 @@ /** @type {import('next').NextConfig} */ const nextConfig = { poweredByHeader: false, + cleanDistDir: true, transpilePackages: ["@example/shared"], output: "standalone", outputFileTracing: "../sst", diff --git a/examples/app-pages-router/pages/pages_isr/index.tsx b/examples/app-pages-router/pages/pages_isr/index.tsx index c5efe2352..08a953bba 100644 --- a/examples/app-pages-router/pages/pages_isr/index.tsx +++ b/examples/app-pages-router/pages/pages_isr/index.tsx @@ -12,5 +12,5 @@ export async function getStaticProps() { export default function Page({ time, }: InferGetStaticPropsType) { - return
ISR: {time}
; + return
Time: {time}
; } diff --git a/examples/app-pages-router/pages/pages_ssr/index.tsx b/examples/app-pages-router/pages/pages_ssr/index.tsx index 372da27a5..86627836a 100644 --- a/examples/app-pages-router/pages/pages_ssr/index.tsx +++ b/examples/app-pages-router/pages/pages_ssr/index.tsx @@ -11,5 +11,5 @@ export async function getServerSideProps() { export default function Page({ time, }: InferGetServerSidePropsType) { - return
SSR: {time}
; + return
Time: {time}
; } diff --git a/examples/app-router/app/api/sse/route.ts b/examples/app-router/app/api/sse/route.ts new file mode 100644 index 000000000..375af9592 --- /dev/null +++ b/examples/app-router/app/api/sse/route.ts @@ -0,0 +1,47 @@ +import { wait } from "@open-next/utils"; +import { NextRequest } from "next/server"; + +export const dynamic = "force-dynamic"; + +export async function GET(request: NextRequest) { + const resStream = new TransformStream(); + const writer = resStream.writable.getWriter(); + + const res = new Response(resStream.readable, { + headers: { + "Content-Type": "text/event-stream", + Connection: "keep-alive", + "Cache-Control": "no-cache, no-transform", + }, + }); + + setTimeout(async () => { + writer.write( + `data: ${JSON.stringify({ + message: "open", + time: new Date().toISOString(), + })}\n\n`, + ); + for (let i = 1; i <= 4; i++) { + await wait(2000); + writer.write( + `data: ${JSON.stringify({ + message: "hello:" + i, + time: new Date().toISOString(), + })}\n\n`, + ); + } + + await wait(2000); // Wait for 4 seconds + writer.write( + `data: ${JSON.stringify({ + message: "close", + time: new Date().toISOString(), + })}\n\n`, + ); + await wait(5000); + await writer.close(); + }, 100); + + return res; +} diff --git a/examples/app-router/app/isr/page.tsx b/examples/app-router/app/isr/page.tsx index e88f677c8..3eadac36d 100644 --- a/examples/app-router/app/isr/page.tsx +++ b/examples/app-router/app/isr/page.tsx @@ -5,5 +5,5 @@ async function getTime() { export const revalidate = 10; export default async function ISR() { const time = getTime(); - return
ISR: {time}
; + return
Time: {time}
; } diff --git a/examples/app-router/app/page.tsx b/examples/app-router/app/page.tsx index e2aa3b3e7..66da3ac80 100644 --- a/examples/app-router/app/page.tsx +++ b/examples/app-router/app/page.tsx @@ -26,7 +26,8 @@ export default function Home() { new timestamp