go语言写法:
package main
import (
	"context"
	"fmt"
	"io"
	"log"
	"os"
	"sort"
	"strings"
	"time"

	"github.com/go-redis/redis/v8"
	"gopkg.in/yaml.v3"
)
// streamRedisKeysToFile iterates every key of the given Redis database
// with SCAN and streams them to filename, one key per line. Keys are
// de-duplicated globally; each flushed batch is sorted before writing
// (so the file is sorted per batch, not globally). It returns the number
// of unique keys written.
//
// batchSize is used both as the SCAN COUNT hint and as the write-batch
// size; a non-positive value falls back to 1000, since Redis rejects
// SCAN with COUNT <= 0.
func streamRedisKeysToFile(host string, port int, db int, password string, batchSize int, filename string) (int, error) {
	if batchSize <= 0 {
		batchSize = 1000 // Redis rejects SCAN COUNT <= 0
	}

	// Wrap bare IPv6 literals in brackets so "host:port" stays parseable.
	var addr string
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		addr = fmt.Sprintf("[%s]:%d", host, port)
	} else {
		addr = fmt.Sprintf("%s:%d", host, port)
	}

	// Factored out so the retry path can rebuild an identical client.
	newClient := func() *redis.Client {
		return redis.NewClient(&redis.Options{
			Addr:         addr,
			Password:     password,
			DB:           db,
			ReadTimeout:  30 * time.Second,
			WriteTimeout: 30 * time.Second,
		})
	}
	rdb := newClient()
	// rdb may be swapped on reconnect, so close through the variable.
	defer func() { _ = rdb.Close() }()

	ctx := context.Background()
	var cursor uint64
	totalKeys := 0
	uniqueKeys := make(map[string]struct{})

	// Create the output file.
	file, err := os.Create(filename)
	if err != nil {
		return 0, fmt.Errorf("创建文件失败: %v", err)
	}
	defer file.Close()

	// Temporary buffer: one batch is deduped/sorted then flushed.
	batch := make([]string, 0, batchSize)
	maxRetries := 3

	// Scan all keys, one SCAN page per outer iteration.
	for {
		var keys []string
		var scanErr error

		// SCAN with retry.
		for retry := 0; retry < maxRetries; retry++ {
			keys, cursor, scanErr = rdb.Scan(ctx, cursor, "*", int64(batchSize)).Result()
			if scanErr == nil {
				break
			}
			log.Printf("SCAN错误 (尝试 %d/%d): %v", retry+1, maxRetries, scanErr)
			if retry == maxRetries-1 {
				// Last attempt failed — give up with the wrapped cause.
				return totalKeys, fmt.Errorf("扫描key时出错: %w", scanErr)
			}
			// Linear backoff before retrying.
			time.Sleep(time.Second * time.Duration(retry+1))
			// Probe the connection; rebuild the client if it looks dead.
			if _, err := rdb.Ping(ctx).Result(); err != nil {
				// Close the old client first so connections are not leaked.
				_ = rdb.Close()
				rdb = newClient()
			}
		}

		// Fold this page into the global dedupe set and the write batch.
		for _, key := range keys {
			if _, exists := uniqueKeys[key]; exists {
				continue
			}
			uniqueKeys[key] = struct{}{}
			batch = append(batch, key)
			totalKeys++
			// Flush once the batch reaches the configured size.
			if len(batch) >= batchSize {
				if err := writeBatchToFile(batch, file); err != nil {
					return totalKeys, err
				}
				batch = batch[:0] // reuse the backing array
			}
		}

		// cursor == 0 means the scan has completed a full cycle.
		if cursor == 0 {
			break
		}
		// Small pause between pages to ease load on the server.
		time.Sleep(50 * time.Millisecond)
	}

	// Flush whatever is left over.
	if len(batch) > 0 {
		if err := writeBatchToFile(batch, file); err != nil {
			return totalKeys, err
		}
	}
	return totalKeys, nil
}
// 写入一批key到文件(已排序)
func writeBatchToFile(keys []string, file *os.File) error {
// 对当前批次排序
sort.Strings(keys)
// 写入文件
for _, key := range keys {
if _, err := file.WriteString(key + "\n"); err != nil {
return fmt.Errorf("写入文件失败: %v", err)
}
}
return nil
}
// RedisConfig holds the Redis connection settings parsed from the
// "redis" section of the YAML configuration file.
type RedisConfig struct {
	Host      string `yaml:"host"`      // server address (hostname or IP)
	Port      int    `yaml:"port"`      // server port
	DB        int    `yaml:"db"`        // database index
	Password  string `yaml:"password"`  // password; empty means no auth
	BatchSize int    `yaml:"batchSize"` // SCAN COUNT hint / write-batch size
}

