1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */

24
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */

35
36 package java.util.concurrent;
37
38 import static java.util.concurrent.TimeUnit.MILLISECONDS;
39 import static java.util.concurrent.TimeUnit.NANOSECONDS;
40
41 import java.util.AbstractQueue;
42 import java.util.Arrays;
43 import java.util.Collection;
44 import java.util.Iterator;
45 import java.util.List;
46 import java.util.NoSuchElementException;
47 import java.util.Objects;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.locks.Condition;
50 import java.util.concurrent.locks.ReentrantLock;
51
52 /**
53  * A {@link ThreadPoolExecutor} that can additionally schedule
54  * commands to run after a given delay, or to execute periodically.
55  * This class is preferable to {@link java.util.Timer} when multiple
56  * worker threads are needed, or when the additional flexibility or
57  * capabilities of {@link ThreadPoolExecutor} (which this class
58  * extends) are required.
59  *
60  * <p>Delayed tasks execute no sooner than they are enabled, but
61  * without any real-time guarantees about when, after they are
62  * enabled, they will commence. Tasks scheduled for exactly the same
63  * execution time are enabled in first-in-first-out (FIFO) order of
64  * submission.
65  *
66  * <p>When a submitted task is cancelled before it is run, execution
67  * is suppressed.  By default, such a cancelled task is not
68  * automatically removed from the work queue until its delay elapses.
69  * While this enables further inspection and monitoring, it may also
70  * cause unbounded retention of cancelled tasks.  To avoid this, use
71  * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
72  * removed from the work queue at time of cancellation.
73  *
74  * <p>Successive executions of a periodic task scheduled via
75  * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
76  * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
77  * do not overlap. While different executions may be performed by
78  * different threads, the effects of prior executions
79  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
80  * those of subsequent ones.
81  *
82  * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
83  * of the inherited tuning methods are not useful for it. In
84  * particular, because it acts as a fixed-sized pool using
85  * {@code corePoolSize} threads and an unbounded queue, adjustments
86  * to {@code maximumPoolSize} have no useful effect. Additionally, it
87  * is almost never a good idea to set {@code corePoolSize} to zero or
88  * use {@code allowCoreThreadTimeOut} because this may leave the pool
89  * without threads to handle tasks once they become eligible to run.
90  *
91  * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
92  * this class uses {@link Executors#defaultThreadFactory} as the
93  * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
94  * as the default rejected execution handler.
95  *
96  * <p><b>Extension notes:</b> This class overrides the
97  * {@link ThreadPoolExecutor#execute(Runnable) execute} and
98  * {@link AbstractExecutorService#submit(Runnable) submit}
99  * methods to generate internal {@link ScheduledFuture} objects to
100  * control per-task delays and scheduling.  To preserve
101  * functionality, any further overrides of these methods in
102  * subclasses must invoke superclass versions, which effectively
103  * disables additional task customization.  However, this class
104  * provides alternative protected extension method
105  * {@code decorateTask} (one version each for {@code Runnable} and
106  * {@code Callable}) that can be used to customize the concrete task
107  * types used to execute commands entered via {@code execute},
108  * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
109  * and {@code scheduleWithFixedDelay}.  By default, a
110  * {@code ScheduledThreadPoolExecutor} uses a task type extending
111  * {@link FutureTask}. However, this may be modified or replaced using
112  * subclasses of the form:
113  *
114  * <pre> {@code
115  * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
116  *
117  *   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
118  *
119  *   protected <V> RunnableScheduledFuture<V> decorateTask(
120  *                Runnable r, RunnableScheduledFuture<V> task) {
121  *       return new CustomTask<V>(r, task);
122  *   }
123  *
124  *   protected <V> RunnableScheduledFuture<V> decorateTask(
125  *                Callable<V> c, RunnableScheduledFuture<V> task) {
126  *       return new CustomTask<V>(c, task);
127  *   }
128  *   // ... add constructors, etc.
129  * }}</pre>
130  *
131  * @since 1.5
132  * @author Doug Lea
133  */

