From 256587a44c107b8eed652c716c487e09f991532b Mon Sep 17 00:00:00 2001 From: Vitalii Mishchenko Date: Tue, 13 May 2025 12:59:30 -0700 Subject: [PATCH 1/2] Add RunPipeline tool --- src/server.ts | 3 +- src/tools/playground/runPipeline.ts | 166 ++++++++++++++++++ src/tools/playground/tools.ts | 3 + src/tools/tool.ts | 2 +- .../tools/playground/runPipeline.test.ts | 44 +++++ 5 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 src/tools/playground/runPipeline.ts create mode 100644 src/tools/playground/tools.ts create mode 100644 tests/integration/tools/playground/runPipeline.test.ts diff --git a/src/server.ts b/src/server.ts index b0e8e19c..2a2cedf3 100644 --- a/src/server.ts +++ b/src/server.ts @@ -3,6 +3,7 @@ import { Session } from "./session.js"; import { Transport } from "@modelcontextprotocol/sdk/shared/transport.js"; import { AtlasTools } from "./tools/atlas/tools.js"; import { MongoDbTools } from "./tools/mongodb/tools.js"; +import { PlaygroundTools } from "./tools/playground/tools.js"; import logger, { initializeLogger, LogId } from "./logger.js"; import { ObjectId } from "mongodb"; import { Telemetry } from "./telemetry/telemetry.js"; @@ -134,7 +135,7 @@ export class Server { } private registerTools() { - for (const tool of [...AtlasTools, ...MongoDbTools]) { + for (const tool of [...AtlasTools, ...MongoDbTools, ...PlaygroundTools]) { new tool(this.session, this.userConfig, this.telemetry).register(this.mcpServer); } } diff --git a/src/tools/playground/runPipeline.ts b/src/tools/playground/runPipeline.ts new file mode 100644 index 00000000..e7996663 --- /dev/null +++ b/src/tools/playground/runPipeline.ts @@ -0,0 +1,166 @@ +import { OperationType, TelemetryToolMetadata, ToolArgs, ToolBase, ToolCategory } from "../tool.js"; +import { z } from "zod"; +import { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; +import { EJSON } from "bson"; + +const PLAYGROUND_SEARCH_URL = "https://search-playground.mongodb.com/api/tools/code-playground/search"; + +const DEFAULT_DOCUMENTS = [ + { + name: "First document", + }, + { + name: "Second document", + }, +]; + +const DEFAULT_SEARCH_INDEX_DEFINITION = { + mappings: { + dynamic: true, + }, +}; + +const DEFAULT_PIPELINE = [ + { + $search: { + index: "default", + text: { + query: "first", + path: { + wildcard: "*", + }, + }, + }, + }, +]; + +const DEFAULT_SYNONYMS: Array> = []; + +export const RunPipelineOperationArgs = { + documents: z + .array(z.record(z.string(), z.unknown())) + .describe("Documents to run the pipeline against. 500 is maximum.") + .default(DEFAULT_DOCUMENTS), + aggregationPipeline: z + .array(z.record(z.string(), z.unknown())) + .describe("Aggregation pipeline to run on the provided documents.") + .default(DEFAULT_PIPELINE), + searchIndexDefinition: z + .record(z.string(), z.unknown()) + .describe("Search index to create before running the pipeline.") + .optional() + .default(DEFAULT_SEARCH_INDEX_DEFINITION), + synonyms: z + .array(z.record(z.any())) + .describe("Synonyms mapping to create before running the pipeline.") + .optional() + .default(DEFAULT_SYNONYMS), +}; + +interface RunRequest { + documents: string; + aggregationPipeline: string; + indexDefinition: string; + synonyms: string; +} + +interface RunResponse { + documents: Array>; +} + +interface RunErrorResponse { + code: string; + message: string; +} + +export class RunPipeline extends ToolBase { + protected name = "run-pipeline"; + protected description = + "Run aggregation pipeline for provided documents without needing an Atlas account, cluster, or collection."; + protected category: ToolCategory = "playground"; + protected operationType: OperationType = "metadata"; + protected argsShape = RunPipelineOperationArgs; + + protected async execute(toolArgs: ToolArgs): Promise { + const runRequest = this.convertToRunRequest(toolArgs); + const runResponse = await this.runPipeline(runRequest); + const toolResult = this.convertToToolResult(runResponse); + return toolResult; + } + + protected resolveTelemetryMetadata(): TelemetryToolMetadata { + return {}; + } + + private async runPipeline(runRequest: RunRequest): Promise { + const options: RequestInit = { + method: "POST", + headers: { + "Content-Type": "application/json", + }, + body: JSON.stringify(runRequest), + }; + + let response: Response; + try { + response = await fetch(PLAYGROUND_SEARCH_URL, options); + } catch { + throw new Error("Cannot run pipeline: network error."); + } + + if (!response.ok) { + const errorMessage = await this.getPlaygroundResponseError(response); + throw new Error(`Pipeline run failed: ${errorMessage}`); + } + + try { + return (await response.json()) as RunResponse; + } catch { + throw new Error("Pipeline run failed: response is not valid JSON."); + } + } + + private async getPlaygroundResponseError(response: Response): Promise { + let errorMessage = `HTTP ${response.status} ${response.statusText}.`; + try { + const errorResponse = (await response.json()) as RunErrorResponse; + errorMessage += ` Error code: ${errorResponse.code}. Error message: ${errorResponse.message}`; + } catch { + // Ignore JSON parse errors + } + + return errorMessage; + } + + private convertToRunRequest(toolArgs: ToolArgs): RunRequest { + try { + return { + documents: JSON.stringify(toolArgs.documents), + aggregationPipeline: JSON.stringify(toolArgs.aggregationPipeline), + indexDefinition: JSON.stringify(toolArgs.searchIndexDefinition || DEFAULT_SEARCH_INDEX_DEFINITION), + synonyms: JSON.stringify(toolArgs.synonyms || DEFAULT_SYNONYMS), + }; + } catch { + throw new Error("Invalid arguments type."); + } + } + + private convertToToolResult(runResponse: RunResponse): CallToolResult { + const content: Array<{ text: string; type: "text" }> = [ + { + text: `Found ${runResponse.documents.length} documents":`, + type: "text", + }, + ...runResponse.documents.map((doc) => { + return { + text: EJSON.stringify(doc), + type: "text", + } as { text: string; type: "text" }; + }), + ]; + + return { + content, + }; + } +} diff --git a/src/tools/playground/tools.ts b/src/tools/playground/tools.ts new file mode 100644 index 00000000..25e1290a --- /dev/null +++ b/src/tools/playground/tools.ts @@ -0,0 +1,3 @@ +import { RunPipeline } from "./runPipeline.js"; + +export const PlaygroundTools = [RunPipeline]; diff --git a/src/tools/tool.ts b/src/tools/tool.ts index 5e4fc1a3..d06973b8 100644 --- a/src/tools/tool.ts +++ b/src/tools/tool.ts @@ -10,7 +10,7 @@ import { UserConfig } from "../config.js"; export type ToolArgs = z.objectOutputType; export type OperationType = "metadata" | "read" | "create" | "delete" | "update"; -export type ToolCategory = "mongodb" | "atlas"; +export type ToolCategory = "mongodb" | "atlas" | "playground"; export type TelemetryToolMetadata = { projectId?: string; orgId?: string; diff --git a/tests/integration/tools/playground/runPipeline.test.ts b/tests/integration/tools/playground/runPipeline.test.ts new file mode 100644 index 00000000..3036d42d --- /dev/null +++ b/tests/integration/tools/playground/runPipeline.test.ts @@ -0,0 +1,44 @@ +import { describeWithMongoDB } from "../mongodb/mongodbHelpers.js"; +import { getResponseElements } from "../../helpers.js"; + +describeWithMongoDB("runPipeline tool", (integration) => { + it("should return results", async () => { + await integration.connectMcpClient(); + const response = await integration.mcpClient().callTool({ + name: "run-pipeline", + arguments: { + documents: [{ name: "First document" }, { name: "Second document" }], + aggregationPipeline: [ + { + $search: { + index: "default", + text: { + query: "first", + path: { + wildcard: "*", + }, + }, + }, + }, + { + $project: { + _id: 0, + name: 1, + }, + }, + ], + }, + }); + const elements = getResponseElements(response.content); + expect(elements).toEqual([ + { + text: 'Found 1 documents":', + type: "text", + }, + { + text: '{"name":"First document"}', + type: "text", + }, + ]); + }); +}); From 3df5eca0365850e0368644406221fbe8d3fa148f Mon Sep 17 00:00:00 2001 From: Vitalii Mishchenko Date: Thu, 15 May 2025 08:31:08 -0700 Subject: [PATCH 2/2] Address comments --- src/tools/playground/runPipeline.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/tools/playground/runPipeline.ts b/src/tools/playground/runPipeline.ts index e7996663..d00d9be8 100644 --- a/src/tools/playground/runPipeline.ts +++ b/src/tools/playground/runPipeline.ts @@ -39,20 +39,21 @@ const DEFAULT_SYNONYMS: Array> = []; export const RunPipelineOperationArgs = { documents: z .array(z.record(z.string(), z.unknown())) + .max(500) .describe("Documents to run the pipeline against. 500 is maximum.") .default(DEFAULT_DOCUMENTS), aggregationPipeline: z .array(z.record(z.string(), z.unknown())) - .describe("Aggregation pipeline to run on the provided documents.") + .describe("MongoDB aggregation pipeline to run on the provided documents.") .default(DEFAULT_PIPELINE), searchIndexDefinition: z .record(z.string(), z.unknown()) - .describe("Search index to create before running the pipeline.") + .describe("MongoDB search index definition to create before running the pipeline.") .optional() .default(DEFAULT_SEARCH_INDEX_DEFINITION), synonyms: z .array(z.record(z.any())) - .describe("Synonyms mapping to create before running the pipeline.") + .describe("MongoDB synonyms mapping to create before running the pipeline.") .optional() .default(DEFAULT_SYNONYMS), }; @@ -76,7 +77,7 @@ interface RunErrorResponse { export class RunPipeline extends ToolBase { protected name = "run-pipeline"; protected description = - "Run aggregation pipeline for provided documents without needing an Atlas account, cluster, or collection."; + "Run MongoDB aggregation pipeline for provided documents without needing an Atlas account, cluster, or collection. The tool can be useful for running ad-hoc pipelines for testing or debugging."; protected category: ToolCategory = "playground"; protected operationType: OperationType = "metadata"; protected argsShape = RunPipelineOperationArgs;