1 /**
2  *  Copyright Terracotta, Inc.
3  *
4  *  Licensed under the Apache License, Version 2.0 (the "License");
5  *  you may not use this file except in compliance with the License.
6  *  You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  *  Unless required by applicable law or agreed to in writing, software
11  *  distributed under the License is distributed on an "AS IS" BASIS,
12  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  *  See the License for the specific language governing permissions and
14  *  limitations under the License.
15  */

16
17 package net.sf.ehcache.terracotta;
18
19 import java.lang.management.ManagementFactory;
20 import java.lang.management.MemoryPoolMXBean;
21 import java.lang.management.MemoryUsage;
22 import java.lang.reflect.Method;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import net.sf.ehcache.CacheException;
33 import net.sf.ehcache.CacheManager;
34 import net.sf.ehcache.cluster.CacheCluster;
35 import net.sf.ehcache.cluster.ClusterNode;
36 import net.sf.ehcache.cluster.ClusterTopologyListener;
37 import net.sf.ehcache.config.CacheConfiguration;
38 import net.sf.ehcache.config.InvalidConfigurationException;
39 import net.sf.ehcache.config.MemoryUnit;
40 import net.sf.ehcache.config.TerracottaClientConfiguration;
41 import net.sf.ehcache.terracotta.TerracottaClusteredInstanceHelper.TerracottaRuntimeType;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46 /**
47  * Class encapsulating the idea of a Terracotta client. Provides access to the {@link ClusteredInstanceFactory} for the cluster
48  *
49  * @author Abhishek Sanoujam
50  *
51  */

52 public class TerracottaClient {
53
54     private static final Logger LOGGER = LoggerFactory.getLogger(TerracottaClient.class);
55     private static final int REJOIN_SLEEP_MILLIS_ON_EXCEPTION = Integer.getInteger("net.sf.ehcache.rejoin.sleepMillisOnException", 5000);
56
57     private final TerracottaClientConfiguration terracottaClientConfiguration;
58     private volatile ClusteredInstanceFactoryWrapper clusteredInstanceFactory;
59     private final TerracottaCacheCluster cacheCluster = new TerracottaCacheCluster();
60     private final RejoinWorker rejoinWorker = new RejoinWorker();
61     private final TerracottaClientRejoinListener rejoinListener;
62     private final CacheManager cacheManager;
63     private ExecutorService l1TerminatorThreadPool;
64
65     /**
66      * Constructor accepting the {@link TerracottaClientRejoinListener} and the {@link TerracottaClientConfiguration}
67      *
68      * @param cacheManager
69      * @param rejoinAction
70      * @param terracottaClientConfiguration
71      */

72     public TerracottaClient(CacheManager cacheManager, TerracottaClientRejoinListener rejoinAction,
73             TerracottaClientConfiguration terracottaClientConfiguration) {
74         this.cacheManager = cacheManager;
75         this.rejoinListener = rejoinAction;
76         this.terracottaClientConfiguration = terracottaClientConfiguration;
77         if (terracottaClientConfiguration != null) {
78             terracottaClientConfiguration.freezeConfig();
79         }
80         if (isRejoinEnabled()) {
81             TerracottaRuntimeType type = TerracottaClusteredInstanceHelper.getInstance().getTerracottaRuntimeTypeOrNull();
82             if (type == null) {
83                 throw new InvalidConfigurationException(
84                         "Terracotta Rejoin is enabled but can't determine Terracotta Runtime. You are probably missing Terracotta jar(s).");
85             }
86             if (type != TerracottaRuntimeType.EnterpriseExpress && type != TerracottaRuntimeType.Express) {
87                 throw new InvalidConfigurationException("Rejoin cannot be used in Terracotta DSO mode.");
88             }
89             Thread rejoinThread = new Thread(rejoinWorker, "Rejoin Worker Thread [cacheManager: " + cacheManager.getName() + "]");
90             rejoinThread.setDaemon(true);
91             rejoinThread.start();
92         }
93     }
94
95     /*
96      * --------- THIS METHOD IS NOT FOR PUBLIC USE ----------
97      * private method, used in unit-tests using reflection
98      *
99      * @param testHelper the mock TerracottaClusteredInstanceHelper for testing
100      */

101     private static void setTestMode(TerracottaClusteredInstanceHelper testHelper) {
102         try {
103             Method method = TerracottaClusteredInstanceHelper.class.getDeclaredMethod("setTestMode",
104                     TerracottaClusteredInstanceHelper.class);
105             method.setAccessible(true);
106             method.invoke(null, testHelper);
107         } catch (Exception e) {
108             // just print a stack trace and ignore
109             e.printStackTrace();
110         }
111     }
112
113     /**
114      * Returns the {@link ClusteredInstanceFactory} associated with this client
115      *
116      * @return The ClusteredInstanceFactory
117      */

118     public ClusteredInstanceFactory getClusteredInstanceFactory() {
119         rejoinWorker.waitUntilRejoinComplete();
120         return clusteredInstanceFactory;
121     }
122
123     /**
124      * Returns true if the clusteredInstanceFactory was created, otherwise returns false.
125      * Multiple threads calling this method block and only one of them creates the factory.
126      *
127      * @param cacheConfigs
128      * @return true if the clusteredInstanceFactory was created, otherwise returns false
129      */

130     public boolean createClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
131         rejoinWorker.waitUntilRejoinComplete();
132         if (clusteredInstanceFactory != null) {
133             return false;
134         }
135         final boolean created;
136         synchronized (this) {
137             if (clusteredInstanceFactory == null) {
138                 clusteredInstanceFactory = createNewClusteredInstanceFactory(cacheConfigs);
139                 created = true;
140             } else {
141                 created = false;
142             }
143         }
144         return created;
145     }
146
147     /**
148      * Get the {@link CacheCluster} associated with this client
149      *
150      * @return the {@link CacheCluster} associated with this client
151      */