134 public class ScheduledThreadPoolExecutor
135         extends ThreadPoolExecutor
136         implements ScheduledExecutorService {
137
138     /*
139      * This class specializes ThreadPoolExecutor implementation by
140      *
141      * 1. Using a custom task type ScheduledFutureTask, even for tasks
142      *    that don't require scheduling because they are submitted
143      *    using ExecutorService rather than ScheduledExecutorService
144      *    methods, which are treated as tasks with a delay of zero.
145      *
146      * 2. Using a custom queue (DelayedWorkQueue), a variant of
147      *    unbounded DelayQueue. The lack of capacity constraint and
148      *    the fact that corePoolSize and maximumPoolSize are
149      *    effectively identical simplifies some execution mechanics
150      *    (see delayedExecute) compared to ThreadPoolExecutor.
151      *
152      * 3. Supporting optional run-after-shutdown parameters, which
153      *    leads to overrides of shutdown methods to remove and cancel
154      *    tasks that should NOT be run after shutdown, as well as
155      *    different recheck logic when task (re)submission overlaps
156      *    with a shutdown.
157      *
158      * 4. Task decoration methods to allow interception and
159      *    instrumentation, which are needed because subclasses cannot
160      *    otherwise override submit methods to get this effect. These
161      *    don't have any impact on pool control logic though.
162      */

163
164     /**
165      * False if should cancel/suppress periodic tasks on shutdown.
166      */

167     private volatile boolean continueExistingPeriodicTasksAfterShutdown;
168
169     /**
170      * False if should cancel non-periodic not-yet-expired tasks on shutdown.
171      */

172     private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
173
174     /**
175      * True if ScheduledFutureTask.cancel should remove from queue.
176      */

177     volatile boolean removeOnCancel;
178
179     /**
180      * Sequence number to break scheduling ties, and in turn to
181      * guarantee FIFO order among tied entries.
182      */

183     private static final AtomicLong sequencer = new AtomicLong();
184
185     private class ScheduledFutureTask<V>
186             extends FutureTask<V> implements RunnableScheduledFuture<V> {
187
188         /** Sequence number to break ties FIFO */
189         private final long sequenceNumber;
190
191         /** The nanoTime-based time when the task is enabled to execute. */
192         private volatile long time;
193
194         /**
195          * Period for repeating tasks, in nanoseconds.
196          * A positive value indicates fixed-rate execution.
197          * A negative value indicates fixed-delay execution.
198          * A value of 0 indicates a non-repeating (one-shot) task.
199          */

200         private final long period;
201
202         /** The actual task to be re-enqueued by reExecutePeriodic */
203         RunnableScheduledFuture<V> outerTask = this;
204
205         /**
206          * Index into delay queue, to support faster cancellation.
207          */

208         int heapIndex;
209
210         /**
211          * Creates a one-shot action with given nanoTime-based trigger time.
212          */

213         ScheduledFutureTask(Runnable r, V result, long triggerTime,
214                             long sequenceNumber) {
215             super(r, result);
216             this.time = triggerTime;
217             this.period = 0;
218             this.sequenceNumber = sequenceNumber;
219         }
220
221         /**
222          * Creates a periodic action with given nanoTime-based initial
223          * trigger time and period.
224          */

225         ScheduledFutureTask(Runnable r, V result, long triggerTime,
226                             long period, long sequenceNumber) {
227             super(r, result);
228             this.time = triggerTime;
229             this.period = period;
230             this.sequenceNumber = sequenceNumber;
231         }
232
233         /**
234          * Creates a one-shot action with given nanoTime-based trigger time.
235          */

236         ScheduledFutureTask(Callable<V> callable, long triggerTime,
237                             long sequenceNumber) {
238             super(callable);
239             this.time = triggerTime;
240             this.period = 0;
241             this.sequenceNumber = sequenceNumber;
242         }
243
244         public long getDelay(TimeUnit unit) {
245             return unit.convert(time - System.nanoTime(), NANOSECONDS);
246         }
247
248         public int compareTo(Delayed other) {
249             if (other == this// compare zero if same object
250                 return 0;
251             if (other instanceof ScheduledFutureTask) {
252                 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
253                 long diff = time - x.time;
254                 if (diff < 0)
255                     return -1;
256                 else if (diff > 0)
257                     return 1;
258                 else if (sequenceNumber < x.sequenceNumber)
259                     return -1;
260                 else
261                     return 1;
262             }
263             long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
264             return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
265         }
266
267         /**
268          * Returns {@code trueif this is a periodic (not a one-shot) action.
269          *
270          * @return {@code trueif periodic
271          */

272         public boolean isPeriodic() {
273             return period != 0;
274         }
275
276         /**
277          * Sets the next time to run for a periodic task.
278          */

279         private void setNextRunTime() {
280             long p = period;
281             if (p > 0)
282                 time += p;
283             else
284                 time = triggerTime(-p);
285         }
286
287         public boolean cancel(boolean mayInterruptIfRunning) {
288             // The racy read of heapIndex below is benign:
289             // if heapIndex < 0, then OOTA guarantees that we have surely
290             // been removed; else we recheck under lock in remove()
291             boolean cancelled = super.cancel(mayInterruptIfRunning);
292             if (cancelled && removeOnCancel && heapIndex >= 0)
293                 remove(this);
294             return cancelled;
295         }
296
297         /**
298          * Overrides FutureTask version so as to reset/requeue if periodic.
299          */

300         public void run() {
301             if (!canRunInCurrentRunState(this))
302                 cancel(false);
303             else if (!isPeriodic())
304                 super.run();
305             else if (super.runAndReset()) {
306                 setNextRunTime();
307                 reExecutePeriodic(outerTask);
308             }
309         }
310     }
311
312     /**
313      * Returns true if can run a task given current run state and
314      * run-after-shutdown parameters.
315      */

316     boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
317         if (!isShutdown())
318             return true;
319         if (isStopped())
320             return false;
321         return task.isPeriodic()
322             ? continueExistingPeriodicTasksAfterShutdown
323             : (executeExistingDelayedTasksAfterShutdown
324                || task.getDelay(NANOSECONDS) <= 0);
325     }
326
327     /**
328      * Main execution method for delayed or periodic tasks.  If pool
329      * is shut down, rejects the task. Otherwise adds task to queue
330      * and starts a thread, if necessary, to run it.  (We cannot
331      * prestart the thread to run the task because the task (probably)
332      * shouldn't be run yet.)  If the pool is shut down while the task
333      * is being added, cancel and remove it if required by state and
334      * run-after-shutdown parameters.
335      *
336      * @param task the task
337      */

338     private void delayedExecute(RunnableScheduledFuture<?> task) {
339         if (isShutdown())
340             reject(task);
341         else {
342             super.getQueue().add(task);
343             if (!canRunInCurrentRunState(task) && remove(task))
344                 task.cancel(false);
345             else
346                 ensurePrestart();
347         }
348     }
349
350     /**
351      * Requeues a periodic task unless current run state precludes it.
352      * Same idea as delayedExecute except drops task rather than rejecting.
353      *
354      * @param task the task
355      */

356     void reExecutePeriodic(RunnableScheduledFuture<?> task) {
357         if (canRunInCurrentRunState(task)) {
358             super.getQueue().add(task);
359             if (canRunInCurrentRunState(task) || !remove(task)) {
360                 ensurePrestart();
361                 return;
362             }
363         }
364         task.cancel(false);
365     }
366
367     /**
368      * Cancels and clears the queue of all tasks that should not be run
369      * due to shutdown policy.  Invoked within super.shutdown.
370      */

371     @Override void onShutdown() {
372         BlockingQueue<Runnable> q = super.getQueue();
373         boolean keepDelayed =
374             getExecuteExistingDelayedTasksAfterShutdownPolicy();
375         boolean keepPeriodic =
376             getContinueExistingPeriodicTasksAfterShutdownPolicy();
377         // Traverse snapshot to avoid iterator exceptions
378         // TODO: implement and use efficient removeIf
379         // super.getQueue().removeIf(...);
380         for (Object e : q.toArray()) {
381             if (e instanceof RunnableScheduledFuture) {
382                 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
383                 if ((t.isPeriodic()
384                      ? !keepPeriodic
385                      : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
386                     || t.isCancelled()) { // also remove if already cancelled
387                     if (q.remove(t))
388                         t.cancel(false);
389                 }
390             }
391         }
392         tryTerminate();
393     }
394
395     /**
396      * Modifies or replaces the task used to execute a runnable.
397      * This method can be used to override the concrete
398      * class used for managing internal tasks.
399      * The default implementation simply returns the given task.
400      *
401      * @param runnable the submitted Runnable
402      * @param task the task created to execute the runnable
403      * @param <V> the type of the task's result
404      * @return a task that can execute the runnable
405      * @since 1.6
406      */

407     protected <V> RunnableScheduledFuture<V> decorateTask(
408         Runnable runnable, RunnableScheduledFuture<V> task) {
409         return task;
410     }
411
412     /**
413      * Modifies or replaces the task used to execute a callable.
414      * This method can be used to override the concrete
415      * class used for managing internal tasks.
416      * The default implementation simply returns the given task.
417      *
418      * @param callable the submitted Callable
419      * @param task the task created to execute the callable
420      * @param <V> the type of the task's result
421      * @return a task that can execute the callable
422      * @since 1.6
423      */

424     protected <V> RunnableScheduledFuture<V> decorateTask(
425         Callable<V> callable, RunnableScheduledFuture<V> task) {
426         return task;
427     }
428
429     /**
430      * The default keep-alive time for pool threads.
431      *
432      * Normally, this value is unused because all pool threads will be
433      * core threads, but if a user creates a pool with a corePoolSize
434      * of zero (against our advice), we keep a thread alive as long as
435      * there are queued tasks.  If the keep alive time is zero (the
436      * historic value), we end up hot-spinning in getTask, wasting a
437      * CPU.  But on the other hand, if we set the value too high, and
438      * users create a one-shot pool which they don't cleanly shutdown,
439      * the pool's non-daemon threads will prevent JVM termination.  A
440      * small but non-zero value (relative to a JVM's lifetime) seems
441      * best.
442      */

443     private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
444
445     /**
446      * Creates a new {@code ScheduledThreadPoolExecutor} with the
447      * given core pool size.
448      *
449      * @param corePoolSize the number of threads to keep in the pool, even
450      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
451      * @throws IllegalArgumentException if {@code corePoolSize < 0}
452      */

453     public ScheduledThreadPoolExecutor(int corePoolSize) {
454         super(corePoolSize, Integer.MAX_VALUE,
455               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
456               new DelayedWorkQueue());
457     }
458
459     /**
460      * Creates a new {@code ScheduledThreadPoolExecutor} with the
461      * given initial parameters.
462      *
463      * @param corePoolSize the number of threads to keep in the pool, even
464      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
465      * @param threadFactory the factory to use when the executor
466      *        creates a new thread
467      * @throws IllegalArgumentException if {@code corePoolSize < 0}
468      * @throws NullPointerException if {@code threadFactory} is null
469      */

470     public ScheduledThreadPoolExecutor(int corePoolSize,
471                                        ThreadFactory threadFactory) {
472         super(corePoolSize, Integer.MAX_VALUE,
473               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
474               new DelayedWorkQueue(), threadFactory);
475     }
476
477     /**
478      * Creates a new {@code ScheduledThreadPoolExecutor} with the
479      * given initial parameters.
480      *
481      * @param corePoolSize the number of threads to keep in the pool, even
482      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
483      * @param handler the handler to use when execution is blocked
484      *        because the thread bounds and queue capacities are reached
485      * @throws IllegalArgumentException if {@code corePoolSize < 0}
486      * @throws NullPointerException if {@code handler} is null
487      */

488     public ScheduledThreadPoolExecutor(int corePoolSize,
489                                        RejectedExecutionHandler handler) {
490         super(corePoolSize, Integer.MAX_VALUE,
491               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
492               new DelayedWorkQueue(), handler);
493     }
494
495     /**
496      * Creates a new {@code ScheduledThreadPoolExecutor} with the
497      * given initial parameters.
498      *
499      * @param corePoolSize the number of threads to keep in the pool, even
500      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
501      * @param threadFactory the factory to use when the executor
502      *        creates a new thread
503      * @param handler the handler to use when execution is blocked
504      *        because the thread bounds and queue capacities are reached
505      * @throws IllegalArgumentException if {@code corePoolSize < 0}
506      * @throws NullPointerException if {@code threadFactory} or
507      *         {@code handler} is null
508      */

509     public ScheduledThreadPoolExecutor(int corePoolSize,
510                                        ThreadFactory threadFactory,
511                                        RejectedExecutionHandler handler) {
512         super(corePoolSize, Integer.MAX_VALUE,
513               DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
514               new DelayedWorkQueue(), threadFactory, handler);
515     }
516
517     /**
518      * Returns the nanoTime-based trigger time of a delayed action.
519      */

520     private long triggerTime(long delay, TimeUnit unit) {
521         return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
522     }
523
524     /**
525      * Returns the nanoTime-based trigger time of a delayed action.
526      */

527     long triggerTime(long delay) {
528         return System.nanoTime() +
529             ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
530     }
531
532     /**
533      * Constrains the values of all delays in the queue to be within
534      * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
535      * This may occur if a task is eligible to be dequeued, but has
536      * not yet been, while some other task is added with a delay of
537      * Long.MAX_VALUE.
538      */

539     private long overflowFree(long delay) {
540         Delayed head = (Delayed) super.getQueue().peek();
541         if (head != null) {
542             long headDelay = head.getDelay(NANOSECONDS);
543             if (headDelay < 0 && (delay - headDelay < 0))
544                 delay = Long.MAX_VALUE + headDelay;
545         }
546         return delay;
547     }
548
549     /**
550      * @throws RejectedExecutionException {@inheritDoc}
551      * @throws NullPointerException       {@inheritDoc}
552      */

553     public ScheduledFuture<?> schedule(Runnable command,
554                                        long delay,
555                                        TimeUnit unit) {
556         if (command == null || unit == null)
557             throw new NullPointerException();
558         RunnableScheduledFuture<Void> t = decorateTask(command,
559             new ScheduledFutureTask<Void>(command, null,
560                                           triggerTime(delay, unit),
561                                           sequencer.getAndIncrement()));
562         delayedExecute(t);
563         return t;
564     }
565
566     /**
567      * @throws RejectedExecutionException {@inheritDoc}
568      * @throws NullPointerException       {@inheritDoc}
569      */

570     public <V> ScheduledFuture<V> schedule(Callable<V> callable,
571                                            long delay,
572                                            TimeUnit unit) {
573         if (callable == null || unit == null)
574             throw new NullPointerException();
575         RunnableScheduledFuture<V> t = decorateTask(callable,
576             new ScheduledFutureTask<V>(callable,
577                                        triggerTime(delay, unit),
578                                        sequencer.getAndIncrement()));
579         delayedExecute(t);
580         return t;
581     }
582
583     /**
584      * Submits a periodic action that becomes enabled first after the
585      * given initial delay, and subsequently with the given period;
586      * that is, executions will commence after
587      * {@code initialDelay}, then {@code initialDelay + period}, then
588      * {@code initialDelay + 2 * period}, and so on.
589      *
590      * <p>The sequence of task executions continues indefinitely until
591      * one of the following exceptional completions occur:
592      * <ul>
593      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
594      * via the returned future.
595      * <li>Method {@link #shutdown} is called and the {@linkplain
596      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
597      * whether to continue after shutdown} is not set true, or method
598      * {@link #shutdownNow} is called; also resulting in task
599      * cancellation.
600      * <li>An execution of the task throws an exception.  In this case
601      * calling {@link Future#get() get} on the returned future will throw
602      * {@link ExecutionException}, holding the exception as its cause.
603      * </ul>
604      * Subsequent executions are suppressed.  Subsequent calls to
605      * {@link Future#isDone isDone()} on the returned future will
606      * return {@code true}.
607      *
608      * <p>If any execution of this task takes longer than its period, then
609      * subsequent executions may start late, but will not concurrently
610      * execute.
611      *
612      * @throws RejectedExecutionException {@inheritDoc}
613      * @throws NullPointerException       {@inheritDoc}
614      * @throws IllegalArgumentException   {@inheritDoc}
615      */

616     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
617                                                   long initialDelay,
618                                                   long period,
619                                                   TimeUnit unit) {
620         if (command == null || unit == null)
621             throw new NullPointerException();
622         if (period <= 0L)
623             throw new IllegalArgumentException();
624         ScheduledFutureTask<Void> sft =
625             new ScheduledFutureTask<Void>(command,
626                                           null,
627                                           triggerTime(initialDelay, unit),
628                                           unit.toNanos(period),
629                                           sequencer.getAndIncrement());
630         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
631         sft.outerTask = t;
632         delayedExecute(t);
633         return t;
634     }
635
636     /**
637      * Submits a periodic action that becomes enabled first after the
638      * given initial delay, and subsequently with the given delay
639      * between the termination of one execution and the commencement of
640      * the next.
641      *
642      * <p>The sequence of task executions continues indefinitely until
643      * one of the following exceptional completions occur:
644      * <ul>
645      * <li>The task is {@linkplain Future#cancel explicitly cancelled}
646      * via the returned future.
647      * <li>Method {@link #shutdown} is called and the {@linkplain
648      * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
649      * whether to continue after shutdown} is not set true, or method
650      * {@link #shutdownNow} is called; also resulting in task
651      * cancellation.
652      * <li>An execution of the task throws an exception.  In this case
653      * calling {@link Future#get() get} on the returned future will throw
654      * {@link ExecutionException}, holding the exception as its cause.
655      * </ul>
656      * Subsequent executions are suppressed.  Subsequent calls to
657      * {@link Future#isDone isDone()} on the returned future will
658      * return {@code true}.
659      *
660      * @throws RejectedExecutionException {@inheritDoc}
661      * @throws NullPointerException       {@inheritDoc}
662      * @throws IllegalArgumentException   {@inheritDoc}
663      */

664     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
665                                                      long initialDelay,
666                                                      long delay,
667                                                      TimeUnit unit) {
668         if (command == null || unit == null)
669             throw new NullPointerException();
670         if (delay <= 0L)
671             throw new IllegalArgumentException();
672         ScheduledFutureTask<Void> sft =
673             new ScheduledFutureTask<Void>(command,
674                                           null,
675                                           triggerTime(initialDelay, unit),
676                                           -unit.toNanos(delay),
677                                           sequencer.getAndIncrement());
678         RunnableScheduledFuture<Void> t = decorateTask(command, sft);
679         sft.outerTask = t;
680         delayedExecute(t);
681         return t;
682     }
683
684     /**
685      * Executes {@code command} with zero required delay.
686      * This has effect equivalent to
687      * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
688      * Note that inspections of the queue and of the list returned by
689      * {@code shutdownNow} will access the zero-delayed
690      * {@link ScheduledFuture}, not the {@code command} itself.
691      *
692      * <p>A consequence of the use of {@code ScheduledFuture} objects is
693      * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
694      * called with a null second {@code Throwable} argument, even if the
695      * {@code command} terminated abruptly.  Instead, the {@code Throwable}
696      * thrown by such a task can be obtained via {@link Future#get}.
697      *
698      * @throws RejectedExecutionException at discretion of
699      *         {@code RejectedExecutionHandler}, if the task
700      *         cannot be accepted for execution because the
701      *         executor has been shut down
702      * @throws NullPointerException {@inheritDoc}
703      */

