1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.util.threads;
18
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.RejectedExecutionHandler;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.tomcat.util.res.StringManager;
28
29 /**
30 * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
31 * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
32 * If a RejectedExecutionHandler is not specified a default one will be configured
33 * and that one will always throw a RejectedExecutionException
34 *
35 */
36 public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
37 /**
38 * The string manager for this package.
39 */
40 protected static final StringManager sm = StringManager
41 .getManager("org.apache.tomcat.util.threads.res");
42
43 /**
44 * The number of tasks submitted but not yet finished. This includes tasks
45 * in the queue and tasks that have been handed to a worker thread but the
46 * latter did not start executing the task yet.
47 * This number is always greater or equal to {@link #getActiveCount()}.
48 */
49 private final AtomicInteger submittedCount = new AtomicInteger(0);
50 private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
51
52 /**
53 * Most recent time in ms when a thread decided to kill itself to avoid
54 * potential memory leaks. Useful to throttle the rate of renewals of
55 * threads.
56 */
57 private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
58
59 /**
60 * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
61 */
62 private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
63
64 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
65 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
66 prestartAllCoreThreads();
67 }
68
69 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
70 RejectedExecutionHandler handler) {
71 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
72 prestartAllCoreThreads();
73 }
74
75 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
76 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
77 prestartAllCoreThreads();
78 }
79
80 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
81 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
82 prestartAllCoreThreads();
83 }
84
85 public long getThreadRenewalDelay() {
86 return threadRenewalDelay;
87 }
88
89 public void setThreadRenewalDelay(long threadRenewalDelay) {
90 this.threadRenewalDelay = threadRenewalDelay;
91 }
92
93 @Override
94 protected void afterExecute(Runnable r, Throwable t) {
95 submittedCount.decrementAndGet();
96
97 if (t == null) {
98 stopCurrentThreadIfNeeded();
99 }
100 }
101
102 /**
103 * If the current thread was started before the last time when a context was
104 * stopped, an exception is thrown so that the current thread is stopped.
105 */
106 protected void stopCurrentThreadIfNeeded() {
107 if (currentThreadShouldBeStopped()) {
108 long lastTime = lastTimeThreadKilledItself.longValue();
109 if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
110 if (lastTimeThreadKilledItself.compareAndSet(lastTime,
111 System.currentTimeMillis() + 1)) {
112 // OK, it's really time to dispose of this thread
113
114 final String msg = sm.getString(
115 "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
116 Thread.currentThread().getName());
117
118 throw new StopPooledThreadException(msg);
119 }
120 }
121 }
122 }
123
124 protected boolean currentThreadShouldBeStopped() {
125 if (threadRenewalDelay >= 0
126 && Thread.currentThread() instanceof TaskThread) {
127 TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
128 if (currentTaskThread.getCreationTime() <
129 this.lastContextStoppedTime.longValue()) {
130 return true;
131 }
132 }
133 return false;
134 }
135
136 public int getSubmittedCount() {
137 return submittedCount.get();
138 }
139
140 /**
141 * {@inheritDoc}
142 */
143 @Override
144 public void execute(Runnable command) {
145 execute(command,0,TimeUnit.MILLISECONDS);
146 }
147
148 /**
149 * Executes the given command at some time in the future. The command
150 * may execute in a new thread, in a pooled thread, or in the calling
151 * thread, at the discretion of the <code>Executor</code> implementation.
152 * If no threads are available, it will be added to the work queue.
153 * If the work queue is full, the system will wait for the specified
154 * time and it throw a RejectedExecutionException if the queue is still
155 * full after that.
156 *
157 * @param command the runnable task
158 * @param timeout A timeout for the completion of the task
159 * @param unit The timeout time unit
160 * @throws RejectedExecutionException if this task cannot be
161 * accepted for execution - the queue is full
162 * @throws NullPointerException if command or unit is null
163 */
164 public void execute(Runnable command, long timeout, TimeUnit unit) {
165 submittedCount.incrementAndGet();
166 try {
167 super.execute(command);
168 } catch (RejectedExecutionException rx) {
169 if (super.getQueue() instanceof TaskQueue) {
170 final TaskQueue queue = (TaskQueue)super.getQueue();
171 try {
172 if (!queue.force(command, timeout, unit)) {
173 submittedCount.decrementAndGet();
174 throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
175 }
176 } catch (InterruptedException x) {
177 submittedCount.decrementAndGet();
178 throw new RejectedExecutionException(x);
179 }
180 } else {
181 submittedCount.decrementAndGet();
182 throw rx;
183 }
184
185 }
186 }
187
188 public void contextStopping() {
189 this.lastContextStoppedTime.set(System.currentTimeMillis());
190
191 // save the current pool parameters to restore them later
192 int savedCorePoolSize = this.getCorePoolSize();
193 TaskQueue taskQueue =
194 getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
195 if (taskQueue != null) {
196 // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
197 // checks that queue.remainingCapacity()==0. I did not understand
198 // why, but to get the intended effect of waking up idle threads, I
199 // temporarily fake this condition.
200 taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
201 }
202
203 // setCorePoolSize(0) wakes idle threads
204 this.setCorePoolSize(0);
205
206 // TaskQueue.take() takes care of timing out, so that we are sure that
207 // all threads of the pool are renewed in a limited time, something like
208 // (threadKeepAlive + longest request time)
209
210 if (taskQueue != null) {
211 // ok, restore the state of the queue and pool
212 taskQueue.setForcedRemainingCapacity(null);
213 }
214 this.setCorePoolSize(savedCorePoolSize);
215 }
216
217 private static class RejectHandler implements RejectedExecutionHandler {
218 @Override
219 public void rejectedExecution(Runnable r,
220 java.util.concurrent.ThreadPoolExecutor executor) {
221 throw new RejectedExecutionException();
222 }
223
224 }
225
226
227 }
228