152     public TerracottaCacheCluster getCacheCluster() {
153         rejoinWorker.waitUntilRejoinComplete();
154         if (clusteredInstanceFactory == null) {
155             throw new CacheException("Cannot get CacheCluster as ClusteredInstanceFactory has not been initialized yet.");
156         }
157         return cacheCluster;
158     }
159
160     /**
161      * Shuts down the client
162      */

163     public synchronized void shutdown() {
164         rejoinWorker.waitUntilRejoinComplete();
165         rejoinWorker.shutdown();
166         if (clusteredInstanceFactory != null) {
167             shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
168         }
169     }
170
171     private void shutdownClusteredInstanceFactoryWrapper(ClusteredInstanceFactoryWrapper clusteredInstanceFactory) {
172         clusteredInstanceFactory.getActualFactory().getTopology().getTopologyListeners().clear();
173         clusteredInstanceFactory.shutdown();
174     }
175
176     private synchronized ClusteredInstanceFactoryWrapper createNewClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
177         // shut down the old factory
178         if (clusteredInstanceFactory != null) {
179             info("Shutting down old ClusteredInstanceFactory...");
180             shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
181         }
182         info("Creating new ClusteredInstanceFactory");
183         ClusteredInstanceFactory factory;
184         CacheCluster underlyingCacheCluster = null;
185         try {
186             factory = TerracottaClusteredInstanceHelper.getInstance().newClusteredInstanceFactory(cacheConfigs,
187                     terracottaClientConfiguration);
188             underlyingCacheCluster = factory.getTopology();
189         } finally {
190             // always set up listener so that rejoin can happen upon nodeLeft
191             if (isRejoinEnabled()) {
192                 if (underlyingCacheCluster != null) {
193                     underlyingCacheCluster.addTopologyListener(new NodeLeftListener(this, underlyingCacheCluster
194                             .waitUntilNodeJoinsCluster()));
195                 } else {
196                     warn("Unable to register node left listener for rejoin");
197                 }
198             }
199         }
200
201         if (!rejoinWorker.isRejoinInProgress()) {
202             // set up the cacheCluster with the new underlying cache cluster if rejoin is not in progress
203             // else defer until rejoin is complete (to have node joined, online fired just before rejoin event)
204             cacheCluster.setUnderlyingCacheCluster(underlyingCacheCluster);
205         }
206
207         return new ClusteredInstanceFactoryWrapper(this, factory);
208     }
209
210     /**
211      * Block thread until rejoin is complete
212      */

213     protected void waitUntilRejoinComplete() {
214         rejoinWorker.waitUntilRejoinComplete();
215     }
216
217     private synchronized ExecutorService getL1TerminatorThreadPool() {
218         if (l1TerminatorThreadPool == null) {
219             l1TerminatorThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
220                 private final ThreadGroup threadGroup = new ThreadGroup("Rejoin Terminator Thread Group");
221
222                 public Thread newThread(Runnable runnable) {
223                     Thread t = new Thread(threadGroup, runnable, "L1 Terminator");
224                     t.setDaemon(true);
225                     return t;
226                 }
227             });
228         }
229         return l1TerminatorThreadPool;
230     }
231
232     /**
233      * Rejoins the cluster
234      */

