读取redis中的所有key并写入到文件中

我爱海鲸 2025-06-10 12:36:38 暂无标签

简介:Go 语言、读取并解析配置文件(YAML)

go语言写法:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"sort"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	"gopkg.in/yaml.v3"
)

// streamRedisKeysToFile iterates over every key of one Redis database with
// SCAN and writes each distinct key to filename, one key per line.
//
// Parameters:
//   - host, port: server address; a bare IPv6 literal is bracketed before the
//     port is appended.
//   - db, password: database index and auth password passed to the client.
//   - batchSize: used both as the SCAN COUNT hint and as the number of keys
//     buffered before a flush. Values <= 0 fall back to 1000.
//   - filename: output file; it is created or truncated.
//
// Keys are flushed through writeBatchToFile, which sorts each batch, so the
// file is ordered within a batch but not globally. SCAN may return a key more
// than once, so every key seen is remembered in an in-memory set; memory use
// therefore grows with the number of keys.
//
// It returns the number of distinct keys collected so far together with any
// error, so the count is still meaningful as a progress figure on failure.
func streamRedisKeysToFile(host string, port int, db int, password string, batchSize int, filename string) (total int, err error) {
	// Zero would flush after every single key and a negative value would make
	// the make() call below panic, so normalise up front.
	if batchSize <= 0 {
		batchSize = 1000
	}

	var addr string
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		// Bare IPv6 literal: bracket it so the port separator is unambiguous.
		addr = fmt.Sprintf("[%s]:%d", host, port)
	} else {
		addr = fmt.Sprintf("%s:%d", host, port)
	}

	newClient := func() *redis.Client {
		return redis.NewClient(&redis.Options{
			Addr:         addr,
			Password:     password,
			DB:           db,
			ReadTimeout:  30 * time.Second,
			WriteTimeout: 30 * time.Second,
		})
	}

	rdb := newClient()
	// The closure reads rdb when the function returns, so a client created
	// during a reconnect is the one that gets closed here.
	defer func() { _ = rdb.Close() }()

	file, err := os.Create(filename)
	if err != nil {
		return 0, fmt.Errorf("创建文件失败: %w", err)
	}
	defer func() {
		// A failed close can mean the data never reached disk; report it
		// unless an earlier error is already being returned.
		if cerr := file.Close(); cerr != nil && err == nil {
			err = fmt.Errorf("关闭文件失败: %w", cerr)
		}
	}()

	const maxRetries = 3

	ctx := context.Background()
	seen := make(map[string]struct{})
	batch := make([]string, 0, batchSize)
	var cursor uint64

	for {
		var keys []string

		// SCAN with retries. The cursor is only advanced after a successful
		// call: a failed call yields a zero cursor, and assigning that would
		// silently restart the iteration from the beginning.
		for attempt := 1; ; attempt++ {
			page, next, scanErr := rdb.Scan(ctx, cursor, "*", int64(batchSize)).Result()
			if scanErr == nil {
				keys, cursor = page, next
				break
			}

			log.Printf("SCAN错误 (尝试 %d/%d): %v", attempt, maxRetries, scanErr)
			if attempt == maxRetries {
				return total, fmt.Errorf("扫描key时出错: %w", scanErr)
			}

			// Linear back-off: 1s, 2s, ...
			time.Sleep(time.Duration(attempt) * time.Second)

			// If the server no longer answers PING, swap in a fresh client
			// and release the old one.
			if pingErr := rdb.Ping(ctx).Err(); pingErr != nil {
				_ = rdb.Close()
				rdb = newClient()
			}
		}

		for _, key := range keys {
			if _, dup := seen[key]; dup {
				continue
			}
			seen[key] = struct{}{}
			batch = append(batch, key)
			total++

			if len(batch) >= batchSize {
				if werr := writeBatchToFile(batch, file); werr != nil {
					return total, werr
				}
				batch = batch[:0] // keep capacity, drop contents
			}
		}

		// A zero cursor from a successful SCAN marks the end of the iteration.
		if cursor == 0 {
			break
		}
		// Short pause between pages to reduce load on the server.
		time.Sleep(50 * time.Millisecond)
	}

	// Flush whatever is left in the final, partially filled batch.
	if len(batch) > 0 {
		if werr := writeBatchToFile(batch, file); werr != nil {
			return total, werr
		}
	}

	return total, nil
}