704     public void execute(Runnable command) {
705         schedule(command, 0, NANOSECONDS);
706     }
707
708     // Override AbstractExecutorService methods
709
710     /**
711      * @throws RejectedExecutionException {@inheritDoc}
712      * @throws NullPointerException       {@inheritDoc}
713      */

714     public Future<?> submit(Runnable task) {
715         return schedule(task, 0, NANOSECONDS);
716     }
717
718     /**
719      * @throws RejectedExecutionException {@inheritDoc}
720      * @throws NullPointerException       {@inheritDoc}
721      */

722     public <T> Future<T> submit(Runnable task, T result) {
723         return schedule(Executors.callable(task, result), 0, NANOSECONDS);
724     }
725
726     /**
727      * @throws RejectedExecutionException {@inheritDoc}
728      * @throws NullPointerException       {@inheritDoc}
729      */

730     public <T> Future<T> submit(Callable<T> task) {
731         return schedule(task, 0, NANOSECONDS);
732     }
733
734     /**
735      * Sets the policy on whether to continue executing existing
736      * periodic tasks even when this executor has been {@code shutdown}.
737      * In this case, executions will continue until {@code shutdownNow}
738      * or the policy is set to {@code false} when already shutdown.
739      * This value is by default {@code false}.
740      *
741      * @param value if {@code true}, continue after shutdown, else don't
742      * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
743      */

