Python反应式编程详细操作教程
on_next()事件-这意味着数据流中有一个元素。
on_completed()事件-这表示排放已结束,没有更多的物品要来。
on_error()事件-这也意味着发射已经结束,但是如果 observable 抛出了错误。
pip install RxPY
get_strings()-用于从观察者获取字符串。
PrintObserver()-用于从观察者打印字符串。它使用观察者类的所有三个事件。它还使用了subscription()类。
# Filename : example.py
# Copyright : 2020 By Bianchenghao6
# Author by : bianchenghao6.com
# Date : 2020-08-22
from rx import Observable, Observer
def get_strings(observer):
observer.on_next("Ram")
observer.on_next("Mohan")
observer.on_next("Shyam")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Finished")
def on_error(self, error):
print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())
Received Ram
Received Mohan
Received Shyam
Finished
pip install pyfunctional
# Filename : example.py
# Copyright : 2020 By Bianchenghao6
# Author by : bianchenghao6.com
# Date : 2020-08-22
from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))
Result: 6