Simple Linux Panel
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mdserver-web/plugins/score/score_main.py

340 lines
12 KiB

7 years ago
#coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板 x3
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2016 宝塔软件(http://bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: 黄文良 <2879625666@qq.com>
# +-------------------------------------------------------------------
#+--------------------------------------------------------------------
#| 服务器测试
#+--------------------------------------------------------------------
import time,psutil,random,os,urllib,binascii,json,public,re;
class score_main:
__APIURL = 'https://www.bt.cn/api/Auth';
__UPATH = 'data/userInfo.json';
__userInfo = None;
__PDATA = None;
def CheckToken(self):
pdata = {}
data = {}
if os.path.exists(self.__UPATH):
self.__userInfo = json.loads(public.readFile(self.__UPATH));
if self.__userInfo:
pdata['access_key'] = self.__userInfo['access_key'];
data['secret_key'] = self.__userInfo['secret_key'];
else:
pdata['access_key'] = 'test';
data['secret_key'] = '123456';
pdata['data'] = data;
self.__PDATA = pdata;
#修正信息
def SetScore(self,get):
if hasattr(get,'ips'):
return self.SubmitSetScore('ips',get.ips);
if hasattr(get,'virt'):
return self.SubmitSetScore('virt',get.virt);
#发送信息修正
def SubmitSetScore(self,key,value):
self.CheckToken();
self.__PDATA['data'][key] = value
self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
result = json.loads(public.httpPost(self.__APIURL+'/SetSocre',self.__PDATA));
result['data'] = self.En_Code(result['data']);
return result;
#获取得分列表
def GetScore(self,get):
self.CheckToken();
self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
result = json.loads(public.httpPost(self.__APIURL+'/GetSocre',self.__PDATA));
result['data'] = self.En_Code(result['data']);
return result;
#获取配置信息
def GetConfig(self,get=None):
virt = '/usr/sbin/virt-what'
if not os.path.exists(virt):
if os.path.exists('/etc/yum.repos.d/epel.repo'):
os.system('mv /etc/yum.repos.d/epel.repo /etc/yum.repos.d/epel.repo_backup');
os.system('yum install virt-what -y');
if os.path.exists('/etc/yum.repos.d/epel.repo_backup'):
os.system('mv /etc/yum.repos.d/epel.repo_backup /etc/yum.repos.d/epel.repo');
data = {}
data['virt'] = public.ExecShell('virt-what')[0].strip();
cpuinfo = open('/proc/cpuinfo','r').read();
rep = "model\s+name\s+:\s+(.+)"
tmp = re.search(rep,cpuinfo);
data['cpu'] = tmp.groups()[0];
data['core'] = psutil.cpu_count();
data['memory'] = psutil.virtual_memory().total/1024/1024
data['system'] = self.GetSystemVersion();
scoreInfo = self.readScore();
data['disk'] = str(scoreInfo['read'])+','+str(scoreInfo['write'])
data['mem_score'] = scoreInfo['mem'];
data['cpu_score'] = scoreInfo['cpu1'] + scoreInfo['cpu2'] + scoreInfo['cpu3'] + scoreInfo['cpu4'];
data['disk_score'] = scoreInfo['disk_score'];
data['total_score'] = scoreInfo['mem']+data['cpu_score']+scoreInfo['disk_score'];
return data;
#提交到云端
def SubmitScore(self,get):
try:
self.CheckToken();
pdata = self.GetConfig(get);
if not pdata['total_score']: return public.returnMsg(False,'请先跑分!');
pdata['secret_key'] = self.__userInfo['secret_key'];
self.__PDATA['data'] = self.De_Code(pdata);
result = json.loads(public.httpPost(self.__APIURL+'/SubmitScore',self.__PDATA));
result['data'] = self.En_Code(result['data']);
return result;
except:
return None;
#取操作系统版本
def GetSystemVersion(self):
version = public.readFile('/etc/redhat-release')
if not version:
version = public.readFile('/etc/issue').replace('\\n \\l','').strip();
else:
version = version.replace('release ','').strip();
return version
#写当前得分
def writeScore(self,type,value):
scoreFile = '/www/server/panel/plugin/score/score.json';
if not os.path.exists(scoreFile):
data = {}
data['cpu1'] = 0;
data['cpu2'] = 0;
data['cpu3'] = 0;
data['cpu4'] = 0;
data['mem'] = 0;
data['disk_score'] = 0;
data['read'] = 0;
data['write'] = 0;
public.writeFile(scoreFile,json.dumps(data));
data = json.loads(public.readFile(scoreFile));
data[type] = value;
public.writeFile(scoreFile,json.dumps(data));
#读当前得分
def readScore(self):
scoreFile = '/www/server/panel/plugin/score/score.json';
if not os.path.exists(scoreFile):
data = {}
data['cpu1'] = 0;
data['cpu2'] = 0;
data['cpu3'] = 0;
data['cpu4'] = 0;
data['mem'] = 0;
data['disk_score'] = 0;
data['read'] = 0;
data['write'] = 0;
public.writeFile(scoreFile,json.dumps(data));
data = json.loads(public.readFile(scoreFile));
return data;
#测试CPU
def testCpu(self,get,n = 1):
data = {}
data['cpuCount'] = psutil.cpu_count();
if not hasattr(get,'type'): get.type = '0';
import re;
cpuinfo = open('/proc/cpuinfo','r').read();
rep = "model\s+name\s+:\s+(.+)"
tmp = re.search(rep,cpuinfo);
data['cpuType'] = ""
if tmp:
data['cpuType'] = tmp.groups()[0];
import system
data['system'] = system.system().GetSystemVersion();
path = '/www/server/panel/plugin/score/testcpu';
if not os.path.exists(path): os.system('gcc '+path+'.c -o ' +path + ' -lpthread');
start = time.time();
os.system(path + ' 32 ' + get.type);
end = time.time();
data['score'] = int(400 * 10 / (end - start));
if not os.path.exists(path): data['score'] = 0;
self.writeScore('cpu'+get.type, data['score'])
return data;
pass
#测试整数运算
def testInt(self):
return self.testIntOrFloat(1);
#测试浮点运行
def testFloat(self):
return self.testIntOrFloat(1.01);
#CPU测试体
def testIntOrFloat(self,n=1):
start = time.time();
num = 10000 * 100;
for i in range(num):
if i == 0: continue;
a = n + i;
b = n - i;
c = n * i;
d = n / i;
n = n + 1;
end = time.time();
usetime = end - start;
return num / 100 / usetime;
#冒泡算法测试
def testBubble(self):
start = time.time();
num = 10000 * 5;
xx = 'qwertyuiopasdfghjklzxcvbnm1234567890'
for c in xrange(num):
lst = []
for k in range(10):
lst.append(xx[random.randint(0,len(xx)-1)])
lst = self.bubbleSort(lst)
end = time.time();
usetime = end - start;
return num / 5 / usetime;
#冒泡排序
def bubbleSort(self,lst):
length = len(lst)
for i in xrange(0, length, 1):
for j in xrange(0, length-1-i, 1):
if lst[j] < lst[j+1]:
temp = lst[j]
lst[j] = lst[j+1]
lst[j+1] = temp
return lst
#二叉树算法测试
def testTree(self):
import testTree
start = time.time();
elems = range(3000) #生成树节点
tree = testTree.Tree() #新建一个树对象
for elem in elems:
tree.add(elem) #逐个加入树的节点
tree.level_queue(tree.root)
tree.front_digui(tree.root)
tree.middle_digui(tree.root)
tree.later_digui(tree.root)
tree.front_stack(tree.root)
tree.middle_stack(tree.root)
tree.later_stack(tree.root)
end = time.time();
usetime = end - start;
return 3000 / usetime;
#测试内存
def testMem(self,get):
mem = psutil.virtual_memory()
self.writeScore('mem', mem.total/1024/1024)
#提交数据
self.SubmitScore(get)
return mem.total/1024/1024;
#测试磁盘
def testDisk(self,get):
import os
data = {}
os.system('rm -f testDisk_*');
filename = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
data['write'] = self.testDiskWrite(filename);
import shutil
filename2 = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
shutil.move(filename,filename2);
data['read'] = self.testDiskRead(filename2);
diskIo = psutil.disk_partitions()
diskInfo = []
for disk in diskIo:
tmp = {}
tmp['path'] = disk[1]
tmp['size'] = psutil.disk_usage(disk[1])[0]
diskInfo.append(tmp)
data['diskInfo'] = diskInfo;
writeDisk = data['write'];
if data['write'] > 1000: writeDisk = 1000;
readDisk = data['read'];
if data['read'] > 1000: readDisk = 1000;
data['score'] = (writeDisk * 6) + (readDisk * 6)
os.remove(filename2);
self.writeScore('disk_score', data['score'])
self.writeScore('write', data['write'])
self.writeScore('read', data['read'])
return data;
pass
#测试磁盘写入速度
def testDiskWrite(self,filename):
import random
start = time.time();
fp = open(filename,'w+');
strTest = "";
strTmp = "";
for n in range(4):
strTmp += chr(random.randint(97, 122))
for n in range(1024):
strTest += strTmp;
for i in range(1024 * 256):
fp.write(strTest);
del(strTest);
del(strTmp);
fp.close()
end = time.time();
usetime = end - start;
return int(1024/usetime);
#测试磁盘读取速度
def testDiskRead(self,filename):
os.system('echo 3 > /proc/sys/vm/drop_caches');
import random
start = time.time();
fp = open(filename,'r');
size = 4096;
while True:
tmp = fp.read(size);
if not tmp: break;
del(tmp);
fp.close()
end = time.time();
usetime = end - start;
return int(1024/usetime);
def testWorkNet(self):
pass
#加密数据
def De_Code(self,data):
pdata = urllib.urlencode(data);
return binascii.hexlify(pdata);
#解密数据
def En_Code(self,data):
result = urllib.unquote(binascii.unhexlify(data));
return json.loads(result);