feat: Add RunPipeline tool #253

Merged (2 commits) on May 15, 2025
3 changes: 2 additions & 1 deletion src/server.ts
@@ -3,6 +3,7 @@ import { Session } from "./session.js";
import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js";
import { AtlasTools } from "./tools/atlas/tools.js";
import { MongoDbTools } from "./tools/mongodb/tools.js";
import { PlaygroundTools } from "./tools/playground/tools.js";
import logger, { initializeLogger, LogId } from "./logger.js";
import { ObjectId } from "mongodb";
import { Telemetry } from "./telemetry/telemetry.js";
@@ -134,7 +135,7 @@ export class Server {
}

private registerTools() {
- for (const tool of [...AtlasTools, ...MongoDbTools]) {
+ for (const tool of [...AtlasTools, ...MongoDbTools, ...PlaygroundTools]) {
new tool(this.session, this.userConfig, this.telemetry).register(this.mcpServer);
}
}
167 changes: 167 additions & 0 deletions src/tools/playground/runPipeline.ts
@@ -0,0 +1,167 @@
import { OperationType, TelemetryToolMetadata, ToolArgs, ToolBase, ToolCategory } from "../tool.js";
import { z } from "zod";
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js";
import { EJSON } from "bson";

const PLAYGROUND_SEARCH_URL = "https://search-playground.mongodb.com/api/tools/code-playground/search";

const DEFAULT_DOCUMENTS = [
{
name: "First document",
},
{
name: "Second document",
},
];

const DEFAULT_SEARCH_INDEX_DEFINITION = {
mappings: {
dynamic: true,
},
};

const DEFAULT_PIPELINE = [
{
$search: {
index: "default",
text: {
query: "first",
path: {
wildcard: "*",
},
},
},
},
];

const DEFAULT_SYNONYMS: Array<Record<string, unknown>> = [];

export const RunPipelineOperationArgs = {
documents: z
.array(z.record(z.string(), z.unknown()))
.max(500)
.describe("Documents to run the pipeline against. 500 is maximum.")
Review comment (Collaborator): nit: Worth adding .max(500) to codify this?

Reply (Author): nice idea, added (a short validation sketch follows this file's diff)

.default(DEFAULT_DOCUMENTS),
aggregationPipeline: z
.array(z.record(z.string(), z.unknown()))
.describe("MongoDB aggregation pipeline to run on the provided documents.")
.default(DEFAULT_PIPELINE),
searchIndexDefinition: z
Review comment (Collaborator): Are more specific types for aggregationPipeline/searchIndexDefinition/synonyms useful for the LLM, or is it already pretty good at determining the types from the description? For example, the search playground looks limited to a subset of aggregation pipeline stages. Would those be helpful to include in the type? (A hypothetical sketch of a stricter stage type follows this file's diff.)

Reply (Author): I feel it would be hard to add more specific zod types here. All these entities have a complex, dynamic structure, unfortunately. I updated "Aggregation pipeline..." to "MongoDB aggregation pipeline" (same for the other fields) to stress the MongoDB part, which hopefully nudges the LLM in the right direction. Regarding supported stages, I'd avoid listing them here. If we hardcode them, the list will likely get out of sync over time between the Playground and MCP. I'd rather rely on the Playground's response to flag any unsupported stages. It actually supports more than what's in the public docs (product wants to position it as a Search-only playground for now).

.record(z.string(), z.unknown())
.describe("MongoDB search index definition to create before running the pipeline.")
.optional()
.default(DEFAULT_SEARCH_INDEX_DEFINITION),
synonyms: z
.array(z.record(z.any()))
.describe("MongoDB synonyms mapping to create before running the pipeline.")
.optional()
.default(DEFAULT_SYNONYMS),
};

interface RunRequest {
documents: string;
aggregationPipeline: string;
indexDefinition: string;
synonyms: string;
}

interface RunResponse {
documents: Array<Record<string, unknown>>;
}

interface RunErrorResponse {
code: string;
message: string;
}

export class RunPipeline extends ToolBase {
protected name = "run-pipeline";
protected description =
"Run MongoDB aggregation pipeline for provided documents without needing an Atlas account, cluster, or collection. The tool can be useful for running ad-hoc pipelines for testing or debugging.";
protected category: ToolCategory = "playground";
protected operationType: OperationType = "metadata";
protected argsShape = RunPipelineOperationArgs;

protected async execute(toolArgs: ToolArgs<typeof this.argsShape>): Promise<CallToolResult> {
const runRequest = this.convertToRunRequest(toolArgs);
const runResponse = await this.runPipeline(runRequest);
const toolResult = this.convertToToolResult(runResponse);
return toolResult;
}

protected resolveTelemetryMetadata(): TelemetryToolMetadata {
return {};
}

private async runPipeline(runRequest: RunRequest): Promise<RunResponse> {
const options: RequestInit = {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify(runRequest),
};

let response: Response;
try {
response = await fetch(PLAYGROUND_SEARCH_URL, options);
} catch {
throw new Error("Cannot run pipeline: network error.");
}

if (!response.ok) {
const errorMessage = await this.getPlaygroundResponseError(response);
throw new Error(`Pipeline run failed: ${errorMessage}`);
}

try {
return (await response.json()) as RunResponse;
} catch {
throw new Error("Pipeline run failed: response is not valid JSON.");
}
}

private async getPlaygroundResponseError(response: Response): Promise<string> {
let errorMessage = `HTTP ${response.status} ${response.statusText}.`;
try {
const errorResponse = (await response.json()) as RunErrorResponse;
errorMessage += ` Error code: ${errorResponse.code}. Error message: ${errorResponse.message}`;
} catch {
// Ignore JSON parse errors
}

return errorMessage;
}

private convertToRunRequest(toolArgs: ToolArgs<typeof this.argsShape>): RunRequest {
try {
return {
documents: JSON.stringify(toolArgs.documents),
aggregationPipeline: JSON.stringify(toolArgs.aggregationPipeline),
indexDefinition: JSON.stringify(toolArgs.searchIndexDefinition || DEFAULT_SEARCH_INDEX_DEFINITION),
synonyms: JSON.stringify(toolArgs.synonyms || DEFAULT_SYNONYMS),
};
} catch {
throw new Error("Invalid arguments type.");
}
}

private convertToToolResult(runResponse: RunResponse): CallToolResult {
const content: Array<{ text: string; type: "text" }> = [
{
text: `Found ${runResponse.documents.length} documents:`,
type: "text",
},
...runResponse.documents.map((doc) => {
return {
text: EJSON.stringify(doc),
type: "text",
} as { text: string; type: "text" };
}),
];

return {
content,
};
}
}
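As a side note on the .max(500) discussion above, here is a minimal sketch of how that cap behaves when the documents schema is exercised on its own. It assumes zod v3 semantics; the variable names are illustrative and not part of the PR.

import { z } from "zod";

// Same shape as the `documents` argument in RunPipelineOperationArgs.
const documentsSchema = z
    .array(z.record(z.string(), z.unknown()))
    .max(500)
    .describe("Documents to run the pipeline against. 500 is maximum.");

// 500 documents validate successfully.
console.log(documentsSchema.safeParse(Array.from({ length: 500 }, (_, i) => ({ i }))).success); // true

// 501 documents are rejected before the tool would ever call the Playground API.
const tooMany = documentsSchema.safeParse(Array.from({ length: 501 }, (_, i) => ({ i })));
console.log(tooMany.success); // false
if (!tooMany.success) {
    console.log(tooMany.error.issues[0]?.message); // zod's default "at most 500 element(s)" message
}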
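Purely to illustrate the reviewer's suggestion in the thread above, and not part of this PR: a stricter zod type for a single $search stage could look roughly like the sketch below. The operator and field names are assumptions based on the text operator used in DEFAULT_PIPELINE; covering the full operator surface is what makes this approach hard to keep in sync with the Playground.

import { z } from "zod";

// Hypothetical, partial model of a $search stage. Only the `text` operator
// from DEFAULT_PIPELINE is modeled; every other operator (phrase, compound,
// autocomplete, ...) would need its own schema and ongoing maintenance.
const searchStageSchema = z.object({
    $search: z
        .object({
            index: z.string().default("default"),
            text: z
                .object({
                    query: z.union([z.string(), z.array(z.string())]),
                    path: z.union([
                        z.string(),
                        z.array(z.string()),
                        z.object({ wildcard: z.string() }),
                    ]),
                })
                .optional(),
        })
        .passthrough(), // let unmodeled operators through rather than rejecting them
});

// A stricter pipeline type: $search stages are validated, other stages stay open.
const pipelineSchema = z.array(z.union([searchStageSchema, z.record(z.string(), z.unknown())]));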
3 changes: 3 additions & 0 deletions src/tools/playground/tools.ts
@@ -0,0 +1,3 @@
import { RunPipeline } from "./runPipeline.js";

export const PlaygroundTools = [RunPipeline];
2 changes: 1 addition & 1 deletion src/tools/tool.ts
@@ -10,7 +10,7 @@ import { UserConfig } from "../config.js";
export type ToolArgs<Args extends ZodRawShape> = z.objectOutputType<Args, ZodNever>;

export type OperationType = "metadata" | "read" | "create" | "delete" | "update";
export type ToolCategory = "mongodb" | "atlas";
export type ToolCategory = "mongodb" | "atlas" | "playground";
export type TelemetryToolMetadata = {
projectId?: string;
orgId?: string;
44 changes: 44 additions & 0 deletions tests/integration/tools/playground/runPipeline.test.ts
@@ -0,0 +1,44 @@
import { describeWithMongoDB } from "../mongodb/mongodbHelpers.js";
import { getResponseElements } from "../../helpers.js";

describeWithMongoDB("runPipeline tool", (integration) => {
it("should return results", async () => {
await integration.connectMcpClient();
const response = await integration.mcpClient().callTool({
name: "run-pipeline",
arguments: {
documents: [{ name: "First document" }, { name: "Second document" }],
aggregationPipeline: [
{
$search: {
index: "default",
text: {
query: "first",
path: {
wildcard: "*",
},
},
},
},
{
$project: {
_id: 0,
name: 1,
},
},
],
},
});
const elements = getResponseElements(response.content);
expect(elements).toEqual([
{
text: 'Found 1 documents:',
type: "text",
},
{
text: '{"name":"First document"}',
type: "text",
},
]);
});
});
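For a quick check outside of MCP, the request runPipeline.ts builds can be reproduced directly against the Playground endpoint. The sketch below mirrors convertToRunRequest and runPipeline from the diff above (each payload field is a JSON-encoded string); any behavior of the endpoint beyond what this PR's code relies on is an assumption, not something documented here.

// Sketch only: replays the same payload the run-pipeline tool sends (Node 18+ fetch assumed).
const PLAYGROUND_SEARCH_URL = "https://search-playground.mongodb.com/api/tools/code-playground/search";

async function runPlaygroundPipeline(): Promise<void> {
    const runRequest = {
        documents: JSON.stringify([{ name: "First document" }, { name: "Second document" }]),
        aggregationPipeline: JSON.stringify([
            { $search: { index: "default", text: { query: "first", path: { wildcard: "*" } } } },
            { $project: { _id: 0, name: 1 } },
        ]),
        indexDefinition: JSON.stringify({ mappings: { dynamic: true } }),
        synonyms: JSON.stringify([]),
    };

    const response = await fetch(PLAYGROUND_SEARCH_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(runRequest),
    });

    if (!response.ok) {
        throw new Error(`Pipeline run failed: HTTP ${response.status} ${response.statusText}`);
    }

    const { documents } = (await response.json()) as { documents: Array<Record<string, unknown>> };
    console.log(documents); // per the test above, the expected result is [{ name: "First document" }]
}

runPlaygroundPipeline().catch((error) => console.error(error));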