744     public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
745         continueExistingPeriodicTasksAfterShutdown = value;
746         if (!value && isShutdown())
747             onShutdown();
748     }
749
750     /**
751      * Gets the policy on whether to continue executing existing
752      * periodic tasks even when this executor has been {@code shutdown}.
753      * In this case, executions will continue until {@code shutdownNow}
754      * or the policy is set to {@code false} when already shutdown.
755      * This value is by default {@code false}.
756      *
757      * @return {@code trueif will continue after shutdown
758      * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
759      */

760     public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
761         return continueExistingPeriodicTasksAfterShutdown;
762     }
763
764     /**
765      * Sets the policy on whether to execute existing delayed
766      * tasks even when this executor has been {@code shutdown}.
767      * In this case, these tasks will only terminate upon
768      * {@code shutdownNow}, or after setting the policy to
769      * {@code false} when already shutdown.
770      * This value is by default {@code true}.
771      *
772      * @param value if {@code true}, execute after shutdown, else don't
773      * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
774      */

775     public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
776         executeExistingDelayedTasksAfterShutdown = value;
777         if (!value && isShutdown())
778             onShutdown();
779     }
780
781     /**
782      * Gets the policy on whether to execute existing delayed
783      * tasks even when this executor has been {@code shutdown}.
784      * In this case, these tasks will only terminate upon
785      * {@code shutdownNow}, or after setting the policy to
786      * {@code false} when already shutdown.
787      * This value is by default {@code true}.
788      *
789      * @return {@code trueif will execute after shutdown
790      * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
791      */