// writeBatchToFile sorts keys in place and appends them to file, one key per
// line.
//
// The batch is assembled in memory and passed to the file with a single
// WriteString call rather than one call per key. An empty batch is a no-op.
// Note that the caller's slice is reordered by the sort.
//
// A write failure is returned wrapped with %w so callers can inspect the
// underlying cause with errors.Is / errors.As.
func writeBatchToFile(keys []string, file *os.File) error {
	if len(keys) == 0 {
		return nil
	}

	sort.Strings(keys)

	// Exact output size: every key plus one newline each.
	size := len(keys)
	for _, key := range keys {
		size += len(key)
	}

	var buf strings.Builder
	buf.Grow(size)
	for _, key := range keys {
		buf.WriteString(key)
		buf.WriteByte('\n')
	}

	if _, err := file.WriteString(buf.String()); err != nil {
		return fmt.Errorf("写入文件失败: %w", err)
	}
	return nil
}

// Configuration types decoded from the YAML configuration file.
type (
	// RedisConfig groups the Redis connection and scan settings stored under
	// the top-level "redis" key.
	RedisConfig struct {
		// Host is the server host name or IP address.
		Host string `yaml:"host"`
		// Port is the server TCP port.
		Port int `yaml:"port"`
		// DB is the index of the database to select.
		DB int `yaml:"db"`
		// Password is the auth password; empty means none is configured.
		Password string `yaml:"password"`
		// BatchSize is read from the "batchSize" key.
		BatchSize int `yaml:"batchSize"`
	}

	// Config is the root of the configuration document.
	Config struct {
		// Redis holds everything under the "redis" key.
		Redis RedisConfig `yaml:"redis"`
	}
)

// LoadConfig reads the YAML file at path and decodes it into a Config.
//
// The Config is pre-populated with defaults (localhost:6379, database 0,
// empty password, batch size 1000) before decoding. After decoding, a batch
// size that is still zero or negative - for example because the key is
// missing or misspelled in the file - is reset to the default so that
// downstream batching never receives a non-positive value.
//
// It returns a wrapped error when the file cannot be read or parsed.
func LoadConfig(path string) (*Config, error) {
	const defaultBatchSize = 1000

	config := &Config{
		Redis: RedisConfig{
			Host:      "localhost",
			Port:      6379,
			DB:        0,
			Password:  "",
			BatchSize: defaultBatchSize,
		},
	}

	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("无法读取配置文件: %w", err)
	}

	if err := yaml.Unmarshal(data, config); err != nil {
		return nil, fmt.Errorf("无法解析配置文件: %w", err)
	}

	// Guard against an explicit 0 / negative value in the file.
	if config.Redis.BatchSize <= 0 {
		config.Redis.BatchSize = defaultBatchSize
	}

	return config, nil
}

// main loads config.yaml, dumps all Redis keys to the output file, reports
// the outcome and the elapsed time, and then waits for a line of input before
// printing a final message and exiting.
func main() {
	startTime := time.Now()

	cfg, err := LoadConfig("config.yaml")
	if err != nil {
		// Fatalf, not Fatal: the message contains a format verb.
		log.Fatalf("配置加载失败: %v", err)
	}

	// Output file.
	outputFile := "redis_keys_gd_0610.txt"

	fmt.Println("正在从Redis获取所有key并写入文件...")
	totalKeys, err := streamRedisKeysToFile(cfg.Redis.Host, cfg.Redis.Port, cfg.Redis.DB, cfg.Redis.Password, cfg.Redis.BatchSize, outputFile)
	if err != nil {
		// Report the failure with the partial count instead of announcing
		// success for an incomplete dump.
		log.Printf("处理过程中发生错误: %v (已写入 %d 个key)", err, totalKeys)
	} else {
		fmt.Printf("成功获取并写入 %d 个唯一的key\n", totalKeys)
		fmt.Printf("所有key已写入文件: %s\n", outputFile)
	}

	fmt.Printf("总耗时: %.4f 秒\n", time.Since(startTime).Seconds())

	// Block until a line is entered - presumably to keep a console window
	// open. The result is deliberately ignored: an empty line makes Scanln
	// return an error, which is not a failure here.
	var input string
	_, _ = fmt.Scanln(&input)

	fmt.Println("程序继续执行")
}

config.yaml配置文件:

redis:
  host: "localhost"
  port: 6379
  db: 0
  password: "123456"
  batchSize: 1000

相关命令:

go build 编译

go run ./main.go  执行该文件

 

python脚本:

import time

import redis

