Simple Linux Panel
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
mdserver-web/plugins/score/index.py

356 lines
11 KiB

7 years ago
# coding: utf-8
import time
import psutil
import random
import os
import urllib
import binascii
import json
import re
7 years ago
import sys
sys.path.append(os.getcwd() + "/class/core")
import public
7 years ago
class score_main:
    """Server benchmark plugin: CPU, memory and disk scoring plus API sync.

    The class runs local benchmarks, caches their results in a JSON score
    file and exchanges hex/url-encoded payloads with the remote API under
    ``__APIURL``.  The code is written to run on both Python 2 and Python 3.
    """

    # Base URL of the remote API; endpoint names are appended to it.
    __APIURL = 'https://www.bt.cn/api/Auth'
    # Relative path (resolved against the working directory) of the JSON
    # file that stores the account's access_key / secret_key.
    __UPATH = 'data/userInfo.json'
    # JSON file caching the most recent benchmark results.
    __SCORE_FILE = '/www/server/panel/plugin/score/score.json'
    # Pattern that extracts the CPU model from /proc/cpuinfo.
    __CPU_MODEL_RE = r"model\s+name\s+:\s+(.+)"
    __userInfo = None
    __PDATA = None

    def CheckToken(self):
        """Build the base request payload and store it in ``self.__PDATA``.

        Loads the user info file when it exists.  If usable user info is
        available its keys are used; otherwise the placeholder credentials
        ``'test'`` / ``'123456'`` are used.
        """
        pdata = {}
        data = {}
        if os.path.exists(self.__UPATH):
            self.__userInfo = json.loads(public.readFile(self.__UPATH))
        if self.__userInfo:
            pdata['access_key'] = self.__userInfo['access_key']
            data['secret_key'] = self.__userInfo['secret_key']
        else:
            pdata['access_key'] = 'test'
            data['secret_key'] = '123456'
        pdata['data'] = data
        self.__PDATA = pdata

    def _api_post(self, endpoint):
        """POST ``self.__PDATA`` to *endpoint* and decode the reply.

        The reply is parsed as JSON and its ``data`` field is decoded with
        ``En_Code`` before the whole result is returned.
        """
        result = json.loads(public.httpPost(
            self.__APIURL + endpoint, self.__PDATA))
        result['data'] = self.En_Code(result['data'])
        return result

    # Correct previously submitted information.
    def SetScore(self, get):
        """Submit a correction for ``ips`` or ``virt`` taken from *get*.

        ``ips`` takes precedence when both attributes are present.
        Returns the decoded API reply, or ``None`` when *get* carries
        neither attribute.
        """
        for field in ('ips', 'virt'):
            if hasattr(get, field):
                return self.SubmitSetScore(field, getattr(get, field))
        return None

    # Send the correction to the API.
    def SubmitSetScore(self, key, value):
        """Send a single ``key``/``value`` correction and return the reply."""
        self.CheckToken()
        self.__PDATA['data'][key] = value
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        # NOTE: the endpoint spelling ('SetSocre') is a runtime string and
        # is kept exactly as the original code sends it.
        return self._api_post('/SetSocre')

    # Fetch the score list.
    def GetScore(self, get):
        """Fetch the score list from the API and return the decoded reply."""
        self.CheckToken()
        self.__PDATA['data'] = self.De_Code(self.__PDATA['data'])
        # NOTE: endpoint spelling ('GetSocre') kept exactly as sent before.
        return self._api_post('/GetSocre')

    def _read_cpu_model(self):
        """Return the CPU model string from /proc/cpuinfo, or '' if absent."""
        with open('/proc/cpuinfo', 'r') as fp:
            cpuinfo = fp.read()
        found = re.search(self.__CPU_MODEL_RE, cpuinfo)
        return found.groups()[0] if found else ""

    def _total_memory_mb(self):
        """Return total physical memory in whole MiB."""
        # Floor division keeps the value an integer on Python 3 as well.
        return psutil.virtual_memory().total // 1024 // 1024

    # Collect configuration information.
    def GetConfig(self, get=None):
        """Collect host details and cached scores into one dict.

        Installs ``virt-what`` through yum when ``/usr/sbin/virt-what`` is
        missing; the EPEL repo file is moved aside during the install and
        moved back afterwards.

        Returns a dict with the keys ``virt``, ``cpu``, ``core``,
        ``memory``, ``system``, ``disk``, ``mem_score``, ``cpu_score``,
        ``disk_score`` and ``total_score``.  ``cpu`` is an empty string
        when no model line is found in /proc/cpuinfo.
        """
        virt = '/usr/sbin/virt-what'
        if not os.path.exists(virt):
            if os.path.exists('/etc/yum.repos.d/epel.repo'):
                os.system(
                    'mv /etc/yum.repos.d/epel.repo /etc/yum.repos.d/epel.repo_backup')
            os.system('yum install virt-what -y')
            if os.path.exists('/etc/yum.repos.d/epel.repo_backup'):
                os.system(
                    'mv /etc/yum.repos.d/epel.repo_backup /etc/yum.repos.d/epel.repo')

        scores = self.readScore()
        cpu_score = (scores['cpu1'] + scores['cpu2']
                     + scores['cpu3'] + scores['cpu4'])

        data = {}
        data['virt'] = public.ExecShell('virt-what')[0].strip()
        data['cpu'] = self._read_cpu_model()
        data['core'] = psutil.cpu_count()
        data['memory'] = self._total_memory_mb()
        data['system'] = self.GetSystemVersion()
        data['disk'] = str(scores['read']) + ',' + str(scores['write'])
        data['mem_score'] = scores['mem']
        data['cpu_score'] = cpu_score
        data['disk_score'] = scores['disk_score']
        data['total_score'] = scores['mem'] + cpu_score + scores['disk_score']
        return data

    # Submit the results to the cloud.
    def SubmitScore(self, get):
        """Upload the current configuration and scores (best effort).

        Returns the decoded API reply, a ``public.returnMsg`` failure when
        the total score is still zero, or ``None`` if anything goes wrong
        (including the case where no user info has been loaded).
        """
        try:
            self.CheckToken()
            pdata = self.GetConfig(get)
            if not pdata['total_score']:
                return public.returnMsg(False, '请先跑分!')
            # Fails when no user info is loaded; handled below so that
            # callers ignoring the result are not interrupted.
            pdata['secret_key'] = self.__userInfo['secret_key']
            self.__PDATA['data'] = self.De_Code(pdata)
            return self._api_post('/SubmitScore')
        except Exception:
            return None

    # Get the operating system version.
    def GetSystemVersion(self):
        """Return a short OS version string.

        Uses ``/etc/redhat-release`` when readable, otherwise
        ``/etc/issue``.  Returns '' when neither file yields content.
        """
        version = public.readFile('/etc/redhat-release')
        if version:
            return version.replace('release ', '').strip()
        issue = public.readFile('/etc/issue')
        if not issue:
            return ''
        return issue.replace('\\n \\l', '').strip()

    def _load_scores(self):
        """Return the cached scores, creating the file with zeros if needed."""
        if not os.path.exists(self.__SCORE_FILE):
            defaults = dict.fromkeys(
                ('cpu1', 'cpu2', 'cpu3', 'cpu4',
                 'mem', 'disk_score', 'read', 'write'), 0)
            public.writeFile(self.__SCORE_FILE, json.dumps(defaults))
        return json.loads(public.readFile(self.__SCORE_FILE))

    # Write a score entry.
    def writeScore(self, type, value):
        """Store *value* under key *type* in the cached score file."""
        data = self._load_scores()
        data[type] = value
        public.writeFile(self.__SCORE_FILE, json.dumps(data))

    # Read the current scores.
    def readScore(self):
        """Return the cached scores as a dict."""
        return self._load_scores()

    # CPU test.
    def testCpu(self, get, n=1):
        """Run the compiled CPU benchmark and record its score.

        ``get.type`` selects the benchmark variant and defaults to '0'.
        The helper binary is compiled with gcc when it does not exist yet.
        The score is ``int(4000 / elapsed_seconds)``, or 0 when the binary
        is unavailable or cannot be executed.  The result is stored under
        ``'cpu' + get.type``.

        Returns a dict with ``cpuCount``, ``cpuType``, ``system`` and
        ``score``.  *n* is unused and kept for interface compatibility.
        """
        import subprocess
        import system

        data = {}
        data['cpuCount'] = psutil.cpu_count()
        if not hasattr(get, 'type'):
            get.type = '0'
        cpu_type = str(get.type)
        data['cpuType'] = self._read_cpu_model()
        data['system'] = system.system().GetSystemVersion()

        path = '/www/server/panel/plugin/score/testcpu'
        if not os.path.exists(path):
            os.system('gcc ' + path + '.c -o ' + path + ' -lpthread')

        data['score'] = 0
        if os.path.exists(path):
            start = time.time()
            try:
                # get.type comes from the request, so it is passed as a
                # separate argument instead of being spliced into a shell
                # command line.
                subprocess.call([path, '32', cpu_type])
            except OSError:
                elapsed = 0
            else:
                elapsed = time.time() - start
            if elapsed > 0:
                data['score'] = int(400 * 10 / elapsed)

        self.writeScore('cpu' + cpu_type, data['score'])
        return data

    # Integer arithmetic test.
    def testInt(self):
        """Run the arithmetic benchmark starting from an int."""
        return self.testIntOrFloat(1)

    # Floating point arithmetic test.
    def testFloat(self):
        """Run the arithmetic benchmark starting from a float."""
        return self.testIntOrFloat(1.01)

    # Body of the arithmetic CPU test.
    def testIntOrFloat(self, n=1):
        """Time one million rounds of + - * / and return a throughput score.

        The score is ``iterations / 100 / elapsed_seconds``.
        """
        start = time.time()
        num = 10000 * 100
        for i in range(num):
            if i == 0:
                continue  # avoid dividing by zero
            # The results are deliberately discarded: the arithmetic itself
            # is the workload being timed.
            a = n + i
            b = n - i
            c = n * i
            d = n / i
            n = n + 1
        usetime = time.time() - start
        return num / 100 / usetime

    # Bubble sort benchmark.
    def testBubble(self):
        """Time 50000 bubble sorts of 10 random characters; return a score.

        The score is ``iterations / 5 / elapsed_seconds``.
        """
        start = time.time()
        num = 10000 * 5
        xx = 'qwertyuiopasdfghjklzxcvbnm1234567890'
        for c in range(num):
            lst = []
            for k in range(10):
                lst.append(xx[random.randint(0, len(xx) - 1)])
            lst = self.bubbleSort(lst)
        usetime = time.time() - start
        return num / 5 / usetime

    # Bubble sort.
    def bubbleSort(self, lst):
        """Sort *lst* in place in descending order and return it."""
        length = len(lst)
        for i in range(0, length, 1):
            for j in range(0, length - 1 - i, 1):
                if lst[j] < lst[j + 1]:
                    lst[j], lst[j + 1] = lst[j + 1], lst[j]
        return lst

    # Binary tree benchmark.
    def testTree(self):
        """Time building and traversing a 3000-node tree; return a score.

        The score is ``3000 / elapsed_seconds``.
        """
        import testTree
        start = time.time()
        elems = range(3000)     # values for the tree nodes
        tree = testTree.Tree()  # new tree object
        for elem in elems:
            tree.add(elem)      # insert the nodes one by one
        tree.level_queue(tree.root)
        tree.front_digui(tree.root)
        tree.middle_digui(tree.root)
        tree.later_digui(tree.root)
        tree.front_stack(tree.root)
        tree.middle_stack(tree.root)
        tree.later_stack(tree.root)
        usetime = time.time() - start
        return 3000 / usetime

    # Memory test.
    def testMem(self, get):
        """Record total memory (MiB) as the memory score and submit results.

        Returns the total memory in MiB.
        """
        total_mb = self._total_memory_mb()
        self.writeScore('mem', total_mb)
        # Upload the data; the result is intentionally ignored.
        self.SubmitScore(get)
        return total_mb

    # Disk test.
    def testDisk(self, get):
        """Benchmark disk write and read speed in the working directory.

        Writes a temporary ``testDisk_*`` file, renames it, reads it back
        and removes it.  The score is ``6 * min(write, 1000) +
        6 * min(read, 1000)``.  The score and both speeds are stored in
        the score file.

        Returns a dict with ``write``, ``read``, ``diskInfo`` and ``score``.
        """
        import shutil

        data = {}
        os.system('rm -f testDisk_*')
        filename = "testDisk_" + \
            time.strftime('%Y%m%d%H%M%S', time.localtime())
        current = filename
        try:
            data['write'] = self.testDiskWrite(filename)
            filename2 = "testDisk_" + \
                time.strftime('%Y%m%d%H%M%S', time.localtime())
            shutil.move(filename, filename2)
            current = filename2
            data['read'] = self.testDiskRead(filename2)
        finally:
            # Never leave the large test file behind, even on failure.
            if os.path.exists(current):
                os.remove(current)

        data['diskInfo'] = [
            {'path': part.mountpoint,
             'size': psutil.disk_usage(part.mountpoint).total}
            for part in psutil.disk_partitions()
        ]

        # Each speed is capped at 1000 before it contributes to the score.
        data['score'] = (min(data['write'], 1000) * 6) + \
            (min(data['read'], 1000) * 6)

        self.writeScore('disk_score', data['score'])
        self.writeScore('write', data['write'])
        self.writeScore('read', data['read'])
        return data

    # Disk write speed test.
    def testDiskWrite(self, filename):
        """Write 1 GiB to *filename* and return ``int(1024 / seconds)``.

        The data is a 4 KiB block (four random lowercase letters repeated
        1024 times) written 1024 * 256 times.
        """
        start = time.time()
        with open(filename, 'w') as fp:
            seed = ''.join(chr(random.randint(97, 122)) for _ in range(4))
            chunk = seed * 1024
            for i in range(1024 * 256):
                fp.write(chunk)
        usetime = time.time() - start
        return int(1024 / usetime)

    # Disk read speed test.
    def testDiskRead(self, filename):
        """Read *filename* in 4096-character chunks.

        The page cache is dropped first so the read is not served from
        memory.  Returns ``int(1024 / seconds)``.
        """
        os.system('echo 3 > /proc/sys/vm/drop_caches')
        start = time.time()
        size = 4096
        with open(filename, 'r') as fp:
            while fp.read(size):
                pass
        usetime = time.time() - start
        return int(1024 / usetime)

    def testWorkNet(self):
        """Placeholder; does nothing."""
        pass

    # Encode data (the original comment calls this "encrypt").
    def De_Code(self, data):
        """Url-encode the mapping *data* and return it as a hex string.

        This is an encoding, not encryption.
        """
        try:
            from urllib.parse import urlencode  # Python 3
        except ImportError:
            from urllib import urlencode        # Python 2
        payload = urlencode(data)
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')
        hexed = binascii.hexlify(payload)
        if not isinstance(hexed, str):
            # Python 3 returns bytes; hand back text like Python 2 does.
            hexed = hexed.decode('ascii')
        return hexed

    # Decode data (the original comment calls this "decrypt").
    def En_Code(self, data):
        """Decode a hex string holding url-quoted JSON; return the object."""
        try:
            from urllib.parse import unquote    # Python 3
        except ImportError:
            from urllib import unquote          # Python 2
        raw = binascii.unhexlify(data)
        if not isinstance(raw, str):
            raw = raw.decode('utf-8')
        return json.loads(unquote(raw))