diff --git a/class/core/db.py b/class/core/db.py index 4adb9fc7c..5be95e8b9 100755 --- a/class/core/db.py +++ b/class/core/db.py @@ -281,7 +281,16 @@ class Sql(): except Exception as ex: return "error: " + str(ex) - def execute(self, sql, param): + def originExecute(self, sql, param=()): + self.__GetConn() + try: + result = self.__DB_CONN.execute(sql, param) + self.__DB_CONN.commit() + return result + except Exception as ex: + return "error: " + str(ex) + + def execute(self, sql, param=()): # 执行SQL语句返回受影响行 self.__GetConn() # print sql, param diff --git a/plugins/webstats/tool_migrate.py b/plugins/webstats/tool_migrate.py index 2ac613e62..5d6499f29 100644 --- a/plugins/webstats/tool_migrate.py +++ b/plugins/webstats/tool_migrate.py @@ -32,19 +32,166 @@ def getTaskConf(): return conf +def getConf(): + conf = getServerDir() + "/lua/config.json" + return conf + + +def getGlobalConf(): + conf = getConf() + content = mw.readFile(conf) + result = json.loads(content) + return result + + +def pSqliteDb(dbname='web_logs', site_name='unset', fn="logs"): + + db_dir = getServerDir() + '/logs/' + site_name + if not os.path.exists(db_dir): + mw.execShell('mkdir -p ' + db_dir) + + name = fn + file = db_dir + '/' + name + '.db' + + if not os.path.exists(file): + conn = mw.M(dbname).dbPos(db_dir, name) + sql = mw.readFile(getPluginDir() + '/conf/init.sql') + sql_list = sql.split(';') + for index in range(len(sql_list)): + conn.execute(sql_list[index], ()) + else: + conn = mw.M(dbname).dbPos(db_dir, name) + + conn.execute("PRAGMA synchronous = 0", ()) + conn.execute("PRAGMA page_size = 4096", ()) + conn.execute("PRAGMA journal_mode = wal", ()) + + conn.text_factory = lambda x: str(x, encoding="utf-8", errors='ignore') + # conn.text_factory = lambda x: unicode(x, "utf-8", "ignore") + return conn + + def migrateSiteHotLogs(site_name, query_date): print(site_name, query_date) - return mw.returnMsg(True, "{} 日志合并成功!".format(site_name)) + + migrating_flag = getServerDir() + "/logs/%s/migrating" % site_name + hot_db = getServerDir() + "/logs/%s/logs.db" % site_name + hot_db_tmp = getServerDir() + "/logs/%s/logs_tmp.db" % site_name + history_logs_db = getServerDir() + "/logs/%s/history_logs.db" % site_name + # 1. copy to tmp file + try: + import shutil + print("coping {} to {} ...".format(hot_db, hot_db_tmp)) + mw.writeFile(migrating_flag, "yes") + shutil.copy(hot_db, hot_db_tmp) + if not os.path.exists(hot_db_tmp): + return mw.returnMsg(False, "migrating fail, copy tmp file!") + except: + return mw.returnMsg(False, "{} migrating fail.".format(site_name)) + finally: + if os.path.exists(migrating_flag): + os.remove(migrating_flag) + + # 2. 从临时备份中迁移热日志数据到历史日志 + print("begin tmp to hot log data ...") + try: + print("history file: {}".format(history_logs_db)) + logs_conn = pSqliteDb('web_log', site_name, 'logs_tmp') + history_logs_conn = pSqliteDb('web_log', site_name, 'history_logs') + + hot_db_columns = logs_conn.originExecute( + "PRAGMA table_info([web_logs])") + _columns = ",".join([c[1] for c in hot_db_columns if c[1] != "id"]) + query_start = 0 + todayTime = time.strftime('%Y-%m-%d 00:00:00', time.localtime()) + todayUt = int(time.mktime(time.strptime( + todayTime, "%Y-%m-%d %H:%M:%S"))) + + logs_sql = "select {} from web_logs where time<{}".format( + _columns, todayUt) + selector = logs_conn.originExecute(logs_sql) + log = selector.fetchone() + while log: + params = "" + for field in log: + if params: + params += "," + if field is None: + field = "\'\'" + elif type(field) == str: + field = "\'" + field.replace("\'", "\”") + "\'" + params += str(field) + insert_sql = "insert into web_logs(" + \ + _columns + ") values(" + params + ")" + history_logs_conn.execute(insert_sql) + log = selector.fetchone() + + print("sorting historical data, this action takes a long time...") + history_logs_conn.execute("VACUUM;") + + gcfg = getGlobalConf() + save_day = gcfg['global']["save_day"] + print("delete historical data {} days ago...".format(save_day)) + time_now = time.localtime() + save_timestamp = time.mktime( + (time_now.tm_year, time_now.tm_mon, time_now.tm_mday - save_day, 0, 0, 0, 0, 0, 0)) + delete_sql = "delete from site_logs where time <= {}".format( + save_timestamp) + history_logs_conn.execute(delete_sql) + history_logs_conn.commit() + + # 3. delete merged data and clean up statistics + print("delete merged thermal data...") + mw.writeFile(migrating_flag, "yes") + + hot_db_conn = pSqliteDb('web_logs', site_name) + del_hot_log = "delete from web_logs where time<{}".format(todayUt) + print(del_hot_log) + r = hot_db_conn.execute(del_hot_log) + print("delete:", r) + print("deleting statistics over 180 days...") + save_time_key = time.strftime( + '%Y%m%d00', time.localtime(time.time() - 180 * 86400)) + + del_request_stat_sql = "delete from request_stat where time<={}".format( + save_time_key) + hot_db_conn.execute(del_request_stat_sql) + + hot_db_conn.execute( + "delete from spider_stat where time<={}".format(save_time_key)) + hot_db_conn.execute( + "delete from client_stat where time<={}".format(save_time_key)) + hot_db_conn.execute( + "delete from referer_stat where time<={}".format(save_time_key)) + hot_db_conn.commit() + print("clean up the hot database...") + hot_db_conn.execute("VACUUM;") + hot_db_conn.commit() + + if os.path.exists(migrating_flag): + os.remove(migrating_flag) + except Exception as e: + if site_name: + print("{} logs to history error:{}".format(site_name, e)) + else: + print("logs to history error:{}".format(e)) + finally: + if os.path.exists(hot_db_tmp): + os.remove(hot_db_tmp) + + print("{} logs migrate ok.".format(site_name)) + return mw.returnMsg(True, "{} logs migrate ok".format(site_name)) def migrateHotLogs(query_date="today"): print("begin migrate hot logs") sites = mw.M('sites').field('name').order("addtime").select() - # print(sites) unset_site = {"name": "unset"} sites.append(unset_site) + # migrateSiteHotLogs('t1.cn', query_date) + for site_info in sites: # print(site_info['name']) site_name = site_info["name"]