1 /**
2  *  Copyright Terracotta, Inc.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */

16
17 package net.sf.ehcache.store.disk;
18
19 import static java.util.concurrent.TimeUnit.MILLISECONDS;
20
21 import java.io.ByteArrayInputStream;
22 import java.io.EOFException;
23 import java.io.File;
24 import java.io.FileInputStream;
25 import java.io.FileNotFoundException;
26 import java.io.FileOutputStream;
27 import java.io.IOException;
28 import java.io.ObjectInputStream;
29 import java.io.ObjectOutputStream;
30 import java.io.RandomAccessFile;
31 import java.io.Serializable;
32 import java.util.ConcurrentModificationException;
33 import java.util.List;
34 import java.util.concurrent.BlockingQueue;
35 import java.util.concurrent.Callable;
36 import java.util.concurrent.Future;
37 import java.util.concurrent.ScheduledThreadPoolExecutor;
38 import java.util.concurrent.ThreadFactory;
39 import java.util.concurrent.TimeUnit;
40 import java.util.concurrent.atomic.AtomicInteger;
41 import java.util.concurrent.locks.Lock;
42
43 import net.sf.ehcache.CacheException;
44 import net.sf.ehcache.DiskStorePathManager;
45 import net.sf.ehcache.Ehcache;
46 import net.sf.ehcache.Element;
47 import net.sf.ehcache.concurrent.ConcurrencyUtil;
48 import net.sf.ehcache.config.CacheConfiguration;
49 import net.sf.ehcache.config.PinningConfiguration;
50 import net.sf.ehcache.event.RegisteredEventListeners;
51 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
52 import net.sf.ehcache.store.FrontEndCacheTier;
53 import net.sf.ehcache.store.disk.ods.FileAllocationTree;
54 import net.sf.ehcache.store.disk.ods.Region;
55 import net.sf.ehcache.util.MemoryEfficientByteArrayOutputStream;
56 import net.sf.ehcache.util.PreferTCCLObjectInputStream;
57
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 /**
62  * A mock-up of a on-disk element proxy factory.
63  *
64  * @author Chris Dennis
65  * @author Ludovic Orban
66  */

