ReactiveX is an API for asynchronous programming with a bounded or unbounded stream of data. In this programming model, the stream of data is called an Observable. We also define an Observer that subscribes to the Observable and reacts to the emitted events. Reactive programming also defines a set of Operators to create and compose Observables. Using this programming model helps us to create complex data pipelines. Observables can be composed to create different workflows that are non-blocking and reactive.
ReactiveX is an umbrella project that provides a reactive extension to major programming languages, platforms and frameworks. It provides a consistent API for working with data streams irrespective of the nature of the programming language.
From the ReactiveX website:
ReactiveX is not biased toward some particular source of concurrency or asynchronicity. Observables can be implemented using thread-pools, event loops, non-blocking I/O, actors (such as from Akka), or whatever implementation suits your needs, your style, or your expertise. Client code treats all of its interactions with Observables as asynchronous, whether your underlying implementation is blocking or non-blocking and however you choose to implement it.
In this blog post, we will explore some concepts used in reactive programming and see a few examples in Go. All the code referenced in this blog post can be found here. Interested readers can clone the repository and follow the instructions in readme.md. If you find any mistakes or have any questions, please create an issue here and I will be more than happy to clarify them.
From the RxGo website:
The RxGo implementation is based on the concept of pipelines. In a nutshell, a pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
In the above figure:
- We create a static Observable based on a fixed list of items using the Just operator.
- We define a transformation function (convert a circle into a square) using the Map operator.
- We filter each yellow square using the Filter operator.
- An Observer can now watch these events and take action on the data stream.
A simple example⌗
In this example, we create a static Observable using rxgo.Just that emits the values 1 through 6. Then, we use the Map operator to multiply the values by 10, followed by a Filter operator to filter the values greater than 30. In the end, we observe the Observable by calling the Observe() method on it.
- The functions that transform the data stream are passed to the operators as arguments. This makes the individual functions more testable.
- The Observable is lazy by default. This means that it emits events only once a subscription is made.
- The Observe() method returns a <-chan rxgo.Item, which can be used like any other channel.
- The error or the value is accessed through the received rxgo.Item.
- By default, an item for which item.Error() is true stops the Observable on the first error. To overcome this behavior, we can pass rxgo.WithErrorStrategy(rxgo.ContinueOnError) as an option.
Users can find the complete working implementation of this example [here](https://github.com/PrakharSrivastav/reactivex-go-programming/tree/master/a_simple).
Hot vs Cold Observables⌗
When the data is produced by the Observable itself, it is called a cold Observable. When the data is produced outside the Observable, we call it a hot Observable. A cold Observable stream is reproducible and can be observed multiple times, because each Observer triggers a fresh emission of the data.
There are situations in reactive programming where the Observer cannot keep up with the Observable due to a high event production rate. In these situations, the system needs to behave sensibly by using one of the many backpressure handling techniques. For example:
- Dropping the messages.
- Sensible buffering strategies (time vs count).
- Blocking the execution and processing the current set of events.
- Throttling and debouncing strategies.
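As an illustration of the first technique in the list, dropping messages, here is a minimal sketch in plain Go. It is not taken from RxGo: the offer function and the buffer size are assumptions chosen for the example. A non-blocking send on a buffered channel either succeeds or drops the event when the buffer is full.

```go
package main

import "fmt"

// offer tries to hand an event to the observer's buffer without blocking.
// It reports false when the buffer is full and the event was dropped.
func offer(buf chan<- int, event int) bool {
	select {
	case buf <- event:
		return true
	default:
		return false
	}
}

func main() {
	// A small buffer stands in for an observer that has not caught up yet.
	buf := make(chan int, 2)

	dropped := 0
	for event := 1; event <= 5; event++ {
		if !offer(buf, event) {
			dropped++
		}
	}
	close(buf)

	for event := range buf {
		fmt.Println("processed", event)
	}
	fmt.Println("dropped", dropped)
}
```

Because nothing reads from the buffer while the five events are produced, only the first two fit and the remaining three are dropped. A blocking strategy would replace the select with a plain send.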
Sequential vs Parallel processing⌗
This concept deals with how the operators process the data pipeline. By default, all operations like map and filter are sequential. Operations can be run in parallel to make better use of the available CPU cores. Keep in mind that when several goroutines process items at once, the results are not guaranteed to come out in the order in which the data stream was received unless the pipeline restores that order explicitly.
A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way, you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.
A more realistic example⌗
Let us say we want to perform a data migration that reads customer details from a data source (for example, a database), enriches the customer data with the customer's address and saves the data to disk as a CSV file.
For this use case, we can design our data pipeline as follows
In the code snippet above, we start by creating a connection to our source and sink. We then create a Connectable Observable that performs the data transformation using the map and filter operators with different functions as arguments. At this point, the Observable is lazy and we trigger it by connecting to it using ob.Connect(). Once the Observable starts producing events, it can be consumed and written to the destination.
A working implementation for the above example is available here. I recommend that interested readers clone the repository and try it themselves.
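For readers who want to see the shape of the migration without the library, here is a simplified sketch of the same read, enrich, filter and write steps using only the standard library. It is not the code from the repository: the customer type, the in-memory address lookup and the enrich and migrate functions are hypothetical, and the CSV is written to an in-memory buffer instead of a file on disk.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// customer and the address lookup below are hypothetical stand-ins for the
// data source described in the text.
type customer struct {
	ID      int
	Name    string
	Address string
}

// enrich fills in the customer's address from a lookup table and reports
// whether an address was found.
func enrich(c customer, addresses map[int]string) (customer, bool) {
	addr, ok := addresses[c.ID]
	c.Address = addr
	return c, ok
}

// migrate enriches every customer, skips those without an address, and
// returns the remaining rows encoded as CSV.
func migrate(customers []customer, addresses map[int]string) (string, error) {
	var sink strings.Builder
	w := csv.NewWriter(&sink)
	for _, c := range customers {
		enriched, ok := enrich(c, addresses)
		if !ok {
			continue // filter step: drop customers we cannot enrich
		}
		row := []string{fmt.Sprint(enriched.ID), enriched.Name, enriched.Address}
		if err := w.Write(row); err != nil {
			return "", err
		}
	}
	w.Flush()
	return sink.String(), w.Error()
}

func main() {
	customers := []customer{{ID: 1, Name: "Ada"}, {ID: 2, Name: "Linus"}}
	addresses := map[int]string{1: "1 Main St"}

	out, err := migrate(customers, addresses)
	if err != nil {
		fmt.Println("migration failed:", err)
		return
	}
	fmt.Print(out)
}
```

In a reactive version, enrich would be the function passed to a map operator and the address check would be the predicate passed to a filter operator.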
Reactive Extensions are a programming paradigm for working with asynchronous data streams using the Observer pattern. They provide an API to build data pipelines and to write code that is simpler to understand and expresses its intent clearly. One thing I would like to clarify is that reactive programming alone does not build a reactive system. Reactive systems, as described in the [Reactive Manifesto](https://www.reactivemanifesto.org), are an architectural style for building responsive distributed systems. Reactive programming should be one of the tools in your toolbox for building reactive systems.
From a language perspective, Go already provides a lot of the tools out of the box to write highly concurrent systems, but it can be too verbose at times. Using RxGo can help us to avoid a lot of boilerplate and write more expressive code.
- RxGo: Reactive Extension for Go
- ReactiveX: Toolbox for reactive programming
- The Reactive Manifesto: Principles to create robust reactive systems
- RxGo: Creating observables
- RxGo: Transforming observables
- RxGo: Filtering observables
- RxGo: Combining observables
- RxGo: Error handling operators
- RxGo: Utility operators
- RxGo: Conditional operators
- RxGo: Mathematical operators
Images in the blog post are taken from the RxGo and ReactiveX websites.