Skip to content

Commit 2e5b31f

Browse files
committed
Merge branch 'develop' into parallelquery
2 parents 74c6aaf + e97ca91 commit 2e5b31f

File tree

14 files changed

+223
-243
lines changed

14 files changed

+223
-243
lines changed

commons/src/main/java/com/orientechnologies/common/collection/OSingleItemSet.java

Lines changed: 0 additions & 99 deletions
This file was deleted.

core/src/main/java/com/orientechnologies/orient/core/config/OGlobalConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ public enum OGlobalConfiguration {
7676
DISK_WRITE_CACHE_PAGE_FLUSH_INTERVAL("storage.diskCache.writeCachePageFlushInterval",
7777
"Interval between flushing of pages from write cache in ms.", Integer.class, 100),
7878

79+
DISK_WRITE_CACHE_FLUSH_WRITE_INACTIVITY_INTERVAL("storage.diskCache.writeCacheFlushInactivityInterval",
80+
"Interval between 2 writes to the disk cache,"
81+
+ " if writes are done with interval more than provided all files will be fsynced before next write,"
82+
+ " which allows do not do data restore after server crash (in ms).", Long.class, 60 * 1000),
83+
7984
DISK_WRITE_CACHE_FLUSH_LOCK_TIMEOUT("storage.diskCache.writeCacheFlushLockTimeout",
8085
"Maximum amount of time till write cache will be wait before page flush in ms.", Integer.class, -1),
8186

core/src/main/java/com/orientechnologies/orient/core/index/hashindex/local/cache/OWOWCache.java

Lines changed: 83 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -65,41 +65,50 @@
6565
* @since 7/23/13
6666
*/
6767
public class OWOWCache {
68-
public static final String NAME_ID_MAP_EXTENSION = ".cm";
69-
70-
private static final String NAME_ID_MAP = "name_id_map" + NAME_ID_MAP_EXTENSION;
71-
72-
public static final int MIN_CACHE_SIZE = 16;
73-
74-
public static final long MAGIC_NUMBER = 0xFACB03FEL;
75-
76-
private final ConcurrentSkipListMap<GroupKey, WriteGroup> writeGroups = new ConcurrentSkipListMap<GroupKey, WriteGroup>();
77-
private final OBinarySerializer<String> stringSerializer;
78-
private final Map<Long, OFileClassic> files;
79-
private final boolean syncOnPageFlush;
80-
private final int pageSize;
81-
private final long groupTTL;
82-
private final OWriteAheadLog writeAheadLog;
83-
private final AtomicInteger cacheSize = new AtomicInteger();
84-
private final OLockManager<GroupKey, Thread> lockManager = new OLockManager<GroupKey, Thread>(true, OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_LOCK_TIMEOUT.getValueAsInteger()
85-
);
86-
private final OStorageLocalAbstract storageLocal;
87-
private final Object syncObject = new Object();
88-
private final ScheduledExecutorService commitExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
89-
@Override
90-
public Thread newThread(Runnable r) {
91-
Thread thread = new Thread(r);
92-
thread.setDaemon(true);
93-
thread.setName("OrientDB Write Cache Flush Task (" + storageLocal.getName() + ")");
94-
return thread;
95-
}
96-
});
97-
private Map<String, Long> nameIdMap;
98-
private RandomAccessFile nameIdMapHolder;
99-
private volatile int cacheMaxSize;
100-
private long fileCounter = 0;
101-
private GroupKey lastGroupKey = new GroupKey(0, -1);
102-
private File nameIdMapHolderFile;
68+
public static final String NAME_ID_MAP_EXTENSION = ".cm";
69+
70+
private static final String NAME_ID_MAP = "name_id_map" + NAME_ID_MAP_EXTENSION;
71+
72+
public static final int MIN_CACHE_SIZE = 16;
73+
74+
public static final long MAGIC_NUMBER = 0xFACB03FEL;
75+
76+
private final ConcurrentSkipListMap<GroupKey, WriteGroup> writeGroups = new ConcurrentSkipListMap<GroupKey, WriteGroup>();
77+
private final OBinarySerializer<String> stringSerializer;
78+
private final Map<Long, OFileClassic> files;
79+
private final boolean syncOnPageFlush;
80+
private final int pageSize;
81+
private final long groupTTL;
82+
private final OWriteAheadLog writeAheadLog;
83+
private final AtomicInteger cacheSize = new AtomicInteger();
84+
private final OLockManager<GroupKey, Thread> lockManager = new OLockManager<GroupKey, Thread>(
85+
true,
86+
OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_LOCK_TIMEOUT
87+
.getValueAsInteger());
88+
private final OStorageLocalAbstract storageLocal;
89+
private final Object syncObject = new Object();
90+
private final ScheduledExecutorService commitExecutor = Executors
91+
.newSingleThreadScheduledExecutor(new ThreadFactory() {
92+
@Override
93+
public Thread newThread(Runnable r) {
94+
Thread thread = new Thread(r);
95+
thread.setDaemon(true);
96+
thread
97+
.setName("OrientDB Write Cache Flush Task ("
98+
+ storageLocal.getName() + ")");
99+
return thread;
100+
}
101+
});
102+
private Map<String, Long> nameIdMap;
103+
private RandomAccessFile nameIdMapHolder;
104+
105+
private volatile int cacheMaxSize;
106+
107+
private long fileCounter = 0;
108+
private GroupKey lastGroupKey = new GroupKey(0, -1);
109+
private File nameIdMapHolderFile;
110+
111+
private long lastTimeWhenCacheIsEmpty = -1;
103112

