#csv #json-schema #schema-definition #json-file #memory-buffer #column #schema-file

bin+lib csv_log_cleaner

通过多线程流式传输并通过小内存缓冲区清洗CSV文件以符合类型模式,并记录数据丢失

4个版本

新版本 0.2.1 2024年7月29日
0.2.0 2024年3月20日
0.1.2 2023年4月4日
0.1.0 2023年2月6日
0.0.3 2023年1月15日

299编码

Download history 11/week @ 2024-07-07 311/week @ 2024-07-28

322每月下载量

MIT许可

53KB
963

CSV清洗器

这是什么?

csv_log_cleaner - 通过并行化流式传输并通过可配置大小的内存缓冲区清洗逗号分隔值(CSV)文件以符合给定的类型模式,并按列记录任何结果数据丢失。为数据库或数据管道准备CSV表中的未验证数据。在内存受限环境中处理任意大小的文件。

用Rust编写,csv_log_cleaner编译成命令行界面(CLI)工具,可作为Linux、Mac或Windows的单个小二进制文件(可在GitHub上下载)使用,或作为库在Rust项目中作为cargocrate使用。

快速入门

从源代码构建并清洗测试数据

cargo build --release
./target/release/csv_log_cleaner -i tests/e2e_data/test_input.csv -o cleaned.csv -j tests/e2e_data/test_schema.json -l log.json

使用cargo安装最新版本,并通过一系列Linux命令行界面(CLI)工具将数据从stdin管道到stdout,使用内联模式定义模式

cargo install csv_log_cleaner
cat tests/e2e_data/test_input.csv | csv_log_cleaner -j '{"columns": [{"name": "INT_COLUMN","column_type": "Int"},{"name": "STRING_COLUMN","column_type": "String","nullable": false},{"name": "DATE_COLUMN","column_type": "Date","format": "%Y-%m-%d"},{"name": "ENUM_COLUMN","column_type": "Enum","nullable": false,"legal_vals": ["V1", "V2", "V3"],"illegal_val_replacement": "V1"}]}' -l log.json | tail -n +2 | sort -t ',' -k4

在这里,cat将文件内容写入stdincsv_log_cleaner清洗它们,tail跳过第一行(标题行),sort按第四列的内容对剩余行进行排序。

描述

csv_log_cleaner工具接受一个JSON CSV模式定义(例如,定义一个类型为String的NAME列、一个类型为Int的AGE列和一个类型为Date的DATE_OF_BIRTH列)和一个必须符合该定义的CSV文件,并清洗输入数据以符合模式定义。

非法值要么被清理(例如,通过将 .0 添加到浮点列中的整数值末尾)要么被删除(例如,通过将浮点列中的字符串值 not_a_float 替换为空字符串),所有数据丢失都会记录在一个输出 JSON 文件中,同时记录遇到的非法值的最小值和最大值。

数据按行处理,使用可调整大小的缓冲区并行处理所有可用核心。这里的目的是使得能够快速高效地处理大于内存的数据集,作为可扩展的 Unix 管道的一部分。

参数

  • -j --schema - 必需 可以是具有 *.json 形式的 JSON 模式文件路径,或者内联 JSON 字符串定义,例如 '{"columns": [{"name": "INT_COLUMN","column_type": "Int","STRING_COLUMN","column_type": "String","nullable": false},{"name": "STRING_COLUMN","column_type": "String","nullable": false}]}'
  • -l --log - 必需 输出 JSON 日志文件的路径
  • -i --input - 可选 输入 CSV 文件的路径,如果不提供,将从 stdin 读取
  • -o --output - 可选 输出 CSV 文件的路径,如果不提供,将写入到 stout
  • -s --sep - 可选(默认 ',')分隔符字符,例如 TSV 文件:将在输入和输出中使用
  • -b --buffer_size - 可选的缓冲区大小整数(默认值 1000),表示为处理分配给新线程的每个缓冲区保留的行数。程序使用所有可用的线程,所以如果有10个核心可用,并且缓冲区值设置为1000,则在处理过程中任何给定时间内最多可以保留10,000行。

