py处理相关csv和excel

我爱海鲸 2026-04-12 17:43:20 暂无标签

简介数据对比

仓库地址:https://gitee.com/liu-haijin/data-deal

对比相关excel中的不同的数据并提取出来:

"""
按「主键列」对比两个表式文件(CSV / 伪 xls 文本),将只出现在一侧的行写入 txt。
默认参数集中在 ComparisonConfig / default_config(),改一处即可复用到其他任务。
"""

from __future__ import annotations

import argparse
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable


# ---------------------------------------------------------------------------
# 默认任务参数:新任务请改 default_config() 内字段,或通过命令行覆盖
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class ComparisonConfig:
    """单次对比所需的全部可调参数。"""

    left_path: Path
    right_path: Path
    left_key_column: str
    right_key_column: str
    output_path: Path
    file_encoding: str = "utf-8-sig"
    output_encoding: str = "utf-8"
    # 报告文案(便于换成任意业务名)
    left_label: str = "左侧表"
    right_label: str = "右侧表"
    report_intro: str = "下列为「只在一侧表中出现」的号码及其整行内容。"
    section_only_left: str = "===== 仅在左侧表中存在(右侧无此主键)====="
    section_only_right: str = "===== 仅在右侧表中存在(左侧无此主键)====="


def default_config() -> ComparisonConfig:
    base = Path(__file__).resolve().parent
    return ComparisonConfig(
        left_path=base / "1.csv",
        right_path=base / "1.xls",
        left_key_column="mobile",
        right_key_column="电话号码",
        output_path=base / "mobile_compare_missing.txt",
        left_label="query_results CSV(mobile)",
        right_label="VPS 验证文件(电话号码)",
        report_intro="对比说明:下列为「只在一侧表中出现」的号码及其整行内容。",
        section_only_left="===== 仅在 query_results CSV 中存在(VPS 文件中无此号码)=====",
        section_only_right="===== 仅在 VPS 验证文件中存在(query_results CSV 中无此号码)=====",
    )


def _normalize_phone(raw: object) -> str:
    if raw is None:
        return ""
    s = str(raw).strip().replace(" ", "").replace("\t", "")
    if not s:
        return ""
    if s.endswith(".0") and s[:-2].isdigit():
        s = s[:-2]
    return s


def _clean_fieldnames(fieldnames: Iterable[str] | None) -> list[str]:
    if not fieldnames:
        return []
    return [(name or "").replace("\ufeff", "").strip() for name in fieldnames]


def _iter_csv_dict_rows(path: Path, encoding: str) -> Iterable[dict[str, str]]:
    with path.open(newline="", encoding=encoding, errors="replace") as f:
        reader = csv.DictReader(f)
        reader.fieldnames = _clean_fieldnames(reader.fieldnames)
        for row in reader:
            yield {k: (v if v is not None else "") for k, v in row.items()}


def _load_phone_rows(
    path: Path, phone_column: str, encoding: str
) -> tuple[dict[str, dict[str, str]], list[str]]:
    """返回 {规范化主键: 整行字典};同一主键多行时保留首行并记录告警。"""
    out: dict[str, dict[str, str]] = {}
    warnings: list[str] = []
    for row in _iter_csv_dict_rows(path, encoding):
        if phone_column not in row:
            raise KeyError(f'文件缺少列 "{phone_column}": {path}')
        key = _normalize_phone(row[phone_column])
        if not key:
            continue
        if key in out:
            warnings.append(f"{path.name}: 重复主键 {key},输出时保留首次出现行")
            continue
        out[key] = row
    return out, warnings


def _format_row(row: dict[str, str]) -> str:
    parts = [f"{k}={v}" for k, v in row.items()]
    return " | ".join(parts)


