Skip to content

refactor: waitUntil passed around via ALS #733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .changeset/pink-papayas-smoke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"@opennextjs/aws": minor
---

refactor: `waitUntil` passed around via ALS and `OpenNextHandler` signature has changed

BREAKING CHANGE: `waitUntil` is passed around via ALS to fix #713.

`globalThis.openNextWaitUntil` is no more available, you can access `waitUntil`
on the ALS context: `globalThis.__openNextAls.getStore()`

The `OpenNextHandler` signature has changed: the second parameter was a `StreamCreator`.
It was changed to be of type `OpenNextHandlerOptions` which has both a `streamCreator` key
and a `waitUntil` key.

If you use a custom wrapper, you need to update the call to the handler as follow:

```ts
// before
globalThis.openNextWaitUntil = myWaitUntil;
handler(internalEvent, myStreamCreator);

// after
handler(internalEvent, { streamCreator: myStreamCreator, waitUntil: myWaitUntil });
```
4 changes: 3 additions & 1 deletion packages/open-next/src/adapters/edge-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { InternalEvent, InternalResult } from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";
import { emptyReadableStream } from "utils/stream";

import type { OpenNextHandlerOptions } from "types/overrides";
// We import it like that so that the edge plugin can replace it
import { NextConfig } from "../adapters/config";
import { createGenericHandler } from "../core/createGenericHandler";
Expand All @@ -16,12 +17,13 @@ globalThis.__openNextAls = new AsyncLocalStorage();

