仓库地址:https://gitee.com/liu-haijin/data-deal
对比两个 Excel 表格文件中的数据,并把其中不一致的记录提取出来:
"""
按「主键列」对比两个表式文件(CSV / 伪 xls 文本),将只出现在一侧的行写入 txt。
默认参数集中在 ComparisonConfig / default_config(),改一处即可复用到其他任务。
"""
from __future__ import annotations
import argparse
import csv
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
# ---------------------------------------------------------------------------
# 默认任务参数:新任务请改 default_config() 内字段,或通过命令行覆盖
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class ComparisonConfig:
    """Immutable bundle of every tunable setting for one comparison run.

    The first five fields are required; the remaining ones carry generic
    defaults that a task may override.
    """

    # --- required: input files, their key columns, and the report file ---
    left_path: Path
    right_path: Path
    left_key_column: str
    right_key_column: str
    output_path: Path

    # --- text encodings ---
    file_encoding: str = "utf-8-sig"
    output_encoding: str = "utf-8"

    # --- report wording (kept configurable so any business naming fits) ---
    left_label: str = "左侧表"
    right_label: str = "右侧表"
    report_intro: str = "下列为「只在一侧表中出现」的号码及其整行内容。"
    section_only_left: str = "===== 仅在左侧表中存在(右侧无此主键)====="
    section_only_right: str = "===== 仅在右侧表中存在(左侧无此主键)====="
def default_config() -> ComparisonConfig:
    """Build the configuration used when no command-line overrides are given.

    Every default file path is resolved relative to the directory that
    contains this script.
    """
    script_dir = Path(__file__).resolve().parent
    settings = {
        "left_path": script_dir / "1.csv",
        "right_path": script_dir / "1.xls",
        "left_key_column": "mobile",
        "right_key_column": "电话号码",
        "output_path": script_dir / "mobile_compare_missing.txt",
        "left_label": "query_results CSV(mobile)",
        "right_label": "VPS 验证文件(电话号码)",
        "report_intro": "对比说明:下列为「只在一侧表中出现」的号码及其整行内容。",
        "section_only_left": "===== 仅在 query_results CSV 中存在(VPS 文件中无此号码)=====",
        "section_only_right": "===== 仅在 VPS 验证文件中存在(query_results CSV 中无此号码)=====",
    }
    return ComparisonConfig(**settings)
def _normalize_phone(raw: object) -> str:
if raw is None:
return ""
s = str(raw).strip().replace(" ", "").replace("\t", "")
if not s:
return ""
if s.endswith(".0") and s[:-2].isdigit():
s = s[:-2]
return s
def _clean_fieldnames(fieldnames: Iterable[str] | None) -> list[str]:
if not fieldnames:
return []
return [(name or "").replace("\ufeff", "").strip() for name in fieldnames]
def _iter_csv_dict_rows(path: Path, encoding: str) -> Iterable[dict[str, str]]:
    """Yield each data row of a CSV-style text file as a ``{header: cell}`` dict.

    Header names are passed through ``_clean_fieldnames`` before use. Bytes
    that cannot be decoded with *encoding* are replaced rather than raising.

    Every yielded key and value is a string:

    * a row with fewer cells than headers gets ``""`` for the missing cells;
    * a row with more cells than headers keeps the surplus cells,
      comma-joined, under the key ``"_extra"`` (instead of a list stored
      under the key ``None``).

    Args:
        path: File to read.
        encoding: Text encoding used to decode the file.
    """
    overflow_key = "_extra"
    with path.open(newline="", encoding=encoding, errors="replace") as f:
        # restval fills short rows with "", restkey names the surplus cells.
        reader = csv.DictReader(f, restkey=overflow_key, restval="")
        reader.fieldnames = _clean_fieldnames(reader.fieldnames)
        for row in reader:
            surplus = row.get(overflow_key)
            # DictReader stores surplus cells as a list; flatten it so the
            # row honours the declared dict[str, str] shape.
            if isinstance(surplus, list):
                row[overflow_key] = ",".join(surplus)
            yield row
def _load_phone_rows(
    path: Path, phone_column: str, encoding: str
) -> tuple[dict[str, dict[str, str]], list[str]]:
    """Index the rows of *path* by their normalized key column.

    Returns:
        A pair ``(rows_by_key, notes)``. ``rows_by_key`` maps each
        normalized key to its whole row; when a key occurs on several rows
        only the first is kept and a note is recorded for each later one.
        Rows whose normalized key is empty are skipped.

    Raises:
        KeyError: A row does not contain *phone_column*.
    """
    rows_by_key: dict[str, dict[str, str]] = {}
    notes: list[str] = []
    for record in _iter_csv_dict_rows(path, encoding):
        if phone_column not in record:
            raise KeyError(f'文件缺少列 "{phone_column}": {path}')
        normalized = _normalize_phone(record[phone_column])
        if normalized and normalized not in rows_by_key:
            rows_by_key[normalized] = record
        elif normalized:
            # Same key seen before: keep the first row, only note the repeat.
            notes.append(f"{path.name}: 重复主键 {normalized},输出时保留首次出现行")
    return rows_by_key, notes
