1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import static java.util.concurrent.TimeUnit.MILLISECONDS;
39 import static java.util.concurrent.TimeUnit.NANOSECONDS;
40
41 import java.util.AbstractQueue;
42 import java.util.Arrays;
43 import java.util.Collection;
44 import java.util.Iterator;
45 import java.util.List;
46 import java.util.NoSuchElementException;
47 import java.util.Objects;
48 import java.util.concurrent.atomic.AtomicLong;
49 import java.util.concurrent.locks.Condition;
50 import java.util.concurrent.locks.ReentrantLock;
51
52 /**
53 * A {@link ThreadPoolExecutor} that can additionally schedule
54 * commands to run after a given delay, or to execute periodically.
55 * This class is preferable to {@link java.util.Timer} when multiple
56 * worker threads are needed, or when the additional flexibility or
57 * capabilities of {@link ThreadPoolExecutor} (which this class
58 * extends) are required.
59 *
60 * <p>Delayed tasks execute no sooner than they are enabled, but
61 * without any real-time guarantees about when, after they are
62 * enabled, they will commence. Tasks scheduled for exactly the same
63 * execution time are enabled in first-in-first-out (FIFO) order of
64 * submission.
65 *
66 * <p>When a submitted task is cancelled before it is run, execution
67 * is suppressed. By default, such a cancelled task is not
68 * automatically removed from the work queue until its delay elapses.
69 * While this enables further inspection and monitoring, it may also
70 * cause unbounded retention of cancelled tasks. To avoid this, use
71 * {@link #setRemoveOnCancelPolicy} to cause tasks to be immediately
72 * removed from the work queue at time of cancellation.
73 *
74 * <p>Successive executions of a periodic task scheduled via
75 * {@link #scheduleAtFixedRate scheduleAtFixedRate} or
76 * {@link #scheduleWithFixedDelay scheduleWithFixedDelay}
77 * do not overlap. While different executions may be performed by
78 * different threads, the effects of prior executions
79 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
80 * those of subsequent ones.
81 *
82 * <p>While this class inherits from {@link ThreadPoolExecutor}, a few
83 * of the inherited tuning methods are not useful for it. In
84 * particular, because it acts as a fixed-sized pool using
85 * {@code corePoolSize} threads and an unbounded queue, adjustments
86 * to {@code maximumPoolSize} have no useful effect. Additionally, it
87 * is almost never a good idea to set {@code corePoolSize} to zero or
88 * use {@code allowCoreThreadTimeOut} because this may leave the pool
89 * without threads to handle tasks once they become eligible to run.
90 *
91 * <p>As with {@code ThreadPoolExecutor}, if not otherwise specified,
92 * this class uses {@link Executors#defaultThreadFactory} as the
93 * default thread factory, and {@link ThreadPoolExecutor.AbortPolicy}
94 * as the default rejected execution handler.
95 *
96 * <p><b>Extension notes:</b> This class overrides the
97 * {@link ThreadPoolExecutor#execute(Runnable) execute} and
98 * {@link AbstractExecutorService#submit(Runnable) submit}
99 * methods to generate internal {@link ScheduledFuture} objects to
100 * control per-task delays and scheduling. To preserve
101 * functionality, any further overrides of these methods in
102 * subclasses must invoke superclass versions, which effectively
103 * disables additional task customization. However, this class
 * provides an alternative protected extension method
105 * {@code decorateTask} (one version each for {@code Runnable} and
106 * {@code Callable}) that can be used to customize the concrete task
107 * types used to execute commands entered via {@code execute},
108 * {@code submit}, {@code schedule}, {@code scheduleAtFixedRate},
109 * and {@code scheduleWithFixedDelay}. By default, a
110 * {@code ScheduledThreadPoolExecutor} uses a task type extending
111 * {@link FutureTask}. However, this may be modified or replaced using
112 * subclasses of the form:
113 *
114 * <pre> {@code
115 * public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {
116 *
117 * static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }
118 *
119 * protected <V> RunnableScheduledFuture<V> decorateTask(
120 * Runnable r, RunnableScheduledFuture<V> task) {
121 * return new CustomTask<V>(r, task);
122 * }
123 *
124 * protected <V> RunnableScheduledFuture<V> decorateTask(
125 * Callable<V> c, RunnableScheduledFuture<V> task) {
126 * return new CustomTask<V>(c, task);
127 * }
128 * // ... add constructors, etc.
129 * }}</pre>
130 *
131 * @since 1.5
132 * @author Doug Lea
133 */
134 public class ScheduledThreadPoolExecutor
135 extends ThreadPoolExecutor
136 implements ScheduledExecutorService {
137
138 /*
139 * This class specializes ThreadPoolExecutor implementation by
140 *
141 * 1. Using a custom task type ScheduledFutureTask, even for tasks
142 * that don't require scheduling because they are submitted
143 * using ExecutorService rather than ScheduledExecutorService
144 * methods, which are treated as tasks with a delay of zero.
145 *
146 * 2. Using a custom queue (DelayedWorkQueue), a variant of
147 * unbounded DelayQueue. The lack of capacity constraint and
148 * the fact that corePoolSize and maximumPoolSize are
149 * effectively identical simplifies some execution mechanics
150 * (see delayedExecute) compared to ThreadPoolExecutor.
151 *
152 * 3. Supporting optional run-after-shutdown parameters, which
153 * leads to overrides of shutdown methods to remove and cancel
154 * tasks that should NOT be run after shutdown, as well as
155 * different recheck logic when task (re)submission overlaps
156 * with a shutdown.
157 *
158 * 4. Task decoration methods to allow interception and
159 * instrumentation, which are needed because subclasses cannot
160 * otherwise override submit methods to get this effect. These
161 * don't have any impact on pool control logic though.
162 */
163
/**
 * False if should cancel/suppress periodic tasks on shutdown.
 * Has no initializer, so it starts out {@code false}. Written by
 * setContinueExistingPeriodicTasksAfterShutdownPolicy and read by
 * canRunInCurrentRunState.
 */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
 * False if should cancel non-periodic not-yet-expired tasks on shutdown.
 * Starts out {@code true}. Written by
 * setExecuteExistingDelayedTasksAfterShutdownPolicy and read by
 * canRunInCurrentRunState.
 */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**
 * True if ScheduledFutureTask.cancel should remove from queue.
 * Has no initializer, so it starts out {@code false}. Written by
 * setRemoveOnCancelPolicy.
 */
volatile boolean removeOnCancel;

/**
 * Sequence number to break scheduling ties, and in turn to
 * guarantee FIFO order among tied entries. The counter is static,
 * so one sequence is shared by all executor instances; the schedule
 * methods draw from it with getAndIncrement.
 */
private static final AtomicLong sequencer = new AtomicLong();
184
185 private class ScheduledFutureTask<V>
186 extends FutureTask<V> implements RunnableScheduledFuture<V> {
187
188 /** Sequence number to break ties FIFO */
189 private final long sequenceNumber;
190
191 /** The nanoTime-based time when the task is enabled to execute. */
192 private volatile long time;
193
194 /**
195 * Period for repeating tasks, in nanoseconds.
196 * A positive value indicates fixed-rate execution.
197 * A negative value indicates fixed-delay execution.
198 * A value of 0 indicates a non-repeating (one-shot) task.
199 */
200 private final long period;
201
202 /** The actual task to be re-enqueued by reExecutePeriodic */
203 RunnableScheduledFuture<V> outerTask = this;
204
205 /**
206 * Index into delay queue, to support faster cancellation.
207 */
208 int heapIndex;
209
210 /**
211 * Creates a one-shot action with given nanoTime-based trigger time.
212 */
213 ScheduledFutureTask(Runnable r, V result, long triggerTime,
214 long sequenceNumber) {
215 super(r, result);
216 this.time = triggerTime;
217 this.period = 0;
218 this.sequenceNumber = sequenceNumber;
219 }
220
221 /**
222 * Creates a periodic action with given nanoTime-based initial
223 * trigger time and period.
224 */
225 ScheduledFutureTask(Runnable r, V result, long triggerTime,
226 long period, long sequenceNumber) {
227 super(r, result);
228 this.time = triggerTime;
229 this.period = period;
230 this.sequenceNumber = sequenceNumber;
231 }
232
233 /**
234 * Creates a one-shot action with given nanoTime-based trigger time.
235 */
236 ScheduledFutureTask(Callable<V> callable, long triggerTime,
237 long sequenceNumber) {
238 super(callable);
239 this.time = triggerTime;
240 this.period = 0;
241 this.sequenceNumber = sequenceNumber;
242 }
243
244 public long getDelay(TimeUnit unit) {
245 return unit.convert(time - System.nanoTime(), NANOSECONDS);
246 }
247
248 public int compareTo(Delayed other) {
249 if (other == this) // compare zero if same object
250 return 0;
251 if (other instanceof ScheduledFutureTask) {
252 ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
253 long diff = time - x.time;
254 if (diff < 0)
255 return -1;
256 else if (diff > 0)
257 return 1;
258 else if (sequenceNumber < x.sequenceNumber)
259 return -1;
260 else
261 return 1;
262 }
263 long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
264 return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
265 }
266
267 /**
268 * Returns {@code true} if this is a periodic (not a one-shot) action.
269 *
270 * @return {@code true} if periodic
271 */
272 public boolean isPeriodic() {
273 return period != 0;
274 }
275
276 /**
277 * Sets the next time to run for a periodic task.
278 */
279 private void setNextRunTime() {
280 long p = period;
281 if (p > 0)
282 time += p;
283 else
284 time = triggerTime(-p);
285 }
286
287 public boolean cancel(boolean mayInterruptIfRunning) {
288 // The racy read of heapIndex below is benign:
289 // if heapIndex < 0, then OOTA guarantees that we have surely
290 // been removed; else we recheck under lock in remove()
291 boolean cancelled = super.cancel(mayInterruptIfRunning);
292 if (cancelled && removeOnCancel && heapIndex >= 0)
293 remove(this);
294 return cancelled;
295 }
296
297 /**
298 * Overrides FutureTask version so as to reset/requeue if periodic.
299 */
300 public void run() {
301 if (!canRunInCurrentRunState(this))
302 cancel(false);
303 else if (!isPeriodic())
304 super.run();
305 else if (super.runAndReset()) {
306 setNextRunTime();
307 reExecutePeriodic(outerTask);
308 }
309 }
310 }
311
312 /**
313 * Returns true if can run a task given current run state and
314 * run-after-shutdown parameters.
315 */
316 boolean canRunInCurrentRunState(RunnableScheduledFuture<?> task) {
317 if (!isShutdown())
318 return true;
319 if (isStopped())
320 return false;
321 return task.isPeriodic()
322 ? continueExistingPeriodicTasksAfterShutdown
323 : (executeExistingDelayedTasksAfterShutdown
324 || task.getDelay(NANOSECONDS) <= 0);
325 }
326
327 /**
328 * Main execution method for delayed or periodic tasks. If pool
329 * is shut down, rejects the task. Otherwise adds task to queue
330 * and starts a thread, if necessary, to run it. (We cannot
331 * prestart the thread to run the task because the task (probably)
332 * shouldn't be run yet.) If the pool is shut down while the task
333 * is being added, cancel and remove it if required by state and
334 * run-after-shutdown parameters.
335 *
336 * @param task the task
337 */
338 private void delayedExecute(RunnableScheduledFuture<?> task) {
339 if (isShutdown())
340 reject(task);
341 else {
342 super.getQueue().add(task);
343 if (!canRunInCurrentRunState(task) && remove(task))
344 task.cancel(false);
345 else
346 ensurePrestart();
347 }
348 }
349
350 /**
351 * Requeues a periodic task unless current run state precludes it.
352 * Same idea as delayedExecute except drops task rather than rejecting.
353 *
354 * @param task the task
355 */
356 void reExecutePeriodic(RunnableScheduledFuture<?> task) {
357 if (canRunInCurrentRunState(task)) {
358 super.getQueue().add(task);
359 if (canRunInCurrentRunState(task) || !remove(task)) {
360 ensurePrestart();
361 return;
362 }
363 }
364 task.cancel(false);
365 }
366
367 /**
368 * Cancels and clears the queue of all tasks that should not be run
369 * due to shutdown policy. Invoked within super.shutdown.
370 */
371 @Override void onShutdown() {
372 BlockingQueue<Runnable> q = super.getQueue();
373 boolean keepDelayed =
374 getExecuteExistingDelayedTasksAfterShutdownPolicy();
375 boolean keepPeriodic =
376 getContinueExistingPeriodicTasksAfterShutdownPolicy();
377 // Traverse snapshot to avoid iterator exceptions
378 // TODO: implement and use efficient removeIf
379 // super.getQueue().removeIf(...);
380 for (Object e : q.toArray()) {
381 if (e instanceof RunnableScheduledFuture) {
382 RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>)e;
383 if ((t.isPeriodic()
384 ? !keepPeriodic
385 : (!keepDelayed && t.getDelay(NANOSECONDS) > 0))
386 || t.isCancelled()) { // also remove if already cancelled
387 if (q.remove(t))
388 t.cancel(false);
389 }
390 }
391 }
392 tryTerminate();
393 }
394
395 /**
396 * Modifies or replaces the task used to execute a runnable.
397 * This method can be used to override the concrete
398 * class used for managing internal tasks.
399 * The default implementation simply returns the given task.
400 *
401 * @param runnable the submitted Runnable
402 * @param task the task created to execute the runnable
403 * @param <V> the type of the task's result
404 * @return a task that can execute the runnable
405 * @since 1.6
406 */
407 protected <V> RunnableScheduledFuture<V> decorateTask(
408 Runnable runnable, RunnableScheduledFuture<V> task) {
409 return task;
410 }
411
412 /**
413 * Modifies or replaces the task used to execute a callable.
414 * This method can be used to override the concrete
415 * class used for managing internal tasks.
416 * The default implementation simply returns the given task.
417 *
418 * @param callable the submitted Callable
419 * @param task the task created to execute the callable
420 * @param <V> the type of the task's result
421 * @return a task that can execute the callable
422 * @since 1.6
423 */
424 protected <V> RunnableScheduledFuture<V> decorateTask(
425 Callable<V> callable, RunnableScheduledFuture<V> task) {
426 return task;
427 }
428
/**
 * The default keep-alive time for pool threads, in milliseconds.
 * Every constructor of this class passes it to the superclass
 * constructor together with the MILLISECONDS unit.
 *
 * Normally, this value is unused because all pool threads will be
 * core threads, but if a user creates a pool with a corePoolSize
 * of zero (against our advice), we keep a thread alive as long as
 * there are queued tasks. If the keep alive time is zero (the
 * historic value), we end up hot-spinning in getTask, wasting a
 * CPU. But on the other hand, if we set the value too high, and
 * users create a one-shot pool which they don't cleanly shutdown,
 * the pool's non-daemon threads will prevent JVM termination. A
 * small but non-zero value (relative to a JVM's lifetime) seems
 * best.
 */
