1 /*
2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3 *
4 * This code is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 only, as
6 * published by the Free Software Foundation. Oracle designates this
7 * particular file as subject to the "Classpath" exception as provided
8 * by Oracle in the LICENSE file that accompanied this code.
9 *
10 * This code is distributed in the hope that it will be useful, but WITHOUT
11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * version 2 for more details (a copy is included in the LICENSE file that
14 * accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License version
17 * 2 along with this work; if not, write to the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19 *
20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21 * or visit www.oracle.com if you need additional information or have any
22 * questions.
23 */
24
25 /*
26 * This file is available under and governed by the GNU General Public
27 * License version 2 only, as published by the Free Software Foundation.
28 * However, the following notice accompanied the original version of this
29 * file:
30 *
31 * Written by Doug Lea with assistance from members of JCP JSR-166
32 * Expert Group and released to the public domain, as explained at
33 * http://creativecommons.org/publicdomain/zero/1.0/
34 */
35
36 package java.util.concurrent;
37
38 import java.lang.invoke.MethodHandles;
39 import java.lang.invoke.VarHandle;
40 import java.util.concurrent.locks.LockSupport;
41
42 /**
43 * A cancellable asynchronous computation. This class provides a base
44 * implementation of {@link Future}, with methods to start and cancel
45 * a computation, query to see if the computation is complete, and
46 * retrieve the result of the computation. The result can only be
47 * retrieved when the computation has completed; the {@code get}
48 * methods will block if the computation has not yet completed. Once
49 * the computation has completed, the computation cannot be restarted
50 * or cancelled (unless the computation is invoked using
51 * {@link #runAndReset}).
52 *
53 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
54 * {@link Runnable} object. Because {@code FutureTask} implements
55 * {@code Runnable}, a {@code FutureTask} can be submitted to an
56 * {@link Executor} for execution.
57 *
58 * <p>In addition to serving as a standalone class, this class provides
59 * {@code protected} functionality that may be useful when creating
60 * customized task classes.
61 *
62 * @since 1.5
63 * @author Doug Lea
64 * @param <V> The result type returned by this FutureTask's {@code get} methods
65 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Implementation overview: earlier versions of this class were
     * built on AbstractQueuedSynchronizer. This version instead keeps
     * a "state" field that is advanced with CAS to track completion,
     * together with a simple Treiber stack holding the waiting
     * threads. The main motivation is to avoid surprising users about
     * interrupt status being retained during cancellation races.
     */

    /**
     * The run state of this task; it starts out as NEW. Only set,
     * setException and cancel move the state to a terminal value.
     * While completing, the state may briefly be COMPLETING (the
     * outcome is being stored) or INTERRUPTING (the runner is being
     * interrupted on behalf of cancel(true)). The step from one of
     * these transient values to the final value uses a cheaper
     * ordered/lazy write, because the values are unique and cannot
     * be modified any further.
     *
     * The numeric ordering of the constants is relied upon by the
     * comparisons in this class (for example {@code s > COMPLETING},
     * {@code s >= CANCELLED} and {@code s >= INTERRUPTING}).
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The wrapped callable; set to null once the task has finished. */
    private Callable<V> callable;
    /** Value handed back by get(), or the exception it wraps and throws. */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** Thread currently executing the callable; claimed by CAS in run(). */
    private volatile Thread runner;
    /** Head of the Treiber stack of threads blocked in get(). */
    private volatile WaitNode waiters;

    /**
     * Translates a completed state into what {@code get} must do:
     * return the stored value, or throw.
     *
     * @param s completed state value
     * @return the stored outcome when {@code s} is NORMAL
     * @throws CancellationException if {@code s} is CANCELLED or higher
     * @throws ExecutionException wrapping the stored throwable otherwise
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        final Object result = outcome;
        if (s >= CANCELLED) {
            throw new CancellationException();
        }
        if (s != NORMAL) {
            throw new ExecutionException((Throwable) result);
        }
        return (V) result;
    }

    /**
     * Creates a {@code FutureTask} that executes the given
     * {@code Callable} when it is run.
     *
     * @param callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null) {
            throw new NullPointerException();
        }
        this.callable = callable;
        // The volatile write publishes the callable assignment above.
        this.state = NEW;
    }

    /**
     * Creates a {@code FutureTask} that executes the given
     * {@code Runnable} when it is run, and makes {@code get} return
     * the given result once the run has completed successfully.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * no particular result is needed, consider constructions of the
     * form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        // The volatile write publishes the callable assignment above.
        this.state = NEW;
    }

    @Override
    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    @Override
    public boolean isDone() {
        // Any state other than NEW counts as done, including the
        // transient COMPLETING and INTERRUPTING values.
        return state != NEW;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        final int target = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
        // Only the caller that wins the CAS away from NEW performs the cancel.
        if (state != NEW || !STATE.compareAndSet(this, NEW, target)) {
            return false;
        }
        try { // in case the call to interrupt throws an exception
            if (mayInterruptIfRunning) {
                try {
                    final Thread worker = runner;
                    if (worker != null) {
                        worker.interrupt();
                    }
                } finally {
                    // INTERRUPTING -> INTERRUPTED is the final state.
                    STATE.setRelease(this, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING) {
            s = awaitDone(false, 0L);
        }
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    @Override
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null) {
            throw new NullPointerException();
        }
        int s = state;
        if (s <= COMPLETING) {
            s = awaitDone(true, unit.toNanos(timeout));
            // Still not past COMPLETING after waiting: the wait timed out.
            if (s <= COMPLETING) {
                throw new TimeoutException();
            }
        }
        return report(s);
    }

    /**
     * Hook invoked when this task transitions to state {@code isDone},
     * whether it finished normally or was cancelled. This default
     * implementation is empty. Subclasses may override it to fire
     * completion callbacks or to do bookkeeping. The status of the
     * task can be queried from within the override, for example to
     * find out whether the task was cancelled.
     */
    protected void done() { }

    /**
     * Stores the given value as the result of this future, unless the
     * future has already been set or has been cancelled.
     *
     * <p>The {@link #run} method calls this internally after the
     * computation completes successfully.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (!STATE.compareAndSet(this, NEW, COMPLETING)) {
            return;
        }
        outcome = v;
        STATE.setRelease(this, NORMAL); // final state
        finishCompletion();
    }

    /**
     * Makes this future report an {@link ExecutionException} whose
     * cause is the given throwable, unless the future has already
     * been set or has been cancelled.
     *
     * <p>The {@link #run} method calls this internally when the
     * computation fails.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (!STATE.compareAndSet(this, NEW, COMPLETING)) {
            return;
        }
        outcome = t;
        STATE.setRelease(this, EXCEPTIONAL); // final state
        finishCompletion();
    }

    @Override
    public void run() {
        // Only a NEW task may run, and only the thread that installs
        // itself as runner gets to run it.
        if (state != NEW
            || !RUNNER.compareAndSet(this, null, Thread.currentThread())) {
            return;
        }
        try {
            final Callable<V> task = callable;
            if (task != null && state == NEW) {
                final V value;
                try {
                    value = task.call();
                } catch (Throwable failure) {
                    setException(failure);
                    return; // the finally block below still executes
                }
                set(value);
            }
        } finally {
            // runner has to stay non-null until the state is settled,
            // so that concurrent calls to run() are rejected
            runner = null;
            // the state has to be re-read after clearing runner, so
            // that interrupts are not leaked
            final int settled = state;
            if (settled >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(settled);
            }
        }
    }

    /**
     * Runs the computation without recording its result and then
     * leaves this future in its initial state. The reset fails when
     * the computation throws or the task is cancelled. Intended for
     * tasks that intrinsically execute more than once.
     *
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW
            || !RUNNER.compareAndSet(this, null, Thread.currentThread())) {
            return false;
        }
        boolean completedCleanly = false;
        int s = state;
        try {
            final Callable<V> task = callable;
            if (task != null && s == NEW) {
                try {
                    task.call(); // the result is deliberately not stored
                    completedCleanly = true;
                } catch (Throwable failure) {
                    setException(failure);
                }
            }
        } finally {
            // runner has to stay non-null until the state is settled,
            // so that concurrent calls to run() are rejected
            runner = null;
            // the state has to be re-read after clearing runner, so
            // that interrupts are not leaked
            s = state;
            if (s >= INTERRUPTING) {
                handlePossibleCancellationInterrupt(s);
            }
        }
        return completedCleanly && s == NEW;
    }

    /**
     * Makes sure an interrupt issued by a possible cancel(true) is
     * only delivered to the task while it is inside run or
     * runAndReset.
     *
     * @param s the state observed by the caller
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // The interrupting thread may stall before it actually gets
        // to interrupt us, so spin-wait patiently until it is done.
        if (s == INTERRUPTING) {
            while (state == INTERRUPTING) {
                Thread.yield(); // wait out the pending interrupt
            }
        }

        // assert state == INTERRUPTED;

        // Ideally any interrupt received from cancel(true) would be
        // cleared here. However, a task is allowed to use interrupts
        // as an independent way of communicating with its caller, and
        // there is no way to clear only the cancellation interrupt.
        // The interrupt status is therefore deliberately left alone
        // (no call to Thread.interrupted() is made).
    }

    /**
     * Minimal linked-list node recording one waiting thread in the
     * Treiber stack. Other classes such as Phaser and
     * SynchronousQueue carry a more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Detaches the whole waiter stack and unparks every thread on it,
     * then invokes done() and clears the callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        WaitNode head;
        while ((head = waiters) != null) {
            if (!WAITERS.weakCompareAndSet(this, head, null)) {
                continue; // CAS failed; re-read the head and try again
            }
            WaitNode node = head;
            while (node != null) {
                final Thread waiter = node.thread;
                if (waiter != null) {
                    node.thread = null;
                    LockSupport.unpark(waiter);
                }
                final WaitNode successor = node.next;
                if (successor != null) {
                    node.next = null; // unlink to help gc
                }
                node = successor;
            }
            break;
        }

        done();

        callable = null; // to reduce footprint
    }

    /**
     * Waits until the task completes, or gives up on interrupt or
     * timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     * @throws InterruptedException if the calling thread is found to
     *         be interrupted while the state is still NEW
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // This loop is delicate. It is written to:
        // - call nanoTime exactly once for each call to park
        // - return promptly, without allocation or nanoTime, when
        //   nanos <= 0L
        // - not underflow when nanos == Long.MIN_VALUE
        // - do no worse than park-spin for a while when
        //   nanos == Long.MAX_VALUE, nanoTime is non-monotonic and a
        //   spurious wakeup occurs
        long startTime = 0L; // Special value 0L means not yet parked
        WaitNode node = null;
        boolean enqueued = false;
        for (;;) {
            final int s = state;
            if (s > COMPLETING) {
                if (node != null) {
                    node.thread = null;
                }
                return s;
            }
            if (s == COMPLETING) {
                // isDone may already have promised completion, so
                // never return empty-handed or throw
                // InterruptedException from here
                Thread.yield();
                continue;
            }
            if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
            }
            if (node == null) {
                if (timed && nanos <= 0L) {
                    return s;
                }
                node = new WaitNode();
                continue;
            }
            if (!enqueued) {
                // Push this node onto the stack: link it to the
                // current head, then try to swing the head to it.
                final WaitNode head = waiters;
                node.next = head;
                enqueued = WAITERS.weakCompareAndSet(this, head, node);
                continue;
            }
            if (!timed) {
                LockSupport.park(this);
                continue;
            }
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L) {
                    startTime = 1L; // keep 0L reserved as the marker
                }
                parkNanos = nanos;
            } else {
                final long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(node);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            if (state < COMPLETING) {
                LockSupport.parkNanos(this, parkNanos);
            }
        }
    }

    /**
     * Attempts to unlink a timed-out or interrupted wait node so that
     * garbage does not accumulate. Interior nodes are unspliced
     * without CAS, because it does no harm if releasers traverse them
     * anyway. To avoid the effects of unsplicing from nodes that were
     * already removed, the list is traversed again whenever a race is
     * apparent. That is slow for long lists, but lists are not
     * expected to get long enough to justify a higher-overhead scheme.
     *
     * @param node the node to remove; may be null, in which case
     *        nothing is done
     */
    private void removeWaiter(WaitNode node) {
        if (node == null) {
            return;
        }
        node.thread = null;
        while (!unlinkCancelledWaiters()) {
            // a removeWaiter race was detected; restart from the head
        }
    }

    /**
     * Performs one pass over the waiter stack, unlinking every node
     * whose thread field is null.
     *
     * @return {@code true} if the pass reached the end of the list,
     *         {@code false} if an apparent race was detected and the
     *         pass has to be restarted
     */
    private boolean unlinkCancelledWaiters() {
        WaitNode live = null; // most recent node seen with a non-null thread
        WaitNode current = waiters;
        while (current != null) {
            final WaitNode following = current.next;
            if (current.thread != null) {
                live = current;
            } else if (live != null) {
                live.next = following;
                if (live.thread == null) { // check for race
                    return false;
                }
            } else if (!WAITERS.compareAndSet(this, current, following)) {
                return false;
            }
            current = following;
        }
        return true;
    }

    /**
     * Returns a string representation of this FutureTask.
     *
     * @implSpec
     * The default implementation returns {@code super.toString()}
     * followed by the completion state in brackets, which is one of
     * {@code "[Completed normally]"},
     * {@code "[Completed exceptionally: "} + outcome + {@code "]"},
     * {@code "[Cancelled]"}, {@code "[Not completed]"}, or
     * {@code "[Not completed, task = "} + callable + {@code "]"}.
     *
     * @return a string representation of this FutureTask
     */
    @Override
    public String toString() {
        final int s = state;
        final String status;
        if (s == NORMAL) {
            status = "[Completed normally]";
        } else if (s == EXCEPTIONAL) {
            status = "[Completed exceptionally: " + outcome + "]";
        } else if (s == CANCELLED || s == INTERRUPTING || s == INTERRUPTED) {
            status = "[Cancelled]";
        } else {
            final Callable<?> task = this.callable;
            if (task == null) {
                status = "[Not completed]";
            } else {
                status = "[Not completed, task = " + task + "]";
            }
        }
        return super.toString() + status;
    }

    // VarHandle mechanics
    private static final VarHandle STATE;
    private static final VarHandle RUNNER;
    private static final VarHandle WAITERS;
    static {
        final MethodHandles.Lookup lookup = MethodHandles.lookup();
        try {
            STATE = lookup.findVarHandle(FutureTask.class, "state", int.class);
            RUNNER = lookup.findVarHandle(FutureTask.class, "runner", Thread.class);
            WAITERS = lookup.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

}
538