1、在数据每一行的每个字段前面添加一个前缀(空行原样保留,含有以 / 或 * 开头字段的行会被跳过):
prefix = "prefix."
filename = "Text.txt"
new_filename = prefix + filename

# Load the whole tab-separated input file into memory.
with open(filename, 'r', encoding='utf-8') as txt_file:
    data = txt_file.readlines()

# Build the output lines:
#   * blank lines are kept as empty lines (no prefix added),
#   * lines with any field starting with '/' or '*' are dropped,
#   * every field of every other line gets the prefix.
new_data = []
for raw_line in data:
    columns = raw_line.strip().split('\t')
    if columns == ['']:
        # Blank (or whitespace-only) line: emit an empty line untouched.
        new_data.append('\n')
        continue
    if any(column.startswith(('/', '*')) for column in columns):
        continue
    new_data.append('\t'.join(prefix + column for column in columns) + '\n')

# Write the transformed lines to the new, prefixed file name.
with open(new_filename, 'w', encoding='utf-8') as new_txt_file:
    new_txt_file.writelines(new_data)
2、去除文本中的所有数字:
import re
# Read the source text as UTF-8.
with open('Test.txt', 'r', encoding='utf-8') as f:
    content = f.read()

# Drop every run of digits from the text.
new_content = re.sub(r'\d+', '', content)

# Write the result with the same encoding it was read with. Without an
# explicit encoding the platform default is used, which can fail on (or
# silently re-encode) non-ASCII text on non-UTF-8 locales.
with open('prefix.Test.txt', 'w', encoding='utf-8') as f:
    f.write(new_content)
3、扫描一个目录,将 .java 文件(自动匹配编码格式)中存在关键字的所有行提取出来并保存到一个文件中:
import os
import re
import chardet
# Directory to scan.
dir_path = r'/project'
# Substring to look for.
search_str = 'keyWorld'
# File the matching lines are appended to.
output_file = r'output.txt'

# Open the output once, in append mode as before, and always as UTF-8.
# Writing each hit with the *source* file's detected encoding would leave a
# single output file containing a mix of encodings.
with open(output_file, 'a', encoding='utf-8') as of:
    for root, dirs, files in os.walk(dir_path):
        for file in files:
            # Only .java files are processed.
            if not file.endswith('.java'):
                continue
            path = os.path.join(root, file)

            # Let chardet guess the encoding from the raw bytes.
            with open(path, 'rb') as f:
                result = chardet.detect(f.read())
            encoding = result['encoding']
            if not encoding:
                print(f'Skipping {file} due to unrecognized encoding')
                continue

            try:
                # Re-open as text with the detected encoding and stream lines.
                with open(path, 'r', encoding=encoding) as f:
                    for line in f:
                        if search_str in line:
                            of.write(f'{file}:{line}')
            except LookupError:
                # chardet reported an encoding name Python has no codec for.
                print(f'Skipping {file} due to unrecognized encoding')
            except UnicodeDecodeError:
                print(f'Skipping {file} due to decoding error')
print('Done.')
4、解析csv文件:
import csv
new_filename = "Test.txt"

# Read the CSV and collect only the 'rowName' column.
with open('gift.csv', newline='') as csvfile:
    reader = csv.DictReader(csvfile)
    messages = []
    for row in reader:
        messages.append(row['rowName'])

# Echo each value and turn it into a tab-indented output line.
new_data = []
for message in messages:
    print(message)
    new_data.append(''.join(('\t', message, '\n')))

# Write the collected lines to the new text file.
with open(new_filename, 'w', encoding='utf-8') as new_txt_file:
    new_txt_file.writelines(new_data)
5、解析每一行的数据,按 id 和 twoId 分组累加 num 并写入文件:
import re
from collections import defaultdict
# Extracts the id, twoId and num fields from one line of data.
PATTERN = r"id:(\d+),.*twoId:(\d+),num:(\d+),"
# Nested totals: result[id][twoId] -> summed num.
result = defaultdict(lambda: defaultdict(int))

# Parse the data file line by line.
with open("Test.txt") as f:
    for raw_line in f:
        text = raw_line.strip()
        print(text)
        match = re.search(PATTERN, text)
        print(match)
        if not match:
            continue
        first_id, second_id, amount = match.groups()
        # Accumulate num under its (id, twoId) pair.
        result[first_id][second_id] += int(amount)

# Write one "id,twoId,total" line per group.
with open("output.txt", "w") as f:
    f.writelines(
        f"{first_id},{second_id},{total}\n"
        for first_id, per_second in result.items()
        for second_id, total in per_second.items()
    )
6、解析txt文件并保存到excel文件中:
import re
from openpyxl import Workbook
# Read all lines of the text file.
with open('Text.txt', 'r') as f:
    content = f.readlines()

# Create the workbook and take its active sheet.
wb = Workbook()
ws = wb.active

# Each "key=value" line becomes one row [key, value] in the sheet.
pair_re = re.compile(r'^([^=]+)=(.+)$')
for line in content:
    match = pair_re.match(line)
    print(match)
    if not match:
        continue
    raw_key, raw_value = match.groups()
    # The stripping steps are applied one after another; their order matters,
    # so they are kept as separate passes rather than merged into one call.
    value = raw_value.strip('"').lstrip('"')
    value = value.lstrip('-"')
    value = value.lstrip(' "')
    ws.append([raw_key.strip(), value])