private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;
444
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} that has the
 * given core pool size.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue());
}
458
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} that has the
 * given core pool size and thread factory.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory);
}
476
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} that has the
 * given core pool size and rejected execution handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          handler);
}
494
/**
 * Constructs a {@code ScheduledThreadPoolExecutor} that has the
 * given core pool size, thread factory, and rejected execution
 * handler.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 * if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param threadFactory the factory to use when the executor
 * creates a new thread
 * @param handler the handler to use when execution is blocked
 * because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if {@code corePoolSize < 0}
 * @throws NullPointerException if {@code threadFactory} or
 * {@code handler} is null
 */
public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize,
          Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS,
          MILLISECONDS,
          new DelayedWorkQueue(),
          threadFactory,
          handler);
}
516
517 /**
518 * Returns the nanoTime-based trigger time of a delayed action.
519 */
520 private long triggerTime(long delay, TimeUnit unit) {
521 return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
522 }
523
524 /**
525 * Returns the nanoTime-based trigger time of a delayed action.
526 */
527 long triggerTime(long delay) {
528 return System.nanoTime() +
529 ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
530 }
531
532 /**
533 * Constrains the values of all delays in the queue to be within
534 * Long.MAX_VALUE of each other, to avoid overflow in compareTo.
535 * This may occur if a task is eligible to be dequeued, but has
536 * not yet been, while some other task is added with a delay of
537 * Long.MAX_VALUE.
538 */
539 private long overflowFree(long delay) {
540 Delayed head = (Delayed) super.getQueue().peek();
541 if (head != null) {
542 long headDelay = head.getDelay(NANOSECONDS);
543 if (headDelay < 0 && (delay - headDelay < 0))
544 delay = Long.MAX_VALUE + headDelay;
545 }
546 return delay;
547 }
548
549 /**
550 * @throws RejectedExecutionException {@inheritDoc}
551 * @throws NullPointerException {@inheritDoc}
552 */
553 public ScheduledFuture<?> schedule(Runnable command,
554 long delay,
555 TimeUnit unit) {
556 if (command == null || unit == null)
557 throw new NullPointerException();
558 RunnableScheduledFuture<Void> t = decorateTask(command,
559 new ScheduledFutureTask<Void>(command, null,
560 triggerTime(delay, unit),
561 sequencer.getAndIncrement()));
562 delayedExecute(t);
563 return t;
564 }
565
566 /**
567 * @throws RejectedExecutionException {@inheritDoc}
568 * @throws NullPointerException {@inheritDoc}
569 */
570 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
571 long delay,
572 TimeUnit unit) {
573 if (callable == null || unit == null)
574 throw new NullPointerException();
575 RunnableScheduledFuture<V> t = decorateTask(callable,
576 new ScheduledFutureTask<V>(callable,
577 triggerTime(delay, unit),
578 sequencer.getAndIncrement()));
579 delayedExecute(t);
580 return t;
581 }
582
583 /**
584 * Submits a periodic action that becomes enabled first after the
585 * given initial delay, and subsequently with the given period;
586 * that is, executions will commence after
587 * {@code initialDelay}, then {@code initialDelay + period}, then
588 * {@code initialDelay + 2 * period}, and so on.
589 *
590 * <p>The sequence of task executions continues indefinitely until
591 * one of the following exceptional completions occur:
592 * <ul>
593 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
594 * via the returned future.
595 * <li>Method {@link #shutdown} is called and the {@linkplain
596 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
597 * whether to continue after shutdown} is not set true, or method
598 * {@link #shutdownNow} is called; also resulting in task
599 * cancellation.
600 * <li>An execution of the task throws an exception. In this case
601 * calling {@link Future#get() get} on the returned future will throw
602 * {@link ExecutionException}, holding the exception as its cause.
603 * </ul>
604 * Subsequent executions are suppressed. Subsequent calls to
605 * {@link Future#isDone isDone()} on the returned future will
606 * return {@code true}.
607 *
608 * <p>If any execution of this task takes longer than its period, then
609 * subsequent executions may start late, but will not concurrently
610 * execute.
611 *
612 * @throws RejectedExecutionException {@inheritDoc}
613 * @throws NullPointerException {@inheritDoc}
614 * @throws IllegalArgumentException {@inheritDoc}
615 */
616 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
617 long initialDelay,
618 long period,
619 TimeUnit unit) {
620 if (command == null || unit == null)
621 throw new NullPointerException();
622 if (period <= 0L)
623 throw new IllegalArgumentException();
624 ScheduledFutureTask<Void> sft =
625 new ScheduledFutureTask<Void>(command,
626 null,
627 triggerTime(initialDelay, unit),
628 unit.toNanos(period),
629 sequencer.getAndIncrement());
630 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
631 sft.outerTask = t;
632 delayedExecute(t);
633 return t;
634 }
635
636 /**
637 * Submits a periodic action that becomes enabled first after the
638 * given initial delay, and subsequently with the given delay
639 * between the termination of one execution and the commencement of
640 * the next.
641 *
642 * <p>The sequence of task executions continues indefinitely until
643 * one of the following exceptional completions occur:
644 * <ul>
645 * <li>The task is {@linkplain Future#cancel explicitly cancelled}
646 * via the returned future.
647 * <li>Method {@link #shutdown} is called and the {@linkplain
648 * #getContinueExistingPeriodicTasksAfterShutdownPolicy policy on
649 * whether to continue after shutdown} is not set true, or method
650 * {@link #shutdownNow} is called; also resulting in task
651 * cancellation.
652 * <li>An execution of the task throws an exception. In this case
653 * calling {@link Future#get() get} on the returned future will throw
654 * {@link ExecutionException}, holding the exception as its cause.
655 * </ul>
656 * Subsequent executions are suppressed. Subsequent calls to
657 * {@link Future#isDone isDone()} on the returned future will
658 * return {@code true}.
659 *
660 * @throws RejectedExecutionException {@inheritDoc}
661 * @throws NullPointerException {@inheritDoc}
662 * @throws IllegalArgumentException {@inheritDoc}
663 */
664 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
665 long initialDelay,
666 long delay,
667 TimeUnit unit) {
668 if (command == null || unit == null)
669 throw new NullPointerException();
670 if (delay <= 0L)
671 throw new IllegalArgumentException();
672 ScheduledFutureTask<Void> sft =
673 new ScheduledFutureTask<Void>(command,
674 null,
675 triggerTime(initialDelay, unit),
676 -unit.toNanos(delay),
677 sequencer.getAndIncrement());
678 RunnableScheduledFuture<Void> t = decorateTask(command, sft);
679 sft.outerTask = t;
680 delayedExecute(t);
681 return t;
682 }
683
684 /**
685 * Executes {@code command} with zero required delay.
686 * This has effect equivalent to
687 * {@link #schedule(Runnable,long,TimeUnit) schedule(command, 0, anyUnit)}.
688 * Note that inspections of the queue and of the list returned by
689 * {@code shutdownNow} will access the zero-delayed
690 * {@link ScheduledFuture}, not the {@code command} itself.
691 *
692 * <p>A consequence of the use of {@code ScheduledFuture} objects is
693 * that {@link ThreadPoolExecutor#afterExecute afterExecute} is always
694 * called with a null second {@code Throwable} argument, even if the
695 * {@code command} terminated abruptly. Instead, the {@code Throwable}
696 * thrown by such a task can be obtained via {@link Future#get}.
697 *
698 * @throws RejectedExecutionException at discretion of
699 * {@code RejectedExecutionHandler}, if the task
700 * cannot be accepted for execution because the
701 * executor has been shut down
702 * @throws NullPointerException {@inheritDoc}
703 */
704 public void execute(Runnable command) {
705 schedule(command, 0, NANOSECONDS);
706 }
707
708 // Override AbstractExecutorService methods
709
710 /**
711 * @throws RejectedExecutionException {@inheritDoc}
712 * @throws NullPointerException {@inheritDoc}
713 */
714 public Future<?> submit(Runnable task) {
715 return schedule(task, 0, NANOSECONDS);
716 }
717
718 /**
719 * @throws RejectedExecutionException {@inheritDoc}
720 * @throws NullPointerException {@inheritDoc}
721 */
722 public <T> Future<T> submit(Runnable task, T result) {
723 return schedule(Executors.callable(task, result), 0, NANOSECONDS);
724 }
725
726 /**
727 * @throws RejectedExecutionException {@inheritDoc}
728 * @throws NullPointerException {@inheritDoc}
729 */
730 public <T> Future<T> submit(Callable<T> task) {
731 return schedule(task, 0, NANOSECONDS);
732 }
733
734 /**
735 * Sets the policy on whether to continue executing existing
736 * periodic tasks even when this executor has been {@code shutdown}.
737 * In this case, executions will continue until {@code shutdownNow}
738 * or the policy is set to {@code false} when already shutdown.
739 * This value is by default {@code false}.
740 *
741 * @param value if {@code true}, continue after shutdown, else don't
742 * @see #getContinueExistingPeriodicTasksAfterShutdownPolicy
743 */
744 public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
745 continueExistingPeriodicTasksAfterShutdown = value;
746 if (!value && isShutdown())
747 onShutdown();
748 }
749
750 /**
751 * Gets the policy on whether to continue executing existing
752 * periodic tasks even when this executor has been {@code shutdown}.
753 * In this case, executions will continue until {@code shutdownNow}
754 * or the policy is set to {@code false} when already shutdown.
755 * This value is by default {@code false}.
756 *
757 * @return {@code true} if will continue after shutdown
758 * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
759 */
760 public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
761 return continueExistingPeriodicTasksAfterShutdown;
762 }
763
764 /**
765 * Sets the policy on whether to execute existing delayed
766 * tasks even when this executor has been {@code shutdown}.
767 * In this case, these tasks will only terminate upon
768 * {@code shutdownNow}, or after setting the policy to
769 * {@code false} when already shutdown.
770 * This value is by default {@code true}.
771 *
772 * @param value if {@code true}, execute after shutdown, else don't
773 * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
774 */
775 public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
776 executeExistingDelayedTasksAfterShutdown = value;
777 if (!value && isShutdown())
778 onShutdown();
779 }
780
781 /**
782 * Gets the policy on whether to execute existing delayed
783 * tasks even when this executor has been {@code shutdown}.
784 * In this case, these tasks will only terminate upon
785 * {@code shutdownNow}, or after setting the policy to
786 * {@code false} when already shutdown.
787 * This value is by default {@code true}.
788 *
789 * @return {@code true} if will execute after shutdown
790 * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
791 */
792 public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
793 return executeExistingDelayedTasksAfterShutdown;
794 }
795
796 /**
797 * Sets the policy on whether cancelled tasks should be immediately
798 * removed from the work queue at time of cancellation. This value is
799 * by default {@code false}.
800 *
801 * @param value if {@code true}, remove on cancellation, else don't
802 * @see #getRemoveOnCancelPolicy
803 * @since 1.7
804 */
805 public void setRemoveOnCancelPolicy(boolean value) {
806 removeOnCancel = value;
807 }
808
809 /**
810 * Gets the policy on whether cancelled tasks should be immediately
811 * removed from the work queue at time of cancellation. This value is
812 * by default {@code false}.
813 *
814 * @return {@code true} if cancelled tasks are immediately removed
815 * from the queue
816 * @see #setRemoveOnCancelPolicy
817 * @since 1.7
818 */
819 public boolean getRemoveOnCancelPolicy() {
820 return removeOnCancel;
821 }
822
823 /**
824 * Initiates an orderly shutdown in which previously submitted
825 * tasks are executed, but no new tasks will be accepted.
826 * Invocation has no additional effect if already shut down.
827 *
828 * <p>This method does not wait for previously submitted tasks to
829 * complete execution. Use {@link #awaitTermination awaitTermination}
830 * to do that.
831 *
832 * <p>If the {@code ExecuteExistingDelayedTasksAfterShutdownPolicy}
833 * has been set {@code false}, existing delayed tasks whose delays
834 * have not yet elapsed are cancelled. And unless the {@code
835 * ContinueExistingPeriodicTasksAfterShutdownPolicy} has been set
836 * {@code true}, future executions of existing periodic tasks will
837 * be cancelled.
838 *
839 * @throws SecurityException {@inheritDoc}
840 */
841 public void shutdown() {
842 super.shutdown();
843 }
844
845 /**
846 * Attempts to stop all actively executing tasks, halts the
847 * processing of waiting tasks, and returns a list of the tasks
848 * that were awaiting execution. These tasks are drained (removed)
849 * from the task queue upon return from this method.
850 *
851 * <p>This method does not wait for actively executing tasks to
852 * terminate. Use {@link #awaitTermination awaitTermination} to
853 * do that.
854 *
855 * <p>There are no guarantees beyond best-effort attempts to stop
856 * processing actively executing tasks. This implementation
857 * interrupts tasks via {@link Thread#interrupt}; any task that
858 * fails to respond to interrupts may never terminate.
859 *
860 * @return list of tasks that never commenced execution.
861 * Each element of this list is a {@link ScheduledFuture}.
862 * For tasks submitted via one of the {@code schedule}
863 * methods, the element will be identical to the returned
864 * {@code ScheduledFuture}. For tasks submitted using
865 * {@link #execute execute}, the element will be a
866 * zero-delay {@code ScheduledFuture}.
867 * @throws SecurityException {@inheritDoc}
868 */
869 public List<Runnable> shutdownNow() {
870 return super.shutdownNow();
871 }
872
873 /**
874 * Returns the task queue used by this executor. Access to the
875 * task queue is intended primarily for debugging and monitoring.
876 * This queue may be in active use. Retrieving the task queue
877 * does not prevent queued tasks from executing.
878 *
879 * <p>Each element of this queue is a {@link ScheduledFuture}.
880 * For tasks submitted via one of the {@code schedule} methods, the
881 * element will be identical to the returned {@code ScheduledFuture}.
882 * For tasks submitted using {@link #execute execute}, the element
883 * will be a zero-delay {@code ScheduledFuture}.
884 *
885 * <p>Iteration over this queue is <em>not</em> guaranteed to traverse
886 * tasks in the order in which they will execute.
887 *
888 * @return the task queue
889 */
890 public BlockingQueue<Runnable> getQueue() {
891 return super.getQueue();
892 }
893
894 /**
895 * Specialized delay queue. To mesh with TPE declarations, this
896 * class must be declared as a BlockingQueue<Runnable> even though
897 * it can only hold RunnableScheduledFutures.
898 */
899 static class DelayedWorkQueue extends AbstractQueue<Runnable>
900 implements BlockingQueue<Runnable> {
901
902 /*
903 * A DelayedWorkQueue is based on a heap-based data structure
904 * like those in DelayQueue and PriorityQueue, except that
905 * every ScheduledFutureTask also records its index into the
906 * heap array. This eliminates the need to find a task upon
907 * cancellation, greatly speeding up removal (down from O(n)
908 * to O(log n)), and reducing garbage retention that would
909 * otherwise occur by waiting for the element to rise to top
910 * before clearing. But because the queue may also hold
911 * RunnableScheduledFutures that are not ScheduledFutureTasks,
912 * we are not guaranteed to have such indices available, in
913 * which case we fall back to linear search. (We expect that
914 * most tasks will not be decorated, and that the faster cases
915 * will be much more common.)
916 *
917 * All heap operations must record index changes -- mainly
918 * within siftUp and siftDown. Upon removal, a task's
919 * heapIndex is set to -1. Note that ScheduledFutureTasks can
920 * appear at most once in the queue (this need not be true for
921 * other kinds of tasks or work queues), so are uniquely
922 * identified by heapIndex.
923 */
924
925 private static final int INITIAL_CAPACITY = 16;
926 private RunnableScheduledFuture<?>[] queue =
927 new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
928 private final ReentrantLock lock = new ReentrantLock();
929 private int size;
930
931 /**
932 * Thread designated to wait for the task at the head of the
933 * queue. This variant of the Leader-Follower pattern
934 * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
935 * minimize unnecessary timed waiting. When a thread becomes
936 * the leader, it waits only for the next delay to elapse, but
937 * other threads await indefinitely. The leader thread must
938 * signal some other thread before returning from take() or
939 * poll(...), unless some other thread becomes leader in the
940 * interim. Whenever the head of the queue is replaced with a
941 * task with an earlier expiration time, the leader field is
942 * invalidated by being reset to null, and some waiting
943 * thread, but not necessarily the current leader, is
944 * signalled. So waiting threads must be prepared to acquire
945 * and lose leadership while waiting.
946 */
947 private Thread leader;
948
949 /**
950 * Condition signalled when a newer task becomes available at the
951 * head of the queue or a new thread may need to become leader.
952 */
953 private final Condition available = lock.newCondition();
954
955 /**
956 * Sets f's heapIndex if it is a ScheduledFutureTask.
957 */
958 private static void setIndex(RunnableScheduledFuture<?> f, int idx) {
959 if (f instanceof ScheduledFutureTask)
960 ((ScheduledFutureTask)f).heapIndex = idx;
961 }
962
963 /**
964 * Sifts element added at bottom up to its heap-ordered spot.
965 * Call only when holding lock.
966 */
967 private void siftUp(int k, RunnableScheduledFuture<?> key) {
968 while (k > 0) {
969 int parent = (k - 1) >>> 1;
970 RunnableScheduledFuture<?> e = queue[parent];
971 if (key.compareTo(e) >= 0)
972 break;
973 queue[k] = e;
974 setIndex(e, k);
975 k = parent;
976 }
977 queue[k] = key;
978 setIndex(key, k);
979 }
980
981 /**
982 * Sifts element added at top down to its heap-ordered spot.
983 * Call only when holding lock.
984 */
985 private void siftDown(int k, RunnableScheduledFuture<?> key) {
986 int half = size >>> 1;
987 while (k < half) {
988 int child = (k << 1) + 1;
989 RunnableScheduledFuture<?> c = queue[child];
990 int right = child + 1;
991 if (right < size && c.compareTo(queue[right]) > 0)
992 c = queue[child = right];
993 if (key.compareTo(c) <= 0)
994 break;
995 queue[k] = c;
996 setIndex(c, k);
997 k = child;
998 }
999 queue[k] = key;
1000 setIndex(key, k);
1001 }
1002
1003 /**
1004 * Resizes the heap array. Call only when holding lock.
1005 */
1006 private void grow() {
1007 int oldCapacity = queue.length;
1008 int newCapacity = oldCapacity + (oldCapacity >> 1); // grow 50%
1009 if (newCapacity < 0) // overflow
1010 newCapacity = Integer.MAX_VALUE;
1011 queue = Arrays.copyOf(queue, newCapacity);
1012 }
1013
1014 /**
1015 * Finds index of given object, or -1 if absent.
1016 */
1017 private int indexOf(Object x) {
1018 if (x != null) {
1019 if (x instanceof ScheduledFutureTask) {
1020 int i = ((ScheduledFutureTask) x).heapIndex;
1021 // Sanity check; x could conceivably be a
1022 // ScheduledFutureTask from some other pool.
1023 if (i >= 0 && i < size && queue[i] == x)
1024 return i;
1025 } else {
1026 for (int i = 0; i < size; i++)
1027 if (x.equals(queue[i]))
1028 return i;
1029 }
1030 }
1031 return -1;
1032 }
1033
1034 public boolean contains(Object x) {
1035 final ReentrantLock lock = this.lock;
1036 lock.lock();
1037 try {
1038 return indexOf(x) != -1;
1039 } finally {
1040 lock.unlock();
1041 }
1042 }
1043
1044 public boolean remove(Object x) {
1045 final ReentrantLock lock = this.lock;
1046 lock.lock();
1047 try {
1048 int i = indexOf(x);
1049 if (i < 0)
1050 return false;
1051
1052 setIndex(queue[i], -1);
1053 int s = --size;
1054 RunnableScheduledFuture<?> replacement = queue[s];
1055 queue[s] = null;
1056 if (s != i) {
1057 siftDown(i, replacement);
1058 if (queue[i] == replacement)
1059 siftUp(i, replacement);
1060 }
1061 return true;
1062 } finally {
1063 lock.unlock();
1064 }
1065 }
1066
1067 public int size() {
1068 final ReentrantLock lock = this.lock;
1069 lock.lock();
1070 try {
1071 return size;
1072 } finally {
1073 lock.unlock();
1074 }
1075 }
1076
1077 public boolean isEmpty() {
1078 return size() == 0;
1079 }
1080
1081 public int remainingCapacity() {
1082 return Integer.MAX_VALUE;
1083 }
1084
1085 public RunnableScheduledFuture<?> peek() {
1086 final ReentrantLock lock = this.lock;
1087 lock.lock();
1088 try {
1089 return queue[0];
1090 } finally {
1091 lock.unlock();
1092 }
1093 }
1094
1095 public boolean offer(Runnable x) {
1096 if (x == null)
1097 throw new NullPointerException();
1098 RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
1099 final ReentrantLock lock = this.lock;
1100 lock.lock();
1101 try {
1102 int i = size;
1103 if (i >= queue.length)
1104 grow();
1105 size = i + 1;
1106 if (i == 0) {
1107 queue[0] = e;
1108 setIndex(e, 0);
1109 } else {
1110 siftUp(i, e);
1111 }
1112 if (queue[0] == e) {
1113 leader = null;
1114 available.signal();
1115 }
1116 } finally {
1117 lock.unlock();
1118 }
1119 return true;
1120 }
1121
1122 public void put(Runnable e) {
1123 offer(e);
1124 }
1125
1126 public boolean add(Runnable e) {
1127 return offer(e);
1128 }
1129
1130 public boolean offer(Runnable e, long timeout, TimeUnit unit) {
1131 return offer(e);
1132 }
1133
1134 /**
1135 * Performs common bookkeeping for poll and take: Replaces
1136 * first element with last and sifts it down. Call only when
1137 * holding lock.
1138 * @param f the task to remove and return
1139 */
1140 private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
1141 int s = --size;
1142 RunnableScheduledFuture<?> x = queue[s];
1143 queue[s] = null;
1144 if (s != 0)
1145 siftDown(0, x);
1146 setIndex(f, -1);
1147 return f;
1148 }
1149
1150 public RunnableScheduledFuture<?> poll() {
1151 final ReentrantLock lock = this.lock;
1152 lock.lock();
1153 try {
1154 RunnableScheduledFuture<?> first = queue[0];
1155 return (first == null || first.getDelay(NANOSECONDS) > 0)
1156 ? null
1157 : finishPoll(first);
1158 } finally {
1159 lock.unlock();
1160 }
1161 }
1162
1163 public RunnableScheduledFuture<?> take() throws InterruptedException {
1164 final ReentrantLock lock = this.lock;
1165 lock.lockInterruptibly();
1166 try {
1167 for (;;) {
1168 RunnableScheduledFuture<?> first = queue[0];
1169 if (first == null)
1170 available.await();
1171 else {
1172 long delay = first.getDelay(NANOSECONDS);
1173 if (delay <= 0L)
1174 return finishPoll(first);
1175 first = null; // don't retain ref while waiting
1176 if (leader != null)
1177 available.await();
1178 else {
1179 Thread thisThread = Thread.currentThread();
1180 leader = thisThread;
1181 try {
1182 available.awaitNanos(delay);
1183 } finally {
1184 if (leader == thisThread)
1185 leader = null;
1186 }
1187 }
1188 }
1189 }
1190 } finally {
1191 if (leader == null && queue[0] != null)
1192 available.signal();
1193 lock.unlock();
1194 }
1195 }
1196
1197 public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
1198 throws InterruptedException {
1199 long nanos = unit.toNanos(timeout);
1200 final ReentrantLock lock = this.lock;
1201 lock.lockInterruptibly();
1202 try {
1203 for (;;) {
1204 RunnableScheduledFuture<?> first = queue[0];
1205 if (first == null) {
1206 if (nanos <= 0L)
1207 return null;
1208 else
1209 nanos = available.awaitNanos(nanos);
1210 } else {
1211 long delay = first.getDelay(NANOSECONDS);
1212 if (delay <= 0L)
1213 return finishPoll(first);
1214 if (nanos <= 0L)
1215 return null;
1216 first = null; // don't retain ref while waiting
1217 if (nanos < delay || leader != null)
1218 nanos = available.awaitNanos(nanos);
1219 else {
1220 Thread thisThread = Thread.currentThread();
1221 leader = thisThread;
1222 try {
1223 long timeLeft = available.awaitNanos(delay);
1224 nanos -= delay - timeLeft;
1225 } finally {
1226 if (leader == thisThread)
1227 leader = null;
1228 }
1229 }
1230 }
1231 }
1232 } finally {
1233 if (leader == null && queue[0] != null)
1234 available.signal();
1235 lock.unlock();
1236 }
1237 }
1238
1239 public void clear() {
1240 final ReentrantLock lock = this.lock;
1241 lock.lock();
1242 try {
1243 for (int i = 0; i < size; i++) {
1244 RunnableScheduledFuture<?> t = queue[i];
1245 if (t != null) {
1246 queue[i] = null;
1247 setIndex(t, -1);
1248 }
1249 }
1250 size = 0;
1251 } finally {
1252 lock.unlock();
1253 }
1254 }
1255
1256 public int drainTo(Collection<? super Runnable> c) {
1257 return drainTo(c, Integer.MAX_VALUE);
1258 }
1259
1260 public int drainTo(Collection<? super Runnable> c, int maxElements) {
1261 Objects.requireNonNull(c);
1262 if (c == this)
1263 throw new IllegalArgumentException();
1264 if (maxElements <= 0)
1265 return 0;
1266 final ReentrantLock lock = this.lock;
1267 lock.lock();
1268 try {
1269 int n = 0;
1270 for (RunnableScheduledFuture<?> first;
1271 n < maxElements
1272 && (first = queue[0]) != null
1273 && first.getDelay(NANOSECONDS) <= 0;) {
1274 c.add(first); // In this order, in case add() throws.
1275 finishPoll(first);
1276 ++n;
1277 }
1278 return n;
1279 } finally {
1280 lock.unlock();
1281 }
1282 }
1283
1284 public Object[] toArray() {
1285 final ReentrantLock lock = this.lock;
1286 lock.lock();
1287 try {
1288 return Arrays.copyOf(queue, size, Object[].class);
1289 } finally {
1290 lock.unlock();
1291 }
1292 }
1293
1294 @SuppressWarnings("unchecked")
1295 public <T> T[] toArray(T[] a) {
1296 final ReentrantLock lock = this.lock;
1297 lock.lock();
1298 try {
1299 if (a.length < size)
1300 return (T[]) Arrays.copyOf(queue, size, a.getClass());
1301 System.arraycopy(queue, 0, a, 0, size);
1302 if (a.length > size)
1303 a[size] = null;
1304 return a;
1305 } finally {
1306 lock.unlock();
1307 }
1308 }
1309
1310 public Iterator<Runnable> iterator() {
1311 final ReentrantLock lock = this.lock;
1312 lock.lock();
1313 try {
1314 return new Itr(Arrays.copyOf(queue, size));
1315 } finally {
1316 lock.unlock();
1317 }
1318 }
1319
1320 /**
1321 * Snapshot iterator that works off copy of underlying q array.
1322 */
1323 private class Itr implements Iterator<Runnable> {
1324 final RunnableScheduledFuture<?>[] array;
1325 int cursor; // index of next element to return; initially 0
1326 int lastRet = -1; // index of last element returned; -1 if no such
1327
1328 Itr(RunnableScheduledFuture<?>[] array) {
1329 this.array = array;
1330 }
1331
1332 public boolean hasNext() {
1333 return cursor < array.length;
1334 }
1335
1336 public Runnable next() {
1337 if (cursor >= array.length)
1338 throw new NoSuchElementException();
1339 return array[lastRet = cursor++];
1340 }
1341
1342 public void remove() {
1343 if (lastRet < 0)
1344 throw new IllegalStateException();
1345 DelayedWorkQueue.this.remove(array[lastRet]);
1346 lastRet = -1;
1347 }
1348 }
1349 }
1350 }
1351