来源

库代码作为 cargo crate 提供。源代码可以在 GitHub 上找到,这里。欢迎提出改进请求、添加缺失的功能或修复错误;同样欢迎开启 GitHub 问题请求修复或新功能。

为什么?

CSV格式的简单性意味着此类文件包含的数据在本质上是无类型的。有效的CSV文件不能保证其包含的字段内容不是字符串。这一点加上CSV文件通常由易出错的、手动过程生成,当需要处理CSV数据且列必须具有特定类型时,就出现了问题。例如,如果没有保证它不会包含像 "not_a_float" 这样的字符串值,就不能安全地设置一个数据管道来计算 AMOUNT_PAID 列的内容。

针对这个问题存在多种现有解决方案,每种都有其自身的局限性。

一种解决方案可以被称为“完全宽容”。这是 Python 的 pandas 数据框处理库使用的默认策略。这种方法意味着如果在列中找到一个非法的 12.34\n 浮点值,则整个列将被读取为字符串列。这种方法简化了读取逻辑,但意味着处理逻辑需要更复杂:因为你必须编写代码,没有任何保证 AMOUNT_PAID 列将实际包含数值。

第二种可以被称为“完全严格”。这是 PostgreSQL 关系数据库使用的默认策略。这种方法意味着使用 CSV 数据的类型模式,如果在一个列中找到一个非法的 12.34\n 浮点值,整个 CSV 读取操作将失败。这种方法意味着可以保持写入和处理代码的简单性,但意味着在大数据集(即使非法值比例极小)面前,读取操作很可能会失败。

为了克服完全开放性和完全严格性的限制,有时会采用第三种方法,可以将其称为“隐形强制”。这种方法意味着使用CSV数据类型模式,如果在列中找到一个非法的浮点值,例如12.34,那么该值将被静默地替换为NULL值。这种方法意味着可以保持编写和处理代码简单,但意味着潜在的数据丢失可能无法被发现。例如,这种方法可能导致整个DATE_OF_BIRTH字段被静默地替换为NULL值,因为日期格式被错误地设置为YYYY-MM-DD而不是预期的YYYY-MM-DD格式。

csv_log_cleaner的目标是使数据处理者能够使用一种可以称为“带数据丢失日志的强制”的方法。在这种方法中,用户为他们的CSV数据提供一个类型模式定义,该工具将保证通过该工具的数据符合该模式(例如,通过从浮点列中的12.34值的末尾删除非法的\n,或者从同一列中完全删除not_a_valid_float值)并记录每个列中删除的值的数量以及遇到的非法值的最大和最小值。这意味着用户可以确信他们的管道不会因为读取或处理时的输入类型错误而失败。此外,任何在清理操作期间发生的数据丢失都将记录在人类和计算机可读的JSON日志文件中,包括找到的样本最大和最小非法值以及非法值计数和比例,可用于监控数据丢失。例如,可以使用监控脚本来忽略特定列中的数据丢失,如果它低于给定的阈值(例如1%),否则发出警报。

为什么是命令行工具?

Unix命令行界面(CLI)工具自20世纪70年代以来已经开发和完善,作为一种简单、快速、内存高效、可组合的数据处理方式。

示例

假设从stdin获取以下CSV数据

INT_COLUMN,STRING_COLUMN,DATE_COLUMN,ENUM_COLUMN
4,dog,2020-12-31,V1
not_an_int,cat,not_a_date,V2
an_int,weasel,a_date,V5

以及以下形式的JSON CSV模式定义作为输入

{
    "columns": [
        {
            "name": "INT_COLUMN",
            "column_type": "Int"
        },
        {
            "name": "STRING_COLUMN",
            "column_type": "String",
            "nullable": false
        },
        {
            "name": "DATE_COLUMN",
            "column_type": "Date",
            "format": "%Y-%m-%d"
        },
        {
            "name": "ENUM_COLUMN",
            "column_type": "Enum",
            "nullable": false,
            "legal_vals": ["V1", "V2", "V3"],
            "illegal_val_replacement": "V1"
        }
    ]
}

