1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package fulmine.context;
17
18 import static fulmine.util.Utils.CLOSE_BRACE;
19 import static fulmine.util.Utils.COLON;
20 import static fulmine.util.Utils.OPEN_BRACE;
21 import static fulmine.util.Utils.logException;
22 import static fulmine.util.Utils.nullCheck;
23
24 import java.net.InetAddress;
25 import java.net.UnknownHostException;
26 import java.util.Collection;
27 import java.util.TimerTask;
28
29 import org.apache.commons.lang.builder.HashCodeBuilder;
30
31 import fulmine.AbstractLifeCycle;
32 import fulmine.Domain;
33 import fulmine.IDomain;
34 import fulmine.IType;
35 import fulmine.Type;
36 import fulmine.distribution.IDistributionManager;
37 import fulmine.distribution.channel.IChannel;
38 import fulmine.distribution.connection.IConnectionBroker;
39 import fulmine.distribution.connection.IConnectionDiscoverer;
40 import fulmine.event.EventFrameExecution;
41 import fulmine.event.EventQueueItem;
42 import fulmine.event.IEvent;
43 import fulmine.event.IEventManager;
44 import fulmine.event.listener.IEventListener;
45 import fulmine.event.subscription.ISubscriptionListener;
46 import fulmine.event.system.ISystemEvent;
47 import fulmine.event.system.ISystemEventSource;
48 import fulmine.model.IModelManager;
49 import fulmine.model.container.IContainer;
50 import fulmine.model.container.IContainerFactory;
51 import fulmine.model.field.IField;
52 import fulmine.model.field.LongField;
53 import fulmine.protocol.specification.IFrameReader;
54 import fulmine.protocol.specification.IFrameWriter;
55 import fulmine.rpc.IRpcDefinition;
56 import fulmine.rpc.IRpcHandler;
57 import fulmine.rpc.IRpcManager;
58 import fulmine.rpc.IRpcMarker;
59 import fulmine.rpc.IRpcPublicationListener;
60 import fulmine.rpc.IRpcResult;
61 import fulmine.rpc.IRpcResultHandler;
62 import fulmine.util.concurrent.ThreadUtils;
63 import fulmine.util.log.AsyncLog;
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 public final class FulmineContext extends AbstractLifeCycle implements
83 IFrameworkContext
84 {
85
86
87
88
89
90
91 public static interface SystemInfoFields
92 {
93
94 String TX_COUNT = "tx count";
95
96
97 String RX_COUNT = "rx count";
98
99
100 String FREE_MEMORY = "free mem (k)";
101
102
103 String TOTAL_MEMORY = "total mem (k)";
104 }
105
106 private final static AsyncLog LOG = new AsyncLog(FulmineContext.class);
107
108 static
109 {
110 try
111 {
112
113 Class.forName(Type.class.getName());
114 Class.forName(Domain.class.getName());
115 ThreadUtils.startThreadDumps();
116 }
117 catch (Exception e)
118 {
119 LOG.fatal("Could not load required class", e);
120 System.exit(2);
121 }
122 }
123
124
125
126
127
128
129
130 private final class ShutdownTask implements Runnable
131 {
132 public void run()
133 {
134 FulmineContext.this.destroy();
135 }
136 }
137
138
139 private DefaultPermissionProfile permissionProfile;
140
141
142 INetwork network;
143
144
145 IConnectionBroker connectionBroker;
146
147
148 IConnectionDiscoverer discoverer;
149
150
151 private final IModelManager modelManager;
152
153
154 private final IEventManager eventManager;
155
156
157 private IDistributionManager distributionManager;
158
159
160 private final IRpcManager rpcManager;
161
162
163 private String identity;
164
165
166
167
168
169
170 private final int hashCode;
171
172
173 private final Thread shutdownThread;
174
175
176 private IContextWatchdog watchdog;
177
178
179 private final String eventProcessorIdentityPrefix;
180
181
182
183
184
185 private DelegatingRemoteUpdateHandler handler;
186
187
188 private IContainer systemInfo;
189
190
191
192
193
194
195
196
197
198
199 public FulmineContext(String identity)
200 {
201 this(identity, identity);
202 }
203
204
205
206
207
208
209
210
211
212 public FulmineContext(String identity, int processors)
213 {
214 this(identity, identity, processors);
215 }
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232 public FulmineContext(String identity, String eventProcessorIdentityPrefix)
233 {
234 this(identity, eventProcessorIdentityPrefix,
235 java.lang.Runtime.getRuntime().availableProcessors() + 1);
236 }
237
238
239
240
241
242
243
244
245
246
247
248
249 public FulmineContext(String identity, String eventProcessorIdentityPrefix,
250 int processors)
251 {
252 super();
253 this.permissionProfile = new DefaultPermissionProfile();
254 this.shutdownThread =
255 new Thread(new ShutdownTask(), identity + " context shutdown task");
256 java.lang.Runtime.getRuntime().addShutdownHook(this.shutdownThread);
257 nullCheck(identity, "Null identity");
258 nullCheck(eventProcessorIdentityPrefix,
259 "Null eventProcessorIdentityPrefix");
260 this.identity = identity;
261
262 if (identity.trim().equals(eventProcessorIdentityPrefix.trim()))
263 {
264 this.eventProcessorIdentityPrefix = identity;
265 }
266 else
267 {
268 if (eventProcessorIdentityPrefix.trim().endsWith(COLON))
269 {
270 this.eventProcessorIdentityPrefix =
271 eventProcessorIdentityPrefix.trim() + identity;
272 }
273 else
274 {
275 this.eventProcessorIdentityPrefix =
276 eventProcessorIdentityPrefix.trim() + COLON + identity;
277 }
278 }
279 this.eventManager =
280 new EventManager(this.eventProcessorIdentityPrefix,
281 (byte) processors, this);
282 this.distributionManager = new DistributionManager(this);
283 this.modelManager = new ModelManager(this);
284
285 try
286 {
287 int netName =
288 (InetAddress.getLocalHost().getHostAddress()).hashCode();
289 this.hashCode =
290 new HashCodeBuilder().append(identity).append(
291 System.identityHashCode(this)).append(netName).append(
292 System.nanoTime()).toHashCode();
293 }
294 catch (UnknownHostException e)
295 {
296 throw new RuntimeException("Could not get localhost for " + this);
297 }
298 this.rpcManager = new RpcManager(this);
299 }
300
301 public IContextWatchdog getContextWatchdog()
302 {
303 return this.watchdog;
304 }
305
306 public void setContextWatchdog(IContextWatchdog watchdog)
307 {
308 this.watchdog = watchdog;
309 }
310
311 protected IModelManager getModelManager()
312 {
313 return this.modelManager;
314 }
315
316 protected IEventManager getEventManager()
317 {
318 return this.eventManager;
319 }
320
321
322
323
324
325
326
327 public void setDistributionManager(IDistributionManager distributionManager)
328 {
329
330 if (this.distributionManager != null)
331 {
332 getDistributionManager().destroy();
333 }
334 this.distributionManager = distributionManager;
335 }
336
337 public IDistributionManager getDistributionManager()
338 {
339 return this.distributionManager;
340 }
341
342 public IPermissionProfile getPermissionProfile()
343 {
344 return this.permissionProfile;
345 }
346
347 public void setPermissionProfile(IPermissionProfile permissionProfile)
348 {
349 this.permissionProfile.setApplicationPermissions(permissionProfile);
350 }
351
352 @Override
353 protected void doStart()
354 {
355 getEventManager().start();
356 getModelManager().start();
357 getRpcManager().start();
358 getDistributionManager().start();
359
360 final IContainer systemInfo =
361 getLocalContainer(getIdentity() + ":SystemInfo", Type.SYSTEM,
362 Domain.FRAMEWORK);
363 this.systemInfo = systemInfo;
364 this.systemInfo.add(new LongField(SystemInfoFields.FREE_MEMORY));
365 this.systemInfo.add(new LongField(SystemInfoFields.TOTAL_MEMORY));
366
367 schedule(new TimerTask()
368 {
369 @Override
370 public void run()
371 {
372 EventQueueItem.isStatisticsEvent.set(Boolean.TRUE);
373 try
374 {
375 FulmineContext.this.systemInfo.beginFrame(new EventFrameExecution());
376 try
377 {
378 FulmineContext.this.systemInfo.getLongField(
379 SystemInfoFields.FREE_MEMORY).set(
380 Runtime.getRuntime().freeMemory() / 1000);
381 FulmineContext.this.systemInfo.getLongField(
382 SystemInfoFields.TOTAL_MEMORY).set(
383 Runtime.getRuntime().totalMemory() / 1000);
384 }
385 finally
386 {
387 FulmineContext.this.systemInfo.flushFrame();
388 }
389 }
390 finally
391 {
392 EventQueueItem.isStatisticsEvent.remove();
393 }
394 }
395 }, 10000, 10000);
396 if (getNetwork() != null)
397 {
398 setConnectionBroker(getNetwork().createBroker());
399 setConnectionDiscoverer(getNetwork().createDiscoverer());
400 getConnectionBroker().start();
401 if (getNetwork().isListeningOnlyMode())
402 {
403 getConnectionDiscoverer().disablePulsing();
404 }
405 getConnectionDiscoverer().start();
406
407 if (getHandler().getDelegate() == null)
408 {
409 getHandler().setDelegate(new VetoingRemoteUpdateHandler());
410 }
411
412 getRpcManager().publishRpcs(IRemoteUpdateHandler.class,
413 this.getHandler());
414 }
415 if (getLog().isInfoEnabled())
416 {
417 getLog().info("Started context " + this);
418 }
419 }
420
421 @Override
422 protected void doDestroy()
423 {
424
425
426
427
428
429
430 if (getConnectionDiscoverer() != null)
431 {
432 try
433 {
434 getConnectionDiscoverer().destroy();
435 }
436 catch (Exception e)
437 {
438 logException(getLog(), "discoverer", e);
439 }
440 }
441 if (getConnectionBroker() != null)
442 {
443 try
444 {
445 getConnectionBroker().destroy();
446 }
447 catch (Exception e)
448 {
449 logException(getLog(), "connectionBroker", e);
450 }
451 }
452 if (getNetwork() != null)
453 {
454 getNetwork().setContext(null);
455 }
456 getDistributionManager().destroy();
457 getRpcManager().destroy();
458 getModelManager().destroy();
459 getEventManager().destroy();
460 if (Thread.currentThread() != this.shutdownThread)
461 {
462 Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
463 }
464 }
465
466 public void addContainer(IContainer container)
467 {
468 getModelManager().addContainer(container);
469 }
470
471 public String getIdentity()
472 {
473 return this.identity;
474 }
475
476 public void setNetwork(INetwork network)
477 {
478 if (isActive())
479 {
480 throw new IllegalStateException(
481 "Must be called before starting the context");
482 }
483 this.network = network;
484 this.network.setContext(this);
485 }
486
487 public INetwork getNetwork()
488 {
489 return this.network;
490 }
491
492 public IContainer getLocalContainer(String identity, IType type,
493 IDomain domain)
494 {
495 return getModelManager().getLocalContainer(identity, type, domain);
496 }
497
498 public IContainer getRemoteContainer(String remoteContextIdentity,
499 String containerIdentity, IType type, IDomain domain)
500 {
501 return getModelManager().getRemoteContainer(remoteContextIdentity,
502 containerIdentity, type, domain);
503 }
504
505 public boolean containsRemoteContainer(String remoteContextIdentity,
506 String containerIdentity, IType type, IDomain domain)
507 {
508 return getModelManager().containsRemoteContainer(remoteContextIdentity,
509 containerIdentity, type, domain);
510 }
511
512 public boolean containsLocalContainer(String containerIdentity, IType type,
513 IDomain domain)
514 {
515 return getModelManager().containsLocalContainer(containerIdentity,
516 type, domain);
517 }
518
519 public void queueEvent(IEvent event)
520 {
521 getEventManager().queueEvent(event);
522 }
523
524 public void queueEvents(Collection<IEvent> events)
525 {
526 getEventManager().queueEvents(events);
527 }
528
529 public boolean removeContainer(IContainer container)
530 {
531 return getModelManager().removeContainer(container);
532 }
533
534 public boolean subscribe(String contextIdentity, String identityPattern,
535 IType type, IDomain domain, IEventListener listener)
536 {
537 return getDistributionManager().subscribe(contextIdentity,
538 identityPattern, type, domain, listener);
539 }
540
541 public boolean unsubscribe(String contextIdentity, String identityPattern,
542 IType type, IDomain domain, IEventListener listener)
543 {
544 return getDistributionManager().unsubscribe(contextIdentity,
545 identityPattern, type, domain, listener);
546 }
547
548 public IChannel[] getConnectedChannels()
549 {
550 return getDistributionManager().getConnectedChannels();
551 }
552
553 public int getEventProcessorCount()
554 {
555 return getEventManager().getEventProcessorCount();
556 }
557
558 public Collection<IContainer> getLocalContainers()
559 {
560 return getModelManager().getLocalContainers();
561 }
562
563 public Collection<IContainer> getRemoteContainers(
564 String remoteContextIdentity)
565 {
566 return getModelManager().getRemoteContainers(remoteContextIdentity);
567 }
568
569 public ThreadGroup getEventProcessorThreadGroup()
570 {
571 return getEventManager().getEventProcessorThreadGroup();
572 }
573
574 public IConnectionBroker getConnectionBroker()
575 {
576 return this.connectionBroker;
577 }
578
579 public void setConnectionBroker(IConnectionBroker broker)
580 {
581 this.connectionBroker = broker;
582 }
583
584 public IConnectionDiscoverer getConnectionDiscoverer()
585 {
586 return this.discoverer;
587 }
588
589 public void setConnectionDiscoverer(IConnectionDiscoverer discoverer)
590 {
591 this.discoverer = discoverer;
592 }
593
594 public IContainerFactory getContainerFactory()
595 {
596 return getModelManager().getContainerFactory();
597 }
598
599 public ISystemEventSource getSystemEventSource(
600 Class<? extends ISystemEvent> type)
601 {
602 return getEventManager().getSystemEventSource(type);
603 }
604
605 public void execute(Runnable task)
606 {
607 getEventManager().execute(task);
608 }
609
610 public IFrameReader getFrameReader()
611 {
612 return getDistributionManager().getFrameReader();
613 }
614
615 public IFrameWriter getFrameWriter()
616 {
617 return getDistributionManager().getFrameWriter();
618 }
619
620 public void requestRetransmit(String contextIdentity,
621 String identityRegularExpression, IType type, IDomain domain)
622 {
623 getDistributionManager().requestRetransmit(contextIdentity,
624 identityRegularExpression, type, domain);
625 }
626
627 public void requestRetransmitAll(String contextIdentity)
628 {
629 getDistributionManager().requestRetransmitAll(contextIdentity);
630 }
631
632 public void retransmit(String contextIdentity,
633 String identityRegularExpression, IType type, IDomain domain)
634 {
635 getDistributionManager().retransmit(contextIdentity,
636 identityRegularExpression, type, domain);
637 }
638
639 public void retransmitAll(String contextIdentity)
640 {
641 getDistributionManager().retransmitAll(contextIdentity);
642 }
643
644 public void retransmitAllToAll()
645 {
646 getDistributionManager().retransmitAllToAll();
647 }
648
649 public void retransmitToAll(String identityRegularExpression, IType type,
650 IDomain domain)
651 {
652 getDistributionManager().retransmitToAll(identityRegularExpression,
653 type, domain);
654 }
655
656 public boolean addSubscriptionListener(ISubscriptionListener listener)
657 {
658 return getDistributionManager().addSubscriptionListener(listener);
659 }
660
661 public boolean removeSubscriptionListener(ISubscriptionListener listener)
662 {
663 return getDistributionManager().removeSubscriptionListener(listener);
664 }
665
666 public int getContextHashCode()
667 {
668 return this.hashCode;
669 }
670
671 public String getEventProcessorIdentityPrefix()
672 {
673 return this.eventProcessorIdentityPrefix;
674 }
675
676 public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
677 {
678 getDistributionManager().invokeRpc(remoteContextIdentity, rpcData);
679 }
680
681 public boolean addRpcPublicationListener(String remoteContextIdentity,
682 IRpcPublicationListener listener)
683 {
684 return getRpcManager().addRpcPublicationListener(remoteContextIdentity,
685 listener);
686 }
687
688 public IRpcMarker invoke(IRpcResultHandler resultHandler,
689 String remoteContextIdentity, String procedure, IField... args)
690 {
691 return getRpcManager().invoke(resultHandler, remoteContextIdentity,
692 procedure, args);
693 }
694
695 public IRpcResult invoke(String remoteContextIdentity, String procedure,
696 IField... args)
697 {
698 return getRpcManager().invoke(remoteContextIdentity, procedure, args);
699 }
700
701 public boolean publishProdedure(IRpcHandler handler,
702 IRpcDefinition rpcDefinition)
703 {
704 return getRpcManager().publishProdedure(handler, rpcDefinition);
705 }
706
707 public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
708 {
709 return getRpcManager().unpublishProdedure(rpcDefinition);
710 }
711
712 public boolean unpublishRpcs(Class<?> definition, Object handler)
713 {
714 return getRpcManager().unpublishRpcs(definition, handler);
715 }
716
717 public boolean publishRpcs(Class<?> definition, Object handler)
718 {
719 return getRpcManager().publishRpcs(definition, handler);
720 }
721
722 public boolean removeRpcPublicationListener(String remoteContextIdentity,
723 IRpcPublicationListener listener)
724 {
725 return getRpcManager().removeRpcPublicationListener(
726 remoteContextIdentity, listener);
727 }
728
729 public void setRemoteUpdateHandler(IRemoteUpdateHandler handler)
730 {
731 handler.setContext(this);
732 getHandler().setDelegate(handler);
733 }
734
735 @Override
736 protected AsyncLog getLog()
737 {
738 return LOG;
739 }
740
741 private IRpcManager getRpcManager()
742 {
743 return this.rpcManager;
744 }
745
746 @Override
747 public final String toString()
748 {
749 return super.toString() + COLON + getIdentity() + OPEN_BRACE
750 + hashCode() + CLOSE_BRACE;
751 }
752
753 @Override
754 public final boolean equals(Object obj)
755 {
756 return super.equals(obj);
757 }
758
759 @Override
760 public final int hashCode()
761 {
762 return this.hashCode;
763 }
764
765 public String updateRemoteContainer(String remoteContextIdentity,
766 String identity, IType type, IDomain domain, String fieldName,
767 String fieldValueAsString)
768 {
769 return getDistributionManager().updateRemoteContainer(
770 remoteContextIdentity, identity, type, domain, fieldName,
771 fieldValueAsString);
772 }
773
774
775
776
777
778
779 private DelegatingRemoteUpdateHandler getHandler()
780 {
781 if (this.handler == null)
782 {
783 this.handler = new DelegatingRemoteUpdateHandler();
784 }
785 return handler;
786 }
787
788 public IContainer getSystemInfo()
789 {
790 return this.systemInfo;
791 }
792
793 public void schedule(TimerTask task, long delay, long period)
794 {
795 getEventManager().schedule(task, delay, period);
796 }
797 }