// Config is the top-level structure of the configuration file.
type Config struct {
	Redis RedisConfig `yaml:"redis"`
}
// LoadConfig reads and parses the YAML configuration at path. Defaults
// are pre-populated so fields missing from the file keep sane values;
// in particular BatchSize defaults to 1000 and is clamped after parsing,
// because a zero/negative value would make Redis reject SCAN COUNT.
func LoadConfig(path string) (*Config, error) {
	config := &Config{
		Redis: RedisConfig{
			Host:      "localhost",
			Port:      6379,
			DB:        0,
			Password:  "",
			BatchSize: 1000, // default SCAN COUNT; 0 is invalid for Redis SCAN
		},
	}
	file, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("无法读取配置文件: %w", err)
	}
	if err := yaml.Unmarshal(file, config); err != nil {
		return nil, fmt.Errorf("无法解析配置文件: %w", err)
	}
	// Guard against an explicit zero/negative batchSize in the file
	// (or a mismatched key name leaving the field unset).
	if config.Redis.BatchSize <= 0 {
		config.Redis.BatchSize = 1000
	}
	return config, nil
}
// main loads config.yaml, exports every unique Redis key to a text file,
// reports the elapsed time, and waits for Enter before exiting.
func main() {
	startTime := time.Now()

	cfg, err := LoadConfig("config.yaml")
	if err != nil {
		// log.Fatal does not interpret format verbs; Fatalf does.
		log.Fatalf("配置加载失败: %v", err)
	}

	// Output file.
	outputFile := "redis_keys_gd_0610.txt"

	// Fetch all keys and stream them straight to the file.
	fmt.Println("正在从Redis获取所有key并写入文件...")
	totalKeys, err := streamRedisKeysToFile(cfg.Redis.Host, cfg.Redis.Port, cfg.Redis.DB, cfg.Redis.Password, cfg.Redis.BatchSize, outputFile)
	if err != nil {
		// log.Println would print the verb literally; Printf formats it.
		log.Printf("处理过程中发生错误: %v", err)
	} else {
		// Only report success when the export actually succeeded.
		fmt.Printf("成功获取并写入 %d 个唯一的key\n", totalKeys)
		fmt.Printf("所有key已写入文件: %s\n", outputFile)
	}

	elapsedTime := time.Since(startTime)
	fmt.Printf("总耗时: %.4f 秒\n", elapsedTime.Seconds())

	// Block on Enter so a double-clicked console window stays open.
	var input string
	fmt.Scanln(&input)
	fmt.Println("程序继续执行")
}
config.yaml配置文件:
redis:
  host: "localhost"
  port: 6379
  db: 0
  password: "123456"
  batchSize: 1000
相关命令:
go build 编译
go run ./main.go 执行该文件
python脚本:
import time
import redis
def get_all_redis_keys(host='localhost', port=6379, db=0, password=None):
    """
    Collect every key in one Redis database via incremental SCAN.

    :param host: Redis server address
    :param port: Redis port
    :param db: database number
    :param password: password, or None for no auth
    :return: sorted list of keys; UTF-8 keys are decoded to str, keys that
             cannot be decoded fall back to their str() representation
             (e.g. "b'...'")
    """
    # decode_responses=False so binary keys are received intact as bytes.
    r = redis.Redis(host=host, port=port, db=db, password=password,
                    decode_responses=False)

    keys = set()
    # redis-py returns the SCAN cursor as an int; the original code seeded
    # it with b'0' and relied on b'0' != 0 always being True.
    cursor = 0
    while True:
        cursor, partial_keys = r.scan(cursor=cursor)
        keys.update(partial_keys)
        if cursor == 0:  # SCAN has completed a full iteration
            break

    # Decode to str where possible; keep undecodable keys visible.
    decoded_keys = []
    for key in keys:
        try:
            decoded_keys.append(key.decode('utf-8'))
        except UnicodeDecodeError:
            decoded_keys.append(str(key))  # shows raw bytes, e.g. "b'\\x80'"
    return sorted(decoded_keys)
