Skip to content

Commit da7717a

Browse files
Issue # 1605 hash index operations are logged, tests are passed.
1 parent 09e7f83 commit da7717a

File tree

15 files changed

+628
-76
lines changed

15 files changed

+628
-76
lines changed

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/OHashTableDirectory.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,21 @@ public OHashTableDirectory(String defaultExtension, String name, boolean durable
4747
this.diskCache = storage.getDiskCache();
4848
this.durableInNonTxMode = durableInNonTxMode;
4949
this.storage = storage;
50+
51+
init(storage.getAtomicOperationsManager(), storage.getWALInstance());
5052
}
5153

5254
public void create() throws IOException {
55+
startAtomicOperation();
5356
acquireExclusiveLock();
5457
try {
5558
fileId = diskCache.openFile(name + defaultExtension);
56-
59+
logFileCreation(name + defaultExtension, fileId);
5760
init();
61+
endAtomicOperation(false);
62+
} catch (RuntimeException e) {
63+
endAtomicOperation(true);
64+
throw e;
5865
} finally {
5966
releaseExclusiveLock();
6067
}
@@ -606,4 +613,12 @@ protected void startAtomicOperation() throws IOException {
606613

607614
super.startAtomicOperation();
608615
}
616+
617+
@Override
618+
protected void logFileCreation(String fileName, long fileId) throws IOException {
619+
if (storage.getStorageTransaction() == null && !durableInNonTxMode)
620+
return;
621+
622+
super.logFileCreation(fileName, fileId);
623+
}
609624
}

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/OLocalHashTable.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,12 @@ public void create(String name, OBinarySerializer<K> keySerializer, OBinarySeria
109109

110110
init(storage.getAtomicOperationsManager(), storage.getWALInstance());
111111
this.directory = new OHashTableDirectory(treeStateFileExtension, name, durableInNonTxMode, storage);
112-
fileStateId = diskCache.openFile(name + metadataConfigurationFileExtension);
113112

114113
startAtomicOperation();
115114
try {
115+
fileStateId = diskCache.openFile(name + metadataConfigurationFileExtension);
116+
logFileCreation(name + metadataConfigurationFileExtension, fileStateId);
117+
116118
directory.create();
117119

118120
hashStateEntry = diskCache.allocateNewPage(fileStateId);
@@ -138,8 +140,10 @@ public void create(String name, OBinarySerializer<K> keySerializer, OBinarySeria
138140

139141
initHashTreeState();
140142

141-
if (nullKeyIsSupported)
143+
if (nullKeyIsSupported) {
142144
nullBucketFileId = diskCache.openFile(name + nullBucketFileExtension);
145+
logFileCreation(name + nullBucketFileExtension, nullBucketFileId);
146+
}
143147

144148
endAtomicOperation(false);
145149
} catch (IOException e) {
@@ -187,6 +191,14 @@ protected void startAtomicOperation() throws IOException {
187191
super.startAtomicOperation();
188192
}
189193

194+
@Override
195+
protected void logFileCreation(String fileName, long fileId) throws IOException {
196+
if (storage.getStorageTransaction() == null && !durableInNonTxMode)
197+
return;
198+
199+
super.logFileCreation(fileName, fileId);
200+
}
201+
190202
@Override
191203
protected void logPageChanges(ODurablePage localPage, long fileId, long pageIndex, boolean isNewPage) throws IOException {
192204
final OStorageTransaction transaction = storage.getStorageTransaction();
@@ -295,6 +307,8 @@ private void createFileMetadata(int fileLevel, OHashIndexFileLevelMetadataPage p
295307
final String fileName = name + fileLevel + bucketFileExtension;
296308
final long fileId = diskCache.openFile(fileName);
297309

310+
logFileCreation(fileName, fileId);
311+
298312
page.setFileMetadata(fileLevel, fileId, 0, -1);
299313
}
300314

@@ -405,8 +419,10 @@ public V remove(K key) {
405419
final OHashIndexBucket<K, V> bucket = new OHashIndexBucket<K, V>(dataPointer.getDataPointer(), keySerializer,
406420
valueSerializer, keyTypes, getTrackMode());
407421
final int positionIndex = bucket.getIndex(hashCode, key);
408-
if (positionIndex < 0)
422+
if (positionIndex < 0) {
423+
endAtomicOperation(false);
409424
return null;
425+
}
410426

411427
removed = bucket.deleteEntry(positionIndex).value;
412428
sizeDiff--;
@@ -436,16 +452,21 @@ public V remove(K key) {
436452
return null;
437453

438454
V removed = null;
455+
439456
OCacheEntry cacheEntry = diskCache.load(nullBucketFileId, 0, false);
440457
OCachePointer cachePointer = cacheEntry.getCachePointer();
441458
cachePointer.acquireExclusiveLock();
442459
try {
443-
final ONullBucket<V> nullBucket = new ONullBucket<V>(cachePointer.getDataPointer(), ODurablePage.TrackMode.NONE,
444-
valueSerializer, false);
460+
final ONullBucket<V> nullBucket = new ONullBucket<V>(cachePointer.getDataPointer(), getTrackMode(), valueSerializer,
461+
false);
462+
445463
removed = nullBucket.getValue();
446464
if (removed != null) {
447465
nullBucket.removeValue();
448466
sizeDiff--;
467+
cacheEntry.markDirty();
468+
469+
logPageChanges(nullBucket, cacheEntry.getFileId(), cacheEntry.getPageIndex(), false);
449470
}
450471
} finally {
451472
cachePointer.releaseExclusiveLock();
@@ -1443,14 +1464,15 @@ private void doPut(K key, V value) throws IOException {
14431464
OCachePointer cachePointer = cacheEntry.getCachePointer();
14441465
cachePointer.acquireExclusiveLock();
14451466
try {
1446-
ONullBucket<V> nullBucket = new ONullBucket<V>(cachePointer.getDataPointer(), ODurablePage.TrackMode.NONE, valueSerializer,
1447-
isNew);
1467+
ONullBucket<V> nullBucket = new ONullBucket<V>(cachePointer.getDataPointer(), getTrackMode(), valueSerializer, isNew);
14481468
if (nullBucket.getValue() != null)
14491469
sizeDiff--;
14501470

14511471
nullBucket.setValue(value);
14521472
sizeDiff++;
14531473
cacheEntry.markDirty();
1474+
1475+
logPageChanges(nullBucket, cacheEntry.getFileId(), cacheEntry.getPageIndex(), isNew);
14541476
} finally {
14551477
cachePointer.releaseExclusiveLock();
14561478
diskCache.release(cacheEntry);

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/cache/ODiskCache.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ public interface ODiskCache {
6464

6565
void openFile(long fileId) throws IOException;
6666

67+
void openFile(String fileName, long fileId) throws IOException;
68+
6769
OCacheEntry load(long fileId, long pageIndex, boolean checkPinnedPages) throws IOException;
6870

6971
void pinPage(OCacheEntry cacheEntry) throws IOException;

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/cache/OReadWriteDiskCache.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,22 @@ public void openFile(final long fileId) throws IOException {
127127
}
128128
}
129129

130+
@Override
131+
public void openFile(String fileName, long fileId) throws IOException {
132+
synchronized (syncObject) {
133+
long existingFileId = writeCache.isOpen(fileName);
134+
135+
if (fileId == existingFileId)
136+
return;
137+
else if (existingFileId >= 0)
138+
throw new OStorageException("File with given name already exists but has different id " + existingFileId + " vs. proposed "
139+
+ fileId);
140+
141+
writeCache.openFile(fileName, fileId);
142+
filePages.put(fileId, new HashSet<Long>());
143+
}
144+
}
145+
130146
@Override
131147
public boolean exists(final String fileName) {
132148
synchronized (syncObject) {

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/cache/OWOWCache.java

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,22 +89,22 @@ public class OWOWCache {
8989
private final Object syncObject = new Object();
9090
private final ScheduledExecutorService commitExecutor = Executors
9191
.newSingleThreadScheduledExecutor(new ThreadFactory() {
92-
@Override
93-
public Thread newThread(Runnable r) {
94-
Thread thread = new Thread(r);
95-
thread.setDaemon(true);
96-
thread
97-
.setName("OrientDB Write Cache Flush Task ("
98-
+ storageLocal.getName() + ")");
99-
return thread;
100-
}
101-
});
92+
@Override
93+
public Thread newThread(Runnable r) {
94+
Thread thread = new Thread(r);
95+
thread.setDaemon(true);
96+
thread
97+
.setName("OrientDB Write Cache Flush Task ("
98+
+ storageLocal.getName() + ")");
99+
return thread;
100+
}
101+
});
102102
private Map<String, Long> nameIdMap;
103103
private RandomAccessFile nameIdMapHolder;
104104
private volatile int cacheMaxSize;
105-
private long fileCounter = 0;
105+
private long fileCounter = 0;
106106
private GroupKey lastGroupKey = new GroupKey(0, -1);
107-
private File nameIdMapHolderFile;
107+
private File nameIdMapHolderFile;
108108

109109
private static final class NameFileIdEntry {
110110
private final String name;
@@ -151,14 +151,14 @@ private GroupKey(long fileId, long groupIndex) {
151151

152152
@Override
153153
public int compareTo(GroupKey other) {
154-
if (fileId>other.fileId)
154+
if (fileId > other.fileId)
155155
return 1;
156156
if (fileId < other.fileId)
157157
return -1;
158158

159-
if (groupIndex>other.groupIndex)
159+
if (groupIndex > other.groupIndex)
160160
return 1;
161-
if (groupIndex<other.groupIndex)
161+
if (groupIndex < other.groupIndex)
162162
return -1;
163163

164164
return 0;
@@ -205,26 +205,26 @@ public void run() {
205205
int writeGroupsToFlush;
206206
boolean useForceSync = false;
207207
double threshold = ((double) cacheSize.get()) / cacheMaxSize;
208-
if (threshold>0.8) {
208+
if (threshold > 0.8) {
209209
writeGroupsToFlush = (int) (0.2 * writeGroups.size());
210210
useForceSync = true;
211-
} else if (threshold>0.9) {
211+
} else if (threshold > 0.9) {
212212
writeGroupsToFlush = (int) (0.4 * writeGroups.size());
213213
useForceSync = true;
214214
} else
215215
writeGroupsToFlush = 1;
216216

217-
if (writeGroupsToFlush<1)
217+
if (writeGroupsToFlush < 1)
218218
writeGroupsToFlush = 1;
219219

220220
int flushedGroups = 0;
221221

222222
flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, false);
223223

224-
if (flushedGroups<writeGroupsToFlush && useForceSync)
224+
if (flushedGroups < writeGroupsToFlush && useForceSync)
225225
flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, true);
226226

227-
if (flushedGroups<writeGroupsToFlush && cacheSize.get()>cacheMaxSize) {
227+
if (flushedGroups < writeGroupsToFlush && cacheSize.get() > cacheMaxSize) {
228228
if (OGlobalConfiguration.SERVER_CACHE_INCREASE_ON_DEMAND.getValueAsBoolean()) {
229229
final long oldCacheMaxSize = cacheMaxSize;
230230

@@ -476,6 +476,35 @@ public long openFile(String fileName) throws IOException {
476476
}
477477
}
478478

479+
public void openFile(String fileName, long fileId) throws IOException {
480+
synchronized (syncObject) {
481+
initNameIdMapping();
482+
483+
OFileClassic fileClassic;
484+
485+
Long existingFileId = nameIdMap.get(fileName);
486+
487+
if (existingFileId != null) {
488+
if (existingFileId == fileId)
489+
fileClassic = files.get(fileId);
490+
else
491+
throw new OStorageException("File with given name already exists but has different id " + existingFileId
492+
+ " vs. proposed " + fileId);
493+
} else {
494+
if (fileCounter < fileId)
495+
fileCounter = fileId;
496+
497+
fileClassic = createFile(fileName);
498+
499+
files.put(fileId, fileClassic);
500+
nameIdMap.put(fileName, fileId);
501+
writeNameIdEntry(new NameFileIdEntry(fileName, fileId), true);
502+
}
503+
504+
openFile(fileClassic);
505+
}
506+
}
507+
479508
public void lock() throws IOException {
480509
for (OFileClassic file : files.values()) {
481510
file.lock();
@@ -901,8 +930,11 @@ private void openFile(OFileClassic fileClassic) throws IOException {
901930
if (fileClassic.exists()) {
902931
if (!fileClassic.isOpen())
903932
fileClassic.open();
904-
} else
933+
} else {
905934
fileClassic.create(-1);
935+
fileClassic.synch();
936+
}
937+
906938
}
907939

908940
private void initNameIdMapping() throws IOException {

core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/OStorageLocalAbstract.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,7 @@
3030
import com.orientechnologies.orient.core.storage.impl.local.paginated.atomicoperations.OAtomicOperation;
3131
import com.orientechnologies.orient.core.storage.impl.local.paginated.atomicoperations.OAtomicOperationsManager;
3232
import com.orientechnologies.orient.core.storage.impl.local.paginated.base.ODurablePage;
33-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OAtomicUnitEndRecord;
34-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OAtomicUnitStartRecord;
35-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OLogSequenceNumber;
36-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OOperationUnitId;
37-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OOperationUnitRecord;
38-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OPageChanges;
39-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OUpdatePageRecord;
40-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OWALRecord;
41-
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.OWriteAheadLog;
33+
import com.orientechnologies.orient.core.storage.impl.local.paginated.wal.*;
4234
import com.orientechnologies.orient.core.tx.OTransaction;
4335
import com.orientechnologies.orient.core.version.ORecordVersion;
4436

@@ -223,6 +215,11 @@ protected void undoOperation(List<OLogSequenceNumber> operationUnit) throws IOEx
223215
cachePointer.releaseExclusiveLock();
224216
diskCache.release(cacheEntry);
225217
}
218+
} else if (record instanceof OFileCreatedCreatedWALRecord) {
219+
final OFileCreatedCreatedWALRecord fileCreatedCreatedRecord = (OFileCreatedCreatedWALRecord) record;
220+
221+
diskCache.openFile(fileCreatedCreatedRecord.getFileName(), fileCreatedCreatedRecord.getFileId());
222+
diskCache.deleteFile(fileCreatedCreatedRecord.getFileId());
226223
} else {
227224
OLogManager.instance().error(this, "Invalid WAL record type was passed %s. Given record will be skipped.",
228225
record.getClass());

core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/OLocalPaginatedStorage.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,11 @@ private void restoreFrom(OLogSequenceNumber lsn) throws IOException {
17981798
diskCache.release(cacheEntry);
17991799
}
18001800

1801+
} else if (operationUnitRecord instanceof OFileCreatedCreatedWALRecord) {
1802+
1803+
final OFileCreatedCreatedWALRecord fileCreatedCreatedRecord = (OFileCreatedCreatedWALRecord) operationUnitRecord;
1804+
diskCache.openFile(fileCreatedCreatedRecord.getFileName(), fileCreatedCreatedRecord.getFileId());
1805+
18011806
} else if (operationUnitRecord instanceof OAtomicUnitEndRecord) {
18021807
final OAtomicUnitEndRecord atomicUnitEndRecord = (OAtomicUnitEndRecord) walRecord;
18031808

core/src/main/java/com/orientechnologies/orient/core/storage/impl/local/paginated/base/ODurableComponent.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,16 @@ protected void logPageChanges(ODurablePage localPage, long fileId, long pageInde
109109
}
110110
}
111111

112+
protected void logFileCreation(String fileName, long fileId) throws IOException {
113+
if (writeAheadLog != null) {
114+
final OAtomicOperation atomicOperation = atomicOperationsManager.getCurrentOperation();
115+
assert atomicOperation != null;
116+
117+
final OOperationUnitId unitId = atomicOperation.getOperationUnitId();
118+
writeAheadLog.log(new OFileCreatedCreatedWALRecord(unitId, fileName, fileId));
119+
}
120+
}
121+
112122
protected void lockTillAtomicOperationCompletes() {
113123
atomicOperationsManager.lockTillOperationComplete(this);
114124
}

0 commit comments

Comments
 (0)