@ -230,6 +230,8 @@ function _M.cron(self)
local dbs = { }
local stmts = { }
local stat_fields = { }
local ip_stats = { }
local url_stats = { }
local time_key = self : get_store_key ( )
local time_key_next = self : get_store_key_with_time ( ngx.time ( ) + 3600 )
@ -285,36 +287,87 @@ function _M.cron(self)
-- 每秒100条
for i = 1 , llen do
local data , _ = ngx.shared . mw_total : lpop ( total_key )
if data then
local info = json.decode ( data )
local input_sn = info [ ' server_name ' ]
local db = dbs [ input_sn ]
local stat_fields_is = stat_fields [ input_sn ]
if db then
if not data then
break
end
self : store_logs_line ( db , stmts [ input_sn ] [ " web_logs " ] , input_sn , info )
local info = json.decode ( data )
local input_sn = info [ ' server_name ' ]
local db = dbs [ input_sn ]
local stat_fields_is = stat_fields [ input_sn ]
if not db then
break
end
local stat_tmp_fields = info [ ' stat_fields ' ]
self : store_logs_line ( db , stmts [ input_sn ] [ " web_logs " ] , input_sn , info )
-- 合并统计数据
for stf_k , stf_v in pairs ( stat_tmp_fields ) do
local excluded = info [ " log_kv " ] [ ' excluded ' ]
local stat_tmp_fields = info [ ' stat_fields ' ]
if not stat_fields_is [ stf_k ] then
stat_fields_is [ stf_k ] = { }
end
for sv_k , sv_v in pairs ( stf_v ) do
if not stat_fields_is [ stf_k ] [ sv_k ] then
stat_fields_is [ stf_k ] [ sv_k ] = sv_v
else
stat_fields_is [ stf_k ] [ sv_k ] = stat_fields_is [ stf_k ] [ sv_k ] + 1
end
end
-- 合并统计数据
for stf_k , stf_v in pairs ( stat_tmp_fields ) do
-- 排除请求过滤
if excluded then
if stf_k == " spider_stat_fields " or stf_k == " client_stat_fields " then
break
end
end
if not stat_fields_is [ stf_k ] then
stat_fields_is [ stf_k ] = { }
end
for sv_k , sv_v in pairs ( stf_v ) do
if not stat_fields_is [ stf_k ] [ sv_k ] then
stat_fields_is [ stf_k ] [ sv_k ] = sv_v
else
stat_fields_is [ stf_k ] [ sv_k ] = stat_fields_is [ stf_k ] [ sv_k ] + 1
end
end
end
stat_fields [ input_sn ] = stat_fields_is
-- ip 统计合并
-- url 统计
if not excluded then
local ip = info [ " log_kv " ] [ ' ip ' ]
local body_length = info [ " log_kv " ] [ " body_length " ]
if not ip_stats [ input_sn ] then
ip_stats [ input_sn ] = { }
end
stat_fields [ input_sn ] = stat_fields_is
if not ip_stats [ input_sn ] [ ip ] then
local tmp = {
ip_num = 1 ,
body_length = body_length
}
ip_stats [ input_sn ] [ ip ] = tmp
else
ip_stats [ input_sn ] [ ip ] [ " ip_num " ] = ip_stats [ input_sn ] [ ip ] [ " ip_num " ] + 1
ip_stats [ input_sn ] [ ip ] [ " body_length " ] = ip_stats [ input_sn ] [ ip ] [ " body_length " ] + body_length
end
-- uri统计
if not url_stats [ input_sn ] then
url_stats [ input_sn ] = { }
end
local request_uri = info [ " log_kv " ] [ " request_uri " ]
local request_uri_md5 = ngx.md5 ( request_uri )
if not url_stats [ input_sn ] [ request_uri_md5 ] then
local tmp = {
url_num = 1 ,
uri = request_uri ,
body_length = body_length
}
url_stats [ input_sn ] [ request_uri_md5 ] = tmp
else
url_stats [ input_sn ] [ request_uri_md5 ] [ " url_num " ] = url_stats [ input_sn ] [ request_uri_md5 ] [ " url_num " ] + 1
url_stats [ input_sn ] [ request_uri_md5 ] [ " body_length " ] = url_stats [ input_sn ] [ request_uri_md5 ] [ " body_length " ] + body_length
end
end
-- self:D("url_stats:.."..json.encode(url_stats))
end
for site_k , site_v in ipairs ( sites ) do
@ -333,35 +386,52 @@ function _M.cron(self)
local stat_fields_is = stat_fields [ input_sn ]
local db = dbs [ input_sn ]
local local_ip_stats = ip_stats [ input_sn ]
local local_url_stats = url_stats [ input_sn ]
-- 统计【spider_stat,client_stat,request_stat】
for sti_k , sti_v in pairs ( stat_fields_is ) do
local vkk = " "
for sv_k , sv_v in pairs ( sti_v ) do
vkk = vkk .. sv_k .. " = " .. sv_k .. " + " .. sv_v .. " , "
end
if vkk ~= " " then
vkk = string.sub ( vkk , 1 , string.len ( vkk ) - 1 )
end
if sti_k == ' request_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " request_stat " , time_key , vkk )
if db then
-- 统计【spider_stat,client_stat,request_stat】
for sti_k , sti_v in pairs ( stat_fields_is ) do
local vkk = " "
for sv_k , sv_v in pairs ( sti_v ) do
vkk = vkk .. sv_k .. " = " .. sv_k .. " + " .. sv_v .. " , "
end
if vkk ~= " " then
vkk = string.sub ( vkk , 1 , string.len ( vkk ) - 1 )
end
if sti_k == ' request_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " request_stat " , time_key , vkk )
end
if sti_k == ' client_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " client_stat " , time_key , vkk )
end
if sti_k == ' spider_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " spider_stat " , time_key , vkk )
end
end
if sti_k == ' client_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " client_stat " , time_key , vkk )
-- ip统计
if local_ip_stats then
for ip_addr , ip_val in pairs ( local_ip_stats ) do
self : update_statistics_ip ( db , ip_addr , ip_val [ " ip_num " ] , ip_val [ " body_length " ] )
end
end
if sti_k == ' spider_stat_fields ' and vkk ~= ' ' then
self : update_stat ( db , " spider_stat " , time_key , vkk )
-- url统计
if local_url_stats then
for url_md5 , url_val in pairs ( local_url_stats ) do
self : update_statistics_uri ( db , url_val [ " uri " ] , url_md5 , url_val [ " url_num " ] , url_val [ " body_length " ] )
end
end
end
if db then
-- delete expire data
local now_date = os.date ( " *t " )
local save_day = config [ ' global ' ] [ " save_day " ]
local save_date_timestamp = os.time { year = now_date.year ,
month = now_date.month , day = now_date.day - save_day , hour = 0 }
-- delete expire data
db : exec ( " DELETE FROM web_logs WHERE time< " .. tostring ( save_date_timestamp ) )
end
@ -404,33 +474,10 @@ function _M.store_logs_line(self, db, stmt, input_sn, info)
local request_headers = logline [ " request_headers " ]
local excluded = logline [ " excluded " ]
local request_stat_fields = nil
local client_stat_fields = nil
local spider_stat_fields = nil
-- local stat_fields = info['stat_fields']
-- if stat_fields == nil then
-- -- D("Log stat fields is nil.")
-- -- D("Logdata:"..logvalue)
-- else
-- stat_fields = self:split(stat_fields, ";")
-- request_stat_fields = stat_fields[1]
-- client_stat_fields = stat_fields[2]
-- spider_stat_fields = stat_fields[3]
-- if "x" == client_stat_fields then
-- client_stat_fields = nil
-- end
-- if "x" == spider_stat_fields then
-- spider_stat_fields = nil
-- end
-- end
local time_key = logline [ " time_key " ]
if not excluded then
-- self:D("store_logs_line:"..input_sn..":"..tostring(db)..":"..tostring(stmt))
stmt : bind_names {
time = time ,
ip = ip ,
@ -457,20 +504,8 @@ function _M.store_logs_line(self, db, stmt, input_sn, info)
return false
end
stmt : reset ( )
-- local res ,err = self:update_stat( db, "client_stat", time_key, client_stat_fields)
-- -- self:D("step res:"..tostring(res) ..",step err:"..tostring(err))
-- local res ,err = self:update_stat( db, "spider_stat", time_key, spider_stat_fields)
-- -- self:D("step res:"..tostring(res) ..",step err:"..tostring(err))
-- -- self:D("stat ok"..)
-- -- only count non spider requests
-- local ok, err = self:statistics_uri(db, request_uri, ngx.md5(request_uri), body_length)
-- local ok, err = self:statistics_ip(db, ip, body_length)
-- -- self:D("stat url ip ok")
end
-- self:update_stat( db, "request_stat", time_key, request_stat_fields)
return true
end
@ -519,6 +554,20 @@ end
--------------------- db start ---------------------------
function _M . update_statistics_uri ( self , db , uri , uri_md5 , day_num , body_length )
-- count the number of URI requests and traffic
local open_statistics_uri = config [ ' global ' ] [ " statistics_uri " ]
if not open_statistics_uri then return true end
local stat_sql = nil
stat_sql = " INSERT INTO uri_stat(uri_md5,uri) SELECT \" " .. uri_md5 .. " \" , \" " .. uri .. " \" WHERE NOT EXISTS (SELECT uri_md5 FROM uri_stat WHERE uri_md5= \" " .. uri_md5 .. " \" ); "
local res , err = db : exec ( stat_sql )
stat_sql = " UPDATE uri_stat SET " .. day_column .. " = " .. day_column .. " + " .. day_num .. " , " .. flow_column .. " = " .. flow_column .. " + " .. body_length .. " WHERE uri_md5= \" " .. uri_md5 .. " \" ; "
local res , err = db : exec ( stat_sql )
return true
end
function _M . statistics_uri ( self , db , uri , uri_md5 , body_length )
-- count the number of URI requests and traffic
local open_statistics_uri = config [ ' global ' ] [ " statistics_uri " ]
@ -533,8 +582,22 @@ function _M.statistics_uri(self, db, uri, uri_md5, body_length)
return true
end
function _M . update_statistics_ip ( self , db , ip , day_num , body_length )
local open_statistics_ip = config [ ' global ' ] [ " statistics_ip " ]
if not open_statistics_ip then return true end
local stat_sql = nil
stat_sql = " INSERT INTO ip_stat(ip) SELECT \" " .. ip .. " \" WHERE NOT EXISTS (SELECT ip FROM ip_stat WHERE ip= \" " .. ip .. " \" ); "
local res , err = db : exec ( stat_sql )
stat_sql = " UPDATE ip_stat SET " .. day_column .. " = " .. day_column .. " + " .. day_num .. " , " .. flow_column .. " = " .. flow_column .. " + " .. body_length .. " WHERE ip= \" " .. ip .. " \" "
local res , err = db : exec ( stat_sql )
return true
end
function _M . statistics_ip ( self , db , ip , body_length )
local open_statistics_ip = config [ ' global ' ] [ " statistics_ip " ]
local open_statistics_ip = config [ ' global ' ] [ " statistics_ip " ]
if not open_statistics_ip then return true end
local stat_sql = nil