TG-forward-videos

[up主专用,视频内嵌代码贴在这]

Telegram 视频转发脚本说明文档

1. 功能概述

本脚本用于 从指定源频道批量转发视频到目标频道,支持以下功能:

  1. 批量转发最新视频,顺序保持与源频道一致(从旧到新)。
  2. 实时监听新视频,自动追加到目标频道。
  3. 去重处理
    • 保留旧的视频,重复视频自动跳过。
  4. 转发方式优化
    • 不显示“转发自”信息。
    • 视频说明文字保留,并可裁剪到最大长度。
  5. 视频筛选条件
    • 最小文件大小 (MIN_FILE_SIZE)
    • 视频时长范围 (MIN_DURATION / MAX_DURATION)
  6. 发送稳定性处理
    • 随机延迟 1~5 秒,降低风控风险。
    • 自动处理 FloodWaitError,等待后重试。
  7. JSON 持久化
    • 已发送视频记录到 video_keys.json,避免重复发送。
  8. 自定义配置
    • 使用 .env 文件统一管理配置。

2. 环境要求

  • Python 3.10+
  • 安装依赖包:
1
pip install telethon python-dotenv

3. 配置文件 .env

在脚本同级目录下创建 .env 文件,示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Telegram API 配置
API_ID=123456
API_HASH=abcdef123456abcdef123456abcdef12
PHONE_NUMBER=+12345678901
TWO_STEP_PASSWORD=你的二步密码

# 频道配置
SOURCE_CHANNEL=@source_channel
TARGET_CHANNEL=@target_channel

# 视频筛选与转发限制
MIN_FILE_SIZE=209715200 # 200MB
MAX_FORWARD_COUNT=0 # 0=不限制
SCAN_LIMIT=50 # 扫描最新 N 条消息
MIN_DURATION=0 # 秒,0=不限制
MAX_DURATION=0 # 秒,0=不限制
MAX_CAPTION_LENGTH=1024 # 文字说明最大长度

4. 使用说明

4.1 运行脚本

1
python main.py

脚本启动后流程:

  1. 登录 Telegram(首次运行会提示扫码或输入两步密码)。
  2. 扫描源频道最新消息(数量由 SCAN_LIMIT 决定)。
  3. 按源频道顺序从旧到新批量发送视频到目标频道。
  4. 已发送视频记录到 video_keys.json,避免重复发送。
  5. 启动实时监听,自动转发新视频。

4.2 JSON 持久化

  • video_keys.json 文件用于记录已发送视频的唯一标识(文件名_大小_id)和原消息 ID。
  • 保证脚本重启后仍然不会重复发送相同视频。

5. 核心功能说明

5.1 批量转发

  • 扫描源频道最新 SCAN_LIMIT 条消息。
  • 仅发送符合条件的视频(文件大小 >= MIN_FILE_SIZE,视频时长在 [MIN_DURATION, MAX_DURATION] 之间)。
  • 保留文字说明,长度超出 MAX_CAPTION_LENGTH 自动裁剪。

5.2 实时监听

  • 监听源频道的新消息事件。
  • 满足条件的视频立即发送到目标频道。
  • 已存在的视频自动跳过。

5.3 去重逻辑

  • 视频唯一标识 video_key = 文件名_大小_id
  • 已发送的视频不会被再次发送,保留旧视频。

5.4 避免 Telegram 风控

  • 随机延迟 1~5 秒发送。
  • 遇到 FloodWaitError 自动等待后重试。

6. 注意事项

  1. 确保目标频道允许机器人/账号发送视频。
  2. 批量发送时,过大的视频数量可能导致 Telegram 限制,需要适当调整 SCAN_LIMIT
  3. MAX_CAPTION_LENGTH 可根据 Telegram 最大限制调整(Telegram 视频文字说明最大 1024 字符)。
  4. video_keys.json 不要手动修改,否则可能导致去重逻辑错误。
  5. 建议使用虚拟环境运行脚本,避免依赖冲突。

7. 文件结构建议

1
2
3
4
5
6
/telegram_video_forward/

├─ main.py # 脚本主文件
├─ .env # 配置文件
├─ video_keys.json # 已发送视频记录
└─ README.md # 本说明文档