792     public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
793         return executeExistingDelayedTasksAfterShutdown;
794     }
795
796     /**
797      * Sets the policy on whether cancelled tasks should be immediately
798      * removed from the work queue at time of cancellation.  This value is
799      * by default {@code false}.
800      *
801      * @param value if {@code true}, remove on cancellation, else don't
802      * @see #getRemoveOnCancelPolicy
803      * @since 1.7
804      */

805     public void setRemoveOnCancelPolicy(boolean value) {
806         removeOnCancel = value;
807     }
808
809     /**
810      * Gets the policy on whether cancelled tasks should be immediately
811      * removed from the work queue at time of cancellation.  This value is
812      * by default {@code false}.
813      *
814      * @return {@code trueif cancelled tasks are immediately removed
815      *         from the queue
816      * @see #setRemoveOnCancelPolicy
817      * @since 1.7
818      */

819     public boolean getRemoveOnCancelPolicy() {
820         return removeOnCancel;
821     }
822
823     /**
824      * Initiates an orderly shutdown in which previously submitted
825      * tasks are executed, but no new tasks will be accepted.
826      * Invocation has no additional effect if already shut down.
827      *
828      * <p>This method does not wait for previously submitted tasks to
829      * complete execution.  Use {@link #awaitTermination awaitTermination}
830      * to do that.
831      *
832      * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
833      * has been set {@code false}, existing delayed tasks whose delays
834      * have not yet elapsed are cancelled.  And unless the {@code
835      * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
836      * {@code true}, future executions of existing periodic tasks will
837      * be cancelled.
838      *
839      * @throws SecurityException {@inheritDoc}
840      */

