1
16
17 package net.sf.ehcache.terracotta;
18
19 import java.lang.management.ManagementFactory;
20 import java.lang.management.MemoryPoolMXBean;
21 import java.lang.management.MemoryUsage;
22 import java.lang.reflect.Method;
23 import java.util.Collections;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.CountDownLatch;
27 import java.util.concurrent.ExecutorService;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadFactory;
30 import java.util.concurrent.atomic.AtomicInteger;
31
32 import net.sf.ehcache.CacheException;
33 import net.sf.ehcache.CacheManager;
34 import net.sf.ehcache.cluster.CacheCluster;
35 import net.sf.ehcache.cluster.ClusterNode;
36 import net.sf.ehcache.cluster.ClusterTopologyListener;
37 import net.sf.ehcache.config.CacheConfiguration;
38 import net.sf.ehcache.config.InvalidConfigurationException;
39 import net.sf.ehcache.config.MemoryUnit;
40 import net.sf.ehcache.config.TerracottaClientConfiguration;
41 import net.sf.ehcache.terracotta.TerracottaClusteredInstanceHelper.TerracottaRuntimeType;
42
43 import org.slf4j.Logger;
44 import org.slf4j.LoggerFactory;
45
46
52 public class TerracottaClient {
53
// Class-wide SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(TerracottaClient.class);
// Pause, in milliseconds, between rejoin attempts after a failed attempt. Read from the system property
// "net.sf.ehcache.rejoin.sleepMillisOnException"; defaults to 5000.
private static final int REJOIN_SLEEP_MILLIS_ON_EXCEPTION = Integer.getInteger("net.sf.ehcache.rejoin.sleepMillisOnException", 5000);

// Client configuration; may be null, in which case rejoin is treated as disabled (see isRejoinEnabled()).
private final TerracottaClientConfiguration terracottaClientConfiguration;
// Currently active factory wrapper. Volatile: it is read by caller threads and replaced or nulled by the
// rejoin worker thread and by tasks run on the terminator thread pool.
private volatile ClusteredInstanceFactoryWrapper clusteredInstanceFactory;
// Cluster facade handed out to callers; its underlying cluster is swapped when a new factory is created.
private final TerracottaCacheCluster cacheCluster = new TerracottaCacheCluster();
// Runnable that performs rejoins; its thread is only started by the constructor when rejoin is enabled.
private final RejoinWorker rejoinWorker = new RejoinWorker();
// Callback notified when a rejoin starts and completes.
private final TerracottaClientRejoinListener rejoinListener;
// Owning cache manager; used for thread naming and log prefixes.
private final CacheManager cacheManager;
// Lazily created by getL1TerminatorThreadPool(); accessed only under this object's monitor there.
private ExecutorService l1TerminatorThreadPool;
64
65
72 public TerracottaClient(CacheManager cacheManager, TerracottaClientRejoinListener rejoinAction,
73 TerracottaClientConfiguration terracottaClientConfiguration) {
74 this.cacheManager = cacheManager;
75 this.rejoinListener = rejoinAction;
76 this.terracottaClientConfiguration = terracottaClientConfiguration;
77 if (terracottaClientConfiguration != null) {
78 terracottaClientConfiguration.freezeConfig();
79 }
80 if (isRejoinEnabled()) {
81 TerracottaRuntimeType type = TerracottaClusteredInstanceHelper.getInstance().getTerracottaRuntimeTypeOrNull();
82 if (type == null) {
83 throw new InvalidConfigurationException(
84 "Terracotta Rejoin is enabled but can't determine Terracotta Runtime. You are probably missing Terracotta jar(s).");
85 }
86 if (type != TerracottaRuntimeType.EnterpriseExpress && type != TerracottaRuntimeType.Express) {
87 throw new InvalidConfigurationException("Rejoin cannot be used in Terracotta DSO mode.");
88 }
89 Thread rejoinThread = new Thread(rejoinWorker, "Rejoin Worker Thread [cacheManager: " + cacheManager.getName() + "]");
90 rejoinThread.setDaemon(true);
91 rejoinThread.start();
92 }
93 }
94
95
101 private static void setTestMode(TerracottaClusteredInstanceHelper testHelper) {
102 try {
103 Method method = TerracottaClusteredInstanceHelper.class.getDeclaredMethod("setTestMode",
104 TerracottaClusteredInstanceHelper.class);
105 method.setAccessible(true);
106 method.invoke(null, testHelper);
107 } catch (Exception e) {
108
109 e.printStackTrace();
110 }
111 }
112
113
118 public ClusteredInstanceFactory getClusteredInstanceFactory() {
119 rejoinWorker.waitUntilRejoinComplete();
120 return clusteredInstanceFactory;
121 }
122
123
130 public boolean createClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
131 rejoinWorker.waitUntilRejoinComplete();
132 if (clusteredInstanceFactory != null) {
133 return false;
134 }
135 final boolean created;
136 synchronized (this) {
137 if (clusteredInstanceFactory == null) {
138 clusteredInstanceFactory = createNewClusteredInstanceFactory(cacheConfigs);
139 created = true;
140 } else {
141 created = false;
142 }
143 }
144 return created;
145 }
146
147
152 public TerracottaCacheCluster getCacheCluster() {
153 rejoinWorker.waitUntilRejoinComplete();
154 if (clusteredInstanceFactory == null) {
155 throw new CacheException("Cannot get CacheCluster as ClusteredInstanceFactory has not been initialized yet.");
156 }
157 return cacheCluster;
158 }
159
160
163 public synchronized void shutdown() {
164 rejoinWorker.waitUntilRejoinComplete();
165 rejoinWorker.shutdown();
166 if (clusteredInstanceFactory != null) {
167 shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
168 }
169 }
170
171 private void shutdownClusteredInstanceFactoryWrapper(ClusteredInstanceFactoryWrapper clusteredInstanceFactory) {
172 clusteredInstanceFactory.getActualFactory().getTopology().getTopologyListeners().clear();
173 clusteredInstanceFactory.shutdown();
174 }
175
176 private synchronized ClusteredInstanceFactoryWrapper createNewClusteredInstanceFactory(Map<String, CacheConfiguration> cacheConfigs) {
177
178 if (clusteredInstanceFactory != null) {
179 info("Shutting down old ClusteredInstanceFactory...");
180 shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
181 }
182 info("Creating new ClusteredInstanceFactory");
183 ClusteredInstanceFactory factory;
184 CacheCluster underlyingCacheCluster = null;
185 try {
186 factory = TerracottaClusteredInstanceHelper.getInstance().newClusteredInstanceFactory(cacheConfigs,
187 terracottaClientConfiguration);
188 underlyingCacheCluster = factory.getTopology();
189 } finally {
190
191 if (isRejoinEnabled()) {
192 if (underlyingCacheCluster != null) {
193 underlyingCacheCluster.addTopologyListener(new NodeLeftListener(this, underlyingCacheCluster
194 .waitUntilNodeJoinsCluster()));
195 } else {
196 warn("Unable to register node left listener for rejoin");
197 }
198 }
199 }
200
201 if (!rejoinWorker.isRejoinInProgress()) {
202
203
204 cacheCluster.setUnderlyingCacheCluster(underlyingCacheCluster);
205 }
206
207 return new ClusteredInstanceFactoryWrapper(this, factory);
208 }
209
210
213 protected void waitUntilRejoinComplete() {
214 rejoinWorker.waitUntilRejoinComplete();
215 }
216
217 private synchronized ExecutorService getL1TerminatorThreadPool() {
218 if (l1TerminatorThreadPool == null) {
219 l1TerminatorThreadPool = Executors.newCachedThreadPool(new ThreadFactory() {
220 private final ThreadGroup threadGroup = new ThreadGroup("Rejoin Terminator Thread Group");
221
222 public Thread newThread(Runnable runnable) {
223 Thread t = new Thread(threadGroup, runnable, "L1 Terminator");
224 t.setDaemon(true);
225 return t;
226 }
227 });
228 }
229 return l1TerminatorThreadPool;
230 }
231
232
/**
 * Requests a rejoin because {@code oldNode} (the current node) left the cluster. Does nothing when rejoin is
 * disabled.
 * <p>
 * If no rejoin is in progress the request is handed to the rejoin worker on the calling thread. If a rejoin is
 * already in progress, the worker is flagged for forced shutdown and the same logic is run on the terminator
 * thread pool instead of the calling thread.
 *
 * @param oldNode the node that left the cluster
 */
