1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 /**
30 * Thread safe non blocking selector pool
31 */
32 public class NioSelectorPool {
33
34 protected NioBlockingSelector blockingSelector;
35
36 protected volatile Selector sharedSelector;
37
38 protected boolean shared = Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared", "true"));
39 protected int maxSelectors = 200;
40 protected long sharedSelectorTimeout = 30000;
41 protected int maxSpareSelectors = -1;
42 protected boolean enabled = true;
43
44 protected AtomicInteger active = new AtomicInteger(0);
45 protected AtomicInteger spare = new AtomicInteger(0);
46 protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<>();
47
48 protected Selector getSharedSelector() throws IOException {
49 if (shared && sharedSelector == null) {
50 synchronized (NioSelectorPool.class) {
51 if (sharedSelector == null) {
52 sharedSelector = Selector.open();
53 }
54 }
55 }
56 return sharedSelector;
57 }
58
59 public Selector get() throws IOException{
60 if (shared) {
61 return getSharedSelector();
62 }
63 if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
64 if (enabled) {
65 active.decrementAndGet();
66 }
67 return null;
68 }
69 Selector s = null;
70 try {
71 s = selectors.size() > 0 ? selectors.poll() : null;
72 if (s == null) {
73 s = Selector.open();
74 } else {
75 spare.decrementAndGet();
76 }
77 } catch (NoSuchElementException x) {
78 try {
79 s = Selector.open();
80 } catch (IOException iox) {
81 }
82 } finally {
83 if (s == null) {
84 active.decrementAndGet();// we were unable to find a selector
85 }
86 }
87 return s;
88 }
89
90
91
92 public void put(Selector s) throws IOException {
93 if (shared) {
94 return;
95 }
96 if (enabled) {
97 active.decrementAndGet();
98 }
99 if (enabled && (maxSpareSelectors == -1
100 || spare.get() < Math.min(maxSpareSelectors, maxSelectors))) {
101 spare.incrementAndGet();
102 selectors.offer(s);
103 } else {
104 s.close();
105 }
106 }
107
108 public void close() throws IOException {
109 enabled = false;
110 Selector s;
111 while ((s = selectors.poll()) != null) {
112 s.close();
113 }
114 spare.set(0);
115 active.set(0);
116 if (blockingSelector != null) {
117 blockingSelector.close();
118 }
119 if (shared && getSharedSelector() != null) {
120 getSharedSelector().close();
121 sharedSelector = null;
122 }
123 }
124
125 public void open(String name) throws IOException {
126 enabled = true;
127 getSharedSelector();
128 if (shared) {
129 blockingSelector = new NioBlockingSelector();
130 blockingSelector.open(name, getSharedSelector());
131 }
132
133 }
134
135 /**
136 * Performs a write using the bytebuffer for data to be written and a
137 * selector to block (if blocking is requested). If the
138 * <code>selector</code> parameter is null, and blocking is requested then
139 * it will perform a busy write that could take up a lot of CPU cycles.
140 * @param buf The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
141 * @param socket The socket to write data to
142 * @param selector The selector to use for blocking, if null then a busy write will be initiated
143 * @param writeTimeout The timeout for this write operation in milliseconds, -1 means no timeout
144 * @return the number of bytes written
145 * @throws EOFException if write returns -1
146 * @throws SocketTimeoutException if the write times out
147 * @throws IOException if an IO Exception occurs in the underlying socket logic
148 */
149 public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout)
150 throws IOException {
151 if (shared) {
152 return blockingSelector.write(buf, socket, writeTimeout);
153 }
154 SelectionKey key = null;
155 int written = 0;
156 boolean timedout = false;
157 int keycount = 1; //assume we can write
158 long time = System.currentTimeMillis(); //start the timeout timer
159 try {
160 while ((!timedout) && buf.hasRemaining()) {
161 int cnt = 0;
162 if ( keycount > 0 ) { //only write if we were registered for a write
163 cnt = socket.write(buf); //write the data
164 if (cnt == -1) {
165 throw new EOFException();
166 }
167
168 written += cnt;
169 if (cnt > 0) {
170 time = System.currentTimeMillis(); //reset our timeout timer
171 continue; //we successfully wrote, try again without a selector
172 }
173 }
174 if (selector != null) {
175 //register OP_WRITE to the selector
176 if (key == null) {
177 key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
178 } else {
179 key.interestOps(SelectionKey.OP_WRITE);
180 }
181 if (writeTimeout == 0) {
182 timedout = buf.hasRemaining();
183 } else if (writeTimeout < 0) {
184 keycount = selector.select();
185 } else {
186 keycount = selector.select(writeTimeout);
187 }
188 }
189 if (writeTimeout > 0 && (selector == null || keycount == 0)) {
190 timedout = (System.currentTimeMillis() - time) >= writeTimeout;
191 }
192 }
193 if (timedout) {
194 throw new SocketTimeoutException();
195 }
196 } finally {
197 if (key != null) {
198 key.cancel();
199 if (selector != null) selector.selectNow();//removes the key from this selector
200 }
201 }
202 return written;
203 }
204
205 /**
206 * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
207 * If the <code>selector</code> parameter is null, then it will perform a busy read that could
208 * take up a lot of CPU cycles.
209 * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
210 * @param socket SocketChannel - the socket to write data to
211 * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
212 * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
213 * @return the number of bytes read
214 * @throws EOFException if read returns -1
215 * @throws SocketTimeoutException if the read times out
216 * @throws IOException if an IO Exception occurs in the underlying socket logic
217 */
218 public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout)
219 throws IOException {
220 if (shared) {
221 return blockingSelector.read(buf, socket, readTimeout);
222 }
223 SelectionKey key = null;
224 int read = 0;
225 boolean timedout = false;
226 int keycount = 1; //assume we can write
227 long time = System.currentTimeMillis(); //start the timeout timer
228 try {
229 while (!timedout) {
230 int cnt = 0;
231 if (keycount > 0) { //only read if we were registered for a read
232 cnt = socket.read(buf);
233 if (cnt == -1) {
234 if (read == 0) {
235 read = -1;
236 }
237 break;
238 }
239 read += cnt;
240 if (cnt > 0) continue; //read some more
241 if (cnt == 0 && read > 0) {
242 break; //we are done reading
243 }
244 }
245 if (selector != null) {//perform a blocking read
246 //register OP_WRITE to the selector
247 if (key == null) {
248 key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
249 }
250 else key.interestOps(SelectionKey.OP_READ);
251 if (readTimeout == 0) {
252 timedout = (read == 0);
253 } else if (readTimeout < 0) {
254 keycount = selector.select();
255 } else {
256 keycount = selector.select(readTimeout);
257 }
258 }
259 if (readTimeout > 0 && (selector == null || keycount == 0) ) {
260 timedout = (System.currentTimeMillis() - time) >= readTimeout;
261 }
262 }
263 if (timedout) {
264 throw new SocketTimeoutException();
265 }
266 } finally {
267 if (key != null) {
268 key.cancel();
269 if (selector != null) {
270 selector.selectNow();//removes the key from this selector
271 }
272 }
273 }
274 return read;
275 }
276
277 public void setMaxSelectors(int maxSelectors) {
278 this.maxSelectors = maxSelectors;
279 }
280
281 public void setMaxSpareSelectors(int maxSpareSelectors) {
282 this.maxSpareSelectors = maxSpareSelectors;
283 }
284
285 public void setEnabled(boolean enabled) {
286 this.enabled = enabled;
287 }
288
289 public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
290 this.sharedSelectorTimeout = sharedSelectorTimeout;
291 }
292
293 public int getMaxSelectors() {
294 return maxSelectors;
295 }
296
297 public int getMaxSpareSelectors() {
298 return maxSpareSelectors;
299 }
300
301 public boolean isEnabled() {
302 return enabled;
303 }
304
305 public long getSharedSelectorTimeout() {
306 return sharedSelectorTimeout;
307 }
308
309 public ConcurrentLinkedQueue<Selector> getSelectors() {
310 return selectors;
311 }
312
313 public AtomicInteger getSpare() {
314 return spare;
315 }
316
317 public boolean isShared() {
318 return shared;
319 }
320
321 public void setShared(boolean shared) {
322 this.shared = shared;
323 }
324 }