1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *      http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */

17 package org.apache.tomcat.util.net;
18
19 import java.io.EOFException;
20 import java.io.IOException;
21 import java.net.SocketTimeoutException;
22 import java.nio.ByteBuffer;
23 import java.nio.channels.SelectionKey;
24 import java.nio.channels.Selector;
25 import java.util.NoSuchElementException;
26 import java.util.concurrent.ConcurrentLinkedQueue;
27 import java.util.concurrent.atomic.AtomicInteger;
28
29 /**
30  * Thread safe non blocking selector pool
31  */

32 public class NioSelectorPool {
33
34     protected NioBlockingSelector blockingSelector;
35
36     protected volatile Selector sharedSelector;
37
38     protected boolean shared = Boolean.parseBoolean(System.getProperty("org.apache.tomcat.util.net.NioSelectorShared""true"));
39     protected int maxSelectors = 200;
40     protected long sharedSelectorTimeout = 30000;
41     protected int maxSpareSelectors = -1;
42     protected boolean enabled = true;
43
44     protected AtomicInteger active = new AtomicInteger(0);
45     protected AtomicInteger spare = new AtomicInteger(0);
46     protected ConcurrentLinkedQueue<Selector> selectors = new ConcurrentLinkedQueue<>();
47
48     protected Selector getSharedSelector() throws IOException {
49         if (shared && sharedSelector == null) {
50             synchronized (NioSelectorPool.class) {
51                 if (sharedSelector == null) {
52                     sharedSelector = Selector.open();
53                 }
54             }
55         }
56         return  sharedSelector;
57     }
58
59     public Selector get() throws IOException{
60         if (shared) {
61             return getSharedSelector();
62         }
63         if ((!enabled) || active.incrementAndGet() >= maxSelectors) {
64             if (enabled) {
65                 active.decrementAndGet();
66             }
67             return null;
68         }
69         Selector s = null;
70         try {
71             s = selectors.size() > 0 ? selectors.poll() : null;
72             if (s == null) {
73                 s = Selector.open();
74             } else {
75                 spare.decrementAndGet();
76             }
77         } catch (NoSuchElementException x) {
78             try {
79                 s = Selector.open();
80             } catch (IOException iox) {
81             }
82         } finally {
83             if (s == null) {
84                 active.decrementAndGet();// we were unable to find a selector
85             }
86         }
87         return s;
88     }
89
90
91
92     public void put(Selector s) throws IOException {
93         if (shared) {
94             return;
95         }
96         if (enabled) {
97             active.decrementAndGet();
98         }
99         if (enabled && (maxSpareSelectors == -1
100                 || spare.get() < Math.min(maxSpareSelectors, maxSelectors))) {
101             spare.incrementAndGet();
102             selectors.offer(s);
103         } else {
104             s.close();
105         }
106     }
107
108     public void close() throws IOException {
109         enabled = false;
110         Selector s;
111         while ((s = selectors.poll()) != null) {
112             s.close();
113         }
114         spare.set(0);
115         active.set(0);
116         if (blockingSelector != null) {
117             blockingSelector.close();
118         }
119         if (shared && getSharedSelector() != null) {
120             getSharedSelector().close();
121             sharedSelector = null;
122         }
123     }
124
125     public void open(String name) throws IOException {
126         enabled = true;
127         getSharedSelector();
128         if (shared) {
129             blockingSelector = new NioBlockingSelector();
130             blockingSelector.open(name, getSharedSelector());
131         }
132
133     }
134
135     /**
136      * Performs a write using the bytebuffer for data to be written and a
137      * selector to block (if blocking is requested). If the
138      * <code>selector</code> parameter is null, and blocking is requested then
139      * it will perform a busy write that could take up a lot of CPU cycles.
140      * @param buf           The buffer containing the data, we will write as long as <code>(buf.hasRemaining()==true)</code>
141      * @param socket        The socket to write data to
142      * @param selector      The selector to use for blocking, if null then a busy write will be initiated
143      * @param writeTimeout  The timeout for this write operation in milliseconds, -1 means no timeout
144      * @return the number of bytes written
145      * @throws EOFException if write returns -1
146      * @throws SocketTimeoutException if the write times out
147      * @throws IOException if an IO Exception occurs in the underlying socket logic
148      */

149     public int write(ByteBuffer buf, NioChannel socket, Selector selector, long writeTimeout)
150             throws IOException {
151         if (shared) {
152             return blockingSelector.write(buf, socket, writeTimeout);
153         }
154         SelectionKey key = null;
155         int written = 0;
156         boolean timedout = false;
157         int keycount = 1; //assume we can write
158         long time = System.currentTimeMillis(); //start the timeout timer
159         try {
160             while ((!timedout) && buf.hasRemaining()) {
161                 int cnt = 0;
162                 if ( keycount > 0 ) { //only write if we were registered for a write
163                     cnt = socket.write(buf); //write the data
164                     if (cnt == -1) {
165                         throw new EOFException();
166                     }
167
168                     written += cnt;
169                     if (cnt > 0) {
170                         time = System.currentTimeMillis(); //reset our timeout timer
171                         continue//we successfully wrote, try again without a selector
172                     }
173                 }
174                 if (selector != null) {
175                     //register OP_WRITE to the selector
176                     if (key == null) {
177                         key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
178                     } else {
179                         key.interestOps(SelectionKey.OP_WRITE);
180                     }
181                     if (writeTimeout == 0) {
182                         timedout = buf.hasRemaining();
183                     } else if (writeTimeout < 0) {
184                         keycount = selector.select();
185                     } else {
186                         keycount = selector.select(writeTimeout);
187                     }
188                 }
189                 if (writeTimeout > 0 && (selector == null || keycount == 0)) {
190                     timedout = (System.currentTimeMillis() - time) >= writeTimeout;
191                 }
192             }
193             if (timedout) {
194                 throw new SocketTimeoutException();
195             }
196         } finally {
197             if (key != null) {
198                 key.cancel();
199                 if (selector != null) selector.selectNow();//removes the key from this selector
200             }
201         }
202         return written;
203     }
204
205     /**
206      * Performs a blocking read using the bytebuffer for data to be read and a selector to block.
207      * If the <code>selector</code> parameter is null, then it will perform a busy read that could
208      * take up a lot of CPU cycles.
209      * @param buf ByteBuffer - the buffer containing the data, we will read as until we have read at least one byte or we timed out
210      * @param socket SocketChannel - the socket to write data to
211      * @param selector Selector - the selector to use for blocking, if null then a busy read will be initiated
212      * @param readTimeout long - the timeout for this read operation in milliseconds, -1 means no timeout
213      * @return the number of bytes read
214      * @throws EOFException if read returns -1
215      * @throws SocketTimeoutException if the read times out
216      * @throws IOException if an IO Exception occurs in the underlying socket logic
217      */

218     public int read(ByteBuffer buf, NioChannel socket, Selector selector, long readTimeout)
219             throws IOException {
220         if (shared) {
221             return blockingSelector.read(buf, socket, readTimeout);
222         }
223         SelectionKey key = null;
224         int read = 0;
225         boolean timedout = false;
226         int keycount = 1; //assume we can write
227         long time = System.currentTimeMillis(); //start the timeout timer
228         try {
229             while (!timedout) {
230                 int cnt = 0;
231                 if (keycount > 0) { //only read if we were registered for a read
232                     cnt = socket.read(buf);
233                     if (cnt == -1) {
234                         if (read == 0) {
235                             read = -1;
236                         }
237                         break;
238                     }
239                     read += cnt;
240                     if (cnt > 0) continue//read some more
241                     if (cnt == 0 && read > 0) {
242                         break//we are done reading
243                     }
244                 }
245                 if (selector != null) {//perform a blocking read
246                     //register OP_WRITE to the selector
247                     if (key == null) {
248                         key = socket.getIOChannel().register(selector, SelectionKey.OP_READ);
249                     }
250                     else key.interestOps(SelectionKey.OP_READ);
251                     if (readTimeout == 0) {
252                         timedout = (read == 0);
253                     } else if (readTimeout < 0) {
254                         keycount = selector.select();
255                     } else {
256                         keycount = selector.select(readTimeout);
257                     }
258                 }
259                 if (readTimeout > 0 && (selector == null || keycount == 0) ) {
260                     timedout = (System.currentTimeMillis() - time) >= readTimeout;
261                 }
262             }
263             if (timedout) {
264                 throw new SocketTimeoutException();
265             }
266         } finally {
267             if (key != null) {
268                 key.cancel();
269                 if (selector != null) {
270                     selector.selectNow();//removes the key from this selector
271                 }
272             }
273         }
274         return read;
275     }
276
277     public void setMaxSelectors(int maxSelectors) {
278         this.maxSelectors = maxSelectors;
279     }
280
281     public void setMaxSpareSelectors(int maxSpareSelectors) {
282         this.maxSpareSelectors = maxSpareSelectors;
283     }
284
285     public void setEnabled(boolean enabled) {
286         this.enabled = enabled;
287     }
288
289     public void setSharedSelectorTimeout(long sharedSelectorTimeout) {
290         this.sharedSelectorTimeout = sharedSelectorTimeout;
291     }
292
293     public int getMaxSelectors() {
294         return maxSelectors;
295     }
296
297     public int getMaxSpareSelectors() {
298         return maxSpareSelectors;
299     }
300
301     public boolean isEnabled() {
302         return enabled;
303     }
304
305     public long getSharedSelectorTimeout() {
306         return sharedSelectorTimeout;
307     }
308
309     public ConcurrentLinkedQueue<Selector> getSelectors() {
310         return selectors;
311     }
312
313     public AtomicInteger getSpare() {
314         return spare;
315     }
316
317     public boolean isShared() {
318         return shared;
319     }
320
321     public void setShared(boolean shared) {
322         this.shared = shared;
323     }
324 }