mdserver-web/plugins/simdht/workers/simdht_worker.py
#!/usr/bin/env python
# encoding: utf-8
"""
磁力搜索meta信息入库程序
xiaoxia@xiaoxia.org
2015.6 Forked CreateChen's Project: https://github.com/CreateChen/simDownloader
2016.12!冰剑 !新增功能:过滤恶意推广网址的无效磁力链接
"""
import hashlib
import os
import SimpleXMLRPCServer
import time
import datetime
import traceback
import sys
import json
import socket
import threading
from hashlib import sha1
from random import randint
from struct import unpack
from socket import inet_ntoa
from threading import Timer, Thread
from time import sleep
from collections import deque
from Queue import Queue
reload(sys)
sys.setdefaultencoding('utf-8')
sys.path.append('/usr/local/lib/python2.7/site-packages')
import pygeoip
import MySQLdb as mdb
try:
    # NOTE: the bare `raise` aborts this block before the imports run, so lt
    # stays None and the libtorrent download path is disabled
    raise
    import libtorrent as lt
    import ltMetadata
except:
    lt = None
    print sys.exc_info()[1]
import metautils
import simMetadata
from bencode import bencode, bdecode
from metadata import save_metadata
from configparser import ConfigParser
cp = ConfigParser()
cp.read("../db.cfg")
section_db = cp.sections()[0]
DB_HOST = cp.get(section_db, "DB_HOST")
DB_USER = cp.get(section_db, "DB_USER")
DB_PORT = cp.getint(section_db, "DB_PORT")
DB_PASS = cp.get(section_db, "DB_PASS")
DB_NAME = cp.get(section_db, "DB_NAME")
DB_SIZE_LIMIT = cp.get(section_db, "DB_SIZE_LIMIT")
DB_SIZE_TICK = cp.getint(section_db, "DB_SIZE_TICK")
DB_DEL_LINE = cp.getint(section_db, "DB_DEL_LINE")
BLACK_FILE = 'black_list.txt'
BOOTSTRAP_NODES = (
    ("router.bittorrent.com", 6881),
    ("dht.transmissionbt.com", 6881),
    ("router.utorrent.com", 6881)
)
TID_LENGTH = 2
RE_JOIN_DHT_INTERVAL = 3
TOKEN_LENGTH = 2
section_queue = cp.sections()[1]
MAX_QUEUE_LT = cp.getint(section_queue, "MAX_QUEUE_LT")
MAX_QUEUE_PT = cp.getint(section_queue, "MAX_QUEUE_PT")
MAX_NODE_QSIZE = cp.getint(section_queue, "MAX_NODE_QSIZE")
geoip = pygeoip.GeoIP('GeoIP.dat')
def load_res_blacklist(black_list_path):
    black_list = []
    file_path = os.path.join(os.path.dirname(__file__), black_list_path)
    f = open(file_path, 'r')
    while True:
        line = f.readline()
        if not line:
            break
        black_list.append(line)
    f.close()
    return black_list
def is_ip_allowed(ip):
    # ('CN') is just the string 'CN', so the original `not in ('CN')` was a
    # substring test; compare the country code directly instead
    return geoip.country_code_by_addr(ip) != 'CN'
def entropy(length):
    return "".join(chr(randint(0, 255)) for _ in xrange(length))

def random_id():
    h = sha1()
    h.update(entropy(20))
    return h.digest()
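# BEP 5 packs each contact into 26 bytes: a 20-byte node id, a 4-byte IPv4
# address and a 2-byte big-endian port, e.g.
#   nid = nodes[0:20]; ip = inet_ntoa(nodes[20:24]); port = unpack("!H", nodes[24:26])[0]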
def decode_nodes(nodes):
    n = []
    length = len(nodes)
    if (length % 26) != 0:
        return n
    for i in range(0, length, 26):
        nid = nodes[i:i + 20]
        ip = inet_ntoa(nodes[i + 20:i + 24])
        port = unpack("!H", nodes[i + 24:i + 26])[0]
        n.append((nid, ip, port))
    return n
def timer(t, f):
    Timer(t, f).start()
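# The classic crawler trick: claim an id whose first `end` bytes match the
# target, so remote nodes treat this crawler as a close neighbor and keep it
# in their routing tables.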
def get_neighbor(target, nid, end=10):
    return target[:end] + nid[end:]
class KNode(object):

    def __init__(self, nid, ip, port):
        self.nid = nid
        self.ip = ip
        self.port = port
class DHTClient(Thread):

    def __init__(self, max_node_qsize):
        Thread.__init__(self)
        self.setDaemon(True)
        self.max_node_qsize = max_node_qsize
        self.nid = random_id()
        self.nodes = deque(maxlen=max_node_qsize)

    def send_krpc(self, msg, address):
        try:
            self.ufd.sendto(bencode(msg), address)
        except Exception:
            pass
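    # Example find_node query, per BEP 5 (bencoded on the wire):
    #   {"t": <tid>, "y": "q", "q": "find_node",
    #    "a": {"id": <20-byte id>, "target": <20-byte id>}}
    # The sender id is spoofed to be a "neighbor" of the queried node so the
    # remote side is more likely to keep (and query back) this crawler.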
    def send_find_node(self, address, nid=None):
        nid = get_neighbor(nid, self.nid) if nid else self.nid
        tid = entropy(TID_LENGTH)
        msg = {
            "t": tid,
            "y": "q",
            "q": "find_node",
            "a": {
                "id": nid,
                "target": random_id()
            }
        }
        self.send_krpc(msg, address)
    def join_DHT(self):
        for address in BOOTSTRAP_NODES:
            self.send_find_node(address)

    def re_join_DHT(self):
        if len(self.nodes) == 0:
            self.join_DHT()
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)

    def auto_send_find_node(self):
        wait = 1.0 / self.max_node_qsize
        while True:
            try:
                node = self.nodes.popleft()
                self.send_find_node((node.ip, node.port), node.nid)
            except IndexError:
                pass
            try:
                sleep(wait)
            except KeyboardInterrupt:
                os._exit(0)
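    # find_node responses carry compact node entries (typically 8); each one
    # is decoded and pushed onto the bounded deque that auto_send_find_node
    # drains, which keeps the crawl going indefinitely.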
    def process_find_node_response(self, msg, address):
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
            if len(nid) != 20:
                continue
            if ip == self.bind_ip:
                continue
            n = KNode(nid, ip, port)
            self.nodes.append(n)
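# DHTServer adds the listening half: it binds a UDP socket, keeps re-joining
# the DHT while the node queue is empty, and dispatches incoming get_peers /
# announce_peer queries -- the two message types that reveal infohashes.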
class DHTServer(DHTClient):

    def __init__(self, master, bind_ip, bind_port, max_node_qsize):
        DHTClient.__init__(self, max_node_qsize)
        self.master = master
        self.bind_ip = bind_ip
        self.bind_port = bind_port
        self.process_request_actions = {
            "get_peers": self.on_get_peers_request,
            "announce_peer": self.on_announce_peer_request,
        }
        self.ufd = socket.socket(
            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        self.ufd.bind((self.bind_ip, self.bind_port))
        timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)
    def run(self):
        self.re_join_DHT()
        while True:
            try:
                (data, address) = self.ufd.recvfrom(65536)
                msg = bdecode(data)
                self.on_message(msg, address)
            except Exception:
                pass
    def on_message(self, msg, address):
        try:
            if msg["y"] == "r":
                if "nodes" in msg["r"]:
                    self.process_find_node_response(msg, address)
            elif msg["y"] == "q":
                try:
                    self.process_request_actions[msg["q"]](msg, address)
                except KeyError:
                    self.play_dead(msg, address)
        except KeyError:
            pass
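    # A get_peers query itself leaks the infohash being searched for. The
    # reply below returns no peers, only a token derived from the infohash
    # prefix, so a later announce_peer can be validated statelessly.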
    def on_get_peers_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            tid = msg["t"]
            nid = msg["a"]["id"]
            token = infohash[:TOKEN_LENGTH]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(infohash, self.nid),
                    "nodes": "",
                    "token": token
                }
            }
            self.master.log_hash(infohash, address)
            self.send_krpc(msg, address)
        except KeyError:
            pass
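    # announce_peer means a peer is actually downloading or seeding the
    # torrent, so these infohashes are the most likely to yield metadata.
    # Per BEP 5, a non-zero implied_port means the peer's UDP source port
    # should be used instead of the declared "port" field.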
    def on_announce_peer_request(self, msg, address):
        try:
            infohash = msg["a"]["info_hash"]
            token = msg["a"]["token"]
            nid = msg["a"]["id"]
            tid = msg["t"]
            if infohash[:TOKEN_LENGTH] == token:
                # the original looked up "implied_port " with a stray trailing
                # space, so the implied-port branch could never trigger
                if "implied_port" in msg["a"] and msg["a"]["implied_port"] != 0:
                    port = address[1]
                else:
                    port = msg["a"]["port"]
                self.master.log_announce(infohash, (address[0], port))
        except Exception:
            print 'error'
        finally:
            self.ok(msg, address)
    def play_dead(self, msg, address):
        try:
            tid = msg["t"]
            msg = {
                "t": tid,
                "y": "e",
                "e": [202, "Server Error"]
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass
    def ok(self, msg, address):
        try:
            tid = msg["t"]
            nid = msg["a"]["id"]
            msg = {
                "t": tid,
                "y": "r",
                "r": {
                    "id": get_neighbor(nid, self.nid)
                }
            }
            self.send_krpc(msg, address)
        except KeyError:
            pass
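# Master is the consumer side: it dedupes harvested infohashes, updates
# last-seen counters for hashes already in MySQL, and spawns metadata
# download threads (simMetadata over the peer protocol, ltMetadata via
# libtorrent when available) for new ones.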
class Master(Thread):

    def __init__(self):
        Thread.__init__(self)
        self.setDaemon(True)
        self.queue = Queue()
        self.metadata_queue = Queue()
        self.dbconn = mdb.connect(
            DB_HOST, DB_USER, DB_PASS, DB_NAME, port=DB_PORT, charset='utf8')
        self.dbconn.autocommit(False)
        self.dbcurr = self.dbconn.cursor()
        self.dbcurr.execute('SET NAMES utf8')
        self.n_reqs = self.n_valid = self.n_new = 0
        self.n_downloading_lt = self.n_downloading_pt = 0
        self.visited = set()
        self.black_list = load_res_blacklist(BLACK_FILE)
    def query(self, sql):
        self.dbcurr.execute(sql)
        result = self.dbcurr.fetchall()
        data = map(list, result)
        return data
    def got_torrent(self):
        binhash, address, data, dtype, start_time = self.metadata_queue.get()
        if dtype == 'pt':
            self.n_downloading_pt -= 1
        elif dtype == 'lt':
            self.n_downloading_lt -= 1
        if not data:
            return
        self.n_valid += 1
        save_metadata(self.dbcurr, binhash, address,
                      start_time, data, self.black_list)
        self.n_new += 1
    def run(self):
        self.name = threading.currentThread().getName()
        print self.name, 'started'
        while True:
            while self.metadata_queue.qsize() > 0:
                self.got_torrent()
            address, binhash, dtype = self.queue.get()
            if binhash in self.visited:
                continue
            if len(self.visited) > 100000:
                self.visited = set()
            self.visited.add(binhash)
            self.n_reqs += 1
            info_hash = binhash.encode('hex')
            utcnow = datetime.datetime.utcnow()
            # report dates use UTC+8 (Beijing time)
            date = (utcnow + datetime.timedelta(hours=8))
            date = datetime.datetime(date.year, date.month, date.day)
            # Check if we have this info_hash
            self.dbcurr.execute(
                'SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
            y = self.dbcurr.fetchone()
            if y:
                self.n_valid += 1
                # update last-seen time and request count
                self.dbcurr.execute(
                    'UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
            else:
                if dtype == 'pt':
                    t = threading.Thread(target=simMetadata.download_metadata, args=(
                        address, binhash, self.metadata_queue))
                    t.setDaemon(True)
                    t.start()
                    self.n_downloading_pt += 1
                elif dtype == 'lt' and self.n_downloading_lt < MAX_QUEUE_LT:
                    t = threading.Thread(target=ltMetadata.download_metadata, args=(
                        address, binhash, self.metadata_queue))
                    t.setDaemon(True)
                    t.start()
                    self.n_downloading_lt += 1
            if self.n_reqs >= 1000:
                self.dbcurr.execute('INSERT INTO search_statusreport(date,new_hashes,total_requests, valid_requests) VALUES(%s,%s,%s,%s) ON DUPLICATE KEY UPDATE ' +
                                    'total_requests=total_requests+%s, valid_requests=valid_requests+%s, new_hashes=new_hashes+%s',
                                    (date, self.n_new, self.n_reqs, self.n_valid, self.n_reqs, self.n_valid, self.n_new))
                self.dbconn.commit()
                print '\n', time.ctime(), 'n_reqs', self.n_reqs, 'n_valid', self.n_valid, 'n_new', self.n_new, 'n_queue', self.queue.qsize(),
                print 'n_d_pt', self.n_downloading_pt, 'n_d_lt', self.n_downloading_lt,
                self.n_reqs = self.n_valid = self.n_new = 0
                # check_db_size is only defined on the DBCheck subclass; guard
                # the call so a plain Master instance does not die on AttributeError
                if hasattr(self, 'check_db_size'):
                    self.check_db_size()
    def log_announce(self, binhash, address=None):
        self.queue.put([address, binhash, 'pt'])

    def log_hash(self, binhash, address=None):
        # only queue libtorrent downloads for CN sources
        # (is_ip_allowed returns True for every non-CN address)
        if not lt:
            return
        if is_ip_allowed(address[0]):
            return
        if self.n_downloading_lt < MAX_QUEUE_LT:
            self.queue.put([address, binhash, 'lt'])
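# A small XML-RPC endpoint lets other processes push hex infohashes into the
# crawl queue as 'pt' (peer-protocol) downloads.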
def announce(info_hash, address):
    binhash = info_hash.decode('hex')
    master.log_announce(binhash, address)
    return 'ok'

def rpc_server():
    rpcserver = SimpleXMLRPCServer.SimpleXMLRPCServer(
        ('localhost', 8004), logRequests=False)
    rpcserver.register_function(announce, 'announce')
    print 'Starting xml rpc server...'
    rpcserver.serve_forever()
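# DBCheck runs on its own connection (it reuses Master.__init__) and trims
# the oldest rows whenever the combined size of the search tables exceeds
# the configured limit.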
class DBCheck(Master):

    def __init__(self, master):
        # the master argument is unused; Master.__init__ opens a separate
        # database connection for this thread
        Master.__init__(self)
        self.setDaemon(True)

    def delete_db(self, line=1):
        sql = 'select id, info_hash from search_hash order by id limit ' + \
            str(line)
        data = self.query(sql)
        for iid, infohash in data:
            sqldel = "delete from search_hash where id='" + str(iid) + "'"
            self.query(sqldel)
            sqldel2 = "delete from search_filelist where info_hash='" + \
                str(infohash) + "'"
            self.query(sqldel2)
            print 'delete ', iid, infohash, 'done'
        # autocommit is off, so the deletes must be committed explicitly
        self.dbconn.commit()
    def check_db_size(self):
        sql = "select (concat(round(sum(DATA_LENGTH/1024/1024),2),'M') + concat(round(sum(INDEX_LENGTH/1024/1024),2),'M')) \
as sdata from information_schema.tables where TABLE_SCHEMA='" + DB_NAME + "' and TABLE_NAME in('search_hash','search_filelist','search_statusreport')"
        # the query reports MiB, so the configured limit (apparently GiB) is
        # scaled by 1024 before comparing
        db_size_limit = float(DB_SIZE_LIMIT) * 1024
        data = self.query(sql)
        # coerce in case the driver returns a string or Decimal
        db_size = float(data[0][0])
        if db_size > db_size_limit:
            self.delete_db(DB_DEL_LINE)
            print 'db size limit:', db_size_limit, db_size
    def run(self):
        while True:
            self.check_db_size()
            time.sleep(DB_SIZE_TICK)
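# Startup wiring: Master (consumer), the XML-RPC server and DBCheck each run
# as daemon threads; the DHT server thread receives packets while the main
# thread loops in auto_send_find_node.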
if __name__ == "__main__":
# max_node_qsize bigger, bandwith bigger, spped higher
master = Master()
master.start()
rpcthread = threading.Thread(target=rpc_server)
rpcthread.setDaemon(True)
rpcthread.start()
print 'DBCheck start'
check = DBCheck(master)
check.start()
print 'DHTServer start'
dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=MAX_NODE_QSIZE)
dht.start()
dht.auto_send_find_node()