Auto Episode-Tracking Script

1. Initialize the Termux environment

Open Termux and paste in these three commands one at a time, pressing Enter after each. This installs Python and the native libraries the script needs (if prompted with [Y/n], type y and press Enter):

pkg update -y && pkg upgrade -y
pkg install python clang make libffi -y
pip install requests pycryptodome python-dotenv schedule

2. Set up the working directory and config file
Run the following lines in Termux to create the data folder and the environment-variable file:

# 1. Create the db folder in the home directory (the script reads and
#    writes db/ relative to wherever it is launched, and in this guide
#    everything is run from the home directory)
mkdir -p ~/db

# 2. Create and edit the environment-variable file, also in the home directory
cd
nano sys.env

After running nano sys.env, the screen turns into a text editor. Edit the block below with your own real details, then paste it in (make sure there are no spaces around the = signs):

# Your Cloud189 (天翼云盘) account and password
ENV_189_CLIENT_ID=13800138000
ENV_189_CLIENT_SECRET=your_189_password

# Your Telegram bot settings
ENV_TG_BOT_TOKEN=123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ
ENV_TG_ADMIN_USER_ID=your_numeric_TG_user_id

When you're done, press Ctrl + O (the letter O) then Enter to save, then Ctrl + X to exit.
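If you want to double-check the file format, here is a minimal stdlib-only sketch of the KEY=VALUE rule sys.env must follow (the real parsing is done by python-dotenv's load_dotenv in the script, which is slightly more tolerant than this sketch):

```python
# Minimal sketch of the KEY=VALUE rule used by sys.env
# (python-dotenv does the real parsing; this just illustrates the format).
def parse_env_line(line: str):
    line = line.strip()
    if not line or line.startswith("#"):
        return None                     # blank lines and comments are skipped
    key, sep, value = line.partition("=")
    if not sep or " " in key:
        raise ValueError(f"bad line: {line!r}")  # e.g. a space before '='
    return key, value

print(parse_env_line("ENV_189_CLIENT_ID=13800138000"))
```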

3. Paste in the main script
Now for the core code.
In Termux, type:

nano auto_189.py

Once the editor opens, paste in the complete script below in one go (redundant code stripped out, with recursive folder traversal and precise Telegram subscriptions built in):

import os
import json
import time
import requests
from urllib import parse
from Crypto.Cipher import PKCS1_v1_5 as Cipher_pksc1_v1_5
from Crypto.PublicKey import RSA
import logging
import schedule
from dotenv import load_dotenv
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
load_dotenv(dotenv_path="sys.env", override=True)

ENV_189_CLIENT_ID = os.getenv("ENV_189_CLIENT_ID", "")
ENV_189_CLIENT_SECRET = os.getenv("ENV_189_CLIENT_SECRET", "")
TG_BOT_TOKEN = os.getenv("ENV_TG_BOT_TOKEN", "")
TG_ADMIN_USER_ID = os.getenv("ENV_TG_ADMIN_USER_ID", "")

SUBS_FILE = "db/subscriptions.json"
HISTORY_FILE = "db/history.json"

def load_json(filepath):
    if os.path.exists(filepath):
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    return {}

def save_json(filepath, data):
    with open(filepath, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

def clean_filename(name):
    illegal_chars = '"\\/:*?|<>'
    for char in illegal_chars:
        name = name.replace(char, '')
    return name[:255]

def rsaEncrpt(password, public_key):
    rsakey = RSA.importKey(public_key)
    cipher = Cipher_pksc1_v1_5.new(rsakey)
    return cipher.encrypt(password.encode()).hex()

class TelegramNotifier:
    def __init__(self, bot_token, user_id):
        self.bot_token = bot_token
        self.user_id = user_id
        self.base_url = f"https://api.telegram.org/bot{self.bot_token}/" if self.bot_token else None

    def send_message(self, message):
        if not self.bot_token: return False
        try:
            requests.get(f"{self.base_url}sendMessage", params={"chat_id": self.user_id, "text": message}, timeout=10)
            return True
        except requests.RequestException:
            return False

class Cloud189ShareInfo:
    def __init__(self, fileId, shareId, shareMode, cloud189Client, accessCode=""):
        self.shareDirFileId = fileId
        self.shareId = shareId
        self.session = cloud189Client.session
        self.client = cloud189Client
        self.shareMode = shareMode
        self.accessCode = accessCode

    def getAllShareFiles(self, folder_id=None):
        if folder_id is None:
            folder_id = self.shareDirFileId
        fileList, folders = [], []
        pageNumber = 1
        while True:
            result = self.session.get("https://cloud.189.cn/api/open/share/listShareDir.action", params={
                "pageNum": pageNumber, "pageSize": "10000", "fileId": folder_id,
                "shareDirFileId": self.shareDirFileId, "isFolder": "true",
                "shareId": self.shareId, "shareMode": self.shareMode,
                "orderBy": "lastOpTime", "descending": "true", "accessCode": self.accessCode,
            }).json()
            if result['res_code'] != 0: break
            fileListAO = result.get("fileListAO", {})
            fileList += fileListAO.get("fileList", [])
            folders += fileListAO.get("folderList", [])
            if fileListAO.get("fileListSize", 0) == 0 and len(fileListAO.get("folderList", [])) == 0: break
            pageNumber += 1
        return {"files": fileList, "folders": folders}

    def saveShareFiles(self, tasksInfos, targetFolderId):
        try:
            response = self.session.post("https://cloud.189.cn/api/open/batch/createBatchTask.action", data={
                "type": "SHARE_SAVE", "taskInfos": str(tasksInfos),
                "targetFolderId": targetFolderId, "shareId": self.shareId,
            }).json()
            if response.get("res_code") != 0: return response.get('res_message', 'UNKNOWN_ERROR')
            
            taskId = response["taskId"]
            while True:
                res = self.session.post("https://cloud.189.cn/api/open/batch/checkBatchTask.action", data={
                    "taskId": taskId, "type": "SHARE_SAVE"
                }).json()
                if res["taskStatus"] != 3 or res.get("errorCode"): break
                time.sleep(1)
            return res.get("errorCode")
        except Exception as e:
            return str(e)

class Cloud189:
    def __init__(self):
        self.session = requests.session()
        self.session.headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
            "Accept": "application/json;charset=UTF-8",
        }

    def getEncrypt(self):
        return self.session.post("https://open.e.189.cn/api/logbox/config/encryptConf.do", data={'appId': 'cloud'}, timeout=15).json()['data']['pubKey']

    def getRedirectURL(self):
        rsp = self.session.get('https://cloud.189.cn/api/portal/loginUrl.action?redirectURL=https://cloud.189.cn/web/redirect.html?returnURL=/main.action', timeout=15)
        return parse.parse_qs(parse.urlparse(rsp.url).query)

    def login(self, username, password):
        encryptKey = self.getEncrypt()
        query = self.getRedirectURL()
        resData = self.session.post('https://open.e.189.cn/api/logbox/oauth2/appConf.do', data={"version": '2.0', "appKey": 'cloud'}, headers={"Referer": 'https://open.e.189.cn/', "lt": query["lt"][0], "REQID": query["reqId"][0]}, timeout=15).json()
        
        keyData = f"-----BEGIN PUBLIC KEY-----\n{encryptKey}\n-----END PUBLIC KEY-----"
        data = {
            "appKey": 'cloud', "version": '2.0', "accountType": '01', "mailSuffix": '@189.cn',
            "returnUrl": resData['data']['returnUrl'], "paramId": resData['data']['paramId'],
            "clientType": '1', "isOauth2": "false",
            "userName": f"{{NRP}}{rsaEncrpt(username, keyData)}",
            "password": f"{{NRP}}{rsaEncrpt(password, keyData)}",
        }
        result = self.session.post('https://open.e.189.cn/api/logbox/oauth2/loginSubmit.do', data=data, headers={'Referer': 'https://open.e.189.cn/', 'lt': query["lt"][0], 'REQID': query["reqId"][0]}, timeout=15).json()
        if result['result'] == 0:
            self.session.get(result['toUrl'], headers={"Host": 'cloud.189.cn'}, timeout=15)
        else:
            raise Exception(result['msg'])

    def getShareInfo(self, link):
        url = parse.urlparse(link)
        try:
            code = parse.parse_qs(url.query)["code"][0]
        except (KeyError, IndexError):
            # short links (https://cloud.189.cn/t/<code>) carry the code in the path
            code = url.path.split('/')[-1]
            
        pwd = parse.parse_qs(url.query).get('pwd', [''])[0]
        
        result = self.session.get("https://cloud.189.cn/api/open/share/getShareInfoByCodeV2.action", params={"shareCode": code}).json()
        if result.get('res_code') != 0: 
            raise Exception(result.get('res_message', '获取分享失败'))
            
        file_id = result.get("fileId")
        share_mode = result.get("shareMode", 1)
        share_id = result.get("shareId")
        
        if pwd:
            verify_res = self.session.get("https://cloud.189.cn/api/open/share/checkAccessCode.action", params={"shareCode": code, "accessCode": pwd}).json()
            if verify_res.get('res_code') != 0:
                raise Exception(f"提取码错误或失效: {verify_res}")
            share_id = verify_res.get("shareId")
            
        if not share_id:
            raise Exception("未能获取到 shareId")
            
        return Cloud189ShareInfo(file_id, share_id, share_mode, self, pwd)

    def createFolder(self, name, parentFolderId=-11):
        result = self.session.post("https://cloud.189.cn/api/open/file/createFolder.action", data={"parentFolderId": parentFolderId, "folderName": name}).json()
        return result["id"]

    def getObjectFolderNodes(self, folderId=-11):
        return self.session.post("https://cloud.189.cn/api/portal/getObjectFolderNodes.action", data={"id": folderId, "orderBy": 1, "order": "ASC"}).json()

    def mkdirAll(self, path, parentFolderId=-11):
        path = path.strip("/")
        if not path: return parentFolderId
        for name in path.split("/"):
            found = False
            for node in self.getObjectFolderNodes(parentFolderId):
                if node["name"] == name:
                    parentFolderId = node["id"]
                    found = True
                    break
            if not found:
                parentFolderId = self.createFolder(name, parentFolderId)
        return parentFolderId

def get_all_share_files_recursive(info, folder_id=None, current_path=""):
    all_files = []
    result = info.getAllShareFiles(folder_id)
    for f in result.get("files", []):
        f["full_path"] = current_path + "/" + f["name"]
        all_files.append(f)
    for folder in result.get("folders", []):
        new_path = current_path + "/" + folder["name"]
        all_files.extend(get_all_share_files_recursive(info, folder["id"], new_path))
    return all_files

# ==========================================
# 🌟 Core engine: per-show frequency + time windows + cooldown
# ==========================================
def check_subscriptions(client):
    subs = load_json(SUBS_FILE)
    history = load_json(HISTORY_FILE)
    notifier = TelegramNotifier(TG_BOT_TOKEN, TG_ADMIN_USER_ID)

    if not subs: return
    
    current_hour = datetime.now().hour
    current_time = time.time()
    logger.info(f"🔍 Patrol starting (hour: {current_hour}), total tasks: {len(subs)}")
    
    for target_id, sub_info in subs.items():
        try:
            share_url = sub_info if isinstance(sub_info, str) else sub_info.get("url", "")
            keyword = "" if isinstance(sub_info, str) else sub_info.get("keyword", "")
            path = "" if isinstance(sub_info, str) else sub_info.get("path", "")
            last_update = 0 if isinstance(sub_info, str) else sub_info.get("last_update", 0)
            freq = "" if isinstance(sub_info, str) else sub_info.get("freq", "")  # frequency tag

            if path:
                hours_since_update = (current_time - last_update) / 3600
                
                if "动漫" in path:
                    # 🌟 Fine-grained cooldown for anime (动漫), driven by the frequency tag
                    if freq == "周更" or "周更" in path:
                        cd_hours = 144  # weekly (周更): sleep 6 days
                    elif freq == "双更" or "双更" in path:
                        cd_hours = 72   # twice weekly (双更): sleep 3 days
                    else:
                        cd_hours = 48   # untagged fallback: sleep 2 days

                    # Gate 1: still inside the cooldown window
                    if hours_since_update < cd_hours:
                        continue
                    # Gate 2: active hours (anime is patrolled 10-12 and 18-23)
                    if not ((10 <= current_hour <= 12) or (18 <= current_hour <= 23)):
                        continue

                elif "电视剧" in path or "剧" in path:
                    # Gate 1: cooldown (TV series sleep 12 hours)
                    if hours_since_update < 12:
                        continue
                    # Gate 2: active hours (TV series are patrolled 18-23)
                    if not (18 <= current_hour <= 23):
                        continue
            
            # Only tasks that pass both gates above actually hit the Cloud189 API
            info = client.getShareInfo(share_url)
            all_files = get_all_share_files_recursive(info)
            
            if keyword:
                all_files = [f for f in all_files if keyword.lower() in f["full_path"].lower()]

            new_files = [f for f in all_files if str(f["id"]) not in history]

            if new_files:
                logger.info(f"🎉 Found {len(new_files)} new files, saving...")
                taskInfos = [{"fileId": f["id"], "fileName": clean_filename(f["name"]), "isFolder": 0} for f in new_files]
                
                batch_size = 50
                for i in range(0, len(taskInfos), batch_size):
                    batch_tasks = taskInfos[i:i+batch_size]
                    code = info.saveShareFiles(batch_tasks, target_id)
                    
                    if not code:
                        file_names = []
                        for task in batch_tasks:
                            history[str(task["fileId"])] = task["fileName"]
                            file_names.append(task["fileName"])
                        save_json(HISTORY_FILE, history)
                        notifier.send_message(f"✅【追剧更新】\n🔗 来源: {share_url}\n📂 新增文件:\n" + "\n".join(file_names))
                        
                        # After a successful save, refresh this show's last-update timestamp
                        subs_for_update = load_json(SUBS_FILE)
                        if str(target_id) in subs_for_update:
                            if isinstance(subs_for_update[str(target_id)], str):
                                subs_for_update[str(target_id)] = {"url": share_url, "keyword": keyword, "path": path}
                            subs_for_update[str(target_id)]["last_update"] = time.time()
                            save_json(SUBS_FILE, subs_for_update)
                    else:
                        logger.error(f"Save failed, error code: {code}")
        except Exception as e:
            logger.error(f"Subscription check failed for folder {target_id}: {e}")

def main_control_loop(client):
    offset = 0
    notifier = TelegramNotifier(TG_BOT_TOKEN, TG_ADMIN_USER_ID)
    notifier.send_message("🤖 私人追剧管家(支持 #周更 标签)已上线!")

    schedule.every(30).minutes.do(check_subscriptions, client)
    logger.info("🚀 Running an immediate first check...")
    check_subscriptions(client)

    while True:
        schedule.run_pending()
        try:
            url = f"https://api.telegram.org/bot{TG_BOT_TOKEN}/getUpdates?offset={offset}&timeout=10"
            res = requests.get(url, timeout=15).json()
            if res.get('ok'):
                for item in res['result']:
                    offset = item['update_id'] + 1
                    msg = item.get('message', {})
                    text = msg.get('text', '')
                    chat_id = msg.get('chat', {}).get('id')

                    if str(chat_id) == str(TG_ADMIN_USER_ID):
                        if text.startswith("订阅") or text.startswith("绑定"):
                            
                            # 🌟 Sniff out and strip the frequency tag up front,
                            # so it never interferes with the path/link parsing below
                            freq_tag = ""
                            if "#周更" in text:
                                freq_tag = "周更"
                                text = text.replace("#周更", "").strip()
                            elif "#双更" in text:
                                freq_tag = "双更"
                                text = text.replace("#双更", "").strip()
                                
                            is_bind = text.startswith("绑定")
                            parts = text.split()
                            if len(parts) >= 3:
                                if parts[-2].startswith("http"):
                                    share_url = parts[-2]
                                    keyword = parts[-1]          
                                    target_path = " ".join(parts[1:-2])
                                elif parts[-1].startswith("http"):
                                    share_url = parts[-1]
                                    keyword = ""                 
                                    target_path = " ".join(parts[1:-1])
                                else:
                                    continue
                                
                                mode_name = "绑定(静默)" if is_bind else "订阅(下载)"
                                tag_msg = f" ⏱️ 频率: {freq_tag}" if freq_tag else ""
                                notifier.send_message(f"⏳ 正在处理{mode_name}目录:{target_path}{tag_msg} ...")
                                
                                try:
                                    target_id = client.mkdirAll(target_path)
                                    subs = load_json(SUBS_FILE)
                                    
                                    # Store the frequency tag; last_update=0 guarantees the first check runs right away
                                    subs[str(target_id)] = {"url": share_url, "keyword": keyword, "path": target_path, "last_update": 0, "freq": freq_tag}
                                    save_json(SUBS_FILE, subs)
                                    
                                    if is_bind:
                                        info = client.getShareInfo(share_url)
                                        all_files = get_all_share_files_recursive(info)
                                        if keyword:
                                            all_files = [f for f in all_files if keyword.lower() in f["full_path"].lower()]
                                        history = load_json(HISTORY_FILE)
                                        for f in all_files:
                                            history[str(f["id"])] = f["name"]
                                        save_json(HISTORY_FILE, history)
                                        notifier.send_message(f"✅ 成功绑定!{tag_msg}\n⚠️ 已将 {len(all_files)} 个旧文件标记为已存。")
                                    else:
                                        notifier.send_message(f"✅ 成功添加订阅!{tag_msg}")
                                        check_subscriptions(client) 
                                except Exception as e:
                                    notifier.send_message(f"❌ 处理失败: {e}")
                            else:
                                notifier.send_message("❌ 格式错误...")
                                
                        elif text.startswith("取消订阅"):
                            parts = text.split()
                            if len(parts) >= 2:
                                path_parts = [p for p in parts[1:] if not p.startswith("http")]
                                target_path = " ".join(path_parts)
                                try:
                                    target_id = client.mkdirAll(target_path)
                                    subs = load_json(SUBS_FILE)
                                    if str(target_id) in subs:
                                        del subs[str(target_id)]
                                        save_json(SUBS_FILE, subs)
                                        notifier.send_message(f"✅ 已成功取消订阅:\n📁 {target_path}")
                                    else:
                                        notifier.send_message("⚠️ 未找到该目录。")
                                except Exception as e:
                                    logger.error(f"Unsubscribe failed: {e}")
        except Exception as e:
            logger.error(f"Telegram polling error: {e}")
        time.sleep(2)

if __name__ == '__main__':
    os.makedirs("db", exist_ok=True)
    client = Cloud189()
    try:
        logger.info("Logging in to Cloud189 ...")
        client.login(ENV_189_CLIENT_ID, ENV_189_CLIENT_SECRET)
    except Exception as e:
        logger.error(f"登录失败: {e}")
        exit(-1)
    main_control_loop(client)

Once everything is pasted, press Ctrl + O then Enter to save, and Ctrl + X to exit.

The script is too long to paste reliably in one go on a phone, so you can instead transfer auto_189.py directly:
3.1 Create the file auto_189.py on your computer, with the contents above.
3.2 Transfer the file to the phone's storage/downloads directory (if Termux can't see shared storage yet, run termux-setup-storage once to grant access).
3.3 Copy the script into place:

cp ~/storage/downloads/auto_189.py ~/auto_189.py

4. Liftoff & daily use
All the prep work is done. Now let's keep it running silently in the Termux background.
Start it with this command:

nohup python auto_189.py > run.log 2>&1 &

That's it! You can now switch Termux to the background.

Open Telegram, find your bot, and send it your first command:

订阅 /177-动漫/繁花/Season 1 [the 189 share link]
As soon as the bot replies "✅ 成功添加订阅!", the whole pipeline is working: it creates the Season 1 folder, pulls the videos from the share into it, and from then on checks for updates every 30 minutes.

5. Problems you may run into
5.1 Kill a stuck background process

pkill -f auto_189.py

5.2 Run it in the foreground (to watch the output live)

python auto_189.py

5.3 Library errors
`ModuleNotFoundError: No module named 'Crypto'` is a classic Python trap; almost everyone hits it the first time they work with a crypto library.
The cause is simple: the code imports the name `Crypto`, but the modern package that provides it is called `pycryptodome`. Sometimes the environment has leftovers of an abandoned predecessor (the ancient `pycrypto`) lying around, and Python picks up the wrong package.
Copy the two commands below and run them in Termux one after the other (the first may complain that some packages aren't installed; ignore that and let it finish):

pip uninstall crypto pycrypto pycryptodome -y
pip install pycryptodome

If you then see `No module named 'schedule'`, the initial batch install was probably interrupted by a network hiccup, so `schedule` (the library that drives the timed checks) never got installed.

Rather than playing whack-a-mole one missing module at a time, install all the remaining third-party dependencies in one go so nothing else comes up missing.

Copy this line and press Enter:

pip install schedule python-dotenv requests

6. Clear out stale data and go live
Run these commands in Termux (stop the script and wipe the databases, then restart):

# 1. Stop the script and wipe the subscription and history databases
pkill -f auto_189.py
rm -f db/history.json db/subscriptions.json

# 2. Restart the script
python auto_189.py

7. Subscription command format
7.1 The universal formula:
[action] [save path in your drive] [share link (?pwd=access code)] [filter keyword] [#sleep tag]
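For reference, this is roughly how the bot splits a command into its parts (a simplified sketch of the parsing in main_control_loop: the link is found by its http prefix, everything between the action and the link becomes the path, and a trailing non-link token becomes the filter keyword):

```python
# Simplified sketch of the bot's command parsing (mirrors main_control_loop).
def parse_command(text: str):
    parts = text.split()
    if len(parts) >= 3 and parts[-2].startswith("http"):
        # last token is the filter keyword, second-to-last is the link
        return {"path": " ".join(parts[1:-2]), "url": parts[-2], "keyword": parts[-1]}
    if len(parts) >= 3 and parts[-1].startswith("http"):
        # no keyword: the link is the last token
        return {"path": " ".join(parts[1:-1]), "url": parts[-1], "keyword": ""}
    return None  # malformed command

print(parse_command("订阅 /177-电视剧/狂飙 https://cloud.189.cn/t/xxxxxx"))
```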
7.2 Subscribe without an access code:
订阅 /177-电视剧/狂飙 [https://cloud.189.cn/t/xxxxxx]
7.3 Subscribe with an access code (append ?pwd=code to the link):
订阅 /177-电视剧/狂飙 [https://cloud.189.cn/t/xxxxxx?pwd=abcd]
7.4 Silent bind
Command (swap 订阅 for 绑定):
绑定 /177-电视剧/狂飙 [https://cloud.189.cn/t/xxxxxx?pwd=abcd]
(The bot will reply that N existing files have been marked as already saved.)
7.5 Precise filtering
Command (add the filter keyword at the end, after a space):
绑定 /177-动漫/神墓/Season 03 [https://cloud.189.cn/t/xxxxxx] S03
(If the release uses Chinese naming, replace S03 with 第三季.)
7.6 Smart sleep (no more pointless patrols, easier on the API)
Use case: an anime updates on a very regular schedule, and you want the task to go dormant right after grabbing an episode to save API calls.

  • Tag meanings: #周更 (weekly; sleeps 6 days after a grab) / #双更 (twice weekly; sleeps 3 days). Untagged anime defaults to sleeping 2 days; TV series default to 12 hours.
  • Command: just append the tag to the end.

7.7 Everything at once (all features combined)
Use case: password-protected link + only Season 3 + weekly anime + skip old files.

Full commands:
绑定 /177-动漫/神墓/Season 3 https://cloud.189.cn/t/xxxxxx?pwd=abcd S03 #周更
绑定 /177-动漫/2602/天庭 https://cloud.189.cn/t/YRnIjmAZZZre?pwd=ea12 #周更
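The sleep durations above come straight from check_subscriptions; as a sketch, the cooldown rule is:

```python
# Cooldown rule used by check_subscriptions: hours to sleep after a save.
def cooldown_hours(path: str, freq: str = "") -> int:
    if "动漫" in path:                       # anime
        if freq == "周更" or "周更" in path:
            return 144                       # weekly: sleep 6 days
        if freq == "双更" or "双更" in path:
            return 72                        # twice weekly: sleep 3 days
        return 48                            # untagged anime: sleep 2 days
    if "电视剧" in path or "剧" in path:      # TV series
        return 12                            # sleep 12 hours
    return 0                                 # everything else: no cooldown

print(cooldown_hours("/177-动漫/神墓", freq="周更"))
```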

7.8 Pulling the plug
Use case: you've finished the show, or it went downhill and you dropped it.

Lazy cancel:
Copy the long command you originally sent, change the leading 绑定 or 订阅 to 取消订阅, and send it back.
取消订阅 /177-动漫/2602/天庭 https://cloud... (everything after the path is ignored automatically)

Precise cancel:
取消订阅 /177-动漫/2602/天庭
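The reason the link fragment can be ignored: when handling 取消订阅, the script simply drops every token that starts with http before rebuilding the path. Sketched:

```python
# Sketch of how 取消订阅 recovers the folder path: any http token is dropped.
text = "取消订阅 /177-动漫/2602/天庭 https://cloud.189.cn/t/xxxx?pwd=abcd"
parts = text.split()
path = " ".join(p for p in parts[1:] if not p.startswith("http"))
print(path)
```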
7.9 Power-user tips:
How do I edit a task?
If you forgot the #周更 tag, or the access code changed, there is no need to cancel first. Just resend the 绑定 command in the correct format (keeping the path unchanged) and it will automatically overwrite the old configuration.
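This works because subscriptions are stored in a dict keyed by the target folder id: the same path resolves to the same id, so writing it again simply replaces the old entry. A toy illustration (the folder id 12345 is made up):

```python
# Subscriptions are keyed by folder id, so re-binding the same path overwrites.
subs = {"12345": {"url": "https://old-link", "keyword": "", "freq": ""}}

# Re-sending 绑定 with the same path -> same folder id -> entry replaced.
subs["12345"] = {"url": "https://new-link", "keyword": "S03", "freq": "周更"}
print(subs["12345"]["freq"])
```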

What is its schedule?

  • Anime: active daily from 10:00-12:00 and 18:00-23:00.
  • TV series: active daily from 18:00-23:00.

(Outside active hours and during a cooldown it stays completely silent, so it is safe from rate limiting.)
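The active-hours gate, sketched from check_subscriptions (in the real script, datetime.now().hour supplies the current hour):

```python
# Active-hours gate from check_subscriptions: outside these windows,
# a task is skipped without making any API call.
def is_active_hour(path: str, hour: int) -> bool:
    if "动漫" in path:                       # anime: 10-12 and 18-23
        return (10 <= hour <= 12) or (18 <= hour <= 23)
    if "电视剧" in path or "剧" in path:      # TV series: 18-23 only
        return 18 <= hour <= 23
    return True                              # other paths: always active

print(is_active_hour("/177-动漫/神墓", 14))
```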

8. Start every script at once

cat > ~/start.sh << 'EOF'
#!/bin/bash
echo "🚀 Waking up the whole auto-tracking stack..."
cd ~  # run from home so relative paths (sys.env, db/, run.log) resolve

# Subshell so the cd into ~/openlist doesn't leak into the commands below
(cd ~/openlist && nohup ./openlist server >/dev/null 2>&1 &)
echo "✅ OpenList started"

nohup cloudflared tunnel --protocol http2 --config ~/.cloudflared/config.yml run emby_tunnel > ~/tunnel.log 2>&1 &
echo "✅ Cloudflare tunnel started"

aria2c --conf-path=$HOME/.config/aria2/aria2.conf -D
echo "✅ Aria2 started"

crond
echo "✅ Cron daemon (crond) started"

nohup python ~/tg_bridge.py > /dev/null 2>&1 &
echo "✅ TG monitoring bridge started"

nohup python auto_189.py > run.log 2>&1 &
echo "✅ TG auto-save bot started"

echo "🎉 Everything is up. Safe to close this session."
EOF
chmod +x ~/start.sh
~/start.sh
