datatrove

datatrove

用于大规模文本数据处理和去重的开源Python库

DataTrove是一个开源Python库,专门用于处理、过滤和去重大规模文本数据。它提供预构建的常用处理模块和自定义功能支持。该库的处理流程可在本地或Slurm集群上运行,具有低内存消耗和多步骤设计,适合处理大型语言模型训练数据等大规模工作负载。DataTrove支持多种文件系统,为数据处理提供灵活解决方案。

DataTrove数据处理大规模管道文本去重Github开源项目

DataTrove

DataTrove是一个用于大规模处理、过滤和去重文本数据的库。它提供了一套预建的常用处理模块,并提供了一个框架来轻松添加自定义功能。

DataTrove处理流水线是平台无关的,可以在本地或Slurm集群上直接运行。其(相对)低内存使用和多步骤设计使其非常适合大型工作负载,例如处理LLM的训练数据。

通过fsspec支持本地、远程和其他文件系统。

目录

<!-- toc --> <!-- tocstop -->

安装

pip install datatrove[FLAVOUR]

可用的版本(可以用,组合,例如[processing,s3]):

  • all 安装所有内容:pip install datatrove[all]
  • io 用于读取warc/arc/wet文件和arrow/parquet格式的依赖:pip install datatrove[io]
  • processing 用于文本提取、过滤和分词的依赖:pip install datatrove[processing]
  • s3 s3支持:pip install datatrove[s3]
  • cli 命令行工具:pip install datatrove[cli]

快速开始示例

你可以查看以下示例

流水线

DataTrove文档

每个流水线模块处理datatrove Document格式的数据:

  • text 每个样本的实际文本内容
  • id 此样本的唯一标识符(字符串)
  • metadata 可以存储任何额外信息的字典

流水线模块类型

每个流水线模块接受一个Document生成器作为输入,并返回另一个Document生成器。

  • readers 从不同格式读取数据并生成Document
  • writers 以不同格式将Document保存到磁盘/云端
  • extractors 从原始格式(如网页html)中提取文本内容
  • filters 根据特定规则/标准过滤掉(删除)一些Document
  • stats 收集数据集统计信息的模块
  • tokens 用于分词或计算标记数的模块
  • dedup 用于去重的模块

完整流水线

流水线被定义为一系列流水线模块。例如,以下流水线将从磁盘读取数据,随机过滤(删除)一些文档,然后将它们写回磁盘:

from datatrove.pipeline.readers import CSVReader from datatrove.pipeline.filters import SamplerFilter from datatrove.pipeline.writers import JsonlWriter pipeline = [ CSVReader( data_folder="/my/input/path" ), SamplerFilter(rate=0.5), JsonlWriter( output_folder="/my/output/path" ) ]

执行器

流水线是平台无关的,这意味着同一流水线可以在不同的执行环境中无缝运行,而无需更改其步骤。每个环境都有自己的PipelineExecutor。 所有执行器的一些通用选项:

  • pipeline 由应运行的流水线步骤组成的列表
  • logging_dir 用于保存日志文件、统计信息等的数据文件夹。不要为不同的流水线/作业重复使用文件夹,因为这会覆盖你的统计信息、日志和完成情况。
  • skip_completed布尔值,默认为True)datatrove会跟踪已完成的任务,以便在重新启动作业时可以跳过这些任务。设置为False可禁用此行为
  • randomize_start_duration整数,默认为0)延迟每个任务开始的最大秒数,以防止所有任务同时开始并可能使系统过载。

调用执行器的run方法来执行其流水线。

[!TIP] Datatrove通过在${logging_dir}/completions文件夹中创建标记(空文件)来跟踪哪些任务成功完成。一旦作业完成,如果某些任务失败,你可以简单地重新启动完全相同的执行器,datatrove将检查并只运行之前未完成的任务。

[!CAUTION] 如果你因为某些任务失败而重新启动流水线,不要改变总任务数,因为这会影响输入文件/分片的分布。

LocalPipelineExecutor

