import os import logging import asyncio import json import requests import filecmp import shutil import zipfile from telegram import Bot from git import Repo from dotenv import load_dotenv from telethon.sync import TelegramClient from telegram.ext import Application # 配置日志模块 logging.basicConfig( filename='/var/log/telegram_gitea_sync.log', # 日志文件存储路径 level=logging.ERROR, # 只记录错误级别及以上日志 format='%(asctime)s - %(levelname)s - %(message)s' # 格式:时间戳、日志级别和消息 ) # 加载环境变量 load_dotenv() TELEGRAM_API_ID = os.getenv("TELEGRAM_API_ID") TELEGRAM_API_HASH = os.getenv("TELEGRAM_API_HASH") BOT_TOKEN = os.getenv("BOT_TOKEN") GITEA_TOKEN = os.getenv("GITEA_TOKEN") GITEA_REPO = os.getenv("GITEA_REPO") GITEA_BRANCH = os.getenv("GITEA_BRANCH") TELEGRAM_CHAT_ID = "-1002513646618" # 初始化 bot = Bot(token=BOT_TOKEN) TEMP_DIR = "/opt/telegram_gitea_sync/tmp" REPO_DIR = "/opt/telegram_gitea_sync/repo" os.makedirs(TEMP_DIR, exist_ok=True) async def send_telegram_message(chat_id, text): """ 异步发送 Telegram 消息 """ try: await bot.send_message(chat_id=chat_id, text=text) except Exception as e: print(f"发送消息失败: {e}") async def download_latest_file(): """ 使用个人账号从 Telegram 下载最新文件 """ try: # 登录 Telegram client = TelegramClient('session_name', TELEGRAM_API_ID, TELEGRAM_API_HASH) await client.start(phone="+17165880598") # 替换为您的 Telegram 注册手机号 print("成功登录 Telegram") # 获取目标频道最新消息 messages = await client.get_messages("PandaGroovePG", limit=10) print(f"获取到的消息数量: {len(messages)}") # 遍历消息,查找文件 for message in messages: if message.file: file_name = message.file.name file_path = f"{TEMP_DIR}/{file_name}" await message.download_media(file_path) print(f"文件下载完成: {file_path}") return file_path # 未找到文件 raise FileNotFoundError("未找到可下载的文件") except Exception as e: print(f"文件下载失败: {e}") raise async def process_files(file_path): """ 解压、递归比对文件,并同步到仓库 """ try: print("开始解压文件...") shutil.unpack_archive(file_path, TEMP_DIR) # 解压 ZIP 文件 print(f"文件已解压到临时目录: {TEMP_DIR}") # 遍历所有文件和文件夹,逐一比对内容 for root, dirs, files in os.walk(TEMP_DIR): # 遍历解压后的临时目录 for file in files: temp_file_path = os.path.join(root, file) # 解压后的文件路径 repo_file_path = temp_file_path.replace(TEMP_DIR, REPO_DIR) # 替换路径到仓库 # 如果仓库中不存在该文件,直接复制 if not os.path.exists(repo_file_path): print(f"新文件发现,复制到仓库: {temp_file_path}") os.makedirs(os.path.dirname(repo_file_path), exist_ok=True) shutil.copy(temp_file_path, repo_file_path) else: # 文件存在,进行内容比对 if not filecmp.cmp(temp_file_path, repo_file_path, shallow=False): print(f"文件已更新,覆盖旧版本: {temp_file_path}") shutil.copy(temp_file_path, repo_file_path) else: print(f"文件未更改,跳过: {temp_file_path}") print("所有文件比对完成,准备同步到 Gitea") await sync_to_gitea() except Exception as e: print(f"处理文件时发生错误: {e}") raise async def process_jsm_json(json_path): """ 处理 jsm.json 文件 """ try: with open(json_path, "r", encoding="utf-8") as f: data = json.load(f) data["logo"] = "./bj/fyj.gif" for site in data.get("sites", []): if site.get("key") == "lf_js_search": site["proxy"] = "noproxy" with open(json_path, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=4) except Exception as e: await send_telegram_message(TELEGRAM_CHAT_ID, f"处理 jsm.json 文件失败: {e}") raise async def sync_to_gitea(): """ 推送更新到 Gitea 仓库 """ try: print(f"检查仓库路径是否存在: {REPO_DIR}") if not os.path.exists(REPO_DIR): raise FileNotFoundError(f"仓库路径不存在: {REPO_DIR}") repo = Repo(REPO_DIR) print("成功加载 Gitea 仓库") repo.git.add("--all") print("添加所有修改文件") repo.index.commit("自动更新文件") print("提交修改") origin = repo.remote(name="origin") origin.push() print("推送更新到远程仓库成功") except Exception as e: print(f"同步到 Gitea 失败: {e}") raise async def main(): """ 主流程 """ try: print("开始执行主流程...") # 添加调试信息 await send_telegram_message(TELEGRAM_CHAT_ID, "开始检测最新文件...") print("已发送电报通知") # 添加调试信息 file_path = await download_latest_file() print(f"文件下载完成,路径为: {file_path}") # 输出下载文件路径 await process_files(file_path) print("文件处理完成并已同步到 Gitea") # 确认文件处理完成 await send_telegram_message(TELEGRAM_CHAT_ID, "同步完成!") print("任务完成,通知已发送到电报频道") # 确认任务完成 except Exception as e: error_message = f"发生错误: {e}" print(error_message) # 输出错误信息到终端 await send_telegram_message(TELEGRAM_CHAT_ID, error_message) if __name__ == "__main__": print("脚本已启动,准备执行主逻辑...") # 添加调试信息 import asyncio asyncio.run(main()) print("脚本已完成运行!") # 添加结束时的调试信息