1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.util.threads;
18
19 import java.util.Collection;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.tomcat.util.res.StringManager;
25
26 /**
27 * As task queue specifically designed to run with a thread pool executor. The
28 * task queue is optimised to properly utilize threads within a thread pool
29 * executor. If you use a normal queue, the executor will spawn threads when
30 * there are idle threads and you wont be able to force items onto the queue
31 * itself.
32 */
33 public class TaskQueue extends LinkedBlockingQueue<Runnable> {
34
35 private static final long serialVersionUID = 1L;
36 protected static final StringManager sm = StringManager
37 .getManager("org.apache.tomcat.util.threads.res");
38
39 private transient volatile ThreadPoolExecutor parent = null;
40
41 // No need to be volatile. This is written and read in a single thread
42 // (when stopping a context and firing the listeners)
43 private Integer forcedRemainingCapacity = null;
44
45 public TaskQueue() {
46 super();
47 }
48
49 public TaskQueue(int capacity) {
50 super(capacity);
51 }
52
53 public TaskQueue(Collection<? extends Runnable> c) {
54 super(c);
55 }
56
57 public void setParent(ThreadPoolExecutor tp) {
58 parent = tp;
59 }
60
61 public boolean force(Runnable o) {
62 if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
63 return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
64 }
65
66 public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
67 if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
68 return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
69 }
70
71 @Override
72 public boolean offer(Runnable o) {
73 //we can't do any checks
74 if (parent==null) return super.offer(o);
75 //we are maxed out on threads, simply queue the object
76 if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
77 //we have idle threads, just add it to the queue
78 if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
79 //if we have less threads than maximum force creation of a new thread
80 if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
81 //if we reached here, we need to add it to the queue
82 return super.offer(o);
83 }
84
85
86 @Override
87 public Runnable poll(long timeout, TimeUnit unit)
88 throws InterruptedException {
89 Runnable runnable = super.poll(timeout, unit);
90 if (runnable == null && parent != null) {
91 // the poll timed out, it gives an opportunity to stop the current
92 // thread if needed to avoid memory leaks.
93 parent.stopCurrentThreadIfNeeded();
94 }
95 return runnable;
96 }
97
98 @Override
99 public Runnable take() throws InterruptedException {
100 if (parent != null && parent.currentThreadShouldBeStopped()) {
101 return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
102 TimeUnit.MILLISECONDS);
103 // yes, this may return null (in case of timeout) which normally
104 // does not occur with take()
105 // but the ThreadPoolExecutor implementation allows this
106 }
107 return super.take();
108 }
109
110 @Override
111 public int remainingCapacity() {
112 if (forcedRemainingCapacity != null) {
113 // ThreadPoolExecutor.setCorePoolSize checks that
114 // remainingCapacity==0 to allow to interrupt idle threads
115 // I don't see why, but this hack allows to conform to this
116 // "requirement"
117 return forcedRemainingCapacity.intValue();
118 }
119 return super.remainingCapacity();
120 }
121
122 public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
123 this.forcedRemainingCapacity = forcedRemainingCapacity;
124 }
125
126 }
127