此执行器将在本地机器上启动流水线。 选项:

  • tasks 要运行的总任务数
  • workers 同时运行多少个任务。如果为-1,则不限制。任何> 1的值都将使用多进程来执行任务。
  • start_method 用于生成多进程Pool的方法。如果workers为1则忽略
<details> <summary>执行器示例</summary>
from datatrove.executor import LocalPipelineExecutor executor = LocalPipelineExecutor( pipeline=[ ... ], logging_dir="logs/", tasks=10, workers=5 ) executor.run()
</details> <details> <summary>多节点并行</summary>

你可以让不同的节点/机器处理总任务的不同部分,方法是使用local_taskslocal_rank_offset。对于每个节点/实例/机器,使用以下选项启动:

  • tasks 要执行的总任务数(跨所有机器)。这个值在每台机器上必须相同,否则输入文件分布可能会重叠! 示例:500
  • local_tasks 在这台特定机器上将执行多少个总任务。注意,你可以为每台机器使用不同的值。示例:100
  • local_rank_offset 在这台机器上执行的第一个任务的序号。如果这是你启动作业的第3台机器,而前两台机器分别运行了250和150个作业,那么当前机器的这个值应该是400

要获得最终合并的统计信息,你需要在包含所有机器统计信息的路径上手动调用merge_stats脚本。

</details>

SlurmPipelineExecutor