const defaultHandler = async (
internalEvent: InternalEvent,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> => {
globalThis.isEdgeRuntime = true;

// We run everything in the async local storage context so that it is available in edge runtime functions
return runWithOpenNextRequestContext(
{ isISRRevalidation: false },
{ isISRRevalidation: false, waitUntil: options?.waitUntil },
async () => {
const host = internalEvent.headers.host
? `https://${internalEvent.headers.host}`
Expand Down
7 changes: 4 additions & 3 deletions packages/open-next/src/adapters/image-optimization-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import type {
} from "types/open-next.js";
import { emptyReadableStream, toReadableStream } from "utils/stream.js";

import type { OpenNextHandlerOptions } from "types/overrides.js";
import { createGenericHandler } from "../core/createGenericHandler.js";
import { resolveImageLoader } from "../core/resolve.js";
import { debug, error } from "./logger.js";
Expand Down Expand Up @@ -58,7 +59,7 @@ export const handler = await createGenericHandler({

export async function defaultHandler(
event: InternalEvent,
streamCreator?: StreamCreator,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> {
// Images are handled via header and query param information.
debug("handler event", event);
Expand Down Expand Up @@ -99,9 +100,9 @@ export async function defaultHandler(
downloadHandler,
);

return buildSuccessResponse(result, streamCreator, etag);
return buildSuccessResponse(result, options?.streamCreator, etag);
} catch (e: any) {
return buildFailureResponse(e, streamCreator);
return buildFailureResponse(e, options?.streamCreator);
}
}

Expand Down
7 changes: 6 additions & 1 deletion packages/open-next/src/adapters/middleware.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type {
} from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";

import type { OpenNextHandlerOptions } from "types/overrides";
import { debug, error } from "../adapters/logger";
import { createGenericHandler } from "../core/createGenericHandler";
import {
Expand All @@ -24,6 +25,7 @@ globalThis.__openNextAls = new AsyncLocalStorage();

const defaultHandler = async (
internalEvent: InternalEvent,
options?: OpenNextHandlerOptions,
): Promise<InternalResult | MiddlewareResult> => {
const originResolver = await resolveOriginResolver(
globalThis.openNextConfig.middleware?.originResolver,
Expand All @@ -49,7 +51,10 @@ const defaultHandler = async (

// We run everything in the async local storage context so that it is available in the external middleware
return runWithOpenNextRequestContext(
{ isISRRevalidation: internalEvent.headers["x-isr"] === "1" },
{
isISRRevalidation: internalEvent.headers["x-isr"] === "1",
waitUntil: options?.waitUntil,
},
async () => {
const result = await routingHandler(internalEvent);
if ("internalEvent" in result) {
Expand Down
15 changes: 9 additions & 6 deletions packages/open-next/src/core/requestHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import type {
InternalResult,
ResolvedRoute,
RoutingResult,
StreamCreator,
} from "types/open-next";
import { runWithOpenNextRequestContext } from "utils/promise";

import type { OpenNextHandlerOptions } from "types/overrides";
import { debug, error, warn } from "../adapters/logger";
import { patchAsyncStorage } from "./patchAsyncStorage";
import { convertRes, createServerResponse } from "./routing/util";
Expand All @@ -29,12 +29,15 @@ patchAsyncStorage();

export async function openNextHandler(
internalEvent: InternalEvent,
responseStreaming?: StreamCreator,
options?: OpenNextHandlerOptions,
): Promise<InternalResult> {
const initialHeaders = internalEvent.headers;
// We run everything in the async local storage context so that it is available in the middleware as well as in NextServer
return runWithOpenNextRequestContext(
{ isISRRevalidation: initialHeaders["x-isr"] === "1" },
{
isISRRevalidation: initialHeaders["x-isr"] === "1",
waitUntil: options?.waitUntil,
},
async () => {
if (initialHeaders["x-forwarded-host"]) {
initialHeaders.host = initialHeaders["x-forwarded-host"];
Expand Down Expand Up @@ -116,7 +119,7 @@ export async function openNextHandler(

if ("type" in routingResult) {
// response is used only in the streaming case
if (responseStreaming) {
if (options?.streamCreator) {
const response = createServerResponse(
{
internalEvent,
Expand All @@ -127,7 +130,7 @@ export async function openNextHandler(
initialPath: internalEvent.rawPath,
},
headers,
responseStreaming,
options.streamCreator,
);
response.statusCode = routingResult.statusCode;
response.flushHeaders();
Expand Down Expand Up @@ -171,7 +174,7 @@ export async function openNextHandler(
const res = createServerResponse(
routingResult,
overwrittenResponseHeaders,
responseStreaming,
options?.streamCreator,
);

await processRequest(req, res, preprocessedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ const handler: WrapperHandler = async (handler, converter) =>
},
};

const response = await handler(internalEvent, streamCreator);
const response = await handler(internalEvent, { streamCreator });

const isUsingEdge = globalThis.isEdgeRuntime ?? false;
if (isUsingEdge) {
Expand Down
4 changes: 3 additions & 1 deletion packages/open-next/src/overrides/wrappers/aws-lambda.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ const handler: WrapperHandler =
},
};

const response = await handler(internalEvent, fakeStream);
const response = await handler(internalEvent, {
streamCreator: fakeStream,
});

return converter.convertTo(response, event);
};
Expand Down
5 changes: 3 additions & 2 deletions packages/open-next/src/overrides/wrappers/cloudflare-edge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ const handler: WrapperHandler<
ctx: WorkerContext,
): Promise<Response> => {
globalThis.process = process;
globalThis.openNextWaitUntil = ctx.waitUntil.bind(ctx);

// Set the environment variables
// Cloudflare suggests to not override the process.env object but instead apply the values to it
Expand Down Expand Up @@ -63,7 +62,9 @@ const handler: WrapperHandler<
}
}

const response = await handler(internalEvent);
const response = await handler(internalEvent, {
waitUntil: ctx.waitUntil.bind(ctx),
});

const result: Response = await converter.convertTo(response);

Expand Down
8 changes: 6 additions & 2 deletions packages/open-next/src/overrides/wrappers/cloudflare-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const handler: WrapperHandler<InternalEvent, InternalResult> =
ctx: any,
): Promise<Response> => {
globalThis.process = process;
globalThis.openNextWaitUntil = ctx.waitUntil.bind(ctx);

// Set the environment variables
// Cloudflare suggests to not override the process.env object but instead apply the values to it
Expand Down Expand Up @@ -75,7 +74,12 @@ const handler: WrapperHandler<InternalEvent, InternalResult> =
},
};

ctx.waitUntil(handler(internalEvent, streamCreator));
ctx.waitUntil(
handler(internalEvent, {
streamCreator,
waitUntil: ctx.waitUntil.bind(ctx),
}),
);

return promiseResponse;
};
Expand Down
12 changes: 8 additions & 4 deletions packages/open-next/src/overrides/wrappers/dummy.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import type { InternalEvent, StreamCreator } from "types/open-next";
import type { Wrapper, WrapperHandler } from "types/overrides";
import type { InternalEvent } from "types/open-next";
import type {
OpenNextHandlerOptions,
Wrapper,
WrapperHandler,
} from "types/overrides";

const dummyWrapper: WrapperHandler = async (handler, converter) => {
return async (event: InternalEvent, responseStream?: StreamCreator) => {
return await handler(event, responseStream);
return async (event: InternalEvent, options?: OpenNextHandlerOptions) => {
return await handler(event, options);
};
};

Expand Down
8 changes: 4 additions & 4 deletions packages/open-next/src/overrides/wrappers/express-dev.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,25 @@ const wrapper: WrapperHandler = async (handler, converter) => {

app.all("/_next/image", async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.writeHead(prelude.statusCode, prelude.headers);
return res;
},
};
await imageHandler(internalEvent, _res);
await imageHandler(internalEvent, { streamCreator });
});

app.all("*paths", async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.writeHead(prelude.statusCode, prelude.headers);
return res;
},
onFinish: () => {},
};
await handler(internalEvent, _res);
await handler(internalEvent, { streamCreator });
});

const server = app.listen(
Expand Down
4 changes: 2 additions & 2 deletions packages/open-next/src/overrides/wrappers/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { debug, error } from "../../adapters/logger";
const wrapper: WrapperHandler = async (handler, converter) => {
const server = createServer(async (req, res) => {
const internalEvent = await converter.convertFrom(req);
const _res: StreamCreator = {
const streamCreator: StreamCreator = {
writeHeaders: (prelude) => {
res.setHeader("Set-Cookie", prelude.cookies);
res.writeHead(prelude.statusCode, prelude.headers);
Expand All @@ -23,7 +23,7 @@ const wrapper: WrapperHandler = async (handler, converter) => {
});
res.end("OK");
} else {
await handler(internalEvent, _res);
await handler(internalEvent, { streamCreator });
}
});

Expand Down
10 changes: 2 additions & 8 deletions packages/open-next/src/types/global.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type {
} from "types/overrides";

import type { DetachedPromiseRunner } from "../utils/promise";
import type { OpenNextConfig } from "./open-next";
import type { OpenNextConfig, WaitUntil } from "./open-next";

export interface RequestData {
geo?: {
Expand Down Expand Up @@ -59,6 +59,7 @@ interface OpenNextRequestContext {
pendingPromiseRunner: DetachedPromiseRunner;
isISRRevalidation?: boolean;
mergeHeadersPriority?: "middleware" | "handler";
waitUntil?: WaitUntil;
}

declare global {
Expand Down Expand Up @@ -152,13 +153,6 @@ declare global {
*/
var __openNextAls: AsyncLocalStorage<OpenNextRequestContext>;

/**
* The function that is used to run background tasks even after the response has been sent.
* This one is defined by the wrapper function as most of them don't need or support this feature.
* If not present, all the awaiting promises will be resolved before sending the response.
*/
var openNextWaitUntil: ((promise: Promise<void>) => void) | undefined;

/**
* The entries object that contains the functions that are available in the function.
* Only available in edge runtime functions.
Expand Down
1 change: 1 addition & 0 deletions packages/open-next/src/types/open-next.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ export interface StreamCreator {
onFinish?: (length: number) => void;
}

export type WaitUntil = (promise: Promise<void>) => void;
export interface DangerousOptions {
/**
* The tag cache is used for revalidateTags and revalidatePath.
Expand Down
10 changes: 9 additions & 1 deletion packages/open-next/src/types/overrides.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
Origin,
ResolvedRoute,
StreamCreator,
WaitUntil,
} from "./open-next";

// Queue
Expand Down Expand Up @@ -122,10 +123,17 @@ export type Wrapper<
edgeRuntime?: boolean;
};

export type OpenNextHandlerOptions = {
// Create a `Writeable` for streaming responses.
streamCreator?: StreamCreator;
// Extends the liftetime of the runtime after the response is returned.
waitUntil?: WaitUntil;
};

export type OpenNextHandler<
E extends BaseEventOrResult = InternalEvent,
R extends BaseEventOrResult = InternalResult,
> = (event: E, responseStream?: StreamCreator) => Promise<R>;
> = (event: E, options?: OpenNextHandlerOptions) => Promise<R>;

export type Converter<
E extends BaseEventOrResult = InternalEvent,
Expand Down
Loading