841     public void shutdown() {
842         super.shutdown();
843     }
844
845     /**
846      * Attempts to stop all actively executing tasks, halts the
847      * processing of waiting tasks, and returns a list of the tasks
848      * that were awaiting execution. These tasks are drained (removed)
849      * from the task queue upon return from this method.
850      *
851      * <p>This method does not wait for actively executing tasks to
852      * terminate.  Use {@link #awaitTermination awaitTermination} to
853      * do that.
854      *
855      * <p>There are no guarantees beyond best-effort attempts to stop
856      * processing actively executing tasks.  This implementation
857      * interrupts tasks via {@link Thread#interrupt}; any task that
858      * fails to respond to interrupts may never terminate.
859      *
860      * @return list of tasks that never commenced execution.
861      *         Each element of this list is a {@link ScheduledFuture}.
862      *         For tasks submitted via one of the {@code schedule}
863      *         methods, the element will be identical to the returned
864      *         {@code ScheduledFuture}.  For tasks submitted using
865      *         {@link #execute execute}, the element will be a
866      *         zero-delay {@code ScheduledFuture}.
867      * @throws SecurityException {@inheritDoc}
868      */

869     public List<Runnable> shutdownNow() {
870         return super.shutdownNow();
871     }
872
873     /**
874      * Returns the task queue used by this executor.  Access to the
875      * task queue is intended primarily for debugging and monitoring.
876      * This queue may be in active use.  Retrieving the task queue
877      * does not prevent queued tasks from executing.
878      *
879      * <p>Each element of this queue is a {@link ScheduledFuture}.
880      * For tasks submitted via one of the {@code schedule} methods, the
881      * element will be identical to the returned {@code ScheduledFuture}.
882      * For tasks submitted using {@link #execute execute}, the element
883      * will be a zero-delay {@code ScheduledFuture}.
884      *
885      * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
886      * tasks in the order in which they will execute.
887      *
888      * @return the task queue
889      */

890     public BlockingQueue<Runnable> getQueue() {
891         return super.getQueue();
892     }
893
894     /**
895      * Specialized delay queue. To mesh with TPE declarations, this
896      * class must be declared as a BlockingQueue<Runnable> even though
897      * it can only hold RunnableScheduledFutures.
898      */

899     static class DelayedWorkQueue extends AbstractQueue<Runnable>
900         implements BlockingQueue<Runnable> {
901
902         /*
903          * A DelayedWorkQueue is based on a heap-based data structure
904          * like those in DelayQueue and PriorityQueue, except that
905          * every ScheduledFutureTask also records its index into the
906          * heap array. This eliminates the need to find a task upon
907          * cancellation, greatly speeding up removal (down from O(n)
908          * to O(log n)), and reducing garbage retention that would
909          * otherwise occur by waiting for the element to rise to top
910          * before clearing. But because the queue may also hold
911          * RunnableScheduledFutures that are not ScheduledFutureTasks,
912          * we are not guaranteed to have such indices available, in
913          * which case we fall back to linear search. (We expect that
914          * most tasks will not be decorated, and that the faster cases
915          * will be much more common.)
916          *
917          * All heap operations must record index changes -- mainly
918          * within siftUp and siftDown. Upon removal, a task's
919          * heapIndex is set to -1. Note that ScheduledFutureTasks can
920          * appear at most once in the queue (this need not be true for
921          * other kinds of tasks or work queues), so are uniquely
922          * identified by heapIndex.
923          */

924
925         private static final int INITIAL_CAPACITY = 16;
926         private RunnableScheduledFuture<?>[] queue =
927             new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
928         private final ReentrantLock lock = new ReentrantLock();
929         private int size;
930
931         /**
932          * Thread designated to wait for the task at the head of the
933          * queue.  This variant of the Leader-Follower pattern
934          * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
935          * minimize unnecessary timed waiting.  When a thread becomes
936          * the leader, it waits only for the next delay to elapse, but
937          * other threads await indefinitely.  The leader thread must
938          * signal some other thread before returning from take() or
939          * poll(...), unless some other thread becomes leader in the
940          * interim.  Whenever the head of the queue is replaced with a
941          * task with an earlier expiration time, the leader field is
942          * invalidated by being reset to null, and some waiting
943          * thread, but not necessarily the current leader, is
944          * signalled.  So waiting threads must be prepared to acquire
945          * and lose leadership while waiting.
946          */

947         private Thread leader;
948
949         /**
950          * Condition signalled when a newer task becomes available at the
951          * head of the queue or a new thread may need to become leader.
952          */

953         private final Condition available = lock.newCondition();
954
955         /**
956          * Sets f's heapIndex if it is a ScheduledFutureTask.
957          */

958         private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
959             if (f instanceof ScheduledFutureTask)
960                 ((ScheduledFutureTask)f).heapIndex = idx;
961         }
962
963         /**
964          * Sifts element added at bottom up to its heap-ordered spot.
965          * Call only when holding lock.
966          */

967         private void siftUp(int k, RunnableScheduledFuture<?> key) {
968             while (k > 0) {
969                 int parent = (k - 1) >>> 1;
970                 RunnableScheduledFuture<?> e = queue[parent];
971                 if (key.compareTo(e) >= 0)
972                     break;
973                 queue[k] = e;
974                 setIndex(e, k);
975                 k = parent;
976             }
977             queue[k] = key;
978             setIndex(key, k);
979         }
980
981         /**
982          * Sifts element added at top down to its heap-ordered spot.
983          * Call only when holding lock.
984          */

