Introduction

ReactiveX is an API for asynchronous programming with bounded or unbounded streams of data. In this programming model, a stream of data is called an Observable. An Observer subscribes to the Observable and reacts to the events it emits. ReactiveX also defines a set of Operators to create and compose Observables. This programming model helps us build complex data pipelines: Observables can be composed into different workflows that are non-blocking and reactive.

ReactiveX is an umbrella project that provides reactive extensions for major programming languages, platforms and frameworks. It offers a consistent API for working with data streams, regardless of the programming language.

From the ReactiveX website:

ReactiveX is not biased toward some particular source of concurrency or asynchronicity. Observables can be implemented using thread-pools, event loops, non-blocking I/O, actors (such as from Akka), or whatever implementation suits your needs, your style, or your expertise. Client code treats all of its interactions with Observables as asynchronous, whether your underlying implementation is blocking or non-blocking and however you choose to implement it.

In this blog post, we will explore some concepts used in reactive programming and look at a few examples in Go. All the code referenced in this blog post can be found here. Interested readers can clone the repository and follow the instructions in readme.md. If you find any mistakes or have any questions, please create an issue here and I will be more than happy to clarify them.

RxGo

From the RxGo website:

The RxGo implementation is based on the concept of pipelines. In a nutshell, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

In the above figure:

  • We create a static Observable based on a fixed list of items using the Just operator.
  • We define a transformation function (convert a circle into a square) using the Map operator.
  • We filter each yellow square using the Filter operator.
  • An Observer can then watch these events and act on the data stream.
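
To make the pipeline definition quoted above concrete, here is a minimal sketch that uses only the Go standard library, not RxGo. Each stage is a goroutine (or a group of goroutines) connected to the next stage by a channel. The helpers just, mapStage and filterStage are hypothetical stand-ins for the Just, Map and Filter operators and are not RxGo's actual implementation.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// just is a hypothetical stand-in for the Just operator:
// it emits a fixed list of items on a channel, then closes it.
func just(items ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range items {
			out <- v
		}
	}()
	return out
}

// mapStage (hypothetical) runs fn in `workers` goroutines that all read
// from the same input channel, so the output order is not guaranteed.
func mapStage(in <-chan int, workers int, fn func(int) int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	// Close the output once every worker has drained the input.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// filterStage (hypothetical) forwards only the items that satisfy keep.
func filterStage(in <-chan int, keep func(int) bool) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			if keep(v) {
				out <- v
			}
		}
	}()
	return out
}

// collect drains a channel into a sorted slice; sorting is needed
// because the parallel map stage may reorder items.
func collect(in <-chan int) []int {
	var res []int
	for v := range in {
		res = append(res, v)
	}
	sort.Ints(res)
	return res
}

func main() {
	pipeline := filterStage(
		mapStage(just(1, 2, 3, 4, 5, 6), 3, func(v int) int { return v * 10 }),
		func(v int) bool { return v > 30 },
	)
	fmt.Println(collect(pipeline)) // [40 50 60]
}
```

Writing every stage by hand like this is exactly the boilerplate that RxGo's operators hide.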

A simple example

In this example, we create a static Observable using rxgo.Just that emits the values 1 through 6. We then use the Map operator to multiply each value by 10, followed by the Filter operator to keep only the values greater than 30. Finally, we observe the Observable by calling its Observe() method.

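package main

import (
    "context"
    "fmt"

    "github.com/reactivex/rxgo/v2"
)

func main() {
    // create: emit 1..6, multiply each by 10, keep only values > 30
    observable := rxgo.Just(1, 2, 3, 4, 5, 6)().
        Map(times10).         // times10 multiplies each item by 10
        Filter(greaterThan30) // greaterThan30 keeps only values > 30

    // observe: Observe() triggers the lazy pipeline and returns a <-chan rxgo.Item
    for item := range observable.Observe() {
        if item.Error() {
            fmt.Println("do something. error :: ", item.E)
            continue // an error item carries no value
        }
        fmt.Println("items are :: ", item.V)
    }
}

// times10 has the signature Map expects: a context and an item in,
// a transformed item and an error out.
func times10(ctx context.Context, i interface{}) (interface{}, error) {
    return i.(int) * 10, nil
}

// greaterThan30 is the predicate passed to Filter.
func greaterThan30(i interface{}) bool {
    return i.(int) > 30
}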

Notes:

  • The transformation and filter logic is passed to the operators as plain functions (times10 and greaterThan30). This makes each function easy to test on its own.
  • The Observable is lazy by default: it emits events only once a subscription is made.
  • The Observe() method returns a <-chan rxgo.Item, which can be used like any other channel.
  • The value is accessed using item.V and the error using item.E.
  • By default, the Observable stops at the first error. To keep processing the remaining items, we can pass rxgo.WithErrorStrategy(rxgo.ContinueOnError) as an option to the Observe() method:

    for item := range observable.Observe(rxgo.WithErrorStrategy(rxgo.ContinueOnError)) {
        if item.Error() {
            fmt.Println("do something. error :: ", item.E)
            continue
        }
        fmt.Println("items are ::", item.V)
    }
    

Users can find the complete working implementation of this example here.

Some concepts

Hot vs Cold Observables

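When the data is produced by the Observable itself, it is called a cold Observable. When the data is produced outside the Observable, we call it a hot Observable. A cold Observable is reproducible: every new Observer receives the whole sequence from the start. A hot Observable emits its data only once, so an Observer that subscribes late misses the items that have already been emitted.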
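
A rough way to see the difference with plain Go channels (this is a sketch, not RxGo code; coldObservable and drain are hypothetical helpers): a cold source is a factory that starts a new producer for each subscriber, while a hot source is a single channel fed from outside, whose items are gone once consumed.

```go
package main

import "fmt"

// coldObservable (hypothetical) returns a factory: every subscriber gets
// its own producer goroutine, so each one sees the full sequence.
func coldObservable(items ...int) func() <-chan int {
	return func() <-chan int {
		ch := make(chan int)
		go func() {
			defer close(ch)
			for _, v := range items {
				ch <- v
			}
		}()
		return ch
	}
}

// drain reads every remaining item from a channel.
func drain(ch <-chan int) []int {
	var res []int
	for v := range ch {
		res = append(res, v)
	}
	return res
}

func main() {
	cold := coldObservable(1, 2, 3)
	fmt.Println(drain(cold()), drain(cold())) // [1 2 3] [1 2 3]

	// Hot: the data is produced outside and shared through one channel,
	// so once it has been consumed it is gone.
	hot := make(chan int, 3)
	hot <- 1
	hot <- 2
	hot <- 3
	close(hot)
	fmt.Println(drain(hot), drain(hot)) // [1 2 3] []
}
```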

Backpressure

There are situations in reactive programming where the Observer cannot keep up with the Observable because events are produced faster than they can be consumed. In these situations, the system must behave sensibly by applying one of several backpressure-handling techniques, for example:

  • Dropping the messages.
  • Sensible buffering strategies (time vs count).
  • Blocking the execution and processing the current set of events.
  • Throttling and debouncing strategies.
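
Dropping messages is the simplest of these techniques to show. The following is a minimal sketch with plain Go channels, not RxGo's backpressure support; the offer helper is hypothetical. A buffered channel models a slow consumer that can lag by a fixed number of items, and a select with a default branch lets the producer drop an item instead of blocking when that buffer is full.

```go
package main

import "fmt"

// offer (hypothetical) tries to hand v to a slow consumer; if the
// channel buffer is full, the item is dropped instead of blocking.
func offer(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: drop the item
	}
}

func main() {
	ch := make(chan int, 2) // the consumer can lag by at most 2 items
	dropped := 0
	for v := 1; v <= 5; v++ {
		if !offer(ch, v) {
			dropped++
		}
	}
	close(ch)

	var received []int
	for v := range ch {
		received = append(received, v)
	}
	fmt.Println(received, dropped) // [1 2] 3
}
```

Replacing the select with a plain send (ch <- v) turns this into the blocking strategy: the producer simply waits until the consumer catches up.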

Sequential vs Parallel processing

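This concept concerns how operators process the data pipeline. By default, operators such as Map and Filter are sequential: each one processes items one at a time. For expensive operations, an operator can run in parallel instead (for example with the rxgo.WithPool(n) option) to make better use of the available CPU cores. Note that when items are processed in parallel, the output order is not guaranteed to match the order in which the data stream was received.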
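
Keeping the original order while processing in parallel takes extra work. One common approach, sketched here with the standard library only (parallelMapOrdered is a hypothetical helper, not an RxGo option), is to tag each item with its input index and write each result back into the matching slot.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelMapOrdered (hypothetical) applies fn to every input using
// `workers` goroutines. Workers may finish in any order, so each result
// is written to the slot matching its input index to restore the order.
func parallelMapOrdered(in []int, workers int, fn func(int) int) []int {
	out := make([]int, len(in))
	idx := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workers)
	for w := 0; w < workers; w++ {
		go func() {
			defer wg.Done()
			for i := range idx {
				out[i] = fn(in[i]) // each index is written by exactly one worker
			}
		}()
	}
	for i := range in {
		idx <- i
	}
	close(idx)
	wg.Wait() // all writes to out happen before we return it
	return out
}

func main() {
	fmt.Println(parallelMapOrdered([]int{1, 2, 3, 4, 5, 6}, 4, func(v int) int { return v * 10 }))
	// [10 20 30 40 50 60]
}
```

The sketch works on a finite slice; for an unbounded stream the same idea needs a reordering buffer that holds results until all earlier indices have arrived.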

Connectable Observable

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
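
The idea can be modelled in a few lines of plain Go. This is a simplified sketch, not RxGo's WithPublishStrategy/Connect machinery: the Connectable type and its methods are hypothetical, and the type is not safe for concurrent use. Observers register first, and nothing is emitted until Connect is called, after which every item is broadcast to all subscribers.

```go
package main

import "fmt"

// Connectable is a hypothetical, minimal connectable source
// (not safe for concurrent use).
type Connectable struct {
	items []int
	subs  []chan int
}

// Subscribe registers a new observer and returns its channel. It must be
// called before Connect; later subscribers would miss the items.
func (c *Connectable) Subscribe() <-chan int {
	ch := make(chan int, len(c.items)) // buffered so Connect never blocks
	c.subs = append(c.subs, ch)
	return ch
}

// Connect starts the emission and broadcasts every item to all subscribers.
func (c *Connectable) Connect() {
	for _, v := range c.items {
		for _, s := range c.subs {
			s <- v
		}
	}
	for _, s := range c.subs {
		close(s)
	}
}

// sum drains a subscriber channel and adds up its items.
func sum(ch <-chan int) int {
	s := 0
	for v := range ch {
		s += v
	}
	return s
}

func main() {
	src := &Connectable{items: []int{1, 2, 3}}
	a := src.Subscribe()
	b := src.Subscribe()
	src.Connect() // nothing was emitted before this call
	fmt.Println(sum(a), sum(b)) // 6 6
}
```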

A more realistic example

Let us say we want to perform a data migration that reads customer details from a data source (for example, a database), enriches the customer data with the customer’s address and saves the data to disk as a CSV file.

For this use case, we can design our data pipeline as follows:

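import (
    "context"
    "fmt"
    "time"

    "github.com/reactivex/rxgo/v2"
    // plus the repository's own sink package (import path omitted here)
)

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    defer cancel()

    // initialize source and sink (createSource and createSink are helpers
    // from the accompanying repository)
    src := createSource(ctx)
    dst := createSink(ctx)

    // create a Connectable Observable from the source channel
    ob := rxgo.
        FromChannel(src.Ch, rxgo.WithPublishStrategy()).
        Filter(filterNamesWithA).                         // names that start with A
        Map(mapToCustomerWithAddress, rxgo.WithPool(32)). // enrich with the address, 32 goroutines
        Map(mapToUpperCase).                              // uppercase the customer name
        BufferWithTimeOrCount(rxgo.WithDuration(time.Millisecond*500), 5) // batches of up to 5 items

    // connect the observable, then start reading from the source
    ob.Connect()
    src.Read()

    // listen to the observable; each item is a batch ([]interface{})
    for items := range ob.Observe() {
        if items.Error() {
            fmt.Println("error :: ", items.E)
            continue
        }
        for _, item := range items.V.([]interface{}) {
            dst.Write(item.(sink.CustomerWithAddress))
        }
    }
}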

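In the code snippet above, we start by initializing our source and sink. We then create a Connectable Observable (via rxgo.WithPublishStrategy()) that transforms the data using the Filter and Map operators with different functions as arguments; the enrichment step runs on a pool of 32 goroutines (rxgo.WithPool(32)). BufferWithTimeOrCount then groups the results into batches, emitting a batch whenever 5 items have been collected or 500 ms have elapsed, whichever comes first. At this point, the Observable is lazy, and we trigger it by connecting to it using ob.Connect(). Once the Observable starts producing events, they can be consumed and written to the destination using the dst.Write() method.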

A working implementation of the above example is available here. I recommend that interested readers clone the repository and try it themselves.

Conclusion

Reactive Extensions provide a programming paradigm for working with asynchronous data streams using the Observer pattern. They give us an API to build data pipelines and to write code that is simpler to understand and expresses its intent clearly. One thing I would like to clarify is that reactive programming does not, by itself, build a reactive system. Reactive systems, as described in the Reactive Manifesto, are an architectural style for building responsive distributed systems. Reactive programming should be one of the tools in your toolbox for building reactive systems.

From a language perspective, Go already provides many tools out of the box for writing highly concurrent systems, but the resulting code can be verbose at times. RxGo can help us avoid a lot of this boilerplate and write more expressive code.

References

Images in this blog post are taken from the RxGo and ReactiveX websites.