--- Web statistics log collector (OpenResty module).
--
-- Log entries are pushed as JSON strings onto a list in the shared dict
-- `mw_total`; a recurring timer (see `_M.cron`) pops them and writes them into
-- a per-site SQLite database at `<log_dir>/<server_name>/logs.db`.
--
-- NOTE(review): `{$SERVER_APP}` looks like an install-time template
-- placeholder that is substituted with a real path -- confirm.

local setmetatable = setmetatable

local _M = { _VERSION = '1.0' }
local mt = { __index = _M }

local json = require "cjson"
local sqlite3 = require "lsqlite3"
local config = require "webstats_config"
local sites = require "webstats_sites"

local debug_mode = true
local total_key = "log_kv_total"
local cache = ngx.shared.mw_total
local log_dir = "{$SERVER_APP}/logs"

-- Maximum number of queued entries written to SQLite per timer tick.
local BATCH_SIZE = 100

--- Return the current date and the matching per-day statistic columns.
-- Computed on every call (not once at module load) so that a long-running
-- worker keeps using the right columns after midnight.
-- @treturn string today as "YYYYMMDD"
-- @treturn string request-count column, "day<N>"
-- @treturn string traffic column, "flow<N>"
local function current_day()
    local today = ngx.re.gsub(ngx.today(), '-', '')
    local number_day = tonumber(string.sub(today, 7, 8))
    return today, "day" .. number_day, "flow" .. number_day
end

--- Prepare `sql`, bind the named parameters in `binds`, run it once.
-- @treturn boolean|nil true on success, nil on failure
-- @treturn string|number|nil SQLite error message or step result code
local function exec_bound(db, sql, binds)
    local stmt = db:prepare(sql)
    if not stmt then
        return nil, db:errmsg()
    end
    stmt:bind_names(binds)
    local rc = stmt:step()
    stmt:finalize()
    if rc ~= sqlite3.DONE then
        return nil, rc
    end
    return true
end

--- Create a new collector object.
function _M.new(self)
    local instance = {
        total_key = total_key,
        params = nil,
        site_config = nil,
        config = nil,
    }
    return setmetatable(instance, mt)
end

--- Return the singleton stored on `self`, creating it (and starting the
-- background timer) on first use.
function _M.getInstance(self)
    if rawget(self, "instance") == nil then
        rawset(self, "instance", self.new())
        self:cron()
    end
    assert(self.instance ~= nil)
    return self.instance
end

--- Open the SQLite database of a site.
-- @param input_sn site (server) name, used as directory name
-- @return database handle, or nil when it cannot be opened
function _M.initDB(self, input_sn)
    local path = log_dir .. '/' .. input_sn .. "/logs.db"
    local db = sqlite3.open(path)
    return db
end

function _M.getTotalKey(self)
    return self.total_key
end

function _M.setConfData(self, config, site_config)
    self.config = config
    self.site_config = site_config
end

function _M.setParams(self, params)
    self.params = params
end

--- Split `str` on any of the characters in `reps`.
-- `reps` is inserted into a Lua character class unescaped, so it must not
-- contain pattern-magic characters such as `]`, `^` or `%`.
-- @treturn table array of the non-empty pieces
function _M.split(self, str, reps)
    local arr = {}
    string.gsub(str, '[^' .. reps .. ']+', function(w)
        table.insert(arr, w)
    end)
    return arr
end