985         private void siftDown(int k, RunnableScheduledFuture<?> key) {
986             int half = size >>> 1;
987             while (k < half) {
988                 int child = (k << 1) + 1;
989                 RunnableScheduledFuture<?> c = queue[child];
990                 int right = child + 1;
991                 if (right < size && c.compareTo(queue[right]) > 0)
992                     c = queue[child = right];
993                 if (key.compareTo(c) <= 0)
994                     break;
995                 queue[k] = c;
996                 setIndex(c, k);
997                 k = child;
998             }
999             queue[k] = key;
1000             setIndex(key, k);
1001         }
1002
1003         /**
1004          * Resizes the heap array.  Call only when holding lock.
1005          */

1006         private void grow() {
1007             int oldCapacity = queue.length;
1008             int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1009             if (newCapacity < 0) // overflow
1010                 newCapacity = Integer.MAX_VALUE;
1011             queue = Arrays.copyOf(queue, newCapacity);
1012         }
1013
1014         /**
1015          * Finds index of given object, or -1 if absent.
1016          */

1017         private int indexOf(Object x) {
1018             if (x != null) {
1019                 if (x instanceof ScheduledFutureTask) {
1020                     int i = ((ScheduledFutureTask) x).heapIndex;
1021                     // Sanity check; x could conceivably be a
1022                     // ScheduledFutureTask from some other pool.
1023                     if (i >= 0 && i < size && queue[i] == x)
1024                         return i;
1025                 } else {
1026                     for (int i = 0; i < size; i++)
1027                         if (x.equals(queue[i]))
1028                             return i;
1029                 }
1030             }
1031             return -1;
1032         }
1033
1034         public boolean contains(Object x) {
1035             final ReentrantLock lock = this.lock;
1036             lock.lock();
1037             try {
1038                 return indexOf(x) != -1;
1039             } finally {
1040                 lock.unlock();
1041             }
1042         }
1043
1044         public boolean remove(Object x) {
1045             final ReentrantLock lock = this.lock;
1046             lock.lock();
1047             try {
1048                 int i = indexOf(x);
1049                 if (i < 0)
1050                     return false;
1051
1052                 setIndex(queue[i], -1);
1053                 int s = --size;
1054                 RunnableScheduledFuture<?> replacement = queue[s];
1055                 queue[s] = null;
1056                 if (s != i) {
1057                     siftDown(i, replacement);
1058                     if (queue[i] == replacement)
1059                         siftUp(i, replacement);
1060                 }
1061                 return true;
1062             } finally {
1063                 lock.unlock();
1064             }
1065         }
1066
1067         public int size() {
1068             final ReentrantLock lock = this.lock;
1069             lock.lock();
1070             try {
1071                 return size;
1072             } finally {
1073                 lock.unlock();
1074             }
1075         }
1076
1077         public boolean isEmpty() {
1078             return size() == 0;
1079         }
1080
1081         public int remainingCapacity() {
1082             return Integer.MAX_VALUE;
1083         }
1084
1085         public RunnableScheduledFuture<?> peek() {
1086             final ReentrantLock lock = this.lock;
1087             lock.lock();
1088             try {
1089                 return queue[0];
1090             } finally {
1091                 lock.unlock();
1092             }
1093         }
1094
1095         public boolean offer(Runnable x) {
1096             if (x == null)
1097                 throw new NullPointerException();
1098             RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1099             final ReentrantLock lock = this.lock;
1100             lock.lock();
1101             try {
1102                 int i = size;
1103                 if (i >= queue.length)
1104                     grow();
1105                 size = i + 1;
1106                 if (i == 0) {
1107                     queue[0] = e;
1108                     setIndex(e, 0);
1109                 } else {
1110                     siftUp(i, e);
1111                 }
1112                 if (queue[0] == e) {
1113                     leader = null;
1114                     available.signal();
1115                 }
1116             } finally {
1117                 lock.unlock();
1118             }
1119             return true;
1120         }
1121
1122         public void put(Runnable e) {
1123             offer(e);
1124         }
1125
1126         public boolean add(Runnable e) {
1127             return offer(e);
1128         }
1129
1130         public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1131             return offer(e);
1132         }
1133
1134         /**
1135          * Performs common bookkeeping for poll and take: Replaces
1136          * first element with last and sifts it down.  Call only when
1137          * holding lock.
1138          * @param f the task to remove and return
1139          */

