// batch_search.ts
import config from '../config'; // Keep this for fallback default timeout
import {
BaseSearchOptions,
BatchSearchConfiguration,
// Import optimized types
BatchSearchQuery,
PartitionedVectorDBInterface,
SearchExecutionOptions,
SearchResult,
} from '../types'; // Path to the optimized types file
import { createTimer } from '../utils/profiling';
/**
 * Manages efficient processing of multiple vector search queries in batches.
 *
 * BatchEngineSearch handles concurrent vector search operations against a
 * PartitionedVectorDBInterface, using the optimized type definitions. It automatically
 * manages large batches by splitting them into smaller chunks, processes queries in
 * parallel, and handles timeout and error scenarios gracefully.
*
* Key features:
* - Efficient batching of multiple vector search queries
* - Automatic chunking of large query batches
* - Concurrent query execution with configurable timeouts
* - Support for preserving original query order in results
* - Integration with different search methods (HNSW or clustered)
* - Error handling and graceful degradation
*
* @example
* ```typescript
* const searchEngine = new PartitionedVectorDB(...);
* const batchSearch = new BatchEngineSearch(searchEngine, {
* maxBatchSize: 32,
* defaultSearchTimeoutMs: 10000
* });
*
* const queries = [
* { query: vectorA, k: 5, options: { useHNSW: true } },
* { query: vectorB, k: 10, options: { filter: { category: "tech" } } }
* ];
*
* const results = await batchSearch.searchBatch(queries);
* ```
*/
export class BatchEngineSearch {
private searchEngine: PartitionedVectorDBInterface;
private options: Required<BatchSearchConfiguration>; // Use the new configuration type
constructor(
searchEngine: PartitionedVectorDBInterface,
options: BatchSearchConfiguration = {} // Accept the new configuration type
) {
this.searchEngine = searchEngine;
// Define default values using keys from BatchSearchConfiguration
const defaults: Required<BatchSearchConfiguration> = {
maxBatchSize: 64,
prioritizeOrder: true,
groupSimilarQueries: false,
defaultSearchTimeoutMs: config.batchSearch?.defaultSearchTimeoutMs || 15000,
};
// Merge provided options with defaults
this.options = { ...defaults, ...options };
}
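  // Illustration (hypothetical values, not part of the class API): constructing with a
  // partial configuration keeps the remaining defaults, e.g.
  //   new BatchEngineSearch(db, { maxBatchSize: 16 })
  // resolves this.options to { maxBatchSize: 16, prioritizeOrder: true,
  // groupSimilarQueries: false, defaultSearchTimeoutMs: <config value or 15000> },
  // where `db` stands for any PartitionedVectorDBInterface implementation.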
  /**
   * Process multiple search queries in a batch.
   * @param queries - Array of search queries using the BatchSearchQuery interface.
   * @returns Results for each query. When `prioritizeOrder` is enabled, results are
   * returned in the original query order; queries that fail or time out yield empty arrays.
   */
async searchBatch(queries: BatchSearchQuery[]): Promise<SearchResult[][]> {
if (!queries || !queries.length) return [];
const timer = createTimer();
timer.start('batch_search_partitioned');
    // Split into smaller batches if the batch exceeds maxBatchSize
if (queries.length > this.options.maxBatchSize) {
const results: SearchResult[][] = [];
console.log(`Batch too large (${queries.length}), splitting into chunks of ${this.options.maxBatchSize}`);
for (let i = 0; i < queries.length; i += this.options.maxBatchSize) {
const batchQueries = queries.slice(i, i + this.options.maxBatchSize);
const batchResults = await this.searchBatch(batchQueries); // Recursive call
results.push(...batchResults);
}
timer.stop('batch_search_partitioned');
console.log(`Finished processing large batch (${queries.length}) in ${timer.getElapsed('batch_search_partitioned')}ms`);
return results;
}
// Process the current batch (or sub-batch)
const results = await this._processBatch(queries);
timer.stop('batch_search_partitioned');
console.log(`Processed batch of ${queries.length} queries in ${timer.getElapsed('batch_search_partitioned')}ms`);
return results;
}
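  // Illustration of the chunking above (numbers are hypothetical): with maxBatchSize = 64,
  //   await batchSearch.searchBatch(oneHundredFiftyQueries)
  // recurses into sub-batches of 64, 64 and 22 queries; the per-chunk results are
  // concatenated, so the returned array still holds one entry per input query, in order.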
/**
* Process batch queries using the PartitionedVectorDB directly.
* Leverages Promise.all for concurrency across queries and relies on the
* PartitionedVectorDB's internal parallelism.
*/
private async _processBatch(queries: BatchSearchQuery[]): Promise<SearchResult[][]> {
const timer = createTimer();
timer.start('process_batch');
let processedQueries = queries;
if (this.options.groupSimilarQueries) {
processedQueries = this._groupSimilarQueries(queries);
}
// Use Promise.all to run searches concurrently
const resultsPromises = processedQueries.map(async (queryData, originalIndex) => {
// Retain originalIndex for reordering if needed
// Destructure from BatchSearchQuery
const { query, k, options = {} } = queryData;
const queryTimer = createTimer();
queryTimer.start(`query_${originalIndex}`); // Use original index for tracking
let methodUsed = 'unknown';
try {
// Build options object to pass into the search engine method
// Combine fields from BaseSearchOptions and SearchExecutionOptions
const engineSearchOptions: BaseSearchOptions & SearchExecutionOptions = {
// BaseSearchOptions
k: k, // k is usually passed separately but can be included if the engine API requires it
filter: options.filter,
includeMetadata: false, // Metadata is usually not needed in raw search
distanceMetric: options.distanceMetric, // Allow overriding the default metric
// SearchExecutionOptions
partitionIds: options.partitionIds,
efSearch: options.efSearch, // For HNSW search
};
let queryResult: SearchResult[];
// Decide the method based on the options *of each query*
if (options.useHNSW && typeof this.searchEngine.findNearestHNSW === 'function') {
methodUsed = 'hnsw';
queryResult = await this.searchEngine.findNearestHNSW(
query,
k,
engineSearchOptions // Pass the merged options
);
} else if (typeof this.searchEngine.findNearest === 'function') {
methodUsed = 'clustered';
// Ensure HNSW-specific parameters are not passed to findNearest
const { efSearch, ...clusteredOptions } = engineSearchOptions;
queryResult = await this.searchEngine.findNearest(query, k, clusteredOptions);
} else {
throw new Error('Search engine provides neither findNearestHNSW nor findNearest.');
}
queryTimer.stop(`query_${originalIndex}`);
console.log(`Query ${originalIndex} (k=${k}, method=${methodUsed}) took ${queryTimer.getElapsed(`query_${originalIndex}`)}ms`);
return {
originalIndex: originalIndex, // Retain original index for reordering
result: queryResult,
error: null,
};
} catch (error) {
queryTimer.stop(`query_${originalIndex}`);
console.error(`Error processing query ${originalIndex} after ${queryTimer.getElapsed(`query_${originalIndex}`)}ms:`, error);
return {
originalIndex: originalIndex,
result: [] as SearchResult[], // Return an empty array in case of error
error: error instanceof Error ? error.message : String(error),
};
}
});
    // Race each query promise against a timeout based on defaultSearchTimeoutMs
const timedPromises = resultsPromises.map((p) =>
Promise.race([
p,
new Promise<{
originalIndex: number;
result: SearchResult[];
error: string;
}>((_, reject) =>
setTimeout(
() => reject(new Error(`Query timed out after ${this.options.defaultSearchTimeoutMs}ms`)),
this.options.defaultSearchTimeoutMs // Use the new key
)
),
]).catch((error) => {
console.error('Batch query failed or timed out:', error);
return {
originalIndex: -1,
result: [],
error: (error as Error).message,
};
})
);
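    // Note on the race above: a timeout rejection is converted by the catch handler into a
    // synthetic { originalIndex: -1, result: [], error } entry, so one slow query never
    // rejects the whole batch. The underlying search keeps running and its setTimeout is
    // not cleared here; callers needing true cancellation would have to add it themselves.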
// Wait for all queries to complete or timeout
const settledResults = await Promise.all(timedPromises);
timer.stop('process_batch');
// Reconstruct results array, potentially reordering
const finalResults: SearchResult[][] = new Array(queries.length);
if (this.options.prioritizeOrder) {
// Use originalIndex to place results correctly
for (const res of settledResults) {
if (res.originalIndex !== -1 && res.originalIndex < finalResults.length) {
finalResults[res.originalIndex] = res.result;
if (res.error) {
console.warn(`Query at original index ${res.originalIndex} failed: ${res.error}`);
}
} else if (res.originalIndex === -1 && res.error) {
console.error(`A query timed out or failed without recoverable index: ${res.error}`);
}
}
for (let i = 0; i < finalResults.length; i++) {
if (finalResults[i] === undefined) {
console.warn(`Result for original index ${i} is missing (likely due to unrecoverable error/timeout).`);
finalResults[i] = [];
}
}
} else {
settledResults.forEach((res, i) => {
finalResults[i] = res.result;
if (res.error) {
console.warn(`Query at result index ${i} (order not prioritized) failed: ${res.error}`);
}
});
      // Defensive: ensure every slot holds an array and the output length matches the input
      for (let i = 0; i < queries.length; i++) {
        if (finalResults[i] === undefined) {
          finalResults[i] = [];
        }
      }
      finalResults.length = queries.length;
}
return finalResults;
}
/**
   * Placeholder for grouping similar queries; currently returns the input unchanged.
* @private
*/
private _groupSimilarQueries(queries: BatchSearchQuery[]): BatchSearchQuery[] {
if (this.options.groupSimilarQueries) {
      console.log('Query grouping requested, but only a pass-through implementation is available; returning queries unchanged.');
}
return queries;
}
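  // One possible grouping strategy (a sketch only, not implemented here): bucket queries by a
  // key derived from their execution options, e.g.
  //   const key = JSON.stringify({ partitionIds: options.partitionIds, filter: options.filter });
  // and emit the buckets back-to-back so queries targeting the same partitions run adjacently.
  // Any real reordering would have to carry the original indices so prioritizeOrder still works.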
/**
* Clean up resources (no-op in this version).
*/
shutdown(): void {
    console.log('BatchEngineSearch shutdown.');
}
}
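
// Usage sketch (hypothetical setup; `db` stands for any PartitionedVectorDBInterface, and
// `vecA` / `vecB` for query vectors):
//
//   const batch = new BatchEngineSearch(db, { defaultSearchTimeoutMs: 5000 });
//   const results = await batch.searchBatch([
//     { query: vecA, k: 5, options: { useHNSW: true } },
//     { query: vecB, k: 10 },
//   ]);
//   // results[i] lines up with the i-th query; a failed or timed-out query yields [].
//   batch.shutdown();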