67 @IgnoreSizeOf
68 public class DiskStorageFactory {
69
70     /**
71      * Path stub used to create unique ehcache directories.
72      */

73     private static final int SERIALIZATION_CONCURRENCY_DELAY = 250;
74     private static final int SHUTDOWN_GRACE_PERIOD = 60;
75     private static final int MEGABYTE = 1024 * 1024;
76     private static final int MAX_EVICT = 5;
77     private static final int SAMPLE_SIZE = 30;
78
79     private static final Logger LOG = LoggerFactory.getLogger(DiskStorageFactory.class.getName());
80
81     /**
82      * The store bound to this factory.
83      */

84     protected volatile DiskStore                  store;
85
86     private final BlockingQueue<Runnable> diskQueue;
87     /**
88      * Executor service used to write elements to disk
89      */

90     private final ScheduledThreadPoolExecutor diskWriter;
91
92     private final long queueCapacity;
93
94     private final File             file;
95     private final RandomAccessFile[] dataAccess;
96
97     private final FileAllocationTree allocator;
98
99     private final RegisteredEventListeners eventService;
100
101     private volatile int elementSize;
102
103     private final ElementSubstituteFilter onDiskFilter = new OnDiskFilter();
104
105     private final AtomicInteger onDisk = new AtomicInteger();
106
107     private final File indexFile;
108
109     private final IndexWriteTask flushTask;
110
111     private volatile int diskCapacity;
112
113     private volatile boolean pinningEnabled;
114
115     private final boolean diskPersistent;
116
117     private final DiskStorePathManager diskStorePathManager;
118     /**
119      * Constructs an disk persistent factory for the given cache and disk path.
120      *
121      * @param cache cache that fronts this factory
122      */

123     public DiskStorageFactory(Ehcache cache, RegisteredEventListeners cacheEventNotificationService) {
124         this.diskStorePathManager = cache.getCacheManager().getDiskStorePathManager();
125         this.file = diskStorePathManager.getFile(cache.getName(), ".data");
126
127         this.indexFile = diskStorePathManager.getFile(cache.getName(), ".index");
128         this.pinningEnabled = determineCachePinned(cache.getCacheConfiguration());
129         this.diskPersistent = cache.getCacheConfiguration().isDiskPersistent();
130
131         if (diskPersistent && diskStorePathManager.isAutoCreated()) {
132             LOG.warn("Data in persistent disk stores is ignored for stores from automatically created directories.\n"
133                     + "Remove diskPersistent or resolve the conflicting disk paths in cache configuration.\n" + "Deleting data file "
134                     + file.getAbsolutePath());
135             deleteFile(file);
136         } else if (!diskPersistent) {
137             deleteFile(file);
138             deleteFile(indexFile);
139         }
140
141         try {
142             dataAccess = allocateRandomAccessFiles(file, cache.getCacheConfiguration().getDiskAccessStripes());
143         } catch (FileNotFoundException e) {
144             throw new CacheException(e);
145         }
146         this.allocator = new FileAllocationTree(Long.MAX_VALUE, dataAccess[0]);
147
148         diskWriter = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
149             public Thread newThread(Runnable r) {
150                 Thread t = new Thread(r, file.getName());
151                 t.setDaemon(false);
152                 return t;
153             }
154         });
155         this.diskQueue = diskWriter.getQueue();
156         this.eventService = cache.getCacheEventNotificationService();
157         this.queueCapacity = cache.getCacheConfiguration().getDiskSpoolBufferSizeMB() * MEGABYTE;
158         this.diskCapacity = cache.getCacheConfiguration().getMaxElementsOnDisk();
159
160         diskWriter.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
161         diskWriter.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
162         long expiryInterval = cache.getCacheConfiguration().getDiskExpiryThreadIntervalSeconds();
163         diskWriter.scheduleWithFixedDelay(new DiskExpiryTask(), expiryInterval, expiryInterval, TimeUnit.SECONDS);
164
165         flushTask = new IndexWriteTask(indexFile, cache.getCacheConfiguration().isClearOnFlush());
166
167         if (!getDataFile().exists() || (getDataFile().length() == 0)) {
168             LOG.debug("Matching data file missing (or empty) for index file. Deleting index file " + indexFile);
169             deleteFile(indexFile);
170         } else if (getDataFile().exists() && indexFile.exists()) {
171             if (getDataFile().lastModified() > (indexFile.lastModified() + TimeUnit.SECONDS.toMillis(1))) {
172                 LOG.warn("The index for data file {} is out of date, probably due to an unclean shutdown. "
173                         + "Deleting index file {}", getDataFile(), indexFile);
174                 deleteFile(indexFile);
175             }
176         }
177     }
178
179     private boolean determineCachePinned(CacheConfiguration cacheConfiguration) {
180         PinningConfiguration pinningConfiguration = cacheConfiguration.getPinningConfiguration();
181         if (pinningConfiguration == null) {
182             return false;
183         }
184
185         switch (pinningConfiguration.getStore()) {
186             case LOCALHEAP:
187             case LOCALMEMORY:
188                 return false;
189
190             case INCACHE:
191                 return true;
192
193             default:
194                 throw new IllegalArgumentException();
195         }
196     }
197
198     private static RandomAccessFile[] allocateRandomAccessFiles(File f, int stripes) throws FileNotFoundException {
199         int roundedStripes = stripes;
200         while ((roundedStripes & (roundedStripes - 1)) != 0) {
201             ++roundedStripes;
202         }
203
204         RandomAccessFile [] result = new RandomAccessFile[roundedStripes];
205         for (int i = 0; i < result.length; ++i) {
206             result[i] = new RandomAccessFile(f, "rw");
207         }
208
209         return result;
210     }
211
212     private RandomAccessFile getDataAccess(Object key) {
213         return this.dataAccess[ConcurrencyUtil.selectLock(key, dataAccess.length)];
214     }
215
216     /**
217      * Return this size in bytes of this factory
218      *
219      * @return this size in bytes of this factory
220      */

221     public long getOnDiskSizeInBytes() {
222         synchronized (dataAccess[0]) {
223             try {
224                 return dataAccess[0].length();
225             } catch (IOException e) {
226                 LOG.warn("Exception trying to determine store size", e);
227                 return 0;
228             }
229         }
230     }
231
232     /**
233      * Bind a store instance to this factory.
234      *
235      * @param store store to bind
236      */

237     public void bind(DiskStore store) {
238         this.store = store;
239         loadIndex();
240     }
241
242     /**
243      * Free any manually managed resources used by this {@link DiskSubstitute}.
244      *
245      * @param lock the lock protecting the DiskSubstitute
246      * @param substitute DiskSubstitute being freed.
247      */

