pull/109/head
Mr Chen 6 years ago
parent a4dbdf1c49
commit f5d63f7a4f
  1. BIN  plugins/gae/ico.png
  2. BIN  plugins/memcached/ico.png
  3.  21  plugins/simdht/workers/index_worker.py
  4.  58  plugins/simdht/workers/simdht_worker.py

plugins/gae/ico.png: binary image, not shown (2.1 KiB before, 1.0 KiB after)
plugins/memcached/ico.png: binary image, not shown (801 B before, 653 B after)

plugins/simdht/workers/index_worker.py

@@ -1,5 +1,5 @@
 #!/usr/bin/env python
-#coding: utf8
+# coding: utf8
 """
 Read unindexed resources from the MySQL database and push them into Sphinx's real-time index
 xiaoxia@xiaoxia.org
@@ -12,29 +12,33 @@ import MySQLdb.cursors
 
 SRC_HOST = '127.0.0.1'
 SRC_USER = 'root'
-SRC_PASS = ''
+SRC_PASS = 'root'
 DST_HOST = '127.0.0.1'
 DST_USER = 'root'
-DST_PASS = ''
+DST_PASS = 'root'
 
-src_conn = mdb.connect(SRC_HOST, SRC_USER, SRC_PASS, 'ssbc', charset='utf8', cursorclass=MySQLdb.cursors.DictCursor)
+src_conn = mdb.connect(SRC_HOST, SRC_USER, SRC_PASS, 'ssbc',
+                       charset='utf8', cursorclass=MySQLdb.cursors.DictCursor)
 src_curr = src_conn.cursor()
 src_curr.execute('SET NAMES utf8')
 
-dst_conn = mdb.connect(DST_HOST, DST_USER, DST_PASS, 'rt_main', port=9306, charset='utf8')
+dst_conn = mdb.connect(DST_HOST, DST_USER, DST_PASS,
+                       'rt_main', port=9306, charset='utf8')
 dst_curr = dst_conn.cursor()
 dst_curr.execute('SET NAMES utf8')
 
+
 def work():
     src_curr.execute('SELECT id, name, CRC32(category) AS category, length, UNIX_TIMESTAMP(create_time) AS create_time, ' +
-        'UNIX_TIMESTAMP(last_seen) AS last_seen FROM search_hash WHERE tagged=false LIMIT 10000')
+                     'UNIX_TIMESTAMP(last_seen) AS last_seen FROM search_hash WHERE tagged=false LIMIT 10000')
     total = src_curr.rowcount
     print 'fetched', total
     for one in src_curr:
         ret = dst_curr.execute('insert into rt_main(id,name,category,length,create_time,last_seen) values(%s,%s,%s,%s,%s,%s)',
-            (one['id'], one['name'], one['category'], one['length'], one['create_time'], one['last_seen']))
+                               (one['id'], one['name'], one['category'], one['length'], one['create_time'], one['last_seen']))
         if ret:
-            src_curr.execute('UPDATE search_hash SET tagged=True WHERE id=%s', (one['id'],))
+            src_curr.execute(
+                'UPDATE search_hash SET tagged=True WHERE id=%s', (one['id'],))
             print 'Indexed', one['name'].encode('utf8')
     print 'Done!'
     return total
@@ -46,4 +50,3 @@ if __name__ == '__main__':
             continue
         print 'Wait 10mins...'
         time.sleep(600)
-

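A note on the dst_conn setup reformatted above: Sphinx's searchd speaks the MySQL wire protocol on its SphinxQL port, which is why the worker reuses MySQLdb with port=9306 instead of a dedicated client. Below is a minimal sketch of checking what index_worker.py has pushed into the rt_main real-time index; it mirrors the DST_* settings from the diff, and the search term is illustrative only.

#!/usr/bin/env python
# coding: utf8
# Sketch: query the Sphinx RT index that index_worker.py fills.
import MySQLdb as mdb

# Same settings as DST_* in the diff; Sphinx largely ignores credentials.
conn = mdb.connect('127.0.0.1', 'root', 'root', 'rt_main',
                   port=9306, charset='utf8')
curr = conn.cursor()
curr.execute('SET NAMES utf8')
# SphinxQL full-text match against the indexed name field.
curr.execute("SELECT id, length, create_time FROM rt_main "
             "WHERE MATCH('ubuntu') LIMIT 10")
for row in curr.fetchall():
    print row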
plugins/simdht/workers/simdht_worker.py

@@ -44,7 +44,7 @@ from metadata import save_metadata
 DB_HOST = '127.0.0.1'
 DB_USER = 'root'
 DB_PORT = 3306
-DB_PASS = ''
+DB_PASS = 'root'
 DB_NAME = 'ssbc'
 
 BLACK_FILE = 'black_list.txt'
@@ -62,6 +62,7 @@ MAX_QUEUE_PT = 200
 
 geoip = pygeoip.GeoIP('GeoIP.dat')
 
+
 def load_res_blacklist(black_list_path):
     black_list = []
     file_path = os.path.join(os.path.dirname(__file__), black_list_path)
@@ -74,8 +75,10 @@ def load_res_blacklist(black_list_path):
     f.close()
     return black_list
 
+
 def is_ip_allowed(ip):
-    return geoip.country_code_by_addr(ip) not in ('CN','TW','HK')
+    return geoip.country_code_by_addr(ip) not in ('CN', 'TW', 'HK')
 
+
 def entropy(length):
     return "".join(chr(randint(0, 255)) for _ in xrange(length))
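For reference, the two helpers in that hunk in use: country_code_by_addr() is the standard pygeoip call and needs the GeoIP.dat country database the worker already loads; the address below is a documentation IP, not crawl data.

# Sketch: exercising is_ip_allowed()'s pygeoip lookup and entropy().
import pygeoip
from random import randint

geoip = pygeoip.GeoIP('GeoIP.dat')
print geoip.country_code_by_addr('192.0.2.1')   # country code, or '' if unknown

# entropy(n) as defined above: n random bytes as a str.
print repr("".join(chr(randint(0, 255)) for _ in xrange(8)))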
@@ -94,9 +97,9 @@ def decode_nodes(nodes):
         return n
 
     for i in range(0, length, 26):
-        nid = nodes[i:i+20]
-        ip = inet_ntoa(nodes[i+20:i+24])
-        port = unpack("!H", nodes[i+24:i+26])[0]
+        nid = nodes[i:i + 20]
+        ip = inet_ntoa(nodes[i + 20:i + 24])
+        port = unpack("!H", nodes[i + 24:i + 26])[0]
         n.append((nid, ip, port))
 
     return n
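The slicing this hunk reformats follows the DHT compact node encoding: each entry is 26 bytes, a 20-byte node ID, a 4-byte packed IPv4 address, and a 2-byte big-endian port. A round-trip of one fabricated entry:

# Sketch: build one compact entry, then decode it the way decode_nodes() does.
from socket import inet_aton, inet_ntoa
from struct import pack, unpack

packed = ('a' * 20) + inet_aton('1.2.3.4') + pack('!H', 6881)
assert len(packed) == 26

print repr(packed[:20])               # 20-byte node ID
print inet_ntoa(packed[20:24])        # '1.2.3.4'
print unpack('!H', packed[24:26])[0]  # 6881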
@@ -107,7 +110,7 @@ def timer(t, f):
 
 
 def get_neighbor(target, nid, end=10):
-    return target[:end]+nid[end:]
+    return target[:end] + nid[end:]
 
 
 class KNode(object):
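get_neighbor() is the crawler's ID-forging trick: keeping the first `end` bytes of the target's ID produces an ID that sorts close to the target under Kademlia's XOR distance, so remote nodes treat this instance as a nearby neighbor. With fabricated IDs:

# Sketch: what get_neighbor(target, nid) returns for the default end=10.
target = '\x00' * 20
nid = '\xff' * 20
print repr(target[:10] + nid[10:])  # ten zero bytes, then ten 0xff bytes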
@@ -173,8 +176,10 @@ class DHTClient(Thread):
        nodes = decode_nodes(msg["r"]["nodes"])
        for node in nodes:
            (nid, ip, port) = node
-           if len(nid) != 20: continue
-           if ip == self.bind_ip: continue
+           if len(nid) != 20:
+               continue
+           if ip == self.bind_ip:
+               continue
            n = KNode(nid, ip, port)
            self.nodes.append(n)
 
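For orientation, the message this hunk consumes is a bencoded find_node response as defined by BEP 5; the "r" and "nodes" keys come straight from the code above, while the surrounding keys and the single compact entry below are a fabricated illustration (port 6881 packs to '\x1a\xe1').

# Sketch: the decoded shape handled above, with one 26-byte compact entry.
msg = {
    "t": "aa",            # transaction id
    "y": "r",             # marks this as a response
    "r": {
        "id": "b" * 20,   # responder's node ID
        "nodes": ("a" * 20) + "\x01\x02\x03\x04" + "\x1a\xe1",
    },
}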
@@ -193,12 +198,12 @@ class DHTServer(DHTClient):
             "announce_peer": self.on_announce_peer_request,
         }
 
-        self.ufd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+        self.ufd = socket.socket(
+            socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
         self.ufd.bind((self.bind_ip, self.bind_port))
 
         timer(RE_JOIN_DHT_INTERVAL, self.re_join_DHT)
-
 
     def run(self):
         self.re_join_DHT()
         while True:
@@ -290,12 +295,14 @@ class DHTServer(DHTClient):
 
+
 class Master(Thread):
     def __init__(self):
         Thread.__init__(self)
         self.setDaemon(True)
         self.queue = Queue()
         self.metadata_queue = Queue()
-        self.dbconn = mdb.connect(DB_HOST, DB_USER, DB_PASS, DB_NAME, port=DB_PORT, charset='utf8')
+        self.dbconn = mdb.connect(
+            DB_HOST, DB_USER, DB_PASS, DB_NAME, port=DB_PORT, charset='utf8')
         self.dbconn.autocommit(False)
         self.dbcurr = self.dbconn.cursor()
         self.dbcurr.execute('SET NAMES utf8')
 
@@ -314,10 +321,10 @@ class Master(Thread):
             return
        self.n_valid += 1
 
-       save_metadata(self.dbcurr, binhash, address, start_time, data,self.black_list)
+       save_metadata(self.dbcurr, binhash, address,
+                     start_time, data, self.black_list)
        self.n_new += 1
-
 
    def run(self):
        self.name = threading.currentThread().getName()
        print self.name, 'started'
@@ -339,31 +346,35 @@ class Master(Thread):
            date = datetime.datetime(date.year, date.month, date.day)
 
            # Check if we have this info_hash
-           self.dbcurr.execute('SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
+           self.dbcurr.execute(
+               'SELECT id FROM search_hash WHERE info_hash=%s', (info_hash,))
            y = self.dbcurr.fetchone()
            if y:
                self.n_valid += 1
                # Update last-seen time and request count
-               self.dbcurr.execute('UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
+               self.dbcurr.execute(
+                   'UPDATE search_hash SET last_seen=%s, requests=requests+1 WHERE info_hash=%s', (utcnow, info_hash))
            else:
                if dtype == 'pt':
-                   t = threading.Thread(target=simMetadata.download_metadata, args=(address, binhash, self.metadata_queue))
+                   t = threading.Thread(target=simMetadata.download_metadata, args=(
+                       address, binhash, self.metadata_queue))
                    t.setDaemon(True)
                    t.start()
                    self.n_downloading_pt += 1
                elif dtype == 'lt' and self.n_downloading_lt < MAX_QUEUE_LT:
-                   t = threading.Thread(target=ltMetadata.download_metadata, args=(address, binhash, self.metadata_queue))
+                   t = threading.Thread(target=ltMetadata.download_metadata, args=(
+                       address, binhash, self.metadata_queue))
                    t.setDaemon(True)
                    t.start()
                    self.n_downloading_lt += 1
 
            if self.n_reqs >= 1000:
                self.dbcurr.execute('INSERT INTO search_statusreport(date,new_hashes,total_requests, valid_requests) VALUES(%s,%s,%s,%s) ON DUPLICATE KEY UPDATE ' +
-                   'total_requests=total_requests+%s, valid_requests=valid_requests+%s, new_hashes=new_hashes+%s',
-                   (date, self.n_new, self.n_reqs, self.n_valid, self.n_reqs, self.n_valid, self.n_new))
+                                   'total_requests=total_requests+%s, valid_requests=valid_requests+%s, new_hashes=new_hashes+%s',
+                                   (date, self.n_new, self.n_reqs, self.n_valid, self.n_reqs, self.n_valid, self.n_new))
                self.dbconn.commit()
-               print '\n', time.ctime(), 'n_reqs', self.n_reqs, 'n_valid', self.n_valid, 'n_new', self.n_new, 'n_queue', self.queue.qsize(),
-               print 'n_d_pt', self.n_downloading_pt, 'n_d_lt', self.n_downloading_lt,
+               print '\n', time.ctime(), 'n_reqs', self.n_reqs, 'n_valid', self.n_valid, 'n_new', self.n_new, 'n_queue', self.queue.qsize(),
+               print 'n_d_pt', self.n_downloading_pt, 'n_d_lt', self.n_downloading_lt,
                self.n_reqs = self.n_valid = self.n_new = 0
 
    def log_announce(self, binhash, address=None):
@@ -385,7 +396,8 @@ def announce(info_hash, address):
 
 
 def rpc_server():
-    rpcserver = SimpleXMLRPCServer.SimpleXMLRPCServer(('localhost', 8004), logRequests=False)
+    rpcserver = SimpleXMLRPCServer.SimpleXMLRPCServer(
+        ('localhost', 8004), logRequests=False)
     rpcserver.register_function(announce, 'announce')
     print 'Starting xml rpc server...'
     rpcserver.serve_forever()
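The server registered above can be driven with the standard library client. A sketch, assuming the worker is running locally; the info_hash and address values are fabricated placeholders, and their exact expected shapes are guesses from the announce(info_hash, address) signature in this hunk.

# Sketch: feed one announce into the running worker over XML-RPC.
import xmlrpclib

proxy = xmlrpclib.ServerProxy('http://localhost:8004')
proxy.announce('0123456789abcdef0123456789abcdef01234567',
               ('10.0.0.1', 6881))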
@@ -402,5 +414,3 @@ if __name__ == "__main__":
     dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=200)
     dht.start()
     dht.auto_send_find_node()
-
-