TG-forward-videos

[up主专用,视频内嵌代码贴在这]

Telegram 视频转发

Telegram 视频转发去重脚本说明文档

一、项目简介

本脚本是一个基于 Telethon 的 Telegram 视频转发机器人,主要功能是:

  • 多个源频道 扫描并实时监听视频消息
  • 对视频进行 严格条件过滤(大小、时长、分辨率等)
  • 基于 视频源文件 ID 的强去重机制,避免重复转发
  • 将合规视频 自动转发到指定目标频道
  • 使用 SQLite + WAL 保存去重记录与频道扫描进度
  • 支持 断点续扫、并发转发、风控处理、运行状态查询

适用于:

  • 视频聚合频道
  • 长期无人值守自动同步
  • 对去重准确性要求较高的场景

二、整体工作流程

  1. 启动 Telegram 客户端并登录
  2. 根据目标频道 ID 初始化独立数据库
  3. 从源频道读取上次扫描进度并进行历史扫描
  4. 将符合条件且未去重的视频加入转发队列
  5. 多个 Worker 并发消费队列并转发视频
  6. 实时监听源频道新消息并即时处理
  7. 成功转发后更新去重表与进度表

三、依赖环境

Python 版本

  • Python ≥ 3.9(推荐 3.10 / 3.11)

依赖库

1
pip install telethon python-dotenv

四、环境变量配置(.env)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
API_ID=123456
API_HASH=xxxxxxxxxxxxxxxx
PHONE_NUMBER=+8613800000000
TWO_STEP_PASSWORD=二步验证密码

TARGET_CHANNEL=@target_channel
SOURCE_CHANNELS=@source1,@source2,-100xxxxxxxxx
ADMIN_ID=123456789

SCAN_LIMIT=50
MAX_CAPTION_LENGTH=1024

MIN_FILE_SIZE=0
MAX_FILE_SIZE=0
MIN_DURATION=0
MAX_DURATION=0
MIN_WIDTH=0
MIN_HEIGHT=0

关键参数说明

  • TARGET_CHANNEL:视频最终转发的目标频道
  • SOURCE_CHANNELS:视频来源频道(支持 @用户名 / 数字 ID)
  • SCAN_LIMIT:历史扫描条数,0 表示不限制
  • ADMIN_ID:允许使用 /status 的管理员账号 ID

五、数据库设计说明

1️⃣ 数据库命名

数据库文件按目标频道区分:

1
bot_data_target_<target_channel_id>.db

不同目标频道互不影响,可安全复用脚本。


2️⃣ video_keys 表(去重核心)

字段名 类型 说明
video_key TEXT (PK) 视频唯一标识(document.id)
target_msg_id INTEGER 目标频道中的消息 ID
timestamp DATETIME 转发时间

说明:

  • video_key = msg.media.document.id
  • 同一视频在任何频道、转发、改名中都保持一致
  • 去重准确率极高

3️⃣ channel_progress 表(断点续扫)

字段名 类型 说明
channel_id TEXT (PK) 源频道 ID
last_msg_id INTEGER 已成功转发的最后消息 ID

六、视频筛选规则

视频必须同时满足以下条件才会被处理:

