Working with Subscriptions
Overview
The Eventing capability of DDF allows endpoints (and thus external users) to create a "standing query" and be notified when a matching Metacard is created, updated, or deleted.
To better understand why this would be useful, suppose that there has been increased pirating activity off the coast of Somalia. Because of these events, a group of intelligence analysts is interested in determining the reason for the heightened hostility and discovering its cause. To do this, analysts need to monitor interesting events occurring in that area. Without DDF Eventing, the analysts would need to repeatedly query for any records of events or intelligence gathered in that area. Analysts would have to keep an eye out for changes or anything of interest. However, with DDF Eventing, the analysts can create a subscription indicating criteria for the types of intelligence of interest. In this scenario, analysts could specify interest in metacards added, updated, or deleted that describe data obtained around the coast of Somalia. Through this subscription, DDF will send event notifications back to the team of analysts containing metadata of interest. Furthermore, they could filter the records not only spatially, but by any other criteria that would zero in on the most interesting records. For example, a fishing company that has operated ships peacefully in the same region for a long time may not be interesting. To exclude metadata about that company, analysts may add contextual criteria indicating to only return records containing the keyword "pirate." How these event notifications are handled and processed is up to the client's event consumer which receives all callbacks from DDF. With the subscription in place, the analysts will be notified only of metadata related to the pirating activity, so analysts can obtain better situational awareness.
The key components of DDF Eventing include:
- Subscription - represent "standing queries" in the Catalog.
- Delivery Method - provides the operation (created, updated, deleted) for how an event's metacard can be delivered.
- Event Processor - provides an engine that creates, updates, and deletes subscriptions for event notification.
- Event Consumer - service that receives and processes event notifications on the client side.
- Callback URL - location of Event Consumer.
These components can be studied in more depth by referring to their corresponding sections in the Integrator's Guide.
This section discusses how a developer can create subscriptions and event consumers to work with DDF Eventing. More information about DDF Eventing in general can be found in the Eventing section of the Integrator's Guide.
Creating a Subscription
To create a subscription in DDF the developer needs to implement the ddf.catalog.event.Subscription
interface. This interface extends org.opengis.filter.Filter
in order to represent the subscription's filter criteria. Furthermore, the Subscription
interface contains a DeliveryMethod
implementation.
When implementing Subscription
, the developer will need to override the methods accept
and evaluate
from the Filter
. The accept
method allows the visitor pattern to be applied to the Subscription
. A FilterVisitor
can be passed into this method in order to process the Subscription's
Filter
. In DDF this method is used to convert the Subscription's
Filter
into a predicate format that is understood by the Event Processor. The second method inherited from Filter
is evaluate
. This method is used to evaluate an object against the Filter's
criteria in order to determine if it matches the criteria. See the Creating Filters section of the Developer's Guide for more information on OGC Filters.
The functionality of these overridden methods is typically delegated to whatever Filter
implementation the Subscription
is using.
The developer will also need to define getDeliveryMethod
. This method should return an instance of the appropriate DeliveryMethod
that will communicate to the event consumer designated to receive the notifications.
The other two methods required because Subscription
implements Federatable
are isEnterprise
and getSourceIds,
which indicate that the subscription should watch for events occurring on all sources in the enterprise or on specified sources.
The following is an implementation stub of Subscription
:
public class SubscriptionImpl implements Subscription{ private Filter filter; //Subscription generally contains a Filter to perform the Filter methods against private DeliveryMethod deliveryMethod; //the DeliveryMethod instance to handle events and call out to event consumer public SubscriptionImpl(Filter filter, DeliveryMethod deliveryMethod){ this.filter = filter; this.deliveryMethod = deliveryMethod; } @Override public boolean evaluate(Object object) { //evaluates incoming object against filter criteria } @Override public Object accept(FilterVisitor visitor, Object extraData) { //visits filter in order to read and parse it } @Override public Set<String> getSourceIds() { //returns source ids that subscription should watch for events } @Override public boolean isEnterprise() { //should this subscription watch for events from the entire enterprise? } @Override public DeliveryMethod getDeliveryMethod() { return deliveryMethod; } }
Once a Subscription
is created, it needs to be registered in the OSGi Service Registry as a ddf.catalog.event.Subscription
service. This is necessary for the Subscription
to be discovered by the Event Processor. Typically this is done in code after the Subscription
is instantiated. When the Subscription
is registered, a unique ID will need to be specified using the key subscription-id
. This will be used to delete the Subscription
from the OSGi Service Registry. Furthermore, the ServiceRegistration
, which is the return value from registering a Subscription
, should be kept track in order to remove the Subscription
later. The following code shows how to correctly register a Subscription
implementation in the registry using the above SubscriptionImpl
for clarity:
//Map to keep track of registered Subscriptions. Used for unregistering Subscriptions. Map<String, ServiceRegistration> subscriptions = new HashMap<String, ServiceRegistration>(); //Instance of DeliveryMethod with the callback URL of the event consumer DeliveryMethod deliveryMethod = new DeliveryMethodImpl(String callbackURL); //Creates a filter that matches on the Metacard ID FilterFactory filterFactory = new FilterFactoryImpl(); Filter filter = filterFactory.equals( filterFactory.property( Metacard.ID ), filterFactory.literal( id ) ); //New Subscription instance Subscription subscription = new SubscriptionImpl(filter, deliveryMethod); //Specify the subscription-id to uniquely identify the Subscription String subscriptionId = "0123456789abcdef0123456789abcdef"; Dictionary<String, String> properties = new Hashtable<String, String>(); properties.put("subscription-id", subscriptionId); //Service registration requires an instance of the OSGi bundle context //Register subscription and keep track of the service registration ServiceRegistration serviceRegistration = context.registerService( "ddf.catalog.event.Subscription", subscription , properties ); subscriptions.put(subscriptionId, serviceRegistration);
Creating a Delivery Method
The Event Processor obtains the subscription's DeliveryMethod
and invokes one of its four methods when an event occurs. The DeliveryMethod
then handles that invocation and communicates an event to a specified consumer service outside of DDF. The DeliveryMethod
implementation typically contains a callback URL. It uses this URL to know where the event consumer is located.
The Event Processor calls the DeliveryMethod's
created
method when a new metacard matching the filter criteria is added to the Catalog. It calls the deleted
method when a metacard that matched the filter criteria is removed from the Catalog. updatedHit
is called when a metacard is updated and the new metacard matches the subscription. updatedMiss
is a little different in that it is only called if the old metacard matched the filter but the new metacard no longer does. An example of this would be if the filter contains spatial criteria consisting of Arizona. If a plane is flying over Arizona, the Event Processor will repeatedly call updatedHit
as the plane flies from one side to the other while updating its position in the Catalog. This happens because the updated records continually match the specified criteria. If the plane crosses into New Mexico, the previous metacard will have matched the filter, but the new metacard will not. Thus, updatedMiss
gets called.
The DeliveryMethod
knows how to communicate with the event consumer. It also knows what it is expecting. Each of the aforementioned methods should call out to an event consumer. A DeliveryMethod
reports to the event consumer the metacards involved in the event, what type of event occurred, and possibly the subscription ID if of interest in whatever format the event consumer requires. The following is an implementation stub for DeliveryMethod
:
public class DeliveryMethodImpl implements DeliveryMethod { @Override public void created(Metacard newMetacard) { //Sends created notification to event consumer } @Override public void updatedHit(Metacard newMetacard, Metacard oldMetacard) { //Sends updatedHit notification to event consumer } @Override public void updatedMiss(Metacard newMetacard, Metacard oldMetacard) { //Sends updatedMiss notification to event consumer } @Override public void deleted(Metacard oldMetacard) { //Sends deleted notification to event consumer } }
Creating an Event Consumer
An event consumer is a service designed to receive and process event notifications. This event consumer can be any type of service running at a location accessible to DDF. If the service is a SOAP service, the DeliveryMethod
will need to communicate with it according to its WSDL. On the other hand, if it is a REST service, the DeliveryMethod
will invoke HTTP operations on the callback URL. Typically, whatever type of service the event consumer is, it will have operations corresponding to each of the created, updated, and deleted events. This way, the consumer will know what type of event occurred on DDF.
Deleting a Subscription
To remove a subscription from DDF, the subscription ID is required. Once this is provided, the ServiceRegistration
for the indicated Subscription
should be obtained from the Subscriptions
Map
. Then the Subscription
can be removed by unregistering the service. The following code demonstrates how this is done:
String subscriptionId = "0123456789abcdef0123456789abcdef"; //Obtain service registration from subscriptions Map based on subscription ID ServiceRegistration sr = (ServiceRegistration) subscriptions.get(subscriptionId); //Unregister Subscription from OSGi Service Registry sr.unregister(); //Remove Subscription from Map keeping track of registered Subscriptions. subscriptions.remove(subscriptionId);