Pipe

Pipe

Python数据处理的中缀编程工具包

Pipe是一个Python模块,提供类似Shell的中缀语法编程能力。通过管道操作符'|'连接数据处理函数,实现高效的数据处理流程。该工具支持惰性求值、函数别名和部分初始化,内置多种数据操作函数如过滤、映射、分组等。Pipe允许创建自定义管道函数,适用于处理大型数据集和复杂数据流,有助于提高代码可读性和可维护性。

PipePython惰性求值函数式编程迭代器Github开源项目

Pipe — 中缀编程工具包

PyPI 每月下载量 支持的Python版本 GitHub工作流状态

启用类似sh的中缀语法(使用管道)的模块。

简介

例如,这是欧拉计划第2题的解决方案:

找出斐波那契数列中不超过四百万的所有偶数项之和。

假设fib是一个斐波那契数生成器:

sum(fib() | where(lambda x: x % 2 == 0) | take_while(lambda x: x < 4000000))

每个管道都是惰性求值的,可以被别名化,并且可以部分初始化,所以它可以重写为:

is_even = where(lambda x: x % 2 == 0) sum(fib() | is_even | take_while(lambda x: x < 4000000)

安装

要安装这个库,你只需运行以下命令:

# Linux/macOS python3 -m pip install pipe # Windows py -3 -m pip install pipe

使用

基本语法是像在shell中一样使用|

>>> from itertools import count >>> from pipe import select, take >>> sum(count() | select(lambda x: x ** 2) | take(10)) 285 >>>

一些管道需要一个参数:

>>> from pipe import where >>> sum([1, 2, 3, 4] | where(lambda x: x % 2 == 0)) 6 >>>

有些不需要参数:

>>> from pipe import traverse >>> for i in [1, [2, 3], 4] | traverse: ... print(i) 1 2 3 4 >>>

在这种情况下,允许使用调用括号:

>>> from pipe import traverse >>> for i in [1, [2, 3], 4] | traverse(): ... print(i) 1 2 3 4 >>>

本模块中现有的管道

按字母顺序列出可用的管道;当一个管道有多个名称时,这些是别名。

batched

类似Python 3.12的itertool.batched

>>> from pipe import batched >>> list("ABCDEFG" | batched(3)) [('A', 'B', 'C'), ('D', 'E', 'F'), ('G',)] >>>

chain

链接一系列可迭代对象:

>>> from pipe import chain >>> list([[1, 2], [3, 4], [5]] | chain) [1, 2, 3, 4, 5] >>>

警告:chain只展开仅包含可迭代对象的可迭代对象:

list([1, 2, [3]] | chain)

会引发TypeError: 'int' object is not iterable 考虑使用traverse。

chain_with(other)

类似itertools.chain,先产生给定可迭代对象的元素,然后产生其参数的元素

>>> from pipe import chain_with >>> list((1, 2, 3) | chain_with([4, 5], [6])) [1, 2, 3, 4, 5, 6] >>>

dedup(key=None)

去除重复值,如果提供了key函数则使用它。

>>> from pipe import dedup >>> list([-1, 0, 0, 0, 1, 2, 3] | dedup) [-1, 0, 1, 2, 3] >>> list([-1, 0, 0, 0, 1, 2, 3] | dedup(key=abs)) [-1, 0, 2, 3] >>>

enumerate(start=0)

内置的enumerate()作为一个Pipe:

>>> from pipe import enumerate >>> list(['apple', 'banana', 'citron'] | enumerate) [(0, 'apple'), (1, 'banana'), (2, 'citron')] >>> list(['car', 'truck', 'motorcycle', 'bus', 'train'] | enumerate(start=6)) [(6, 'car'), (7, 'truck'), (8, 'motorcycle'), (9, 'bus'), (10, 'train')] >>>

filter(predicate)

where(predicate)的别名,见where(predicate)

groupby(key=None)

类似itertools.groupby(sorted(iterable, key = keyfunc), keyfunc)

>>> from pipe import groupby, map >>> items = range(10) >>> ' / '.join(items | groupby(lambda x: "Odd" if x % 2 else "Even") ... | select(lambda x: "{}: {}".format(x[0], ', '.join(x[1] | map(str))))) 'Even: 0, 2, 4, 6, 8 / Odd: 1, 3, 5, 7, 9' >>>

islice()

就是itertools.islice函数作为一个Pipe:

>>> from pipe import islice >>> list((1, 2, 3, 4, 5, 6, 7, 8, 9) | islice(2, 8, 2)) [3, 5, 7] >>>

izip()

就是itertools.izip函数作为一个Pipe:

>>> from pipe import izip >>> list(range(0, 10) | izip(range(1, 11))) [(0, 1), (1, 2), (2, 3), (3, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10)] >>>

map(), select()

将作为参数给出的转换表达式应用于给定可迭代对象的每个元素

>>> list([1, 2, 3] | map(lambda x: x * x)) [1, 4, 9] >>> list([1, 2, 3] | select(lambda x: x * x)) [1, 4, 9] >>>

netcat

netcat Pipe通过TCP发送和接收字节:

data = [ b"HEAD / HTTP/1.0\r\n", b"Host: python.org\r\n", b"\r\n", ] for packet in data | netcat("python.org", 80): print(packet.decode("UTF-8"))

输出:

HTTP/1.1 301 Moved Permanently
Content-length: 0
Location: https://python.org/
Connection: close

permutations(r=None)

返回所有可能的排列:

>>> from pipe import permutations >>> for item in 'ABC' | permutations(2): ... print(item) ('A', 'B') ('A', 'C') ('B', 'A') ('B', 'C') ('C', 'A') ('C', 'B') >>>
>>> for item in range(3) | permutations: ... print(item) (0, 1, 2) (0, 2, 1) (1, 0, 2) (1, 2, 0) (2, 0, 1) (2, 1, 0) >>>

reverse

类似Python的内置reversed函数。

>>> from pipe import reverse >>> list([1, 2, 3] | reverse) [3, 2, 1] >>>

select(fct)

map(fct)的别名,见map(fct)

skip()

从给定的可迭代对象中跳过给定数量的元素,然后产生剩余元素

>>> from pipe import skip >>> list((1, 2, 3, 4, 5) | skip(2)) [3, 4, 5] >>>

skip_while(predicate)

类似itertools.dropwhile,当谓词为真时跳过给定可迭代对象的元素,然后产生其他元素:

>>> from pipe import skip_while >>> list([1, 2, 3, 4] | skip_while(lambda x: x < 3)) [3, 4] >>>

sort(key=None, reverse=False)

类似Python的内置"sorted"原语。

>>> from pipe import sort >>> ''.join("python" | sort) 'hnopty' >>> [5, -4, 3, -2, 1] | sort(key=abs) [1, -2, 3, -4, 5] >>>

t

类似Haskell的操作符":":

>>> from pipe import t >>> for i in 0 | t(1) | t(2): ... print(i) 0 1 2 >>>

tail(n)

产生给定可迭代对象的最后n个元素。

>>> from pipe import tail >>> for i in (1, 2, 3, 4, 5) | tail(3): ... print(i) 3 4 5 >>>

take(n)

从给定的可迭代对象中产生给定数量的元素,类似shell脚本中的head

>>> from pipe import take >>> for i in count() | take(5): ... print(i) 0 1 2 3 4 >>>

take_while(predicate)

类似itertools.takewhile,当谓词为真时产生给定可迭代对象的元素:

>>> from pipe import take_while >>> for i in count() | take_while(lambda x: x ** 2 < 100): ... print(i) 0 1 2 3 4 5 6 7 8 9 >>>

tee

tee 输出到标准输出并产生未更改的项,对于逐步调试管道很有用:

>>> from pipe import tee >>> sum(["1", "2", "3", "4", "5"] | tee | map(int) | tee) '1' 1 '2' 2 '3' 3 '4' 4 '5' 5 15 >>>

末尾的 15sum 的返回值。

transpose()

转置矩阵的行和列。

>>> from pipe import transpose >>> [[1, 2, 3], [4, 5, 6], [7, 8, 9]] | transpose [(1, 4, 7), (2, 5, 8), (3, 6, 9)] >>>

traverse

递归展开可迭代对象:

>>> list([[1, 2], [[[3], [[4]]], [5]]] | traverse) [1, 2, 3, 4, 5] >>> squares = (i * i for i in range(3)) >>> list([[0, 1, 2], squares] | traverse) [0, 1, 2, 0, 1, 4] >>>

uniq(key=None)

类似于 dedup(),但只对连续的值去重,如果提供了 key 函数则使用该函数(否则使用恒等函数)。

>>> from pipe import uniq >>> list([1, 1, 2, 2, 3, 3, 1, 2, 3] | uniq) [1, 2, 3, 1, 2, 3] >>> list([1, -1, 1, 2, -2, 2, 3, 3, 1, 2, 3] | uniq(key=abs)) [1, 2, 3, 1, 2, 3] >>>

where(predicate), filter(predicate)

只产生给定可迭代对象中匹配的项:

>>> list([1, 2, 3] | where(lambda x: x % 2 == 0)) [2] >>>

别忘了它们可以被别名:

>>> positive = where(lambda x: x > 0) >>> negative = where(lambda x: x < 0) >>> sum([-10, -5, 0, 5, 10] | positive) 15 >>> sum([-10, -5, 0, 5, 10] | negative) -15 >>>

构建自己的管道

你可以使用 Pipe 类构建自己的管道,像这样:

from pipe import Pipe square = Pipe(lambda iterable: (x ** 2 for x in iterable)) map = Pipe(lambda iterable, fct: builtins.map(fct, iterable) >>>

正如你所见,编写通常非常简短,运气好的话,你要包装的函数已经将可迭代对象作为第一个参数,这使得包装变得简单:

>>> from collections import deque >>> from pipe import Pipe >>> end = Pipe(deque) >>>

就这样,itrable | end(3) 就是 deque(iterable, 3):

>>> list(range(100) | end(3)) [97, 98, 99] >>>

如果情况变得更复杂,可以使用 Pipe 作为装饰器,装饰一个以可迭代对象作为第一个参数,其他可选参数在后面的函数:

>>> from statistics import mean >>> @Pipe ... def running_average(iterable, width): ... items = deque(maxlen=width) ... for item in iterable: ... items.append(item) ... yield mean(items) >>> list(range(20) | running_average(width=2)) [0, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5, 18.5] >>> list(range(20) | running_average(width=10)) [0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5] >>>

一次性管道

有时你只想要一行代码,在创建管道时可以直接指定函数的位置参数和命名参数

>>> from itertools import combinations >>> list(range(5) | Pipe(combinations, 2)) [(0, 1), (0, 2), (0, 3), (0, 4), (1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (3, 4)] >>>

带有初始起始值的简单累加和

>>> from itertools import accumulate >>> list(range(10) | Pipe(accumulate, initial=1)) [1, 1, 2, 4, 7, 11, 16, 22, 29, 37, 46] >>>

或者根据某些条件过滤数据

>>> from itertools import compress list(range(20) | Pipe(compress, selectors=[1, 0] * 10)) [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] >>> list(range(20) | Pipe(compress, selectors=[0, 1] * 10)) [1, 3, 5, 7, 9, 11, 13, 15, 17, 19] >>>

欧拉项目示例

求1000以下所有3或5的倍数之和。

>>> sum(count() | where(lambda x: x % 3 == 0 or x % 5 == 0) | take_while(lambda x: x < 1000)) 233168 >>>

求斐波那契数列中不超过四百万的偶数项之和。

sum(fib() | where(lambda x: x % 2 == 0) | take_while(lambda x: x < 4000000))

求前一百个自然数的平方和与和的平方之差。

>>> square = map(lambda x: x ** 2) >>> sum(range(101)) ** 2 - sum(range(101) | square) 25164150 >>>

深入探讨

部分管道

可以在不求值的情况下参数化 pipe:

>>> running_average_of_two = running_average(2) >>> list(range(20) | running_average_of_two) [0, 0.5, 1.5, 2.5, 3.5, 4.5, 5.5, 6.5, 7.5, 8.5, 9.5, 10.5, 11.5, 12.5, 13.5, 14.5, 15.5, 16.5, 17.5, 18.5] >>>

对于多参数管道,可以部分初始化,你可以将其视为柯里化:

some_iterable | some_pipe(1, 2, 3) some_iterable | Pipe(some_func, 1, 2, 3)

严格等同于:

some_iterable | some_pipe(1)(2)(3)

因此可以用来专门化管道,首先是一个简单的例子:

>>> @Pipe ... def addmul(iterable, to_add, to_mul): ... """对输入的每一项计算 (x + to_add) * to_mul。""" ... for i in iterable: ... yield (i + to_add) * to_mul >>> mul = addmul(0) # 这部分初始化 addmul,设置 to_add=0 >>> list(range(10) | mul(10)) [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]

这也适用于关键字参数:

>>> add = addmul(to_mul=1) # 这部分初始化 addmul,设置 `to_mul=1` >>> list(range(10) | add(10)) [10, 11, 12, 13, 14, 15, 16, 17, 18, 19] >>>

现在来看一些有趣的东西:

>>> import re >>> @Pipe ... def grep(iterable, pattern, flags=0): ... for line in iterable: ... if re.match(pattern, line, flags=flags): ... yield line ... >>> lines = ["Hello", "hello", "World", "world"] >>> for line in lines | grep("H"): ... print(line) Hello >>>

现在让我们以两种方式重用它,首先是使用模式:

>>> lowercase_only = grep("[a-z]+$") >>> for line in lines | lowercase_only: ... print(line) hello world >>>

或者现在使用标志:

>>> igrep = grep(flags=re.IGNORECASE) >>> for line in lines | igrep("hello"): ... print(line) ... Hello hello >>>

惰性求值

Pipe 从头到尾都使用生成器,所以它本质上是惰性的。

在以下示例中,我们将使用 itertools.count:一个无限整数生成器。 我们还将使用tee管道,它会打印通过它的每个值。

以下示例什么也不做,tee不会打印任何内容,因此没有值通过它。这很好,因为生成无限平方序列是"缓慢"的。

>>> result = count() | tee | select(lambda x: x ** 2) >>>

链接更多管道仍然不会使前面的管道开始生成值,在下面的示例中,没有一个值从count中被提取出来:

>>> result = count() | tee | select(lambda x: x ** 2) >>> first_results = result | take(10) >>> only_odd_ones = first_results | where(lambda x: x % 2) >>>

不使用变量的相同示例:

>>> result = (count() | tee ... | select(lambda x: x ** 2) ... | take(10) ... | where(lambda x: x % 2)) >>>

只有当实际需要值时,生成器才开始工作。

在以下示例中,只会从count中提取两个值:

  • 0被平方(变成0),轻易通过take(10),但被where丢弃
  • 1被平方(变成1),也轻易通过take(10),通过where,并通过take(1)

此时take(1)已满足,所以不需要进行其他计算。注意tee打印通过它的01

>>> result = (count() | tee ... | select(lambda x: x ** 2) ... | take(10) ... | where(lambda x: x % 2)) >>> print(list(result | take(1))) 0 1 [1] >>>

废弃

在pipe 1.x中,许多函数返回可迭代对象,而许多其他函数返回非可迭代对象,这造成了混淆。那些返回非可迭代对象的函数只能用作管道表达式的最后一个函数,所以它们实际上是无用的:

range(100) | where(lambda x: x % 2 == 0) | add

可以改写为同样易读的形式:

sum(range(100) | where(lambda x: x % 2 == 0))

因此,所有返回非可迭代对象的管道都被废弃(发出警告),并最终在pipe 2.0中被移除。

我应该怎么做?

哦,你刚刚升级了pipe,遇到了异常,然后来到这里?你有三个解决方案:

  1. 停止使用闭合管道,将...|...|...|...|as_list替换为list(...|...|...|),就是这样,甚至更短。

  2. 如果"闭合管道"对你来说不是问题,而你真的喜欢它们,只需重新实现你真正需要的几个,通常只需要很少的代码行,或者从这里复制它们。

  3. 如果你仍然依赖很多闭合管道并且时间紧迫,只需pip install pipe<2

并开始使用Python开发模式测试你的项目,这样你就可以在警告变成问题之前捕获它们。

但我喜欢它们,请重新引入它们!

这个问题已经在#74中讨论过了。

一个@Pipe通常可以在1到3行代码的函数中轻松实现,而pipe模块的目标不是提供所有可能性,而是提供Pipe装饰器。

因此,如果你需要更多管道、闭合管道、奇怪的管道等等,可以随意在你的项目中实现它们,并将已实现的管道视为如何实现的示例。

请参阅下面的"构建你自己的管道"段落。

编辑推荐精选

TRAE编程

TRAE编程

AI辅助编程,代码自动修复

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
蛙蛙写作

蛙蛙写作

AI小说写作助手,一站式润色、改写、扩写

蛙蛙写作—国内先进的AI写作平台,涵盖小说、学术、社交媒体等多场景。提供续写、改写、润色等功能,助力创作者高效优化写作流程。界面简洁,功能全面,适合各类写作者提升内容品质和工作效率。

AI辅助写作AI工具蛙蛙写作AI写作工具学术助手办公助手营销助手AI助手
问小白

问小白

全能AI智能助手,随时解答生活与工作的多样问题

问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。

热门AI助手AI对话AI工具聊天机器人
Transly

Transly

实时语音翻译/同声传译工具

Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

热门AI辅助写作AI工具讯飞绘文内容运营AI创作个性化文章多平台分发AI助手
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

下拉加载更多