def _format_row(row: dict[str, str]) -> str:
parts = [f"{k}={v}" for k, v in row.items()]
return " | ".join(parts)
def compare(cfg: ComparisonConfig) -> tuple[list[dict[str, str]], list[dict[str, str]], list[str], int]:
    """Load both files and split their keys into left-only, right-only and shared.

    Returns:
        ``(only_left, only_right, warnings, both_count)`` where the two row
        lists are ordered by sorted key, ``warnings`` holds the left file's
        notes followed by the right file's, and ``both_count`` is the number
        of keys present on both sides.
    """
    left_rows, left_notes = _load_phone_rows(cfg.left_path, cfg.left_key_column, cfg.file_encoding)
    right_rows, right_notes = _load_phone_rows(cfg.right_path, cfg.right_key_column, cfg.file_encoding)

    # Dict key views support set algebra directly.
    missing_in_right = sorted(left_rows.keys() - right_rows.keys())
    missing_in_left = sorted(right_rows.keys() - left_rows.keys())
    shared_total = len(left_rows.keys() & right_rows.keys())

    return (
        [left_rows[key] for key in missing_in_right],
        [right_rows[key] for key in missing_in_left],
        left_notes + right_notes,
        shared_total,
    )
def build_report_lines(
    cfg: ComparisonConfig,
    only_left: list[dict[str, str]],
    only_right: list[dict[str, str]],
    warnings: list[str],
    both_count: int,
) -> list[str]:
    """Assemble the text report as a list of lines (without line terminators).

    Layout: intro and the two file paths, an optional warnings block, the
    left-only section, the right-only section, then a one-line summary.
    """

    def _section(title: str, rows: list[dict[str, str]]) -> list[str]:
        # One titled block; an empty section shows a placeholder line.
        body = [_format_row(r) for r in rows] if rows else ["(无)"]
        return [title, *body, ""]

    report: list[str] = [
        cfg.report_intro,
        f"{cfg.left_label}: {cfg.left_path}",
        f"{cfg.right_label}: {cfg.right_path}",
        "",
    ]
    if warnings:
        report += ["提示:", *(f" - {w}" for w in warnings), ""]
    report += _section(cfg.section_only_left, only_left)
    report += _section(cfg.section_only_right, only_right)
    report.append(
        f"统计:仅左侧 {len(only_left)} 条;仅右侧 {len(only_right)} 条;两侧均存在 {both_count} 条。"
    )
    return report
def main(argv: list[str] | None = None) -> None:
    """Command-line entry point: parse options, compare, and write the report.

    Every option defaults to the matching field of ``default_config()``, so
    running without arguments performs the default task.

    Args:
        argv: Argument list to parse instead of the process command line.
            ``None`` (the default) makes argparse read ``sys.argv[1:]``.
    """
    d = default_config()
    parser = argparse.ArgumentParser(description="对比两个表式文件的主键列(默认 UTF-8 CSV 文本)")
    parser.add_argument("--left", type=Path, default=d.left_path, help="左侧文件路径")
    parser.add_argument("--right", type=Path, default=d.right_path, help="右侧文件路径")
    parser.add_argument(
        "--left-key", default=d.left_key_column, help="左侧用于比对的主键列名"
    )
    parser.add_argument(
        "--right-key", default=d.right_key_column, help="右侧用于比对的主键列名"
    )
    parser.add_argument("-o", "--output", type=Path, default=d.output_path, help="结果 txt")
    parser.add_argument(
        "--encoding",
        default=d.file_encoding,
        help="两侧输入文件的文本编码(默认 utf-8-sig)",
    )
    parser.add_argument(
        "--output-encoding",
        default=d.output_encoding,
        help="输出 txt 编码(默认 utf-8)",
    )
    parser.add_argument("--left-label", default=d.left_label, help="报告中左侧说明文字")
    parser.add_argument("--right-label", default=d.right_label, help="报告中右侧说明文字")
    parser.add_argument("--report-intro", default=d.report_intro, help="报告首行说明")
    parser.add_argument("--section-only-left", default=d.section_only_left, help="仅左侧区块标题")
    parser.add_argument("--section-only-right", default=d.section_only_right, help="仅右侧区块标题")
    args = parser.parse_args(argv)

    cfg = ComparisonConfig(
        left_path=args.left,
        right_path=args.right,
        left_key_column=args.left_key,
        right_key_column=args.right_key,
        output_path=args.output,
        file_encoding=args.encoding,
        output_encoding=args.output_encoding,
        left_label=args.left_label,
        right_label=args.right_label,
        report_intro=args.report_intro,
        section_only_left=args.section_only_left,
        section_only_right=args.section_only_right,
    )

    only_left, only_right, warnings, both_count = compare(cfg)
    text = "\n".join(build_report_lines(cfg, only_left, only_right, warnings, both_count))

    # Create the destination directory on demand so that pointing -o at a
    # new folder does not fail after the comparison has already been done.
    cfg.output_path.parent.mkdir(parents=True, exist_ok=True)
    cfg.output_path.write_text(text, encoding=cfg.output_encoding)
    print(f"已写入: {cfg.output_path}")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()