|
65 | 65 | * @since 7/23/13
|
66 | 66 | */
|
67 | 67 | public class OWOWCache {
|
68 |
| - public static final String NAME_ID_MAP_EXTENSION = ".cm"; |
69 |
| - |
70 |
| - private static final String NAME_ID_MAP = "name_id_map" + NAME_ID_MAP_EXTENSION; |
71 |
| - |
72 |
| - public static final int MIN_CACHE_SIZE = 16; |
73 |
| - |
74 |
| - public static final long MAGIC_NUMBER = 0xFACB03FEL; |
75 |
| - |
76 |
| - private final ConcurrentSkipListMap<GroupKey, WriteGroup> writeGroups = new ConcurrentSkipListMap<GroupKey, WriteGroup>(); |
77 |
| - private final OBinarySerializer<String> stringSerializer; |
78 |
| - private final Map<Long, OFileClassic> files; |
79 |
| - private final boolean syncOnPageFlush; |
80 |
| - private final int pageSize; |
81 |
| - private final long groupTTL; |
82 |
| - private final OWriteAheadLog writeAheadLog; |
83 |
| - private final AtomicInteger cacheSize = new AtomicInteger(); |
84 |
| - private final OLockManager<GroupKey, Thread> lockManager = new OLockManager<GroupKey, Thread>(true, OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_LOCK_TIMEOUT.getValueAsInteger() |
85 |
| - ); |
86 |
| - private final OStorageLocalAbstract storageLocal; |
87 |
| - private final Object syncObject = new Object(); |
88 |
| - private final ScheduledExecutorService commitExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { |
89 |
| - @Override |
90 |
| - public Thread newThread(Runnable r) { |
91 |
| - Thread thread = new Thread(r); |
92 |
| - thread.setDaemon(true); |
93 |
| - thread.setName("OrientDB Write Cache Flush Task (" + storageLocal.getName() + ")"); |
94 |
| - return thread; |
95 |
| - } |
96 |
| - }); |
97 |
| - private Map<String, Long> nameIdMap; |
98 |
| - private RandomAccessFile nameIdMapHolder; |
99 |
| - private volatile int cacheMaxSize; |
100 |
| - private long fileCounter = 0; |
101 |
| - private GroupKey lastGroupKey = new GroupKey(0, -1); |
102 |
| - private File nameIdMapHolderFile; |
| 68 | + public static final String NAME_ID_MAP_EXTENSION = ".cm"; |
| 69 | + |
| 70 | + private static final String NAME_ID_MAP = "name_id_map" + NAME_ID_MAP_EXTENSION; |
| 71 | + |
| 72 | + public static final int MIN_CACHE_SIZE = 16; |
| 73 | + |
| 74 | + public static final long MAGIC_NUMBER = 0xFACB03FEL; |
| 75 | + |
| 76 | + private final ConcurrentSkipListMap<GroupKey, WriteGroup> writeGroups = new ConcurrentSkipListMap<GroupKey, WriteGroup>(); |
| 77 | + private final OBinarySerializer<String> stringSerializer; |
| 78 | + private final Map<Long, OFileClassic> files; |
| 79 | + private final boolean syncOnPageFlush; |
| 80 | + private final int pageSize; |
| 81 | + private final long groupTTL; |
| 82 | + private final OWriteAheadLog writeAheadLog; |
| 83 | + private final AtomicInteger cacheSize = new AtomicInteger(); |
| 84 | + private final OLockManager<GroupKey, Thread> lockManager = new OLockManager<GroupKey, Thread>( |
| 85 | + true, |
| 86 | + OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_LOCK_TIMEOUT |
| 87 | + .getValueAsInteger()); |
| 88 | + private final OStorageLocalAbstract storageLocal; |
| 89 | + private final Object syncObject = new Object(); |
| 90 | + private final ScheduledExecutorService commitExecutor = Executors |
| 91 | + .newSingleThreadScheduledExecutor(new ThreadFactory() { |
| 92 | + @Override |
| 93 | + public Thread newThread(Runnable r) { |
| 94 | + Thread thread = new Thread(r); |
| 95 | + thread.setDaemon(true); |
| 96 | + thread |
| 97 | + .setName("OrientDB Write Cache Flush Task (" |
| 98 | + + storageLocal.getName() + ")"); |
| 99 | + return thread; |
| 100 | + } |
| 101 | + }); |
| 102 | + private Map<String, Long> nameIdMap; |
| 103 | + private RandomAccessFile nameIdMapHolder; |
| 104 | + |
| 105 | + private volatile int cacheMaxSize; |
| 106 | + |
| 107 | + private long fileCounter = 0; |
| 108 | + private GroupKey lastGroupKey = new GroupKey(0, -1); |
| 109 | + private File nameIdMapHolderFile; |
| 110 | + |
| 111 | + private long lastTimeWhenCacheIsEmpty = -1; |
103 | 112 |
|
104 | 113 | private static final class NameFileIdEntry {
|
105 | 114 | private final String name;
|
@@ -146,14 +155,14 @@ private GroupKey(long fileId, long groupIndex) {
|
146 | 155 |
|
147 | 156 | @Override
|
148 | 157 | public int compareTo(GroupKey other) {
|
149 |
| - if (fileId>other.fileId) |
| 158 | + if (fileId > other.fileId) |
150 | 159 | return 1;
|
151 |
| - if (fileId<other.fileId) |
| 160 | + if (fileId < other.fileId) |
152 | 161 | return -1;
|
153 | 162 |
|
154 |
| - if (groupIndex>other.groupIndex) |
| 163 | + if (groupIndex > other.groupIndex) |
155 | 164 | return 1;
|
156 |
| - if (groupIndex<other.groupIndex) |
| 165 | + if (groupIndex < other.groupIndex) |
157 | 166 | return -1;
|
158 | 167 |
|
159 | 168 | return 0;
|
@@ -194,32 +203,61 @@ private final class PeriodicFlushTask implements Runnable {
|
194 | 203 | @Override
|
195 | 204 | public void run() {
|
196 | 205 | try {
|
197 |
| - if (writeGroups.isEmpty()) |
| 206 | + if (writeGroups.isEmpty()) { |
| 207 | + if (lastTimeWhenCacheIsEmpty < 0) |
| 208 | + lastTimeWhenCacheIsEmpty = System.currentTimeMillis(); |
| 209 | + else { |
| 210 | + if (System.currentTimeMillis() - lastTimeWhenCacheIsEmpty >= OGlobalConfiguration.DISK_WRITE_CACHE_FLUSH_WRITE_INACTIVITY_INTERVAL |
| 211 | + .getValueAsLong()) { |
| 212 | + for (OFileClassic fileClassic : files.values()) { |
| 213 | + String fileName = null; |
| 214 | + try { |
| 215 | + fileName = fileClassic.getName(); |
| 216 | + |
| 217 | + if (!fileClassic.isSoftlyClosedCache()) { |
| 218 | + fileClassic.synch(); |
| 219 | + fileClassic.setSoftlyClosed(true); |
| 220 | + |
| 221 | + OLogManager.instance() |
| 222 | + .info(this, "Write inactivity interval was reached, file '" + fileName + "' was flushed."); |
| 223 | + } |
| 224 | + |
| 225 | + } catch (Exception e) { |
| 226 | + if (fileName != null) |
| 227 | + OLogManager.instance().error(this, "Error on synchronization of file " + fileName, e); |
| 228 | + } |
| 229 | + } |
| 230 | + } |
| 231 | + } |
| 232 | + |
198 | 233 | return;
|
| 234 | + } |
| 235 | + |
| 236 | + lastTimeWhenCacheIsEmpty = -1; |
199 | 237 |
|
200 | 238 | int writeGroupsToFlush;
|
201 | 239 | boolean useForceSync = false;
|
202 | 240 | double threshold = ((double) cacheSize.get()) / cacheMaxSize;
|
203 |
| - if (threshold>0.8) { |
| 241 | + if (threshold > 0.8) { |
204 | 242 | writeGroupsToFlush = (int) (0.2 * writeGroups.size());
|
205 | 243 | useForceSync = true;
|
206 |
| - } else if (threshold>0.9) { |
| 244 | + } else if (threshold > 0.9) { |
207 | 245 | writeGroupsToFlush = (int) (0.4 * writeGroups.size());
|
208 | 246 | useForceSync = true;
|
209 | 247 | } else
|
210 | 248 | writeGroupsToFlush = 1;
|
211 | 249 |
|
212 |
| - if (writeGroupsToFlush<1) |
| 250 | + if (writeGroupsToFlush < 1) |
213 | 251 | writeGroupsToFlush = 1;
|
214 | 252 |
|
215 | 253 | int flushedGroups = 0;
|
216 | 254 |
|
217 | 255 | flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, false);
|
218 | 256 |
|
219 |
| - if (flushedGroups<writeGroupsToFlush && useForceSync) |
| 257 | + if (flushedGroups < writeGroupsToFlush && useForceSync) |
220 | 258 | flushedGroups = flushRing(writeGroupsToFlush, flushedGroups, true);
|
221 | 259 |
|
222 |
| - if (flushedGroups<writeGroupsToFlush && cacheSize.get()>cacheMaxSize) { |
| 260 | + if (flushedGroups < writeGroupsToFlush && cacheSize.get() > cacheMaxSize) { |
223 | 261 | if (OGlobalConfiguration.SERVER_CACHE_INCREASE_ON_DEMAND.getValueAsBoolean()) {
|
224 | 262 | final long oldCacheMaxSize = cacheMaxSize;
|
225 | 263 |
|
|
0 commit comments