private void rejoinCluster(final ClusterNode oldNode) {
    if (!isRejoinEnabled()) {
        return;
    }
    final Runnable rejoinRunnable = new Runnable() {
        public void run() {
            // Re-checked here: this may run later, on a terminator thread.
            if (rejoinWorker.isRejoinInProgress()) {
                debug("Current node (" + oldNode.getId() + ") left before rejoin could complete, force terminating current client");
                if (clusteredInstanceFactory != null) {
                    // Tear down the client that the in-flight rejoin was using and forget it.
                    // NOTE(review): the field is read twice and then nulled without a lock; confirm that the
                    // rejoin thread cannot publish a new factory in between.
                    info("Shutting down old client");
                    shutdownClusteredInstanceFactoryWrapper(clusteredInstanceFactory);
                    clusteredInstanceFactory = null;
                } else {
                    warn("Current node (" + oldNode.getId() + ") left before rejoin could complete, but previous client is null");
                }

                // Wake the rejoin thread out of whatever it is blocked on so it can abandon the stale rejoin.
                debug("Interrupting rejoin thread");
                rejoinWorker.rejoinThread.interrupt();
            }
            debug("Going to initiate rejoin");

            // Queue the new request; the rejoin worker picks it up on its next loop iteration.
            rejoinWorker.startRejoin(oldNode);
        }

    };
    if (rejoinWorker.isRejoinInProgress()) {
        // A rejoin is already running: mark it as forcibly shut down so that the worker treats the resulting
        // exception as an abort, and do the teardown on a separate thread rather than the calling thread.
        rejoinWorker.setForcedShutdown();
        getL1TerminatorThreadPool().execute(rejoinRunnable);
    } else {
        // No rejoin running: handle the request inline.
        rejoinRunnable.run();
    }
}
274
275 private boolean isRejoinEnabled() {
276 return terracottaClientConfiguration != null && terracottaClientConfiguration.isRejoin();
277 }
278
279 private void info(String msg) {
280 info(msg, null);
281 }
282
283 private void info(String msg, Throwable t) {
284 if (t == null) {
285 LOGGER.info(getLogPrefix() + msg);
286 } else {
287 LOGGER.info(getLogPrefix() + msg, t);
288 }
289 }
290
291 private String getLogPrefix() {
292 return "Thread [" + Thread.currentThread().getName() + "] [cacheManager: " + getCacheManagerName() + "]: ";
293 }
294
295 private void debug(String msg) {
296 LOGGER.debug(getLogPrefix() + msg);
297 }
298
299 private void warn(String msg) {
300 LOGGER.warn(getLogPrefix() + msg);
301 }
302
303 private String getCacheManagerName() {
304 if (cacheManager.isNamed()) {
305 return "'" + cacheManager.getName() + "'";
306 } else {
307 return "no name";
308 }
309 }
310
311
317 private class RejoinWorker implements Runnable {
// Monitor used to wait for and signal rejoin requests and worker shutdown.
private final Object rejoinSync = new Object();
// Tracks whether a rejoin is in progress and lets other threads wait for it to finish.
private final RejoinStatus rejoinStatus = new RejoinStatus();
// Number of rejoins started so far; only used in log messages.
private final AtomicInteger rejoinCount = new AtomicInteger();
// Holds the single outstanding rejoin request, if any.
private final RejoinRequestHolder rejoinRequestHolder = new RejoinRequestHolder();
// Set to stop the worker loop.
private volatile boolean shutdown;
// The thread executing run(); assigned at the start of run(), so null until the worker has been started.
private volatile Thread rejoinThread;
// Set when an in-progress rejoin is being aborted on purpose; consumed by getAndClearForcedShutdown().
private volatile boolean forcedShutdown;
325
/**
 * Worker loop: waits for a rejoin request, then retries the rejoin until it succeeds or is abandoned.
 * <p>
 * The loop ends when the worker is shut down or the JVM is shutting down. A failed attempt is retried after
 * {@code REJOIN_SLEEP_MILLIS_ON_EXCEPTION} milliseconds, except when the failure was caused by a forced
 * shutdown (the request is abandoned) or when an {@link Error} is found in the cause chain (the worker stops).
 */
public void run() {
    // Remember the executing thread so it can be interrupted and recognised by waitUntilRejoinComplete().
    rejoinThread = Thread.currentThread();
    while (!shutdown) {
        waitUntilRejoinRequested();
        // The wait above also returns on shutdown, so re-check before doing any work.
        if (shutdown || isJVMShuttingDown()) {
            break;
        }
        boolean rejoined = false;
        final RejoinRequest rejoinRequest = rejoinRequestHolder.consume();
        debug("Going to start rejoin for request: " + rejoinRequest);
        while (!rejoined) {
            try {
                doRejoin(rejoinRequest);
                rejoined = true;
            } catch (Exception e) {
                // NOTE(review): on the two 'break' paths below doRejoin() has not reached
                // rejoinStatus.rejoinComplete(), so the status stays "in progress" - confirm this is intended.
                boolean forced = getAndClearForcedShutdown();
                if (forced) {
                    // The attempt was aborted on purpose; give up on this request and go back to waiting.
                    info("Client was shutdown forcefully before rejoin completed", e);
                    break;
                }
                LOGGER.warn("Caught exception while trying to rejoin cluster", e);
                if (isError(e)) {
                    // An Error somewhere in the cause chain is treated as unrecoverable: stop the worker.
                    info("Rejoin worker thread exiting - unrecoverable error condition", e);
                    shutdown = true;
                    break;
                } else {
                    info("Trying to rejoin again in " + REJOIN_SLEEP_MILLIS_ON_EXCEPTION + " msecs...");
                    sleep(REJOIN_SLEEP_MILLIS_ON_EXCEPTION);
                }
            }
        }
    }
}
359
360 private boolean isError(Throwable t) {
361 while (t != null) {
362 if (t instanceof Error) {
363 return true;
364 } else {
365 t = t.getCause();
366 }
367 }
368 return false;
369 }
370
371 public synchronized boolean getAndClearForcedShutdown() {
372 boolean rv = forcedShutdown;
373 forcedShutdown = false;
374 return rv;
375 }
376
377 public synchronized void setForcedShutdown() {
378 forcedShutdown = true;
379 }
380
381 public boolean isRejoinInProgress() {
382 return rejoinStatus.isRejoinInProgress();
383 }
384
385 public synchronized boolean isJVMShuttingDown() {
386 try {
387
388
389 Thread jvmShutdownCheckThread = new Thread();
390 Runtime.getRuntime().addShutdownHook(jvmShutdownCheckThread);
391 Runtime.getRuntime().removeShutdownHook(jvmShutdownCheckThread);
392 return false;
393 } catch (IllegalStateException e) {
394 return true;
395 }
396 }
397
398 private void sleep(long sleepMillis) {
399 try {
400 Thread.sleep(sleepMillis);
401 } catch (InterruptedException e1) {
402
403 }
404 }
405
/**
 * Asks the worker loop to stop and wakes it up if it is waiting for a rejoin request.
 */
public void shutdown() {
    synchronized (rejoinSync) {
        // Set the flag before notifying so the woken waiter observes it.
        shutdown = true;
        rejoinSync.notifyAll();
    }
}
412
413 private void doRejoin(RejoinRequest rejoinRequest) {
414 if (rejoinRequest == null) {
415 return;
416 }
417
418
419 final ClusterNode oldNodeReference = new DisconnectedClusterNode(rejoinRequest.getRejoinOldNode());
420 rejoinStatus.rejoinStarted();
421 if (Thread.currentThread().isInterrupted()) {
422
423 info("Clearing interrupt state of rejoin thread");
424 Thread.currentThread().interrupted();
425 }
426 int rejoinNumber = rejoinCount.incrementAndGet();
427 info("Starting Terracotta Rejoin (as client id: " + (oldNodeReference == null ? "null" : oldNodeReference.getId())
428 + " left the cluster) [rejoin count = " + rejoinNumber + "] ... ");
429 rejoinListener.clusterRejoinStarted();
430 clusteredInstanceFactory = createNewClusteredInstanceFactory(Collections.<String, CacheConfiguration>emptyMap());
431
432 rejoinListener.clusterRejoinComplete();
433
434 fireClusterRejoinedEvent(oldNodeReference);
435 info("Rejoin Complete [rejoin count = " + rejoinNumber + "]");
436 rejoinStatus.rejoinComplete();
437 }
438
/**
 * Points the shared cache cluster at the new factory's topology, waits for the current node to be reported
 * as joined and online, and then fires the node-rejoined event.
 *
 * @param oldNodeReference the node identity this client had before the rejoin
 */
private void fireClusterRejoinedEvent(final ClusterNode oldNodeReference) {
    // NOTE(review): the volatile clusteredInstanceFactory field is re-read several times below while
    // rejoinCluster() may set it to null from another thread - confirm that an NPE here is acceptable.
    cacheCluster.setUnderlyingCacheCluster(clusteredInstanceFactory.getActualFactory().getTopology());

    // Two counts: FireRejoinEventListener counts down once in nodeJoined and once in clusterOnline,
    // each only for the current node.
    final CountDownLatch latch = new CountDownLatch(2);
    FireRejoinEventListener fireRejoinEventListener = new FireRejoinEventListener(clusteredInstanceFactory.getActualFactory()
            .getTopology().waitUntilNodeJoinsCluster(), latch);
    clusteredInstanceFactory.getActualFactory().getTopology().addTopologyListener(fireRejoinEventListener);

    // May throw CacheException if interrupted during a forced shutdown; the listener is not removed then.
    waitUntilLatchOpen(latch);
    try {
        cacheCluster.fireNodeRejoinedEvent(oldNodeReference, cacheCluster.getCurrentNode());
    } catch (Throwable e) {
        // Anything thrown while firing the event is logged and must not fail the rejoin.
        LOGGER.error("Caught exception while firing rejoin event", e);
    }
    clusteredInstanceFactory.getActualFactory().getTopology().removeTopologyListener(fireRejoinEventListener);
}
456
457 private void waitUntilLatchOpen(CountDownLatch latch) {
458 boolean done = false;
459 do {
460 try {
461 latch.await();
462 done = true;
463 } catch (InterruptedException e) {
464 if (forcedShutdown) {
465 throw new CacheException(e);
466 } else {
467 LOGGER.info("Ignoring interrupted exception while waiting for latch");
468 }
469 }
470 } while (!done);
471 }
472
473 private void waitUntilRejoinRequested() {
474 String message = "Rejoin worker waiting until rejoin requested";
475 List<MemoryPoolMXBean> memoryPoolMXBeans = ManagementFactory.getMemoryPoolMXBeans();
476 for (MemoryPoolMXBean memoryPoolMXBean : memoryPoolMXBeans) {
477 String name = memoryPoolMXBean.getName();
478 if (!name.contains("Perm Gen")) {
479 continue;
480 }
481 MemoryUsage usage = memoryPoolMXBean.getUsage();
482 message += " (" + name + " : " + MemoryUnit.BYTES.toMegaBytes(usage.getUsed()) + "M / "
483 + MemoryUnit.BYTES.toMegaBytes(usage.getMax()) + "M)";
484 }
485 info(message + "...");
486
487 synchronized (rejoinSync) {
488 while (!rejoinRequestHolder.isRejoinRequested()) {
489 if (shutdown) {
490 break;
491 }
492 try {
493 rejoinSync.wait();
494 } catch (InterruptedException e) {
495
496 }
497 }
498 }
499 }
500
/**
 * Records a rejoin request for the given node and wakes up the worker if it is waiting for one.
 *
 * @param oldNode the node that left the cluster
 */
public void startRejoin(ClusterNode oldNode) {
    synchronized (rejoinSync) {
        // Store the request before notifying so the woken worker finds it.
        rejoinRequestHolder.addRejoinRequest(oldNode);
        rejoinSync.notifyAll();
    }
}
507
508 private void waitUntilRejoinComplete() {
509 if (rejoinThread == Thread.currentThread()) {
510 return;
511 }
512 if (isRejoinEnabled()) {
513 rejoinStatus.waitUntilRejoinComplete();
514 }
515 }
516 }
517
518
524 private static class RejoinRequestHolder {
525 private RejoinRequest outstandingRequest;
526
527 public synchronized void addRejoinRequest(ClusterNode oldNode) {
528
529 outstandingRequest = new RejoinRequest(oldNode);
530 }
531
532 public synchronized RejoinRequest consume() {
533 if (outstandingRequest == null) {
534 return null;
535 }
536 RejoinRequest rv = outstandingRequest;
537 outstandingRequest = null;
538 return rv;
539 }
540
541 public synchronized boolean isRejoinRequested() {
542 return outstandingRequest != null;
543 }
544 }
545
546
552 private static class RejoinRequest {
553 private final ClusterNode oldNode;
554
555 public RejoinRequest(ClusterNode oldNode) {
556 this.oldNode = oldNode;
557 }
558
559 public ClusterNode getRejoinOldNode() {
560 return oldNode;
561 }
562
563 @Override
564 public String toString() {
565 return "RejoinRequest [oldNode=" + oldNode.getId() + "]";
566 }
567
568 }
569
570
577 private static class NodeLeftListener implements ClusterTopologyListener {
578
579 private final ClusterNode currentNode;
580 private final TerracottaClient client;
581
582
585 public NodeLeftListener(TerracottaClient client, ClusterNode currentNode) {
586 this.client = client;
587 this.currentNode = currentNode;
588 client.info("Registered interest for rejoin, current node: " + currentNode.getId());
589 }
590
591
594 public void nodeLeft(ClusterNode node) {
595 client.info("ClusterNode [id=" + node.getId() + "] left the cluster (currentNode=" + currentNode.getId() + ")");
596 if (node.equals(currentNode)) {
597 client.rejoinCluster(node);
598 }
599 }
600
601
604 public void clusterOffline(ClusterNode node) {
605 client.info("ClusterNode [id=" + node.getId() + "] went offline (currentNode=" + currentNode.getId() + ")");
606 }
607
608
611 public void clusterOnline(ClusterNode node) {
612 client.info("ClusterNode [id=" + node.getId() + "] became online (currentNode=" + currentNode.getId() + ")");
613 }
614
615
618 public void nodeJoined(ClusterNode node) {
619 client.info("ClusterNode [id=" + node.getId() + "] joined the cluster (currentNode=" + currentNode.getId() + ")");
620 }
621
622
625 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
626 client.info("ClusterNode [id=" + oldNode.getId() + "] rejoined cluster as ClusterNode [id=" + newNode.getId()
627 + "] (currentNode=" + currentNode.getId() + ")");
628 }
629
630 }
631
632
638 private static class RejoinStatus {
639
640
646 enum RejoinState {
647 IN_PROGRESS, NOT_IN_PROGRESS;
648 }
649
650 private volatile RejoinState state = RejoinState.NOT_IN_PROGRESS;
651
652
657 public boolean isRejoinInProgress() {
658 return state == RejoinState.IN_PROGRESS;
659 }
660
661
664 public synchronized void waitUntilRejoinComplete() {
665 boolean interrupted = false;
666 while (state == RejoinState.IN_PROGRESS) {
667 try {
668 wait();
669 } catch (InterruptedException e) {
670 interrupted = true;
671 }
672 }
673 if (interrupted) {
674 Thread.currentThread().interrupt();
675 }
676 }
677
678
681 public synchronized void rejoinStarted() {
682 state = RejoinState.IN_PROGRESS;
683 notifyAll();
684 }
685
686
689 public synchronized void rejoinComplete() {
690 state = RejoinState.NOT_IN_PROGRESS;
691 notifyAll();
692 }
693
694 }
695
696
702 private static class FireRejoinEventListener implements ClusterTopologyListener {
703
704 private final CountDownLatch latch;
705 private final ClusterNode currentNode;
706
707
713 public FireRejoinEventListener(ClusterNode currentNode, CountDownLatch latch) {
714 this.currentNode = currentNode;
715 this.latch = latch;
716 }
717
718
721 public void nodeJoined(ClusterNode node) {
722 if (node.equals(currentNode)) {
723 latch.countDown();
724 }
725 }
726
727
730 public void clusterOnline(ClusterNode node) {
731 if (node.equals(currentNode)) {
732 latch.countDown();
733 }
734 }
735
736
739 public void nodeLeft(ClusterNode node) {
740
741 }
742
743
746 public void clusterOffline(ClusterNode node) {
747
748 }
749
750
753 public void clusterRejoined(ClusterNode oldNode, ClusterNode newNode) {
754
755 }
756
757 }
758
759 }
760