Merge pull request #579 from midoks/dev

新功能同步到mariadb
pull/580/head
Mr Chen 12 months ago committed by GitHub
commit b77bfab5a8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 356
      plugins/mariadb/index.py
  2. 6
      plugins/mariadb/install.sh
  3. 83
      plugins/mariadb/js/mariadb.js
  4. 2
      plugins/mariadb/scripts/backup.py
  5. 4
      plugins/mysql/index.py
  6. 2
      plugins/mysql/js/mysql.js
  7. 2
      requirements.txt
  8. 22
      route/static/app/public.js

@ -1948,7 +1948,7 @@ def addMasterRepSlaveUser(version=''):
if isError != None:
return isError
sql_select = "grant select,lock tables,PROCESS on *.* to " + username + "@'%';"
sql_select = "grant select,reload,REPLICATION CLIENT,PROCESS on *.* to " + username + "@'%';"
pdb.execute(sql_select)
pdb.execute('FLUSH PRIVILEGES;')
@ -2583,6 +2583,293 @@ def dumpMysqlData(version=''):
return 'ok'
return 'fail'
############### --- 重要 数据补足同步 ---- ###########
def getSyncMysqlDB(dbname,sign = ''):
conn = pSqliteDb('slave_sync_user')
if sign != '':
data = conn.field('ip,port,user,pass,mode,cmd').where('ip=?', (sign,)).find()
else:
data = conn.field('ip,port,user,pass,mode,cmd').find()
user = data['user']
apass = data['pass']
port = data['port']
ip = data['ip']
# 远程数据
sync_db = mw.getMyORM()
# MySQLdb |
sync_db.setPort(port)
sync_db.setHost(ip)
sync_db.setUser(user)
sync_db.setPwd(apass)
sync_db.setDbName(dbname)
sync_db.setTimeout(60)
return sync_db
def syncDatabaseRepairTempFile():
tmp_log = mw.getMWLogs()+ '/mariadb-check.log'
return tmp_log
def syncDatabaseRepairLog(version=''):
import subprocess
args = getArgs()
data = checkArgs(args, ['db','sign','op'])
if not data[0]:
return data[1]
sync_args_db = args['db']
sync_args_sign = args['sign']
op = args['op']
tmp_log = syncDatabaseRepairTempFile()
cmd = 'cd '+mw.getServerDir()+'/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py sync_database_repair {"db":"'+sync_args_db+'","sign":"'+sync_args_sign+'"}'
# print(cmd)
if op == 'get':
log = mw.getLastLine(tmp_log, 15)
return mw.returnJson(True, log)
if op == 'cmd':
return mw.returnJson(True, 'ok', cmd)
if op == 'do':
os.system(' echo "开始执行" > '+ tmp_log)
os.system(cmd +' >> '+ tmp_log +' &')
# time.sleep(10)
# mw.execShell('rm -rf '+tmp_log)
return mw.returnJson(True, 'ok')
return mw.returnJson(False, '无效请求!')
def syncDatabaseRepair(version=''):
time_stats_s = time.time()
tmp_log = syncDatabaseRepairTempFile()
from pymysql.converters import escape_string
args = getArgs()
data = checkArgs(args, ['db','sign'])
if not data[0]:
return data[1]
sync_args_db = args['db']
sync_args_sign = args['sign']
# 本地数据
local_db = pMysqlDb()
# 远程数据
sync_db = getSyncMysqlDB(sync_args_db,sync_args_sign)
tables = local_db.query('show tables from `%s`' % sync_args_db)
table_key = "Tables_in_" + sync_args_db
inconsistent_table = []
tmp_dir = '/tmp/sync_db_repair'
mw.execShell('mkdir -p '+tmp_dir)
for tb in tables:
table_name = sync_args_db+'.'+tb[table_key]
table_check_file = tmp_dir+'/'+table_name+'.txt'
if os.path.exists(table_check_file):
# print(table_name+', 已检查OK')
continue
primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';";
primary_key_data = local_db.query(primary_key_sql)
# print(primary_key_sql,primary_key_data)
pkey_name = '*'
if len(primary_key_data) == 1:
pkey_name = primary_key_data[0]['Column_name']
# print(pkey_name)
if pkey_name != '*' :
# 智能校验(由于服务器同步可能会慢,比较总数总是对不上)
cmd_local_newpk_sql = 'select ' + pkey_name + ' from ' + table_name + " order by " + pkey_name + " desc limit 1"
cmd_local_newpk_data = local_db.query(cmd_local_newpk_sql)
# print(cmd_local_newpk_data)
if len(cmd_local_newpk_data) == 1:
# 比较总数
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + ' where '+pkey_name + ' <= '+ str(cmd_local_newpk_data[0][pkey_name])
local_count_data = local_db.query(cmd_count_sql)
sync_count_data = sync_db.query(cmd_count_sql)
if local_count_data != sync_count_data:
print(cmd_count_sql)
print("all data compare: ",local_count_data, sync_count_data)
else:
print(table_name+' smart compare check ok.')
mw.writeFile(tmp_log, table_name+' smart compare check ok.\n','a+')
mw.execShell("echo 'ok' > "+table_check_file)
continue
# 比较总数
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name
local_count_data = local_db.query(cmd_count_sql)
sync_count_data = sync_db.query(cmd_count_sql)
if local_count_data != sync_count_data:
print("all data compare: ",local_count_data, sync_count_data)
inconsistent_table.append(table_name)
diff = sync_count_data[0]['num'] - local_count_data[0]['num']
print(table_name+', need sync. diff,'+str(diff))
mw.writeFile(tmp_log, table_name+', need sync. diff,'+str(diff)+'\n','a+')
else:
print(table_name+' check ok.')
mw.writeFile(tmp_log, table_name+' check ok.\n','a+')
mw.execShell("echo 'ok' > "+table_check_file)
# inconsistent_table = ['xx.xx']
# 数据对齐
for table_name in inconsistent_table:
is_break = False
while not is_break:
local_db.ping()
# 远程数据
sync_db.ping()
print("check table:"+table_name)
mw.writeFile(tmp_log, "check table:"+table_name+'\n','a+')
table_name_pos = 0
table_name_pos_file = tmp_dir+'/'+table_name+'.pos.txt'
primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';";
primary_key_data = local_db.query(primary_key_sql)
pkey_name = primary_key_data[0]['Column_name']
if os.path.exists(table_name_pos_file):
table_name_pos = mw.readFile(table_name_pos_file)
data_select_sql = 'select * from '+table_name + ' where '+pkey_name+' > '+str(table_name_pos)+' limit 10000'
print(data_select_sql)
local_select_data = local_db.query(data_select_sql)
time_s = time.time()
sync_select_data = sync_db.query(data_select_sql)
print(f'sync query cos:{time.time() - time_s:.4f}s')
mw.writeFile(tmp_log, f'sync query cos:{time.time() - time_s:.4f}s\n','a+')
# print(local_select_data)
# print(sync_select_data)
# print(len(local_select_data))
# print(len(sync_select_data))
print('pos:',str(table_name_pos),'local compare sync,',local_select_data == sync_select_data)
cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name
local_count_data = local_db.query(cmd_count_sql)
time_s = time.time()
sync_count_data = sync_db.query(cmd_count_sql)
print(f'sync count data cos:{time.time() - time_s:.4f}s')
print(local_count_data,sync_count_data)
# 数据同步有延迟,相等即任务数据补足完成
if local_count_data[0]['num'] == sync_count_data[0]['num']:
is_break = True
break
diff = sync_count_data[0]['num'] - local_count_data[0]['num']
print("diff," + str(diff)+' line data!')
if local_select_data == sync_select_data:
data_count = len(local_select_data)
if data_count == 0:
# mw.writeFile(table_name_pos_file, '0')
print(table_name+",data is equal ok..")
is_break = True
break
# print(table_name,data_count)
pos = local_select_data[data_count-1][pkey_name]
print('pos',pos)
progress = pos/sync_count_data[0]['num']
print('progress,%.2f' % progress+'%')
mw.writeFile(table_name_pos_file, str(pos))
else:
sync_select_data_len = len(sync_select_data)
skip_idx = 0
# 主库PK -> 查询本地 | 保证一致
if sync_select_data_len > 0:
for idx in range(sync_select_data_len):
sync_idx_data = sync_select_data[idx]
local_idx_data = None
if idx in local_select_data:
local_idx_data = local_select_data[idx]
if sync_select_data[idx] == local_idx_data:
skip_idx = idx
pos = local_select_data[idx][pkey_name]
mw.writeFile(table_name_pos_file, str(pos))
# print(insert_data)
local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(sync_idx_data[pkey_name])
# print(local_inquery_sql)
ldata = local_db.query(local_inquery_sql)
# print('ldata:',ldata)
if len(ldata) == 0:
print("id:"+ str(sync_idx_data[pkey_name])+ " no exists, insert")
insert_sql = 'insert into ' + table_name
field_str = ''
value_str = ''
for field in sync_idx_data:
field_str += '`'+field+'`,'
value_str += '\''+escape_string(str(sync_idx_data[field]))+'\','
field_str = '(' +field_str.strip(',')+')'
value_str = '(' +value_str.strip(',')+')'
insert_sql = insert_sql+' '+field_str+' values'+value_str+';'
print(insert_sql)
r = local_db.execute(insert_sql)
print(r)
else:
# print('compare sync->local:',sync_idx_data == ldata[0] )
if ldata[0] == sync_idx_data:
continue
print("id:"+ str(sync_idx_data[pkey_name])+ " data is not equal, update")
update_sql = 'update ' + table_name
field_str = ''
value_str = ''
for field in sync_idx_data:
if field == pkey_name:
continue
field_str += '`'+field+'`=\''+escape_string(str(sync_idx_data[field]))+'\','
field_str = field_str.strip(',')
update_sql = update_sql+' set '+field_str+' where '+pkey_name+'=\''+str(sync_idx_data[pkey_name])+'\';'
print(update_sql)
r = local_db.execute(update_sql)
print(r)
# 本地PK -> 查询主库 | 保证一致
# local_select_data_len = len(local_select_data)
# if local_select_data_len > 0:
# for idx in range(local_select_data_len):
# if idx < skip_idx:
# continue
# local_idx_data = local_select_data[idx]
# print('local idx check', idx, skip_idx)
# local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name])
# print(local_inquery_sql)
# sdata = sync_db.query(local_inquery_sql)
# sdata_len = len(sdata)
# print('sdata:',sdata,sdata_len)
# if sdata_len == 0:
# delete_sql = 'delete from ' + table_name + ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name])
# print(delete_sql)
# r = local_db.execute(delete_sql)
# print(r)
# break
if is_break:
print("break all")
break
time.sleep(3)
print(f'data check cos:{time.time() - time_stats_s:.4f}s')
print("data supplementation completed")
mw.execShell('rm -rf '+tmp_dir)
return 'ok'
############### --- 重要 同步---- ###########
@ -2647,11 +2934,18 @@ def doFullSync(version=''):
def doFullSyncUser(version=''):
which_pv = mw.execShell('which pv')
is_exist_pv = False
if not os.path.exists(which_pv[0]):
is_exist_pv = True
args = getArgs()
data = checkArgs(args, ['db', 'sign'])
if not data[0]:
return data[1]
time_all_s = time.time()
sync_db = args['db']
sync_sign = args['sign']
@ -2675,23 +2969,67 @@ def doFullSyncUser(version=''):
os.system("rm -rf " + bak_file)
writeDbSyncStatus({'code': 0, 'msg': '开始同步...', 'progress': 0})
dmp_option = ''
mode = recognizeDbMode()
# if mode == 'gtid':
# dmp_option = ' --set-gtid-purged=off '
writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 10})
find_run_dump = mw.execShell('ps -ef | grep mariadb-dump | grep -v grep')
if find_run_dump[0] != "":
print("正在远程导出数据中,别着急...")
writeDbSyncStatus({'code': 3.1, 'msg': '正在远程导出数据中,别着急...', 'progress': 39})
return False
time_s = time.time()
if not os.path.exists(bak_file):
dmp_option += " --master-data=1 --apply-slave-statements --include-master-host-port "
# https://mariadb.com/kb/zh-cn/mariadb-dump/
dump_sql_data = getServerDir() + "/bin/mariadb-dump -f --opt --default-character-set=utf8 --single-transaction -h" + ip + " -P" + \
dump_sql_data = getServerDir() + "/bin/mariadb-dump " + dmp_option + " -f --default-character-set=utf8 --single-transaction --compress -q -h" + ip + " -P" + \
port + " -u" + user + " -p'" + apass + "' " + sync_db + ">" + bak_file
print(dump_sql_data)
mw.execShell(dump_sql_data)
writeDbSyncStatus({'code': 2, 'msg': '本地导入数据...', 'progress': 40})
time_e = time.time()
export_cos = time_e - time_s
print("export cos:", export_cos)
writeDbSyncStatus({'code': 3, 'msg': '导出耗时:'+str(int(export_cos))+'秒,正在到本地导入数据中...', 'progress': 40})
find_run_import = mw.execShell('ps -ef | grep mariadb| grep '+ bak_file +' | grep -v grep')
if find_run_import[0] != "":
print("正在导入数据中,别着急...")
writeDbSyncStatus({'code': 4.1, 'msg': '正在导入数据中,别着急...', 'progress': 59})
return False
# if os.path.exists(bak_file):
# db.execute('reset master')
time_s = time.time()
if os.path.exists(bak_file):
pwd = pSqliteDb('config').where('id=?', (1,)).getField('mysql_root')
sock = getSocketFile()
my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + \
"' " + sync_db + '<' + bak_file
print(my_import_cmd)
mw.execShell(my_import_cmd)
if is_exist_pv:
my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db
my_import_cmd = "pv -t -p " + bak_file + '|' + my_import_cmd
print(my_import_cmd)
os.system(my_import_cmd)
else:
my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + \
"' " + sync_db + '<' + bak_file
print(my_import_cmd)
mw.execShell(my_import_cmd)
time_e = time.time()
import_cos = time_e - time_s
print("import cos:", import_cos)
writeDbSyncStatus({'code': 4, 'msg': '导入耗时:'+str(int(import_cos))+'', 'progress': 60})
time.sleep(3)
pinfo = parseSlaveSyncCmd(data['cmd'])
# print(pinfo)
@ -3026,5 +3364,9 @@ if __name__ == "__main__":
print(doFullSync(version))
elif func == 'dump_mysql_data':
print(dumpMysqlData(version))
elif func == 'sync_database_repair':
print(syncDatabaseRepair())
elif func == 'sync_database_repair_log':
print(syncDatabaseRepairLog())
else:
print('error')