248     public void free(Lock lock, DiskSubstitute substitute) {
249         free(lock, substitute, false);
250     }
251
252     /**
253      * Free any manually managed resources used by this {@link DiskSubstitute}.
254      *
255      * @param lock the lock protecting the DiskSubstitute
256      * @param substitute DiskSubstitute being freed.
257      * @param faultFailure true if this DiskSubstitute should be freed because of a disk failure
258      */

259     public void free(Lock lock, DiskSubstitute substitute, boolean faultFailure) {
260         if (substitute instanceof DiskStorageFactory.DiskMarker) {
261             if (!faultFailure) {
262                 onDisk.decrementAndGet();
263             }
264             //free done asynchronously under the relevant segment lock...
265             DiskFreeTask free = new DiskFreeTask(lock, (DiskMarker) substitute);
266             if (lock.tryLock()) {
267                 try {
268                     free.call();
269                 } finally {
270                     lock.unlock();
271                 }
272             } else {
273                 schedule(free);
274             }
275         }
276     }
277
278     /**
279      * Mark this on-disk marker as used (hooks into the file space allocation structure).
280      *
281      * @param marker on-disk marker to mark as used
282      */

283     protected void markUsed(DiskMarker marker) {
284         allocator.mark(new Region(marker.getPosition(), marker.getPosition() + marker.getSize() - 1));
285     }
286
287     /**
288      * Shrink this store's data file down to a minimal size for its contents.
289      */

290     protected void shrinkDataFile() {
291         synchronized (dataAccess[0]) {
292             try {
293                 dataAccess[0].setLength(allocator.getFileSize());
294             } catch (IOException e) {
295                 LOG.error("Exception trying to shrink data file to size", e);
296             }
297         }
298     }
299     /**
300      * Shuts down this disk factory.
301      * <p>
302      * This shuts down the executor and then waits for its termination, before closing the data file.
303      * @throws java.io.IOException if an IO error occurred
304      */

305     protected void shutdown() throws IOException {
306         diskWriter.shutdown();
307         for (int i = 0; i < SHUTDOWN_GRACE_PERIOD; i++) {
308             try {
309                 if (diskWriter.awaitTermination(1, TimeUnit.SECONDS)) {
310                     break;
311                 } else {
312                     LOG.info("Waited " + (i + 1) + " seconds for shutdown of [" + file.getName() + "]");
313                 }
314             } catch (InterruptedException e) {
315                 LOG.warn("Received exception while waiting for shutdown", e);
316             }
317         }
318
319         for (final RandomAccessFile raf : dataAccess) {
320             synchronized (raf) {
321                 raf.close();
322             }
323         }
324
325         if (!diskPersistent) {
326             deleteFile(file);
327             deleteFile(indexFile);
328         }
329     }
330
331     /**
332      * Deletes the data file for this factory.
333      */

334     protected void delete() {
335         deleteFile(file);
336         allocator.clear();
337     }
338
339     /**
340      * Schedule to given task on the disk writer executor service.
341      *
342      * @param <U> return type of the callable
343      * @param call callable to call
344      * @return Future representing the return of this call
345      */

346     protected <U> Future<U> schedule(Callable<U> call) {
347         return diskWriter.submit(call);
348     }
349
350     /**
351      * Read the data at the given marker, and return the associated deserialized Element.
352      *
353      * @param marker marker to read
354      * @return deserialized Element
355      * @throws java.io.IOException on read error
356      * @throws ClassNotFoundException on deserialization error
357      */

358     protected Element read(DiskMarker marker) throws IOException, ClassNotFoundException {
359         final byte[] buffer = new byte[marker.getSize()];
360         final RandomAccessFile data = getDataAccess(marker.getKey());
361         synchronized (data) {
362             // Load the element
363             data.seek(marker.getPosition());
364             data.readFully(buffer);
365         }
366
367         ObjectInputStream objstr = new PreferTCCLObjectInputStream(new ByteArrayInputStream(buffer));
368
369         try {
370             return (Element) objstr.readObject();
371         } finally {
372             objstr.close();
373         }
374     }
375
376     /**
377      * Write the given element to disk, and return the associated marker.
378      *
379      * @param element to write
380      * @return marker representing the element
381      * @throws java.io.IOException on write error
382      */

