|
|
|
@ -47,6 +47,10 @@ function _M.initDB(self, input_sn) |
|
|
|
|
local path = log_dir .. '/' .. input_sn .. "/logs.db" |
|
|
|
|
db, _ = sqlite3.open(path) |
|
|
|
|
|
|
|
|
|
db:exec([[PRAGMA synchronous = 0]]) |
|
|
|
|
db:exec([[PRAGMA page_size = 4096]]) |
|
|
|
|
db:exec([[PRAGMA journal_mode = wal]]) |
|
|
|
|
db:exec([[PRAGMA journal_size_limit = 1073741824]]) |
|
|
|
|
return db |
|
|
|
|
end |
|
|
|
|
|
|
|
|
@ -72,6 +76,17 @@ end |
|
|
|
|
-- 后台任务 |
|
|
|
|
function _M.cron(self) |
|
|
|
|
local timer_every_get_data = function (premature) |
|
|
|
|
-- self:D( json.encode (sites) ) |
|
|
|
|
|
|
|
|
|
local dbs = {} |
|
|
|
|
for i,v in ipairs(sites) do |
|
|
|
|
dbs[v["name"]] = self:initDB(v["name"]) |
|
|
|
|
end |
|
|
|
|
dbs["unset"] = self:initDB("unset") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- self:D( tostring (dbs["t1.cn"]) ) |
|
|
|
|
|
|
|
|
|
local cron_key = 'cron_every_1s' |
|
|
|
|
if self:is_working(cron_key) then |
|
|
|
|
return true |
|
|
|
@ -86,32 +101,50 @@ function _M.cron(self) |
|
|
|
|
if data then |
|
|
|
|
local info = json.decode(data) |
|
|
|
|
local input_sn = info['server_name'] |
|
|
|
|
local db = dbs[input_sn] |
|
|
|
|
if self:is_working(input_sn) then |
|
|
|
|
ngx.shared.mw_total:rpush(total_key, data) |
|
|
|
|
os.execute("sleep " .. 0.6) |
|
|
|
|
return true |
|
|
|
|
end |
|
|
|
|
self:store_logs(info) |
|
|
|
|
self:store_logs(db, info) |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
local tmp_log = "{$SERVER_APP}/logs/tmp_" .. tostring( ngx.now() ) .. ".log" |
|
|
|
|
-- 空闲空间小于10M,立即存盘 |
|
|
|
|
-- local capacity_bytes = ngx.shared.mw_total:free_space() |
|
|
|
|
-- self:D("capacity_bytes:"..capacity_bytes) |
|
|
|
|
os.execute("sleep " .. 1) |
|
|
|
|
local nlen = llen - 100 |
|
|
|
|
for i = 1,llen do |
|
|
|
|
local data = "" |
|
|
|
|
local tmp, _ = ngx.shared.mw_total:lpop(total_key) |
|
|
|
|
if tmp then |
|
|
|
|
data = data .. tmp .. "\n" |
|
|
|
|
|
|
|
|
|
for i,v in ipairs(sites) do |
|
|
|
|
local db = dbs[v["name"]] |
|
|
|
|
local res, err = db:execute([[COMMIT]]) |
|
|
|
|
if db and db:isopen() then |
|
|
|
|
db:close() |
|
|
|
|
end |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
if data ~= "" then |
|
|
|
|
self:write_file(tmp_log, data, "ab") |
|
|
|
|
end |
|
|
|
|
local db = dbs["unset"] |
|
|
|
|
local res, err = db:execute([[COMMIT]]) |
|
|
|
|
if db and db:isopen() then |
|
|
|
|
db:close() |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- local tmp_log = "{$SERVER_APP}/logs/tmp_" .. tostring( ngx.now() ) .. ".log" |
|
|
|
|
-- -- 空闲空间小于10M,立即存盘 |
|
|
|
|
-- -- local capacity_bytes = ngx.shared.mw_total:free_space() |
|
|
|
|
-- -- self:D("capacity_bytes:"..capacity_bytes) |
|
|
|
|
-- os.execute("sleep " .. 1) |
|
|
|
|
-- local nlen = llen - 100 |
|
|
|
|
-- for i = 1,llen do |
|
|
|
|
-- local data = "" |
|
|
|
|
-- local tmp, _ = ngx.shared.mw_total:lpop(total_key) |
|
|
|
|
-- if tmp then |
|
|
|
|
-- data = data .. tmp .. "\n" |
|
|
|
|
-- end |
|
|
|
|
|
|
|
|
|
-- if data ~= "" then |
|
|
|
|
-- self:write_file(tmp_log, data, "ab") |
|
|
|
|
-- end |
|
|
|
|
-- end |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self:unlock_working(cron_key) |
|
|
|
@ -189,8 +222,7 @@ function _M.store_logs_line(self, db, stmt, input_sn, info) |
|
|
|
|
|
|
|
|
|
local res, err = stmt:step() |
|
|
|
|
if tostring(res) == "5" then |
|
|
|
|
self:D("step res:"..tostring(res)) |
|
|
|
|
self:D("step err:"..tostring(err)) |
|
|
|
|
self:D("step res:"..tostring(res) ..",step err:"..tostring(err)) |
|
|
|
|
self:D("the step database connection is busy, so it will be stored later.") |
|
|
|
|
return false |
|
|
|
|
end |
|
|
|
@ -250,7 +282,7 @@ function _M.update_stat(self,db, stat_table, key, columns) |
|
|
|
|
status, errorString = db:exec(update_sql) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
function _M.store_logs(self,info) |
|
|
|
|
function _M.store_logs(self, db, info) |
|
|
|
|
local input_sn = info["server_name"] |
|
|
|
|
|
|
|
|
|
-- 迁移合并时不执行 |
|
|
|
@ -264,7 +296,6 @@ function _M.store_logs(self,info) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
self:lock_working(input_sn) |
|
|
|
|
local db = self:initDB(input_sn) |
|
|
|
|
|
|
|
|
|
local stmt2 = nil |
|
|
|
|
if db ~= nil then |
|
|
|
@ -283,10 +314,6 @@ function _M.store_logs(self,info) |
|
|
|
|
return true |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|
db:exec([[PRAGMA synchronous = 0]]) |
|
|
|
|
db:exec([[PRAGMA page_size = 4096]]) |
|
|
|
|
db:exec([[PRAGMA journal_mode = wal]]) |
|
|
|
|
db:exec([[PRAGMA journal_size_limit = 1073741824]]) |
|
|
|
|
|
|
|
|
|
status, errorString = db:exec([[BEGIN TRANSACTION]]) |
|
|
|
|
|
|
|
|
@ -316,11 +343,6 @@ function _M.store_logs(self,info) |
|
|
|
|
-- delete expire data |
|
|
|
|
db:exec("DELETE FROM web_logs WHERE time<"..tostring(save_date_timestamp)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
local res, err = db:execute([[COMMIT]]) |
|
|
|
|
if db and db:isopen() then |
|
|
|
|
db:close() |
|
|
|
|
end |
|
|
|
|
self:unlock_working(input_sn) |
|
|
|
|
end |
|
|
|
|
|
|
|
|
|