# Save the workbook.
wb.save('new.xlsx')
2024-02-23 start:
from PIL import Image
def run():
    """Resize a PNG image to 108x108 pixels and save it as '108.png'.

    The source path is the placeholder literal below and must be replaced
    with a real image path before running.
    """
    # Target size in pixels.
    new_width = 108
    new_height = 108
    # Open the source inside a context manager so its file handle is released
    # as soon as the resized copy has been produced.
    with Image.open('【图片路径】') as image:
        resized_image = image.resize((new_width, new_height), Image.Resampling.BICUBIC)
    # Save the resized copy.
    resized_image.save('108.png')


if __name__ == '__main__':
    run()
end
2024-12-06 start:
输入mysql的表名,输出该表所有字段的注释(没有注释的显示无注释)并输出字段的个数
import mysql.connector
from mysql.connector import Error
def get_table_column_comments(host, port, user, password, database, table_name):
    """Print the comments of every column of a MySQL table, then the column count.

    Columns without a comment are shown as '无注释'. Connection or query
    errors reported by the connector are printed, not raised.

    Args:
        host: MySQL server host.
        port: MySQL server port.
        user: Login user name.
        password: Login password.
        database: Schema that contains the table.
        table_name: Name of the table whose column comments are listed.
    """
    # Initialised up front so the cleanup in ``finally`` is safe even when
    # connect() itself fails and nothing was ever assigned.
    connection = None
    cursor = None
    try:
        # Open the connection with an explicit port, charset and collation.
        connection = mysql.connector.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4',
            collation='utf8mb4_general_ci',
        )
        if connection.is_connected():
            cursor = connection.cursor(dictionary=True)
            # Fetch every column of the table together with its comment.
            query_columns = """
                SELECT COLUMN_NAME, COLUMN_COMMENT
                FROM INFORMATION_SCHEMA.COLUMNS
                WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s;
            """
            cursor.execute(query_columns, (database, table_name))
            columns = cursor.fetchall()
            # An empty comment falls back to '无注释'.
            column_comments = [
                f"{column['COLUMN_COMMENT'] or '无注释'}" for column in columns
            ]
            # Print the comments joined by '、', a blank gap, then the count.
            print("、".join(column_comments))
            print("\n")
            print(len(column_comments))
    except Error as e:
        print("Error while connecting to MySQL", e)
    finally:
        # Close only what was actually opened. Previously a failed connect()
        # left ``connection`` as None and this cleanup raised AttributeError,
        # hiding the real error.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()


# Replace the values below with your real database settings.
host = '123'
port = '3306'
user = '123'
password = '123'
database = '123'
table_name = '123'
get_table_column_comments(host, port, user, password, database, table_name)
输入mysql表名的注释,打印对应的表名:
import mysql.connector
from mysql.connector import Error
def get_tables_with_filtered_comments(host, port, user, password, database, filter_texts):
    """Print the tables of a schema whose comment contains any of the given texts.

    Each match is printed as ``table_name(table_comment)``, one per line.
    Tables without a comment are treated as having the comment '无注释'.
    When ``filter_texts`` is empty, every table is printed. Connection or
    query errors reported by the connector are printed, not raised.

    Args:
        host: MySQL server host.
        port: MySQL server port.
        user: Login user name.
        password: Login password.
        database: Schema whose tables are inspected.
        filter_texts: Substrings to look for in the table comments.
    """
    # Initialised up front so the cleanup in ``finally`` is safe even when
    # connect() itself fails and nothing was ever assigned.
    connection = None
    cursor = None
    try:
        # Open the connection with an explicit port, charset and collation.
        connection = mysql.connector.connect(
            host=host,
            port=port,
            user=user,
            password=password,
            database=database,
            charset='utf8mb4',
            collation='utf8mb4_general_ci',
        )
        if connection.is_connected():
            cursor = connection.cursor(dictionary=True)
            # Fetch every table of the schema together with its comment.
            query_tables = """
                SELECT TABLE_NAME, TABLE_COMMENT
                FROM INFORMATION_SCHEMA.TABLES
                WHERE TABLE_SCHEMA = %s;
            """
            cursor.execute(query_tables, (database,))
            tables = cursor.fetchall()
            # Collect the tables whose comment matches any filter text.
            filtered_tables = []
            for table in tables:
                table_name = table['TABLE_NAME']
                # An empty comment falls back to '无注释'.
                table_comment = table['TABLE_COMMENT'] or '无注释'
                if not filter_texts or any(
                    filter_text in table_comment for filter_text in filter_texts
                ):
                    filtered_tables.append(f"{table_name}({table_comment})")
            # Print the matches, or a notice when there are none.
            if filtered_tables:
                print("\n".join(filtered_tables))
            else:
                print("No matching tables found with the given filter texts.")
    except Error as e:
        print("Error while connecting to MySQL", e)
    finally:
        # Close only what was actually opened. Previously a failed connect()
        # left ``connection`` as None and this cleanup raised AttributeError,
        # hiding the real error.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()


# Replace the values below with your real database settings.
host = '123'
port = '3306'
user = '123'
password = '123'
database = '123'
filter_texts = ["123"]
get_tables_with_filtered_comments(host, port, user, password, database, filter_texts)
end