104113
private static final class NameFileIdEntry {
105114
private final String name;
@@ -146,14 +155,14 @@ private GroupKey(long fileId, long groupIndex) {
146155

147156
@Override
148157
public int compareTo(GroupKey other) {
149-
if (fileId>other.fileId)
158+
if (fileId > other.fileId)
150159
return 1;
151-
if (fileId<other.fileId)
160+
if (fileId < other.fileId)
152161
return -1;
153162

154-
if (groupIndex>other.groupIndex)
163+
if (groupIndex > other.groupIndex)
155164
return 1;
156-
if (groupIndex<other.groupIndex)
165+
if (groupIndex < other.groupIndex)
157166
return -1;
158167

159168
return 0;
@@ -194,32 +203,61 @@ private final class PeriodicFlushTask implements Runnable {
194203
@Override
195204
public void run() {
196205
try {
197-
if (writeGroups.isEmpty())
206+
if (writeGroups.isEmpty()) {
207+
if (lastTimeWhenCacheIsEmpty < 0)
208+
lastTimeWhenCacheIsEmpty = System.currentTimeMillis();
209+
else {
210+
if (System.currentTimeMillis() - lastTimeWhenCacheIsEmpty >= OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_WRITE_INACTIVITY_INTERVAL
211+
.getValueAsLong()) {
212+
for (OFileClassic fileClassic : files.values()) {
213+
String fileName = null;
214+
try {
215+
fileName = fileClassic.getName();
216+
217+
if (!fileClassic.isSoftlyClosedCache()) {
218+
fileClassic.synch();
219+
fileClassic.setSoftlyClosed(true);
220+
221+
OLogManager.instance()
222+
.info(this, "Write inactivity interval was reached, file '" + fileName + "' was flushed.");
223+
}
224+
225+
} catch (Exception e) {
226+
if (fileName != null)
227+
OLogManager.instance().error(this, "Error on synchronization of file " + fileName, e);
228+
}
229+
}
230+
}
231+
}
232+
198233
return;
234+
}
235+
236+
lastTimeWhenCacheIsEmpty = -1;
199237

200238
int writeGroupsToFlush;
201239
boolean useForceSync = false;
202240
double threshold = ((double) cacheSize.get()) / cacheMaxSize;
203-
if (threshold>0.8) {
241+
if (threshold > 0.8) {
204242
writeGroupsToFlush = (int) (0.2 * writeGroups.size());
205243
useForceSync = true;
206-
} else if (threshold>0.9) {
244+
} else if (threshold > 0.9) {
207245
writeGroupsToFlush = (int) (0.4 * writeGroups.size());
208246
useForceSync = true;
209247
} else
210248
writeGroupsToFlush = 1;
211249

212-
if (writeGroupsToFlush<1)
250+
if (writeGroupsToFlush < 1)
213251
writeGroupsToFlush = 1;
214252

215253
int flushedGroups = 0;
216254

217255
flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, false);
218256

219-
if (flushedGroups<writeGroupsToFlush && useForceSync)
257+
if (flushedGroups < writeGroupsToFlush && useForceSync)
220258
flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, true);
221259

222-
if (flushedGroups<writeGroupsToFlush && cacheSize.get()>cacheMaxSize) {
260+
if (flushedGroups < writeGroupsToFlush && cacheSize.get() > cacheMaxSize) {
223261
if (OGlobalConfiguration.SERVER_CACHE_INCREASE_ON_DEMAND.getValueAsBoolean()) {
224262
final long oldCacheMaxSize = cacheMaxSize;
225263

core/src/main/java/com/orientechnologies/orient/core/storage/fs/OAbstractFile.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,9 @@ public boolean open() throws IOException {
180180
OLogManager.instance().error(this, "Invalid filledUp size (=" + filledUpTo + "). The file could be corrupted", null,
181181
OStorageException.class);
182182

183-
if (failCheck) {
183+
if (failCheck)
184184
wasSoftlyClosed = isSoftlyClosed();
185185

186-
if (wasSoftlyClosed)
187-
setSoftlyClosed(false);
188-
}
189-
190186
if (version < CURRENT_VERSION) {
191187
setSize(fileSize, true);
192188
setFilledUpTo(filledUpTo, true);
@@ -763,7 +759,7 @@ public void setFailCheck(boolean failCheck) {
763759

764760
}
765761

766-
protected void setDirty() {
762+
protected void setDirty() throws IOException {
767763
acquireWriteLock();
768764
try {
769765
if (!dirty)

0 commit comments

Comments
 (0)