-
Notifications
You must be signed in to change notification settings - Fork 24
feat: Add RunPipeline tool #253
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
import { OperationType, TelemetryToolMetadata, ToolArgs, ToolBase, ToolCategory } from "../tool.js"; | ||
import { z } from "zod"; | ||
import { CallToolResult } from "@modelcontextprotocol/sdk/types.js"; | ||
import { EJSON } from "bson"; | ||
|
||
const PLAYGROUND_SEARCH_URL = "https://search-playground.mongodb.com/api/tools/code-playground/search"; | ||
|
||
const DEFAULT_DOCUMENTS = [ | ||
{ | ||
name: "First document", | ||
}, | ||
{ | ||
name: "Second document", | ||
}, | ||
]; | ||
|
||
const DEFAULT_SEARCH_INDEX_DEFINITION = { | ||
mappings: { | ||
dynamic: true, | ||
}, | ||
}; | ||
|
||
const DEFAULT_PIPELINE = [ | ||
{ | ||
$search: { | ||
index: "default", | ||
text: { | ||
query: "first", | ||
path: { | ||
wildcard: "*", | ||
}, | ||
}, | ||
}, | ||
}, | ||
]; | ||
|
||
const DEFAULT_SYNONYMS: Array<Record<string, unknown>> = []; | ||
|
||
export const RunPipelineOperationArgs = { | ||
documents: z | ||
.array(z.record(z.string(), z.unknown())) | ||
.max(500) | ||
.describe("Documents to run the pipeline against. 500 is maximum.") | ||
.default(DEFAULT_DOCUMENTS), | ||
aggregationPipeline: z | ||
.array(z.record(z.string(), z.unknown())) | ||
.describe("MongoDB aggregation pipeline to run on the provided documents.") | ||
.default(DEFAULT_PIPELINE), | ||
searchIndexDefinition: z | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are more specific types for For ex, the search playground looks limited to a subset of aggregation pipeline stages. Would those be helpful to include in the type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel it would be hard to add more specific I updated Regarding supported stages, I’d avoid listing them here. If we hardcode them, the list will likely get out of sync over time between the Playground and MCP. I’d rather rely on the Playground’s response to flag any unsupported stages. It actually supports more than what’s in the public docs (product wants to position it as a Search only playground for now). |
||
.record(z.string(), z.unknown()) | ||
.describe("MongoDB search index definition to create before running the pipeline.") | ||
.optional() | ||
.default(DEFAULT_SEARCH_INDEX_DEFINITION), | ||
synonyms: z | ||
.array(z.record(z.any())) | ||
.describe("MongoDB synonyms mapping to create before running the pipeline.") | ||
.optional() | ||
.default(DEFAULT_SYNONYMS), | ||
}; | ||
|
||
interface RunRequest { | ||
documents: string; | ||
aggregationPipeline: string; | ||
indexDefinition: string; | ||
synonyms: string; | ||
} | ||
|
||
interface RunResponse { | ||
documents: Array<Record<string, unknown>>; | ||
} | ||
|
||
interface RunErrorResponse { | ||
code: string; | ||
message: string; | ||
} | ||
|
||
export class RunPipeline extends ToolBase { | ||
protected name = "run-pipeline"; | ||
protected description = | ||
"Run MongoDB aggregation pipeline for provided documents without needing an Atlas account, cluster, or collection. The tool can be useful for running ad-hoc pipelines for testing or debugging."; | ||
protected category: ToolCategory = "playground"; | ||
protected operationType: OperationType = "metadata"; | ||
protected argsShape = RunPipelineOperationArgs; | ||
|
||
protected async execute(toolArgs: ToolArgs<typeof this.argsShape>): Promise<CallToolResult> { | ||
const runRequest = this.convertToRunRequest(toolArgs); | ||
const runResponse = await this.runPipeline(runRequest); | ||
const toolResult = this.convertToToolResult(runResponse); | ||
return toolResult; | ||
} | ||
|
||
protected resolveTelemetryMetadata(): TelemetryToolMetadata { | ||
return {}; | ||
} | ||
|
||
private async runPipeline(runRequest: RunRequest): Promise<RunResponse> { | ||
const options: RequestInit = { | ||
method: "POST", | ||
headers: { | ||
"Content-Type": "application/json", | ||
}, | ||
body: JSON.stringify(runRequest), | ||
}; | ||
|
||
let response: Response; | ||
try { | ||
response = await fetch(PLAYGROUND_SEARCH_URL, options); | ||
} catch { | ||
throw new Error("Cannot run pipeline: network error."); | ||
} | ||
|
||
if (!response.ok) { | ||
const errorMessage = await this.getPlaygroundResponseError(response); | ||
throw new Error(`Pipeline run failed: ${errorMessage}`); | ||
} | ||
|
||
try { | ||
return (await response.json()) as RunResponse; | ||
} catch { | ||
throw new Error("Pipeline run failed: response is not valid JSON."); | ||
} | ||
} | ||
|
||
private async getPlaygroundResponseError(response: Response): Promise<string> { | ||
let errorMessage = `HTTP ${response.status} ${response.statusText}.`; | ||
try { | ||
const errorResponse = (await response.json()) as RunErrorResponse; | ||
errorMessage += ` Error code: ${errorResponse.code}. Error message: ${errorResponse.message}`; | ||
} catch { | ||
// Ignore JSON parse errors | ||
} | ||
|
||
return errorMessage; | ||
} | ||
|
||
private convertToRunRequest(toolArgs: ToolArgs<typeof this.argsShape>): RunRequest { | ||
try { | ||
return { | ||
documents: JSON.stringify(toolArgs.documents), | ||
aggregationPipeline: JSON.stringify(toolArgs.aggregationPipeline), | ||
indexDefinition: JSON.stringify(toolArgs.searchIndexDefinition || DEFAULT_SEARCH_INDEX_DEFINITION), | ||
synonyms: JSON.stringify(toolArgs.synonyms || DEFAULT_SYNONYMS), | ||
}; | ||
} catch { | ||
throw new Error("Invalid arguments type."); | ||
} | ||
} | ||
|
||
private convertToToolResult(runResponse: RunResponse): CallToolResult { | ||
const content: Array<{ text: string; type: "text" }> = [ | ||
{ | ||
text: `Found ${runResponse.documents.length} documents":`, | ||
type: "text", | ||
}, | ||
...runResponse.documents.map((doc) => { | ||
return { | ||
text: EJSON.stringify(doc), | ||
type: "text", | ||
} as { text: string; type: "text" }; | ||
}), | ||
]; | ||
|
||
return { | ||
content, | ||
}; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import { RunPipeline } from "./runPipeline.js"; | ||
|
||
export const PlaygroundTools = [RunPipeline]; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import { describeWithMongoDB } from "../mongodb/mongodbHelpers.js"; | ||
import { getResponseElements } from "../../helpers.js"; | ||
|
||
describeWithMongoDB("runPipeline tool", (integration) => { | ||
it("should return results", async () => { | ||
await integration.connectMcpClient(); | ||
const response = await integration.mcpClient().callTool({ | ||
name: "run-pipeline", | ||
arguments: { | ||
documents: [{ name: "First document" }, { name: "Second document" }], | ||
aggregationPipeline: [ | ||
{ | ||
$search: { | ||
index: "default", | ||
text: { | ||
query: "first", | ||
path: { | ||
wildcard: "*", | ||
}, | ||
}, | ||
}, | ||
}, | ||
{ | ||
$project: { | ||
_id: 0, | ||
name: 1, | ||
}, | ||
}, | ||
], | ||
}, | ||
}); | ||
const elements = getResponseElements(response.content); | ||
expect(elements).toEqual([ | ||
{ | ||
text: 'Found 1 documents":', | ||
type: "text", | ||
}, | ||
{ | ||
text: '{"name":"First document"}', | ||
type: "text", | ||
}, | ||
]); | ||
}); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Worth adding
.max(500)
to codify this?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice idea, added