235     private void rejoinCluster(final ClusterNode oldNode) {
236         if (!isRejoinEnabled()) {
237             return;
238         }
239         final Runnable rejoinRunnable = new Runnable() {
240             public void run() {
241                 if (rejoinWorker.isRejoinInProgress()) {
242                     debug("Current node (" + oldNode.getId() + ") left before rejoin could complete, force terminating current client");
243                     if (clusteredInstanceFactory != null) {
244                         // if the rejoin thread is stuck in terracotta stack, this will make the rejoin thread come out with
245                         // TCNotRunningException
246                         info("Shutting down old client");
247                         shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
248                         clusteredInstanceFactory = null;
249                     } else {
250                         warn("Current node (" + oldNode.getId() + ") left before rejoin could complete, but previous client is null");
251                     }
252                     // now interrupt the thread
253                     // this will interrupt the rejoin thread if its still stuck after L1 has been shutdown
254                     debug("Interrupting rejoin thread");
255                     rejoinWorker.rejoinThread.interrupt();
256                 }
257                 debug("Going to initiate rejoin");
258                 // initiate the rejoin
259                 rejoinWorker.startRejoin(oldNode);
260             }
261
262         };
263         if (rejoinWorker.isRejoinInProgress()) {
264             // if another rejoin was already in progress
265             // run in another thread, so that this thread (a thread from the L1) can just go back
266             // also mark that its forced shutdown first
267             rejoinWorker.setForcedShutdown();
268             getL1TerminatorThreadPool().execute(rejoinRunnable);
269         } else {
270             // no need to run in separate thread as this is just initiating the rejoin
271             rejoinRunnable.run();
272         }
273     }
274
275     private boolean isRejoinEnabled() {
276         return terracottaClientConfiguration != null && terracottaClientConfiguration.isRejoin();
277     }
278
279     private void info(String msg) {
280         info(msg, null);
281     }
282
283     private void info(String msg, Throwable t) {
284         if (t == null) {
285             LOGGER.info(getLogPrefix() + msg);
286         } else {
287             LOGGER.info(getLogPrefix() + msg, t);
288         }
289     }
290
291     private String getLogPrefix() {
292         return "Thread [" + Thread.currentThread().getName() + "] [cacheManager: " + getCacheManagerName() + "]: ";
293     }
294
295     private void debug(String msg) {
296         LOGGER.debug(getLogPrefix() + msg);
297     }
298
299     private void warn(String msg) {
300         LOGGER.warn(getLogPrefix() + msg);
301     }
302
303     private String getCacheManagerName() {
304         if (cacheManager.isNamed()) {
305             return "'" + cacheManager.getName() + "'";
306         } else {
307             return "no name";
308         }
309     }
310
311     /**
312      * Private class responsible for carrying out rejoin
313      *
314      * @author Abhishek Sanoujam
315      *
316      */