383     protected DiskMarker write(Element element) throws IOException {
384         MemoryEfficientByteArrayOutputStream buffer = serializeElement(element);
385         int bufferLength = buffer.size();
386         elementSize = bufferLength;
387         DiskMarker marker = alloc(element, bufferLength);
388         // Write the record
389         final RandomAccessFile data = getDataAccess(element.getObjectKey());
390         synchronized (data) {
391             data.seek(marker.getPosition());
392             data.write(buffer.toByteArray(), 0, bufferLength);
393         }
394         return marker;
395     }
396
397     private MemoryEfficientByteArrayOutputStream serializeElement(Element element) throws IOException {
398         // try two times to Serialize. A ConcurrentModificationException can occur because Java's serialization
399         // mechanism is not threadsafe and POJOs are seldom implemented in a threadsafe way.
400         // e.g. we are serializing an ArrayList field while another thread somewhere in the application is appending to it.
401         // The best we can do is try again and then give up.
402         ConcurrentModificationException exception = null;
403         for (int retryCount = 0; retryCount < 2; retryCount++) {
404             try {
405                 return MemoryEfficientByteArrayOutputStream.serialize(element);
406             } catch (ConcurrentModificationException e) {
407                 exception = e;
408                 try {
409                     // wait for the other thread(s) to finish
410                     MILLISECONDS.sleep(SERIALIZATION_CONCURRENCY_DELAY);
411                 } catch (InterruptedException e1) {
412                     //no-op
413                 }
414             }
415         }
416         throw exception;
417     }
418
419     private DiskMarker alloc(Element element, int size) throws IOException {
420         //check for a matching chunk
421         Region r = allocator.alloc(size);
422         return createMarker(r.start(), size, element);
423     }
424
425     /**
426      * Free the given marker to be used by a subsequent write.
427      *
428      * @param marker marker to be free'd
429      */

430     protected void free(DiskMarker marker) {
431         allocator.free(new Region(marker.getPosition(), marker.getPosition() + marker.getSize() - 1));
432     }
433
434     /**
435      * Return {@code trueif the disk write queue is full.
436      *
437      * @return {@code trueif the disk write queue is full.
438      */

439     public boolean bufferFull() {
440         return (diskQueue.size() * elementSize) > queueCapacity;
441     }
442
443     /**
444      * Return a reference to the data file backing this factory.
445      *
446      * @return a reference to the data file backing this factory.
447      */

448     public File getDataFile() {
449         return file;
450     }
451
452     /**
453      * DiskWriteTasks are used to serialize elements
454      * to disk and fault in the resultant DiskMarker
455      * instance.
456      */

457     abstract class DiskWriteTask implements Callable<DiskMarker> {
458
459         private final Placeholder placeholder;
460
461         /**
462          * Create a disk-write task for the given placeholder.
463          *
464          * @param p a disk-write task for the given placeholder.
465          */

466         DiskWriteTask(Placeholder p) {
467             this.placeholder = p;
468         }
469
470         /**
471          * Return the placeholder that this task will write.
472          *
473          * @return the placeholder that this task will write.
474          */

475         Placeholder getPlaceholder() {
476             return placeholder;
477         }
478
479         /**
480          * {@inheritDoc}
481          */

482         public DiskMarker call() {
483             try {
484                 if (store.containsKey(placeholder.getKey())) {
485                     DiskMarker marker = write(placeholder.getElement());
486                     if (marker != null && store.fault(placeholder.getKey(), placeholder, marker)) {
487                         return marker;
488                     } else {
489                         return null;
490                     }
491                 } else {
492                     return null;
493                 }
494             } catch (Throwable e) {
495                 LOG.error("Disk Write of " + placeholder.getKey() + " failed: ", e);
496                 FrontEndCacheTier frontEndCacheTier = eventService.getFrontEndCacheTier();
497                 if (frontEndCacheTier != null) {
498                     Lock l = frontEndCacheTier.getLockFor(placeholder.getKey()).writeLock();
499                     l.lock();
500                     try {
501                         placeholder.setFailedToFlush(true);
502                         if (!frontEndCacheTier.isCached(placeholder.getKey())) {
503                             store.evict(placeholder.getKey(), placeholder);
504                         }
505                     } finally {
506                         l.unlock();
507                     }
508                 } else {
509                     placeholder.setFailedToFlush(true);
510                 }
511                 return null;
512             }
513         }
514     }
515
516     /**
517      * Disk free tasks are used to asynchronously free DiskMarker instances under the correct
518      * exclusive write lock.  This ensure markers are not free'd until no more readers can be
519      * holding references to them.
520      */

