-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpartitioned_vector_db.ts
2010 lines (1766 loc) · 84.7 KB
/
partitioned_vector_db.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// --- START OF FILE partitioned_vector_db.ts ---
// partitioned_db_optimized.ts
import { EventEmitter } from 'events';
import { existsSync, promises as fs, mkdirSync } from 'fs';
import { LRUCache } from 'lru-cache'; // Using a robust LRU cache library
import path from 'path';
import HNSW from '../ann/hnsw'; // Assuming HNSW is a class for the clustering algorithm
import { log } from '../utils/log';
import defaultSystemConfiguration from '../config';
import { BuildIndexHNSWOptions, ClusteredVectorDBOptions, DBStats, DistanceMetric, HNSWStats, PartitionConfig, PartitionedDBEventData, PartitionedDBStats, PartitionedVectorDBInterface, PartitionedVectorDBOptions, SearchOptions, SearchResult, TypedEventEmitter, Vector, VectorData } from '../types'; // Adjust path as needed
import { ClusteredVectorDB } from './clustered_vector_db';
// --- Constants ---
/** Fallback for `partitionCapacity` when the caller does not supply one. */
const DEFAULT_PARTITION_CAPACITY = 100_000;
/** Fallback size of the loaded-partition LRU cache (keeps a few partitions warm). */
const DEFAULT_MAX_ACTIVE_PARTITIONS = 3;
/** Name of the sub-directory, inside each partition directory, that holds the HNSW index. */
const HNSW_INDEX_DIR_NAME = 'hnsw';
/** File name of the persisted HNSW index inside that sub-directory. */
const HNSW_INDEX_FILE_NAME = 'hnsw_index.json';
/**
* PartitionedVectorDB distributes vectors across multiple ClusteredVectorDB partitions
* for improved scalability and performance with very large datasets.
* It uses an LRU cache to manage loaded partitions in memory and integrates HNSW index persistence.
*
* Storage strategy:
* - Uses ClusteredVectorDB as the partition implementation
* - Each partition uses optimized binary storage for vectors and JSON for metadata (handled by ClusteredVectorDB)
* - HNSW indices are stored separately per partition.
* - Partitions are stored in separate directories with their own config files
*/
/**
* The `PartitionedVectorDB` class provides a partitioned, in-memory vector database
* with support for clustering, HNSW indexing, and LRU-based partition management.
* It is designed to handle large-scale vector data by dividing it into manageable
* partitions, each with its own configuration and storage.
* ### Features:
* - **Partition Management**: Automatically manages partitions with configurable capacity.
* - **LRU Cache**: Keeps a limited number of partitions in memory for efficient access.
* - **HNSW Indexing**: Supports approximate nearest neighbor search using HNSW indices.
* - **Auto-Partitioning**: Automatically creates and activates new partitions when needed.
* - **Persistence**: Saves and loads partition configurations and data to/from disk.
* - **Event-Driven**: Emits events for lifecycle operations like initialization, partition loading, and errors.
* ### Usage:
* 1. Create an instance of `PartitionedVectorDB` with desired options.
* 2. Use methods like `addVector`, `bulkAdd`, `findNearest`, and `findNearestHNSW` to interact with the database.
* 3. Manage partitions using methods like `createPartition`, `setActivePartition`, and `getPartition`.
* 4. Save and load the database state using `save` and `load`.
* ### Events:
* - `db:initialized`: Emitted when the database is fully initialized.
* - `partition:loaded`: Emitted when a partition is loaded into memory.
* - `partition:unloaded`: Emitted when a partition is evicted from memory.
* - `partition:error`: Emitted when an error occurs during partition operations.
* - `vector:add`: Emitted when a vector is added to a partition.
* - `vector:delete`: Emitted when a vector is deleted from a partition.
* - `db:close`: Emitted when the database is closed.
* ### Example:
* ```typescript
* const db = new PartitionedVectorDB({
* partitionsDir: './data/partitions',
* partitionCapacity: 1000,
* maxActivePartitions: 5,
* autoCreatePartitions: true,
* vectorSize: 128,
* });
*
* await db.initializationPromise; // Wait for initialization
*
* // Add a vector
* const { partitionId, vectorId } = await db.addVector(undefined, [0.1, 0.2, 0.3], { label: 'example' });
*
* // Search for nearest neighbors
* const results = await db.findNearest([0.1, 0.2, 0.3], 5);
*
* // Save the database state
* await db.save();
*
* // Close the database
* await db.close();
* ```
* ### Constructor Options:
* - `partitionsDir`: Directory where partition data is stored.
* - `partitionCapacity`: Maximum number of vectors per partition.
* - `maxActivePartitions`: Maximum number of partitions to keep in memory.
* - `autoCreatePartitions`: Whether to automatically create new partitions when needed.
* - `vectorSize`: Suggested size of vectors (optional).
* - `useCompression`: Whether to enable compression for partition data.
* - `clusterOptions`: Default options for clustered vector databases.
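* - `autoLoadHNSW`: Whether to automatically load HNSW indices (default: true).
* - `runKMeansOnLoad`: Passed to every partition as its `runKMeansOnLoad` option.
* - `autoLoadPartitions`: Whether the active partition is loaded during initialization (default: true).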
* ### Methods:
* - `addVector`: Adds a single vector to the active partition.
* - `bulkAdd`: Adds multiple vectors across partitions.
* - `findNearest`: Finds nearest neighbors using standard search.
* - `findNearestHNSW`: Finds nearest neighbors using HNSW indices.
* - `createPartition`: Creates a new partition.
* - `setActivePartition`: Sets the active partition.
* - `getPartition`: Loads and retrieves a specific partition.
* - `getActivePartition`: Retrieves the currently active partition.
* - `save`: Saves the database state, including partitions and indices.
* - `load`: Loads the database state from disk.
* - `close`: Closes the database, saving state and releasing resources.
* - `buildIndexHNSW`: Builds HNSW indices for specified or all loaded partitions.
* - `saveHNSWIndices`: Saves HNSW indices for specified or all loaded partitions.
* - `loadHNSWIndices`: Loads HNSW indices for specified or all loaded partitions.
* - `getStats`: Retrieves database statistics.
* - `getVector`: Retrieves a vector by ID.
* - `getMetadata`: Retrieves metadata for a vector by ID.
* - `deleteVector`: Deletes a vector by ID.
* - `updateMetadata`: Updates metadata for a vector by ID.
* ### Internal Methods:
* - `_initialize`: Handles asynchronous initialization of the database.
* - `_loadPartition`: Loads a specific partition into memory.
* - `_saveHNSWIndex`: Saves the HNSW index for a partition.
* - `_loadHNSWIndex`: Loads the HNSW index for a partition.
* - `_ensureActivePartitionHasCapacity`: Ensures the active partition has enough capacity.
* - `_saveSinglePartitionConfig`: Saves a single partition configuration to disk.
* - `_loadPartitionConfigs`: Loads all partition configurations from disk.
* ### Notes:
* - This class is designed for scenarios where vector data is too large to fit into memory at once.
* - It relies on partitioning and LRU caching to manage memory usage efficiently.
* - HNSW indexing provides fast approximate nearest neighbor search but requires additional memory.
*/
export class PartitionedVectorDB extends (EventEmitter as new () => TypedEventEmitter<PartitionedDBEventData>) implements PartitionedVectorDBInterface {
// NOTE(review): the EventEmitter base is cast to TypedEventEmitter<PartitionedDBEventData>,
// presumably so emit()/on() are type-checked against the event map - confirm in ../types.

// --- Configuration, resolved once in the constructor ---
// Root directory that is scanned for partition sub-directories.
private readonly partitionsDir: string;
// Configured per-partition capacity (falls back to DEFAULT_PARTITION_CAPACITY).
private readonly partitionCapacity: number;
// Used as the `max` of the loaded-partition LRU cache.
private readonly maxActivePartitions: number;
// When true, an initial partition is created if no partition configs are found.
private readonly autoCreatePartitions: boolean;
// Vector size handed to every ClusteredVectorDB partition on construction.
private readonly vectorSize: number;
private readonly useCompression: boolean; // Passed down to partitions
// Spread into each partition's options; the partition's own clusterSize is added per partition.
private readonly defaultClusterOptions: Omit<ClusteredVectorDBOptions, 'clusterSize'>;
private readonly autoLoadHNSW: boolean; // Default for whether loading a partition also loads its HNSW index
private readonly runKMeansOnLoad: boolean; // Passed down to partitions as their runKMeansOnLoad option
// In-memory state
private partitionConfigs: Map<string, PartitionConfig>; // All known configs, keyed by partition ID
private loadedPartitions: LRUCache<string, ClusteredVectorDB>; // LRU Cache for loaded DBs
private hnswIndices: Map<string, HNSW>; // Manage HNSW indices per partition ID
// ID of the currently active partition, or null when none is set.
private activePartitionId: string | null;
// Set to true at the end of a successful _initialize().
private isInitialized: boolean = false;
// Promise of the initialization started by the constructor; await it before first use.
public initializationPromise: Promise<void>;
// Awaited by save() when non-null. NOTE(review): presumably tracks a pending
// scheduled config save - the code that assigns it is outside this excerpt.
private saveConfigPromise: Promise<void> | null = null;
private isClosing: boolean = false; // Flag to prevent operations during close
/**
 * Resolves the options against their defaults, sets up the in-memory state and the
 * partition LRU cache, makes sure the partitions directory exists, and kicks off the
 * asynchronous initialization (exposed as `initializationPromise`).
 *
 * @param options - Database options; every field is optional.
 * @throws Error when the partitions directory cannot be created or accessed.
 */
constructor(options: PartitionedVectorDBOptions = {}) {
  super();
  log('info', '[PartitionedVectorDB] Initializing with options:', JSON.stringify(options, null, 2));

  // --- Resolve configuration ---
  this.partitionsDir = options.partitionsDir || path.join(process.cwd(), 'database', 'partitions');
  this.partitionCapacity = options.partitionCapacity || DEFAULT_PARTITION_CAPACITY;
  this.maxActivePartitions = options.maxActivePartitions || DEFAULT_MAX_ACTIVE_PARTITIONS;
  this.autoCreatePartitions = !(options.autoCreatePartitions === false); // on unless explicitly disabled
  this.vectorSize = options.vectorSize ?? defaultSystemConfiguration.defaults.vectorSize;
  this.useCompression = options.useCompression ?? false;
  this.defaultClusterOptions = options.clusterOptions ?? {};
  this.autoLoadHNSW = options.autoLoadHNSW ?? true;
  this.runKMeansOnLoad = options.runKMeansOnLoad ?? defaultSystemConfiguration.indexing.runKMeansOnLoad;

  const configSummary = [
    '[PartitionedVectorDB] Configuration:',
    `- partitionsDir: ${this.partitionsDir}`,
    `- partitionCapacity: ${this.partitionCapacity}`,
    `- maxActivePartitions: ${this.maxActivePartitions}`,
    `- autoCreatePartitions: ${this.autoCreatePartitions}`,
    `- vectorSize: ${this.vectorSize ?? 'not specified'}`,
    `- useCompression: ${this.useCompression}`,
    `- autoLoadHNSW: ${this.autoLoadHNSW}`,
    `- runKMeansOnLoad: ${this.runKMeansOnLoad}`,
  ].join('\n');
  log('info', configSummary);

  // --- In-memory state ---
  this.partitionConfigs = new Map();
  this.hnswIndices = new Map();
  this.activePartitionId = null;

  // --- Partition cache: at most `maxActivePartitions` DB instances stay loaded ---
  this.loadedPartitions = new LRUCache<string, ClusteredVectorDB>({
    max: this.maxActivePartitions,
    // Runs whenever an entry leaves the cache.
    dispose: async (dbInstance, partitionId, reason) => {
      log('info', `[PartitionedVectorDB] Disposing partition ${partitionId} from memory (Reason: ${reason}).`);
      // The HNSW index is dropped from memory here without being saved.
      if (this.hnswIndices.get(partitionId)) {
        this.hnswIndices.delete(partitionId);
        log('info', `[PartitionedVectorDB] Unloaded HNSW index for evicted partition ${partitionId}`);
      }
      try {
        // Close the partition DB instance before announcing the unload.
        await dbInstance.close();
        this.emit('partition:unloaded', { id: partitionId });
      } catch (error: any) {
        log('error', `[PartitionedVectorDB] Error closing partition ${partitionId} during dispose:`, error);
        this.emit('partition:error', {
          id: partitionId,
          error,
          operation: 'dispose',
        });
      }
    },
  });

  // --- The base directory must exist; failing to create it is fatal ---
  try {
    const dirMissing = !existsSync(this.partitionsDir);
    if (dirMissing) {
      mkdirSync(this.partitionsDir, { recursive: true });
    }
  } catch (err: any) {
    throw new Error(`FATAL: Could not create or access partitions directory: ${this.partitionsDir} - ${err.message}`);
  }

  // --- Loading is asynchronous; callers await `initializationPromise` ---
  const autoLoadPartitions = options.autoLoadPartitions !== false;
  this.initializationPromise = this._initialize(autoLoadPartitions);
}
/**
 * Reports whether the database can accept operations.
 *
 * @returns `true` once initialization has completed and no close is in progress.
 */
IsReady(): boolean {
  if (this.isClosing) {
    return false;
  }
  return this.isInitialized;
}
/**
 * Gate placed in front of public operations: rejects while the database is closing and
 * otherwise waits for the constructor-started initialization to finish.
 *
 * @param force - When true, skips waiting for `initializationPromise`.
 * @throws Error when the database is closing or closed.
 */
private async _ensureInitialized(force: boolean = false): Promise<void> {
  if (this.isClosing) {
    throw new Error('Database is closing or closed.');
  }
  const mustWait = !(this.isInitialized || force);
  if (mustWait) {
    await this.initializationPromise;
  }
}
/**
 * One-time asynchronous start-up: reads all partition configs, optionally loads the active
 * partition (and its HNSW index when `autoLoadHNSW` is set), then marks the database as
 * initialized and emits `db:initialized`.
 *
 * @param autoLoad - Whether the active partition should be loaded right away.
 * @throws Re-throws any initialization error after logging it and emitting `partition:error`.
 */
private async _initialize(autoLoad: boolean): Promise<void> {
  if (this.isInitialized) {
    return;
  }
  log('info', `[PartitionedVectorDB] Starting initialization (autoLoad: ${autoLoad})`);
  try {
    // Step 1: every partition config on disk.
    await this._loadPartitionConfigs();
    log('info', `[PartitionedVectorDB] Loaded ${this.partitionConfigs.size} partition configurations.`);

    // Step 2: pick the partitions to warm up - currently only the active one.
    const initialIds: string[] = [];
    const activeId = this.activePartitionId;
    if (autoLoad && activeId) {
      initialIds.push(activeId);
    }

    // Step 3: load them (and their HNSW indices, if configured) in parallel.
    if (initialIds.length === 0) {
      log('info', '[PartitionedVectorDB] No initial partitions specified for auto-loading.');
    } else {
      log('info', `[PartitionedVectorDB] Auto-loading initial partitions: [${initialIds.join(', ')}]`);
      await Promise.all(initialIds.map((id) => this._loadPartition(id, this.autoLoadHNSW)));
      log('info', `[PartitionedVectorDB] Initial partitions loaded (${this.loadedPartitions.size} in memory, ${this.hnswIndices.size} HNSW indices loaded).`);
    }

    this.isInitialized = true;
    log('info', `[PartitionedVectorDB] Initialization complete. Active: ${this.activePartitionId ?? 'None'}`);
    this.emit('db:initialized', {
      partitionCount: this.partitionConfigs.size,
      loadedCount: this.loadedPartitions.size,
      activeId: this.activePartitionId,
    });
  } catch (err: any) {
    log('error', `[PartitionedVectorDB] FATAL: Error during initialization:`, err);
    this.emit('partition:error', { error: err, operation: 'initialize' });
    // Propagate so that awaiting `initializationPromise` rejects.
    throw err;
  }
}
/**
 * Rebuilds `partitionConfigs` from disk and determines the active partition.
 *
 * Every sub-directory `<dir>` of `partitionsDir` is expected to contain
 * `<dir>.config.json`. A config is accepted when it has an `id` and its `dbDirName`
 * equals the directory name; unreadable or invalid configs are logged and skipped.
 *
 * Directories are processed in ascending name order so the outcome does not depend on the
 * order in which the file system happens to list them:
 * - if several configs are marked active, the first one (by directory name) stays active
 *   and the others are deactivated (a config save is scheduled);
 * - if no config is marked active, the first one (by directory name) is activated;
 * - if there are no configs at all and `autoCreatePartitions` is set, an initial partition
 *   is created and made active.
 *
 * Emits `partitions:loaded` when the scan completes.
 *
 * @throws Any error from listing the directory, except a missing partitions directory
 *   (ENOENT), which is only logged.
 */
private async _loadPartitionConfigs(): Promise<void> {
  log('info', `[PartitionedVectorDB] Loading partition configurations from ${this.partitionsDir}`);
  this.partitionConfigs.clear();
  let foundActiveId: string | null = null;
  try {
    const entries = await fs.readdir(this.partitionsDir, {
      withFileTypes: true,
    });
    // Sort by name: directory listing order is not guaranteed, and both the
    // duplicate-active resolution and the "first available" fallback depend on it.
    const partitionDirs = entries
      .filter((e) => e.isDirectory())
      .sort((a, b) => (a.name < b.name ? -1 : a.name > b.name ? 1 : 0));
    log('info', `[PartitionedVectorDB] Found ${partitionDirs.length} potential partition directories.`);
    for (const dir of partitionDirs) {
      const configPath = path.join(this.partitionsDir, dir.name, `${dir.name}.config.json`);
      if (existsSync(configPath)) {
        log('info', `[PartitionedVectorDB] Attempting to load config: ${configPath}`);
        try {
          const content = await fs.readFile(configPath, 'utf8');
          const config = JSON.parse(content) as PartitionConfig;
          // Basic validation: the config must name itself and match its directory.
          if (config.id && config.dbDirName === dir.name) {
            this.partitionConfigs.set(config.id, config);
            log('info', `[PartitionedVectorDB] Loaded config for partition: ${config.id} (Dir: ${dir.name}, Active: ${config.active}, Vectors: ${config.vectorCount})`);
            if (config.active) {
              if (foundActiveId && foundActiveId !== config.id) {
                log('warn', `[PartitionedVectorDB] Multiple active partitions defined! Found ${config.id} after ${foundActiveId}. Deactivating ${config.id}.`);
                config.active = false;
                // Persist the correction so the inconsistency does not come back.
                this.scheduleSaveConfigs();
              } else {
                foundActiveId = config.id;
              }
            }
          } else {
            log('warn', `[PartitionedVectorDB] Invalid partition config format or mismatched ID/DirName: ${configPath}`);
          }
        } catch (e: any) {
          // A single broken config must not abort the whole scan.
          log('warn', `[PartitionedVectorDB] Error reading/parsing partition config ${configPath}:`, e);
        }
      } else {
        log('info', `[PartitionedVectorDB] No config file found in directory: ${dir.name}`);
      }
    }
    this.activePartitionId = foundActiveId;
    log('info', `[PartitionedVectorDB] Active partition ID after scan: ${this.activePartitionId ?? 'None'}`);
    // No active partition: promote an existing one, or create the very first partition.
    if (!this.activePartitionId && this.partitionConfigs.size > 0) {
      // Map iteration follows insertion order, i.e. the sorted directory order above.
      const firstConfig = this.partitionConfigs.values().next().value as PartitionConfig | undefined;
      if (firstConfig) {
        log('info', `[PartitionedVectorDB] No active partition found, activating first available: ${firstConfig.id}`);
        firstConfig.active = true;
        this.activePartitionId = firstConfig.id;
        this.scheduleSaveConfigs(); // Save the change
      }
    } else if (!this.activePartitionId && this.autoCreatePartitions) {
      log('info', '[PartitionedVectorDB] No partitions found, creating initial partition.');
      // This runs during initialization, so createPartition must not wait for it.
      await this.createPartition(`p-${Date.now()}`, 'Initial Partition', {
        setActive: true,
        skipInitializationCheck: true,
      });
      // Re-read the active ID from the configs after createPartition has run.
      this.activePartitionId = Array.from(this.partitionConfigs.values()).find((c) => c.active)?.id ?? null;
    }
    this.emit('partitions:loaded', {
      count: this.partitionConfigs.size,
      active: this.activePartitionId,
    });
  } catch (error: any) {
    if (error.code === 'ENOENT' && !existsSync(this.partitionsDir)) {
      // A missing base directory is tolerated; nothing has been stored yet.
      log('warn', `[PartitionedVectorDB] Partitions directory ${this.partitionsDir} not found. It will be created when needed.`);
    } else {
      log('error', '[PartitionedVectorDB] Error listing or reading partition configs:', error);
      throw error; // Propagate other errors
    }
  }
}
/**
 * Returns the ClusteredVectorDB for `partitionId`, loading it into the LRU cache when it
 * is not already there, and optionally loading its HNSW index as well.
 *
 * On a cache miss the partition directory, its `data` directory and the HNSW directory
 * are created if missing, together with placeholder files (`meta.json`, `vec.bin`,
 * `cluster.json` and the HNSW index file). After loading, the vector count stored in the
 * partition config is synchronized with the count reported by the loaded DB, and
 * `partition:loaded` is emitted.
 *
 * @param partitionId - ID of the partition to load.
 * @param loadHNSW - Whether to also load the HNSW index (defaults to `autoLoadHNSW`).
 * @returns The DB instance, or `null` when the database is closing, no config is known for
 *   the ID, or loading fails (the failure is logged and emitted as `partition:error`).
 */
private async _loadPartition(
  partitionId: string,
  loadHNSW: boolean = this.autoLoadHNSW // Use instance default
): Promise<ClusteredVectorDB | null> {
  if (this.isClosing) return null; // Prevent loading during close

  const cachedDb = this.loadedPartitions.get(partitionId);
  if (cachedDb) {
    // Already loaded: only the HNSW index may still be missing.
    if (loadHNSW && !this.hnswIndices.has(partitionId)) {
      await this._loadHNSWIndex(partitionId, cachedDb);
    }
    return cachedDb;
  }

  const config = this.partitionConfigs.get(partitionId);
  if (!config) {
    log('warn', `[PartitionedVectorDB] Partition config not found for ID: ${partitionId}. Cannot load.`);
    return null;
  }

  // All paths are relative to the main partitions directory.
  const partitionDirPath = path.join(this.partitionsDir, config.dbDirName);
  const dbBasePath = path.join(partitionDirPath, 'data');
  log('info', `[PartitionedVectorDB] Loading partition ${partitionId} DB from base path: ${dbBasePath}`);
  try {
    if (!existsSync(partitionDirPath)) {
      await fs.mkdir(partitionDirPath, { recursive: true });
      log('info', `[PartitionedVectorDB] Created directory for partition ${partitionId}: ${partitionDirPath}`);
    }
    // The data files below live *inside* dbBasePath, so that directory itself must exist.
    // (Checking path.dirname(dbBasePath) would only re-check the partition directory and
    // leave the writes below failing with ENOENT on a fresh partition.)
    if (!existsSync(dbBasePath)) {
      await fs.mkdir(dbBasePath, { recursive: true });
      log('info', `[PartitionedVectorDB] Created data directory for partition ${partitionId}: ${dbBasePath}`);
    }

    // Placeholder data files for a partition that has never been saved.
    const metaFilePath = path.join(dbBasePath, 'meta.json');
    const vectorFilePath = path.join(dbBasePath, 'vec.bin');
    const clusterFilePath = path.join(dbBasePath, 'cluster.json');
    if (!existsSync(metaFilePath)) {
      log('info', '[PartitionedVectorDB] Meta file not found, creating new one.');
      await fs.writeFile(metaFilePath, JSON.stringify({}), 'utf8');
    }
    if (!existsSync(vectorFilePath)) {
      log('info', '[PartitionedVectorDB] Vector file not found, creating new one.');
      await fs.writeFile(vectorFilePath, Buffer.alloc(0));
    }
    if (!existsSync(clusterFilePath)) {
      log('info', '[PartitionedVectorDB] Cluster file not found, creating new one.');
      await fs.writeFile(clusterFilePath, JSON.stringify({}), 'utf8');
    }

    // Placeholder HNSW index file, seeded with the default HNSW configuration.
    const hnswIndexDir = path.join(partitionDirPath, HNSW_INDEX_DIR_NAME);
    const hnswIndexPath = path.join(hnswIndexDir, HNSW_INDEX_FILE_NAME);
    if (!existsSync(hnswIndexDir)) {
      log('info', `[PartitionedVectorDB] HNSW index directory not found, creating new one.`);
      await fs.mkdir(hnswIndexDir, { recursive: true });
    }
    if (!existsSync(hnswIndexPath)) {
      log('info', `[PartitionedVectorDB] HNSW index file not found, creating new one.`);
      await fs.writeFile(hnswIndexPath, JSON.stringify(defaultSystemConfiguration.indexing.hnsw), 'utf8');
    }

    // --- Load the ClusteredVectorDB ---
    const clusterDbOptions: ClusteredVectorDBOptions = {
      ...this.defaultClusterOptions,
      clusterSize: config.clusterSize, // Per-partition value from its config
      useCompression: this.useCompression,
      runKMeansOnLoad: this.runKMeansOnLoad,
    };
    const vectorDB = new ClusteredVectorDB(this.vectorSize, dbBasePath, clusterDbOptions);
    await vectorDB.load();

    // Register in the LRU cache (this may evict another partition).
    this.loadedPartitions.set(partitionId, vectorDB);
    log('info', `[PartitionedVectorDB] Partition DB ${partitionId} loaded. Vector count: ${vectorDB.getVectorCount()}`);

    // --- Optionally load the HNSW index; report success only when it really loaded ---
    if (loadHNSW) {
      const hnswLoaded = await this._loadHNSWIndex(partitionId, vectorDB);
      if (hnswLoaded) {
        log('info', `[PartitionedVectorDB] HNSW index loaded for partition ${partitionId}`);
      }
    }

    this.emit('partition:loaded', {
      id: partitionId,
      name: config.name,
      vectorCount: vectorDB.getVectorCount(),
      hnswLoaded: this.hnswIndices.has(partitionId),
    });

    // --- Sync the config's vector count with what was actually loaded ---
    log('info', `[PartitionedVectorDB] Syncing vector count for partition ${partitionId}`);
    const loadedCount = vectorDB.getVectorCount();
    log('info', `[PartitionedVectorDB] Loaded vector count: ${loadedCount}`);
    if (config.vectorCount !== loadedCount) {
      log('warn', `[PartitionedVectorDB] Partition ${partitionId}: Config count (${config.vectorCount}) differs from loaded DB count (${loadedCount}). Updating config.`);
      config.vectorCount = loadedCount;
      this.scheduleSaveConfigs(); // Save updated count later
    }
    return vectorDB;
  } catch (error: any) {
    log('error', `[PartitionedVectorDB] Error loading partition DB ${partitionId} from ${dbBasePath}:`, error);
    // Drop any partially registered state for this partition.
    this.loadedPartitions.delete(partitionId);
    this.hnswIndices.delete(partitionId);
    this.emit('partition:error', {
      id: partitionId,
      error,
      operation: 'loadPartitionDB',
    });
    return null;
  }
}
/**
 * Loads the persisted HNSW index of a partition into `hnswIndices`, if an index file exists.
 *
 * @param partitionId - ID of the partition whose index should be loaded.
 * @param dbInstance - The partition's DB instance, handed to `HNSW.loadIndex`.
 * @returns `true` when the index is in memory afterwards (already loaded or freshly loaded);
 *   `false` when the database is closing, the config or index file is missing, or loading
 *   fails (the failure is logged and emitted as `partition:error`).
 */
private async _loadHNSWIndex(partitionId: string, dbInstance: ClusteredVectorDB): Promise<boolean> {
  log('info', `[PartitionedVectorDB] Loading HNSW index for partition ${partitionId}`);

  if (this.hnswIndices.has(partitionId)) {
    log('info', `[PartitionedVectorDB] HNSW index for ${partitionId} already loaded.`);
    return true;
  }
  if (this.isClosing) {
    return false;
  }

  const partitionConfig = this.partitionConfigs.get(partitionId);
  if (!partitionConfig) {
    log('warn', `[PartitionedVectorDB] Cannot load HNSW index: Config not found for ${partitionId}`);
    return false;
  }

  const indexPath = path.join(this.partitionsDir, partitionConfig.dbDirName, HNSW_INDEX_DIR_NAME, HNSW_INDEX_FILE_NAME);
  if (!existsSync(indexPath)) {
    // Nothing persisted for this partition.
    log('info', `[PartitionedVectorDB] HNSW index file not found for partition ${partitionId} at ${indexPath}. Index not loaded.`);
    return false;
  }

  log('info', `[PartitionedVectorDB] Loading HNSW index for partition ${partitionId} from ${indexPath}`);
  try {
    const loadedIndex = await HNSW.loadIndex(indexPath, dbInstance);
    this.hnswIndices.set(partitionId, loadedIndex);
    log('info', `[PartitionedVectorDB] Successfully loaded HNSW index for ${partitionId}. Nodes: ${loadedIndex.getNodeCount()}`);
    this.emit('partition:indexLoaded', {
      id: partitionId,
      indexType: 'hnsw',
      path: indexPath,
    });
    return true;
  } catch (error: any) {
    log('error', `[PartitionedVectorDB] Error loading HNSW index for partition ${partitionId} from ${indexPath}:`, error.message || error);
    this.emit('partition:error', {
      id: partitionId,
      error,
      operation: 'loadHNSWIndex',
    });
    return false;
  }
}
/**
 * Writes the in-memory HNSW index of a partition to its index file, creating the index
 * directory first when it does not exist.
 *
 * @param partitionId - ID of the partition whose index should be saved.
 * @returns `true` when the index was written; `false` when there is no in-memory index,
 *   no config for the partition, the database is closing, or the write fails (the failure
 *   is logged and emitted as `partition:error`).
 */
private async _saveHNSWIndex(partitionId: string): Promise<boolean> {
  log('info', `[PartitionedVectorDB] Saving HNSW index for partition ${partitionId}`);
  const indexInMemory = this.hnswIndices.get(partitionId);
  const partitionConfig = this.partitionConfigs.get(partitionId);

  // Preconditions, checked in this order: index present, config present, not closing.
  if (!indexInMemory) {
    log('info', `[PartitionedVectorDB] No HNSW index instance found in memory for partition ${partitionId}. Skipping save.`);
    return false;
  }
  if (!partitionConfig) {
    log('warn', `[PartitionedVectorDB] Cannot save HNSW index: Config not found for ${partitionId}`);
    return false;
  }
  if (this.isClosing) {
    log('warn', `[PartitionedVectorDB] Skipping HNSW index save for ${partitionId} during close operation (already handled or closing).`);
    return false;
  }

  const indexDir = path.join(this.partitionsDir, partitionConfig.dbDirName, HNSW_INDEX_DIR_NAME);
  const indexPath = path.join(indexDir, HNSW_INDEX_FILE_NAME);
  log('info', `[PartitionedVectorDB] Saving HNSW index for partition ${partitionId} to ${indexPath}`);
  try {
    const dirMissing = !existsSync(indexDir);
    if (dirMissing) {
      await fs.mkdir(indexDir, { recursive: true });
    }
    // The HNSW instance performs the actual serialization.
    await indexInMemory.saveIndex(indexPath);
    log('info', `[PartitionedVectorDB] Successfully saved HNSW index for ${partitionId}.`);
    this.emit('partition:indexSaved', {
      id: partitionId,
      indexType: 'hnsw',
      path: indexPath,
    });
    return true;
  } catch (error: any) {
    log('error', `[PartitionedVectorDB] Error saving HNSW index for partition ${partitionId} to ${indexPath}:`, error);
    this.emit('partition:error', {
      id: partitionId,
      error,
      operation: 'saveHNSWIndex',
      path: indexPath,
    });
    return false;
  }
}
/**
 * Returns the partition with the given ID, loading it (and, per `autoLoadHNSW`, its HNSW
 * index) into memory when it is not cached yet.
 *
 * @param id - Partition ID.
 * @returns The partition's DB instance, or `null` when it cannot be loaded.
 * @throws Error when the database is closing or closed.
 */
async getPartition(id: string): Promise<ClusteredVectorDB | null> {
  log('info', `[PartitionedVectorDB] Getting partition ${id}...`);
  await this._ensureInitialized();
  // Cache lookup, DB loading and HNSW loading are all handled by _loadPartition.
  const partition = await this._loadPartition(id);
  return partition;
}
/**
 * Returns the currently active partition, loading it into memory when necessary.
 *
 * @returns The active partition's DB instance, or `null` when no partition is active or
 *   the active one cannot be loaded.
 * @throws Error when the database is closing or closed.
 */
async getActivePartition(): Promise<ClusteredVectorDB | null> {
  log('info', `[PartitionedVectorDB] Getting active partition...`);
  await this._ensureInitialized();
  const activeId = this.activePartitionId;
  if (activeId) {
    return this._loadPartition(activeId);
  }
  log('warn', '[PartitionedVectorDB] No active partition is set.');
  return null;
}
// =====================================================================
// Public API Methods (Add, Search, Delete, Stats, etc.)
// =====================================================================
/**
 * Persists the whole database state: all partition configs first, then - in parallel - the
 * data of every partition currently loaded and every HNSW index currently in memory.
 *
 * Failures of individual partitions or indices do not abort the save; they are logged,
 * emitted as `partition:error` and reflected in the counts of the final `db:saved` event.
 *
 * @throws Error when the database is closing or closed at the time of the call.
 */
async save(): Promise<void> {
  await this._ensureInitialized();
  if (this.isClosing) {
    log('warn', '[PartitionedVectorDB] Attempted to save while closing.');
    return;
  }
  log('info', '[PartitionedVectorDB] Starting comprehensive save...');

  // Step 1: configs (counts, active flag, ...), including any save that is still pending.
  await this.savePartitionConfigs();
  log('info', `[PartitionedVectorDB] Partition configurations saved. Active partition: ${this.activePartitionId}`);
  if (this.saveConfigPromise) {
    await this.saveConfigPromise;
  }

  // Step 2: data of every loaded partition.
  const persistPartition = async (id: string): Promise<boolean> => {
    // peek() leaves the LRU order untouched.
    const partition = this.loadedPartitions.peek(id);
    if (!partition) {
      return true; // No longer cached: nothing to save, counted as success.
    }
    try {
      if (typeof partition.save !== 'function') {
        log('warn', `[PartitionedVectorDB] Partition ${id} instance cannot be saved (missing save method or wrong type).`);
        return false;
      }
      await partition.save();
      log('info', `[PartitionedVectorDB] Saved data for partition ${id}`);
      return true;
    } catch (error) {
      log('error', `[PartitionedVectorDB] Error saving data for partition ${id}:`, error);
      this.emit('partition:error', {
        id,
        error,
        operation: 'savePartitionData',
      });
      return false;
    }
  };
  const loadedPartitionIds = [...this.loadedPartitions.keys()];
  log('info', `[PartitionedVectorDB] Saving data for ${loadedPartitionIds.length} loaded partitions...`);
  const partitionSavePromises = loadedPartitionIds.map((id) => persistPartition(id));

  // Step 3: every HNSW index held in memory.
  const loadedHnswIds = [...this.hnswIndices.keys()];
  log('info', `[PartitionedVectorDB] Saving ${loadedHnswIds.length} loaded HNSW indices...`);
  const hnswSavePromises = loadedHnswIds.map((id) => this._saveHNSWIndex(id));

  // Both groups are already running; wait for all of them and tally the successes.
  const [partitionResults, hnswResults] = await Promise.all([Promise.all(partitionSavePromises), Promise.all(hnswSavePromises)]);
  const successfulPartitions = partitionResults.filter(Boolean).length;
  const successfulHnsw = hnswResults.filter(Boolean).length;
  log('info', `[PartitionedVectorDB] Comprehensive save complete. Partitions saved: ${successfulPartitions}/${loadedPartitionIds.length}. HNSW indices saved: ${successfulHnsw}/${loadedHnswIds.length}.`);
  this.emit('db:saved', {
    partitionsSaved: successfulPartitions,
    indicesSaved: successfulHnsw,
  });
}
/**
 * Re-run the initialization routine by hand.
 *
 * Does nothing (apart from a warning) when the database is already
 * initialized and not in the closing state. Otherwise the lifecycle flags
 * and all cached state are reset, `_initialize` is run again with the
 * instance's `autoLoadHNSW` setting, and a `db:loaded` event is emitted
 * once it completes.
 */
async load(): Promise<void> {
  const alreadyLive = this.isInitialized && !this.isClosing;
  if (alreadyLive) {
    log('warn', '[PartitionedVectorDB] Database already initialized. Call close() before loading again.');
    return;
  }

  // Lifecycle flags back to a "fresh" state.
  this.isClosing = false;
  this.isInitialized = false;

  // Drop everything held in memory before loading again.
  this.loadedPartitions.clear();
  this.hnswIndices.clear();
  this.partitionConfigs.clear();
  this.activePartitionId = null;

  log('info', '[PartitionedVectorDB] Starting manual load process...');
  // Store the promise on the instance before awaiting it.
  const initRun = this._initialize(this.autoLoadHNSW);
  this.initializationPromise = initRun;
  await initRun;
  log('info', '[PartitionedVectorDB] Manual load process finished.');

  this.emit('db:loaded', {
    partitionCount: this.partitionConfigs.size,
    loadedCount: this.loadedPartitions.size,
    activeId: this.activePartitionId,
  });
}
/**
 * Build HNSW indices, either for one partition or for every partition
 * currently held in the cache.
 *
 * Build failures are logged and reported through a `partition:error` event;
 * they do not reject the returned promise.
 *
 * @param partitionId Partition to index. When omitted, all currently loaded
 *                    partitions are indexed in parallel.
 * @param options     Build options forwarded to the index's `buildIndex`.
 *                    A supplied `progressCallback` is still invoked, and a
 *                    `partition:indexProgress` event is emitted as well.
 */
async buildIndexHNSW(partitionId?: string, options?: BuildIndexHNSWOptions): Promise<void> {
  await this._ensureInitialized(options?.force);

  // Builds (creating the index instance first if needed) for one partition.
  const buildFor = async (id: string): Promise<void> => {
    log('info', `[PartitionedVectorDB] Building HNSW index for partition ${id}...`);

    // getPartition makes sure the partition DB is available.
    const db = await this.getPartition(id);
    if (!db) {
      log('error', `[PartitionedVectorDB] Cannot build HNSW index: Partition ${id} not found or could not be loaded.`);
      return;
    }

    let index = this.hnswIndices.get(id);
    if (!index) {
      log('info', `[PartitionedVectorDB] Creating new HNSW index instance for partition ${id} before building.`);
      index = new HNSW(db);
      this.hnswIndices.set(id, index);
    }

    log('info', `[PartitionedVectorDB] Building HNSW index for partition ${id}...`);
    try {
      await index.buildIndex({
        ...options,
        // Replace the callback with one that also emits an event.
        progressCallback: (progress) => {
          options?.progressCallback?.(progress);
          this.emit('partition:indexProgress', {
            id,
            progress,
            operation: 'buildHNSW',
          });
        },
      });
      log('info', `[PartitionedVectorDB] HNSW index built successfully for partition ${id}.`);
      this.emit('partition:indexed', { id, indexType: 'hnsw' });
    } catch (error: any) {
      log('error', `[PartitionedVectorDB] Error building HNSW index for partition ${id}:`, error);
      this.emit('partition:error', {
        id,
        error,
        operation: 'buildHNSWIndex',
      });
    }
  };

  // Single-partition request.
  if (partitionId) {
    await buildFor(partitionId);
    return;
  }

  // No ID given: every partition currently in the cache, in parallel.
  const loadedIds = Array.from(this.loadedPartitions.keys());
  log('info', `[PartitionedVectorDB] Building HNSW indices for ${loadedIds.length} loaded partitions in parallel...`);
  await Promise.all(loadedIds.map((id) => buildFor(id)));
  log('info', `[PartitionedVectorDB] Finished building HNSW indices for loaded partitions.`);
}
/**
 * Find nearest neighbors using HNSW indices across the requested partitions,
 * or across all *currently loaded* partitions when none are requested.
 *
 * Each target partition is searched in parallel. The partition DB is loaded
 * on demand, and its HNSW index is loaded on demand if it is not in memory.
 * Partitions whose DB or index cannot be loaded are skipped with a warning.
 * A failure inside one partition is logged, reported through a
 * `partition:error` event, and contributes no results; it does not reject
 * the returned promise.
 *
 * @param query   Query vector; converted to a `Float32Array` if it is not one.
 * @param k       Maximum number of results to return (default 10). A
 *                non-positive value yields an empty result.
 * @param options Search options forwarded to each index. `partitionIds`
 *                restricts the search to those partitions; IDs without a
 *                known configuration are ignored and duplicates are
 *                collapsed so no partition is searched twice.
 * @returns Up to `k` results, merged from all searched partitions and
 *          ordered by ascending `dist`.
 */
async findNearestHNSW(
  query: Vector,
  k: number = 10,
  options: SearchOptions & {
    partitionIds?: string[];
    exactDimensions?: boolean;
  } = {}
): Promise<SearchResult[]> {
  await this._ensureInitialized();

  // A non-positive k can never yield results; a negative k would also make
  // the final slice(0, k) return "all but the last |k|" entries.
  if (k <= 0) {
    return [];
  }

  const queryVector = query instanceof Float32Array ? query : new Float32Array(query);

  // Target partitions: the caller's list (deduplicated, known IDs only), or
  // everything currently in the cache. Unloaded partitions are deliberately
  // not searched by default.
  const targetPartitionIds = options.partitionIds
    ? Array.from(new Set(options.partitionIds)).filter((id) => this.partitionConfigs.has(id))
    : Array.from(this.loadedPartitions.keys());
  if (targetPartitionIds.length === 0) {
    log('warn', '[PartitionedVectorDB] No valid partitions specified or loaded to search with HNSW.');
    return [];
  }
  log('info', `[PartitionedVectorDB] Performing HNSW search on partitions: [${targetPartitionIds.join(', ')}]`);

  // Searches a single partition; resolves to [] whenever it cannot be searched.
  const searchPartition = async (partitionId: string): Promise<SearchResult[]> => {
    try {
      // 1. Make sure the partition DB is loaded (DB only; the index is handled below).
      const partition = await this._loadPartition(partitionId, false);
      if (!partition) {
        log('warn', `[PartitionedVectorDB] Skipping HNSW search on partition ${partitionId}: Could not load DB.`);
        return [];
      }

      // 2. Make sure the HNSW index is in memory, trying to load it if not.
      let hnswIndex = this.hnswIndices.get(partitionId);
      if (!hnswIndex) {
        const loaded = await this._loadHNSWIndex(partitionId, partition);
        if (!loaded) {
          // The index is intentionally NOT built on the fly here: building
          // during a search would be too expensive.
          log('warn', `[PartitionedVectorDB] Skipping HNSW search on partition ${partitionId}: Index not loaded and not found.`);
          return [];
        }
        hnswIndex = this.hnswIndices.get(partitionId);
      }

      // 3. Search. The index can only be missing here if loading reported
      //    success without registering it.
      if (!hnswIndex) {
        return [];
      }
      // A shallow copy already carries `filter` along with every other option.
      return await hnswIndex.findNearest(queryVector, k, { ...options });
    } catch (error) {
      log('error', `[PartitionedVectorDB] Error during HNSW search for partition ${partitionId}:`, error);
      this.emit('partition:error', {
        id: partitionId,
        error,
        operation: 'searchHNSW',
      });
      return [];
    }
  };

  const searchResultsNested = await Promise.all(targetPartitionIds.map((id) => searchPartition(id)));

  // Merge, order by distance, keep the best k.
  const mergedResults = searchResultsNested.flat();
  mergedResults.sort((a, b) => a.dist - b.dist);
  return mergedResults.slice(0, k);
}
/**
 * Persist HNSW indices on request.
 *
 * @param partitionId When given, only that partition's index is saved;
 *                    otherwise every index currently held in memory is saved.
 */
async saveHNSWIndices(partitionId?: string): Promise<void> {
  await this._ensureInitialized();

  // Without an explicit ID, only indices already in memory are considered.
  let targets: string[];
  if (partitionId) {
    targets = [partitionId];
  } else {
    targets = Array.from(this.hnswIndices.keys());
  }

  if (targets.length > 0) {
    log('info', `[PartitionedVectorDB] Saving HNSW indices for partitions: [${targets.join(', ')}]`);
    const pending = targets.map((id) => this._saveHNSWIndex(id));
    await Promise.all(pending);
    log('info', '[PartitionedVectorDB] Finished saving HNSW indices.');
    return;
  }

  log('info', '[PartitionedVectorDB] No HNSW indices loaded or specified to save.');
}
/**
 * Load HNSW indices on request.
 *
 * The partition DB must already be in the cache; partitions that are not
 * loaded are skipped with a warning, and indices already in memory are left
 * alone.
 *
 * @param partitionId When given, only that partition is considered;
 *                    otherwise every currently loaded partition is.
 */
async loadHNSWIndices(partitionId?: string): Promise<void> {
  await this._ensureInitialized();

  // Attempts the index load for one partition, if its DB is cached.
  const tryLoad = async (id: string): Promise<void> => {
    // peek() checks the cache without changing LRU order.
    const db = this.loadedPartitions.peek(id);
    if (!db) {
      // The DB is deliberately not loaded here.
      log('warn', `[PartitionedVectorDB] Cannot load HNSW index for ${id}: Partition DB not loaded.`);
      return;
    }
    if (this.hnswIndices.has(id)) {
      log('info', `[PartitionedVectorDB] HNSW index for ${id} is already loaded.`);
      return;
    }
    await this._loadHNSWIndex(id, db);
  };

  let candidates: string[];
  if (partitionId) {
    candidates = [partitionId];
  } else {
    candidates = Array.from(this.loadedPartitions.keys());
  }

  if (candidates.length === 0) {
    log('info', '[PartitionedVectorDB] No partitions loaded or specified to load HNSW indices for.');
    return;
  }

  log('info', `[PartitionedVectorDB] Loading HNSW indices for partitions: [${candidates.join(', ')}]`);
  const pending = candidates.map((id) => tryLoad(id));
  await Promise.all(pending);
  log('info', `[PartitionedVectorDB] Finished loading HNSW indices. Indices in memory: ${this.hnswIndices.size}`);
}
/**
 * Report statistics for one partition's HNSW index.
 *
 * @param partitionId Partition whose index statistics are requested.
 * @returns The index's stats, or `null` when the database is not
 *          initialized or no index for that partition is in memory.
 */
getHNSWStats(partitionId: string): HNSWStats | null {
  if (this.isInitialized) {
    const index = this.hnswIndices.get(partitionId);
    if (index) {
      return index.getStats();
    }
  }
  return null;
}
/**
 * Close the partitioned database: persist everything that is loaded, then
 * release in-memory resources.
 *
 * Sequence:
 *  1. Wait for any in-flight initialization to settle (failure is tolerated).
 *  2. Run a final comprehensive `save()`. A save error is logged and closing
 *     continues.
 *  3. Raise `isClosing`, clear the partition cache, the HNSW index map and
 *     the partition configs, reset the active partition, and mark the
 *     instance as not initialized.
 *  4. Emit `db:close`.
 *
 * Calling this on a database that is not initialized, or that is already
 * closing/closed, only logs a warning.
 */
async close(): Promise<void> {
  // Nothing to save or release when the database was never initialized.
  if (!this.isInitialized) {
    log('warn', '[PartitionedVectorDB] Close operation called before initialization.');
    return;
  }
  if (this.isClosing) {
    log('warn', '[PartitionedVectorDB] Close operation already in progress.');
    return;
  }
  log('info', '[PartitionedVectorDB] Closing database...');

  // 1. Let a pending initialization settle first to avoid racing with it.
  try {
    await this.initializationPromise;
  } catch (initError) {
    log('warn', '[PartitionedVectorDB] Initialization failed, proceeding with close anyway:', initError);
  }

  // 2. Final save of configs, partitions and indices.
  //    This must run BEFORE isClosing is raised: save() and
  //    savePartitionConfigs() both return early while isClosing is set, so
  //    raising the flag first would turn the final save into a no-op.
  try {
    await this.save();
  } catch (saveError) {
    log('error', '[PartitionedVectorDB] Error during final save operation:', saveError);
    // Keep closing even if the save failed.
  }

  // A concurrent close() may have finished teardown while this call was
  // awaiting the save; everything below is synchronous, so this re-check
  // guarantees teardown runs at most once.
  if (this.isClosing || !this.isInitialized) {
    log('warn', '[PartitionedVectorDB] Close operation already in progress.');
    return;
  }
  this.isClosing = true;

  // 3. Clear the partition cache. Per the cache's dispose handling, this is
  //    expected to close the individual DBs without saving again.
  this.loadedPartitions.clear();
  // 4. Clear the HNSW index map (some entries may already have been removed).
  this.hnswIndices.clear();
  // 5. Clear partition configs.
  this.partitionConfigs.clear();
  // 6. Reset state. isClosing intentionally stays true after closing.
  this.activePartitionId = null;
  this.isInitialized = false;

  this.emit('db:close', undefined);
  log('info', '[PartitionedVectorDB] Database closed.');
}
// --- Configuration Saving ---
/** Saves all partition configurations (debounced). */
async savePartitionConfigs(): Promise<void> {
if (this.isClosing) return; // Don't save during close triggered by 'save' itself
if (!this.saveConfigPromise) {
this.saveConfigPromise = (async () => {
// await new Promise((resolve) => setTimeout(resolve, 500)); // Simple debounce delay
log('info', '[PartitionedVectorDB] Debounced saving of partition configurations...');
const configsToSave = Array.from(this.partitionConfigs.values());
try {
const savePromises = configsToSave.map((config) => this._saveSinglePartitionConfig(config));
await Promise.all(savePromises);
log('info', `[PartitionedVectorDB] Saved ${configsToSave.length} partition configurations.`);
this.emit('config:saved', undefined);
} catch (error: any) {
log('error', '[PartitionedVectorDB] Error saving one or more partition configs:', error);
// Emit specific error?
} finally {
this.saveConfigPromise = null; // Release lock
}
})();
}