317     private class RejoinWorker implements Runnable {
318         private final Object rejoinSync = new Object();
319         private final RejoinStatus rejoinStatus = new RejoinStatus();
320         private final AtomicInteger rejoinCount = new AtomicInteger();
321         private final RejoinRequestHolder rejoinRequestHolder = new RejoinRequestHolder();
322         private volatile boolean shutdown;
323         private volatile Thread rejoinThread;
324         private volatile boolean forcedShutdown;
325
326         public void run() {
327             rejoinThread = Thread.currentThread();
328             while (!shutdown) {
329                 waitUntilRejoinRequested();
330                 if (shutdown || isJVMShuttingDown()) {
331                     break;
332                 }
333                 boolean rejoined = false;
334                 final RejoinRequest rejoinRequest = rejoinRequestHolder.consume();
335                 debug("Going to start rejoin for request: " + rejoinRequest);
336                 while (!rejoined) {
337                     try {
338                         doRejoin(rejoinRequest);
339                         rejoined = true;
340                     } catch (Exception e) {
341                         boolean forced = getAndClearForcedShutdown();
342                         if (forced) {
343                             info("Client was shutdown forcefully before rejoin completed", e);
344                             break;
345                         }
346                         LOGGER.warn("Caught exception while trying to rejoin cluster", e);
347                         if (isError(e)) {
348                             info("Rejoin worker thread exiting - unrecoverable error condition", e);
349                             shutdown = true;
350                             break;
351                         } else {
352                             info("Trying to rejoin again in " + REJOIN_SLEEP_MILLIS_ON_EXCEPTION + " msecs...");
353                             sleep(REJOIN_SLEEP_MILLIS_ON_EXCEPTION);
354                         }
355                     }
356                 }
357             }
358         }
359
360         private boolean isError(Throwable t) {
361             while (t != null) {
362                 if (t instanceof Error) {
363                     return true;
364                 } else {
365                     t = t.getCause();
366                 }
367             }
368             return false;
369         }
370
371         public synchronized boolean getAndClearForcedShutdown() {
372             boolean rv = forcedShutdown;
373             forcedShutdown = false;
374             return rv;
375         }
376
377         public synchronized void setForcedShutdown() {
378             forcedShutdown = true;
379         }
380
381         public boolean isRejoinInProgress() {
382             return rejoinStatus.isRejoinInProgress();
383         }
384
385         public synchronized boolean isJVMShuttingDown() {
386             try {
387                 // Detect whether the JVM is going down by adding a shutdown hook, if it's shutting down
388                 // we should get an IllegalStateException.
389                 Thread jvmShutdownCheckThread = new Thread();
390                 Runtime.getRuntime().addShutdownHook(jvmShutdownCheckThread);
391                 Runtime.getRuntime().removeShutdownHook(jvmShutdownCheckThread);
392                 return false;
393             } catch (IllegalStateException e) {
394                 return true;
395             }
396         }
397
398         private void sleep(long sleepMillis) {
399             try {
400                 Thread.sleep(sleepMillis);
401             } catch (InterruptedException e1) {
402                 // ignore
403             }
404         }
405
406         public void shutdown() {
407             synchronized (rejoinSync) {
408                 shutdown = true;
409                 rejoinSync.notifyAll();
410             }
411         }
412
413         private void doRejoin(RejoinRequest rejoinRequest) {
414             if (rejoinRequest == null) {
415                 return;
416             }
417             // copy the disconnected ClusterNode to prevent keeping a ref to it for the whole duration of
418             // the rejoin and allow the GC to immediately clean up the disconnected L1.
419             final ClusterNode oldNodeReference = new DisconnectedClusterNode(rejoinRequest.getRejoinOldNode());
420             rejoinStatus.rejoinStarted();
421             if (Thread.currentThread().isInterrupted()) {
422                 // clear interrupt status if set
423                 info("Clearing interrupt state of rejoin thread");
424                 Thread.currentThread().interrupted();
425             }
426             int rejoinNumber = rejoinCount.incrementAndGet();
427             info("Starting Terracotta Rejoin (as client id: " + (oldNodeReference == null ? "null" : oldNodeReference.getId())
428                     + " left the cluster) [rejoin count = " + rejoinNumber + "] ... ");
429             rejoinListener.clusterRejoinStarted();
430             clusteredInstanceFactory = createNewClusteredInstanceFactory(Collections.<String, CacheConfiguration>emptyMap());
431             // now reinitialize all existing caches with the new instance factory, outside lock
432             rejoinListener.clusterRejoinComplete();
433             // now fire the clusterRejoined event
434             fireClusterRejoinedEvent(oldNodeReference);
435             info("Rejoin Complete [rejoin count = " + rejoinNumber + "]");
436             rejoinStatus.rejoinComplete();
437         }
438
439         private void fireClusterRejoinedEvent(final ClusterNode oldNodeReference) {
440             // set up the cacheCluster with the new underlying cache cluster (to fire node joined and online events)
441             cacheCluster.setUnderlyingCacheCluster(clusteredInstanceFactory.getActualFactory().getTopology());
442             // add another listener here to fire the rejoin event only after receiving node joined and online
443             final CountDownLatch latch = new CountDownLatch(2);
444             FireRejoinEventListener fireRejoinEventListener = new FireRejoinEventListener(clusteredInstanceFactory.getActualFactory()
445                     .getTopology().waitUntilNodeJoinsCluster(), latch);
446             clusteredInstanceFactory.getActualFactory().getTopology().addTopologyListener(fireRejoinEventListener);
447
448             waitUntilLatchOpen(latch);
449             try {
450                 cacheCluster.fireNodeRejoinedEvent(oldNodeReference, cacheCluster.getCurrentNode());
451             } catch (Throwable e) {
452                 LOGGER.error("Caught exception while firing rejoin event", e);
453             }
454             clusteredInstanceFactory.getActualFactory().getTopology().removeTopologyListener(fireRejoinEventListener);
455         }
456
457         private void waitUntilLatchOpen(CountDownLatch latch) {
458             boolean done = false;
459             do {
460                 try {
461                     latch.await();
462                     done = true;
463                 } catch (InterruptedException e) {
464                     if (forcedShutdown) {
465                         throw new CacheException(e);
466                     } else {
467                         LOGGER.info("Ignoring interrupted exception while waiting for latch");
468                     }
469                 }
470             } while (!done);
471         }
472
473         private void waitUntilRejoinRequested() {
474             String message = "Rejoin worker waiting until rejoin requested";
475             List<MemoryPoolMXBean> memoryPoolMXBeans = ManagementFactory.getMemoryPoolMXBeans();
476             for (MemoryPoolMXBean memoryPoolMXBean : memoryPoolMXBeans) {
477                 String name = memoryPoolMXBean.getName();
478                 if (!name.contains("Perm Gen")) {
479                     continue;
480                 }
481                 MemoryUsage usage = memoryPoolMXBean.getUsage();
482                 message += " (" + name + " : " + MemoryUnit.BYTES.toMegaBytes(usage.getUsed()) + "M / "
483                            + MemoryUnit.BYTES.toMegaBytes(usage.getMax()) + "M)";
484             }
485             info(message + "...");
486
487             synchronized (rejoinSync) {
488                 while (!rejoinRequestHolder.isRejoinRequested()) {
489                     if (shutdown) {
490                         break;
491                     }
492                     try {
493                         rejoinSync.wait();
494                     } catch (InterruptedException e) {
495                         // ignore
496                     }
497                 }
498             }
499         }
500
501         public void startRejoin(ClusterNode oldNode) {
502             synchronized (rejoinSync) {
503                 rejoinRequestHolder.addRejoinRequest(oldNode);
504                 rejoinSync.notifyAll();
505             }
506         }
507
508         private void waitUntilRejoinComplete() {
509             if (rejoinThread == Thread.currentThread()) {
510                 return;
511             }
512             if (isRejoinEnabled()) {
513                 rejoinStatus.waitUntilRejoinComplete();
514             }
515         }
516     }
517
518     /**
519      * Private class maintaining rejoin requests
520      *
521      * @author Abhishek Sanoujam
522      *
523      */