8. 扩展建议

  • 可以增加 时间区间筛选,只转发某段时间内的视频。
  • 可增加 多源频道 支持,按顺序批量转发到目标频道。
  • 增加 日志文件记录,方便统计每日转发量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
import asyncio
import os
import random
import json
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, DocumentAttributeFilename, DocumentAttributeVideo
from telethon.errors import FloodWaitError
from dotenv import load_dotenv

# === 读取配置 ===
load_dotenv()

api_id = int(os.getenv("API_ID"))
api_hash = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")
source_channel = os.getenv("SOURCE_CHANNEL")
target_channel = os.getenv("TARGET_CHANNEL")

# 文件大小、视频数量、时长限制
MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", 200 * 1024 * 1024)) # 默认200MB
MAX_FORWARD_COUNT = int(os.getenv("MAX_FORWARD_COUNT", 0)) # 0=不限制
SCAN_LIMIT = int(os.getenv("SCAN_LIMIT", 50)) # 扫描最新 N 条
MIN_DURATION = int(os.getenv("MIN_DURATION", 0)) # 秒,0=不限制
MAX_DURATION = int(os.getenv("MAX_DURATION", 0)) # 秒,0=不限制
MAX_CAPTION_LENGTH = int(os.getenv("MAX_CAPTION_LENGTH", 1024)) # 视频说明最大长度

client = TelegramClient("user_session", api_id, api_hash)
VIDEO_KEYS_FILE = "video_keys.json"

# ==================== JSON 持久化 ====================

