diff --git a/plugins/webstats/lua/webstats_common.lua b/plugins/webstats/lua/webstats_common.lua index ee1a7f52d..a9d4f6a9a 100644 --- a/plugins/webstats/lua/webstats_common.lua +++ b/plugins/webstats/lua/webstats_common.lua @@ -46,7 +46,7 @@ end function _M.getInstance(self) if rawget(self, "instance") == nil then rawset(self, "instance", self.new()) - -- self:cron() + self:cron() end assert(self.instance ~= nil) return self.instance @@ -210,28 +210,26 @@ end -- 后台任务 function _M.cron(self) local timer_every_get_data = function (premature) - -- self:D( json.encode (sites) ) + + local llen, _ = ngx.shared.mw_total:llen(total_key) + if llen < 1 then + return true + end local dbs = {} for i,v in ipairs(sites) do local input_sn = v["name"] + -- 迁移合并时不执行 + if self:is_migrating(input_sn) then + return true + end + local db = self:initDB(input_sn) if db then dbs[input_sn] = db - - local update_day = self:load_update_day(input_sn) - if not update_day or update_day ~= today then - - local update_sql = "UPDATE uri_stat SET "..day_column.."=0,"..flow_column.."=0" - status, errorString = db:exec(update_sql) - - update_sql = "UPDATE ip_stat SET "..day_column.."=0,"..flow_column.."=0" - status, errorString = db:exec(update_sql) - self:write_update_day(input_sn) - end - + self:clean_stats(db,input_sn) db:exec([[BEGIN TRANSACTION]]) end end @@ -244,25 +242,19 @@ function _M.cron(self) end self:lock_working(cron_key) - - local llen, _ = ngx.shared.mw_total:llen(total_key) + -- 每秒100条 for i=1,llen do local data, _ = ngx.shared.mw_total:lpop(total_key) if data then local info = json.decode(data) local input_sn = info['server_name'] - -- 迁移合并时不执行 - if self:is_migrating(input_sn) then - return true - end - local db = dbs[input_sn] if db then if self:is_working(input_sn) then ngx.shared.mw_total:rpush(total_key, data) - os.execute("sleep " .. 0.6) + self:unlock_working(cron_key) return true end @@ -278,7 +270,6 @@ function _M.cron(self) end end - for _, local_db in pairs(dbs) do if local_db then @@ -318,7 +309,7 @@ function _M.cron(self) self:unlock_working(cron_key) end - ngx.timer.every(3, timer_every_get_data) + ngx.timer.every(1, timer_every_get_data) end function _M.store_logs(self, db, stmt2, info) diff --git a/plugins/webstats/lua/webstats_log.lua b/plugins/webstats/lua/webstats_log.lua index ca35036a6..36d80fcd1 100644 --- a/plugins/webstats/lua/webstats_log.lua +++ b/plugins/webstats/lua/webstats_log.lua @@ -14,24 +14,11 @@ log_by_lua_block { local __C = require "webstats_common" local C = __C:getInstance() - -- local redis = require "resty.redis" - -- local red = redis:new() - - -- local ok, err = red:connect("127.0.0.1", 6379) - -- if not ok then - -- ngx.say("failed to connect: ", err) - -- return - -- end - - -- red:auth("admin") - - -- cache start --- local cache = ngx.shared.mw_total local function cache_set(server_name, id ,key, val) local line_kv = "log_kv_"..server_name..'_'..id.."_"..key -- cache:set(line_kv, val) - cache:set(line_kv, val) end @@ -110,7 +97,6 @@ log_by_lua_block { if site_exclude_ip then for i, _ip in pairs(site_exclude_ip) do - -- D("set exclude ip: ".._ip) cache:set(input_server_name .. "_exclude_ip_".._ip, true) end end @@ -149,7 +135,7 @@ log_by_lua_block { if not auto_config['exclude_url'] then return false end local the_uri = string.sub(ngx.var.request_uri, 2) local url_conf = auto_config["exclude_url"] - for i,conf in pairs(url_conf) + for i,conf in ipairs(url_conf) do local mode = conf["mode"] local url = conf["url"] @@ -320,26 +306,22 @@ log_by_lua_block { local stat_fields = request_stat_fields..";"..client_stat_fields..";"..spider_stat_fields - -- local data = { - -- server_name = server_name, - -- stat_fields = stat_fields, - -- log_kv = kv, - -- } + local data = { + server_name = server_name, + stat_fields = stat_fields, + log_kv = kv, + } - -- local push_data = json.encode(data) - -- local key = C:getTotalKey() - -- ngx.shared.mw_total:rpush(key, push_data) + local push_data = json.encode(data) + local key = C:getTotalKey() + ngx.shared.mw_total:rpush(key, push_data) -- C:D("stat_fields:"..stat_fields) -- C:D("log_kv:"..json.encode(kv)) - cache_set(server_name, new_id, "stat_fields", stat_fields) - cache_set(server_name, new_id, "log_kv", json.encode(kv)) + -- cache_set(server_name, new_id, "stat_fields", stat_fields) + -- cache_set(server_name, new_id, "log_kv", json.encode(kv)) - -- for i,v in pairs(kv) do - -- cache_set(server_name, new_id, tostring(i), tostring(v)) - -- C:D("kv:"..tostring(i)..":"..tostring(v)) - -- end end @@ -536,8 +518,7 @@ log_by_lua_block { load_exclude_ip(server_name) cache_logs(server_name) - store_logs(server_name) - + -- store_logs(server_name) -- D("------------ debug end -------------") end