1 /*
2  *  Licensed to the Apache Software Foundation (ASF) under one or more
3  *  contributor license agreements.  See the NOTICE file distributed with
4  *  this work for additional information regarding copyright ownership.
5  *  The ASF licenses this file to You under the Apache License, Version 2.0
6  *  (the "License"); you may not use this file except in compliance with
7  *  the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  */

17 package org.apache.tomcat.util.threads;
18
19 import java.util.Collection;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
22
23 import org.apache.juli.logging.Log;
24 import org.apache.juli.logging.LogFactory;
25
26 /**
27  * Shared latch that allows the latch to be acquired a limited number of times
28  * after which all subsequent requests to acquire the latch will be placed in a
29  * FIFO queue until one of the shares is returned.
30  */

31 public class LimitLatch {
32
33     private static final Log log = LogFactory.getLog(LimitLatch.class);
34
35     private class Sync extends AbstractQueuedSynchronizer {
36         private static final long serialVersionUID = 1L;
37
38         public Sync() {
39         }
40
41         @Override
42         protected int tryAcquireShared(int ignored) {
43             long newCount = count.incrementAndGet();
44             if (!released && newCount > limit) {
45                 // Limit exceeded
46                 count.decrementAndGet();
47                 return -1;
48             } else {
49                 return 1;
50             }
51         }
52
53         @Override
54         protected boolean tryReleaseShared(int arg) {
55             count.decrementAndGet();
56             return true;
57         }
58     }
59
60     private final Sync sync;
61     private final AtomicLong count;
62     private volatile long limit;
63     private volatile boolean released = false;
64
65     /**
66      * Instantiates a LimitLatch object with an initial limit.
67      * @param limit - maximum number of concurrent acquisitions of this latch
68      */

69     public LimitLatch(long limit) {
70         this.limit = limit;
71         this.count = new AtomicLong(0);
72         this.sync = new Sync();
73     }
74
75     /**
76      * Returns the current count for the latch
77      * @return the current count for latch
78      */

79     public long getCount() {
80         return count.get();
81     }
82
83     /**
84      * Obtain the current limit.
85      * @return the limit
86      */

87     public long getLimit() {
88         return limit;
89     }
90
91
92     /**
93      * Sets a new limit. If the limit is decreased there may be a period where
94      * more shares of the latch are acquired than the limit. In this case no
95      * more shares of the latch will be issued until sufficient shares have been
96      * returned to reduce the number of acquired shares of the latch to below
97      * the new limit. If the limit is increased, threads currently in the queue
98      * may not be issued one of the newly available shares until the next
99      * request is made for a latch.
100      *
101      * @param limit The new limit
102      */

103     public void setLimit(long limit) {
104         this.limit = limit;
105     }
106
107
108     /**
109      * Acquires a shared latch if one is available or waits for one if no shared
110      * latch is current available.
111      * @throws InterruptedException If the current thread is interrupted
112      */

113     public void countUpOrAwait() throws InterruptedException {
114         if (log.isDebugEnabled()) {
115             log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
116         }
117         sync.acquireSharedInterruptibly(1);
118     }
119
120     /**
121      * Releases a shared latch, making it available for another thread to use.
122      * @return the previous counter value
123      */

124     public long countDown() {
125         sync.releaseShared(0);
126         long result = getCount();
127         if (log.isDebugEnabled()) {
128             log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
129         }
130         return result;
131     }
132
133     /**
134      * Releases all waiting threads and causes the {@link #limit} to be ignored
135      * until {@link #reset()} is called.
136      * @return <code>true</code> if release was done
137      */

138     public boolean releaseAll() {
139         released = true;
140         return sync.releaseShared(0);
141     }
142
143     /**
144      * Resets the latch and initializes the shared acquisition counter to zero.
145      * @see #releaseAll()
146      */

147     public void reset() {
148         this.count.set(0);
149         released = false;
150     }
151
152     /**
153      * Returns <code>true</code> if there is at least one thread waiting to
154      * acquire the shared lock, otherwise returns <code>false</code>.
155      * @return <code>true</code> if threads are waiting
156      */

157     public boolean hasQueuedThreads() {
158         return sync.hasQueuedThreads();
159     }
160
161     /**
162      * Provide access to the list of threads waiting to acquire this limited
163      * shared latch.
164      * @return a collection of threads
165      */

166     public Collection<Thread> getQueuedThreads() {
167         return sync.getQueuedThreads();
168     }
169 }
170