1. 什么是pipeline
管道一词,对于熟悉linux的人来说并不陌生,在shell编程时,把若干个命令连接起来,前一个命令的输出是后一个命令的输入,最终完成一个流失计算。这是一种优秀的设计思想,你可以在大数据流失计算上找到相同的操作,python也可以实现这样的计算方法。
/bin/top -b -d 1 -n 1 |awk '{ if (NR > 6) print }' |awk '{ if ($9 > 10) print $1, $2, $9, $10, $12}' | sort -rn -k +3
上面这条命令就是管道技术的实际应用,它有多个命令组成,每个命令之间由 | 分隔,前一个命令的输出作为后一个命令的输入,最后,整个命令输出CPU使用率超过10%的进程信息。如果不使用管道技术,你无法在命令行界面里使用这么多命令连续处理数据。
2. pipeline适用于哪些操作
这种编程模式特别适合对数据的处理,假设我们要对字符串数据做如下处理:
- 大写转小写
- 去除字符串头部和尾部的回车字符
- 将字符串里的 – 字符替换成空格
你可能会想到写一个函数,来完成这三个功能,从代码组织上考虑,这不是一个好主意。试想,如果某些字符串只需要做前两项处理,你难道要在写一个只包含1和2两个功能的函数么?
合理的设计是编写是三个函数,以最小颗粒度实现这些功能
def lower(string_input): """ 大写转小写 :param string_input: :return: """ return string_input.lower() def remove_(string_input): """ 去除空格 :param string_input: :return: """ return string_input.replace("-", " ") def strip(string_input): """ 去除回车字符 :param string_input: :return: """ return string_input.strip("/n")
现在,你有了三个专门用来处理字符串的函数,问题是你如何组织它们,调用它们呢?
你要做如下考虑
- 不同的数据,需要的处理方式不同
- 代码要容易编写,不能每多出一种组合就写一个函数
基于上面的两个要求,我设计下面的函数
from functools import reduce def pipeline(string_input, funcs): return reduce(lambda x, y: y(x), funcs, string_input) result = pipeline("IT-is-a-test/n", [lower, remove_, strip]) print(result) reduce的原型是 def reduce(function, sequence, initial=None): pass
从定义上来看:
- reduce的第一个参数function应当是一个函数,function必须有两个参数,参数1是上一次执行结果,参数2是sequence的遍历结果
- sequence必须是一个可迭代对象
- initial可以做为function的第一个参数
就本段代码而言,string_input作为lambda第一次调用时的x,y是对funcs的遍历,从第二次开始,x都是y(x)的执行结果,每一次y调用执行的结果都作为下一次的输入。
3. fastcore
我们不必自己费力去写pipeline函数,fastcore这个库提供了更好的封装和支持,它从Julia, Ruby,和 Haskell那里借鉴了很多优秀的思想,提供了功能式编程模式,使用fastcore,可以更方便的写出pipeline
pip install fastcore
用fastcore替换我前面编写的pipeline函数
from fastcore.transform import Pipeline input_string = "IT-is-a-test/n" pipe = Pipeline([lower, remove_, strip]) output = pipe(input_string) print(output)
从调用方式上来讲,比自己写pipeline函数要方便许多。
原创文章,作者:,如若转载,请注明出处:https://blog.ytso.com/272280.html