基本条件

  • media 类型为 MessageMediaDocument
  • MIME 类型以 video/* 开头
  • 非圆形视频(排除视频语音)

可配置限制

  • 文件大小(MIN / MAX_FILE_SIZE)
  • 视频时长(MIN / MAX_DURATION)
  • 分辨率(MIN_WIDTH / MIN_HEIGHT)

任何一项不满足都会被直接跳过。


七、文案清洗机制

转发前会对视频 caption 进行清洗,自动移除:

  • @用户名
  • 各类链接(http / t.me)
  • 拉群、推广、关注提示
  • VX / 微信 / 群号等广告信息

并限制最大长度,避免目标频道文案污染。


八、转发与并发机制

队列设计

  • 使用 asyncio.Queue(maxsize=30) 控制堆积
  • 扫描 & 实时消息只负责入队

Worker 模型

  • 默认启动 2 个 Worker
  • 每个 Worker 独立拉取、校验、转发
  • 成功转发后才更新数据库进度

风控处理

  • FloodWait:自动 sleep 指定秒数 + 随机延迟
  • 内容保护频道:记录日志并跳过

九、实时监听模式

1
@client.on(events.NewMessage(chats=SOURCE_CHANNELS))

特点:

  • 与历史扫描共存
  • 新视频秒级入队
  • 自动去重,不会与历史任务冲突

十、运行状态查询

管理员可向机器人发送:

1
/status

返回信息包括:

  • 运行时长
  • 目标频道
  • 今日已转发数量
  • 当前队列积压数

十一、日志系统

  • 控制台实时输出
  • bot.log 文件持久化
  • 单文件 5MB,最多保留 5 个历史日志

日志级别覆盖:

  • INFO:正常流程
  • WARNING:风控等待
  • ERROR:单条失败
  • CRITICAL:Worker 崩溃重启

十二、适用 & 注意事项

✅ 适合:

  • 视频类频道自动化运营
  • 高去重、高稳定性需求

⚠ 注意:

  • 请使用 用户账号,不要使用 Bot Token
  • 长期运行建议部署在 VPS / Docker
  • 源频道开启内容保护的视频无法转发

十三、总结

该脚本是一个 工程化程度较高 的 Telegram 视频转发系统,核心优势在于:

  • 源头级视频去重
  • 断点续扫 + 实时监听
  • 强抗 FloodWait 能力
  • 适合长期稳定运行

如需扩展:

  • 多目标频道
  • 视频分类
  • 自动封面 / 水印

可在此架构上继续演进。

TG-forward-videos-plus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
import asyncio
import os
import random
import sqlite3
import re
import logging
from logging.handlers import RotatingFileHandler
from datetime import datetime, timezone

from telethon import TelegramClient, events
from telethon.tl.types import (
MessageMediaDocument,
DocumentAttributeVideo
)
from telethon.errors import (
FloodWaitError,
ChatForwardsRestrictedError,
SecurityError
)
from dotenv import load_dotenv

# ==================== 日志 ====================
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
RotatingFileHandler(
"bot.log",
maxBytes=5 * 1024 * 1024,
backupCount=5,
encoding="utf-8"
),
logging.StreamHandler()
]
)
logger = logging.getLogger("TGForwardBot")

START_TIME = datetime.now(timezone.utc)

# ==================== 数据库 ====================
class DBManager:
def __init__(self, path):
self.conn = sqlite3.connect(
path,
check_same_thread=False,
isolation_level=None,
timeout=30
)
self.cursor = self.conn.cursor()
self.lock = asyncio.Lock()
self._init()

def _init(self):
self.cursor.execute("PRAGMA journal_mode=WAL;")
self.cursor.execute("PRAGMA synchronous=NORMAL;")

self.cursor.execute("""
CREATE TABLE IF NOT EXISTS video_keys (
video_key TEXT PRIMARY KEY,
target_msg_id INTEGER,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")

self.cursor.execute("""
CREATE TABLE IF NOT EXISTS channel_progress (
channel_id TEXT PRIMARY KEY,
last_msg_id INTEGER
)
""")

self.cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_video_time
ON video_keys(timestamp)
""")

async def video_exists(self, key):
async with self.lock:
self.cursor.execute(
"SELECT 1 FROM video_keys WHERE video_key=?",
(key,)
)
return self.cursor.fetchone() is not None

async def add_video(self, key, msg_id):
async with self.lock:
self.cursor.execute(
"INSERT OR REPLACE INTO video_keys VALUES (?, ?, CURRENT_TIMESTAMP)",
(key, msg_id)
)

async def get_progress(self, cid):
async with self.lock:
self.cursor.execute(
"SELECT last_msg_id FROM channel_progress WHERE channel_id=?",
(cid,)
)
r = self.cursor.fetchone()
return r[0] if r else 0

async def update_progress(self, cid, msg_id):
async with self.lock:
self.cursor.execute(
"INSERT OR REPLACE INTO channel_progress VALUES (?, ?)",
(cid, msg_id)
)

async def today_count(self):
async with self.lock:
self.cursor.execute("""
SELECT COUNT(*) FROM video_keys
WHERE timestamp >= datetime('now','localtime','start of day')
""")
return self.cursor.fetchone()[0]

# ==================== 配置 ====================
load_dotenv()

api_id = int(os.getenv("API_ID"))
api_hash = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")

TARGET_CHANNEL = os.getenv("TARGET_CHANNEL")
ADMIN_ID = int(os.getenv("ADMIN_ID", 0))

def parse_channel(v):
v = v.strip()
return int(v) if v.lstrip("-").isdigit() else v

SOURCE_CHANNELS = [
parse_channel(x)
for x in os.getenv("SOURCE_CHANNELS", "").split(",")
if x.strip()
]

SCAN_LIMIT = int(os.getenv("SCAN_LIMIT", 50))
MAX_CAPTION_LENGTH = int(os.getenv("MAX_CAPTION_LENGTH", 1024))

MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", 0))
MAX_FILE_SIZE = int(os.getenv("MAX_FILE_SIZE", 0))
MIN_DURATION = int(os.getenv("MIN_DURATION", 0))
MAX_DURATION = int(os.getenv("MAX_DURATION", 0))
MIN_WIDTH = int(os.getenv("MIN_WIDTH", 0))
MIN_HEIGHT = int(os.getenv("MIN_HEIGHT", 0))

# ==================== 客户端 & 队列 ====================
client = TelegramClient("user_session", api_id, api_hash)
forward_queue = asyncio.Queue(maxsize=30)

# ==================== 文本清洗 ====================
AD_PATTERNS = [
r"Поддержать\s+проект[::]?\s*\n?\s*(?:https?://)?t\.me/boost/\S+",
r"@\w+",
r"https?://\S+",
r"t\.me/\S+",
r"(?:VX|wx|微信)[::]?\s*\w+",
r"(?:群|频道)[::]?\s*@\w+",
r"加入频道|点击关注|更多资源",
]

def clean_caption(text: str) -> str:
if not text:
return ""
for p in AD_PATTERNS:
text = re.sub(p, "", text, flags=re.I)
text = re.sub(r"\n\s*\n+", "\n", text).strip()
return text[:MAX_CAPTION_LENGTH] if len(text) >= 5 else ""

# ==================== 视频判断 ====================
def is_video_ok(msg):
if not msg.media or not isinstance(msg.media, MessageMediaDocument):
return False

doc = msg.media.document
if not doc.mime_type or not doc.mime_type.startswith("video"):
return False

if MIN_FILE_SIZE and doc.size < MIN_FILE_SIZE:
return False
if MAX_FILE_SIZE and doc.size > MAX_FILE_SIZE:
return False

v = next((a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None)
if not v or getattr(v, "round_message", False):
return False

if MIN_DURATION and v.duration < MIN_DURATION:
return False
if MAX_DURATION and v.duration > MAX_DURATION:
return False
if MIN_WIDTH and v.w < MIN_WIDTH:
return False
if MIN_HEIGHT and v.h < MIN_HEIGHT:
return False

return True

# ==================== 核心:源头级去重 Key ====================
def video_key(msg):
"""
Telethon 下最稳定的视频唯一标识
同一视频在任何频道 / 转发 / 重命名中都一致
"""
return str(msg.media.document.id)

# ==================== 数据库初始化 ====================
db = None

async def init_db_by_target():
global db
entity = await client.get_entity(TARGET_CHANNEL)
DB_FILE = f"bot_data_target_{entity.id}.db"
logger.info(f"🗂 使用数据库 {DB_FILE}")
db = DBManager(DB_FILE)

# ==================== Worker ====================
async def worker(wid: int):
logger.info(f"👷 Worker-{wid} 启动")
while True:
msg, key, cid = await forward_queue.get()
success = False
try:
refreshed = await client.get_messages(int(cid), ids=msg.id)
if not refreshed or not is_video_ok(refreshed):
continue

caption = clean_caption(refreshed.message or "")

sent = await client.send_file(
TARGET_CHANNEL,
refreshed.media,
caption=caption,
silent=True
)

await db.add_video(key, sent.id)
success = True

logger.info(f"✅ Worker-{wid} 转发 {msg.id}")
await asyncio.sleep(random.uniform(2, 5))

except FloodWaitError as e:
wait = e.seconds + random.uniform(3, 8)
logger.warning(f"⏳ FloodWait {wait:.1f}s")
await asyncio.sleep(wait)

except (ChatForwardsRestrictedError, SecurityError):
logger.error(f"⛔ 内容保护 {msg.id}")

except Exception as e:
logger.error(f"❌ Worker-{wid} 错误 {e}", exc_info=True)

finally:
if success:
await db.update_progress(cid, msg.id)
forward_queue.task_done()

async def safe_worker(i):
while True:
try:
await worker(i)
except Exception:
logger.critical(f"🔥 Worker-{i} 崩溃,重启", exc_info=True)
await asyncio.sleep(5)

# ==================== 扫描 ====================
async def scan_channel(ch, sem):
async with sem:
try:
entity = await client.get_entity(ch)
cid = str(entity.id)
last = max(0, (await db.get_progress(cid)) - 1)

logger.info(f"🔍 扫描 {ch} from {last}")

async for msg in client.iter_messages(
entity,
min_id=last,
limit=None if SCAN_LIMIT == 0 else SCAN_LIMIT,
reverse=True
):
if not is_video_ok(msg):
continue

key = video_key(msg)
if await db.video_exists(key):
continue

await forward_queue.put((msg, key, cid))
logger.info(f"📥 入队 {msg.id}")

except Exception as e:
logger.error(f"❌ 扫描失败 {ch}: {e}")

# ==================== 实时监听 ====================
@client.on(events.NewMessage(chats=SOURCE_CHANNELS))
async def realtime(event):
msg = event.message
if not is_video_ok(msg):
return

key = video_key(msg)
if await db.video_exists(key):
return

await forward_queue.put((msg, key, str(event.chat_id)))
logger.info(f"⚡ 实时入队 {msg.id}")

# ==================== 状态 ====================
@client.on(events.NewMessage(pattern="/status"))
async def status(event):
if ADMIN_ID and event.sender_id != ADMIN_ID:
return

up = datetime.now(timezone.utc) - START_TIME
await event.reply(
f"🤖 运行中\n"
f"🎯 Target: {TARGET_CHANNEL}\n"
f"⏱ {str(up).split('.')[0]}\n"
f"📊 今日 {await db.today_count()}\n"
f"📥 队列 {forward_queue.qsize()}"
)

# ==================== 主入口 ====================
async def main():
await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
logger.info("🚀 启动成功")

await init_db_by_target()

for i in range(2):
asyncio.create_task(safe_worker(i + 1))

sem = asyncio.Semaphore(1)
await asyncio.gather(*(scan_channel(c, sem) for c in SOURCE_CHANNELS))

await client.run_until_disconnected()

if __name__ == "__main__":
with client:
client.loop.run_until_complete(main())

getlist

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
import os
import csv
import datetime
from telethon import TelegramClient
from dotenv import load_dotenv

# === 1. 读取配置 ===
load_dotenv()

API_ID = os.getenv("API_ID")
API_HASH = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")

if not API_ID or not API_HASH:
print("❌ 错误: 请确保 .env 文件中配置了 API_ID 和 API_HASH")
exit(1)

client = TelegramClient('user_session', int(API_ID), API_HASH)

def get_bot_api_id(entity, entity_type):
"""
根据实体类型将 Telethon ID 转换为 Bot API ID
Bot API 规则:
- 频道/超级群: -100 + ID
- 普通小群: - + ID
- 用户: ID (不变)
"""
raw_id = entity.id
if entity_type in ["频道", "超级群", "频道(未知)"]:
return int(f"-100{raw_id}")
elif entity_type == "普通群":
return int(f"-{raw_id}")
else:
return raw_id

async def list_and_export_chats():
"""列出并导出账号加入的所有频道和群组"""
print("📃 正在获取对话列表,请稍候...")

chat_data_list = []

async for dialog in client.iter_dialogs():
entity = dialog.entity
entity_type = "未知"

# --- 分类逻辑 ---
if dialog.is_user:
entity_type = "私聊"
elif dialog.is_channel:
if getattr(entity, 'broadcast', False):
entity_type = "频道"
elif getattr(entity, 'megagroup', False):
entity_type = "超级群"
else:
entity_type = "频道(未知)"
elif dialog.is_group:
entity_type = "普通群"

# 过滤掉私聊 (如果需要私聊ID,注释掉下面这行)
if entity_type != "私聊":
# 获取两种格式的 ID
raw_id = entity.id
bot_api_id = get_bot_api_id(entity, entity_type)

chat_info = {
"类型": entity_type,
"名称": dialog.name,
"Bot_API_ID": bot_api_id, # ✅ 新增:可以直接给 Bot 用的 ID
"原始_ID": raw_id, # Telethon 用的原始 ID
"用户名": getattr(entity, 'username', '无') or '无',
"成员数": getattr(entity, 'participants_count', '未知')
}
chat_data_list.append(chat_info)

print(f"[{entity_type}] {dialog.name} | Bot_ID: {bot_api_id}")

# === 导出到 CSV 文件 ===
if chat_data_list:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"telegram_chats_{timestamp}.csv"

# 更新表头
headers = ["类型", "名称", "Bot_API_ID", "原始_ID", "用户名", "成员数"]

try:
with open(filename, mode='w', encoding='utf-8-sig', newline='') as f:
writer = csv.DictWriter(f, fieldnames=headers)
writer.writeheader()
writer.writerows(chat_data_list)

print("-" * 50)
print(f"✅ 成功!共导出 {len(chat_data_list)} 个群组/频道。")
print(f"📁 文件已保存为: {os.path.abspath(filename)}")

except Exception as e:
print(f"❌ 导出文件失败: {e}")
else:
print("⚠️ 未找到任何群组或频道。")

async def main():
await client.start(phone=PHONE_NUMBER, password=TWO_STEP_PASSWORD)
print("✅ 登录成功")
await list_and_export_chats()

if __name__ == "__main__":
with client:
client.loop.run_until_complete(main())

remove_duplicates

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import asyncio
import os
import sys
from telethon import TelegramClient, errors
from telethon.tl.types import MessageMediaDocument
from dotenv import load_dotenv

# ================== 读取配置 ==================
load_dotenv()

try:
api_id = int(os.getenv("API_ID"))
api_hash = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD", None)
TARGET_CHANNEL = os.getenv("TARGET_CHANNEL")

scan_env = os.getenv("SCAN_LIMIT", "50")
N = int(scan_env) if scan_env else 50
LIMIT = None if N == 0 else N
except Exception as e:
print(f"❌ 配置错误: {e}")
sys.exit(1)

client = TelegramClient("user_session", api_id, api_hash)

# ================== 工具函数 ==================
def is_video(message):
"""
判断是否为视频文件(mp4 / mkv / mov 等)
"""
if not message.media:
return False

if not isinstance(message.media, MessageMediaDocument):
return False

doc = message.media.document
return bool(doc.mime_type and doc.mime_type.startswith("video/"))

def get_video_key(message):
"""
Telethon 下最稳定的视频唯一标识
同一文件在任何频道 / 转发中都一致
"""
return message.media.document.id

# ================== 主逻辑 ==================
async def main():
print("🔐 正在登录 Telegram...")
await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
print("✅ 登录成功")

try:
target = await client.get_entity(TARGET_CHANNEL)
target_name = getattr(target, "title", TARGET_CHANNEL)
except Exception:
print(f"❌ 无法获取频道: {TARGET_CHANNEL}")
return

mode = "全部历史" if LIMIT is None else f"最近 {LIMIT} 条"
print(f"\n📺 目标频道:{target_name}")
print(f"🔍 扫描范围:{mode}")
print("-" * 40)

seen_keys = set()
duplicates = []

scanned = 0
videos = 0

print("⏳ 正在扫描消息...")

# reverse=False:从【最新】→【最旧】
async for msg in client.iter_messages(target, limit=LIMIT, reverse=False):
scanned += 1

if not is_video(msg):
continue

videos += 1
key = get_video_key(msg)

# 已出现过 → 这是旧视频 → 删除
if key in seen_keys:
duplicates.append(msg.id)
else:
seen_keys.add(key)

if scanned % 200 == 0:
print(
f" 已扫描 {scanned} 条 | 视频 {videos} | 重复 {len(duplicates)}"
)

print("-" * 40)
print("📊 扫描完成")
print(f" 扫描消息数:{scanned}")
print(f" 视频总数:{videos}")
print(f" 重复视频:{len(duplicates)}")

if not duplicates:
print("✅ 没有发现需要删除的重复视频")
return

print(f"\n⚠️ 即将删除 {len(duplicates)} 条【旧重复】视频")
confirm = input("❓ 确认删除?(y/n): ").strip().lower()

if confirm != "y":
print("🚫 已取消删除")
return

print("🗑️ 开始删除...")
batch_size = 50

for i in range(0, len(duplicates), batch_size):
batch = duplicates[i:i + batch_size]
try:
await client.delete_messages(target, batch)
print(
f" 已删除 {min(i + batch_size, len(duplicates))}/{len(duplicates)}"
)
await asyncio.sleep(1)
except errors.FloodWaitError as e:
print(f"⏳ FloodWait {e.seconds}s,等待中...")
await asyncio.sleep(e.seconds + 1)
except errors.RPCError as e:
print(f"❌ 删除失败: {e}")

print(f"\n✅ 清理完成,共删除 {len(duplicates)} 条重复视频")

await client.disconnect()

# ================== 程序入口 ==================
if __name__ == "__main__":
with client:
client.loop.run_until_complete(main())