View Javadoc

1   /*
2      Copyright 2008 Ramon Servadei
3   
4      Licensed under the Apache License, Version 2.0 (the "License");
5      you may not use this file except in compliance with the License.
6      You may obtain a copy of the License at
7   
8          http://www.apache.org/licenses/LICENSE-2.0
9   
10     Unless required by applicable law or agreed to in writing, software
11     distributed under the License is distributed on an "AS IS" BASIS,
12     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13     See the License for the specific language governing permissions and
14     limitations under the License.
15   */
16  package fulmine.rpc;
17  
18  import static fulmine.util.Utils.COMMA;
19  import static fulmine.util.Utils.EMPTY_STRING;
20  import static fulmine.util.Utils.UNDERSCORE;
21  import static fulmine.util.Utils.safeToString;
22  import static fulmine.util.Utils.string;
23  
24  import java.lang.reflect.Method;
25  import java.util.Arrays;
26  import java.util.Iterator;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.Map.Entry;
30  
31  import org.apache.commons.logging.Log;
32  
33  import fulmine.AbstractLifeCycle;
34  import fulmine.Domain;
35  import fulmine.Type;
36  import fulmine.context.IFrameworkContext;
37  import fulmine.event.EventFrameExecution;
38  import fulmine.model.container.IContainer;
39  import fulmine.model.field.FieldUtils;
40  import fulmine.model.field.IField;
41  import fulmine.model.field.IntegerField;
42  import fulmine.model.field.StringField;
43  import fulmine.util.Utils;
44  import fulmine.util.collection.CollectionFactory;
45  import fulmine.util.log.AsyncLog;
46  import fulmine.util.reference.is;
47  
48  /**
49   * Standard {@link IRpcRegistry} implementation. This has a single
50   * {@link IRecord} that represents the set of RPC definitions that are published
51   * by local application code. Remote contexts will subscribe for this to receive
52   * the current and future RPC definitions. The record is addressed using the
53   * following attributes:
54   * <ol>
55   * <li>identity={@link IRpcRegistry#RPC_REGISTRY}
56   * <li>type={@link Type#RECORD}
57   * <li>domain={@link Domain#FRAMEWORK}
58   * </ol>
59   * When a new RPC definition is published by application code, a new field is
60   * added to the registry record. The field is called the RPC registry key. The
61   * RPC registry key format is described in the following ABNF:
62   * 
63   * <pre>
64   * RPC key              =  &quot;rpc&quot; counter &quot;_&quot; compactForm
65   * counter              =  1*DIGIT ; this increments each time a new definition is registered
66   * compactForm          =  ALPHA 0*(ALPHA/DIGIT) argTypeCompactForm; this is the RPC name
67   * argTypeCompactForm   =  0*ALPHA ; this is a string formed from the first letter of each argument type
68   * </pre>
69   * 
70   * This ensures that the key is deterministic. Application code can cycle
71   * through the fields of the RPC registry record and intercept any beginning
72   * with this pattern in order to find RPC definitions. The value of the key is
73   * the string representation of the RPC definition. E.g. rpc12lasersIDB is the
74   * key for the 12th registered RPC, the RPC definition is lasers(IntegerField,
75   * DoubleField, BooleanField) - compact form is lasersIDB
76   * <p>
77   * An {@link IRpcDefinition} can re-construct itself from this representation so
78   * this is what is sent on-the-wire to remote contexts. After the key is added,
79   * an update event is then triggered for the registry which will be received by
80   * all subscribers for the registry (including channels connected to remote
81   * contexts).
82   * <p>
83   * When an RPC definition is unpublished, the key and value are removed.
84   * <p>
85   * This class is thread safe.
86   * 
87   * @author Ramon Servadei
88   */
89  public final class RpcRegistry extends AbstractLifeCycle implements
90      IRpcRegistry
91  {
92      /** The local context */
93      private final IFrameworkContext context;
94  
95      /**
96       * The registry for all RPC definitions published by the local context.
97       */
98      private IContainer registry;
99  
100     /**
101      * The registry key count. Uses the <a
102      * href="http://www.ibm.com/developerworks/java/library/j-jtp06197.html"
103      * >'cheap read-write lock'</a>
104      */
105     volatile int keyCount;
106 
107     /** Tracks the published RPC definitions against their RPC registry key */
108     private final Map<String, IRpcDefinition> published;
109 
110     /** Maps the RPC registry key to the handler for the RPC */
111     private final Map<String, IRpcHandler> handlers;
112 
113     /**
114      * Construct the RPC repository
115      * 
116      * @param context
117      *            the local context
118      */
119     public RpcRegistry(IFrameworkContext context)
120     {
121         super();
122         this.context = context;
123         this.published = CollectionFactory.newMap();
124         this.handlers = CollectionFactory.newMap();
125     }
126 
127     public boolean publishProdedure(IRpcHandler handler,
128         IRpcDefinition rpcDefinition)
129     {
130         final String rpcDefAsString = rpcDefinition.toString();
131         try
132         {
133             synchronized (getPublished())
134             {
135                 if (getPublished().values().contains(rpcDefinition))
136                 {
137                     return false;
138                 }
139                 // temporary addition
140                 getPublished().put(rpcDefAsString, rpcDefinition);
141             }
142 
143             // lock the record
144             getRegistry().beginFrame(new EventFrameExecution());
145             try
146             {
147                 final String[] fields = getRegistry().getComponentIdentities();
148 
149                 /*
150                  * Find the key for the new RPC definition. We need to go
151                  * through all current keys to see if any can be re-used because
152                  * an unpublish will just set the string field value to null.
153                  */
154                 String keyName = null;
155                 for (String fieldName : fields)
156                 {
157                     final IField field = getRegistry().get(fieldName);
158                     if (field instanceof StringField)
159                     {
160                         final StringField key = (StringField) field;
161                         if (key.get() == null)
162                         {
163                             keyName = fieldName;
164                             key.set(rpcDefAsString);
165                             break;
166                         }
167                     }
168                 }
169                 if (keyName == null)
170                 {
171                     final IntegerField rpcCount =
172                         getRegistry().<IntegerField> get(
173                             IRpcRegistry.RPC_KEY_COUNT);
174                     keyName =
175                         IRpcRegistry.RPC_KEY + this.keyCount + UNDERSCORE
176                             + getCompactForm(rpcDefinition);
177                     synchronized (this)
178                     {
179                         this.keyCount++;
180                     }
181                     rpcCount.set(this.keyCount);
182                     getRegistry().add(new StringField(keyName, rpcDefAsString));
183                 }
184                 synchronized (getHandlers())
185                 {
186                     getHandlers().put(keyName, handler);
187                 }
188                 synchronized (getPublished())
189                 {
190                     getPublished().put(keyName, rpcDefinition);
191                 }
192             }
193             finally
194             {
195                 getRegistry().endFrame();
196             }
197             return true;
198         }
199         finally
200         {
201             // clear out the temporary key
202             synchronized (getPublished())
203             {
204                 getPublished().remove(rpcDefAsString);
205             }
206         }
207     }
208 
209     public boolean unpublishProdedure(IRpcDefinition rpcDefinition)
210     {
211         // remove the published RPC first
212         synchronized (getPublished())
213         {
214             if (!getPublished().values().contains(rpcDefinition))
215             {
216                 return false;
217             }
218             final Set<Entry<String, IRpcDefinition>> entrySet =
219                 getPublished().entrySet();
220             for (Iterator<Entry<String, IRpcDefinition>> iterator =
221                 entrySet.iterator(); iterator.hasNext();)
222             {
223                 Entry<String, IRpcDefinition> entry = iterator.next();
224                 if (entry.getValue().equals(rpcDefinition))
225                 {
226                     iterator.remove();
227                     break;
228                 }
229             }
230         }
231 
232         getRegistry().beginFrame(new EventFrameExecution());
233         try
234         {
235             final String[] fields = getRegistry().getComponentIdentities();
236             // find the field that the RPC definition was declared at
237             StringField rpcToRemove = null;
238             for (String fieldName : fields)
239             {
240                 final IField field = getRegistry().get(fieldName);
241                 if (field instanceof StringField)
242                 {
243                     final StringField key = (StringField) field;
244                     if (key.get().equals(rpcDefinition.toString()))
245                     {
246                         // we've found the definition so this is the key to
247                         // remove
248                         rpcToRemove = key;
249                         synchronized (getHandlers())
250                         {
251                             getHandlers().remove(key.getIdentity());
252                         }
253                         break;
254                     }
255                 }
256             }
257             getRegistry().remove(rpcToRemove);
258         }
259         finally
260         {
261             getRegistry().endFrame();
262         }
263         return true;
264     }
265 
266     public boolean unpublishRpcs(Class<?> definition, Object handler)
267     {
268         return new RpcPublisher().disableRemoteMethods(definition, handler,
269             this);
270     }
271 
272     public boolean publishRpcs(Class<?> definition, Object handler)
273     {
274         return new RpcPublisher().enableRemoteMethods(definition, handler, this);
275     }
276 
277     public IRpcHandler getHandler(String registryKey)
278     {
279         synchronized (getHandlers())
280         {
281             return getHandlers().get(registryKey);
282         }
283     }
284 
285     public IRpcDefinition getDefinition(String registryKey)
286     {
287         synchronized (getPublished())
288         {
289             return getPublished().get(registryKey);
290         }
291     }
292 
293     public String getRegistryKey(String rpcName, IField[] args)
294     {
295         synchronized (getPublished())
296         {
297             // go through all published RPCs to find a matching signature
298             final Iterator<Entry<String, IRpcDefinition>> iterator =
299                 getPublished().entrySet().iterator();
300             while (iterator.hasNext())
301             {
302                 final Entry<String, IRpcDefinition> next = iterator.next();
303                 final IRpcDefinition definition = next.getValue();
304                 if (definition.getName().equals(rpcName))
305                 {
306                     // name matches, now check the arguments
307                     if (args != null
308                         && definition.getArgumentTypes().length == args.length)
309                     {
310                         // assume matching until a mismatching arg is found
311                         boolean found = true;
312                         for (int i = 0; i < args.length; i++)
313                         {
314                             IField field = args[i];
315                             if (!field.getClass().equals(
316                                 definition.getArgumentTypes()[i]))
317                             {
318                                 // no match on argument index
319                                 found = false;
320                                 break;
321                             }
322                         }
323                         if (found)
324                         {
325                             // return the RPC registry key
326                             return next.getKey();
327                         }
328                     }
329                 }
330             }
331             return null;
332         }
333     }
334 
335     @Override
336     protected void doDestroy()
337     {
338         getRegistry().destroy();
339     }
340 
341     @Override
342     protected void doStart()
343     {
344         this.registry =
345             getContext().getLocalContainer(IRpcRegistry.RPC_REGISTRY,
346                 Type.SYSTEM, Domain.FRAMEWORK);
347         getRegistry().add(new IntegerField(IRpcRegistry.RPC_KEY_COUNT, 0));
348     }
349 
350     /**
351      * Get the compact form of the RPC definition. This is the RPC name followed
352      * by the first letter of each argument type.
353      * 
354      * @param rpcDefinition
355      *            the definition
356      * @return its compact form
357      */
358     String getCompactForm(IRpcDefinition rpcDefinition)
359     {
360         StringBuilder sb = new StringBuilder();
361         sb.append(rpcDefinition.getName());
362         final Class<? extends IField>[] argumentTypes =
363             rpcDefinition.getArgumentTypes();
364         for (Class<? extends IField> argType : argumentTypes)
365         {
366             sb.append(argType.getSimpleName().substring(0, 1));
367         }
368         return sb.toString();
369     }
370 
371     IContainer getRegistry()
372     {
373         return this.registry;
374     }
375 
376     private IFrameworkContext getContext()
377     {
378         return this.context;
379     }
380 
381     private Map<String, IRpcDefinition> getPublished()
382     {
383         return this.published;
384     }
385 
386     private Map<String, IRpcHandler> getHandlers()
387     {
388         return this.handlers;
389     }
390 
391     @Override
392     public String toString()
393     {
394         return string(this, (getRegistry() == null ? EMPTY_STRING
395             : this.getRegistry().toDetailedString()));
396     }
397 }
398 
399 /**
400  * A stateless helper class that publishes and unpublishes RPCs represented by
401  * Java classes.
402  * 
403  * @author Ramon Servadei
404  */
405 final class RpcPublisher
406 {
407     private final static Log LOG = new AsyncLog(RpcPublisher.class);
408 
409     /** Construct the {@link RpcPublisher} */
410     RpcPublisher()
411     {
412         super();
413     }
414 
415     /**
416      * @see IRpcManager#unpublishRpcs(Class, Object)
417      */
418     boolean disableRemoteMethods(Class<?> definition, Object handler,
419         IRpcRegistry registry)
420     {
421         return doOperation(definition, handler, registry, false);
422     }
423 
424     /**
425      * @see IRpcManager#publishRpcs(Class, Object)
426      */
427     boolean enableRemoteMethods(Class<?> definition, Object handler,
428         IRpcRegistry registry)
429     {
430         return doOperation(definition, handler, registry, true);
431     }
432 
433     /**
434      * Convenience method that contains all the shared logic and uses a boolean
435      * to flag whether to issue a publish or unpublish method call.
436      * 
437      * @param definition
438      *            the definition
439      * @param handler
440      *            the handler that will receive invocations
441      * @param state
442      *            the state object to use
443      * @param publish
444      *            whether to publish or not
445      * @return <code>true</code> if the operation succeeds
446      */
447     private boolean doOperation(Class<?> definition, Object handler,
448         IRpcRegistry registry, boolean publish)
449     {
450         if (!definition.isInterface())
451         {
452             throw new IllegalArgumentException(
453                 "The definition must be an interface, got " + definition);
454         }
455         final Method[] methods = definition.getMethods();
456         final Method[] handlerMethods = new Method[methods.length];
457         // check the handler has a method for each method in the interface
458         for (int i = 0; i < methods.length; i++)
459         {
460             try
461             {
462                 Method method = methods[i];
463                 handlerMethods[i] =
464                     handler.getClass().getMethod(method.getName(),
465                         method.getParameterTypes());
466             }
467             catch (SecurityException e)
468             {
469                 throw new IllegalStateException(e);
470             }
471             catch (NoSuchMethodException e)
472             {
473                 throw new IllegalArgumentException(safeToString(handler)
474                     + " does not implement all methods of " + definition, e);
475             }
476         }
477         boolean result = true;
478         // now go through each method in the handler and process it
479         for (Method handlerMethod : handlerMethods)
480         {
481             try
482             {
483                 IRpcDefinition rpcDefinition =
484                     new RpcDefinition(
485                         FieldUtils.getFrameworkClass(handlerMethod.getReturnType()),
486                         handlerMethod.getName(),
487                         RpcUtils.convertToFramework(handlerMethod.getParameterTypes()));
488                 RpcHandler rpcHandler =
489                     new RpcHandler(handler, rpcDefinition, handlerMethod);
490                 if (publish)
491                 {
492                     result &=
493                         registry.publishProdedure(rpcHandler, rpcDefinition);
494                 }
495                 else
496                 {
497                     result &= registry.unpublishProdedure(rpcDefinition);
498                 }
499             }
500             catch (Exception e)
501             {
502                 Utils.logException(LOG, "Could not publish " + handlerMethod, e);
503             }
504         }
505         return result;
506     }
507 }
508 
509 /**
510  * A class that wraps the handler object supplied to the
511  * {@link IRpcManager#publishRpcs(Class, Object)} and invokes only a single
512  * remotely enabled method on the delegate handler.
513  * <p>
514  * An {@link RpcHandler} is compared for equality using its internal members.
515  * 
516  * @author Ramon Servadei
517  */
518 final class RpcHandler implements IRpcHandler
519 {
520     private final static AsyncLog LOG = new AsyncLog(RpcHandler.class);
521 
522     /** The handler that contains the remotely enable method */
523     final Object delegate;
524 
525     /** The RPC definition this handler supports */
526     final IRpcDefinition rpcDefinition;
527 
528     /** The method to invoke on the delegate */
529     private final Method method;
530 
531     /**
532      * Construct the handler with the delegate object that will handle a single,
533      * specific RPC.
534      * 
535      * @param delegate
536      *            the actual object that will handle an RPC
537      * @param rpcDefinition
538      *            the definition for the RPs this handler will support
539      * @method the method to invoke on the delegate. This method will
540      *         semantically match the rpcDefinition argument.
541      */
542     RpcHandler(Object delegate, IRpcDefinition rpcDefinition, Method method)
543     {
544         super();
545         Utils.nullCheck(delegate, "delegate was null");
546         Utils.nullCheck(rpcDefinition, "rpcDefinition was null");
547         Utils.nullCheck(method, "method was null");
548         this.delegate = delegate;
549         this.rpcDefinition = rpcDefinition;
550         this.method = method;
551     }
552 
553     public IRpcResult handle(IRpcDefinition rpcDefinition, IField... arguments)
554     {
555         if (!getRpcDefinition().equals(rpcDefinition))
556         {
557             throw new IllegalArgumentException(safeToString(getDelegate())
558                 + " cannot handle " + safeToString(rpcDefinition));
559         }
560         Object invokeResult = null;
561         String exception = null;
562         try
563         {
564             // ensure we can access even private methods
565             getMethod().setAccessible(true);
566             invokeResult =
567                 getMethod().invoke(getDelegate(),
568                     RpcUtils.convertToNative(arguments));
569         }
570         catch (Exception e)
571         {
572             if (LOG.isWarnEnabled())
573             {
574                 LOG.warn("Could not invoke method " + getMethod()
575                     + " with arguments " + Arrays.deepToString(arguments), e);
576             }
577             exception = e.toString();
578         }
579         return new RpcResult(exception == null,
580             FieldUtils.createFromNative(invokeResult), exception);
581     }
582 
583     @Override
584     public String toString()
585     {
586         return string(this, getDelegate() + COMMA + getMethod() + COMMA
587             + getRpcDefinition());
588     }
589 
590     @Override
591     public int hashCode()
592     {
593         final int prime = 31;
594         int result = 1;
595         result =
596             prime * result + ((delegate == null) ? 0 : delegate.hashCode());
597         result =
598             prime * result
599                 + ((getMethod() == null) ? 0 : getMethod().hashCode());
600         result =
601             prime * result
602                 + ((rpcDefinition == null) ? 0 : rpcDefinition.hashCode());
603         return result;
604     }
605 
606     @Override
607     public boolean equals(Object obj)
608     {
609         if (is.same(this, obj))
610         {
611             return true;
612         }
613         if (is.differentClass(this, obj))
614         {
615             return false;
616         }
617         RpcHandler other = (RpcHandler) obj;
618         return is.eq(getDelegate(), other.getDelegate())
619             && is.eq(getMethod(), other.getMethod())
620             && is.eq(getRpcDefinition(), other.getRpcDefinition());
621     }
622 
623     private Method getMethod()
624     {
625         return this.method;
626     }
627 
628     private Object getDelegate()
629     {
630         return this.delegate;
631     }
632 
633     private IRpcDefinition getRpcDefinition()
634     {
635         return this.rpcDefinition;
636     }
637 
638 }