def write_keys_to_file(keys, filename='redis_keys.txt'):
    """
    Persist a list of keys to a UTF-8 text file, one key per line.

    :param keys: iterable of key strings
    :param filename: path of the output file
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.writelines(f"{key}\n" for key in keys)
if __name__ == '__main__':
    # Wall-clock timing for the whole export run.
    started = time.time()

    # Redis connection parameters.
    redis_config = {
        'host': 'localhost',  # Redis server address
        'port': 6379,         # Redis port
        'db': 0,              # database number
        'password': '12346'   # password, if required
                              # NOTE(review): differs from the '123456' used
                              # elsewhere in this document — confirm which is right
    }

    # Destination file for the exported keys.
    output_file = 'redis_keys_0610.txt'

    try:
        print("正在从 Redis 获取所有 key...")
        all_keys = get_all_redis_keys(**redis_config)
        print(f"获取到 {len(all_keys)} 个唯一的 key")
        # Write the result to disk.
        write_keys_to_file(all_keys, output_file)
        print(f"所有 key 已写入文件: {output_file}")
    except Exception as e:
        print(f"处理过程中发生错误: {e}")

    # Elapsed seconds, 4 decimal places.
    elapsed_time = time.time() - started
    print(f"耗时: {elapsed_time:.4f} 秒")
python解析有“:”的字符串:
def _decode_line_bytes(raw):
    """Best-effort decode of one raw line: UTF-8 first, then GBK; latin-1 last."""
    for enc in ('utf-8', 'gbk'):
        try:
            return raw.decode(enc)
        except UnicodeDecodeError:
            continue
    # latin-1 maps every byte value, so this final step cannot raise —
    # the original code wrapped it in a dead try/bare-except.
    return raw.decode('latin-1')


def process_and_deduplicate(input_file, output_file):
    """
    Truncate each line of input_file at the first ':', de-duplicate the
    resulting prefixes, and write them sorted to output_file, one per line.

    The input is read in binary mode because its encoding is unknown;
    each line is decoded with a UTF-8 -> GBK -> latin-1 fallback chain.

    :param input_file: input file name
    :param output_file: output file name
    """
    unique_parts = set()  # set gives de-duplication for free

    with open(input_file, 'rb') as infile:  # binary: encoding is uncertain
        for line_bytes in infile:
            line = _decode_line_bytes(line_bytes).strip()
            # str.split always returns at least one element, so [0] is safe.
            first_part = line.split(':', 1)[0].strip()
            if first_part:  # skip blank lines / empty prefixes
                unique_parts.add(first_part)

    # Write sorted output; errors='replace' guards against any stray
    # characters that cannot be encoded.
    with open(output_file, 'w', encoding='utf-8', errors='replace') as outfile:
        for item in sorted(unique_parts):
            outfile.write(item + '\n')
if __name__ == '__main__':
    input_filename = 'redis_keys_0610.txt'            # source file
    output_filename = 'output_deduplicated_0610.txt'  # destination file

    try:
        print(f"正在处理并去重文件 {input_filename}...")
        process_and_deduplicate(input_filename, output_filename)
        print(f"去重完成,结果已保存到 {output_filename}")
        # Count lines without loading the whole file into memory.
        with open(output_filename, 'r', encoding='utf-8') as f:
            line_count = sum(1 for _ in f)
        print(f"总共有 {line_count} 个唯一项")
    except Exception as e:
        print(f"处理过程中发生错误: {e}")