View Javadoc

1   /*
2      Copyright 2007 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.context;
17  
18  import static fulmine.util.Utils.CLOSE_BRACE;
19  import static fulmine.util.Utils.COLON;
20  import static fulmine.util.Utils.OPEN_BRACE;
21  import static fulmine.util.Utils.logException;
22  import static fulmine.util.Utils.nullCheck;
23  
24  import java.net.InetAddress;
25  import java.net.UnknownHostException;
26  import java.util.Collection;
27  import java.util.TimerTask;
28  
29  import org.apache.commons.lang.builder.HashCodeBuilder;
30  
31  import fulmine.AbstractLifeCycle;
32  import fulmine.Domain;
33  import fulmine.IDomain;
34  import fulmine.IType;
35  import fulmine.Type;
36  import fulmine.distribution.IDistributionManager;
37  import fulmine.distribution.channel.IChannel;
38  import fulmine.distribution.connection.IConnectionBroker;
39  import fulmine.distribution.connection.IConnectionDiscoverer;
40  import fulmine.event.EventFrameExecution;
41  import fulmine.event.EventQueueItem;
42  import fulmine.event.IEvent;
43  import fulmine.event.IEventManager;
44  import fulmine.event.listener.IEventListener;
45  import fulmine.event.subscription.ISubscriptionListener;
46  import fulmine.event.system.ISystemEvent;
47  import fulmine.event.system.ISystemEventSource;
48  import fulmine.model.IModelManager;
49  import fulmine.model.container.IContainer;
50  import fulmine.model.container.IContainerFactory;
51  import fulmine.model.field.IField;
52  import fulmine.model.field.LongField;
53  import fulmine.protocol.specification.IFrameReader;
54  import fulmine.protocol.specification.IFrameWriter;
55  import fulmine.rpc.IRpcDefinition;
56  import fulmine.rpc.IRpcHandler;
57  import fulmine.rpc.IRpcManager;
58  import fulmine.rpc.IRpcMarker;
59  import fulmine.rpc.IRpcPublicationListener;
60  import fulmine.rpc.IRpcResult;
61  import fulmine.rpc.IRpcResultHandler;
62  import fulmine.util.concurrent.ThreadUtils;
63  import fulmine.util.log.AsyncLog;
64  
65  /**
66   * The default {@link IFrameworkContext} implementation composed of
67   * collaborating implementations:
68   * <ul>
69   * <li>{@link IModelManager}
70   * <li>{@link IEventManager}
71   * <li>{@link IDistributionManager}
72   * <li>{@link IRpcManager}
73   * </ul>
74   * <p>
75   * This is created with a {@link DefaultPermissionProfile}. By default, all
76   * remote container updates are vetoed using an internal
77   * {@link VetoingRemoteUpdateHandler}, this can be replaced using an appropriate
78   * handler via {@link #setRemoteUpdateHandler(IRemoteUpdateHandler)}.
79   * 
80   * @author Ramon Servadei
81   */
82  public final class FulmineContext extends AbstractLifeCycle implements
83      IFrameworkContext
84  {
85  
86      /**
87       * Encapsulates the field names for the system info record
88       * 
89       * @author Ramon Servadei
90       */
91      public static interface SystemInfoFields
92      {
93          /** Key name for the tx counts in the system info record */
94          String TX_COUNT = "tx count";
95  
96          /** Key name for the rx counts in the system info record */
97          String RX_COUNT = "rx count";
98  
99          /** Key name for the free memory field in the system info record */
100         String FREE_MEMORY = "free mem (k)";
101 
102         /** Key name for the total memory field in the system info record */
103         String TOTAL_MEMORY = "total mem (k)";
104     }
105 
106     private final static AsyncLog LOG = new AsyncLog(FulmineContext.class);
107 
108     static
109     {
110         try
111         {
112             // force a class load to set framework types
113             Class.forName(Type.class.getName());
114             Class.forName(Domain.class.getName());
115             ThreadUtils.startThreadDumps();
116         }
117         catch (Exception e)
118         {
119             LOG.fatal("Could not load required class", e);
120             System.exit(2);
121         }
122     }
123 
124     /**
125      * The shutdown task for the fulmine runtime.
126      * 
127      * @author Ramon Servadei
128      * 
129      */
130     private final class ShutdownTask implements Runnable
131     {
132         public void run()
133         {
134             FulmineContext.this.destroy();
135         }
136     }
137 
138     /** The permission profile for the context. */
139     private DefaultPermissionProfile permissionProfile;
140 
141     /** The network */
142     INetwork network;
143 
144     /** The connection broker */
145     IConnectionBroker connectionBroker;
146 
147     /** The discoverer for connections */
148     IConnectionDiscoverer discoverer;
149 
150     /** Collaborating component for model management */
151     private final IModelManager modelManager;
152 
153     /** Collaborating component for event management */
154     private final IEventManager eventManager;
155 
156     /** Collaborating component for distribution management */
157     private IDistributionManager distributionManager;
158 
159     /** The internal RPC manager */
160     private final IRpcManager rpcManager;
161 
162     /** The identity */
163     private String identity;
164 
165     /**
166      * The hashcode for this context.
167      * 
168      * @see FulmineContext#getContextHashCode()
169      */
170     private final int hashCode;
171 
172     /** The shutdown thread */
173     private final Thread shutdownThread;
174 
175     /** The component that can report on the state of this context */
176     private IContextWatchdog watchdog;
177 
178     /** The event processor identity prefix */
179     private final String eventProcessorIdentityPrefix;
180 
181     /**
182      * The remote update handler (not great that it is a concrete reference
183      * rather than an interface)
184      */
185     private DelegatingRemoteUpdateHandler handler;
186 
187     /** The system info record for this context */
188     private IContainer systemInfo;
189 
190     /**
191      * Construct a context with an identity.
192      * <p>
193      * This version checks the system runtime for the number of processors and
194      * will use that number + 1 when constructing the {@link EventManager}.
195      * 
196      * @param identity
197      *            the context's identity
198      */
199     public FulmineContext(String identity)
200     {
201         this(identity, identity);
202     }
203 
204     /**
205      * Construct a context with an identity and the number of processors.
206      * 
207      * @param identity
208      *            the context's identity
209      * @param processors
210      *            the number of processors available for events
211      */
212     public FulmineContext(String identity, int processors)
213     {
214         this(identity, identity, processors);
215     }
216 
217     /**
218      * Construct a context with an identity and the prefix for the name for the
219      * event processors. If the prefix is EMPTY_STRING or the same as the
220      * identity of the context, the event processor identity will simply be the
221      * context identity, otherwise the event processor identity is the prefix
222      * plus the context identity.
223      * <p>
224      * This version checks the system runtime for the number of processors and
225      * will use that number + 1 when constructing the {@link EventManager}.
226      * 
227      * @param identity
228      *            the context's identity
229      * @param eventProcessorIdentityPrefix
230      *            the prefix for the name for the event processors
231      */
232     public FulmineContext(String identity, String eventProcessorIdentityPrefix)
233     {
234         this(identity, eventProcessorIdentityPrefix,
235             java.lang.Runtime.getRuntime().availableProcessors() + 1);
236     }
237 
238     /**
239      * Construct a context with an identity, the prefix for the name for the
240      * event processors and the number of processors.
241      * 
242      * @param identity
243      *            the context's identity
244      * @param eventProcessorIdentityPrefix
245      *            the prefix for the name for the event processors
246      * @param processors
247      *            the number of processors available for events
248      */
249     public FulmineContext(String identity, String eventProcessorIdentityPrefix,
250         int processors)
251     {
252         super();
253         this.permissionProfile = new DefaultPermissionProfile();
254         this.shutdownThread =
255             new Thread(new ShutdownTask(), identity + " context shutdown task");
256         java.lang.Runtime.getRuntime().addShutdownHook(this.shutdownThread);
257         nullCheck(identity, "Null identity");
258         nullCheck(eventProcessorIdentityPrefix,
259             "Null eventProcessorIdentityPrefix");
260         this.identity = identity;
261         // determine the event processor identity
262         if (identity.trim().equals(eventProcessorIdentityPrefix.trim()))
263         {
264             this.eventProcessorIdentityPrefix = identity;
265         }
266         else
267         {
268             if (eventProcessorIdentityPrefix.trim().endsWith(COLON))
269             {
270                 this.eventProcessorIdentityPrefix =
271                     eventProcessorIdentityPrefix.trim() + identity;
272             }
273             else
274             {
275                 this.eventProcessorIdentityPrefix =
276                     eventProcessorIdentityPrefix.trim() + COLON + identity;
277             }
278         }
279         this.eventManager =
280             new EventManager(this.eventProcessorIdentityPrefix,
281                 (byte) processors, this);
282         this.distributionManager = new DistributionManager(this);
283         this.modelManager = new ModelManager(this);
284 
285         try
286         {
287             int netName =
288                 (InetAddress.getLocalHost().getHostAddress()).hashCode();
289             this.hashCode =
290                 new HashCodeBuilder().append(identity).append(
291                     System.identityHashCode(this)).append(netName).append(
292                     System.nanoTime()).toHashCode();
293         }
294         catch (UnknownHostException e)
295         {
296             throw new RuntimeException("Could not get localhost for " + this);
297         }
298         this.rpcManager = new RpcManager(this);
299     }
300 
301     public IContextWatchdog getContextWatchdog()
302     {
303         return this.watchdog;
304     }
305 
306     public void setContextWatchdog(IContextWatchdog watchdog)
307     {
308         this.watchdog = watchdog;
309     }
310 
311     protected IModelManager getModelManager()
312     {
313         return this.modelManager;
314     }
315 
316     protected IEventManager getEventManager()
317     {
318         return this.eventManager;
319     }
320 
321     /**
322      * Provide a different implementation of the distribution manager.
323      * 
324      * @param distributionManager
325      *            the distribution manager to use
326      */
327     public void setDistributionManager(IDistributionManager distributionManager)
328     {
329         // destroy the previous manager
330         if (this.distributionManager != null)
331         {
332             getDistributionManager().destroy();
333         }
334         this.distributionManager = distributionManager;
335     }
336 
337     public IDistributionManager getDistributionManager()
338     {
339         return this.distributionManager;
340     }
341 
342     public IPermissionProfile getPermissionProfile()
343     {
344         return this.permissionProfile;
345     }
346 
347     public void setPermissionProfile(IPermissionProfile permissionProfile)
348     {
349         this.permissionProfile.setApplicationPermissions(permissionProfile);
350     }
351 
352     @Override
353     protected void doStart()
354     {
355         getEventManager().start();
356         getModelManager().start();
357         getRpcManager().start();
358         getDistributionManager().start();
359         // create the system info record
360         final IContainer systemInfo =
361             getLocalContainer(getIdentity() + ":SystemInfo", Type.SYSTEM,
362                 Domain.FRAMEWORK);
363         this.systemInfo = systemInfo;
364         this.systemInfo.add(new LongField(SystemInfoFields.FREE_MEMORY));
365         this.systemInfo.add(new LongField(SystemInfoFields.TOTAL_MEMORY));
366         // schedule a task to flush the system info record periodically
367         schedule(new TimerTask()
368         {
369             @Override
370             public void run()
371             {
372                 EventQueueItem.isStatisticsEvent.set(Boolean.TRUE);
373                 try
374                 {
375                     FulmineContext.this.systemInfo.beginFrame(new EventFrameExecution());
376                     try
377                     {
378                         FulmineContext.this.systemInfo.getLongField(
379                             SystemInfoFields.FREE_MEMORY).set(
380                             Runtime.getRuntime().freeMemory() / 1000);
381                         FulmineContext.this.systemInfo.getLongField(
382                             SystemInfoFields.TOTAL_MEMORY).set(
383                             Runtime.getRuntime().totalMemory() / 1000);
384                     }
385                     finally
386                     {
387                         FulmineContext.this.systemInfo.flushFrame();
388                     }
389                 }
390                 finally
391                 {
392                     EventQueueItem.isStatisticsEvent.remove();
393                 }
394             }
395         }, 10000, 10000);
396         if (getNetwork() != null)
397         {
398             setConnectionBroker(getNetwork().createBroker());
399             setConnectionDiscoverer(getNetwork().createDiscoverer());
400             getConnectionBroker().start();
401             if (getNetwork().isListeningOnlyMode())
402             {
403                 getConnectionDiscoverer().disablePulsing();
404             }
405             getConnectionDiscoverer().start();
406 
407             if (getHandler().getDelegate() == null)
408             {
409                 getHandler().setDelegate(new VetoingRemoteUpdateHandler());
410             }
411             // publish the remote update handler RPC method
412             getRpcManager().publishRpcs(IRemoteUpdateHandler.class,
413                 this.getHandler());
414         }
415         if (getLog().isInfoEnabled())
416         {
417             getLog().info("Started context " + this);
418         }
419     }
420 
421     @Override
422     protected void doDestroy()
423     {
424         /*
425          * The discoverer and broker may have implementations that do not extend
426          * the AbstractLifeCycle (which guarantees no exception from destroy())
427          * so we call the destroy methods of these in individual try-catch
428          * blocks.
429          */
430         if (getConnectionDiscoverer() != null)
431         {
432             try
433             {
434                 getConnectionDiscoverer().destroy();
435             }
436             catch (Exception e)
437             {
438                 logException(getLog(), "discoverer", e);
439             }
440         }
441         if (getConnectionBroker() != null)
442         {
443             try
444             {
445                 getConnectionBroker().destroy();
446             }
447             catch (Exception e)
448             {
449                 logException(getLog(), "connectionBroker", e);
450             }
451         }
452         if (getNetwork() != null)
453         {
454             getNetwork().setContext(null);
455         }
456         getDistributionManager().destroy();
457         getRpcManager().destroy();
458         getModelManager().destroy();
459         getEventManager().destroy();
460         if (Thread.currentThread() != this.shutdownThread)
461         {
462             Runtime.getRuntime().removeShutdownHook(this.shutdownThread);
463         }
464     }
465 
466     public void addContainer(IContainer container)
467     {
468         getModelManager().addContainer(container);
469     }
470 
471     public String getIdentity()
472     {
473         return this.identity;
474     }
475 
476     public void setNetwork(INetwork network)
477     {
478         if (isActive())
479         {
480             throw new IllegalStateException(
481                 "Must be called before starting the context");
482         }
483         this.network = network;
484         this.network.setContext(this);
485     }
486 
487     public INetwork getNetwork()
488     {
489         return this.network;
490     }
491 
492     public IContainer getLocalContainer(String identity, IType type,
493         IDomain domain)
494     {
495         return getModelManager().getLocalContainer(identity, type, domain);
496     }
497 
498     public IContainer getRemoteContainer(String remoteContextIdentity,
499         String containerIdentity, IType type, IDomain domain)
500     {
501         return getModelManager().getRemoteContainer(remoteContextIdentity,
502             containerIdentity, type, domain);
503     }
504 
505     public boolean containsRemoteContainer(String remoteContextIdentity,
506         String containerIdentity, IType type, IDomain domain)
507     {
508         return getModelManager().containsRemoteContainer(remoteContextIdentity,
509             containerIdentity, type, domain);
510     }
511 
512     public boolean containsLocalContainer(String containerIdentity, IType type,
513         IDomain domain)
514     {
515         return getModelManager().containsLocalContainer(containerIdentity,
516             type, domain);
517     }
518 
519     public void queueEvent(IEvent event)
520     {
521         getEventManager().queueEvent(event);
522     }
523 
524     public void queueEvents(Collection<IEvent> events)
525     {
526         getEventManager().queueEvents(events);
527     }
528 
529     public boolean removeContainer(IContainer container)
530     {
531         return getModelManager().removeContainer(container);
532     }
533 
534     public boolean subscribe(String contextIdentity, String identityPattern,
535         IType type, IDomain domain, IEventListener listener)
536     {
537         return getDistributionManager().subscribe(contextIdentity,
538             identityPattern, type, domain, listener);
539     }
540 
541     public boolean unsubscribe(String contextIdentity, String identityPattern,
542         IType type, IDomain domain, IEventListener listener)
543     {
544         return getDistributionManager().unsubscribe(contextIdentity,
545             identityPattern, type, domain, listener);
546     }
547 
548     public IChannel[] getConnectedChannels()
549     {
550         return getDistributionManager().getConnectedChannels();
551     }
552 
553     public int getEventProcessorCount()
554     {
555         return getEventManager().getEventProcessorCount();
556     }
557 
558     public Collection<IContainer> getLocalContainers()
559     {
560         return getModelManager().getLocalContainers();
561     }
562 
563     public Collection<IContainer> getRemoteContainers(
564         String remoteContextIdentity)
565     {
566         return getModelManager().getRemoteContainers(remoteContextIdentity);
567     }
568 
569     public ThreadGroup getEventProcessorThreadGroup()
570     {
571         return getEventManager().getEventProcessorThreadGroup();
572     }
573 
574     public IConnectionBroker getConnectionBroker()
575     {
576         return this.connectionBroker;
577     }
578 
579     public void setConnectionBroker(IConnectionBroker broker)
580     {
581         this.connectionBroker = broker;
582     }
583 
584     public IConnectionDiscoverer getConnectionDiscoverer()
585     {
586         return this.discoverer;
587     }
588 
589     public void setConnectionDiscoverer(IConnectionDiscoverer discoverer)
590     {
591         this.discoverer = discoverer;
592     }
593 
594     public IContainerFactory getContainerFactory()
595     {
596         return getModelManager().getContainerFactory();
597     }
598 
599     public ISystemEventSource getSystemEventSource(
600         Class<? extends ISystemEvent> type)
601     {
602         return getEventManager().getSystemEventSource(type);
603     }
604 
605     public void execute(Runnable task)
606     {
607         getEventManager().execute(task);
608     }
609 
610     public IFrameReader getFrameReader()
611     {
612         return getDistributionManager().getFrameReader();
613     }
614 
615     public IFrameWriter getFrameWriter()
616     {
617         return getDistributionManager().getFrameWriter();
618     }
619 
620     public void requestRetransmit(String contextIdentity,
621         String identityRegularExpression, IType type, IDomain domain)
622     {
623         getDistributionManager().requestRetransmit(contextIdentity,
624             identityRegularExpression, type, domain);
625     }
626 
627     public void requestRetransmitAll(String contextIdentity)
628     {
629         getDistributionManager().requestRetransmitAll(contextIdentity);
630     }
631 
632     public void retransmit(String contextIdentity,
633         String identityRegularExpression, IType type, IDomain domain)
634     {
635         getDistributionManager().retransmit(contextIdentity,
636             identityRegularExpression, type, domain);
637     }
638 
639     public void retransmitAll(String contextIdentity)
640     {
641         getDistributionManager().retransmitAll(contextIdentity);
642     }
643 
644     public void retransmitAllToAll()
645     {
646         getDistributionManager().retransmitAllToAll();
647     }
648 
649     public void retransmitToAll(String identityRegularExpression, IType type,
650         IDomain domain)
651     {
652         getDistributionManager().retransmitToAll(identityRegularExpression,
653             type, domain);
654     }
655 
656     public boolean addSubscriptionListener(ISubscriptionListener listener)
657     {
658         return getDistributionManager().addSubscriptionListener(listener);
659     }
660 
661     public boolean removeSubscriptionListener(ISubscriptionListener listener)
662     {
663         return getDistributionManager().removeSubscriptionListener(listener);
664     }
665 
666     public int getContextHashCode()
667     {
668         return this.hashCode;
669     }
670 
671     public String getEventProcessorIdentityPrefix()
672     {
673         return this.eventProcessorIdentityPrefix;
674     }
675 
676     public void invokeRpc(String remoteContextIdentity, byte[] rpcData)
677     {
678         getDistributionManager().invokeRpc(remoteContextIdentity, rpcData);
679     }
680 
681     public boolean addRpcPublicationListener(String remoteContextIdentity,
682         IRpcPublicationListener listener)
683     {
684         return getRpcManager().addRpcPublicationListener(remoteContextIdentity,
685             listener);
686     }
687 
688     public IRpcMarker invoke(IRpcResultHandler resultHandler,
689         String remoteContextIdentity, String procedure, IField... args)
690     {
691         return getRpcManager().invoke(resultHandler, remoteContextIdentity,
692             procedure, args);
693     }
694 
695     public IRpcResult invoke(String remoteContextIdentity, String procedure,
696         IField... args)
697     {
698         return getRpcManager().invoke(remoteContextIdentity, procedure, args);
699     }
700 
701     public boolean publishProdedure(IRpcHandler handler,
702         IRpcDefinition rpcDefinition)
703     {
704         return getRpcManager().publishProdedure(handler, rpcDefinition);
705     }
706 
707     public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
708     {
709         return getRpcManager().unpublishProdedure(rpcDefinition);
710     }
711 
712     public boolean unpublishRpcs(Class<?> definition, Object handler)
713     {
714         return getRpcManager().unpublishRpcs(definition, handler);
715     }
716 
717     public boolean publishRpcs(Class<?> definition, Object handler)
718     {
719         return getRpcManager().publishRpcs(definition, handler);
720     }
721 
722     public boolean removeRpcPublicationListener(String remoteContextIdentity,
723         IRpcPublicationListener listener)
724     {
725         return getRpcManager().removeRpcPublicationListener(
726             remoteContextIdentity, listener);
727     }
728 
729     public void setRemoteUpdateHandler(IRemoteUpdateHandler handler)
730     {
731         handler.setContext(this);
732         getHandler().setDelegate(handler);
733     }
734 
735     @Override
736     protected AsyncLog getLog()
737     {
738         return LOG;
739     }
740 
741     private IRpcManager getRpcManager()
742     {
743         return this.rpcManager;
744     }
745 
746     @Override
747     public final String toString()
748     {
749         return super.toString() + COLON + getIdentity() + OPEN_BRACE
750             + hashCode() + CLOSE_BRACE;
751     }
752 
753     @Override
754     public final boolean equals(Object obj)
755     {
756         return super.equals(obj);
757     }
758 
759     @Override
760     public final int hashCode()
761     {
762         return this.hashCode;
763     }
764 
765     public String updateRemoteContainer(String remoteContextIdentity,
766         String identity, IType type, IDomain domain, String fieldName,
767         String fieldValueAsString)
768     {
769         return getDistributionManager().updateRemoteContainer(
770             remoteContextIdentity, identity, type, domain, fieldName,
771             fieldValueAsString);
772     }
773 
774     /**
775      * Lazily constructs the {@link DelegatingRemoteUpdateHandler}
776      * 
777      * @return the {@link DelegatingRemoteUpdateHandler}
778      */
779     private DelegatingRemoteUpdateHandler getHandler()
780     {
781         if (this.handler == null)
782         {
783             this.handler = new DelegatingRemoteUpdateHandler();
784         }
785         return handler;
786     }
787 
788     public IContainer getSystemInfo()
789     {
790         return this.systemInfo;
791     }
792 
793     public void schedule(TimerTask task, long delay, long period)
794     {
795         getEventManager().schedule(task, delay, period);
796     }
797 }