1140         private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1141             int s = --size;
1142             RunnableScheduledFuture<?> x = queue[s];
1143             queue[s] = null;
1144             if (s != 0)
1145                 siftDown(0, x);
1146             setIndex(f, -1);
1147             return f;
1148         }
1149
1150         public RunnableScheduledFuture<?> poll() {
1151             final ReentrantLock lock = this.lock;
1152             lock.lock();
1153             try {
1154                 RunnableScheduledFuture<?> first = queue[0];
1155                 return (first == null || first.getDelay(NANOSECONDS) > 0)
1156                     ? null
1157                     : finishPoll(first);
1158             } finally {
1159                 lock.unlock();
1160             }
1161         }
1162
1163         public RunnableScheduledFuture<?> take() throws InterruptedException {
1164             final ReentrantLock lock = this.lock;
1165             lock.lockInterruptibly();
1166             try {
1167                 for (;;) {
1168                     RunnableScheduledFuture<?> first = queue[0];
1169                     if (first == null)
1170                         available.await();
1171                     else {
1172                         long delay = first.getDelay(NANOSECONDS);
1173                         if (delay <= 0L)
1174                             return finishPoll(first);
1175                         first = null// don't retain ref while waiting
1176                         if (leader != null)
1177                             available.await();
1178                         else {
1179                             Thread thisThread = Thread.currentThread();
1180                             leader = thisThread;
1181                             try {
1182                                 available.awaitNanos(delay);
1183                             } finally {
1184                                 if (leader == thisThread)
1185                                     leader = null;
1186                             }
1187                         }
1188                     }
1189                 }
1190             } finally {
1191                 if (leader == null && queue[0] != null)
1192                     available.signal();
1193                 lock.unlock();
1194             }
1195         }
1196
1197         public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1198             throws InterruptedException {
1199             long nanos = unit.toNanos(timeout);
1200             final ReentrantLock lock = this.lock;
1201             lock.lockInterruptibly();
1202             try {
1203                 for (;;) {
1204                     RunnableScheduledFuture<?> first = queue[0];
1205                     if (first == null) {
1206                         if (nanos <= 0L)
1207                             return null;
1208                         else
1209                             nanos = available.awaitNanos(nanos);
1210                     } else {
1211                         long delay = first.getDelay(NANOSECONDS);
1212                         if (delay <= 0L)
1213                             return finishPoll(first);
1214                         if (nanos <= 0L)
1215                             return null;
1216                         first = null// don't retain ref while waiting
1217                         if (nanos < delay || leader != null)
1218                             nanos = available.awaitNanos(nanos);
1219                         else {
1220                             Thread thisThread = Thread.currentThread();
1221                             leader = thisThread;
1222                             try {
1223                                 long timeLeft = available.awaitNanos(delay);
1224                                 nanos -= delay - timeLeft;
1225                             } finally {
1226                                 if (leader == thisThread)
1227                                     leader = null;
1228                             }
1229                         }
1230                     }
1231                 }
1232             } finally {
1233                 if (leader == null && queue[0] != null)
1234                     available.signal();
1235                 lock.unlock();
1236             }
1237         }
1238
1239         public void clear() {
1240             final ReentrantLock lock = this.lock;
1241             lock.lock();
1242             try {
1243                 for (int i = 0; i < size; i++) {
1244                     RunnableScheduledFuture<?> t = queue[i];
1245                     if (t != null) {
1246                         queue[i] = null;
1247                         setIndex(t, -1);
1248                     }
1249                 }
1250                 size = 0;
1251             } finally {
1252                 lock.unlock();
1253             }
1254         }
1255
1256         public int drainTo(Collection<? super Runnable> c) {
1257             return drainTo(c, Integer.MAX_VALUE);
1258         }
1259
1260         public int drainTo(Collection<? super Runnable> c, int maxElements) {
1261             Objects.requireNonNull(c);
1262             if (c == this)
1263                 throw new IllegalArgumentException();
1264             if (maxElements <= 0)
1265                 return 0;
1266             final ReentrantLock lock = this.lock;
1267             lock.lock();
1268             try {
1269                 int n = 0;
1270                 for (RunnableScheduledFuture<?> first;
1271                      n < maxElements
1272                          && (first = queue[0]) != null
1273                          && first.getDelay(NANOSECONDS) <= 0;) {
1274                     c.add(first);   // In this order, in case add() throws.
1275                     finishPoll(first);
1276                     ++n;
1277                 }
1278                 return n;
1279             } finally {
1280                 lock.unlock();
1281             }
1282         }
1283
1284         public Object[] toArray() {
1285             final ReentrantLock lock = this.lock;
1286             lock.lock();
1287             try {
1288                 return Arrays.copyOf(queue, size, Object[].class);
1289             } finally {
1290                 lock.unlock();
1291             }
1292         }
1293
1294         @SuppressWarnings("unchecked")
1295         public <T> T[] toArray(T[] a) {
1296             final ReentrantLock lock = this.lock;
1297             lock.lock();
1298             try {
1299                 if (a.length < size)
1300                     return (T[]) Arrays.copyOf(queue, size, a.getClass());
1301                 System.arraycopy(queue, 0, a, 0, size);
1302                 if (a.length > size)
1303                     a[size] = null;
1304                 return a;
1305             } finally {
1306                 lock.unlock();
1307             }
1308         }
1309
1310         public Iterator<Runnable> iterator() {
1311             final ReentrantLock lock = this.lock;
1312             lock.lock();
1313             try {
1314                 return new Itr(Arrays.copyOf(queue, size));
1315             } finally {
1316                 lock.unlock();
1317             }
1318         }
1319
1320         /**
1321          * Snapshot iterator that works off copy of underlying q array.
1322          */

1323         private class Itr implements Iterator<Runnable> {
1324             final RunnableScheduledFuture<?>[] array;
1325             int cursor;        // index of next element to return; initially 0
1326             int lastRet = -1;  // index of last element returned; -1 if no such
1327
1328             Itr(RunnableScheduledFuture<?>[] array) {
1329                 this.array = array;
1330             }
1331
1332             public boolean hasNext() {
1333                 return cursor < array.length;
1334             }
1335
1336             public Runnable next() {
1337                 if (cursor >= array.length)
1338                     throw new NoSuchElementException();
1339                 return array[lastRet = cursor++];
1340             }
1341
1342             public void remove() {
1343                 if (lastRet < 0)
1344                     throw new IllegalStateException();
1345                 DelayedWorkQueue.this.remove(array[lastRet]);
1346                 lastRet = -1;
1347             }
1348         }
1349     }
1350 }
1351