diff --git a/plugins/mariadb/index.py b/plugins/mariadb/index.py index c8d3996fc..70982611e 100755 --- a/plugins/mariadb/index.py +++ b/plugins/mariadb/index.py @@ -1948,7 +1948,7 @@ def addMasterRepSlaveUser(version=''): if isError != None: return isError - sql_select = "grant select,lock tables,PROCESS on *.* to " + username + "@'%';" + sql_select = "grant select,reload,REPLICATION CLIENT,PROCESS on *.* to " + username + "@'%';" pdb.execute(sql_select) pdb.execute('FLUSH PRIVILEGES;') @@ -2583,6 +2583,293 @@ def dumpMysqlData(version=''): return 'ok' return 'fail' +############### --- 重要 数据补足同步 ---- ########### + +def getSyncMysqlDB(dbname,sign = ''): + conn = pSqliteDb('slave_sync_user') + if sign != '': + data = conn.field('ip,port,user,pass,mode,cmd').where('ip=?', (sign,)).find() + else: + data = conn.field('ip,port,user,pass,mode,cmd').find() + user = data['user'] + apass = data['pass'] + port = data['port'] + ip = data['ip'] + # 远程数据 + sync_db = mw.getMyORM() + # MySQLdb | + sync_db.setPort(port) + sync_db.setHost(ip) + sync_db.setUser(user) + sync_db.setPwd(apass) + sync_db.setDbName(dbname) + sync_db.setTimeout(60) + return sync_db + +def syncDatabaseRepairTempFile(): + tmp_log = mw.getMWLogs()+ '/mariadb-check.log' + return tmp_log + +def syncDatabaseRepairLog(version=''): + import subprocess + args = getArgs() + data = checkArgs(args, ['db','sign','op']) + if not data[0]: + return data[1] + + sync_args_db = args['db'] + sync_args_sign = args['sign'] + op = args['op'] + tmp_log = syncDatabaseRepairTempFile() + cmd = 'cd '+mw.getServerDir()+'/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py sync_database_repair {"db":"'+sync_args_db+'","sign":"'+sync_args_sign+'"}' + # print(cmd) + + if op == 'get': + log = mw.getLastLine(tmp_log, 15) + return mw.returnJson(True, log) + + if op == 'cmd': + return mw.returnJson(True, 'ok', cmd) + + if op == 'do': + os.system(' echo "开始执行" > '+ tmp_log) + os.system(cmd +' >> '+ tmp_log +' &') + # time.sleep(10) + # mw.execShell('rm -rf '+tmp_log) + return mw.returnJson(True, 'ok') + + return mw.returnJson(False, '无效请求!') + + +def syncDatabaseRepair(version=''): + time_stats_s = time.time() + tmp_log = syncDatabaseRepairTempFile() + + from pymysql.converters import escape_string + args = getArgs() + data = checkArgs(args, ['db','sign']) + if not data[0]: + return data[1] + + sync_args_db = args['db'] + sync_args_sign = args['sign'] + + # 本地数据 + local_db = pMysqlDb() + # 远程数据 + sync_db = getSyncMysqlDB(sync_args_db,sync_args_sign) + + tables = local_db.query('show tables from `%s`' % sync_args_db) + table_key = "Tables_in_" + sync_args_db + inconsistent_table = [] + + tmp_dir = '/tmp/sync_db_repair' + mw.execShell('mkdir -p '+tmp_dir) + + for tb in tables: + + table_name = sync_args_db+'.'+tb[table_key] + table_check_file = tmp_dir+'/'+table_name+'.txt' + + if os.path.exists(table_check_file): + # print(table_name+', 已检查OK') + continue + + primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';"; + primary_key_data = local_db.query(primary_key_sql) + # print(primary_key_sql,primary_key_data) + pkey_name = '*' + if len(primary_key_data) == 1: + pkey_name = primary_key_data[0]['Column_name'] + # print(pkey_name) + if pkey_name != '*' : + # 智能校验(由于服务器同步可能会慢,比较总数总是对不上) + cmd_local_newpk_sql = 'select ' + pkey_name + ' from ' + table_name + " order by " + pkey_name + " desc limit 1" + cmd_local_newpk_data = local_db.query(cmd_local_newpk_sql) + # print(cmd_local_newpk_data) + if len(cmd_local_newpk_data) == 1: + # 比较总数 + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + ' where '+pkey_name + ' <= '+ str(cmd_local_newpk_data[0][pkey_name]) + local_count_data = local_db.query(cmd_count_sql) + sync_count_data = sync_db.query(cmd_count_sql) + + if local_count_data != sync_count_data: + print(cmd_count_sql) + print("all data compare: ",local_count_data, sync_count_data) + else: + print(table_name+' smart compare check ok.') + mw.writeFile(tmp_log, table_name+' smart compare check ok.\n','a+') + mw.execShell("echo 'ok' > "+table_check_file) + continue + + + + # 比较总数 + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + local_count_data = local_db.query(cmd_count_sql) + sync_count_data = sync_db.query(cmd_count_sql) + + if local_count_data != sync_count_data: + print("all data compare: ",local_count_data, sync_count_data) + inconsistent_table.append(table_name) + diff = sync_count_data[0]['num'] - local_count_data[0]['num'] + print(table_name+', need sync. diff,'+str(diff)) + mw.writeFile(tmp_log, table_name+', need sync. diff,'+str(diff)+'\n','a+') + else: + print(table_name+' check ok.') + mw.writeFile(tmp_log, table_name+' check ok.\n','a+') + mw.execShell("echo 'ok' > "+table_check_file) + + + # inconsistent_table = ['xx.xx'] + # 数据对齐 + for table_name in inconsistent_table: + is_break = False + while not is_break: + local_db.ping() + # 远程数据 + sync_db.ping() + + print("check table:"+table_name) + mw.writeFile(tmp_log, "check table:"+table_name+'\n','a+') + table_name_pos = 0 + table_name_pos_file = tmp_dir+'/'+table_name+'.pos.txt' + primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';"; + primary_key_data = local_db.query(primary_key_sql) + pkey_name = primary_key_data[0]['Column_name'] + + if os.path.exists(table_name_pos_file): + table_name_pos = mw.readFile(table_name_pos_file) + + + data_select_sql = 'select * from '+table_name + ' where '+pkey_name+' > '+str(table_name_pos)+' limit 10000' + print(data_select_sql) + local_select_data = local_db.query(data_select_sql) + + time_s = time.time() + sync_select_data = sync_db.query(data_select_sql) + print(f'sync query cos:{time.time() - time_s:.4f}s') + mw.writeFile(tmp_log, f'sync query cos:{time.time() - time_s:.4f}s\n','a+') + + # print(local_select_data) + # print(sync_select_data) + + # print(len(local_select_data)) + # print(len(sync_select_data)) + print('pos:',str(table_name_pos),'local compare sync,',local_select_data == sync_select_data) + + + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + local_count_data = local_db.query(cmd_count_sql) + time_s = time.time() + sync_count_data = sync_db.query(cmd_count_sql) + print(f'sync count data cos:{time.time() - time_s:.4f}s') + print(local_count_data,sync_count_data) + # 数据同步有延迟,相等即任务数据补足完成 + if local_count_data[0]['num'] == sync_count_data[0]['num']: + is_break = True + break + + diff = sync_count_data[0]['num'] - local_count_data[0]['num'] + print("diff," + str(diff)+' line data!') + + if local_select_data == sync_select_data: + data_count = len(local_select_data) + if data_count == 0: + # mw.writeFile(table_name_pos_file, '0') + print(table_name+",data is equal ok..") + is_break = True + break + + # print(table_name,data_count) + pos = local_select_data[data_count-1][pkey_name] + print('pos',pos) + progress = pos/sync_count_data[0]['num'] + print('progress,%.2f' % progress+'%') + mw.writeFile(table_name_pos_file, str(pos)) + else: + sync_select_data_len = len(sync_select_data) + skip_idx = 0 + # 主库PK -> 查询本地 | 保证一致 + if sync_select_data_len > 0: + for idx in range(sync_select_data_len): + sync_idx_data = sync_select_data[idx] + local_idx_data = None + if idx in local_select_data: + local_idx_data = local_select_data[idx] + if sync_select_data[idx] == local_idx_data: + skip_idx = idx + pos = local_select_data[idx][pkey_name] + mw.writeFile(table_name_pos_file, str(pos)) + + # print(insert_data) + local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(sync_idx_data[pkey_name]) + # print(local_inquery_sql) + ldata = local_db.query(local_inquery_sql) + # print('ldata:',ldata) + if len(ldata) == 0: + print("id:"+ str(sync_idx_data[pkey_name])+ " no exists, insert") + insert_sql = 'insert into ' + table_name + field_str = '' + value_str = '' + for field in sync_idx_data: + field_str += '`'+field+'`,' + value_str += '\''+escape_string(str(sync_idx_data[field]))+'\',' + field_str = '(' +field_str.strip(',')+')' + value_str = '(' +value_str.strip(',')+')' + insert_sql = insert_sql+' '+field_str+' values'+value_str+';' + print(insert_sql) + r = local_db.execute(insert_sql) + print(r) + else: + # print('compare sync->local:',sync_idx_data == ldata[0] ) + if ldata[0] == sync_idx_data: + continue + + print("id:"+ str(sync_idx_data[pkey_name])+ " data is not equal, update") + update_sql = 'update ' + table_name + field_str = '' + value_str = '' + for field in sync_idx_data: + if field == pkey_name: + continue + field_str += '`'+field+'`=\''+escape_string(str(sync_idx_data[field]))+'\',' + field_str = field_str.strip(',') + update_sql = update_sql+' set '+field_str+' where '+pkey_name+'=\''+str(sync_idx_data[pkey_name])+'\';' + print(update_sql) + r = local_db.execute(update_sql) + print(r) + + # 本地PK -> 查询主库 | 保证一致 + # local_select_data_len = len(local_select_data) + # if local_select_data_len > 0: + # for idx in range(local_select_data_len): + # if idx < skip_idx: + # continue + # local_idx_data = local_select_data[idx] + # print('local idx check', idx, skip_idx) + # local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name]) + # print(local_inquery_sql) + # sdata = sync_db.query(local_inquery_sql) + # sdata_len = len(sdata) + # print('sdata:',sdata,sdata_len) + # if sdata_len == 0: + # delete_sql = 'delete from ' + table_name + ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name]) + # print(delete_sql) + # r = local_db.execute(delete_sql) + # print(r) + # break + + + if is_break: + print("break all") + break + time.sleep(3) + print(f'data check cos:{time.time() - time_stats_s:.4f}s') + print("data supplementation completed") + mw.execShell('rm -rf '+tmp_dir) + return 'ok' + ############### --- 重要 同步---- ########### @@ -2647,11 +2934,18 @@ def doFullSync(version=''): def doFullSyncUser(version=''): + which_pv = mw.execShell('which pv') + is_exist_pv = False + if not os.path.exists(which_pv[0]): + is_exist_pv = True + + args = getArgs() data = checkArgs(args, ['db', 'sign']) if not data[0]: return data[1] + time_all_s = time.time() sync_db = args['db'] sync_sign = args['sign'] @@ -2675,23 +2969,67 @@ def doFullSyncUser(version=''): os.system("rm -rf " + bak_file) writeDbSyncStatus({'code': 0, 'msg': '开始同步...', 'progress': 0}) + + dmp_option = '' + mode = recognizeDbMode() + # if mode == 'gtid': + # dmp_option = ' --set-gtid-purged=off ' + writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 10}) + find_run_dump = mw.execShell('ps -ef | grep mariadb-dump | grep -v grep') + if find_run_dump[0] != "": + print("正在远程导出数据中,别着急...") + writeDbSyncStatus({'code': 3.1, 'msg': '正在远程导出数据中,别着急...', 'progress': 39}) + return False + + time_s = time.time() if not os.path.exists(bak_file): + dmp_option += " --master-data=1 --apply-slave-statements --include-master-host-port " + # https://mariadb.com/kb/zh-cn/mariadb-dump/ - dump_sql_data = getServerDir() + "/bin/mariadb-dump -f --opt --default-character-set=utf8 --single-transaction -h" + ip + " -P" + \ + dump_sql_data = getServerDir() + "/bin/mariadb-dump " + dmp_option + " -f --default-character-set=utf8 --single-transaction --compress -q -h" + ip + " -P" + \ port + " -u" + user + " -p'" + apass + "' " + sync_db + ">" + bak_file print(dump_sql_data) mw.execShell(dump_sql_data) - writeDbSyncStatus({'code': 2, 'msg': '本地导入数据...', 'progress': 40}) + time_e = time.time() + export_cos = time_e - time_s + print("export cos:", export_cos) + writeDbSyncStatus({'code': 3, 'msg': '导出耗时:'+str(int(export_cos))+'秒,正在到本地导入数据中...', 'progress': 40}) + + + find_run_import = mw.execShell('ps -ef | grep mariadb| grep '+ bak_file +' | grep -v grep') + if find_run_import[0] != "": + print("正在导入数据中,别着急...") + writeDbSyncStatus({'code': 4.1, 'msg': '正在导入数据中,别着急...', 'progress': 59}) + return False + + # if os.path.exists(bak_file): + # db.execute('reset master') + + time_s = time.time() if os.path.exists(bak_file): pwd = pSqliteDb('config').where('id=?', (1,)).getField('mysql_root') sock = getSocketFile() - my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + \ - "' " + sync_db + '<' + bak_file - print(my_import_cmd) - mw.execShell(my_import_cmd) + + if is_exist_pv: + my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db + my_import_cmd = "pv -t -p " + bak_file + '|' + my_import_cmd + print(my_import_cmd) + os.system(my_import_cmd) + else: + my_import_cmd = getServerDir() + '/bin/mariadb -S ' + sock + " -uroot -p'" + pwd + \ + "' " + sync_db + '<' + bak_file + print(my_import_cmd) + mw.execShell(my_import_cmd) + + time_e = time.time() + import_cos = time_e - time_s + print("import cos:", import_cos) + writeDbSyncStatus({'code': 4, 'msg': '导入耗时:'+str(int(import_cos))+'秒', 'progress': 60}) + + time.sleep(3) pinfo = parseSlaveSyncCmd(data['cmd']) # print(pinfo) @@ -3026,5 +3364,9 @@ if __name__ == "__main__": print(doFullSync(version)) elif func == 'dump_mysql_data': print(dumpMysqlData(version)) + elif func == 'sync_database_repair': + print(syncDatabaseRepair()) + elif func == 'sync_database_repair_log': + print(syncDatabaseRepairLog()) else: print('error') diff --git a/plugins/mariadb/install.sh b/plugins/mariadb/install.sh index 75e693ed7..6db37f8d9 100755 --- a/plugins/mariadb/install.sh +++ b/plugins/mariadb/install.sh @@ -7,6 +7,12 @@ rootPath=$(dirname "$curPath") rootPath=$(dirname "$rootPath") serverPath=$(dirname "$rootPath") +# cd /www/server/mdserver-web/plugins/mariadb && bash install.sh install 8.2 +# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py try_slave_sync_bugfix {} +# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py do_full_sync {"db":"xxx","sign":"","begin":1} +# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py sync_database_repair {"db":"xxx","sign":""} +# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py init_slave_status +# cd /www/server/mdserver-web && source bin/activate && python3 plugins/mariadb/index.py install_pre_inspection install_tmp=${rootPath}/tmp/mw_install.pl diff --git a/plugins/mariadb/js/mariadb.js b/plugins/mariadb/js/mariadb.js index 63b680785..5c3304c3a 100755 --- a/plugins/mariadb/js/mariadb.js +++ b/plugins/mariadb/js/mariadb.js @@ -874,7 +874,7 @@ function openPhpmyadmin(name,username,password){ } if (rdata.data['cfg']['choose'] != 'mariadb'){ - layer.msg('当前为['+rdata.choose+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']}); + layer.msg('当前为['+rdata.data['cfg']['choose']+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']}); return; } var home_page = rdata.data['home_page']; @@ -1978,6 +1978,84 @@ function getFullSyncStatus(db){ } +function dataSyncVerify(db){ + var reqTimer = null; + + function requestLogs(layerIndex){ + myPostN('sync_database_repair_log', {db:db, sign:'',op:'get'}, function(rdata){ + var rdata = $.parseJSON(rdata.data); + + if(!rdata.status) { + layer.close(layerIndex); + layer.msg(rdata.msg,{icon:2, time:2000}); + clearInterval(reqTimer); + return; + }; + + if (rdata.msg == ''){ + rdata.msg = '暂无数据!'; + } + + $("#data_verify_log").html(rdata.msg); + //滚动到最低 + var ob = document.getElementById('data_verify_log'); + ob.scrollTop = ob.scrollHeight; + }); + } + + layer.open({ + type: 1, + title: '同步数据库['+db+']数据校验', + area: '500px', + btn:[ "开始","取消","手动"], + content:"
\ + "+'
'+"\
+            
", + cancel: function(){ + if (reqTimer){ + clearInterval(reqTimer); + } + }, + yes:function(index,layer_index){ + myPostN('sync_database_repair_log', {db:db, sign:'',op:'do'}, function(data){}); + layer.msg("执行成功"); + + requestLogs(layer_index); + reqTimer = setInterval(function(){ + requestLogs(layer_index); + },3000); + }, + success:function(){ + }, + btn3: function(){ + myPostN('sync_database_repair_log', {db:db, sign:'',op:'cmd'}, function(rdata){ + var rdata = $.parseJSON(rdata.data); + layer.open({ + title: "手动执行命令CMD", + area: ['600px', '180px'], + type:1, + closeBtn: 1, + shadeClose: false, + btn:["复制","取消"], + content: '
\ +
\ +
'+rdata.data+'
\ +
\ +
', + success:function(){ + copyText(rdata.data); + }, + yes:function(){ + copyText(rdata.data); + } + }); + }); + return false; + } + + }); +} + function addSlaveSSH(ip=''){ myPost('get_slave_ssh_by_ip', {ip:ip}, function(rdata){ @@ -2633,7 +2711,8 @@ function masterOrSlaveConf(version=''){ list += '' + rdata.data[i]['name'] +''; list += '' + ''+(rdata.data[i]['slave']?'退出':'加入')+' | ' + - '同步' + + '同步 | ' + + '数据校验' + ''; list += ''; } diff --git a/plugins/mariadb/scripts/backup.py b/plugins/mariadb/scripts/backup.py index f45c83d95..a70445a28 100755 --- a/plugins/mariadb/scripts/backup.py +++ b/plugins/mariadb/scripts/backup.py @@ -65,7 +65,7 @@ class backupTools: # mw.execShell(db_path + "/bin/mysqldump --defaults-file=" + my_conf_path + " --skip-lock-tables --default-character-set=utf8 " + # name + " | gzip > " + filename) - cmd = db_path + "/bin/mysqldump --defaults-file=" + my_conf_path + " --single-transaction --quick --default-character-set=utf8 " + \ + cmd = db_path + "/bin/mariadb-dump --defaults-file=" + my_conf_path + " --single-transaction --quick --default-character-set=utf8 " + \ name + " | gzip > " + filename mw.execShell(cmd) diff --git a/plugins/mysql/index.py b/plugins/mysql/index.py index d97e3d1f1..7f6facf38 100755 --- a/plugins/mysql/index.py +++ b/plugins/mysql/index.py @@ -3075,9 +3075,7 @@ def syncDatabaseRepairLog(version=''): if op == 'do': os.system(' echo "开始执行" > '+ tmp_log) - subprocess.Popen(cmd +' >> '+ tmp_log +' &') - # time.sleep(10) - # mw.execShell('rm -rf '+tmp_log) + os.system(cmd +' >> '+ tmp_log +' &') return mw.returnJson(True, 'ok') return mw.returnJson(False, '无效请求!') diff --git a/plugins/mysql/js/mysql.js b/plugins/mysql/js/mysql.js index 47d9f895a..ca65c7570 100755 --- a/plugins/mysql/js/mysql.js +++ b/plugins/mysql/js/mysql.js @@ -871,7 +871,7 @@ function openPhpmyadmin(name,username,password){ } if (rdata.data['cfg']['choose'] != 'mysql'){ - layer.msg('当前为['+rdata.choose+']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']}); + layer.msg('当前为['+rdata.data['cfg']['choose'] + ']模式,若要使用请修改phpMyAdmin访问切换.',{icon:2,shade: [0.3, '#000']}); return; } var home_page = rdata.data['home_page']; diff --git a/requirements.txt b/requirements.txt index 9e5803c34..824ea4357 100644 --- a/requirements.txt +++ b/requirements.txt @@ -29,7 +29,7 @@ pymemcache redis pillow Jinja2>=2.11.2 -PyMySQL==1.1.1 +PyMySQL>=1.0.2 whitenoise==5.3.0 pyotp pytz diff --git a/route/static/app/public.js b/route/static/app/public.js index 404eca408..2e1555800 100755 --- a/route/static/app/public.js +++ b/route/static/app/public.js @@ -1000,7 +1000,7 @@ function setSelectChecked(c, d) { function jump() { layer.closeAll(); - window.location.href = "/soft" + window.location.href = "/soft"; } function installTips() { @@ -1134,15 +1134,15 @@ function getPanelList(){ var user = $(this).attr("data-user"); var pw = $(this).attr("data-pw"); layer.open({ - type: 2, - title: false, - closeBtn: 0, //不显示关闭按钮 - shade: [0], - area: ['340px', '215px'], - offset: 'rb', //右下角弹出 - time: 5, //2秒后自动关闭 - anim: 2, - content: [murl+'/login', 'no'] + type: 2, + title: false, + closeBtn: 0, //不显示关闭按钮 + shade: [0], + area: ['340px', '215px'], + offset: 'rb', //右下角弹出 + time: 5, //2秒后自动关闭 + anim: 2, + content: [murl+'/login', 'no'] }); var loginForm ='