作为命令的输入

input.csv > csv_log_cleaner -j schema.json -l log_file.json > output.csv 

然后通过stdout将以下内容输出到output.csv文件

INT_COLUMN,STRING_COLUMN,DATE_COLUMN,ENUM_COLUMN
4,dog,2020-12-31,V1
,cat,,V2
,weasel,,V1

并写入JSON日志文件,指示在清理过程中丢失了哪些数据

{
	"total_rows": 3,
	"columns_with_errors": [
		{
			"column_name": "DATE_COLUMN",
			"invalid_row_count": 2,
			"max_illegal_val": "not_a_date",
			"min_illegal_val": "a_date"
		},{
			"column_name": "INT_COLUMN",
			"invalid_row_count": 2,
			"max_illegal_val": "not_an_int",
			"min_illegal_val": "an_int"
		},{
			"column_name": "STRING_COLUMN",
			"invalid_row_count": 0,
			"max_illegal_val": "",
			"min_illegal_val": ""
		},{
			"column_name": "ENUM_COLUMN",
			"invalid_row_count": 1,
			"max_illegal_val": "V5",
			"min_illegal_val": "V5"
		}
	]
}

局限性

  • 仅支持UTF-8编码的输入数据(考虑使用iconv或类似工具将非UTF-8数据转换为所需的编码作为管道的一部分)
  • 由于单个行在多个线程中单独处理,因此行输出顺序通常不会与行输出顺序匹配。
  • 在所有列类型中使用空字符串值作为NULL(这意味着其他可识别的NULL值,如NAnull,将在输出中转换为空字符串,并且所有空字符串值都将被视为NULL)

模式定义

类型模式必须是一个JSON文件,格式如下

{
    "columns": [
        {
            "name": "INT_COLUMN",
            "column_type": "Int", 
        },
        {
            "name": "STRING_COLUMN",
            "column_type": "String",
            "nullable": false
        },
        {
            "name": "DATE_COLUMN",
            "column_type": "Date",
            "format": "%Y-%m-%d"
        },
        {
            "name": "ENUM_COLUMN",
            "column_type": "Enum",
            "nullable": false,
            "legal_vals": ["V1", "V2", "V3"],
            "illegal_val_replacement": "V1"
        }
    ]
}

列属性说明

  • "name": 必需 - 必须与CSV文件中的列名相对应
  • "column_type": 必需 - 必须是以下之一:"String", "Int", "Date", "Float", "Enum" 或 "Bool"
  • "nullable": 可选 - 默认值是 true
  • "legal_vals": 仅对Enum类型列必需 - 接受的字符串值列表
  • "illegal_val_replacement": 仅对Enum类型列可选 - 默认值是空字符串
  • "format": 仅对Date类型列必需 - 应与以下语法匹配此处

空值处理

  • 以下输入CSV文件中的值会被工具解释为 NULL["", "#N/A","#N/A N/A","#NA","-1.#IND","-1.#QNAN","-NaN","-nan","1.#IND","1.#QNAN",<NA>,"N/A","NA","NULL","NaN","n/a","nan","null"]

从源码构建

Cargo

构建和运行测试

cargo test

构建(编译速度快,运行慢)的调试版本并运行它

cat test_input.csv | cargo run -- -j test_schema.json -l test_log.json > test_output.csv

构建(编译慢,运行快)的发布版本

cargo build --release

运行发布版本

cat test_input.csv | ./target/release/csv_log_cleaner -j test_schema.json -l test_log.json > test_output.csv

使用 cargo 安装并运行最新版本

cargo install csv_log_cleaner
cat test_input.csv | csv_log_cleaner -j test_schema.json -l test_log.json > test_output.csv

将功能作为库包含到您的Rust项目中

cargo add csv_log_cleaner

依赖项

~6–16MB
~207K SLoC