From 6b24dfc21e8ae5fc7588b8a737aeeb766ec7497e Mon Sep 17 00:00:00 2001 From: midoks Date: Mon, 17 Oct 2022 11:36:29 +0800 Subject: [PATCH] Update webstats_common.lua --- plugins/webstats/lua/webstats_common.lua | 78 +++++++++++++++--------- 1 file changed, 50 insertions(+), 28 deletions(-) diff --git a/plugins/webstats/lua/webstats_common.lua b/plugins/webstats/lua/webstats_common.lua index 980cd22b2..dd4dfe89e 100644 --- a/plugins/webstats/lua/webstats_common.lua +++ b/plugins/webstats/lua/webstats_common.lua @@ -47,6 +47,10 @@ function _M.initDB(self, input_sn) local path = log_dir .. '/' .. input_sn .. "/logs.db" db, _ = sqlite3.open(path) + db:exec([[PRAGMA synchronous = 0]]) + db:exec([[PRAGMA page_size = 4096]]) + db:exec([[PRAGMA journal_mode = wal]]) + db:exec([[PRAGMA journal_size_limit = 1073741824]]) return db end @@ -72,6 +76,17 @@ end -- 后台任务 function _M.cron(self) local timer_every_get_data = function (premature) + -- self:D( json.encode (sites) ) + + local dbs = {} + for i,v in ipairs(sites) do + dbs[v["name"]] = self:initDB(v["name"]) + end + dbs["unset"] = self:initDB("unset") + + + -- self:D( tostring (dbs["t1.cn"]) ) + local cron_key = 'cron_every_1s' if self:is_working(cron_key) then return true @@ -86,32 +101,50 @@ function _M.cron(self) if data then local info = json.decode(data) local input_sn = info['server_name'] + local db = dbs[input_sn] if self:is_working(input_sn) then ngx.shared.mw_total:rpush(total_key, data) os.execute("sleep " .. 0.6) return true end - self:store_logs(info) + self:store_logs(db, info) end end - local tmp_log = "{$SERVER_APP}/logs/tmp_" .. tostring( ngx.now() ) .. ".log" - -- 空闲空间小于10M,立即存盘 - -- local capacity_bytes = ngx.shared.mw_total:free_space() - -- self:D("capacity_bytes:"..capacity_bytes) - os.execute("sleep " .. 1) - local nlen = llen - 100 - for i = 1,llen do - local data = "" - local tmp, _ = ngx.shared.mw_total:lpop(total_key) - if tmp then - data = data .. tmp .. "\n" + + for i,v in ipairs(sites) do + local db = dbs[v["name"]] + local res, err = db:execute([[COMMIT]]) + if db and db:isopen() then + db:close() end + end - if data ~= "" then - self:write_file(tmp_log, data, "ab") - end + local db = dbs["unset"] + local res, err = db:execute([[COMMIT]]) + if db and db:isopen() then + db:close() end + + + + -- local tmp_log = "{$SERVER_APP}/logs/tmp_" .. tostring( ngx.now() ) .. ".log" + -- -- 空闲空间小于10M,立即存盘 + -- -- local capacity_bytes = ngx.shared.mw_total:free_space() + -- -- self:D("capacity_bytes:"..capacity_bytes) + -- os.execute("sleep " .. 1) + -- local nlen = llen - 100 + -- for i = 1,llen do + -- local data = "" + -- local tmp, _ = ngx.shared.mw_total:lpop(total_key) + -- if tmp then + -- data = data .. tmp .. "\n" + -- end + + -- if data ~= "" then + -- self:write_file(tmp_log, data, "ab") + -- end + -- end self:unlock_working(cron_key) @@ -189,8 +222,7 @@ function _M.store_logs_line(self, db, stmt, input_sn, info) local res, err = stmt:step() if tostring(res) == "5" then - self:D("step res:"..tostring(res)) - self:D("step err:"..tostring(err)) + self:D("step res:"..tostring(res) ..",step err:"..tostring(err)) self:D("the step database connection is busy, so it will be stored later.") return false end @@ -250,7 +282,7 @@ function _M.update_stat(self,db, stat_table, key, columns) status, errorString = db:exec(update_sql) end -function _M.store_logs(self,info) +function _M.store_logs(self, db, info) local input_sn = info["server_name"] -- 迁移合并时不执行 @@ -264,7 +296,6 @@ function _M.store_logs(self,info) end self:lock_working(input_sn) - local db = self:initDB(input_sn) local stmt2 = nil if db ~= nil then @@ -283,10 +314,6 @@ function _M.store_logs(self,info) return true end - db:exec([[PRAGMA synchronous = 0]]) - db:exec([[PRAGMA page_size = 4096]]) - db:exec([[PRAGMA journal_mode = wal]]) - db:exec([[PRAGMA journal_size_limit = 1073741824]]) status, errorString = db:exec([[BEGIN TRANSACTION]]) @@ -316,11 +343,6 @@ function _M.store_logs(self,info) -- delete expire data db:exec("DELETE FROM web_logs WHERE time<"..tostring(save_date_timestamp)) - - local res, err = db:execute([[COMMIT]]) - if db and db:isopen() then - db:close() - end self:unlock_working(input_sn) end