524     private static class RejoinRequestHolder {
525         private RejoinRequest outstandingRequest;
526
527         public synchronized void addRejoinRequest(ClusterNode oldNode) {
528             // will hold only one pending rejoin
529             outstandingRequest = new RejoinRequest(oldNode);
530         }
531
532         public synchronized RejoinRequest consume() {
533             if (outstandingRequest == null) {
534                 return null;
535             }
536             RejoinRequest rv = outstandingRequest;
537             outstandingRequest = null;
538             return rv;
539         }
540
541         public synchronized boolean isRejoinRequested() {
542             return outstandingRequest != null;
543         }
544     }
545
546     /**
547      * Private class - Rejoin request bean
548      *
549      * @author Abhishek Sanoujam
550      *
551      */

552     private static class RejoinRequest {
553         private final ClusterNode oldNode;
554
555         public RejoinRequest(ClusterNode oldNode) {
556             this.oldNode = oldNode;
557         }
558
559         public ClusterNode getRejoinOldNode() {
560             return oldNode;
561         }
562
563         @Override
564         public String toString() {
565             return "RejoinRequest [oldNode=" + oldNode.getId() + "]";
566         }
567
568     }
569
570     /**
571      *
572      * A {@link ClusterTopologyListener} that listens for node left event for a node
573      *
574      * @author Abhishek Sanoujam
575      *
576      */

