tv/sync_script.py
2025-04-07 17:17:29 -04:00

183 lines
6.2 KiB
Python

import os
import logging
import asyncio
import json
import requests
import filecmp
import shutil
import zipfile
from telegram import Bot
from git import Repo
from dotenv import load_dotenv
from telethon.sync import TelegramClient
from telegram.ext import Application
# 配置日志模块
logging.basicConfig(
filename='/var/log/telegram_gitea_sync.log', # 日志文件存储路径
level=logging.ERROR, # 只记录错误级别及以上日志
format='%(asctime)s - %(levelname)s - %(message)s' # 格式:时间戳、日志级别和消息
)
# 加载环境变量
load_dotenv()
TELEGRAM_API_ID = os.getenv("TELEGRAM_API_ID")
TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH")
BOT_TOKEN = os.getenv("BOT_TOKEN")
GITEA_TOKEN = os.getenv("GITEA_TOKEN")
GITEA_REPO = os.getenv("GITEA_REPO")
GITEA_BRANCH = os.getenv("GITEA_BRANCH")
TELEGRAM_CHAT_ID = "-1002513646618"
# 初始化
bot = Bot(token=BOT_TOKEN)
TEMP_DIR = "/opt/telegram_gitea_sync/tmp"
REPO_DIR = "/opt/telegram_gitea_sync/repo"
os.makedirs(TEMP_DIR, exist_ok=True)
async def send_telegram_message(chat_id, text):
"""
异步发送 Telegram 消息
"""
try:
await bot.send_message(chat_id=chat_id, text=text)
except Exception as e:
print(f"发送消息失败: {e}")
async def download_latest_file():
"""
使用个人账号从 Telegram 下载最新文件
"""
try:
# 登录 Telegram
client = TelegramClient('session_name', TELEGRAM_API_ID, TELEGRAM_API_HASH)
await client.start(phone="+17165880598") # 替换为您的 Telegram 注册手机号
print("成功登录 Telegram")
# 获取目标频道最新消息
messages = await client.get_messages("PandaGroovePG", limit=10)
print(f"获取到的消息数量: {len(messages)}")
# 遍历消息,查找文件
for message in messages:
if message.file:
file_name = message.file.name
file_path = f"{TEMP_DIR}/{file_name}"
await message.download_media(file_path)
print(f"文件下载完成: {file_path}")
return file_path
# 未找到文件
raise FileNotFoundError("未找到可下载的文件")
except Exception as e:
print(f"文件下载失败: {e}")
raise
async def process_files(file_path):
"""
解压、递归比对文件,并同步到仓库
"""
try:
print("开始解压文件...")
shutil.unpack_archive(file_path, TEMP_DIR) # 解压 ZIP 文件
print(f"文件已解压到临时目录: {TEMP_DIR}")
# 遍历所有文件和文件夹,逐一比对内容
for root, dirs, files in os.walk(TEMP_DIR): # 遍历解压后的临时目录
for file in files:
temp_file_path = os.path.join(root, file) # 解压后的文件路径
repo_file_path = temp_file_path.replace(TEMP_DIR, REPO_DIR) # 替换路径到仓库
# 如果仓库中不存在该文件,直接复制
if not os.path.exists(repo_file_path):
print(f"新文件发现,复制到仓库: {temp_file_path}")
os.makedirs(os.path.dirname(repo_file_path), exist_ok=True)
shutil.copy(temp_file_path, repo_file_path)
else:
# 文件存在,进行内容比对
if not filecmp.cmp(temp_file_path, repo_file_path, shallow=False):
print(f"文件已更新,覆盖旧版本: {temp_file_path}")
shutil.copy(temp_file_path, repo_file_path)
else:
print(f"文件未更改,跳过: {temp_file_path}")
print("所有文件比对完成,准备同步到 Gitea")
await sync_to_gitea()
except Exception as e:
print(f"处理文件时发生错误: {e}")
raise
async def process_jsm_json(json_path):
"""
处理 jsm.json 文件
"""
try:
with open(json_path, "r", encoding="utf-8") as f:
data = json.load(f)
data["logo"] = "./bj/fyj.gif"
for site in data.get("sites", []):
if site.get("key") == "lf_js_search":
site["proxy"] = "noproxy"
with open(json_path, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)
except Exception as e:
await send_telegram_message(TELEGRAM_CHAT_ID, f"处理 jsm.json 文件失败: {e}")
raise
async def sync_to_gitea():
"""
推送更新到 Gitea 仓库
"""
try:
print(f"检查仓库路径是否存在: {REPO_DIR}")
if not os.path.exists(REPO_DIR):
raise FileNotFoundError(f"仓库路径不存在: {REPO_DIR}")
repo = Repo(REPO_DIR)
print("成功加载 Gitea 仓库")
repo.git.add("--all")
print("添加所有修改文件")
repo.index.commit("自动更新文件")
print("提交修改")
origin = repo.remote(name="origin")
origin.push()
print("推送更新到远程仓库成功")
except Exception as e:
print(f"同步到 Gitea 失败: {e}")
raise
async def main():
"""
主流程
"""
try:
print("开始执行主流程...") # 添加调试信息
await send_telegram_message(TELEGRAM_CHAT_ID, "开始检测最新文件...")
print("已发送电报通知") # 添加调试信息
file_path = await download_latest_file()
print(f"文件下载完成,路径为: {file_path}") # 输出下载文件路径
await process_files(file_path)
print("文件处理完成并已同步到 Gitea") # 确认文件处理完成
await send_telegram_message(TELEGRAM_CHAT_ID, "同步完成!")
print("任务完成,通知已发送到电报频道") # 确认任务完成
except Exception as e:
error_message = f"发生错误: {e}"
print(error_message) # 输出错误信息到终端
await send_telegram_message(TELEGRAM_CHAT_ID, error_message)
if __name__ == "__main__":
print("脚本已启动,准备执行主逻辑...") # 添加调试信息
import asyncio
asyncio.run(main())
print("脚本已完成运行!") # 添加结束时的调试信息