1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.util.threads;
18
19 import java.util.Collection;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
22
23 import org.apache.juli.logging.Log;
24 import org.apache.juli.logging.LogFactory;
25
26 /**
27 * Shared latch that allows the latch to be acquired a limited number of times
28 * after which all subsequent requests to acquire the latch will be placed in a
29 * FIFO queue until one of the shares is returned.
30 */
31 public class LimitLatch {
32
33 private static final Log log = LogFactory.getLog(LimitLatch.class);
34
35 private class Sync extends AbstractQueuedSynchronizer {
36 private static final long serialVersionUID = 1L;
37
38 public Sync() {
39 }
40
41 @Override
42 protected int tryAcquireShared(int ignored) {
43 long newCount = count.incrementAndGet();
44 if (!released && newCount > limit) {
45 // Limit exceeded
46 count.decrementAndGet();
47 return -1;
48 } else {
49 return 1;
50 }
51 }
52
53 @Override
54 protected boolean tryReleaseShared(int arg) {
55 count.decrementAndGet();
56 return true;
57 }
58 }
59
60 private final Sync sync;
61 private final AtomicLong count;
62 private volatile long limit;
63 private volatile boolean released = false;
64
65 /**
66 * Instantiates a LimitLatch object with an initial limit.
67 * @param limit - maximum number of concurrent acquisitions of this latch
68 */
69 public LimitLatch(long limit) {
70 this.limit = limit;
71 this.count = new AtomicLong(0);
72 this.sync = new Sync();
73 }
74
75 /**
76 * Returns the current count for the latch
77 * @return the current count for latch
78 */
79 public long getCount() {
80 return count.get();
81 }
82
83 /**
84 * Obtain the current limit.
85 * @return the limit
86 */
87 public long getLimit() {
88 return limit;
89 }
90
91
92 /**
93 * Sets a new limit. If the limit is decreased there may be a period where
94 * more shares of the latch are acquired than the limit. In this case no
95 * more shares of the latch will be issued until sufficient shares have been
96 * returned to reduce the number of acquired shares of the latch to below
97 * the new limit. If the limit is increased, threads currently in the queue
98 * may not be issued one of the newly available shares until the next
99 * request is made for a latch.
100 *
101 * @param limit The new limit
102 */
103 public void setLimit(long limit) {
104 this.limit = limit;
105 }
106
107
108 /**
109 * Acquires a shared latch if one is available or waits for one if no shared
110 * latch is current available.
111 * @throws InterruptedException If the current thread is interrupted
112 */
113 public void countUpOrAwait() throws InterruptedException {
114 if (log.isDebugEnabled()) {
115 log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
116 }
117 sync.acquireSharedInterruptibly(1);
118 }
119
120 /**
121 * Releases a shared latch, making it available for another thread to use.
122 * @return the previous counter value
123 */
124 public long countDown() {
125 sync.releaseShared(0);
126 long result = getCount();
127 if (log.isDebugEnabled()) {
128 log.debug("Counting down["+Thread.currentThread().getName()+"] latch="+result);
129 }
130 return result;
131 }
132
133 /**
134 * Releases all waiting threads and causes the {@link #limit} to be ignored
135 * until {@link #reset()} is called.
136 * @return <code>true</code> if release was done
137 */
138 public boolean releaseAll() {
139 released = true;
140 return sync.releaseShared(0);
141 }
142
143 /**
144 * Resets the latch and initializes the shared acquisition counter to zero.
145 * @see #releaseAll()
146 */
147 public void reset() {
148 this.count.set(0);
149 released = false;
150 }
151
152 /**
153 * Returns <code>true</code> if there is at least one thread waiting to
154 * acquire the shared lock, otherwise returns <code>false</code>.
155 * @return <code>true</code> if threads are waiting
156 */
157 public boolean hasQueuedThreads() {
158 return sync.hasQueuedThreads();
159 }
160
161 /**
162 * Provide access to the list of threads waiting to acquire this limited
163 * shared latch.
164 * @return a collection of threads
165 */
166 public Collection<Thread> getQueuedThreads() {
167 return sync.getQueuedThreads();
168 }
169 }
170