
用于大规模文本数据处理和去重的开源Python库
DataTrove是一个开源Python库,专门用于处理、过滤和去重大规模文本数据。它提供预构建的常用处理模块和自定义功能支持。该库的处理流程可在本地或Slurm集群上运行,具有低内存消耗和多步骤设计,适合处理大型语言模型训练数据等大规模工作负载。DataTrove支持多种文件系统,为数据处理提供灵活解决方案。
DataTrove是一个用于大规模处理、过滤和去重文本数据的库。它提供了一套预建的常用处理模块,并提供了一个框架来轻松添加自定义功能。
DataTrove处理流水线是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用和多步骤设计使其非常适合大型工作负载,例如处理LLM的训练数据。
通过fsspec支持本地、远程和其他文件系统。
pip install datatrove[FLAVOUR]
可用的版本(可以用,组合,例如[processing,s3]):
all 安装所有内容:pip install datatrove[all]io 用于读取warc/arc/wet文件和arrow/parquet格式的依赖:pip install datatrove[io]processing 用于文本提取 、过滤和分词的依赖:pip install datatrove[processing]s3 s3支持:pip install datatrove[s3]cli 命令行工具:pip install datatrove[cli]你可以查看以下示例:
gpt2分词器对C4数据集的英语部分进行分词每个流水线模块处理datatrove Document格式的数据:
text 每个样本的实际文本内容id 此样本的唯一标识符(字符串)metadata 可以存储任何额外信息的字典每个流水线模块接受一个Document生成器作为输入,并返回另一个Document生成器。
DocumentDocument保存到磁盘/云端Document流水线被定义为一系列流水线模块。例如,以下流水线将从磁盘读取数据,随机过滤(删除)一些文档,然后将它们写回磁盘:
from datatrove.pipeline.readers import CSVReader from datatrove.pipeline.filters import SamplerFilter from datatrove.pipeline.writers import JsonlWriter pipeline = [ CSVReader( data_folder="/my/input/path" ), SamplerFilter(rate=0.5), JsonlWriter( output_folder="/my/output/path" ) ]
流水线是平台无关的,这意味着同一流水线可以在不同的执行环境中无缝运行,而无需更改其步骤。每个环境都有自己的PipelineExecutor。 所有执行器的一些通用选项:
pipeline 由应运行的流水线步骤组成的列表logging_dir 用于保存日志文件、统计信息等的数据文件夹。不要为不同的流水线/作业重复使用文件夹,因为这会覆盖你的统计信息、日志和完成情况。skip_completed (布尔值,默认为True)datatrove会跟踪已完成的任务,以便在重新启动作业时可以跳过这些任务。设置为False可禁用此行为randomize_start_duration (整数,默认为0)延迟每个任务开始的最大秒数,以防止所有任务同时开始并可能使系统过载。调用执行器的run方法来执行其流水线。
[!TIP] Datatrove通过在
${logging_dir}/completions文件夹中创建标记(空文件)来跟踪哪些任务成功完成。一旦作业完成,如果某些任务失败,你可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。
[!CAUTION] 如果你因为某些任务失败而重新启动流水线,不要改变总任务数,因为这会影响输入文件/分片的分布。
此执行器将在本地机器上启动流水线。 选项:
tasks 要运行的总任务数workers 同时运行多少个任务。如果为-1,则不限制。任何> 1的值都将使用多进程来执行任务。start_method 用于生成多进程Pool的方法。如果workers为1则忽略</details> <details> <summary>多节点并行</summary>from datatrove.executor import LocalPipelineExecutor executor = LocalPipelineExecutor( pipeline=[ ... ], logging_dir="logs/", tasks=10, workers=5 ) executor.run()
你可以让不同的节点/机器处理总任务的不同部分,方法是使用local_tasks和local_rank_offset。对于每个节点/实例/机器,使用以下选项启动:
tasks 要执行的总任务数(跨所有机器)。这个值在每台机器上必须相同,否则输入文件分布可能会重叠! 示例:500local_tasks 在这台特定机器上将执行多少个总任务。注意,你可以为每台机器使用不同的值。示例:100local_rank_offset 在这台机器上执行的第一个任务的序号。如果这是你启动作业的第3台机器,而前两台机器分别运行了250和150个作业,那么当前机器的这个值应该是400。要获得最终合并的统计信息,你需要在包含所有机器统计信息的路径上手动调用merge_stats脚本。
此执行器将在Slurm集群上启动流水线,使用Slurm作业数组来分组和管理任务。 选项:
tasks 要运行的总任务数。必需time Slurm时间限制字符串。必需partition Slurm分区。必需workers 同时运行多少个任务。如果为-1,则不限制。Slurm将一次运行workers个任务。(默认:-1)job_name Slurm作业名称(默认:"data_processing")depends 另一个SlurmPipelineExecutor实例,它将作为此流水线的依赖项(当前流水线只有在依赖的流水线成功完成后才会开始执行)sbatch_args 包含你想传递给sbatch的任何其他参数的字典slurm_logs_folder 保存Slurm日志文件的位置。如果为logging_dir使用本地路径,它们将保存在logging_dir/slurm_logs中。如果不是,它们将保存在当前目录的子目录中。cpus_per_task 给每个任务分配多少CPU(默认:1)qos Slurm qos(默认:"normal")mem_per_cpu_gb 每个CPU的内存,单位为GB(默认:2)env_command 激活Python环境的自定义命令(如果需要)condaenv 要激活的conda环境venv_path 要激活的Python环境路径max_array_size $ scontrol show config中的_MaxArraySize_值。如果任务数超过此数字,将分成多个数组作业(默认:1001)max_array_launch_parallel 如果由于max_array_size需要多个作业,是否一次性全部启动(并行)或顺序启动(默认:False)stagger_max_array_jobs 当max_array_launch_parallel为True时,这决定了在启动每个并行作业之间等待多少秒(默认:0)run_on_dependency_fail 当我们依赖的作业完成时开始执行,即使它失败了(默认:False)randomize_start 在约3分钟的窗口内随机化每个任务的开始时间。在大量访问S3存储桶等情况下很有用。(默认:False)</details> ## 日志记录from datatrove.executor import SlurmPipelineExecutor executor1 = SlurmPipelineExecutor( pipeline=[ ... ], job_name="my_cool_job1", logging_dir="logs/job1", tasks=500, workers=100, # 省略以一次运行所有任务 time="10:00:00", # 10小时 partition="hopper-cpu" ) executor2 = SlurmPipelineExecutor( pipeline=[ ... ], job_name="my_cool_job2", logging_dir="logs/job2", tasks=1, time="5:00:00", # 5小时 partition="hopper-cpu", depends=executor1 # 此流水线只有在executor1成功完成后才会启动 ) # executor1.run() executor2.run() # 这实际上会启动executor1,因为它是一个依赖项,所以不需要显式启动它
对于 logging_dir 为 mylogspath/exp1 的管道,将创建以下文件夹结构:
└── mylogspath/exp1
│── executor.json ⟵ 执行器选项和管道步骤的 json 转储
│── launch_script.slurm ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
│── executor.pik ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
│── ranks_to_run.json ⟵ 正在运行的任务列表
│── logs/
│ └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ 每个任务的单独日志文件
│── completions/
│ └──[00004, 00007, 00204, ...] ⟵ 标记任务已完成的空文件。用于重新启动/恢复作业时(只会运行未完成的任务)
│── stats/
│ └──[00000.json, 00001.json, 00002.json, ...] ⟵ 每个任务的单独统计信息(处理的样本数量、过滤的、移除的等)
└── stats.json ⟵ 所有任务的全局统计信息
</details>
日志消息支持颜色化。默认情况下,控制台消息会自动检测颜色化,而日志文件(logs/task_XXXXX.log)则禁用颜色化。 要明确启用或禁用颜色化,可以设置以下环境变量:
DATATROVE_COLORIZE_LOGS 设为 "1" 为控制台日志消息添加 ANSI 颜色,设为 "0" 禁用颜色化。DATATROVE_COLORIZE_LOG_FILES 设为 "1" 为保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。Datatrove 通过 fsspec 支持多种输入/输出源。
有几种方法可以为 datatrove 块提供路径(用于 input_folder、logging_dir、data_folder 等参数):
str:最简单的方法是传递单个字符串。例如:/home/user/mydir、s3://mybucket/myinputdata、hf://datasets/allenai/c4/en/
(str, fsspec filesystem instance):一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))
(str, dict):一个字符串路径和一个用于初始化文件系统的选项字典。例如(等同于上一行):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})
DataFolder:你可以直接初始化一个 DataFolder 对象并将其作为参数传递
在底层,这些参数组合由 get_datafolder 解析。
通常,管道会以 Reader 块开始。
大多数读取器都有一个 data_folder 参数 — 指向包含要读取数据的文件夹的路径。
这些文件将分布在每个任务中。如果你有 N 个任务,排名为 i(从 0 开始)的任务将处理文件 i, i+N, i+2N, i+3N,...。
在内部,每个读取器读取数据并将其转换为字典,然后创建一个 Document 对象。
大多数读取器的一些常见选项:
text_key 包含每个样本文本内容的字典键。默认:textid_key 包含每个样本 id 的字典键。默认:iddefault_metadata 一个字典,用于添加任何你想要的默认元数据值(例如它们的来源)recursive 是否递归查找 data_folder 子目录中的文件glob_pattern 用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz" 将匹配 data_folder 的每个子目录的 warc/ 文件夹中扩展名为 .warc.gz 的文件adapter 此函数接收从读取器获得的原始字典,并返回带有 Document 字段名的字典。如果需要,你可以覆盖此函数(_default_adapter)。limit 只读取一定数量的样本。对测试/调试有用你可以使用 extractors 从原始 html 中提取文本内容。datatrove 中最常用的提取器是 Trafilatura,它使用 trafilatura 库。
Filters 是任何数据处理管道中最重要的块之一。Datatrove 的过滤器块接收一个 Document 并返回一个布尔值(True 保留文档,False 移除它)。被移除的样本不会继续到下一个管道阶段。你还可以通过将 Writer 传递给 excluded_writer 参数来将被移除的样本保存到磁盘。
一旦你完成了数据处理,你可能想要将其保存到某个地方。为此,你可以使用 writer。
写入器需要一个 output_folder(数据应该保存的路径)。你可以选择要使用的 compression(默认:gzip)和每个文件保存的文件名。
对于 output_filename,使用以下参数应用模板:
${rank} 替换为当前任务的排名。注意,如果没有这个标签,不同的任务可能会尝试写入相同的位置${id} 替换为样本 id${tag} 将被相应的 document.metadata['tag'] 值替换一个根据 lang 元数据字段分离样本的示例:
JsonlWriter(
f"{MAIN_OUTPUT_PATH}/non_english/",
output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz", # 文件夹结构:language/dump/file
)
对于去重,请查看示例 minhash_deduplication.py、sentence_deduplication.py 和 exact_substrings.py。
对于数据的摘要统计,你可以使用 Stats 块。这些块提供了一种简单的方法来以分布式方式收集数据集的数据概况。这是一个两步过程:
summary(所有文档计入 "summary" 键)、fqdn(完全限定域名分组)、suffix(URL 路径最后部分分组)或 histogram(基于值的分组)。每个结果统计信息保存在具有以下结构的单独文件中:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json
每个这样的文件都是一个 MetricStatsDict 对象,你可以使用以下方法轻松加载:
from datatrove.pipeline.stats.summary_stats import MetricStatsDict import json stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json"))) # 例如,nytimes.com 文档的总长度 stats["nytimes.com"].total # 或 cnn.com 文档的平均值 stats["cnn.com"].mean
可用的统计信息如下:
contamination_stats.py:word_contamination_{words[0]}:文档中单词污染的频率。doc_stats.py:length:文档长度,white_space_ratio:空白字符比例,non_alpha_digit_ratio:非字母和非数字字符比例,digit_ratio:数字比例,uppercase_ratio:大写字母比例,elipsis_ratio:省略号字符比例,punctuation_ratio:标点符号比例lang_stats.py:fasttext_{language}:使用 fastText 的文档语言line_stats.py:n_lines:每个文档的行数,avg_line_length:每个文档的平均行长度,long_line_ratio_words:超过 k 个字符的行比例,short_line_ratio_chars:少于 k 个字符的行比例,bullet_point_lines_ratio:项目符号行比例,line_duplicates:重复行比例,line_char_duplicates:重复行中字符的比例paragraph_stats.py:n_paragraphs:段落数,avg_paragraph_length:平均段落长度,short_paragraph_ratio_{chars}:短段落比例(<{chars} 字符),long_paragraph_ratio_{chars}:长段落比例(>{chars} 字符)perplexity_stats.py:ccnet_perplexity_{model_dataset}_{language}:使用 CCNet 模型在 {dataset} 上的 {language} 中的文档困惑度sentence_stats.py:n_sentences:句子数,avg_sentence_length:平 均句子长度,short_sentence_ratio_{chars}:短句比例(<{chars} 字符),long_sentence_ratio_{chars}:长句比例(>{chars} 字符)token_stats.py:token_count:文档中的令牌数word_stats.py:n_words:文档中的单词数,avg_word_length:文档中单词的平均长度,avg_words_per_line:文档中每行的平均单词数,short_word_ratio_{chars}:短于 {chars} 字符的单词比例,stop_word_ratio:停用词比例,long_word_ratio_{chars}:长于 {chars} 字符的单词比例,type_token_ratio:唯一单词数 / 令牌数,capitalized_word_ratio:首字母大写单词比例,uppercase_word_ratio:全大写单词比例你可以直接将 Document 的可迭代对象作为管道块传递,如下所示:
from datatrove.data import Document from datatrove.pipeline.filters import SamplerFilter from datatrove.pipeline.writers import JsonlWriter pipeline = [ [ Document(text="一些数据", id="0"), Document(text="更多数据", id="1"), Document(text="更多数据", id="2"), ], SamplerFilter(rate=0.5), JsonlWriter( output_folder="/my/output/path" ) ]
但请注意,这个可迭代对象不会被分片(如果你启动多个任务,它们都会获得完整的可迭代对象)。 这通常用于小型工作负载/测试。
对于简单的处理,你可以直接传入具有以下签名的自定义函数:
from datatrove.data import DocumentsPipeline def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: """ `data`是Document生成器。你必须返回一个Document生成器(yield) 你可以选择使用`rank`和`world_size`进行分片 """ for document in data: document.text = document.text.upper() yield document pipeline = [ ..., uppercase_everything, ... ]
[!提示] 由于导入问题,你可能会遇到一些pickle相关的问题。如果发生这种情况,只需将你需要的导入移到函数体内即可。
你也可以定义一个完整的块,继承自PipelineStep或其子类之一:
from datatrove.pipeline.base import PipelineStep from datatrove.data import DocumentsPipeline from datatrove.io import DataFolderLike, get_datafolder class UppercaserBlock(PipelineStep): def __init__(self, some_folder: DataFolderLike, some_param: int = 5): super().__init__() # 你可以接受任何需要的参数并将它们保存在这里 self.some_param = some_param # 使用get_datafolder()加载数据文件夹 self.some_folder = get_datafolder(some_folder) def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: # 你也可以从`some_folder`加载数据: for filepath in self.some_folder.get_shard(rank, world_size): # 它也接受glob模式,以及其他选项 with self.some_folder.open(filepath, "rt") as f: # 做一些处理 ... yield doc # # 或处理来自前一个块的数据(`data`) # for doc in data: with self.track_time(): # 你可以将主要处理代码包装在`track_time`中,以了解每个文档处理所需的时间 nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text)) # 你也可以使用stat_update跟踪每个文档的统计信息 self.stat_update("og_upper_letters", value=nr_uppercase_letters) doc.text = doc.text.upper() # 确保将yield保持在track_time块之外,否则会影响时间计算 yield doc # # 或将数据保存到磁盘 # with self.some_folder.open("myoutput", "wt") as f: for doc in data: f.write(doc...)
pipeline = [ ..., UppercaserBlock("somepath"), ... ]
你也可以继承自BaseExtractor、BaseFilter、BaseReader/BaseDiskReader或DiskWriter。
git clone git@github.com:huggingface/datatrove.git && cd datatrove pip install -e ".[dev]"
安装pre-commit代码风格钩子:
pre-commit install
运行测试:
pytest -sv ./tests/
@misc{penedo2024datatrove, author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas}, title = {DataTrove: large scale data processing}, year = {2024}, publisher = {GitHub}, journal = {GitHub repository}, url = {https://github.com/huggingface/datatrove} }


AI一键生成PPT,就用博思AIPPT!
博思AIPPT,新一代的AI生成PPT平台,支持智能生成PPT、AI美化PPT、文本&链接生成PPT、导入Word/PDF/Markdown文档生成PPT等,内置海量精美PPT模板,涵盖商务、教育、科技等不同风格,同时针对每个页面提供多种版式,一键自适应切换,完美适配各种办公场景。


AI赋能电商视觉革命,一站式智能商拍平台
潮际好麦深耕服装行业,是国内AI试衣效果最好的软件。使用先进AIGC能力为电商卖家批量提供优质的、低成本的商拍图。合作品牌有Shein、Lazada、安踏、百丽等65个国内外头部品牌,以及国内10万+淘宝、天猫、京东等主流平台的品牌商家,为卖家节省将近85%的出图成本,提升约3倍出图效率,让品牌能够快速上架。


企业专属的AI法律顾问
iTerms是法大大集团旗下法律子品牌,基于最先进的大语言模型(LLM)、专业的法律知识库和强大的智能体架构,帮助企业扫清合规障碍,筑牢风控防线,成为您企业专属的AI法律顾问。


稳定高效的流量提升解决方案,助力品牌曝光
稳定高效的流量提升解决方案,助力品牌曝光


最新版Sora2模型免费使用,一键生成无水印视频
最新版Sora2模型免费使用,一键生成无水印视频


实时语音翻译/同声传译工具
Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧 爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。


选题、配图、成文,一站式创作,让内容运营更高效
讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。


AI辅助编程,代码自动修复
Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。


最强AI数据分析助手
小浣熊家族Raccoon,您的AI智能助手,致力于通过先进的人工智能技术,为用户提供高效、便捷的智能服务。无论是日常咨询还是专业问题解答,小 浣熊都能以快速、准确的响应满足您的需求,让您的生活更加智能便捷。


像人一样思考的AI智能体
imini 是一款超级AI智能体,能根据人类指令,自主思考、自主完成、并且交付结果的AI智能体。
最新AI工具、AI资讯
独家AI资源、AI项目落地

微信扫一扫关注公众号