--- Background task: register a 1-second recurring timer that drains the
-- shared queue into the per-site databases.
function _M.cron(self)
    local cron_key = 'cron_every_1s'

    -- One drain pass. Runs under the cron lock; may raise (bad JSON, SQLite
    -- errors), the caller catches that and releases the lock.
    local function drain(premature)
        local queue = ngx.shared.mw_total
        -- llen() returns nil when the key does not hold a list.
        local llen = queue:llen(total_key) or 0

        -- At most BATCH_SIZE entries per tick.
        for _ = 1, BATCH_SIZE do
            local data = queue:lpop(total_key)
            if not data then
                break
            end

            local info = json.decode(data)
            local input_sn = info['server_name']
            if self:is_working(input_sn) then
                -- Site database is busy: put the entry back and retry on a
                -- later tick. ngx.sleep yields instead of blocking the worker
                -- the way os.execute("sleep ...") did.
                queue:rpush(total_key, data)
                if not premature then
                    ngx.sleep(0.6)
                end
                return
            end
            self:store_logs(info)
        end

        -- NOTE(review): the original built this path with `+` (a runtime
        -- error), so the spill below never ran, and it looped over `llen`
        -- while computing an unused `nlen = llen - 100`. Spilling only the
        -- backlog beyond the batch is assumed to be the intent -- confirm,
        -- and confirm that something consumes the tmp_*.log files.
        local overflow = llen - BATCH_SIZE
        if overflow <= 0 then
            return
        end

        if not premature then
            ngx.sleep(1)
        end

        local tmp_log = "{$SERVER_APP}/logs/tmp_" .. ngx.now() .. ".log"
        local buf = {}
        for _ = 1, overflow do
            local tmp = queue:lpop(total_key)
            if not tmp then
                break
            end
            buf[#buf + 1] = tmp
        end
        if #buf > 0 then
            self:write_file(tmp_log, table.concat(buf, "\n") .. "\n", "ab")
        end
    end

    local function timer_every_get_data(premature)
        if self:is_working(cron_key) then
            return true
        end
        self:lock_working(cron_key)

        -- Always release the lock, even when the pass fails; otherwise every
        -- tick is skipped until the 60s lock TTL expires.
        local ok, err = pcall(drain, premature)
        self:unlock_working(cron_key)
        if not ok then
            ngx.log(ngx.ERR, "webstats cron failed: ", tostring(err))
        end
    end

    local ok, err = ngx.timer.every(1, timer_every_get_data)
    if not ok then
        ngx.log(ngx.ERR, "webstats cron timer not created: ", tostring(err))
    end
end

--- Insert one log entry and update the statistic tables.
-- @param db open database handle
-- @param stmt prepared INSERT statement for `web_logs` (see `store_logs`)
-- @param input_sn site name (unused here)
-- @param info decoded queue entry with `log_kv` and optional `stat_fields`
-- @treturn boolean false when the database was busy, true otherwise
function _M.store_logs_line(self, db, stmt, input_sn, info)
    local logline = info['log_kv']
    local request_uri = logline["request_uri"]
    local body_length = logline["body_length"]
    local ip = logline["ip"]
    local excluded = logline["excluded"]
    local time_key = logline["time_key"]

    -- stat_fields is "request;client;spider"; "x" marks an empty section.
    local request_stat_fields = nil
    local client_stat_fields = nil
    local spider_stat_fields = nil
    local stat_fields = info['stat_fields']
    if stat_fields ~= nil then
        local fields = self:split(stat_fields, ";")
        request_stat_fields = fields[1]
        client_stat_fields = fields[2]
        spider_stat_fields = fields[3]
        if "x" == client_stat_fields then
            client_stat_fields = nil
        end
        if "x" == spider_stat_fields then
            spider_stat_fields = nil
        end
    end

    if not excluded then
        stmt:bind_names {
            time = logline["time"],
            ip = ip,
            domain = logline["domain"],
            server_name = logline["server_name"],
            method = logline["method"],
            status_code = logline["status_code"],
            uri = request_uri,
            body_length = body_length,
            referer = logline["referer"],
            user_agent = logline["user_agent"],
            protocol = logline["protocol"],
            request_time = logline["request_time"],
            is_spider = logline["is_spider"],
            request_headers = logline["request_headers"],
            ip_list = logline["ip_list"],
            client_port = logline["client_port"],
        }

        local res, err = stmt:step()
        if res == sqlite3.BUSY then
            self:D("step res:" .. tostring(res))
            self:D("step err:" .. tostring(err))
            self:D("the step database connection is busy, so it will be stored later.")
            return false
        end
        stmt:reset()

        self:update_stat(db, "client_stat", time_key, client_stat_fields)
        self:update_stat(db, "spider_stat", time_key, spider_stat_fields)

        -- URI and IP statistics are only collected for non-excluded requests.
        self:statistics_uri(db, request_uri, ngx.md5(request_uri), body_length)
        self:statistics_ip(db, ip, body_length)
    end

    self:update_stat(db, "request_stat", time_key, request_stat_fields)
    return true
end

--- Count requests and traffic per URI for the current day.
-- Values are bound as parameters: the URI comes from the request and must not
-- be spliced into the SQL text.
-- @treturn boolean always true (failures are written to the debug log)
function _M.statistics_uri(self, db, uri, uri_md5, body_length)
    local open_statistics_uri = config['global']["statistics_uri"]
    if not open_statistics_uri then
        return true
    end

    local _, day_column, flow_column = current_day()

    local ok, err = exec_bound(db,
        "INSERT INTO uri_stat(uri_md5,uri) SELECT :uri_md5,:uri WHERE NOT EXISTS "
            .. "(SELECT uri_md5 FROM uri_stat WHERE uri_md5=:uri_md5);",
        { uri_md5 = uri_md5, uri = uri })
    if not ok then
        self:D("uri_stat insert failed:" .. tostring(err))
    end

    ok, err = exec_bound(db,
        string.format("UPDATE uri_stat SET %s=%s+1,%s=%s+:body_length WHERE uri_md5=:uri_md5",
            day_column, day_column, flow_column, flow_column),
        { uri_md5 = uri_md5, body_length = tonumber(body_length) or 0 })
    if not ok then
        self:D("uri_stat update failed:" .. tostring(err))
    end

    return true
end

--- Count requests and traffic per client IP for the current day.
-- @treturn boolean always true (failures are written to the debug log)
function _M.statistics_ip(self, db, ip, body_length)
    local open_statistics_ip = config['global']["statistics_ip"]
    if not open_statistics_ip then
        return true
    end

    local _, day_column, flow_column = current_day()

    local ok, err = exec_bound(db,
        "INSERT INTO ip_stat(ip) SELECT :ip WHERE NOT EXISTS "
            .. "(SELECT ip FROM ip_stat WHERE ip=:ip);",
        { ip = ip })
    if not ok then
        self:D("ip_stat insert failed:" .. tostring(err))
    end

    ok, err = exec_bound(db,
        string.format("UPDATE ip_stat SET %s=%s+1,%s=%s+:body_length WHERE ip=:ip",
            day_column, day_column, flow_column, flow_column),
        { ip = ip, body_length = tonumber(body_length) or 0 })
    if not ok then
        self:D("ip_stat update failed:" .. tostring(err))
    end

    return true
end

--- Update the statistic row of `stat_table` for time slot `key`, creating the
-- row first when it does not exist. Does nothing when `columns` is nil.
-- @param columns SQL assignment list placed after SET; it and `key` are
--   spliced into the statement as-is (presumably generated by the logging
--   side, not taken from the request -- verify against the producer)
function _M.update_stat(self, db, stat_table, key, columns)
    if not columns then
        return
    end

    local stmt = db:prepare(string.format(
        "INSERT INTO %s(time) SELECT :time WHERE NOT EXISTS(SELECT time FROM %s WHERE time=:time);",
        stat_table, stat_table))
    if stmt then
        stmt:bind_names { time = key }
        stmt:step()
        stmt:finalize()
    else
        self:D("update_stat prepare failed for " .. tostring(stat_table)
            .. ":" .. tostring(db:errmsg()))
    end

    local update_sql = "UPDATE " .. stat_table .. " SET " .. columns
    update_sql = update_sql .. " WHERE time=" .. key
    db:exec(update_sql)
end

--- Store one decoded queue entry in its site database.
-- Takes the per-site working lock for the duration of the transaction and
-- releases it on every exit path.
-- @return nil while a migration is running, true when the site is already
--   being worked on or its database cannot be opened, nil after a normal run
function _M.store_logs(self, info)
    local input_sn = info["server_name"]

    -- Not executed while a migration/merge is running.
    if self:is_migrating(input_sn) == true then
        return
    end

    if self:is_working(input_sn) then
        return true
    end
    self:lock_working(input_sn)

    local db = self:initDB(input_sn)
    local stmt2 = nil
    if db ~= nil then
        stmt2 = db:prepare [[
            INSERT INTO web_logs(
                time, ip, domain, server_name, method, status_code, uri,
                body_length, referer, user_agent, protocol, request_time,
                is_spider, request_headers, ip_list, client_port)
            VALUES(
                :time, :ip, :domain, :server_name, :method, :status_code, :uri,
                :body_length, :referer, :user_agent, :protocol, :request_time,
                :is_spider, :request_headers, :ip_list, :client_port)]]
    end

    if db == nil or stmt2 == nil then
        if db and db:isopen() then
            db:close()
        end
        self:unlock_working(input_sn)
        return true
    end

    local ok, err = pcall(function()
        db:exec([[PRAGMA synchronous = 0]])
        db:exec([[PRAGMA page_size = 4096]])
        db:exec([[PRAGMA journal_mode = wal]])
        db:exec([[PRAGMA journal_size_limit = 1073741824]])

        db:exec([[BEGIN TRANSACTION]])

        -- First write of a new day: zero today's counters, which still hold
        -- the values of the same day number from the previous cycle.
        local today, day_column, flow_column = current_day()
        local update_day = self:load_update_day(input_sn)
        if not update_day or update_day ~= today then
            local reset = day_column .. "=0," .. flow_column .. "=0"
            db:exec("UPDATE uri_stat SET " .. reset)
            db:exec("UPDATE ip_stat SET " .. reset)
            self:write_update_day(input_sn, today)
        end

        self:store_logs_line(db, stmt2, input_sn, info)
        stmt2:finalize()

        -- Delete expired rows (older than `save_day` days).
        local save_day = tonumber(config['global']["save_day"])
        if save_day then
            local now_date = os.date("*t")
            local save_date_timestamp = os.time {
                year = now_date.year,
                month = now_date.month,
                day = now_date.day - save_day,
                hour = 0,
            }
            db:exec("DELETE FROM web_logs WHERE time<" .. tostring(save_date_timestamp))
        end

        db:execute([[COMMIT]])
    end)

    if db:isopen() then
        db:close()
    end
    self:unlock_working(input_sn)

    if not ok then
        -- Re-raise after cleanup so the caller still sees the failure.
        error(err, 0)
    end
end

--- Append a timestamped message to the debug log when `debug_mode` is on.
-- @treturn boolean|nil true on success (or when debugging is off), nil when
--   the log file cannot be opened
function _M.D(self, msg)
    if not debug_mode then
        return true
    end

    local fp = io.open('{$SERVER_APP}/debug.log', 'ab')
    if fp == nil then
        return nil
    end

    local localtime = os.date("%Y-%m-%d %H:%M:%S")
    fp:write(localtime .. ":" .. tostring(msg) .. "\n")
    fp:flush()
    fp:close()
    return true
end

--- Tell whether a migration marker file exists, globally or for the site.
function _M.is_migrating(self, input_sn)
    local markers = {
        "{$SERVER_APP}/migrating",
        "{$SERVER_APP}/logs/" .. input_sn .. "/migrating",
    }
    for i = 1, #markers do
        local file = io.open(markers[i], "rb")
        if file then
            -- Only existence matters; close the handle instead of leaking it.
            file:close()
            return true
        end
    end
    return false
end

--- Tell whether the working lock `sign` is currently held.
function _M.is_working(self, sign)
    return cache:get(sign .. "_working") == true
end

--- Take the working lock `sign`; it expires on its own after 60 seconds.
function _M.lock_working(self, sign)
    local working_key = sign .. "_working"
    cache:set(working_key, true, 60)
end

--- Release the working lock `sign`.
function _M.unlock_working(self, sign)
    local working_key = sign .. "_working"
    cache:set(working_key, false)
end

--- Write `body` to `filename` using io.open mode `mode`.
-- @treturn boolean|nil true on success, nil when the file cannot be opened
function _M.write_file(self, filename, body, mode)
    local fp = io.open(filename, mode)
    if fp == nil then
        return nil
    end
    fp:write(body)
    fp:flush()
    fp:close()
    return true
end

--- Read a whole file.
-- @treturn string|nil file content, nil when missing or empty
function _M.read_file_body(self, filename)
    local fp = io.open(filename, 'rb')
    if not fp then
        return nil
    end
    local fbody = fp:read("*a")
    fp:close()
    if fbody == '' then
        return nil
    end
    return fbody
end

--- Read the day ("YYYYMMDD") on which the site's daily counters were last reset.
function _M.load_update_day(self, input_sn)
    local _file = "{$SERVER_APP}/logs/" .. input_sn .. "/update_day.log"
    return self:read_file_body(_file)
end

--- Record the day on which the site's daily counters were reset.
-- @param day optional "YYYYMMDD" stamp; defaults to the current date
function _M.write_update_day(self, input_sn, day)
    local _file = "{$SERVER_APP}/logs/" .. input_sn .. "/update_day.log"
    if day == nil then
        day = current_day()
    end
    return self:write_file(_file, day, "w")
end

--- Pop one entry from the head of the shared log queue.
function _M.lpop(self)
    local cache = ngx.shared.mw_total
    return cache:lpop(total_key)
end

--- Pop one entry from the tail of the shared log queue.
function _M.rpop(self)
    local cache = ngx.shared.mw_total
    return cache:rpop(total_key)
end

return _M