Simple Linux Panel
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
mdserver-web/class/core/db.py

348 lines
10 KiB

7 years ago
# coding: utf-8
import sqlite3
import os
class Sql():
    """Small chainable query helper around a single sqlite3 connection.

    Typical use: ``Sql().dbfile('x').table('t').where('id=?', (1,)).find()``.
    The builder methods (``table``, ``where``, ``andWhere``, ``order``,
    ``group``, ``limit``, ``field``) store a clause on the instance and return
    ``self``.  The terminal methods (``select``, ``add``, ``save``, ``delete``)
    run the statement and then reset the stored clauses; the table name and
    the connection are kept.

    Error convention: most methods do not raise on database errors; they
    return the string ``"error: <message>"`` instead, so callers must check
    the type of the result.

    NOTE: table names, field lists and ORDER/GROUP/LIMIT/WHERE fragments are
    concatenated into the SQL text.  Only the values passed as ``param`` are
    bound as parameters, so never build the fragments from untrusted input.
    """

    # ------------------------------
    # Database helper class for sqlite3
    # ------------------------------
    __DB_FILE = None    # path of the database file
    __DB_CONN = None    # sqlite3 connection object (opened lazily)
    __DB_TABLE = ""     # name of the table being operated on
    __OPT_WHERE = ""    # " WHERE ..." clause, or "" when unset
    __OPT_LIMIT = ""    # " LIMIT ..." clause, or "" when unset
    __OPT_GROUP = ""    # " GROUP BY ..." clause, or "" when unset
    __OPT_ORDER = ""    # " ORDER BY ..." clause, or "" when unset
    __OPT_FIELD = "*"   # comma separated column list, "*" for all columns
    __OPT_PARAM = ()    # values bound to the placeholders of the WHERE clause

    def __init__(self):
        self.__DB_FILE = 'data/default.db'

    def __GetConn(self):
        """Open the connection on first use.

        Returns None on success, or an "error: ..." string if connecting
        failed (in which case the connection attribute stays None).
        """
        try:
            if self.__DB_CONN is None:
                self.__DB_CONN = sqlite3.connect(self.__DB_FILE)
                self.__DB_CONN.text_factory = str
        except Exception as ex:
            return "error: " + str(ex)

    def dbfile(self, name):
        """Select the database file ``data/<name>.db``; returns self."""
        self.__DB_FILE = 'data/' + name + '.db'
        return self

    def dbPos(self, path, name):
        """Select the database file ``<path>/<name>.db``; returns self."""
        self.__DB_FILE = path + '/' + name + '.db'
        return self

    def table(self, table):
        """Set the table name used by subsequent statements; returns self."""
        self.__DB_TABLE = table
        return self

    def where(self, where, param):
        """Set the WHERE condition and its bound values; returns self.

        ``where`` is the condition text with ``?`` placeholders and ``param``
        the matching values.  A falsy ``where`` leaves the state untouched.
        """
        if where:
            self.__OPT_WHERE = " WHERE " + where
            self.__OPT_PARAM = param
        return self

    def andWhere(self, where, param):
        """AND an extra condition onto the current WHERE clause; returns self.

        If no WHERE clause has been set yet, this starts one instead of
        emitting a dangling " and ...".  ``param`` may be a tuple or a list;
        its values are appended to the already stored ones.
        """
        if where:
            if self.__OPT_WHERE:
                self.__OPT_WHERE = self.__OPT_WHERE + " and " + where
            else:
                self.__OPT_WHERE = " WHERE " + where
            # Normalise both sides so tuple + list does not raise TypeError.
            self.__OPT_PARAM = tuple(self.__OPT_PARAM) + tuple(param)
        return self

    def order(self, order):
        """Set the ORDER BY expression ("" clears it); returns self."""
        if len(order):
            self.__OPT_ORDER = " ORDER BY " + order
        else:
            self.__OPT_ORDER = ""
        return self

    def group(self, group):
        """Set the GROUP BY expression ("" clears it); returns self."""
        if len(group):
            self.__OPT_GROUP = " GROUP BY " + group
        else:
            self.__OPT_GROUP = ""
        return self

    def limit(self, limit):
        """Set the LIMIT clause; returns self.

        Accepts a string such as "10" or "0,10", or an int.  An empty string
        or None clears the clause.
        """
        text = "" if limit is None else str(limit)
        if text:
            self.__OPT_LIMIT = " LIMIT " + text
        else:
            self.__OPT_LIMIT = ""
        return self

    def field(self, field):
        """Set the comma separated column list; "" keeps the current one."""
        if len(field):
            self.__OPT_FIELD = field
        return self

    def __select_sql(self):
        """Assemble the SELECT statement from the stored clauses."""
        return "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + \
            self.__OPT_WHERE + self.__OPT_GROUP + self.__OPT_ORDER + \
            self.__OPT_LIMIT

    def __shape_rows(self, rows, names):
        """Convert fetched tuples into the public result shape.

        With ``names`` None every row becomes a list.  Otherwise ``names`` is
        a comma separated key list and every row becomes a dict mapping the
        n-th key to the n-th column (more keys than columns raises
        IndexError, which the callers report as an error string).
        """
        if names is None:
            return [list(row) for row in rows]
        keys = names.split(',')
        return [dict((key, row[i]) for i, key in enumerate(keys))
                for row in rows]

    def select(self):
        """Run the SELECT and reset the stored clauses.

        Returns a list of lists when the field list is "*", otherwise a list
        of dicts keyed by the comma separated field names.  On failure the
        string "error: <message>" is returned.  The clauses are reset in both
        cases so a failed query cannot leak its WHERE into the next one.
        """
        self.__GetConn()
        try:
            cursor = self.__DB_CONN.execute(
                self.__select_sql(), self.__OPT_PARAM)
            names = None if self.__OPT_FIELD == "*" else self.__OPT_FIELD
            return self.__shape_rows(cursor.fetchall(), names)
        except Exception as ex:
            return "error: " + str(ex)
        finally:
            self.__close()

    def inquiry(self, input_field=''):
        """Run the SELECT like ``select()`` but keep the stored clauses.

        ``input_field`` optionally supplies the dict keys to use instead of
        the field list (useful when the field list contains expressions with
        commas).  It is ignored when the field list is "*".
        """
        self.__GetConn()
        try:
            cursor = self.__DB_CONN.execute(
                self.__select_sql(), self.__OPT_PARAM)
            if self.__OPT_FIELD == "*":
                names = None
            elif input_field != "":
                names = input_field
            else:
                names = self.__OPT_FIELD
            return self.__shape_rows(cursor.fetchall(), names)
        except Exception as ex:
            return "error: " + str(ex)

    def getField(self, keyName):
        """Fetch one column.

        Returns the bare value when exactly one row matches, otherwise the
        full ``select()`` result (list of dicts or error string).
        """
        result = self.field(keyName).select()
        if len(result) == 1:
            return result[0][keyName]
        return result

    def setField(self, keyName, keyValue):
        """Update a single column on the rows matched by the WHERE clause."""
        return self.save(keyName, (keyValue,))

    def find(self):
        """Fetch one row.

        Returns the row itself when exactly one row is found, otherwise the
        ``select()`` result unchanged (empty list or error string).
        """
        result = self.limit("1").select()
        if len(result) == 1:
            return result[0]
        return result

    def count(self):
        """Return the number of matching rows, or 0 if the query failed."""
        key = "COUNT(*)"
        data = self.field(key).select()
        try:
            return int(data[0][key])
        except (IndexError, KeyError, TypeError, ValueError):
            # Empty result or an "error: ..." string from select().
            return 0

    def add(self, keys, param):
        """Insert one row and commit.

        ``keys`` is the comma separated column list and ``param`` the values
        in the same order.  Returns the new row id, or "error: <message>".
        """
        self.__GetConn()
        try:
            placeholders = ",".join("?" for _ in keys.split(','))
            sql = "INSERT INTO " + self.__DB_TABLE + \
                "(" + keys + ") " + "VALUES(" + placeholders + ")"
            result = self.__DB_CONN.execute(sql, param)
            last_id = result.lastrowid
            self.__DB_CONN.commit()
            return last_id
        except Exception as ex:
            return "error: " + str(ex)
        finally:
            self.__close()

    def checkInput(self, data):
        """Strip the characters ``< > ' " & #`` from a string.

        Falsy values and non-string values are returned unchanged.
        """
        if not data or not isinstance(data, str):
            return data
        for char in ('<', '>', '\'', '"', '&', '#'):
            data = data.replace(char, '')
        return data

    def addAll(self, keys, param):
        """Insert one row WITHOUT committing (pair with ``commit()``).

        Returns True on success or "error: <message>".
        """
        self.__GetConn()
        try:
            placeholders = ",".join("?" for _ in keys.split(','))
            sql = "INSERT INTO " + self.__DB_TABLE + \
                "(" + keys + ") " + "VALUES(" + placeholders + ")"
            self.__DB_CONN.execute(sql, param)
            return True
        except Exception as ex:
            return "error: " + str(ex)

    def commit(self):
        """Reset the stored clauses and commit pending changes, if any."""
        self.__close()
        if self.__DB_CONN is not None:
            self.__DB_CONN.commit()

    def save(self, keys, param):
        """Update the rows matched by the WHERE clause and commit.

        ``keys`` is the comma separated column list and ``param`` the new
        values.  Returns the affected row count, or "error: <message>".
        """
        self.__GetConn()
        try:
            assignments = ",".join(key + "=?" for key in keys.split(','))
            sql = "UPDATE " + self.__DB_TABLE + " SET " + assignments + \
                self.__OPT_WHERE
            # SET values come first, then the values bound to the WHERE.
            params = tuple(param) + tuple(self.__OPT_PARAM)
            result = self.__DB_CONN.execute(sql, params)
            self.__DB_CONN.commit()
            return result.rowcount
        except Exception as ex:
            return "error: " + str(ex)
        finally:
            self.__close()

    def delete(self, id=None):
        """Delete rows and commit.

        With a truthy ``id`` the row with that id is deleted; otherwise the
        current WHERE clause is used.  NOTE: a falsy ``id`` (None, 0, '')
        with no WHERE clause set deletes every row of the table.
        Returns the affected row count, or "error: <message>".
        """
        self.__GetConn()
        try:
            if id:
                self.__OPT_WHERE = " WHERE id=?"
                self.__OPT_PARAM = (id,)
            sql = "DELETE FROM " + self.__DB_TABLE + self.__OPT_WHERE
            result = self.__DB_CONN.execute(sql, self.__OPT_PARAM)
            self.__DB_CONN.commit()
            return result.rowcount
        except Exception as ex:
            return "error: " + str(ex)
        finally:
            self.__close()

    def originExecute(self, sql, param=()):
        """Execute raw SQL, commit, and return the sqlite3 cursor."""
        self.__GetConn()
        try:
            result = self.__DB_CONN.execute(sql, param)
            self.__DB_CONN.commit()
            return result
        except Exception as ex:
            return "error: " + str(ex)

    def execute(self, sql, param=()):
        """Execute raw SQL, commit, and return the affected row count."""
        self.__GetConn()
        try:
            result = self.__DB_CONN.execute(sql, param)
            self.__DB_CONN.commit()
            return result.rowcount
        except Exception as ex:
            return "error: " + str(ex)

    def query(self, sql, param=()):
        """Execute raw SQL and return the sqlite3 cursor (no commit)."""
        self.__GetConn()
        try:
            return self.__DB_CONN.execute(sql, param)
        except Exception as ex:
            return "error: " + str(ex)

    def create(self, name):
        """Run the SQL script ``data/<name>.sql`` (creates tables), commit."""
        self.__GetConn()
        import mw
        script = mw.readFile('data/' + name + '.sql')
        result = self.__DB_CONN.executescript(script)
        self.__DB_CONN.commit()
        return result.rowcount

    def fofile(self, filename):
        """Run the SQL script stored in ``filename`` and commit."""
        self.__GetConn()
        import mw
        script = mw.readFile(filename)
        result = self.__DB_CONN.executescript(script)
        self.__DB_CONN.commit()
        return result.rowcount

    def __close(self):
        """Reset the per-query clauses (table name and connection are kept)."""
        self.__OPT_WHERE = ""
        self.__OPT_FIELD = "*"
        self.__OPT_ORDER = ""
        self.__OPT_GROUP = ""
        self.__OPT_LIMIT = ""
        self.__OPT_PARAM = ()

    def close(self):
        """Close the connection, if one is open; safe to call repeatedly."""
        if self.__DB_CONN is None:
            return
        try:
            self.__DB_CONN.close()
        except sqlite3.Error:
            # Best effort: a failing close must not break the caller.
            pass
        finally:
            self.__DB_CONN = None