def get_all_redis_keys(host='localhost', port=6379, db=0, password=None, count=1000):
    """
    Collect every key of one Redis database using SCAN.

    :param host: Redis server host
    :param port: Redis server port
    :param db: database index
    :param password: auth password, or None
    :param count: COUNT hint passed to each SCAN call
    :return: sorted list of distinct keys as strings; a key that is not valid
             UTF-8 is represented by ``str()`` of its bytes object
    """
    # Connect without automatic response decoding so undecodable keys do not
    # raise inside the client.
    r = redis.Redis(host=host, port=port, db=db, password=password, decode_responses=False)

    keys = set()
    # SCAN starts at cursor 0 and is finished when the server hands 0 back.
    # The check sits at the end of the loop so the first call always runs
    # without needing a specially typed start value.
    cursor = 0
    while True:
        cursor, partial_keys = r.scan(cursor=cursor, count=count)
        keys.update(partial_keys)
        if int(cursor) == 0:
            break

    # Decode to text where possible; fall back to the bytes repr otherwise.
    decoded_keys = []
    for key in keys:
        try:
            decoded_keys.append(key.decode('utf-8'))
        except UnicodeDecodeError:
            decoded_keys.append(str(key))

    return sorted(decoded_keys)

def write_keys_to_file(keys, filename='redis_keys.txt'):
    """
    Write every key to a text file, one key per line.

    The file is opened for writing as UTF-8, so existing content is replaced.

    :param keys: iterable of keys to write
    :param filename: path of the output file
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.writelines(f"{key}\n" for key in keys)

if __name__ == '__main__':
    # Wall-clock start, used for the elapsed-time report at the end.
    started_at = time.time()

    # Redis connection settings, forwarded as keyword arguments.
    connection = dict(
        host='localhost',   # Redis server address
        port=6379,          # Redis port
        db=0,               # database index
        password='12346',   # set the password here if one is required
    )

    # Destination file for the key dump.
    target_path = 'redis_keys_0610.txt'

    try:
        print("正在从 Redis 获取所有 key...")
        found = get_all_redis_keys(**connection)
        print(f"获取到 {len(found)} 个唯一的 key")

        write_keys_to_file(found, target_path)
        print(f"所有 key 已写入文件: {target_path}")
    except Exception as exc:
        # Any failure is reported; the timing line below is still printed.
        print(f"处理过程中发生错误: {exc}")

    # Elapsed seconds, shown with four decimal places.
    print(f"耗时: {time.time() - started_at:.4f} 秒")

python解析有“:”的字符串:

def _decode_line(raw):
    """Decode one raw line: try UTF-8, then GBK, and finally latin-1."""
    for encoding in ('utf-8', 'gbk'):
        try:
            return raw.decode(encoding)
        except UnicodeDecodeError:
            continue
    # latin-1 maps every byte value, so this last step cannot fail (the text
    # may simply look wrong if the real encoding was something else).
    return raw.decode('latin-1')


def process_and_deduplicate(input_file, output_file):
    """
    Keep the part of each input line before its first colon, de-duplicate the
    results and write them, sorted, to the output file.

    The input is read as bytes and decoded line by line so that files with
    mixed encodings are handled. Lines whose prefix is empty after stripping
    whitespace are ignored; a line without a colon contributes the whole line.

    :param input_file: path of the input file
    :param output_file: path of the output file (written as UTF-8)
    """
    unique_parts = set()  # the set removes duplicates

    with open(input_file, 'rb') as infile:
        for line_bytes in infile:
            line = _decode_line(line_bytes).strip()
            # partition() always yields the text before the first colon, or
            # the whole line when there is no colon.
            first_part = line.partition(':')[0].strip()
            if first_part:
                unique_parts.add(first_part)

    # Write the unique prefixes in sorted order.
    with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
        for item in sorted(unique_parts):
            outfile.write(item + '\n')

if __name__ == '__main__':
    src_path = 'redis_keys_0610.txt'  # replace with your input file
    dst_path = 'output_deduplicated_0610.txt'  # replace with your output file

    try:
        print(f"正在处理并去重文件 {src_path}...")
        process_and_deduplicate(src_path, dst_path)
        print(f"去重完成,结果已保存到 {dst_path}")

        # Count the result lines by streaming the file instead of loading it.
        total = 0
        with open(dst_path, 'r', encoding='utf-8') as result:
            for _ in result:
                total += 1
        print(f"总共有 {total} 个唯一项")
    except Exception as exc:
        print(f"处理过程中发生错误: {exc}")

你好:我的2025