521     private final class DiskFreeTask implements Callable<Void> {
522         private final Lock lock;
523         private final DiskMarker marker;
524
525         private DiskFreeTask(Lock lock, DiskMarker marker) {
526             this.lock = lock;
527             this.marker = marker;
528         }
529
530         /**
531          * {@inheritDoc}
532          */

533         public Void call() {
534             lock.lock();
535             try {
536                 DiskStorageFactory.this.free(marker);
537             } finally {
538                 lock.unlock();
539             }
540             return null;
541         }
542     }
543
544     /**
545      * Abstract superclass for all disk substitutes.
546      */

547     public abstract static class DiskSubstitute {
548
549         /**
550          * Cached size of this mapping on the Java heap.
551          */

552         protected transient volatile long onHeapSize;
553
554         @IgnoreSizeOf
555         private transient volatile DiskStorageFactory factory;
556
557         /**
558          * Create a disk substitute bound to no factory.  This constructor is used during
559          * de-serialization.
560          */

561         public DiskSubstitute() {
562             this.factory = null;
563         }
564
565         /**
566          * Create a disk substitute bound to the given factory.
567          *
568          * @param factory the factory to bind to.
569          */

570         DiskSubstitute(DiskStorageFactory factory) {
571             this.factory = factory;
572         }
573
574         /**
575          * Return the key to which this marker is (or should be) mapped.
576          *
577          * @return the key to which this marker is (or should be) mapped.
578          */

579         abstract Object getKey();
580
581         /**
582          * Return the total number of hits on this marker
583          *
584          * @return the total number of hits on this marker
585          */

586         abstract long getHitCount();
587
588         /**
589          * Return the time at which this marker expires.
590          *
591          * @return the time at which this marker expires.
592          */

593         abstract long getExpirationTime();
594
595         /**
596          * Mark the disk substitute as installed
597          */

598         abstract void installed();
599
600         /**
601          * Returns the {@link DiskStorageFactory} instance that generated this <code>DiskSubstitute</code>
602          *
603          * @return an <code>ElementProxyFactory</code>
604          */

605         public final DiskStorageFactory getFactory() {
606             return factory;
607         }
608
609         /**
610          * Bind this marker to a given factory.
611          * <p>
612          * Used during deserialization of markers to associate them with the deserializing factory.
613          * @param factory the factory to bind to
614          */

615         void bindFactory(DiskStorageFactory factory) {
616             this.factory = factory;
617         }
618     }
619
620     /**
621      * Placeholder instances are put in place to prevent
622      * duplicate write requests while Elements are being
623      * written to disk.
624      */

625     final class Placeholder extends DiskSubstitute {
626         @IgnoreSizeOf
627         private final Object key;
628         private final Element element;
629
630         private volatile boolean failedToFlush;
631
632         /**
633          * Create a Placeholder wrapping the given element and key.
634          *
635          * @param element the element to wrap
636          */

637         Placeholder(Element element) {
638             super(DiskStorageFactory.this);
639             this.key = element.getObjectKey();
640             this.element = element;
641         }
642
643         /**
644          * Whether flushing this to disk ever failed
645          * @return true if failed, otherwise false
646          */

647         boolean hasFailedToFlush() {
648             return failedToFlush;
649         }
650
651         private void setFailedToFlush(final boolean failedToFlush) {
652             this.failedToFlush = failedToFlush;
653         }
654
655         /**
656          * {@inheritDoc}
657          */

658         @Override
659         public void installed() {
660             DiskStorageFactory.this.schedule(new PersistentDiskWriteTask(this));
661         }
662
663         /**
664          * {@inheritDoc}
665          */

666         @Override
667         Object getKey() {
668             return key;
669         }
670
671         /**
672          * {@inheritDoc}
673          */

674         @Override
675         long getHitCount() {
676             return getElement().getHitCount();
677         }
678
679         @Override
680         long getExpirationTime() {
681             return getElement().getExpirationTime();
682         }
683
684         /**
685          * Return the element that this Placeholder is wrapping.
686          * @return the element that this Placeholder is wrapping.
687          */

688         Element getElement() {
689             return element;
690         }
691     }
692
693     /**
694      * DiskMarker instances point to the location of their
695      * associated serialized Element instance.
696      */

