Overview of the RxPY library
Hello, Habr!
Today we will talk about RxPY, a handy library for managing data streams in Python and the implementation of Reactive Extensions for our favorite language. Version 4.0.4 brought a number of improvements, and today we will go through its main functionality.
Basics of RxPY
RxPY is a library that implements the principles of functional reactive programming in Python. It lets you create and manage asynchronous data streams, combining, filtering and transforming them through chains of operators. The main components of RxPY:
- Observable: a data source that emits events.
- Observer: a subscriber that reacts to events from an Observable.
- Operators: functions that transform, filter and combine Observables.
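To make the three roles concrete, here is a toy, pure-Python model of the push contract. It is not RxPY's implementation. The names `of` and `map_op` are hypothetical. An Observable wraps a function that pushes values into an Observer, and an operator is simply a function from one Observable to another, which is why `pipe` can chain them.

```python
class Observer:
    """Receives zero or more on_next calls, then at most one on_completed/on_error."""

    def __init__(self, on_next, on_error=None, on_completed=None):
        self.on_next = on_next
        self.on_error = on_error or (lambda e: None)
        self.on_completed = on_completed or (lambda: None)


class Observable:
    """Toy Observable: wraps a function that pushes values into an observer."""

    def __init__(self, subscribe_fn):
        self._subscribe_fn = subscribe_fn

    def subscribe(self, observer):
        self._subscribe_fn(observer)

    def pipe(self, *operators):
        # Each operator maps Observable -> Observable; apply them left to right
        result = self
        for op in operators:
            result = op(result)
        return result


def of(*values):
    # Hypothetical source: pushes each value, then signals completion
    def subscribe_fn(observer):
        for v in values:
            observer.on_next(v)
        observer.on_completed()
    return Observable(subscribe_fn)


def map_op(fn):
    # Hypothetical operator: subscribes to the source and forwards fn(value)
    def operator(source):
        def subscribe_fn(observer):
            source.subscribe(Observer(
                lambda v: observer.on_next(fn(v)),
                observer.on_error,
                observer.on_completed,
            ))
        return Observable(subscribe_fn)
    return operator


events = []
of(1, 2, 3).pipe(map_op(lambda x: x * 10)).subscribe(
    Observer(events.append, on_completed=lambda: events.append("done"))
)
print(events)  # [10, 20, 30, 'done']
```

RxPY adds schedulers, disposal and error handling on top of this basic shape, but the subscribe/pipe structure is the same idea.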
Installing RxPY is easy:
pip install reactivex
RxPY version 4 brought significant changes:
- Module rename: the package is now imported as reactivex rather than rx.
- Improved typing: type hints were added for better IDE support.
- Operator chaining: operators are chained with the pipe method.
If you have been using RxPY v3, here is how to migrate:
- Imports: import rx -> import reactivex as rx.
- Operators: chain them with observable.pipe(ops.map(...)) instead of calling methods such as observable.map() directly.
- Obsolete features: some old operators and methods have been removed or renamed.
Creating an Observable and working with operators
Creating an Observable
just()
Creates an Observable that emits a single value and then completes.
import reactivex as rx

observable = rx.just(42)
observable.subscribe(lambda x: print(f"Value: {x}"))
from_()
Converts an iterable into an Observable.
observable = rx.from_([1, 2, 3, 4, 5])
observable.subscribe(lambda x: print(f"Item: {x}"))
interval()
Emits increasing integers 0, 1, 2, ... at a fixed period (in seconds).
import time
from reactivex import interval

observable = interval(1)  # every second
subscription = observable.subscribe(lambda x: print(f"Tick: {x}"))
time.sleep(5)  # ticks arrive on a background timer, so keep the main thread alive
subscription.dispose()  # stop the ticks
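timer()
Emits a single value (0) after the specified delay and then completes.
import time
from reactivex import timer

observable = timer(3)  # fires after 3 seconds
observable.subscribe(lambda x: print("Timer fired!"))
time.sleep(4)  # keep the main thread alive until the timer fires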
Transforming Observables
map()
Applies a function to each element.
import reactivex as rx
from reactivex import operators as ops

observable = rx.from_([1, 2, 3, 4, 5])
observable.pipe(
    ops.map(lambda x: x * x)
).subscribe(lambda x: print(f"Square: {x}"))
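flat_map()
Maps each element to an Observable and merges the resulting inner Observables into a single stream.
def duplicate(x):
    # Each element becomes an Observable of x, 2x and 3x
    return rx.from_([x, x * 2, x * 3])

observable.pipe(
    ops.flat_map(duplicate)
).subscribe(lambda x: print(f"Value: {x}"))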
scan()
Works like reduce, but emits the accumulated result after every element instead of only at the end.
observable.pipe(
    ops.scan(lambda acc, x: acc + x, seed=0)
).subscribe(lambda x: print(f"Running sum: {x}"))
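The difference between reduce and scan matches the difference between the standard library's `functools.reduce` and `itertools.accumulate`. Here is a small sketch assuming the source emits 1 to 5. It is only an analogy in plain Python, not RxPY code.

```python
from functools import reduce
from itertools import accumulate

values = [1, 2, 3, 4, 5]

# reduce: a single final result once the sequence is exhausted
total = reduce(lambda acc, x: acc + x, values, 0)

# accumulate: every intermediate accumulator, which is what scan emits.
# accumulate yields the initial value first, so drop it to match scan(seed=0).
running = list(accumulate(values, lambda acc, x: acc + x, initial=0))[1:]

print(total)    # 15
print(running)  # [1, 3, 6, 10, 15]
```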
Filtering data
filter()
Passes through only the elements that satisfy a condition.
observable.pipe(
    ops.filter(lambda x: x % 2 == 0)
).subscribe(lambda x: print(f"Even number: {x}"))
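debounce()
Emits a value only after the source has stayed silent for the given period (in seconds). A value that is followed too quickly by a newer one is dropped.
observable.pipe(
    ops.debounce(0.5)
).subscribe(lambda x: print(f"Received: {x}"))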
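The timing rule behind debounce is easier to see on timestamped data. Below is a toy simulation over a finished list of events, with times in milliseconds. The function name and the keystroke data are hypothetical, and the sketch assumes the source stays open. It does not model completion, whereas RxPY's debounce flushes a pending value when the source completes.

```python
from typing import List, Tuple

Event = Tuple[int, str]  # (arrival time in ms, value)


def debounce(events: List[Event], due_time: int) -> List[Event]:
    """Toy debounce over events sorted by time.

    A value is emitted due_time ms after it arrives, unless a newer value
    arrives within that window, in which case it is dropped.
    """
    emitted: List[Event] = []
    for i, (t, value) in enumerate(events):
        is_last = i + 1 == len(events)
        # ">=" treats a value arriving exactly at the deadline as too late to
        # cancel the pending one (an arbitrary choice for this sketch)
        if is_last or events[i + 1][0] - t >= due_time:
            emitted.append((t + due_time, value))
    return emitted


# Simulated keystrokes in a search box
keystrokes = [(0, "h"), (100, "he"), (200, "hel"), (1000, "hello"), (2000, "hello w")]
print(debounce(keystrokes, 500))
# [(700, 'hel'), (1500, 'hello'), (2500, 'hello w')]
```

This is the typical use case: only the text the user paused on is sent to a search backend, not every intermediate keystroke.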
distinct()
Passes through only values that have not been seen before, skipping duplicates.
observable.pipe(
    ops.distinct()
).subscribe(lambda x: print(f"Unique value: {x}"))
Combining Observables
merge()
Merges multiple Observables into one stream, emitting items as they arrive.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_([4, 5, 6])
rx.merge(obs1, obs2).subscribe(lambda x: print(f"Item: {x}"))
zip()
Pairs up the elements of several Observables by position into tuples.
obs1 = rx.from_([1, 2, 3])
obs2 = rx.from_(['a', 'b', 'c'])
rx.zip(obs1, obs2).subscribe(lambda x: print(f"Pair: {x}"))
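combine_latest()
Whenever any source emits, emits a tuple with the latest value from each source. The first tuple is emitted once every source has emitted at least once.
import time

obs1 = rx.interval(1)
obs2 = rx.interval(1.5)
subscription = rx.combine_latest(obs1, obs2).subscribe(lambda x: print(f"Combination: {x}"))
time.sleep(5)  # both intervals tick on background timers
subscription.dispose()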
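To see which tuples the interval(1) and interval(1.5) example produces, here is a deterministic toy simulation with times in milliseconds. The helpers `interval_events` and `combine_latest` are hypothetical stand-ins, not RxPY functions. Ordering simultaneous events by source index is an assumption of the sketch, because with real timers the order of the two ticks at t = 3 s is not guaranteed.

```python
from typing import Dict, List, Tuple

Event = Tuple[int, int, int]  # (time in ms, source index, value)


def interval_events(period_ms: int, source: int, until_ms: int) -> List[Event]:
    # Like interval(): emits 0, 1, 2, ... at period, 2*period, ... up to until_ms
    return [(k * period_ms, source, k - 1) for k in range(1, until_ms // period_ms + 1)]


def combine_latest(events: List[Event], n_sources: int) -> List[Tuple[int, Tuple[int, ...]]]:
    latest: Dict[int, int] = {}
    out = []
    # Simultaneous events are ordered by source index (a simplification)
    for t, source, value in sorted(events):
        latest[source] = value
        if len(latest) == n_sources:  # every source has emitted at least once
            out.append((t, tuple(latest[i] for i in range(n_sources))))
    return out


events = interval_events(1000, 0, 4000) + interval_events(1500, 1, 4000)
for t, combo in combine_latest(events, 2):
    print(t, combo)
# 1500 (0, 0)
# 2000 (1, 0)
# 3000 (2, 0)
# 3000 (2, 1)
# 4000 (3, 1)
```

Note that nothing is emitted at t = 1000 ms: the first source has a value, but the second has not emitted yet.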
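Testing data streams
Observables can be hot or cold:
- Cold Observables start producing data only when someone subscribes, and each subscriber gets its own sequence from the beginning.
- Hot Observables produce data independently of subscribers, so a late subscriber misses whatever was emitted before it subscribed.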
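The difference shows up as soon as there are two subscribers. Here is a toy pure-Python model. `ColdSource` and `HotSource` are hypothetical classes, loosely analogous to `rx.from_` and a Subject respectively.

```python
class ColdSource:
    """Replays its values from scratch for every subscriber (like rx.from_)."""

    def __init__(self, values):
        self._values = list(values)

    def subscribe(self, on_next):
        for v in self._values:
            on_next(v)


class HotSource:
    """Pushes values only to whoever is subscribed at emission time (like a Subject)."""

    def __init__(self):
        self._subscribers = []

    def subscribe(self, on_next):
        self._subscribers.append(on_next)

    def emit(self, value):
        for subscriber in list(self._subscribers):
            subscriber(value)


cold = ColdSource([1, 2, 3])
a, b = [], []
cold.subscribe(a.append)
cold.subscribe(b.append)

hot = HotSource()
early, late = [], []
hot.subscribe(early.append)
hot.emit(1)
hot.subscribe(late.append)  # subscribes after 1 was emitted
hot.emit(2)
hot.emit(3)

print(a, b)         # [1, 2, 3] [1, 2, 3]
print(early, late)  # [1, 2, 3] [2, 3]
```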
An example of a cold Observable:
import reactivex as rx
from reactivex.testing import TestScheduler

def create_cold_observable(scheduler):
    # The values are produced anew for each subscriber
    return rx.from_([1, 2, 3], scheduler=scheduler)

scheduler = TestScheduler()
observable = create_cold_observable(scheduler)
An example of a hot Observable:
from reactivex.testing import ReactiveTest, TestScheduler

def create_hot_observable(scheduler):
    # Emits 1 at virtual time 150 and 2 at 210, whether or not anyone is subscribed
    return scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),
        ReactiveTest.on_next(210, 2),
    )

scheduler = TestScheduler()
observable = create_hot_observable(scheduler)
Using TestScheduler:
from reactivex import operators as ops
from reactivex.testing import ReactiveTest, TestScheduler

def test_map_operator():
    scheduler = TestScheduler()
    xs = scheduler.create_hot_observable(
        ReactiveTest.on_next(150, 1),  # before subscription, so it is not observed
        ReactiveTest.on_next(210, 2),
        ReactiveTest.on_completed(300),
    )

    def create():
        return xs.pipe(ops.map(lambda x: x * 10))

    # By default start() subscribes at virtual time 200 and disposes at 1000
    results = scheduler.start(create)
    assert results.messages == [
        ReactiveTest.on_next(210, 20),
        ReactiveTest.on_completed(300),
    ]
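Why does the expected list contain no value for the message at time 150? Here is a toy model of what the virtual-time run does with a hot source. Messages are plain `(time, kind, value)` tuples rather than RxPY's `Recorded` objects, and `run_hot` is a hypothetical helper, not part of `reactivex.testing`. Only notifications between subscription (200) and disposal (1000) are delivered, and the exact ordering at those boundaries is simplified.

```python
from typing import Any, Callable, List, Tuple

Message = Tuple[int, str, Any]  # (virtual time, "N" for on_next / "C" for on_completed, value)


def run_hot(messages: List[Message], fn: Callable[[Any], Any],
            subscribed: int = 200, disposed: int = 1000) -> List[Message]:
    results: List[Message] = []
    for time, kind, value in sorted(messages, key=lambda m: m[0]):
        if not subscribed < time < disposed:
            continue  # emitted while nobody was subscribed: lost for a hot source
        if kind == "N":
            results.append((time, "N", fn(value)))  # the map step
        else:
            results.append((time, kind, value))
            break  # nothing is delivered after completion
    return results


xs = [(150, "N", 1), (210, "N", 2), (300, "C", None)]
print(run_hot(xs, lambda x: x * 10))
# [(210, 'N', 20), (300, 'C', None)]
```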
Testing with marble diagrams
Marble diagrams describe a stream as a string: each - is one unit of time, other characters are emitted values, and | marks completion.
from reactivex import operators as ops
from reactivex.testing.marbles import marbles_testing

def test_filter_operator():
    with marbles_testing() as (start, cold, hot, exp):
        source = cold('--1-2-3-4-5-|')
        expected = exp('----2---4---|')
        result = start(source.pipe(
            ops.filter(lambda x: int(x) % 2 == 0)
        ))
        assert result == expected
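To check by hand that the expected diagram lines up with the source, here is a toy parser. `parse_marbles` is hypothetical and treats every character as one time unit. RxPY's real marble syntax is richer, but for single-character values the idea is the same: filtering removes values and keeps the timing of the ones that pass.

```python
from typing import List, Tuple


def parse_marbles(diagram: str) -> List[Tuple[int, str]]:
    """Toy parser: one time unit per character; '-' is idle,
    '|' is completion, any other character is a value at that position."""
    return [(t, ch) for t, ch in enumerate(diagram) if ch != "-"]


source = parse_marbles("--1-2-3-4-5-|")
# Apply the same predicate as the test, letting the completion marker through
filtered = [(t, v) for t, v in source if v == "|" or int(v) % 2 == 0]
expected = parse_marbles("----2---4---|")

print(filtered)  # [(4, '2'), (8, '4'), (12, '|')]
print(filtered == expected)  # True
```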
A few examples of using RxPY
Integration with asyncio
RxPY works well with asyncio:
import asyncio

import reactivex as rx
from reactivex import operators as ops
from reactivex.scheduler.eventloop import AsyncIOScheduler

async def main():
    loop = asyncio.get_running_loop()
    observable = rx.interval(1).pipe(
        ops.take(5)
    )
    observable.subscribe(
        on_next=lambda x: print(f"Tick: {x}"),
        on_error=lambda e: print(f"Error: {e}"),
        on_completed=lambda: print("Completed"),
        scheduler=AsyncIOScheduler(loop),
    )
    # Keep the coroutine alive long enough for all five ticks to arrive
    await asyncio.sleep(6)

asyncio.run(main())
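For comparison with the plain-asyncio route mentioned in the conclusion, here is the same "five ticks, then complete" behavior written with only the standard library. It is a sketch, not an RxPY feature: `interval` here is a hypothetical async generator, and breaking out of the loop stands in for `ops.take(5)`.

```python
import asyncio
from typing import AsyncIterator, List


async def interval(period: float) -> AsyncIterator[int]:
    # Yields 0, 1, 2, ... every `period` seconds, similar to rx.interval
    n = 0
    while True:
        await asyncio.sleep(period)
        yield n
        n += 1


async def main(period: float = 1.0, count: int = 5) -> List[int]:
    ticks: List[int] = []
    async for x in interval(period):
        print(f"Tick: {x}")
        ticks.append(x)
        if len(ticks) == count:  # plays the role of ops.take(count)
            break
    print("Completed")
    return ticks
```

Run it with asyncio.run(main()). For a single timer this is simpler than RxPY. The Rx version pays off once you start combining, filtering and throttling several such streams.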
RxPY can also wrap external message sources, for example Redis pub/sub channels:
import redis
import reactivex as rx

r = redis.Redis()

def listen_to_channel(channel):
    # Generator that yields the payload of every message published to `channel`
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield message['data']

channel_observable = rx.from_(listen_to_channel('my_channel'))
# from_ iterates the generator on the subscribing thread, so this call blocks
channel_observable.subscribe(lambda msg: print(f"Received: {msg}"))
Event processing in an event-driven architecture:
from reactivex import operators as ops
from reactivex.subject import Subject

event_subject = Subject()

def handle_event(event):
    print(f"Handling event: {event}")

event_subject.pipe(
    ops.filter(lambda e: e['type'] == 'click'),
    ops.map(lambda e: e['payload'])
).subscribe(handle_event)

# Somewhere else in the code
event_subject.on_next({'type': 'click', 'payload': {'x': 100, 'y': 200}})
event_subject.on_next({'type': 'hover', 'payload': {'x': 150, 'y': 250}})  # filtered out
Conclusion
RxPY is a great find for anyone who wants a new way to manage asynchronous data streams. My recommendation: use RxPY if you work with large volumes of asynchronous data or build event-driven systems. That is where it shows its full potential.
RxPY has a learning curve. If you need to solve a simple task with little asynchrony, the standard asyncio or threading modules may be easier and faster to master. But for complex, dynamic systems, RxPY may well be worth it.
You can learn more about RxPY in the project's GitHub repository.
Finally, a reminder about the open lessons that will be held in October as part of the “Microservice Architecture” course:
- October 23. Metrics and Prometheus: how to collect and use metrics with Prometheus in Kubernetes for application monitoring. Sign up via the link.
- October 24. Message brokers, RabbitMQ and Kafka: how to use RabbitMQ and Kafka to organize asynchronous communication between microservices. Sign up via the link.