def load_existing_keys():
if os.path.exists(VIDEO_KEYS_FILE):
try:
with open(VIDEO_KEYS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
return {}

def save_existing_keys():
try:
with open(VIDEO_KEYS_FILE, "w", encoding="utf-8") as f:
json.dump(existing_video_keys, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"❌ 保存 video_keys.json 失败: {e}")

existing_video_keys = load_existing_keys()

# ==================== 工具函数 ====================

def get_duration_from_doc(doc):
"""提取视频时长"""
for attr in doc.attributes:
if isinstance(attr, DocumentAttributeVideo):
return attr.duration
return 0

def get_video_key(message):
"""生成视频唯一标识(文件名+大小+id)"""
doc = message.media.document
filename = next(
(attr.file_name for attr in doc.attributes if isinstance(attr, DocumentAttributeFilename)),
"video.mp4",
)
return f"{filename}_{doc.size}_{doc.id}"

def is_video_eligible(message):
"""判断视频是否满足条件"""
if not message.media or not isinstance(message.media, MessageMediaDocument):
return False
doc = message.media.document
if not doc.mime_type or not doc.mime_type.startswith("video"):
return False
if doc.size < MIN_FILE_SIZE:
return False
duration = get_duration_from_doc(doc)
if MIN_DURATION > 0 and duration < MIN_DURATION:
return False
if MAX_DURATION > 0 and duration > MAX_DURATION:
return False
return True

# ==================== 发送函数 ====================

async def forward_message(message, video_key):
"""直接发送视频到目标频道,不显示转发信息,保留文字说明并裁剪"""
try:
caption = getattr(message, "message", "")
if caption and len(caption) > MAX_CAPTION_LENGTH:
caption = caption[:MAX_CAPTION_LENGTH] + "…" # 超长裁剪

await client.send_file(
target_channel,
message.media.document,
caption=caption
)
print(f"✅ 发送成功: {video_key}")
existing_video_keys[video_key] = message.id
save_existing_keys()
await asyncio.sleep(random.uniform(1, 5)) # 随机延迟
except FloodWaitError as e:
print(f"⏳ 遇到 FloodWait,等待 {e.seconds} 秒...")
await asyncio.sleep(e.seconds)
await forward_message(message, video_key)
except Exception as e:
print(f"❌ 发送失败: {e}")

# ==================== 去重函数 ====================

async def check_and_skip_duplicates(messages):
"""检查重复视频:保留旧的,跳过新的"""
for message in messages:
if not is_video_eligible(message):
continue
video_key = get_video_key(message)
if video_key in existing_video_keys:
print(f"⏭️ 已存在相同视频,跳过新的视频 message_id={message.id}")
continue
existing_video_keys[video_key] = message.id
save_existing_keys()

# ==================== 批量发送 ====================

async def batch_forward_latest_videos():
"""批量发送最新 N 个视频,保持源频道顺序"""
print(f"📦 扫描最新 {SCAN_LIMIT} 条消息...")
source = await client.get_entity(source_channel)

# 获取消息并翻转为旧 → 新
messages = [msg async for msg in client.iter_messages(source, limit=SCAN_LIMIT)]
messages = list(reversed(messages)) # 保持源频道顺序

# 检查重复
await check_and_skip_duplicates(messages)

count = 0
for message in messages:
if is_video_eligible(message):
video_key = get_video_key(message)
if video_key in existing_video_keys and existing_video_keys[video_key] != message.id:
print(f"⏭️ 已发送过,跳过: {video_key}")
continue
try:
await forward_message(message, video_key)
except Exception as e:
print(f"❌ 发送失败 message_id={message.id}: {e}")
count += 1
if MAX_FORWARD_COUNT > 0 and count >= MAX_FORWARD_COUNT:
print(f"⏹️ 已达到最大发送数量 {MAX_FORWARD_COUNT},停止。")
break
else:
if message.media and isinstance(message.media, MessageMediaDocument):
size = message.media.document.size
duration = get_duration_from_doc(message.media.document)
print(f"⚠️ 跳过不符合条件的视频:{size/1024/1024:.2f}MB, {duration}s, message_id={message.id}")

print(f"📁 批量发送完成,共发送 {count} 个视频。")

# ==================== 实时监听 ====================

@client.on(events.NewMessage(chats=source_channel))
async def handler(event):
"""实时监听新视频"""
if is_video_eligible(event.message):
video_key = get_video_key(event.message)
if video_key in existing_video_keys:
print(f"⏭️ 已存在相同视频,跳过新的视频 message_id={event.message.id}")
return
await forward_message(event.message, video_key)
else:
if event.message.media and isinstance(event.message.media, MessageMediaDocument):
size = event.message.media.document.size
duration = get_duration_from_doc(event.message.media.document)
print(f"⚠️ 跳过不符合条件的视频:{size/1024/1024:.2f}MB, {duration}s, message_id={event.message.id}")

# ==================== 主程序 ====================

async def main():
await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
print("✅ 登录成功")
await batch_forward_latest_videos()
print("⏳ 开始实时监听新视频...")
await client.run_until_disconnected()

if __name__ == "__main__":
with client:
client.loop.run_until_complete(main())

Telegram 视频批量转发脚本使用说明(plus升级版)

1. 脚本功能

本脚本基于 Telethon,可实现以下功能:

  1. 多源频道转发

    • 支持同时从多个源频道抓取视频。
    • 保持源频道视频顺序,批量转发到目标频道。
  2. 视频筛选条件

    • 文件大小限制(最小 MIN_FILE_SIZE)。
    • 视频时长限制(MIN_DURATIONMAX_DURATION)。
    • 时间区间筛选(可选,START_DATEEND_DATE)。
  3. 去重策略

    • 保留旧的视频,跳过重复的新视频。
    • 通过 JSON 文件 video_keys.json 持久化已转发视频。
  4. 文字说明处理

    • 转发视频时保留原文说明。
    • 可设置最大长度 MAX_CAPTION_LENGTH,超长自动裁剪。
  5. 转发控制

    • 最大转发数量限制 MAX_FORWARD_COUNT(0 表示不限制)。
    • 随机延迟 1~5 秒,降低 Telegram 风控风险。
    • 自动处理 FloodWait 异常。
  6. 实时监听

    • 支持实时监听新消息。
    • 自动检查重复,保留旧视频,跳过重复的新视频。
  7. 日志输出

    • 控制台打印每条视频转发状态。
    • 可扩展记录到日志文件统计每日转发量。

2. 环境要求

  • Python 3.10+
  • 依赖库:
    1
    pip install telethon python-dotenv

3. .env 配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# ================= API 配置 =================
API_ID=12345678
API_HASH=your_api_hash_here
PHONE_NUMBER=+1234567890
TWO_STEP_PASSWORD=your_2fa_password

# ================== 频道配置 =================
# 多源频道,逗号分隔
SOURCE_CHANNELS=@source_channel1,@source_channel2
TARGET_CHANNEL=@target_channel

# ================== 视频筛选 =================
MIN_FILE_SIZE=209715200 # 200MB
MAX_FORWARD_COUNT=0 # 0 = 不限制
SCAN_LIMIT=50 # 扫描最新 N 条
MIN_DURATION=0 # 秒,0=不限制
MAX_DURATION=0 # 秒,0=不限制
MAX_CAPTION_LENGTH=1024 # 转发文字说明最大长度

# ================== 时间区间筛选 (可选) =================
# 格式:YYYY-MM-DD
START_DATE=2025-09-01
END_DATE=2025-09-30

# ================== 其他 =================
# 如果不需要时间筛选,可留空或删除 START_DATE 和 END_DATE

字段说明

字段 用途
API_ID / API_HASH Telegram API 密钥,必填
PHONE_NUMBER 登录 Telegram 的手机号
TWO_STEP_PASSWORD 二步验证密码,如果没有可留空
SOURCE_CHANNELS 源频道列表,多频道用逗号分隔
TARGET_CHANNEL 目标频道,只能一个
MIN_FILE_SIZE 视频最小文件大小(字节)
MAX_FORWARD_COUNT 批量转发最大数量,0=不限制
SCAN_LIMIT 扫描最新消息条数
MIN_DURATION / MAX_DURATION 视频时长秒数限制,0=不限制
MAX_CAPTION_LENGTH 转发文字说明最大长度
START_DATE / END_DATE 时间区间筛选,格式 YYYY-MM-DD,可选

4. 使用方法

  1. 克隆或下载脚本文件。
  2. 安装依赖:
    1
    pip install telethon python-dotenv
  3. 创建 .env 文件,按示例配置。
  4. 运行脚本:
    1
    python your_script.py
  5. 脚本会自动:
    • 扫描源频道最新消息并批量转发。
    • 保留源频道顺序。
    • 保留文字说明,裁剪超长文字。
    • 实时监听新视频。

5. 注意事项

  • 首次登录:需要 Telegram 手机验证。
  • JSON 文件video_keys.json 用于记录已转发视频,勿删除,否则可能重复转发。
  • 时间区间筛选:如果未配置 START_DATE / END_DATE,默认转发所有消息。
  • 重复判断:通过视频文件名+大小+ID 来判断重复。
  • 转发顺序:批量转发时保留源频道顺序,旧视频先发,新视频后发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
import asyncio
import os
import random
import json
from datetime import datetime, timezone
from telethon import TelegramClient, events
from telethon.tl.types import MessageMediaDocument, DocumentAttributeFilename, DocumentAttributeVideo
from telethon.errors import FloodWaitError
from dotenv import load_dotenv

# === 读取配置 ===
load_dotenv()

api_id = int(os.getenv("API_ID"))
api_hash = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")
TWO_STEP_PASSWORD = os.getenv("TWO_STEP_PASSWORD")
SOURCE_CHANNELS = [ch.strip() for ch in os.getenv("SOURCE_CHANNELS", "").split(",")]
TARGET_CHANNEL = os.getenv("TARGET_CHANNEL")

MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", 200 * 1024 * 1024))
MAX_FORWARD_COUNT = int(os.getenv("MAX_FORWARD_COUNT", 0))
SCAN_LIMIT = int(os.getenv("SCAN_LIMIT", 50))
MIN_DURATION = int(os.getenv("MIN_DURATION", 0))
MAX_DURATION = int(os.getenv("MAX_DURATION", 0))
MAX_CAPTION_LENGTH = int(os.getenv("MAX_CAPTION_LENGTH", 1024))

# 时间区间(可选)
START_DATE = os.getenv("START_DATE")
END_DATE = os.getenv("END_DATE")
start_date = datetime.strptime(START_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc) if START_DATE else None
end_date = datetime.strptime(END_DATE, "%Y-%m-%d").replace(tzinfo=timezone.utc) if END_DATE else None

client = TelegramClient("user_session", api_id, api_hash)
VIDEO_KEYS_FILE = "video_keys.json"

# ==================== JSON 持久化 ====================
def load_existing_keys():
if os.path.exists(VIDEO_KEYS_FILE):
try:
with open(VIDEO_KEYS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except Exception:
return {}
return {}

def save_existing_keys():
try:
with open(VIDEO_KEYS_FILE, "w", encoding="utf-8") as f:
json.dump(existing_video_keys, f, ensure_ascii=False, indent=2)
except Exception as e:
print(f"❌ 保存 video_keys.json 失败: {e}")

existing_video_keys = load_existing_keys()

# ==================== 工具函数 ====================
def get_duration_from_doc(doc):
duration = 0
for attr in doc.attributes:
if isinstance(attr, DocumentAttributeVideo):
duration = attr.duration
break
return duration

def get_video_key(message):
doc = message.media.document
filename = next((attr.file_name for attr in doc.attributes if isinstance(attr, DocumentAttributeFilename)), "video.mp4")
return f"{filename}_{doc.size}_{doc.id}"

def is_video_eligible(message):
if not message.media or not isinstance(message.media, MessageMediaDocument):
return False
doc = message.media.document
if not doc.mime_type or not doc.mime_type.startswith("video"):
return False
if doc.size < MIN_FILE_SIZE:
return False
duration = get_duration_from_doc(doc)
if MIN_DURATION > 0 and duration < MIN_DURATION:
return False
if MAX_DURATION > 0 and duration > MAX_DURATION:
return False
# 时间筛选
msg_date = message.date
if start_date and msg_date < start_date:
return False
if end_date and msg_date > end_date:
return False
return True

# ==================== 转发函数 ====================
async def forward_message(message, video_key):
try:
caption = message.message or ""
if len(caption) > MAX_CAPTION_LENGTH:
caption = caption[:MAX_CAPTION_LENGTH] + "…"
await client.send_file(TARGET_CHANNEL, file=message.media, caption=caption, silent=True)
print(f"✅ 转发成功: {video_key}")
existing_video_keys[video_key] = message.id
save_existing_keys()
await asyncio.sleep(random.uniform(1, 5))
except FloodWaitError as e:
print(f"⏳ 遇到 FloodWait,等待 {e.seconds} 秒...")
await asyncio.sleep(e.seconds)
await forward_message(message, video_key)
except Exception as e:
print(f"❌ 转发失败: {e}")

# ==================== 批量转发 ====================
async def batch_forward_latest_videos():
for source_channel in SOURCE_CHANNELS:
print(f"📦 扫描源频道 {source_channel} 最新 {SCAN_LIMIT} 条消息...")
source = await client.get_entity(source_channel)
messages = [msg async for msg in client.iter_messages(source, limit=SCAN_LIMIT, reverse=False)] # 最新 → 最旧

count = 0
for message in messages:
if is_video_eligible(message):
video_key = get_video_key(message)
if video_key in existing_video_keys:
print(f"⏭️ 已存在相同视频,跳过 message_id={message.id}")
continue
await forward_message(message, video_key)
count += 1
if MAX_FORWARD_COUNT > 0 and count >= MAX_FORWARD_COUNT:
print(f"⏹️ 达到最大转发数量 {MAX_FORWARD_COUNT},停止。")
break
else:
if message.media and isinstance(message.media, MessageMediaDocument):
size = message.media.document.size
duration = get_duration_from_doc(message.media.document)
print(f"⚠️ 跳过视频:{size/1024/1024:.2f}MB, {duration}s, message_id={message.id}")
print(f"📁 源频道 {source_channel} 转发完成,共转发 {count} 个视频。")

# ==================== 实时监听 ====================
@client.on(events.NewMessage(chats=SOURCE_CHANNELS))
async def handler(event):
if is_video_eligible(event.message):
video_key = get_video_key(event.message)
if video_key in existing_video_keys:
print(f"⏭️ 已存在相同视频,跳过 message_id={event.message.id}")
return
await forward_message(event.message, video_key)
else:
if event.message.media and isinstance(event.message.media, MessageMediaDocument):
size = event.message.media.document.size
duration = get_duration_from_doc(event.message.media.document)
print(f"⚠️ 跳过视频:{size/1024/1024:.2f}MB, {duration}s, message_id={event.message.id}")

# ==================== 主程序 ====================
async def main():
await client.start(PHONE_NUMBER, TWO_STEP_PASSWORD)
print("✅ 登录成功")
await batch_forward_latest_videos()
print("⏳ 开始实时监听新视频...")
await client.run_until_disconnected()

if __name__ == "__main__":
with client:
client.loop.run_until_complete(main())