Python反应式编程



Python反应式编程

Python反应式编程详细操作教程

反应式编程是一种处理数据流和变化传播的编程范例。这意味着当一个组件发出数据流时,更改将通过反应式编程库传播到其他组件。变化的传播将持续到到达最终接收者为止。事件驱动编程和反应式编程之间的区别在于,事件驱动编程围绕事件,而反应编程围绕数据。

用于反应式编程的ReactiveX或RX

ReactiveX或Raective Extension是反应式编程的最著名实现。 ReactiveX的工作取决于以下两个类-

可观察的班级

此类是数据流或事件的源,它包装传入的数据,以便可以将数据从一个线程传递到另一个线程。除非有观察者订阅,否则它不会提供数据。

观察者类

此类使用了
observable 发出的数据流。可以有多个可观察的观察者,每个观察者将接收发射的每个数据项。观察者可以通过订阅来接收三种类型的事件

on_next()事件-这意味着数据流中有一个元素。
on_completed()事件-这表示排放已结束,没有更多的物品要来。
on_error()事件-这也意味着发射已经结束,但是如果 observable 抛出了错误。

RxPY –用于反应式编程的Python模块

RxPY是一个Python模块,可用于反应式编程。我们需要确保已安装该模块。以下命令可用于安装RxPY模块-
 pip install RxPY

示例

以下是一个Python脚本,它使用
RxPY 模块及其类
Observable
Observe用于反应式编程。基本上有两个类-

get_strings()-用于从观察者获取字符串。
PrintObserver()-用于从观察者打印字符串。它使用观察者类的所有三个事件。它还使用了subscription()类。

 # Filename : example.py
# Copyright : 2020 By Bianchenghao6
# Author by : bianchenghao6.com
# Date : 2020-08-22
from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

输出

 Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的PyFunctional库

PyFunctional 是另一个可用于反应式编程的Python库。它使我们能够使用Python编程语言创建功能程序。之所以有用,是因为它允许我们使用链接的函数运算符来创建数据管道。

RxPY与PyFunctional之间的区别

这两个库都用于反应式编程,并以相似的方式处理流,但是两者之间的主要区别取决于数据的处理。
RxPY 处理系统中的数据和事件,而
PyFunctional 专注于使用功能编程范例进行数据转换。

安装PyFunctional模块

在使用该模块之前,我们需要先安装该模块。可以使用pip命令如下安装-
 pip install pyfunctional

示例

以下示例使用
PyFunctional 模块及其
seq 类,这些类充当我们可以迭代和操作的流对象。在该程序中,它使用lamda函数映射序列,该函数将每个值加倍,然后过滤x大于4的值,最后将序列缩减为所有剩余值的总和。
 # Filename : example.py
# Copyright : 2020 By Bianchenghao6
# Author by : bianchenghao6.com
# Date : 2020-08-22
from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))

输出

 Result: 6