577     private static class NodeLeftListener implements ClusterTopologyListener {
578
579         private final ClusterNode currentNode;
580         private final TerracottaClient client;
581
582         /**
583          * Constructor accepting the client and the node to listen for
584          */

585         public NodeLeftListener(TerracottaClient client, ClusterNode currentNode) {
586             this.client = client;
587             this.currentNode = currentNode;
588             client.info("Registered interest for rejoin, current node: " + currentNode.getId());
589         }
590
591         /**
592          * {@inheritDoc}
593          */

594         public void nodeLeft(ClusterNode node) {
595             client.info("ClusterNode [id=" + node.getId() + "] left the cluster (currentNode=" + currentNode.getId() + ")");
596             if (node.equals(currentNode)) {
597                 client.rejoinCluster(node);
598             }
599         }
600
601         /**
602          * {@inheritDoc}
603          */

604         public void clusterOffline(ClusterNode node) {
605             client.info("ClusterNode [id=" + node.getId() + "] went offline (currentNode=" + currentNode.getId() + ")");
606         }
607
608         /**
609          * {@inheritDoc}
610          */

611         public void clusterOnline(ClusterNode node) {
612             client.info("ClusterNode [id=" + node.getId() + "] became online (currentNode=" + currentNode.getId() + ")");
613         }
614
615         /**
616          * {@inheritDoc}
617          */

618         public void nodeJoined(ClusterNode node) {
619             client.info("ClusterNode [id=" + node.getId() + "] joined the cluster (currentNode=" + currentNode.getId() + ")");
620         }
621
622         /**
623          * {@inheritDoc}
624          */

625         public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
626             client.info("ClusterNode [id=" + oldNode.getId() + "] rejoined cluster as ClusterNode [id=" + newNode.getId()
627                     + "] (currentNode=" + currentNode.getId() + ")");
628         }
629
630     }
631
632     /**
633      * Private class maintaining the rejoin state of the client
634      *
635      * @author Abhishek Sanoujam
636      *
637      */

638     private static class RejoinStatus {
639
640         /**
641          * Rejoin state enum
642          *
643          * @author Abhishek Sanoujam
644          *
645          */

646         enum RejoinState {
647             IN_PROGRESS, NOT_IN_PROGRESS;
648         }
649
650         private volatile RejoinState state = RejoinState.NOT_IN_PROGRESS;
651
652         /**
653          * Returns true if rejoin is in progress
654          *
655          * @return true if rejoin is in progress
656          */

657         public boolean isRejoinInProgress() {
658             return state == RejoinState.IN_PROGRESS;
659         }
660
661         /**
662          * Waits until rejoin is complete if in progress
663          */

664         public synchronized void waitUntilRejoinComplete() {
665             boolean interrupted = false;
666             while (state == RejoinState.IN_PROGRESS) {
667                 try {
668                     wait();
669                 } catch (InterruptedException e) {
670                     interrupted = true;
671                 }
672             }
673             if (interrupted) {
674                 Thread.currentThread().interrupt();
675             }
676         }
677
678         /**
679          * Set the status to rejoin in progress
680          */

681         public synchronized void rejoinStarted() {
682             state = RejoinState.IN_PROGRESS;
683             notifyAll();
684         }
685
686         /**
687          * Set the rejoin status to not in progress
688          */

689         public synchronized void rejoinComplete() {
690             state = RejoinState.NOT_IN_PROGRESS;
691             notifyAll();
692         }
693
694     }
695
696     /**
697      * Event listener that counts down on receiving node join and online event
698      *
699      * @author Abhishek Sanoujam
700      *
701      */

702     private static class FireRejoinEventListener implements ClusterTopologyListener {
703
704         private final CountDownLatch latch;
705         private final ClusterNode currentNode;
706
707         /**
708          * Constructor
709          *
710          * @param clusterNode
711          * @param latch
712          */

713         public FireRejoinEventListener(ClusterNode currentNode, CountDownLatch latch) {
714             this.currentNode = currentNode;
715             this.latch = latch;
716         }
717
718         /**
719          * {@inheritDoc}
720          */

721         public void nodeJoined(ClusterNode node) {
722             if (node.equals(currentNode)) {
723                 latch.countDown();
724             }
725         }
726
727         /**
728          * {@inheritDoc}
729          */

730         public void clusterOnline(ClusterNode node) {
731             if (node.equals(currentNode)) {
732                 latch.countDown();
733             }
734         }
735
736         /**
737          * {@inheritDoc}
738          */

739         public void nodeLeft(ClusterNode node) {
740             // no-op
741         }
742
743         /**
744          * {@inheritDoc}
745          */

746         public void clusterOffline(ClusterNode node) {
747             // no-op
748         }
749
750         /**
751          * {@inheritDoc}
752          */

753         public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
754             // no-op
755         }
756
757     }
758
759 }
760