697     public static class DiskMarker extends DiskSubstitute implements Serializable {
698
699         @IgnoreSizeOf
700         private final Object key;
701
702         private final long position;
703         private final int size;
704
705         private volatile long hitCount;
706
707         private volatile long expiry;
708
709         /**
710          * Create a new marker tied to the given factory instance.
711          *
712          * @param factory factory responsible for this marker
713          * @param position position on disk where the element will be stored
714          * @param size size of the serialized element
715          * @param element element being stored
716          */

717         DiskMarker(DiskStorageFactory factory, long position, int size, Element element) {
718             super(factory);
719             this.position = position;
720             this.size = size;
721
722             this.key = element.getObjectKey();
723             this.hitCount = element.getHitCount();
724             this.expiry = element.getExpirationTime();
725         }
726
727         /**
728          * Create a new marker tied to the given factory instance.
729          *
730          * @param factory factory responsible for this marker
731          * @param position position on disk where the element will be stored
732          * @param size size of the serialized element
733          * @param key key to which this element is mapped
734          * @param hits hit count for this element
735          */

736         DiskMarker(DiskStorageFactory factory, long position, int size, Object key, long hits) {
737             super(factory);
738             this.position = position;
739             this.size = size;
740
741             this.key = key;
742             this.hitCount = hits;
743         }
744
745         /**
746          * Key to which this Element is mapped.
747          *
748          * @return key for this Element
749          */

750         @Override
751         Object getKey() {
752             return key;
753         }
754
755         /**
756          * Number of hits on this Element.
757          */

758         @Override
759         long getHitCount() {
760             return hitCount;
761         }
762
763         /**
764          * Disk offset at which this element is stored.
765          *
766          * @return disk offset
767          */

768         private long getPosition() {
769             return position;
770         }
771
772         /**
773          * Returns the size of the currently occupying element.
774          *
775          * @return size of the stored element
776          */

777         public int getSize() {
778             return size;
779         }
780
781         /**
782          * {@inheritDoc}
783          * <p>
784          * A No-Op
785          */

786         @Override
787         public void installed() {
788             //no-op
789         }
790
791         /**
792          * {@inheritDoc}
793          */

794         @Override
795         long getExpirationTime() {
796             return expiry;
797         }
798
799         /**
800          * Increment statistic associated with a hit on this cache.
801          *
802          * @param e element deserialized from disk
803          */

804         void hit(Element e) {
805             hitCount++;
806             expiry = e.getExpirationTime();
807         }
808     }
809
810
811     /**
812      * Remove elements created by this factory if they have expired.
813      */

814     public void expireElements() {
815         new DiskExpiryTask().run();
816     }
817
818     /**
819      * Causes removal of all expired elements (and fires the relevant events).
820      */

821     private final class DiskExpiryTask implements Runnable {
822
823         /**
824          * {@inheritDoc}
825          */

826         public void run() {
827             long now = System.currentTimeMillis();
828             for (Object key : store.keySet()) {
829                 Object value = store.unretrievedGet(key);
830                 if (created(value) && value instanceof DiskStorageFactory.DiskMarker) {
831                     checkExpiry((DiskMarker) value, now);
832                 }
833             }
834         }
835
836         private void checkExpiry(DiskMarker marker, long now) {
837             if (marker.getExpirationTime() < now) {
838                 if (eventService.hasCacheEventListeners()) {
839                     try {
840                         Element element = read(marker);
841                         if (store.remove(marker.getKey()) != null) {
842                             eventService.notifyElementExpiry(element, false);
843                         }
844                     } catch (Exception e) {
845                         // ignore
846                     }
847                 } else {
848                     store.evict(marker.getKey(), marker);
849                 }
850             }
851         }
852     }
853
854     /**
855      * Attempt to delete the corresponding file and log an error on failure.
856      * @param f the file to delete
857      */

858     protected static void deleteFile(File f) {
859         if (!f.delete()) {
860             LOG.debug("Failed to delete file {}", f.getName());
861         }
862     }
863
864
865     /**
866      * Create a disk substitute for an element
867      *
868      * @param element the element to create a disk substitute for
869      * @return The substitute element
870      * @throws IllegalArgumentException if element cannot be substituted
871      */

872     public DiskSubstitute create(Element element) throws IllegalArgumentException {
873         return new Placeholder(element);
874     }
875
876     /**
877      * Decodes the supplied {@link DiskSubstitute}.
878      *
879      * @param object ElementSubstitute to decode
880      * @return the decoded element
881      */