此执行器将在Slurm集群上启动流水线,使用Slurm作业数组来分组和管理任务。 选项:

  • tasks 要运行的总任务数。必需
  • time Slurm时间限制字符串。必需
  • partition Slurm分区。必需
  • workers 同时运行多少个任务。如果为-1,则不限制。Slurm将一次运行workers个任务。(默认:-1
  • job_name Slurm作业名称(默认:"data_processing")
  • depends 另一个SlurmPipelineExecutor实例,它将作为此流水线的依赖项(当前流水线只有在依赖的流水线成功完成后才会开始执行)
  • sbatch_args 包含你想传递给sbatch的任何其他参数的字典
  • slurm_logs_folder 保存Slurm日志文件的位置。如果为logging_dir使用本地路径,它们将保存在logging_dir/slurm_logs中。如果不是,它们将保存在当前目录的子目录中。
<details> <summary>其他选项</summary>
  • cpus_per_task 给每个任务分配多少CPU(默认:1
  • qos Slurm qos(默认:"normal")
  • mem_per_cpu_gb 每个CPU的内存,单位为GB(默认:2)
  • env_command 激活Python环境的自定义命令(如果需要)
  • condaenv 要激活的conda环境
  • venv_path 要激活的Python环境路径
  • max_array_size $ scontrol show config中的_MaxArraySize_值。如果任务数超过此数字,将分成多个数组作业(默认:1001)
  • max_array_launch_parallel 如果由于max_array_size需要多个作业,是否一次性全部启动(并行)或顺序启动(默认:False
  • stagger_max_array_jobs 当max_array_launch_parallel为True时,这决定了在启动每个并行作业之间等待多少秒(默认:0
  • run_on_dependency_fail 当我们依赖的作业完成时开始执行,即使它失败了(默认:False
  • randomize_start 在约3分钟的窗口内随机化每个任务的开始时间。在大量访问S3存储桶等情况下很有用。(默认:False
</details> <details> <summary>执行器示例</summary>
from datatrove.executor import SlurmPipelineExecutor executor1 = SlurmPipelineExecutor( pipeline=[ ... ], job_name="my_cool_job1", logging_dir="logs/job1", tasks=500, workers=100, # 省略以一次运行所有任务 time="10:00:00", # 10小时 partition="hopper-cpu" ) executor2 = SlurmPipelineExecutor( pipeline=[ ... ], job_name="my_cool_job2", logging_dir="logs/job2", tasks=1, time="5:00:00", # 5小时 partition="hopper-cpu", depends=executor1 # 此流水线只有在executor1成功完成后才会启动 ) # executor1.run() executor2.run() # 这实际上会启动executor1,因为它是一个依赖项,所以不需要显式启动它
</details> ## 日志记录

对于 logging_dirmylogspath/exp1 的管道,将创建以下文件夹结构:

<details> <summary>查看文件夹结构</summary>
└── mylogspath/exp1
    │── executor.json ⟵ 执行器选项和管道步骤的 json 转储
    │── launch_script.slurm ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
    │── executor.pik ⟵ 创建并用于启动此作业的 slurm 配置(如果在 slurm 上运行)
    │── ranks_to_run.json ⟵ 正在运行的任务列表
    │── logs/
    │   └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ 每个任务的单独日志文件
    │── completions/
    │   └──[00004, 00007, 00204, ...] ⟵ 标记任务已完成的空文件。用于重新启动/恢复作业时(只会运行未完成的任务)
    │── stats/
    │   └──[00000.json, 00001.json, 00002.json, ...] ⟵ 每个任务的单独统计信息(处理的样本数量、过滤的、移除的等)
    └── stats.json ⟵ 所有任务的全局统计信息
</details>

颜色化

日志消息支持颜色化。默认情况下,控制台消息会自动检测颜色化,而日志文件(logs/task_XXXXX.log)则禁用颜色化。 要明确启用或禁用颜色化,可以设置以下环境变量:

  • DATATROVE_COLORIZE_LOGS 设为 "1" 为控制台日志消息添加 ANSI 颜色,设为 "0" 禁用颜色化。
  • DATATROVE_COLORIZE_LOG_FILES 设为 "1" 为保存到 logs/task_XXXXX.log 的日志消息添加 ANSI 颜色。

DataFolder / 路径

Datatrove 通过 fsspec 支持多种输入/输出源。

有几种方法可以为 datatrove 块提供路径(用于 input_folderlogging_dirdata_folder 等参数):

  • str:最简单的方法是传递单个字符串。例如:/home/user/mydirs3://mybucket/myinputdatahf://datasets/allenai/c4/en/

  • (str, fsspec filesystem instance):一个字符串路径和一个完全初始化的文件系统对象。例如:("s3://mybucket/myinputdata", S3FileSystem(client_kwargs={"endpoint_url": endpoint_uri}))

  • (str, dict):一个字符串路径和一个用于初始化文件系统的选项字典。例如(等同于上一行):("s3://mybucket/myinputdata", {"client_kwargs": {"endpoint_url": endpoint_uri}})

  • DataFolder:你可以直接初始化一个 DataFolder 对象并将其作为参数传递

在底层,这些参数组合由 get_datafolder 解析。

实用指南

读取数据

通常,管道会以 Reader 块开始。 大多数读取器都有一个 data_folder 参数 — 指向包含要读取数据的文件夹的路径。

这些文件将分布在每个任务中。如果你有 N 个任务,排名为 i(从 0 开始)的任务将处理文件 i, i+N, i+2N, i+3N,...

在内部,每个读取器读取数据并将其转换为字典,然后创建一个 Document 对象。

大多数读取器的一些常见选项:

  • text_key 包含每个样本文本内容的字典键。默认:text
  • id_key 包含每个样本 id 的字典键。默认:id
  • default_metadata 一个字典,用于添加任何你想要的默认元数据值(例如它们的来源)
  • recursive 是否递归查找 data_folder 子目录中的文件
  • glob_pattern 用此字段匹配特定文件。例如,glob_pattern="*/warc/*.warc.gz" 将匹配 data_folder 的每个子目录的 warc/ 文件夹中扩展名为 .warc.gz 的文件
  • adapter 此函数接收从读取器获得的原始字典,并返回带有 Document 字段名的字典。如果需要,你可以覆盖此函数(_default_adapter)。
  • limit 只读取一定数量的样本。对测试/调试有用

提取文本

你可以使用 extractors 从原始 html 中提取文本内容。datatrove 中最常用的提取器是 Trafilatura,它使用 trafilatura 库。

过滤数据

Filters 是任何数据处理管道中最重要的块之一。Datatrove 的过滤器块接收一个 Document 并返回一个布尔值(True 保留文档,False 移除它)。被移除的样本不会继续到下一个管道阶段。你还可以通过将 Writer 传递给 excluded_writer 参数来将被移除的样本保存到磁盘。

保存数据

一旦你完成了数据处理,你可能想要将其保存到某个地方。为此,你可以使用 writer。 写入器需要一个 output_folder(数据应该保存的路径)。你可以选择要使用的 compression(默认:gzip)和每个文件保存的文件名。 对于 output_filename,使用以下参数应用模板:

  • ${rank} 替换为当前任务的排名。注意,如果没有这个标签,不同的任务可能会尝试写入相同的位置
  • ${id} 替换为样本 id
  • 元数据:任何其他 ${tag} 将被相应的 document.metadata['tag'] 值替换

一个根据 lang 元数据字段分离样本的示例:

JsonlWriter(
    f"{MAIN_OUTPUT_PATH}/non_english/",
    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # 文件夹结构:language/dump/file
)

去重数据

对于去重,请查看示例 minhash_deduplication.pysentence_deduplication.pyexact_substrings.py

摘要统计

对于数据的摘要统计,你可以使用 Stats 块。这些块提供了一种简单的方法来以分布式方式收集数据集的数据概况。这是一个两步过程:

  1. 对于每个分片,遍历文档并将统计信息收集到以下分组之一:summary(所有文档计入 "summary" 键)、fqdn(完全限定域名分组)、suffix(URL 路径最后部分分组)或 histogram(基于值的分组)。
  2. 将不同分片的统计信息合并到一个文件中。 有关更多详细信息,请参阅 summary_stats.py

每个结果统计信息保存在具有以下结构的单独文件中:output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json

每个这样的文件都是一个 MetricStatsDict 对象,你可以使用以下方法轻松加载:

from datatrove.pipeline.stats.summary_stats import MetricStatsDict import json stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json"))) # 例如,nytimes.com 文档的总长度 stats["nytimes.com"].total # 或 cnn.com 文档的平均值 stats["cnn.com"].mean

可用的统计信息如下:

  • contamination_stats.pyword_contamination_{words[0]}:文档中单词污染的频率。
  • doc_stats.pylength:文档长度,white_space_ratio:空白字符比例,non_alpha_digit_ratio:非字母和非数字字符比例,digit_ratio:数字比例,uppercase_ratio:大写字母比例,elipsis_ratio:省略号字符比例,punctuation_ratio:标点符号比例
  • lang_stats.pyfasttext_{language}:使用 fastText 的文档语言
  • line_stats.pyn_lines:每个文档的行数,avg_line_length:每个文档的平均行长度,long_line_ratio_words:超过 k 个字符的行比例,short_line_ratio_chars:少于 k 个字符的行比例,bullet_point_lines_ratio:项目符号行比例,line_duplicates:重复行比例,line_char_duplicates:重复行中字符的比例
  • paragraph_stats.pyn_paragraphs:段落数,avg_paragraph_length:平均段落长度,short_paragraph_ratio_{chars}:短段落比例(<{chars} 字符),long_paragraph_ratio_{chars}:长段落比例(>{chars} 字符)
  • perplexity_stats.pyccnet_perplexity_{model_dataset}_{language}:使用 CCNet 模型在 {dataset} 上的 {language} 中的文档困惑度
  • sentence_stats.pyn_sentences:句子数,avg_sentence_length:平均句子长度,short_sentence_ratio_{chars}:短句比例(<{chars} 字符),long_sentence_ratio_{chars}:长句比例(>{chars} 字符)
  • token_stats.pytoken_count:文档中的令牌数
  • word_stats.pyn_words:文档中的单词数,avg_word_length:文档中单词的平均长度,avg_words_per_line:文档中每行的平均单词数,short_word_ratio_{chars}:短于 {chars} 字符的单词比例,stop_word_ratio:停用词比例,long_word_ratio_{chars}:长于 {chars} 字符的单词比例,type_token_ratio:唯一单词数 / 令牌数,capitalized_word_ratio:首字母大写单词比例,uppercase_word_ratio:全大写单词比例

自定义块

简单数据

你可以直接将 Document 的可迭代对象作为管道块传递,如下所示:

from datatrove.data import Document from datatrove.pipeline.filters import SamplerFilter from datatrove.pipeline.writers import JsonlWriter pipeline = [ [ Document(text="一些数据", id="0"), Document(text="更多数据", id="1"), Document(text="更多数据", id="2"), ], SamplerFilter(rate=0.5), JsonlWriter( output_folder="/my/output/path" ) ]

但请注意,这个可迭代对象不会被分片(如果你启动多个任务,它们都会获得完整的可迭代对象)。 这通常用于小型工作负载/测试。

自定义函数

对于简单的处理,你可以直接传入具有以下签名的自定义函数:

from datatrove.data import DocumentsPipeline def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: """ `data`是Document生成器。你必须返回一个Document生成器(yield) 你可以选择使用`rank`和`world_size`进行分片 """ for document in data: document.text = document.text.upper() yield document pipeline = [ ..., uppercase_everything, ... ]

[!提示] 由于导入问题,你可能会遇到一些pickle相关的问题。如果发生这种情况,只需将你需要的导入移到函数体内即可。

自定义块

你也可以定义一个完整的块,继承自PipelineStep或其子类之一:

from datatrove.pipeline.base import PipelineStep from datatrove.data import DocumentsPipeline from datatrove.io import DataFolderLike, get_datafolder class UppercaserBlock(PipelineStep): def __init__(self, some_folder: DataFolderLike, some_param: int = 5): super().__init__() # 你可以接受任何需要的参数并将它们保存在这里 self.some_param = some_param # 使用get_datafolder()加载数据文件夹 self.some_folder = get_datafolder(some_folder) def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline: # 你也可以从`some_folder`加载数据: for filepath in self.some_folder.get_shard(rank, world_size): # 它也接受glob模式,以及其他选项 with self.some_folder.open(filepath, "rt") as f: # 做一些处理 ... yield doc # # 或处理来自前一个块的数据(`data`) # for doc in data: with self.track_time(): # 你可以将主要处理代码包装在`track_time`中,以了解每个文档处理所需的时间 nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text)) # 你也可以使用stat_update跟踪每个文档的统计信息 self.stat_update("og_upper_letters", value=nr_uppercase_letters) doc.text = doc.text.upper() # 确保将yield保持在track_time块之外,否则会影响时间计算 yield doc # # 或将数据保存到磁盘 # with self.some_folder.open("myoutput", "wt") as f: for doc in data: f.write(doc...)
pipeline = [ ..., UppercaserBlock("somepath"), ... ]

你也可以继承自BaseExtractorBaseFilterBaseReader/BaseDiskReaderDiskWriter

贡献

git clone git@github.com:huggingface/datatrove.git && cd datatrove pip install -e ".[dev]"

安装pre-commit代码风格钩子:

pre-commit install

运行测试:

pytest -sv ./tests/

引用

@misc{penedo2024datatrove, author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas}, title = {DataTrove: large scale data processing}, year = {2024}, publisher = {GitHub}, journal = {GitHub repository}, url = {https://github.com/huggingface/datatrove} }

编辑推荐精选

问小白

问小白

全能AI智能助手,随时解答生活与工作的多样问题

问小白,由元石科技研发的AI智能助手,快速准确地解答各种生活和工作问题,包括但不限于搜索、规划和社交互动,帮助用户在日常生活中提高效率,轻松管理个人事务。

热门AI助手AI对话AI工具聊天机器人
Transly

Transly

实时语音翻译/同声传译工具

Transly是一个多场景的AI大语言模型驱动的同声传译、专业翻译助手,它拥有超精准的音频识别翻译能力,几乎零延迟的使用体验和支持多国语言可以让你带它走遍全球,无论你是留学生、商务人士、韩剧美剧爱好者,还是出国游玩、多国会议、跨国追星等等,都可以满足你所有需要同传的场景需求,线上线下通用,扫除语言障碍,让全世界的语言交流不再有国界。

讯飞智文

讯飞智文

一键生成PPT和Word,让学习生活更轻松

讯飞智文是一个利用 AI 技术的项目,能够帮助用户生成 PPT 以及各类文档。无论是商业领域的市场分析报告、年度目标制定,还是学生群体的职业生涯规划、实习避坑指南,亦或是活动策划、旅游攻略等内容,它都能提供支持,帮助用户精准表达,轻松呈现各种信息。

AI办公办公工具AI工具讯飞智文AI在线生成PPTAI撰写助手多语种文档生成AI自动配图热门
讯飞星火

讯飞星火

深度推理能力全新升级,全面对标OpenAI o1

科大讯飞的星火大模型,支持语言理解、知识问答和文本创作等多功能,适用于多种文件和业务场景,提升办公和日常生活的效率。讯飞星火是一个提供丰富智能服务的平台,涵盖科技资讯、图像创作、写作辅助、编程解答、科研文献解读等功能,能为不同需求的用户提供便捷高效的帮助,助力用户轻松获取信息、解决问题,满足多样化使用场景。

热门AI开发模型训练AI工具讯飞星火大模型智能问答内容创作多语种支持智慧生活
Spark-TTS

Spark-TTS

一种基于大语言模型的高效单流解耦语音令牌文本到语音合成模型

Spark-TTS 是一个基于 PyTorch 的开源文本到语音合成项目,由多个知名机构联合参与。该项目提供了高效的 LLM(大语言模型)驱动的语音合成方案,支持语音克隆和语音创建功能,可通过命令行界面(CLI)和 Web UI 两种方式使用。用户可以根据需求调整语音的性别、音高、速度等参数,生成高质量的语音。该项目适用于多种场景,如有声读物制作、智能语音助手开发等。

Trae

Trae

字节跳动发布的AI编程神器IDE

Trae是一种自适应的集成开发环境(IDE),通过自动化和多元协作改变开发流程。利用Trae,团队能够更快速、精确地编写和部署代码,从而提高编程效率和项目交付速度。Trae具备上下文感知和代码自动完成功能,是提升开发效率的理想工具。

AI工具TraeAI IDE协作生产力转型热门
咔片PPT

咔片PPT

AI助力,做PPT更简单!

咔片是一款轻量化在线演示设计工具,借助 AI 技术,实现从内容生成到智能设计的一站式 PPT 制作服务。支持多种文档格式导入生成 PPT,提供海量模板、智能美化、素材替换等功能,适用于销售、教师、学生等各类人群,能高效制作出高品质 PPT,满足不同场景演示需求。

讯飞绘文

讯飞绘文

选题、配图、成文,一站式创作,让内容运营更高效

讯飞绘文,一个AI集成平台,支持写作、选题、配图、排版和发布。高效生成适用于各类媒体的定制内容,加速品牌传播,提升内容营销效果。

热门AI辅助写作AI工具讯飞绘文内容运营AI创作个性化文章多平台分发AI助手
材料星

材料星

专业的AI公文写作平台,公文写作神器

AI 材料星,专业的 AI 公文写作辅助平台,为体制内工作人员提供高效的公文写作解决方案。拥有海量公文文库、9 大核心 AI 功能,支持 30 + 文稿类型生成,助力快速完成领导讲话、工作总结、述职报告等材料,提升办公效率,是体制打工人的得力写作神器。

openai-agents-python

openai-agents-python

OpenAI Agents SDK,助力开发者便捷使用 OpenAI 相关功能。

openai-agents-python 是 OpenAI 推出的一款强大 Python SDK,它为开发者提供了与 OpenAI 模型交互的高效工具,支持工具调用、结果处理、追踪等功能,涵盖多种应用场景,如研究助手、财务研究等,能显著提升开发效率,让开发者更轻松地利用 OpenAI 的技术优势。

下拉加载更多