删除 sync_script.py
This commit is contained in:
parent
71fdb4c264
commit
0e9d40eb45
182
sync_script.py
182
sync_script.py
@ -1,182 +0,0 @@
|
||||
import os
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import requests
|
||||
import filecmp
|
||||
import shutil
|
||||
import zipfile
|
||||
from telegram import Bot
|
||||
from git import Repo
|
||||
from dotenv import load_dotenv
|
||||
from telethon.sync import TelegramClient
|
||||
from telegram.ext import Application
|
||||
|
||||
# 配置日志模块
|
||||
logging.basicConfig(
|
||||
filename='/var/log/telegram_gitea_sync.log', # 日志文件存储路径
|
||||
level=logging.ERROR, # 只记录错误级别及以上日志
|
||||
format='%(asctime)s - %(levelname)s - %(message)s' # 格式:时间戳、日志级别和消息
|
||||
)
|
||||
|
||||
# 加载环境变量
|
||||
load_dotenv()
|
||||
|
||||
TELEGRAM_API_ID = os.getenv("TELEGRAM_API_ID")
|
||||
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
|
||||
BOT_TOKEN = os.getenv("BOT_TOKEN")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN")
|
||||
GITEA_REPO = os.getenv("GITEA_REPO")
|
||||
GITEA_BRANCH = os.getenv("GITEA_BRANCH")
|
||||
TELEGRAM_CHAT_ID = "-1002513646618"
|
||||
|
||||
# 初始化
|
||||
bot = Bot(token=BOT_TOKEN)
|
||||
TEMP_DIR = "/opt/telegram_gitea_sync/tmp"
|
||||
REPO_DIR = "/opt/telegram_gitea_sync/repo"
|
||||
os.makedirs(TEMP_DIR, exist_ok=True)
|
||||
|
||||
async def send_telegram_message(chat_id, text):
|
||||
"""
|
||||
异步发送 Telegram 消息
|
||||
"""
|
||||
try:
|
||||
await bot.send_message(chat_id=chat_id, text=text)
|
||||
except Exception as e:
|
||||
print(f"发送消息失败: {e}")
|
||||
|
||||
async def download_latest_file():
|
||||
"""
|
||||
使用个人账号从 Telegram 下载最新文件
|
||||
"""
|
||||
try:
|
||||
# 登录 Telegram
|
||||
client = TelegramClient('session_name', TELEGRAM_API_ID, TELEGRAM_API_HASH)
|
||||
await client.start(phone="+17165880598") # 替换为您的 Telegram 注册手机号
|
||||
|
||||
print("成功登录 Telegram")
|
||||
|
||||
# 获取目标频道最新消息
|
||||
messages = await client.get_messages("PandaGroovePG", limit=10)
|
||||
print(f"获取到的消息数量: {len(messages)}")
|
||||
|
||||
# 遍历消息,查找文件
|
||||
for message in messages:
|
||||
if message.file:
|
||||
file_name = message.file.name
|
||||
file_path = f"{TEMP_DIR}/{file_name}"
|
||||
await message.download_media(file_path)
|
||||
print(f"文件下载完成: {file_path}")
|
||||
return file_path
|
||||
|
||||
# 未找到文件
|
||||
raise FileNotFoundError("未找到可下载的文件")
|
||||
except Exception as e:
|
||||
print(f"文件下载失败: {e}")
|
||||
raise
|
||||
|
||||
async def process_files(file_path):
|
||||
"""
|
||||
解压、递归比对文件,并同步到仓库
|
||||
"""
|
||||
try:
|
||||
print("开始解压文件...")
|
||||
shutil.unpack_archive(file_path, TEMP_DIR) # 解压 ZIP 文件
|
||||
print(f"文件已解压到临时目录: {TEMP_DIR}")
|
||||
|
||||
# 遍历所有文件和文件夹,逐一比对内容
|
||||
for root, dirs, files in os.walk(TEMP_DIR): # 遍历解压后的临时目录
|
||||
for file in files:
|
||||
temp_file_path = os.path.join(root, file) # 解压后的文件路径
|
||||
repo_file_path = temp_file_path.replace(TEMP_DIR, REPO_DIR) # 替换路径到仓库
|
||||
|
||||
# 如果仓库中不存在该文件,直接复制
|
||||
if not os.path.exists(repo_file_path):
|
||||
print(f"新文件发现,复制到仓库: {temp_file_path}")
|
||||
os.makedirs(os.path.dirname(repo_file_path), exist_ok=True)
|
||||
shutil.copy(temp_file_path, repo_file_path)
|
||||
else:
|
||||
# 文件存在,进行内容比对
|
||||
if not filecmp.cmp(temp_file_path, repo_file_path, shallow=False):
|
||||
print(f"文件已更新,覆盖旧版本: {temp_file_path}")
|
||||
shutil.copy(temp_file_path, repo_file_path)
|
||||
else:
|
||||
print(f"文件未更改,跳过: {temp_file_path}")
|
||||
|
||||
print("所有文件比对完成,准备同步到 Gitea")
|
||||
await sync_to_gitea()
|
||||
except Exception as e:
|
||||
print(f"处理文件时发生错误: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def process_jsm_json(json_path):
|
||||
"""
|
||||
处理 jsm.json 文件
|
||||
"""
|
||||
try:
|
||||
with open(json_path, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
data["logo"] = "./bj/fyj.gif"
|
||||
for site in data.get("sites", []):
|
||||
if site.get("key") == "lf_js_search":
|
||||
site["proxy"] = "noproxy"
|
||||
with open(json_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=4)
|
||||
except Exception as e:
|
||||
await send_telegram_message(TELEGRAM_CHAT_ID, f"处理 jsm.json 文件失败: {e}")
|
||||
raise
|
||||
|
||||
async def sync_to_gitea():
|
||||
"""
|
||||
推送更新到 Gitea 仓库
|
||||
"""
|
||||
try:
|
||||
print(f"检查仓库路径是否存在: {REPO_DIR}")
|
||||
if not os.path.exists(REPO_DIR):
|
||||
raise FileNotFoundError(f"仓库路径不存在: {REPO_DIR}")
|
||||
|
||||
repo = Repo(REPO_DIR)
|
||||
print("成功加载 Gitea 仓库")
|
||||
|
||||
repo.git.add("--all")
|
||||
print("添加所有修改文件")
|
||||
|
||||
repo.index.commit("自动更新文件")
|
||||
print("提交修改")
|
||||
|
||||
origin = repo.remote(name="origin")
|
||||
origin.push()
|
||||
print("推送更新到远程仓库成功")
|
||||
except Exception as e:
|
||||
print(f"同步到 Gitea 失败: {e}")
|
||||
raise
|
||||
|
||||
|
||||
async def main():
|
||||
"""
|
||||
主流程
|
||||
"""
|
||||
try:
|
||||
print("开始执行主流程...") # 添加调试信息
|
||||
await send_telegram_message(TELEGRAM_CHAT_ID, "开始检测最新文件...")
|
||||
print("已发送电报通知") # 添加调试信息
|
||||
|
||||
file_path = await download_latest_file()
|
||||
print(f"文件下载完成,路径为: {file_path}") # 输出下载文件路径
|
||||
|
||||
await process_files(file_path)
|
||||
print("文件处理完成并已同步到 Gitea") # 确认文件处理完成
|
||||
|
||||
await send_telegram_message(TELEGRAM_CHAT_ID, "同步完成!")
|
||||
print("任务完成,通知已发送到电报频道") # 确认任务完成
|
||||
except Exception as e:
|
||||
error_message = f"发生错误: {e}"
|
||||
print(error_message) # 输出错误信息到终端
|
||||
await send_telegram_message(TELEGRAM_CHAT_ID, error_message)
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("脚本已启动,准备执行主逻辑...") # 添加调试信息
|
||||
import asyncio
|
||||
asyncio.run(main())
|
||||
print("脚本已完成运行!") # 添加结束时的调试信息
|
Loading…
x
Reference in New Issue
Block a user