Professional Communication
Software Development Tools

OPC Foundation member and certified logos

OPC Reactive Extensions (Rx/OPC)

The reactive programming model in QuickOPC merges the world of Microsoft Reactive Extensions (Rx) for .NET with OPC. Microsoft Reactive Extensions is a library to compose asynchronous and event-based programs using observable collections (data streams) and LINQ-style query operators.

QuickOPC (a toolkit for OPC client development) ships with OPC Reactive Extensions (Rx/OPC), which provide the reactive programming model for OPC data.

The reactive model for OPC client development, along with the original and traditional imperative programming model, can be freely mixed in the same project. For even higher level and demands, the reactive programming model in QuickOPC can be used in complex event processing (CEP) applications, e.g. with the use of the Trill, or StreamInsight Extensions.

 

Introduction to Reactive Programming for OPC

OPC Reactive Extensions (Rx/OPC) are OPC-specific classes that implement the Rx interfaces. With Rx/OPC, you can e.g.:

  • Create observable collections (data streams) that provide OPC data.
  • Consume (observe) data streams, and process them using OPC.
  • Combine data flows to create more complex operations.

For OPC Unified Architecture, Rx/OPC gives you the ability to work with flows representing OPC UA node value changes, or flows that represents values to be written into an OPC UA node. Rx/OPC also works with OPC "Classic" Data Access and OPC Alarms&Events data sources.

For example, your application may be required to continuously monitor the value of an OPC "Classic" item, and when it is available (there is no failure obtaining the value), make some computations with it (we will multiply the value by 1000), and write the results into a different OPC item. This logic can be expressed by following code:

    DAItemChangedObservable.Create<int>(                                      
             """OPCLabs.KitServer.2""Simulation.Incrementing (1 s)"100) 
        .Where(e => e.Exception == null)                                      
        .Select(e => e.Vtq.Value * 1000)                                      
        .Subscribe(DAWriteItemValueObserver.Create<int>(                      
            """OPCLabs.KitServer.2""Simulation.Register_I4"));            

Let's dissect what this example does:

  1. It creates an observable sequence for significant changes in OPC DA item "Simulation.Incrementing (1 s)".
  2. The "Where" clause filters (from the observable sequence) only the changes that have a null Exception, i.e. those that carry a valid Vtq object (value/timestamp/quality).
  3. The "Select" clause takes the actual value from the Vtq property (it is of type DAVtq<int>), and returns the value multiplied by 1000.
  4. An observer that writes incoming vales into the "Simulation.Register_I4" OPC DA item is created.
  5. The observer is subscribed to the transformed (processed) observable sequence.

As you can see, the code that does this kind of OPC data processing is very concise - all the extra "plumbing" needed in imperative programming model is gone, and only the meaningful pieces remain. Programs written in this way clearly express their intent, and the logic that handles certain functionality is concentrated in one place and not spread around various classes and methods.

 

Useful links: Documentation / Knowledge Base