882     public Element retrieve(DiskSubstitute object) {
883         if (object instanceof DiskMarker) {
884             try {
885                 DiskMarker marker = (DiskMarker) object;
886                 return read(marker);
887             } catch (IOException e) {
888                 throw new CacheException(e);
889             } catch (ClassNotFoundException e) {
890                 throw new CacheException(e);
891             }
892         } else if (object instanceof Placeholder) {
893             return ((Placeholder) object).getElement();
894         } else {
895             return null;
896         }
897     }
898
899     /**
900      * Decodes the supplied {@link DiskSubstitute}, updating statistics.
901      *
902      * @param object ElementSubstitute to decode
903      * @return the decoded element
904      */

905     public Element retrieve(DiskSubstitute object, Segment segment) {
906         if (object instanceof DiskMarker) {
907             try {
908                 DiskMarker marker = (DiskMarker) object;
909                 segment.diskHit();
910                 Element e = read(marker);
911                 marker.hit(e);
912                 return e;
913             } catch (IOException e) {
914                 throw new CacheException(e);
915             } catch (ClassNotFoundException e) {
916                 throw new CacheException(e);
917             }
918         } else if (object instanceof DiskStorageFactory.Placeholder) {
919             segment.diskHit();
920             return ((Placeholder) object).getElement();
921         } else {
922             segment.miss();
923             return null;
924         }
925     }
926
927     /**
928      * Returns <code>true</code> if this factory created the given object.
929      *
930      * @param object object to check
931      * @return <code>true</code> if object created by this factory
932      */

933     public boolean created(Object object) {
934         if (object instanceof DiskSubstitute) {
935             return ((DiskSubstitute) object).getFactory() == this;
936         } else {
937             return false;
938         }
939     }
940
941     /**
942      * Unbinds a store instance from this factory
943      */

944     public void unbind() {
945         try {
946             flushTask.call();
947         } catch (Throwable t) {
948             LOG.error("Could not flush disk cache. Initial cause was " + t.getMessage(), t);
949         }
950
951         try {
952             shutdown();
953             if (diskStorePathManager.isAutoCreated()) {
954                 deleteFile(indexFile);
955                 delete();
956             }
957         } catch (IOException e) {
958             LOG.error("Could not shut down disk cache. Initial cause was " + e.getMessage(), e);
959         }
960     }
961
962     /**
963      * Schedule a flush (index write) for this factory.
964      * @return a Future
965      */

966     public Future<Void> flush() {
967         return schedule(flushTask);
968     }
969
970     private DiskMarker createMarker(long position, int size, Element element) {
971         return new DiskMarker(this, position, size, element);
972     }
973
974     private boolean isPinningEnabled() {
975         return pinningEnabled;
976     }
977
978     /**
979      * Evict some elements, if possible
980      *
981      * @param count the number of elements to evict
982      * @return the number of elements actually evicted
983      */

984     int evict(int count) {
985         // see void onDiskEvict(int size, Object keyHint)
986         if (isPinningEnabled()) {
987             return 0;
988         }
989
990         int evicted = 0;
991         for (int i = 0; i < count; i++) {
992             DiskSubstitute target = this.getDiskEvictionTarget(null, count);
993             if (target != null) {
994                 Element evictedElement = store.evictElement(target.getKey(), null);
995                 if (evictedElement != null) {
996                     evicted++;
997                 }
998             }
999         }
1000         return evicted;
1001     }
1002
1003     /**
1004      * Filters for on-disk elements created by this factory
1005      */

1006     private class OnDiskFilter implements ElementSubstituteFilter {
1007
1008         /**
1009          * {@inheritDoc}
1010          */

1011         public boolean allows(Object object) {
1012             if (!created(object)) {
1013                 return false;
1014             }
1015
1016             return object instanceof DiskMarker;
1017         }
1018     }
1019
1020     /**
1021      * Return the number of on-disk elements
1022      *
1023      * @return the number of on-disk elements
1024      */

1025     public int getOnDiskSize() {
1026         return onDisk.get();
1027     }
1028
1029     /**
1030      * Set the maximum on-disk capacity for this factory.
1031      *
1032      * @param capacity the maximum on-disk capacity for this factory.
1033      */