@ -7,6 +7,12 @@ rootPath=$(dirname "$curPath")
rootPath=$(dirname "$rootPath")
serverPath=$(dirname "$rootPath")
# cd /www/server/mdserver-web/plugins/mariadb && bash install.sh install 8.2
# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py try_slave_sync_bugfix {}
# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py do_full_sync {"db":"xxx","sign":"","begin":1}
# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py sync_database_repair {"db":"xxx","sign":""}
# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py init_slave_status
# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py install_pre_inspection
install_tmp=${rootPath}/tmp/mw_install.pl

@ -874,7 +874,7 @@ function openPhpmyadmin(name,username,password){
}
if (rdata.data['cfg']['choose'] != 'mariadb'){
layer.msg('当前为['+rdata.choose+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']});
layer.msg('当前为['+rdata.data['cfg']['choose']+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']});
return;
}
var home_page = rdata.data['home_page'];
@ -1978,6 +1978,84 @@ function getFullSyncStatus(db){
}
function dataSyncVerify(db){
var reqTimer = null;
function requestLogs(layerIndex){
myPostN('sync_database_repair_log', {db:db, sign:'',op:'get'}, function(rdata){
var rdata = $.parseJSON(rdata.data);
if(!rdata.status) {
layer.close(layerIndex);
layer.msg(rdata.msg,{icon:2, time:2000});
clearInterval(reqTimer);
return;
};
if (rdata.msg == ''){
rdata.msg = '暂无数据!';
}
$("#data_verify_log").html(rdata.msg);
//滚动到最低
var ob = document.getElementById('data_verify_log');
ob.scrollTop = ob.scrollHeight;
});
}
layer.open({
type: 1,
title: '同步数据库['+db+']数据校验',
area: '500px',
btn:[ "开始","取消","手动"],
content:"<div class='bt-form'>\
"+'<pre id="data_verify_log" style="overflow: auto; border: 0px none; line-height:23px;padding: 5px; margin: 0px; white-space: pre-wrap; height: 395px; background-color: rgb(51,51,51);color:#f1f1f1;border-radius:0px;font-family:"></pre>'+"\
</div>",
cancel: function(){
if (reqTimer){
clearInterval(reqTimer);
}
},
yes:function(index,layer_index){
myPostN('sync_database_repair_log', {db:db, sign:'',op:'do'}, function(data){});
layer.msg("执行成功");
requestLogs(layer_index);
reqTimer = setInterval(function(){
requestLogs(layer_index);
},3000);
},
success:function(){
},
btn3: function(){
myPostN('sync_database_repair_log', {db:db, sign:'',op:'cmd'}, function(rdata){
var rdata = $.parseJSON(rdata.data);
layer.open({
title: "手动执行命令CMD",
area: ['600px', '180px'],
type:1,
closeBtn: 1,
shadeClose: false,
btn:["复制","取消"],
content: '<div class="pd15">\
<div class="divtable">\
<pre class="layui-code">'+rdata.data+'</pre>\
</div>\
</div>',
success:function(){
copyText(rdata.data);
},
yes:function(){
copyText(rdata.data);
}
});
});
return false;
}
});
}
function addSlaveSSH(ip=''){
myPost('get_slave_ssh_by_ip', {ip:ip}, function(rdata){
@ -2633,7 +2711,8 @@ function masterOrSlaveConf(version=''){
list += '<td>' + rdata.data[i]['name'] +'</td>';
list += '<td style="text-align:right">' +
'<a href="javascript:;" class="btlink" onclick="setDbSlave(\''+rdata.data[i]['name']+'\')" title="加入|退出">'+(rdata.data[i]['slave']?'退出':'加入')+'</a> | ' +
'<a href="javascript:;" class="btlink" onclick="getFullSyncStatus(\''+rdata.data[i]['name']+'\')" title="同步">同步</a>' +
'<a href="javascript:;" class="btlink" onclick="getFullSyncStatus(\''+rdata.data[i]['name']+'\')" title="同步">同步</a> | ' +
'<a href="javascript:;" class="btlink" onclick="dataSyncVerify(\''+rdata.data[i]['name']+'\')" title="数据校验">数据校验</a>' +
'</td>';
list += '</tr>';
}

@ -65,7 +65,7 @@ class backupTools:
# mw.execShell(db_path + "/bin/mysqldump --defaults-file=" + my_conf_path + " --skip-lock-tables --default-character-set=utf8 " +
# name + " | gzip > " + filename)
cmd = db_path + "/bin/mysqldump --defaults-file=" + my_conf_path + " --single-transaction --quick --default-character-set=utf8 " + \
cmd = db_path + "/bin/mariadb-dump --defaults-file=" + my_conf_path + " --single-transaction --quick --default-character-set=utf8 " + \
name + " | gzip > " + filename
mw.execShell(cmd)

@ -3075,9 +3075,7 @@ def syncDatabaseRepairLog(version=''):
if op == 'do':
os.system(' echo "开始执行" > '+ tmp_log)
subprocess.Popen(cmd +' >> '+ tmp_log +' &')
# time.sleep(10)
# mw.execShell('rm -rf '+tmp_log)
os.system(cmd +' >> '+ tmp_log +' &')
return mw.returnJson(True, 'ok')
return mw.returnJson(False, '无效请求!')

@ -871,7 +871,7 @@ function openPhpmyadmin(name,username,password){
}
if (rdata.data['cfg']['choose'] != 'mysql'){
layer.msg('当前为['+rdata.choose+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']});
layer.msg('当前为['+rdata.data['cfg']['choose'] + ']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']});
return;
}
var home_page = rdata.data['home_page'];

@ -29,7 +29,7 @@ pymemcache
redis
pillow
Jinja2>=2.11.2
PyMySQL==1.1.1
PyMySQL>=1.0.2
whitenoise==5.3.0
pyotp
pytz

@ -1000,7 +1000,7 @@ function setSelectChecked(c, d) {
function jump() {
layer.closeAll();
window.location.href = "/soft"
window.location.href = "/soft";
}
function installTips() {
@ -1134,15 +1134,15 @@ function getPanelList(){
var user = $(this).attr("data-user");
var pw = $(this).attr("data-pw");
layer.open({
type: 2,
title: false,
closeBtn: 0, //不显示关闭按钮
shade: [0],
area: ['340px', '215px'],
offset: 'rb', //右下角弹出
time: 5, //2秒后自动关闭
anim: 2,
content: [murl+'/login', 'no']
type: 2,
title: false,
closeBtn: 0, //不显示关闭按钮
shade: [0],
area: ['340px', '215px'],
offset: 'rb', //右下角弹出
time: 5, //2秒后自动关闭
anim: 2,
content: [murl+'/login', 'no']
});
var loginForm ='<div id="btpanelform" style="display:none"><form id="toBtpanel" action="'+murl+'/do_login" method="post" target="btpfrom">\
<input name="username" id="btp_username" value="'+user+'" type="text">\
@ -1375,7 +1375,7 @@ function remind(a){
<td>\
<div class="titlename c3">'+g.data[d].name+'</span>\
<span class="rs-status">'+lan.bt.task_ok+'<span>\
<span class="rs-time">耗时['+ getSFM(g.data[d].end - g.data[d].start) +']</span>\
<span class="rs-time">安装等待中...</span>\
</div>\
</td>\
<td class="text-right c3">'+g.data[d].addtime+'</td>\

Loading…
Cancel
Save