Simple Linux Panel
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
mdserver-web/plugins/qbittorrent/workers/qbittorrent_worker.py

734 lines
24 KiB

#!/usr/bin/env python
# encoding: utf-8
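"""
Download monitoring
"""
'''
Meaning of the `status` column in the pl_hash_list table
0  contains video resources - normal
1  contains no video resources
2  no response after 3 hours of downloading
4  download not finished after 3 months
'''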
import hashlib
import os
import time
import datetime
import traceback
import sys
import json
import socket
import threading
from hashlib import sha1
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
# Python 2 only: sys.setdefaultencoding is removed from the module at
# interpreter start-up, so sys is reloaded to get it back before switching the
# implicit str/unicode conversion codec to UTF-8.
reload(sys)
sys.setdefaultencoding("utf8")
# Add the Python 2.7 site-packages directory to the module search path.
sys.path.append('/usr/local/lib/python2.7/site-packages')
def formatTime():
    """Return the current local time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    now = time.localtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", now)
def getRunDir():
    """Return the current working directory of the process."""
    cwd = os.getcwd()
    return cwd
def getRootDir():
    """Return the directory two levels above the run directory."""
    parent = os.path.dirname(getRunDir())
    return os.path.dirname(parent)
def toSize(size):
    """Convert a byte count into a human-readable string.

    Args:
        size: Number of bytes, as an int, float or numeric string.

    Returns:
        The value rounded to two decimals followed by a space and the unit
        (``b``, ``KB``, ``MB``, ``GB`` or ``TB``), e.g. ``'2.0 KB'``.
        Values of 1024 TB or more are reported in TB.
    """
    units = ('b', 'KB', 'MB', 'GB', 'TB')
    # Coerce once so numeric strings behave like numbers on every Python
    # version instead of relying on Python 2 str/int ordering.
    size = float(size)
    # Stop dividing before the last unit: dividing again would report a
    # petabyte-scale value with a "TB" label.
    for unit in units[:-1]:
        if size < 1024:
            return str(round(size, 2)) + ' ' + unit
        size /= 1024.0
    return str(round(size, 2)) + ' ' + units[-1]
# import pygeoip
import MySQLdb as mdb
from configparser import ConfigParser
# Load the worker configuration from ../qb.conf (relative to the current
# working directory).
# NOTE: sections are looked up by position, not by name, so the order of the
# sections inside qb.conf matters.
cp = ConfigParser()
cp.read("../qb.conf")
# Section 0: MySQL connection settings.
section_db = cp.sections()[0]
DB_HOST = cp.get(section_db, "DB_HOST")
DB_USER = cp.get(section_db, "DB_USER")
DB_PORT = cp.getint(section_db, "DB_PORT")
DB_PASS = cp.get(section_db, "DB_PASS")
DB_NAME = cp.get(section_db, "DB_NAME")
# Section 1: qBittorrent Web UI endpoint and credentials.
# QB_PORT is kept as a string because it is concatenated into the URL.
section_qb = cp.sections()[1]
QB_HOST = cp.get(section_qb, "QB_HOST")
QB_PORT = cp.get(section_qb, "QB_PORT")
QB_USER = cp.get(section_qb, "QB_USER")
QB_PWD = cp.get(section_qb, "QB_PWD")
# Section 2: output locations, ownership and feature switches for the
# transcoded files. The switches are read as strings and compared to '0'/'1'.
section_file = cp.sections()[2]
FILE_TO = cp.get(section_file, "FILE_TO")
FILE_TRANSFER_TO = cp.get(section_file, "FILE_TRANSFER_TO")
FILE_OWN = cp.get(section_file, "FILE_OWN")
FILE_GROUP = cp.get(section_file, "FILE_GROUP")
FILE_ENC_SWITCH = cp.get(section_file, "FILE_ENC_SWITCH")
FILE_API_URL = cp.get(section_file, "FILE_API_URL")
FILE_ASYNC_SWITCH = cp.get(section_file, "FILE_ASYNC_SWITCH")
# Section 3: task polling intervals, size limit (multiplied by 1024**3 where
# it is used) and debug flag.
section_task = cp.sections()[3]
TASK_SIZE_LIMIT = cp.get(section_task, "TASK_SIZE_LIMIT")
TASK_RATE = cp.getint(section_task, "TASK_RATE")
TASK_COMPLETED_RATE = cp.getint(section_task, "TASK_COMPLETED_RATE")
TASK_DEBUG = cp.getint(section_task, "TASK_DEBUG")
# Section 4: queue switch and the qBittorrent active-torrent limits pushed to
# the client on start-up.
section_setting = cp.sections()[4]
QUEUE_SWITCH = cp.get(section_setting, "QUEUE_SWITCH")
MAX_ACTIVE_UPLOADS = cp.getint(section_setting, "MAX_ACTIVE_UPLOADS")
MAX_ACTIVE_TORRENTS = cp.getint(section_setting, "MAX_ACTIVE_TORRENTS")
MAX_ACTIVE_DOWNLOADS = cp.getint(section_setting, "MAX_ACTIVE_DOWNLOADS")
# Prefer the ffmpeg binary bundled under <root>/lib/ffmpeg; fall back to the
# system-wide install when it is not present.
rooDir = getRootDir()
tmp_cmd = rooDir + "/lib/ffmpeg/ffmpeg"
ffmpeg_cmd = tmp_cmd if os.path.exists(tmp_cmd) else "/usr/local/bin/ffmpeg"
class downloadBT(Thread):
__db_err = None
def __init__(self):
    """Create the daemon thread, log in to qBittorrent and connect to MySQL.

    Pushes the configured active-torrent limits to qBittorrent and builds
    ``self.has_suffix``, the list of recognised video extensions in lower
    and upper case.
    """
    Thread.__init__(self)
    self.setDaemon(True)

    # Rebinding self.qb replaces the factory method with the logged-in
    # client object on this instance.
    self.qb = self.qb()
    self.qb.set_preferences(max_active_uploads=MAX_ACTIVE_UPLOADS)
    self.qb.set_preferences(max_active_torrents=MAX_ACTIVE_TORRENTS)
    self.qb.set_preferences(max_active_downloads=MAX_ACTIVE_DOWNLOADS)

    base_suffixes = ['mp4', 'rmvb', 'flv', 'avi',
                     'mpg', 'mkv', 'wmv', 'avi', 'rm']
    suffixes = []
    for name in base_suffixes:
        suffixes.extend(('.' + name, '.' + name.upper()))
    self.has_suffix = suffixes

    self.__conn()
def __conn(self):
    """Open a new MySQL connection and cursor on this instance.

    Returns:
        True on success. On any failure the exception is stored in
        ``self.__db_err`` and False is returned.
    """
    try:
        self.dbconn = mdb.connect(
            DB_HOST, DB_USER, DB_PASS, DB_NAME, port=DB_PORT, charset='utf8')
        self.dbconn.autocommit(False)
        cursor = self.dbconn.cursor()
        self.dbcurr = cursor
        cursor.execute('SET NAMES utf8')
    except Exception as err:
        self.__db_err = err
        return False
    return True
def __check(self):
    """Exit via ``sys.exit`` if a database connection error was recorded."""
    if not self.__db_err:
        return
    sys.exit('未连接数据库!')
def __close(self):
    """Close the current cursor, then the current connection."""
    for handle in (self.dbcurr, self.dbconn):
        handle.close()
def query(self, sql):
    """Run a SELECT-style statement and return all rows.

    A new connection is opened for every call and always closed again,
    including when the statement fails.

    Args:
        sql: Complete SQL statement to execute.

    Returns:
        A list of rows, each row converted from a tuple to a list. On
        failure the exception object is returned instead of being raised
        (the connection error if connecting failed).
    """
    if not self.__conn():
        return self.__db_err
    try:
        self.dbcurr.execute(sql)
        # Build a real list so callers can use len() and indexing.
        return [list(row) for row in self.dbcurr.fetchall()]
    except Exception as ex:
        return ex
    finally:
        # Best-effort cleanup: a failure while closing must not replace the
        # result or error being returned.
        try:
            self.__close()
        except Exception:
            pass
def execute(self, sql):
    """Run a data-modifying statement, commit it and return the insert id.

    A new connection is opened for every call and always closed again,
    including when the statement fails (the uncommitted work is then
    discarded with the connection).

    Args:
        sql: Complete SQL statement to execute.

    Returns:
        The value of ``SELECT LAST_INSERT_ID()`` after the statement. On
        failure the exception object is returned instead of being raised
        (the connection error if connecting failed).
    """
    if not self.__conn():
        return self.__db_err
    try:
        self.dbcurr.execute(sql)
        self.dbcurr.execute('SELECT LAST_INSERT_ID();')
        result = self.dbcurr.fetchone()
        self.dbconn.commit()
        return result[0]
    except Exception as ex:
        return ex
    finally:
        # Best-effort cleanup: a failure while closing must not replace the
        # result or error being returned.
        try:
            self.__close()
        except Exception:
            pass
def qb(self):
    """Build a qBittorrent Web API client and log in with the configured account."""
    from qbittorrent import Client
    client = Client(''.join(('http://', QB_HOST, ':', QB_PORT, '/')))
    client.login(QB_USER, QB_PWD)
    return client
def execShell(self, cmdstring, cwd=None, timeout=None, shell=True):
    """Run a command and return its captured output.

    Args:
        cmdstring: Command line to run.
        cwd: Working directory for the child process, or None.
        timeout: Seconds to wait before killing the child, or None/0 to
            wait indefinitely.
        shell: When True the string is passed to the shell; when False it
            is split with ``shlex.split`` and executed directly.

    Returns:
        The ``(stdout, stderr)`` tuple from ``Popen.communicate()``.

    Raises:
        Exception: ``"Timeout:<cmdstring>"`` when the timeout expired. The
            child is killed first; with ``shell=True`` only the shell itself
            is killed, so its descendants may outlive it.
    """
    # Local imports: shlex was never imported at file level, so the
    # shell=False path used to fail with NameError.
    import shlex
    import subprocess
    import threading

    args = cmdstring if shell else shlex.split(cmdstring)
    sub = subprocess.Popen(args, cwd=cwd, stdin=subprocess.PIPE,
                           shell=shell, bufsize=4096,
                           stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # communicate() drains both pipes while waiting. Polling without reading
    # can deadlock once the child fills a pipe buffer.
    if not timeout:
        return sub.communicate()

    timed_out = []

    def _kill():
        timed_out.append(True)
        try:
            sub.kill()
        except OSError:
            # The child exited between the timer firing and the kill.
            pass

    timer = threading.Timer(timeout, _kill)
    timer.start()
    try:
        output = sub.communicate()
    finally:
        timer.cancel()
    if timed_out:
        raise Exception("Timeout:%s" % cmdstring)
    return output
def md5(self, str):
    """Return the hexadecimal MD5 digest of a string.

    Args:
        str: Byte string or text. Text is encoded as UTF-8 before hashing.

    Returns:
        The 32-character hex digest, or False when the input cannot be
        hashed (for example a non-string value).
    """
    # The parameter name shadows the builtin but is kept for callers that
    # pass it by keyword.
    try:
        if isinstance(str, (bytes, bytearray)):
            data = str
        else:
            data = str.encode('utf-8')
        return hashlib.md5(data).hexdigest()
    except (AttributeError, TypeError, UnicodeError):
        return False
def readFile(self, filename):
    """Read a whole text file.

    Args:
        filename: Path of the file to read.

    Returns:
        The file content, or False when the file cannot be opened or read.
    """
    try:
        # The context manager closes the handle even when read() fails.
        with open(filename, 'r') as fp:
            return fp.read()
    except (IOError, OSError, UnicodeError, TypeError, ValueError):
        return False
def get_transfer_ts_file(self, to):
    """Return the path of the intermediate ``.ts`` file named ``to``."""
    return ''.join((FILE_TRANSFER_TO, '/', to, '.ts'))
def get_transfer_mp4_file(self, to):
    """Return the path of the intermediate ``.mp4`` file named ``to``."""
    return ''.join((FILE_TRANSFER_TO, '/', to, '.mp4'))
def get_transfer_m3u5_dir(self, dirname, fname):
    """Return ``<FILE_TO>/m3u8/<dirname>/<fname>``."""
    return ''.join((FILE_TO, '/m3u8/', dirname, '/', fname))
def get_transfer_hash_dir(self, dirname):
    """Return ``<FILE_TO>/m3u8/<dirname>``."""
    return ''.join((FILE_TO, '/m3u8/', dirname))
def fg_transfer_mp4_cmd(self, sfile, dfile):
    """Build the shell command that re-encodes ``sfile`` to H.264 at ``dfile``.

    Both paths are shell-quoted. The source path comes from torrent content,
    so characters such as quotes, ``$`` or backticks must not reach the
    shell unescaped.

    Args:
        sfile: Source video path.
        dfile: Destination mp4 path.

    Returns:
        The complete ffmpeg command line as a string.
    """
    try:
        from shlex import quote
    except ImportError:
        # Python 2 keeps the same helper in the pipes module.
        from pipes import quote
    return ' '.join([
        ffmpeg_cmd, '-y', '-i', quote(sfile),
        '-threads 1 -preset veryslow -crf 28 -c:v libx264 -strict -2',
        quote(dfile),
    ])
def fg_transfer_ts_cmd(self, file, to_file):
    """Build the ffmpeg command that remuxes ``file`` into the TS file ``to_file``."""
    pieces = (
        ffmpeg_cmd,
        ' -y -i ',
        file,
        ' -s 480x360 -vcodec copy -acodec copy -vbsf h264_mp4toannexb ',
        to_file,
    )
    return ''.join(pieces)
def fg_m3u8_cmd(self, ts_file, m3u8_file, to_file):
    """Build the ffmpeg command that splits ``ts_file`` into 3-second segments.

    ``m3u8_file`` receives the segment list and ``to_file`` is the segment
    file name pattern.
    """
    pieces = (
        ffmpeg_cmd,
        ' -y -i ',
        ts_file,
        ' -c copy -map 0 -f segment -segment_list ',
        m3u8_file,
        ' -segment_time 3 ',
        to_file,
    )
    return ''.join(pieces)
def fg_m3u8enc_cmd(self, ts_file, m3u8_file, to_file, enc_dir):
    """Build the ffmpeg command for an HLS playlist using a key info file.

    The key info file is read from ``<enc_dir>/enc.keyinfo.txt``;
    ``to_file`` is the segment file name pattern and ``m3u8_file`` the
    playlist to write.
    """
    pieces = (
        ffmpeg_cmd,
        ' -y -i ',
        ts_file,
        ' -threads 1 -strict -2 -hls_time 3 -hls_key_info_file ',
        enc_dir,
        '/enc.keyinfo.txt -hls_playlist_type vod -hls_segment_filename ',
        to_file,
        ' ',
        m3u8_file,
    )
    return ''.join(pieces)
def debug(self, msg):
    """Return ``msg`` prefixed with the current timestamp and a colon."""
    return ''.join((formatTime(), ":", msg))
def get_lock_file(self, to):
    """Return the lock file path under /tmp for the lock named ``to``."""
    return ''.join(('/tmp/mdw_qb_', to, '.lock'))
def lock(self, sign):
    """Create the lock file for ``sign`` by running ``touch`` through the shell."""
    lock_path = self.get_lock_file(sign)
    self.execShell(' '.join(('touch', lock_path)))
def unlock(self, sign):
    """Remove the lock file for ``sign`` by running ``rm -rf`` through the shell."""
    lock_path = self.get_lock_file(sign)
    self.execShell(' '.join(('rm -rf', lock_path)))
def islock(self, sign):
    """Return True when the lock file for ``sign`` exists, else False."""
    return os.path.exists(self.get_lock_file(sign))
def ffmpeg_file_sync(self):
    """Run ``rsync.sh`` from the run directory, guarded by the 'sync' lock.

    Does nothing unless ``FILE_ASYNC_SWITCH`` is ``'1'``. When the lock file
    already exists the sync is skipped. Otherwise the lock is taken, the
    script is run with ``sh -x``, its stdout and stderr are printed, and the
    lock is released even if running the script or printing its output
    raises.
    """
    if FILE_ASYNC_SWITCH != '1':
        return

    sign = 'sync'
    run_dir = getRunDir()
    print('file_sync... start')
    if self.islock(sign):
        print(self.debug('sync doing,already lock it!!!'))
    else:
        self.lock(sign)
        try:
            r = self.execShell('sh -x ' + run_dir + '/rsync.sh')
            print(self.debug('file_sync:' + r[0]))
            print(self.debug('file_sync_error:' + r[1]))
        finally:
            # Always release the lock. A leftover lock file would make every
            # later sync report "already lock it" and never run.
            self.unlock(sign)
    print('file_sync... end')
def ffmpeg_del_file(self, mp4, ts, m3u8_dir):
    """Delete the intermediate mp4 and ts files with ``rm -rf``.

    ``m3u8_dir`` is accepted but not used; the output directory is kept.
    """
    print(self.debug('delete middle file ... start ' + mp4))
    for leftover in (mp4, ts):
        self.execShell('rm -rf ' + leftover)
    print(self.debug('delete middle file ... end ' + ts))
def ffmpeg_del_hfile(self, shash_dir):
    """Do nothing; removal of the per-hash output directory is disabled.

    ``shash_dir`` is accepted so existing callers keep working.
    """
    return None
# Transcode one downloaded video into HLS output and register it in the DB.
# Pipeline: source file -> mp4 (re-encode) -> ts (remux) -> m3u8 + segments,
# each step skipped when its output already exists. Reads self.sign_torrent
# for the torrent hash, so the caller must set it first.
# file: path of the source video. Returns None on every path.
def ffmpeg(self, file=''):
# Make sure the directory for intermediate files exists.
if not os.path.exists(FILE_TRANSFER_TO):
self.execShell('mkdir -p ' + FILE_TRANSFER_TO)
fname = os.path.basename(file)
shash = self.sign_torrent['hash']
# Intermediate files are named after the MD5 of the full source path.
md5file = self.md5(file)
if not os.path.exists(file):
print formatTime(), 'file not exists:', file
return
print self.debug('source file ' + file)
# Step 1: re-encode the source to mp4 unless it was already produced.
mp4file = self.get_transfer_mp4_file(md5file)
cmd_mp4 = self.fg_transfer_mp4_cmd(file, mp4file)
if not os.path.exists(mp4file):
print self.debug('cmd_mp4:' + cmd_mp4)
os.system(cmd_mp4)
else:
print self.debug('mp4 exists:' + mp4file)
# The exit status of os.system is ignored; success is judged by whether the
# output file exists.
if not os.path.exists(mp4file):
print self.debug('mp4 not exists')
return
# Step 2: remux the mp4 into a ts file unless it was already produced.
tsfile = self.get_transfer_ts_file(md5file)
cmd_ts = self.fg_transfer_ts_cmd(mp4file, tsfile)
if not os.path.exists(tsfile):
print self.debug('cmd_ts:' + cmd_ts)
os.system(cmd_ts)
else:
# NOTE(review): this message prints mp4file, not tsfile - confirm intent.
print self.debug('data_ts exists:' + mp4file)
if not os.path.exists(tsfile):
print self.debug('ts not exists')
return
# Step 3: output directory is <FILE_TO>/m3u8/<torrent hash>/<md5 of file name>.
md5Fname = self.md5(fname)
m3u8_dir = self.get_transfer_m3u5_dir(shash, md5Fname)
if not os.path.exists(m3u8_dir):
self.execShell('mkdir -p ' + m3u8_dir)
m3u8_file = m3u8_dir + '/index.m3u8'
# '%010d' is a segment-number placeholder expanded by ffmpeg, not by Python.
tofile = m3u8_dir + '/%010d.ts'
print self.debug('tofile:' + tofile)
# Encrypted m3u8
if FILE_ENC_SWITCH != '0':
enc_dir = '/tmp/qb_m3u8'
cmd = self.fg_m3u8enc_cmd(tsfile, m3u8_file, tofile, enc_dir)
# Playlist already exists: only sync and clean up the intermediates.
if os.path.exists(m3u8_file):
print self.debug('cmd_m3u8_enc exists:' + m3u8_file)
print self.debug('cmd_m3u8_enc:' + cmd)
self.ffmpeg_file_sync()
self.ffmpeg_del_file(mp4file, tsfile, m3u8_dir)
return
# Generate a fresh random key file and start a new key info file.
self.execShell('mkdir -p ' + enc_dir)
self.execShell('openssl rand -base64 16 > ' +
enc_dir + '/enc.key')
self.execShell('rm -rf ' + enc_dir + '/enc.keyinfo.txt')
try:
fid = self.add_hash(fname, md5Fname)
except Exception as e:
print 'add_hash_enc:' + str(e)
return
# NOTE(review): add_hash is called a second time, outside the try. The first
# call returns execute()'s result when it inserts, while the second finds the
# existing row and returns its id as a string, which the string operations
# below require - presumably that is why it is repeated; confirm.
fid = self.add_hash(fname, md5Fname)
# NOTE(review): readFile returns False on failure, which would make .strip()
# raise AttributeError here.
key = self.readFile(enc_dir + '/enc.key').strip()
self.set_hashfile_key(fid, key)
# FILE_API_URL
url = FILE_API_URL.replace('{$KEY}', fid)
# The key info file gets three lines: the key URL, the local key path, and a
# random IV.
enc_url = 'echo ' + url + ' >> ' + enc_dir + '/enc.keyinfo.txt'
self.execShell(enc_url)
enc_path = 'echo ' + enc_dir + '/enc.key >> ' + enc_dir + '/enc.keyinfo.txt'
self.execShell(enc_path)
enc_iv = 'openssl rand -hex 16 >> ' + enc_dir + '/enc.keyinfo.txt'
self.execShell(enc_iv)
os.system(cmd)
else:
# Plain (unencrypted) segmenting.
if os.path.exists(m3u8_file):
print self.debug('m3u8 exists:' + tofile)
if TASK_DEBUG == 0:
self.ffmpeg_file_sync()
self.ffmpeg_del_file(mp4file, tsfile, m3u8_dir)
else:
cmd_m3u8 = self.fg_m3u8_cmd(tsfile, m3u8_file, tofile)
print self.debug('cmd_m3u8:' + cmd_m3u8)
os.system(cmd_m3u8)
# Register the file in the database; failures are logged, not raised.
try:
self.add_hash(fname, md5Fname)
except Exception as e:
print 'add_hash', str(e)
# Hand the output directory to the configured owner/group, mode 755.
self.execShell('chown -R ' + FILE_OWN + ':' +
FILE_GROUP + ' ' + m3u8_dir)
self.execShell('chmod -R 755 ' + m3u8_dir)
# Outside debug mode, sync the result and delete the intermediate files.
if TASK_DEBUG == 0:
self.ffmpeg_file_sync()
self.ffmpeg_del_file(mp4file, tsfile, m3u8_dir)
def get_bt_size(self, torrent):
    """Return the torrent size as a string.

    ``total_size`` takes precedence over ``size``; ``'0'`` is returned when
    neither key is present.
    """
    for key in ('total_size', 'size'):
        if key in torrent:
            return str(torrent[key])
    return '0'
def get_hashlist_id(self):
    """Return the pl_hash_list id for ``self.sign_torrent``, inserting if needed.

    Looks the torrent up by info hash. When no row exists a new one is
    inserted with the escaped name, hash, size and creation time, and the
    result of ``execute`` is returned.
    """
    created = formatTime()
    torrent = self.sign_torrent
    total_size = self.get_bt_size(torrent)
    shash = torrent['hash']
    sname = mdb.escape_string(torrent['name'])

    rows = self.query(
        ''.join(("select id from pl_hash_list where info_hash='", shash, "'")))
    if len(rows) > 0:
        return rows[0][0]

    print('insert into pl_hash_list data')
    insert_sql = ''.join((
        "insert into pl_hash_list (`name`,`info_hash`,`length`,`create_time`) values('",
        sname, "','", shash, "','", total_size, "','", str(created), "')",
    ))
    return self.execute(insert_sql)
def set_hashlist_status(self, torrent, status):
    """Store ``status`` for the torrent in pl_hash_list.

    Updates the existing row matched by info hash (returning None), or
    inserts a new row carrying the status and returns the result of
    ``execute``.
    """
    created = formatTime()
    shash = torrent['hash']
    rows = self.query(
        ''.join(("select id from pl_hash_list where info_hash='", shash, "'")))

    if len(rows) > 0:
        print('set_hashlist_status update')
        update_sql = ''.join((
            "update pl_hash_list set `status`='", str(status),
            "' where info_hash='", shash, "'",
        ))
        self.execute(update_sql)
        return None

    print('set_hashlist_status insert')
    total_size = self.get_bt_size(torrent)
    sname = mdb.escape_string(torrent['name'])
    insert_sql = ''.join((
        "insert into pl_hash_list (`name`,`info_hash`,`length`,`status`,`create_time`) values('",
        sname, "','", shash, "','", total_size, "','", str(status), "','", created, "')",
    ))
    return self.execute(insert_sql)
def get_hashfile_id(self, fname, m3u8_name, pid):
    """Return the pl_hash_file id for a file of a torrent, inserting if needed.

    The file name comes from torrent content, so it and the m3u8 name are
    escaped before being placed in the SQL text, as is already done for
    torrent names elsewhere in this class.

    Args:
        fname: Base name of the video file.
        m3u8_name: Name of the m3u8 output directory for the file.
        pid: Id of the owning pl_hash_list row.

    Returns:
        The id of the existing row as a string, or the result of
        ``execute`` when a new row is inserted.
        NOTE(review): the two paths return different types; callers that
        concatenate the id need the string form.
    """
    ct = formatTime()
    safe_name = mdb.escape_string(fname)
    safe_m3u8 = mdb.escape_string(m3u8_name)
    info = self.query(
        "select id from pl_hash_file where name='" + safe_name +
        "' and pid='" + str(pid) + "'")
    if len(info) == 0:
        print('insert into pl_hash_file data !')
        return self.execute(
            "insert into pl_hash_file (`pid`,`name`,`m3u8`,`create_time`) values('" +
            str(pid) + "','" + safe_name + "','" + safe_m3u8 + "','" + ct + "')")
    print('%s : %s already is exists!' % (fname, m3u8_name))
    return str(info[0][0])
def set_hashfile_key(self, fid, key):
    """Store the escaped ``key`` on the pl_hash_file row with id ``fid``.

    ``fid`` must be a string because it is concatenated into the statement.
    """
    escaped_key = mdb.escape_string(key)
    statement = ''.join((
        "update pl_hash_file set `key`='", escaped_key, "' where id=", fid,
    ))
    self.execute(statement)
def add_queue(self, shash, size):
    """Insert the hash into pl_hash_queue unless it is already queued.

    Returns the result of ``execute`` for a new row, or None when a row
    with the same info hash already exists.
    """
    now = formatTime()
    existing = self.query(
        ''.join(("select id from pl_hash_queue where info_hash='", shash, "'")))
    if len(existing) != 0:
        print('queue: %s already is exists!' % (shash,))
        return None

    sql = ''.join((
        "insert into pl_hash_queue (`info_hash`,`length`,`created_at`,`updated_at`) values('",
        shash, "','", str(size), "','", now, "','", now, "')",
    ))
    return self.execute(sql)
def add_hash(self, fname, m3u8_name):
    """Register the current torrent and one of its files in the database.

    Returns the file id from ``get_hashfile_id``, or 0 when no torrent id
    could be obtained.
    """
    print('-------------------------add_hash---start-----------------------')
    pid = self.get_hashlist_id()
    fid = self.get_hashfile_id(fname, m3u8_name, pid) if pid else 0
    print('-------------------------add_hash---end--------------------------')
    return fid
def file_arr(self, path, filters=('.DS_Store',)):
    """Recursively list the files below ``path``.

    Args:
        path: Directory to scan.
        filters: Entry names (files or directories) to skip at every level.

    Returns:
        A list of file paths. Entries named ``.unwanted`` are always
        skipped, in addition to ``filters``.
    """
    file_list = []
    for entry in os.listdir(path):
        # Skip the download cache entry and anything explicitly filtered.
        if entry == '.unwanted' or entry in filters:
            continue
        file_path = os.path.join(path, entry)
        if os.path.isdir(file_path):
            file_list.extend(self.file_arr(file_path, filters))
        else:
            file_list.append(file_path)
    return file_list
def find_dir_video(self, path):
    """Return the paths below ``path`` that ``is_video`` accepts."""
    return [candidate for candidate in self.file_arr(path)
            if self.is_video(candidate)]
def video_do(self, path):
    """Transcode ``path`` if it is a video file, or every video below it.

    Always returns an empty string.
    """
    if not os.path.isfile(path):
        for video in self.find_dir_video(path):
            self.ffmpeg(video)
    elif self.is_video(path):
        self.ffmpeg(path)
    return ''
def is_video(self, path):
    """Return True when the extension of ``path`` is a known video suffix.

    The comparison ignores case: ``self.has_suffix`` only lists all-lower
    and all-upper forms, so mixed-case extensions such as ``.Mp4`` are
    matched through their lower-cased form.
    """
    ext = os.path.splitext(path)[1]
    return ext in self.has_suffix or ext.lower() in self.has_suffix
def non_download(self, torrent):
    """Set priority 0 on every non-video file of the torrent.

    When the torrent contains no video file at all, its status is set to 1.
    """
    files = self.qb.get_torrent_files(torrent['hash'])
    found_video = False
    for pos, entry in enumerate(files):
        file_path = torrent['save_path'] + entry['name']
        if self.is_video(file_path):
            found_video = True
            continue
        self.qb.set_file_priority(torrent['hash'], pos, 0)
    if found_video:
        return
    self.set_status(torrent, 1)
def set_status(self, torrent, status):
    """Record ``status`` for the torrent and drop it from qBittorrent.

    The torrent is deleted only when debug mode is off and the status is
    non-zero.
    """
    self.set_hashlist_status(torrent, status)
    if TASK_DEBUG != 0 or status == 0:
        return
    self.qb.delete_permanently(torrent['hash'])
def is_downloading(self, torrent):
    """Return True when the torrent's name is identical to its hash."""
    return torrent['name'] == torrent['hash']
def is_nondownload_overtime(self, torrent, sec):
    """Flag torrents that made no progress within ``sec`` seconds.

    Args:
        torrent: Torrent dict with ``hash``, ``save_path`` and ``added_on``.
        sec: Age in seconds after which a stalled torrent is given up.

    Returns:
        True when the torrent was flagged through ``set_status``: status 2
        when no file list is available, status 3 when a file list exists
        but no video file has any progress. False otherwise.
    """
    use_time = int(time.time()) - int(torrent['added_on'])
    flist = self.qb.get_torrent_files(torrent['hash'])

    # No file list for the torrent.
    if len(flist) == 0 and use_time > sec:
        self.set_status(torrent, 2)
        return True

    # File list is available; check whether any video file has started.
    # progress is compared numerically: the former comparison with the
    # string '0' was always true for numeric values, so a stalled video
    # was never detected.
    is_video_download = False
    for entry in flist:
        file = torrent['save_path'] + entry['name']
        if self.is_video(file) and float(entry['progress']) > 0:
            is_video_download = True
            break

    if not is_video_download and use_time > sec:
        self.set_status(torrent, 3)
        return True
    return False
def is_downloading_overtime(self, torrent, sec):
    """Set status 4 and return True when the torrent is older than ``sec`` seconds.

    Returns False, without touching the status, otherwise.
    """
    elapsed = int(time.time()) - int(torrent['added_on'])
    if elapsed <= sec:
        return False
    self.set_status(torrent, 4)
    return True
def is_downloading_overlimit(self, torrent):
    """Move torrents larger than the configured size limit back to the queue.

    ``TASK_SIZE_LIMIT`` is multiplied by 1024**3 to obtain a byte limit; a
    limit of 0 means "no limit", matching how ``queue`` interprets the same
    setting.

    Returns:
        True when the torrent exceeded the limit and was queued and deleted
        from qBittorrent, so the caller can stop processing it. False
        otherwise.
    """
    sz = self.get_bt_size(torrent)
    size_limit = int(float(TASK_SIZE_LIMIT) * 1024 * 1024 * 1024)
    print('is_downloading_overlimit: %s %s' %
          (toSize(int(sz)), toSize(size_limit)))
    if size_limit == 0 or int(sz) <= size_limit:
        return False

    print('overlimit sz:' + sz)
    self.add_queue(torrent['hash'], str(sz))
    self.qb.delete_permanently(torrent['hash'])
    return True
def check_task(self):
    """Poll the downloading torrents forever, sleeping ``TASK_RATE`` seconds per round.

    Each torrent goes through the stall, age, name-equals-hash and size
    checks in that order. Only a torrent that none of them claims has its
    non-video files deprioritised.
    """
    while True:
        self.__check()
        torrents = self.qb.torrents(filter='downloading')
        count = len(torrents)
        if count > 0:
            print("downloading torrents count: %s" % (count,))
            for torrent in torrents:
                # `or` short-circuits, so the checks run in the same order
                # and stop at the first one that returns a true value.
                handled = (
                    self.is_nondownload_overtime(torrent, 5 * 60)
                    or self.is_downloading_overtime(torrent, 7 * 24 * 60 * 60)
                    or self.is_downloading(torrent)
                    or self.is_downloading_overlimit(torrent)
                )
                if handled:
                    continue
                self.non_download(torrent)
                print('%s  task downloading!' % (torrent['name'],))
        else:
            print(self.debug("no downloading task!"))
        time.sleep(TASK_RATE)
def completed(self):
    """Process completed torrents forever.

    Every round lists the completed torrents, transcodes each one via
    ``video_do`` and, outside debug mode, removes it from qBittorrent. The
    loop sleeps ``TASK_COMPLETED_RATE`` seconds after every round, including
    rounds with nothing to do (previously an empty list skipped the sleep
    and polled the API in a busy loop).
    """
    while True:
        self.__check()
        torrents = self.qb.torrents(filter='completed')
        if not torrents:
            print(self.debug("no completed task!"))
            time.sleep(TASK_COMPLETED_RATE)
            continue

        print("completed torrents count: %s" % (len(torrents),))
        for torrent in torrents:
            # ffmpeg() reads the torrent hash from self.sign_torrent.
            self.sign_torrent = torrent
            path = (torrent['save_path'] + torrent['name']).encode()
            try:
                self.video_do(path)
                hash_dir = self.get_transfer_hash_dir(torrent['hash'])
                if TASK_DEBUG == 0:
                    self.ffmpeg_del_hfile(hash_dir)
                    self.qb.delete_permanently(torrent['hash'])
            except Exception as e:
                # Keep going with the remaining torrents.
                print('%s %s' % (formatTime(), str(e)))
        print(self.debug("done task!"))
        time.sleep(TASK_COMPLETED_RATE)
def add_hash_task(self, shash):
    """Ask qBittorrent to download the magnet link for info hash ``shash``."""
    magnet = ''.join(('magnet:?xt=urn:btih:', shash))
    self.qb.download_from_link(magnet)
    print(self.debug('queue add_hash_task is ok ... '))
def queue(self):
    """Feed queued hashes into qBittorrent forever, one round per ``TASK_RATE`` seconds.

    When ``QUEUE_SWITCH`` is ``'1'``, the number of free slots is the
    ``max_active_torrents`` preference minus the current torrent count.
    That many rows are taken from pl_hash_queue (newest first, restricted to
    the size limit when one is configured), started as magnet downloads and
    deleted from the queue table.
    """
    while True:
        if QUEUE_SWITCH == '1':
            print(self.debug("------------ do queue task start! ---------------"))
            setting = self.qb.preferences()
            torrents = self.qb.torrents()
            add = int(setting['max_active_torrents']) - len(torrents)
            # <= 0, not == 0: more torrents than the limit would otherwise
            # produce "limit -N" and an SQL error.
            if add <= 0:
                print(self.debug('the download queue is full ... '))
            else:
                size_limit = int(float(TASK_SIZE_LIMIT) * 1024 * 1024 * 1024)
                size_sql_where = ''
                size_sql = ''
                # A limit of 0 means no size restriction.
                if size_limit != 0:
                    size_sql = ',`length` desc '
                    size_sql_where = 'where `length`<=' + str(size_limit)
                sql = "select * from pl_hash_queue " + size_sql_where + \
                    " order by created_at desc " + \
                    size_sql + " limit " + str(add)
                info = self.query(sql)
                if isinstance(info, Exception):
                    # query() returns the error instead of raising it.
                    print(self.debug('queue query failed: ' + str(info)))
                elif len(info) == 0:
                    print(self.debug('queue data is empty ... '))
                else:
                    for row in info:
                        self.add_hash_task(row[1])
                        self.execute(
                            'delete from pl_hash_queue where id=' + str(row[0]))
            print(self.debug("------------ do queue task end ! ---------------"))
        time.sleep(TASK_RATE)
# Debug helper that prints a timestamped heartbeat once per second, forever.
# NOTE(review): `self` is not a parameter of this function, so calling it
# fails with NameError unless a global named `self` exists - confirm whether
# this is dead code.
# NOTE(review): the original indentation is lost here, so it is unclear
# whether the trailing test() call belongs to the function body (unreachable
# after the infinite loop) or runs at module level - confirm.
def test():
while True:
print self.debug("no download task!")
time.sleep(1)
test()
if __name__ == "__main__":
    # One shared worker object; each long-running loop gets its own thread.
    dl = downloadBT()
    import threading
    workers = [
        threading.Thread(target=loop)
        for loop in (dl.check_task, dl.completed, dl.queue)
    ]
    for worker in workers:
        worker.start()