1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.tomcat.util.threads;
18
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.RejectedExecutionHandler;
22 import java.util.concurrent.ThreadFactory;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicInteger;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.tomcat.util.res.StringManager;
28
29 /**
30  * Same as a java.util.concurrent.ThreadPoolExecutor but implements a much more efficient
31  * {@link #getSubmittedCount()} method, to be used to properly handle the work queue.
32  * If a RejectedExecutionHandler is not specified a default one will be configured
33  * and that one will always throw a RejectedExecutionException
34  *
35  */

36 public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
37     /**
38      * The string manager for this package.
39      */

40     protected static final StringManager sm = StringManager
41             .getManager("org.apache.tomcat.util.threads.res");
42
43     /**
44      * The number of tasks submitted but not yet finished. This includes tasks
45      * in the queue and tasks that have been handed to a worker thread but the
46      * latter did not start executing the task yet.
47      * This number is always greater or equal to {@link #getActiveCount()}.
48      */

49     private final AtomicInteger submittedCount = new AtomicInteger(0);
50     private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
51
52     /**
53      * Most recent time in ms when a thread decided to kill itself to avoid
54      * potential memory leaks. Useful to throttle the rate of renewals of
55      * threads.
56      */

57     private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
58
59     /**
60      * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
61      */

62     private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
63
64     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
65         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
66         prestartAllCoreThreads();
67     }
68
69     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
70             RejectedExecutionHandler handler) {
71         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
72         prestartAllCoreThreads();
73     }
74
75     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
76         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
77         prestartAllCoreThreads();
78     }
79
80     public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
81         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
82         prestartAllCoreThreads();
83     }
84
85     public long getThreadRenewalDelay() {
86         return threadRenewalDelay;
87     }
88
89     public void setThreadRenewalDelay(long threadRenewalDelay) {
90         this.threadRenewalDelay = threadRenewalDelay;
91     }
92
93     @Override
94     protected void afterExecute(Runnable r, Throwable t) {
95         submittedCount.decrementAndGet();
96
97         if (t == null) {
98             stopCurrentThreadIfNeeded();
99         }
100     }
101
102     /**
103      * If the current thread was started before the last time when a context was
104      * stopped, an exception is thrown so that the current thread is stopped.
105      */

106     protected void stopCurrentThreadIfNeeded() {
107         if (currentThreadShouldBeStopped()) {
108             long lastTime = lastTimeThreadKilledItself.longValue();
109             if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
110                 if (lastTimeThreadKilledItself.compareAndSet(lastTime,
111                         System.currentTimeMillis() + 1)) {
112                     // OK, it's really time to dispose of this thread
113
114                     final String msg = sm.getString(
115                                     "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
116                                     Thread.currentThread().getName());
117
118                     throw new StopPooledThreadException(msg);
119                 }
120             }
121         }
122     }
123
124     protected boolean currentThreadShouldBeStopped() {
125         if (threadRenewalDelay >= 0
126             && Thread.currentThread() instanceof TaskThread) {
127             TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
128             if (currentTaskThread.getCreationTime() <
129                     this.lastContextStoppedTime.longValue()) {
130                 return true;
131             }
132         }
133         return false;
134     }
135
136     public int getSubmittedCount() {
137         return submittedCount.get();
138     }
139
140     /**
141      * {@inheritDoc}
142      */

143     @Override
144     public void execute(Runnable command) {
145         execute(command,0,TimeUnit.MILLISECONDS);
146     }
147
148     /**
149      * Executes the given command at some time in the future.  The command
150      * may execute in a new thread, in a pooled thread, or in the calling
151      * thread, at the discretion of the <code>Executor</code> implementation.
152      * If no threads are available, it will be added to the work queue.
153      * If the work queue is full, the system will wait for the specified
154      * time and it throw a RejectedExecutionException if the queue is still
155      * full after that.
156      *
157      * @param command the runnable task
158      * @param timeout A timeout for the completion of the task
159      * @param unit The timeout time unit
160      * @throws RejectedExecutionException if this task cannot be
161      * accepted for execution - the queue is full
162      * @throws NullPointerException if command or unit is null
163      */

164     public void execute(Runnable command, long timeout, TimeUnit unit) {
165         submittedCount.incrementAndGet();
166         try {
167             super.execute(command);
168         } catch (RejectedExecutionException rx) {
169             if (super.getQueue() instanceof TaskQueue) {
170                 final TaskQueue queue = (TaskQueue)super.getQueue();
171                 try {
172                     if (!queue.force(command, timeout, unit)) {
173                         submittedCount.decrementAndGet();
174                         throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
175                     }
176                 } catch (InterruptedException x) {
177                     submittedCount.decrementAndGet();
178                     throw new RejectedExecutionException(x);
179                 }
180             } else {
181                 submittedCount.decrementAndGet();
182                 throw rx;
183             }
184
185         }
186     }
187
188     public void contextStopping() {
189         this.lastContextStoppedTime.set(System.currentTimeMillis());
190
191         // save the current pool parameters to restore them later
192         int savedCorePoolSize = this.getCorePoolSize();
193         TaskQueue taskQueue =
194                 getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
195         if (taskQueue != null) {
196             // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
197             // checks that queue.remainingCapacity()==0. I did not understand
198             // why, but to get the intended effect of waking up idle threads, I
199             // temporarily fake this condition.
200             taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
201         }
202
203         // setCorePoolSize(0) wakes idle threads
204         this.setCorePoolSize(0);
205
206         // TaskQueue.take() takes care of timing out, so that we are sure that
207         // all threads of the pool are renewed in a limited time, something like
208         // (threadKeepAlive + longest request time)
209
210         if (taskQueue != null) {
211             // ok, restore the state of the queue and pool
212             taskQueue.setForcedRemainingCapacity(null);
213         }
214         this.setCorePoolSize(savedCorePoolSize);
215     }
216
217     private static class RejectHandler implements RejectedExecutionHandler {
218         @Override
219         public void rejectedExecution(Runnable r,
220                 java.util.concurrent.ThreadPoolExecutor executor) {
221             throw new RejectedExecutionException();
222         }
223
224     }
225
226
227 }
228