# coding: utf-8 import sqlite3 import os class Sql(): #------------------------------ # 数据库操作类 For sqlite3 #------------------------------ __DB_FILE = None # 数据库文件 __DB_CONN = None # 数据库连接对象 __DB_TABLE = "" # 被操作的表名称 __OPT_WHERE = "" # where条件 __OPT_LIMIT = "" # limit条件 __OPT_ORDER = "" # order条件 __OPT_FIELD = "*" # field条件 __OPT_PARAM = () # where值 def __init__(self): self.__DB_FILE = 'data/default.db' def __GetConn(self): # 取数据库对象 try: if self.__DB_CONN == None: self.__DB_CONN = sqlite3.connect(self.__DB_FILE) except Exception, ex: return "error: " + str(ex) def dbfile(self, name): self.__DB_FILE = 'data/' + name + '.db' return self def dbPos(self, path, name): self.__DB_FILE = path + '/' + name + '.db' return self def table(self, table): # 设置表名 self.__DB_TABLE = table return self def where(self, where, param): # WHERE条件 if where: self.__OPT_WHERE = " WHERE " + where self.__OPT_PARAM = param return self def order(self, order): # ORDER条件 if len(order): self.__OPT_ORDER = " ORDER BY " + order return self def limit(self, limit): # LIMIT条件 if len(limit): self.__OPT_LIMIT = " LIMIT " + limit return self def field(self, field): # FIELD条件 if len(field): self.__OPT_FIELD = field return self def select(self): # 查询数据集 self.__GetConn() try: sql = "SELECT " + self.__OPT_FIELD + " FROM " + self.__DB_TABLE + \ self.__OPT_WHERE + self.__OPT_ORDER + self.__OPT_LIMIT result = self.__DB_CONN.execute(sql, self.__OPT_PARAM) data = result.fetchall() # 构造字曲系列 if self.__OPT_FIELD != "*": field = self.__OPT_FIELD.split(',') tmp = [] for row in data: i = 0 tmp1 = {} for key in field: tmp1[key] = row[i] i += 1 tmp.append(tmp1) del(tmp1) data = tmp del(tmp) else: # 将元组转换成列表 tmp = map(list, data) data = tmp del(tmp) self.__close() return data except Exception, ex: return "error: " + str(ex) def getField(self, keyName): # 取回指定字段 result = self.field(keyName).select() if len(result) == 1: return result[0][keyName] return result def setField(self, keyName, keyValue): # 更新指定字段 return self.save(keyName, (keyValue,)) def find(self): # 取一行数据 result = self.limit("1").select() if len(result) == 1: return result[0] return result def count(self): # 取行数 key = "COUNT(*)" data = self.field(key).select() try: return int(data[0][key]) except: return 0 def add(self, keys, param): # 插入数据 self.__GetConn() self.__DB_CONN.text_factory = str try: values = "" for key in keys.split(','): values += "?," values = self.checkInput(values[0:len(values) - 1]) sql = "INSERT INTO " + self.__DB_TABLE + \ "(" + keys + ") " + "VALUES(" + values + ")" result = self.__DB_CONN.execute(sql, param) id = result.lastrowid self.__close() self.__DB_CONN.commit() return id except Exception, ex: return "error: " + str(ex) def checkInput(self, data): if not data: return data if type(data) != str: return data checkList = [ {'d': '<', 'r': '<'}, {'d': '>', 'r': '>'}, {'d': '\'', 'r': '‘'}, {'d': '"', 'r': '“'}, {'d': '&', 'r': '&'}, {'d': '#', 'r': '#'}, {'d': '<', 'r': '<'} ] for v in checkList: data = data.replace(v['d'], v['r']) return data def addAll(self, keys, param): # 插入数据 self.__GetConn() self.__DB_CONN.text_factory = str try: values = "" for key in keys.split(','): values += "?," values = values[0:len(values) - 1] sql = "INSERT INTO " + self.__DB_TABLE + \ "(" + keys + ") " + "VALUES(" + values + ")" result = self.__DB_CONN.execute(sql, param) return True except Exception, ex: return "error: " + str(ex) def commit(self): self.__close() self.__DB_CONN.commit() def save(self, keys, param): # 更新数据 self.__GetConn() self.__DB_CONN.text_factory = str try: opt = "" for key in keys.split(','): opt += key + "=?," opt = opt[0:len(opt) - 1] sql = "UPDATE " + self.__DB_TABLE + " SET " + opt + self.__OPT_WHERE import public public.writeFile('/tmp/test.pl', sql) # 处理拼接WHERE与UPDATE参数 tmp = list(param) for arg in self.__OPT_PARAM: tmp.append(arg) self.__OPT_PARAM = tuple(tmp) result = self.__DB_CONN.execute(sql, self.__OPT_PARAM) self.__close() self.__DB_CONN.commit() return result.rowcount except Exception, ex: return "error: " + str(ex) def delete(self, id=None): # 删除数据 self.__GetConn() try: if id: self.__OPT_WHERE = " WHERE id=?" self.__OPT_PARAM = (id,) sql = "DELETE FROM " + self.__DB_TABLE + self.__OPT_WHERE result = self.__DB_CONN.execute(sql, self.__OPT_PARAM) self.__close() self.__DB_CONN.commit() return result.rowcount except Exception, ex: return "error: " + str(ex) def execute(self, sql, param): # 执行SQL语句返回受影响行 self.__GetConn() try: result = self.__DB_CONN.execute(sql, param) self.__DB_CONN.commit() return result.rowcount except Exception, ex: return "error: " + str(ex) def query(self, sql, param): # 执行SQL语句返回数据集 self.__GetConn() try: result = self.__DB_CONN.execute(sql, param) # 将元组转换成列表 data = map(list, result) return data except Exception, ex: return "error: " + str(ex) def create(self, name): # 创建数据表 self.__GetConn() import public script = public.readFile('data/' + name + '.sql') result = self.__DB_CONN.executescript(script) self.__DB_CONN.commit() return result.rowcount def fofile(self, filename): # 执行脚本 self.__GetConn() import public script = public.readFile(filename) result = self.__DB_CONN.executescript(script) self.__DB_CONN.commit() return result.rowcount def __close(self): # 清理条件属性 self.__OPT_WHERE = "" self.__OPT_FIELD = "*" self.__OPT_ORDER = "" self.__OPT_LIMIT = "" self.__OPT_PARAM = () def close(self): # 释放资源 try: self.__DB_CONN.close() self.__DB_CONN = None except: pass