1034     public void setOnDiskCapacity(int capacity) {
1035         diskCapacity = capacity;
1036     }
1037
1038     private void onDiskEvict(int size, Object keyHint) {
1039         if (diskCapacity > 0 && !isPinningEnabled()) {
1040             int overflow = size - diskCapacity;
1041             for (int i = 0; i < Math.min(MAX_EVICT, overflow); i++) {
1042                 DiskSubstitute target = getDiskEvictionTarget(keyHint, size);
1043                 if (target != null) {
1044                     final Element element = store.evictElement(target.getKey(), target);
1045                     if (element != null && onDisk.get() <= diskCapacity) {
1046                         break;
1047                     }
1048                 }
1049             }
1050         }
1051     }
1052
1053     private DiskSubstitute getDiskEvictionTarget(Object keyHint, int size) {
1054         List<DiskSubstitute> sample = store.getRandomSample(onDiskFilter, Math.min(SAMPLE_SIZE, size), keyHint);
1055         DiskSubstitute target = null;
1056         DiskSubstitute hintTarget = null;
1057         for (DiskSubstitute substitute : sample) {
1058             if ((target == null) || (substitute.getHitCount() < target.getHitCount())) {
1059                 if (substitute.getKey().equals(keyHint)) {
1060                     hintTarget = substitute;
1061                 } else {
1062                     target = substitute;
1063                 }
1064             }
1065         }
1066         return target != null ? target : hintTarget;
1067     }
1068
1069     /**
1070      * Disk write task implementation for disk persistent stores.
1071      */

1072     private final class PersistentDiskWriteTask extends DiskWriteTask {
1073
1074         /**
1075          * Create a disk persistent disk-write task for this placeholder.
1076          *
1077          * @param p the placeholder
1078          */

1079         PersistentDiskWriteTask(Placeholder p) {
1080             super(p);
1081         }
1082
1083         /**
1084          * {@inheritDoc}
1085          */

1086         @Override
1087         public DiskMarker call() {
1088             DiskMarker result = super.call();
1089             if (result != null) {
1090                 int disk = onDisk.incrementAndGet();
1091                 onDiskEvict(disk, getPlaceholder().getKey());
1092             }
1093             return result;
1094         }
1095     }
1096
1097     /**
1098      * Task that writes the index file for this factory.
1099      */

1100     class IndexWriteTask implements Callable<Void> {
1101
1102         private final File index;
1103         private final boolean clearOnFlush;
1104
1105         /**
1106          * Create a disk flush task that writes to the given file.
1107          *
1108          * @param index the file to write the index to
1109          * @param clear clear on flush flag
1110          */

1111         IndexWriteTask(File index, boolean clear) {
1112             this.index = index;
1113             this.clearOnFlush = clear;
1114         }
1115
1116         /**
1117          * {@inheritDoc}
1118          */

1119         public synchronized Void call() throws IOException, InterruptedException {
1120             ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(index));
1121             try {
1122                 for (Object key : store.keySet()) {
1123                     Object o = store.unretrievedGet(key);
1124                     if (o instanceof Placeholder && !((Placeholder)o).failedToFlush) {
1125                         o = new PersistentDiskWriteTask((Placeholder) o).call();
1126                         if (o == null) {
1127                             o = store.unretrievedGet(key);
1128                         }
1129                     }
1130
1131                     if (o instanceof DiskMarker) {
1132                         DiskMarker marker = (DiskMarker) o;
1133                         oos.writeObject(key);
1134                         oos.writeObject(marker);
1135                     }
1136                 }
1137             } finally {
1138                 oos.close();
1139             }
1140             return null;
1141         }
1142
1143     }
1144
1145     private void loadIndex() {
1146         if (!indexFile.exists()) {
1147             return;
1148         }
1149
1150         try {
1151             ObjectInputStream ois = new PreferTCCLObjectInputStream(new FileInputStream(indexFile));
1152             try {
1153                 Object key = ois.readObject();
1154                 Object value = ois.readObject();
1155
1156                 DiskMarker marker = (DiskMarker) value;
1157                 while (true) {
1158                     marker.bindFactory(this);
1159                     markUsed(marker);
1160                     if (store.putRawIfAbsent(key, marker)) {
1161                         onDisk.incrementAndGet();
1162                     } else {
1163                         // the disk pool is full
1164                         return;
1165                     }
1166                     key = ois.readObject();
1167                     marker = (DiskMarker) ois.readObject();
1168                 }
1169             } finally {
1170                 ois.close();
1171             }
1172         } catch (EOFException e) {
1173             // end of file reached, stop processing
1174         } catch (Exception e) {
1175             LOG.warn("Index file {} is corrupt, deleting and ignoring it : {}", indexFile, e);
1176             e.printStackTrace();
1177             store.removeAll();
1178             deleteFile(indexFile);
1179         } finally {
1180             shrinkDataFile();
1181         }
1182     }
1183
1184     /**
1185      * Return the index file for this store.
1186      * @return the index file
1187      */

1188     public File getIndexFile() {
1189         return indexFile;
1190     }
1191 }
1192