def compare(cfg: ComparisonConfig) -> tuple[list[dict[str, str]], list[dict[str, str]], list[str], int]:
    left, w1 = _load_phone_rows(cfg.left_path, cfg.left_key_column, cfg.file_encoding)
    right, w2 = _load_phone_rows(cfg.right_path, cfg.right_key_column, cfg.file_encoding)
    warnings = w1 + w2

    left_keys, right_keys = set(left), set(right)
    only_left = [left[k] for k in sorted(left_keys - right_keys)]
    only_right = [right[k] for k in sorted(right_keys - left_keys)]
    both_count = len(left_keys & right_keys)
    return only_left, only_right, warnings, both_count


def build_report_lines(
    cfg: ComparisonConfig,
    only_left: list[dict[str, str]],
    only_right: list[dict[str, str]],
    warnings: list[str],
    both_count: int,
) -> list[str]:
    lines: list[str] = []
    lines.append(cfg.report_intro)
    lines.append(f"{cfg.left_label}: {cfg.left_path}")
    lines.append(f"{cfg.right_label}: {cfg.right_path}")
    lines.append("")
    if warnings:
        lines.append("提示:")
        lines.extend(f"  - {w}" for w in warnings)
        lines.append("")
    lines.append(cfg.section_only_left)
    if not only_left:
        lines.append("(无)")
    else:
        lines.extend(_format_row(r) for r in only_left)
    lines.append("")
    lines.append(cfg.section_only_right)
    if not only_right:
        lines.append("(无)")
    else:
        lines.extend(_format_row(r) for r in only_right)
    lines.append("")
    lines.append(
        f"统计:仅左侧 {len(only_left)} 条;仅右侧 {len(only_right)} 条;两侧均存在 {both_count} 条。"
    )
    return lines


def main() -> None:
    d = default_config()
    parser = argparse.ArgumentParser(description="对比两个表式文件的主键列(默认 UTF-8 CSV 文本)")
    parser.add_argument("--left", type=Path, default=d.left_path, help="左侧文件路径")
    parser.add_argument("--right", type=Path, default=d.right_path, help="右侧文件路径")
    parser.add_argument(
        "--left-key", default=d.left_key_column, help="左侧用于比对的主键列名"
    )
    parser.add_argument(
        "--right-key", default=d.right_key_column, help="右侧用于比对的主键列名"
    )
    parser.add_argument("-o", "--output", type=Path, default=d.output_path, help="结果 txt")
    parser.add_argument(
        "--encoding",
        default=d.file_encoding,
        help="两侧输入文件的文本编码(默认 utf-8-sig)",
    )
    parser.add_argument(
        "--output-encoding",
        default=d.output_encoding,
        help="输出 txt 编码(默认 utf-8)",
    )
    parser.add_argument("--left-label", default=d.left_label, help="报告中左侧说明文字")
    parser.add_argument("--right-label", default=d.right_label, help="报告中右侧说明文字")
    parser.add_argument("--report-intro", default=d.report_intro, help="报告首行说明")
    parser.add_argument("--section-only-left", default=d.section_only_left, help="仅左侧区块标题")
    parser.add_argument("--section-only-right", default=d.section_only_right, help="仅右侧区块标题")
    args = parser.parse_args()

    cfg = ComparisonConfig(
        left_path=args.left,
        right_path=args.right,
        left_key_column=args.left_key,
        right_key_column=args.right_key,
        output_path=args.output,
        file_encoding=args.encoding,
        output_encoding=args.output_encoding,
        left_label=args.left_label,
        right_label=args.right_label,
        report_intro=args.report_intro,
        section_only_left=args.section_only_left,
        section_only_right=args.section_only_right,
    )

    only_left, only_right, warnings, both_count = compare(cfg)
    text = "\n".join(build_report_lines(cfg, only_left, only_right, warnings, both_count))
    cfg.output_path.write_text(text, encoding=cfg.output_encoding)
    print(f"已写入: {cfg.output_path}")


if __name__ == "__main__":
    main()

 

你好:我的2025