diff --git a/plugins/simdht/index.py b/plugins/simdht/index.py index 0cbf7afa6..819576e45 100755 --- a/plugins/simdht/index.py +++ b/plugins/simdht/index.py @@ -72,7 +72,7 @@ def getSqlFile(): def getDbConf(): - file = getServerDir() + "/workers/db.cfg" + file = getServerDir() + "/db.cfg" return file @@ -88,6 +88,12 @@ def initDreplace(): sdir = getPluginDir() + '/workers' public.execShell('cp -rf ' + sdir + ' ' + getServerDir()) + cfg = getServerDir() + '/db.cfg' + if not os.path.exists(cfg): + cfg_tpl = getPluginDir() + '/workers/db.cfg' + content = public.readFile(cfg_tpl) + public.writeFile(cfg, content) + file_tpl = getInitDTpl() service_path = os.path.dirname(os.getcwd()) diff --git a/plugins/simdht/workers/db.cfg b/plugins/simdht/workers/db.cfg index 6be5cc806..df3a289e3 100755 --- a/plugins/simdht/workers/db.cfg +++ b/plugins/simdht/workers/db.cfg @@ -5,6 +5,10 @@ DB_PORT = 3306 DB_PASS = ssbc DB_NAME = ssbc +#UNIT(G) +DB_SIZE_LIMIT = 1 +DB_SIZE_TICK = 3 + [queue] MAX_QUEUE_LT = 30 MAX_QUEUE_PT = 200 diff --git a/plugins/simdht/workers/simdht_worker.py b/plugins/simdht/workers/simdht_worker.py index 88d1515a5..c93a515a5 100755 --- a/plugins/simdht/workers/simdht_worker.py +++ b/plugins/simdht/workers/simdht_worker.py @@ -56,6 +56,8 @@ DB_USER = cp.get(section_db, "DB_USER") DB_PORT = cp.getint(section_db, "DB_PORT") DB_PASS = cp.get(section_db, "DB_PASS") DB_NAME = cp.get(section_db, "DB_NAME") +DB_SIZE_LIMIT = cp.get(section_db, "DB_SIZE_LIMIT") +DB_SIZE_TICK = cp.getint(section_db, "DB_SIZE_TICK") BLACK_FILE = 'black_list.txt' BOOTSTRAP_NODES = ( @@ -323,6 +325,12 @@ class Master(Thread): self.visited = set() self.black_list = load_res_blacklist(BLACK_FILE) + def query(self, sql): + self.dbcurr.execute(sql) + result = self.dbcurr.fetchall() + data = map(list, result) + return data + def got_torrent(self): binhash, address, data, dtype, start_time = self.metadata_queue.get() if dtype == 'pt': @@ -341,6 +349,7 @@ class Master(Thread): self.name = threading.currentThread().getName() print self.name, 'started' while True: + while self.metadata_queue.qsize() > 0: self.got_torrent() address, binhash, dtype = self.queue.get() @@ -389,6 +398,8 @@ class Master(Thread): print 'n_d_pt', self.n_downloading_pt, 'n_d_lt', self.n_downloading_lt, self.n_reqs = self.n_valid = self.n_new = 0 + self.check_db_size() + def log_announce(self, binhash, address=None): self.queue.put([address, binhash, 'pt']) @@ -414,6 +425,48 @@ def rpc_server(): print 'Starting xml rpc server...' rpcserver.serve_forever() + +class DBCheck(Master): + + def __init__(self, master): + Master.__init__(self) + self.setDaemon(True) + + def delete_db(self, line=1): + sql = 'select id, info_hash from search_hash order by id limit ' + \ + str(line) + data = self.query(sql) + for x in range(len(data)): + iid = data[x][0] + infohash = data[x][1] + + sqldel = "delete from search_hash where id='" + str(iid) + "'" + self.query(sqldel) + + sqldel2 = "delete from search_filelist where info_hash='" + \ + str(infohash) + "'" + self.query(sqldel2) + print 'delete ', iid, infohash, 'done' + + def check_db_size(self): + sql = "select (concat(round(sum(DATA_LENGTH/1024/1024),2),'M') + concat(round(sum(INDEX_LENGTH/1024/1024),2),'M') ) \ + as sdata from information_schema.tables where TABLE_SCHEMA='" + DB_NAME + "' and TABLE_NAME in('search_hash','search_filelist', 'search_statusreport')" + + db_size_limit = int(DB_SIZE_LIMIT) * 1024 + data = self.query(sql) + db_size = data[0][0] + + if db_size > db_size_limit: + self.delete_db() + + # print 'db size limit:', int(DB_SIZE_LIMIT) * 1024, db_size + # self.delete_db() + + def run(self): + while True: + self.check_db_size() + time.sleep(DB_SIZE_TICK) + if __name__ == "__main__": # max_node_qsize bigger, bandwith bigger, spped higher master = Master() @@ -423,6 +476,9 @@ if __name__ == "__main__": rpcthread.setDaemon(True) rpcthread.start() + check = DBCheck(master) + check.start() + dht = DHTServer(master, "0.0.0.0", 6881, max_node_qsize=MAX_NODE_QSIZE) dht.start() dht.auto_send_find_node()