1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.tomcat.util.threads;
18
19 import java.util.Collection;
20 import java.util.concurrent.LinkedBlockingQueue;
21 import java.util.concurrent.RejectedExecutionException;
22 import java.util.concurrent.TimeUnit;
23
24 import org.apache.tomcat.util.res.StringManager;
25
26 /**
27  * As task queue specifically designed to run with a thread pool executor. The
28  * task queue is optimised to properly utilize threads within a thread pool
29  * executor. If you use a normal queue, the executor will spawn threads when
30  * there are idle threads and you wont be able to force items onto the queue
31  * itself.
32  */

33 public class TaskQueue extends LinkedBlockingQueue<Runnable> {
34
35     private static final long serialVersionUID = 1L;
36     protected static final StringManager sm = StringManager
37             .getManager("org.apache.tomcat.util.threads.res");
38
39     private transient volatile ThreadPoolExecutor parent = null;
40
41     // No need to be volatile. This is written and read in a single thread
42     // (when stopping a context and firing the  listeners)
43     private Integer forcedRemainingCapacity = null;
44
45     public TaskQueue() {
46         super();
47     }
48
49     public TaskQueue(int capacity) {
50         super(capacity);
51     }
52
53     public TaskQueue(Collection<? extends Runnable> c) {
54         super(c);
55     }
56
57     public void setParent(ThreadPoolExecutor tp) {
58         parent = tp;
59     }
60
61     public boolean force(Runnable o) {
62         if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
63         return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
64     }
65
66     public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
67         if (parent == null || parent.isShutdown()) throw new RejectedExecutionException(sm.getString("taskQueue.notRunning"));
68         return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
69     }
70
71     @Override
72     public boolean offer(Runnable o) {
73       //we can't do any checks
74         if (parent==nullreturn super.offer(o);
75         //we are maxed out on threads, simply queue the object
76         if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
77         //we have idle threads, just add it to the queue
78         if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
79         //if we have less threads than maximum force creation of a new thread
80         if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
81         //if we reached here, we need to add it to the queue
82         return super.offer(o);
83     }
84
85
86     @Override
87     public Runnable poll(long timeout, TimeUnit unit)
88             throws InterruptedException {
89         Runnable runnable = super.poll(timeout, unit);
90         if (runnable == null && parent != null) {
91             // the poll timed out, it gives an opportunity to stop the current
92             // thread if needed to avoid memory leaks.
93             parent.stopCurrentThreadIfNeeded();
94         }
95         return runnable;
96     }
97
98     @Override
99     public Runnable take() throws InterruptedException {
100         if (parent != null && parent.currentThreadShouldBeStopped()) {
101             return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
102                     TimeUnit.MILLISECONDS);
103             // yes, this may return null (in case of timeout) which normally
104             // does not occur with take()
105             // but the ThreadPoolExecutor implementation allows this
106         }
107         return super.take();
108     }
109
110     @Override
111     public int remainingCapacity() {
112         if (forcedRemainingCapacity != null) {
113             // ThreadPoolExecutor.setCorePoolSize checks that
114             // remainingCapacity==0 to allow to interrupt idle threads
115             // I don't see why, but this hack allows to conform to this
116             // "requirement"
117             return forcedRemainingCapacity.intValue();
118         }
119         return super.remainingCapacity();
120     }
121
122     public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
123         this.forcedRemainingCapacity = forcedRemainingCapacity;
124     }
125
126 }
127