From e7e160c8e1bbbded2c32723a11fae5493616fc9a Mon Sep 17 00:00:00 2001 From: Mr Chen Date: Mon, 10 Jun 2024 01:58:11 +0800 Subject: [PATCH] update --- plugins/mysql-apt/index.py | 400 ++++++++++++++++++++++++++++++++++++- plugins/mysql/index.py | 1 - 2 files changed, 391 insertions(+), 10 deletions(-) diff --git a/plugins/mysql-apt/index.py b/plugins/mysql-apt/index.py index 55b36216b..80e4f0bae 100755 --- a/plugins/mysql-apt/index.py +++ b/plugins/mysql-apt/index.py @@ -2670,6 +2670,290 @@ def dumpMysqlData(version=''): return 'ok' return 'fail' +############### --- 重要 数据补足同步 ---- ########### + +def getSyncMysqlDB(dbname,sign = ''): + conn = pSqliteDb('slave_sync_user') + if sign != '': + data = conn.field('ip,port,user,pass,mode,cmd').where('ip=?', (sign,)).find() + else: + data = conn.field('ip,port,user,pass,mode,cmd').find() + user = data['user'] + apass = data['pass'] + port = data['port'] + ip = data['ip'] + # 远程数据 + sync_db = mw.getMyORM() + # MySQLdb | + sync_db.setPort(port) + sync_db.setHost(ip) + sync_db.setUser(user) + sync_db.setPwd(apass) + sync_db.setDbName(dbname) + sync_db.setTimeout(60) + return sync_db + +def syncDatabaseRepairTempFile(): + tmp_log = mw.getMWLogs()+ '/mysql-check.log' + return tmp_log + +def syncDatabaseRepairLog(version=''): + import subprocess + args = getArgs() + data = checkArgs(args, ['db','sign','op']) + if not data[0]: + return data[1] + + sync_args_db = args['db'] + sync_args_sign = args['sign'] + op = args['op'] + tmp_log = syncDatabaseRepairTempFile() + cmd = 'cd '+mw.getServerDir()+'/mdserver-web && source bin/activate && python3 plugins/mysql/index.py sync_database_repair {"db":"'+sync_args_db+'","sign":"'+sync_args_sign+'"}' + # print(cmd) + + if op == 'get': + log = mw.getLastLine(tmp_log, 15) + return mw.returnJson(True, log) + + if op == 'cmd': + return mw.returnJson(True, 'ok', cmd) + + if op == 'do': + os.system(' echo "开始执行" > '+ tmp_log) + os.system(cmd +' >> '+ tmp_log +' &') + return mw.returnJson(True, 'ok') + + return mw.returnJson(False, '无效请求!') + + +def syncDatabaseRepair(version=''): + time_stats_s = time.time() + tmp_log = syncDatabaseRepairTempFile() + + from pymysql.converters import escape_string + args = getArgs() + data = checkArgs(args, ['db','sign']) + if not data[0]: + return data[1] + + sync_args_db = args['db'] + sync_args_sign = args['sign'] + + # 本地数据 + local_db = pMysqlDb() + # 远程数据 + sync_db = getSyncMysqlDB(sync_args_db,sync_args_sign) + + tables = local_db.query('show tables from `%s`' % sync_args_db) + table_key = "Tables_in_" + sync_args_db + inconsistent_table = [] + + tmp_dir = '/tmp/sync_db_repair' + mw.execShell('mkdir -p '+tmp_dir) + + for tb in tables: + + table_name = sync_args_db+'.'+tb[table_key] + table_check_file = tmp_dir+'/'+table_name+'.txt' + + if os.path.exists(table_check_file): + # print(table_name+', 已检查OK') + continue + + primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';"; + primary_key_data = local_db.query(primary_key_sql) + # print(primary_key_sql,primary_key_data) + pkey_name = '*' + if len(primary_key_data) == 1: + pkey_name = primary_key_data[0]['Column_name'] + # print(pkey_name) + if pkey_name != '*' : + # 智能校验(由于服务器同步可能会慢,比较总数总是对不上) + cmd_local_newpk_sql = 'select ' + pkey_name + ' from ' + table_name + " order by " + pkey_name + " desc limit 1" + cmd_local_newpk_data = local_db.query(cmd_local_newpk_sql) + # print(cmd_local_newpk_data) + if len(cmd_local_newpk_data) == 1: + # 比较总数 + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + ' where '+pkey_name + ' <= '+ str(cmd_local_newpk_data[0][pkey_name]) + local_count_data = local_db.query(cmd_count_sql) + sync_count_data = sync_db.query(cmd_count_sql) + + if local_count_data != sync_count_data: + print(cmd_count_sql) + print("all data compare: ",local_count_data, sync_count_data) + else: + print(table_name+' smart compare check ok.') + mw.writeFile(tmp_log, table_name+' smart compare check ok.\n','a+') + mw.execShell("echo 'ok' > "+table_check_file) + continue + + + + # 比较总数 + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + local_count_data = local_db.query(cmd_count_sql) + sync_count_data = sync_db.query(cmd_count_sql) + + if local_count_data != sync_count_data: + print("all data compare: ",local_count_data, sync_count_data) + inconsistent_table.append(table_name) + diff = sync_count_data[0]['num'] - local_count_data[0]['num'] + print(table_name+', need sync. diff,'+str(diff)) + mw.writeFile(tmp_log, table_name+', need sync. diff,'+str(diff)+'\n','a+') + else: + print(table_name+' check ok.') + mw.writeFile(tmp_log, table_name+' check ok.\n','a+') + mw.execShell("echo 'ok' > "+table_check_file) + + + # inconsistent_table = ['xx.xx'] + # 数据对齐 + for table_name in inconsistent_table: + is_break = False + while not is_break: + local_db.ping() + # 远程数据 + sync_db.ping() + + print("check table:"+table_name) + mw.writeFile(tmp_log, "check table:"+table_name+'\n','a+') + table_name_pos = 0 + table_name_pos_file = tmp_dir+'/'+table_name+'.pos.txt' + primary_key_sql = "SHOW INDEX FROM "+table_name+" WHERE Key_name = 'PRIMARY';"; + primary_key_data = local_db.query(primary_key_sql) + pkey_name = primary_key_data[0]['Column_name'] + + if os.path.exists(table_name_pos_file): + table_name_pos = mw.readFile(table_name_pos_file) + + + data_select_sql = 'select * from '+table_name + ' where '+pkey_name+' > '+str(table_name_pos)+' limit 10000' + print(data_select_sql) + local_select_data = local_db.query(data_select_sql) + + time_s = time.time() + sync_select_data = sync_db.query(data_select_sql) + print(f'sync query cos:{time.time() - time_s:.4f}s') + mw.writeFile(tmp_log, f'sync query cos:{time.time() - time_s:.4f}s\n','a+') + + # print(local_select_data) + # print(sync_select_data) + + # print(len(local_select_data)) + # print(len(sync_select_data)) + print('pos:',str(table_name_pos),'local compare sync,',local_select_data == sync_select_data) + + + cmd_count_sql = 'select count('+pkey_name+') as num from '+table_name + local_count_data = local_db.query(cmd_count_sql) + time_s = time.time() + sync_count_data = sync_db.query(cmd_count_sql) + print(f'sync count data cos:{time.time() - time_s:.4f}s') + print(local_count_data,sync_count_data) + # 数据同步有延迟,相等即任务数据补足完成 + if local_count_data[0]['num'] == sync_count_data[0]['num']: + is_break = True + break + + diff = sync_count_data[0]['num'] - local_count_data[0]['num'] + print("diff," + str(diff)+' line data!') + + if local_select_data == sync_select_data: + data_count = len(local_select_data) + if data_count == 0: + # mw.writeFile(table_name_pos_file, '0') + print(table_name+",data is equal ok..") + is_break = True + break + + # print(table_name,data_count) + pos = local_select_data[data_count-1][pkey_name] + print('pos',pos) + progress = pos/sync_count_data[0]['num'] + print('progress,%.2f' % progress+'%') + mw.writeFile(table_name_pos_file, str(pos)) + else: + sync_select_data_len = len(sync_select_data) + skip_idx = 0 + # 主库PK -> 查询本地 | 保证一致 + if sync_select_data_len > 0: + for idx in range(sync_select_data_len): + sync_idx_data = sync_select_data[idx] + local_idx_data = None + if idx in local_select_data: + local_idx_data = local_select_data[idx] + if sync_select_data[idx] == local_idx_data: + skip_idx = idx + pos = local_select_data[idx][pkey_name] + mw.writeFile(table_name_pos_file, str(pos)) + + # print(insert_data) + local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(sync_idx_data[pkey_name]) + # print(local_inquery_sql) + ldata = local_db.query(local_inquery_sql) + # print('ldata:',ldata) + if len(ldata) == 0: + print("id:"+ str(sync_idx_data[pkey_name])+ " no exists, insert") + insert_sql = 'insert into ' + table_name + field_str = '' + value_str = '' + for field in sync_idx_data: + field_str += '`'+field+'`,' + value_str += '\''+escape_string(str(sync_idx_data[field]))+'\',' + field_str = '(' +field_str.strip(',')+')' + value_str = '(' +value_str.strip(',')+')' + insert_sql = insert_sql+' '+field_str+' values'+value_str+';' + print(insert_sql) + r = local_db.execute(insert_sql) + print(r) + else: + # print('compare sync->local:',sync_idx_data == ldata[0] ) + if ldata[0] == sync_idx_data: + continue + + print("id:"+ str(sync_idx_data[pkey_name])+ " data is not equal, update") + update_sql = 'update ' + table_name + field_str = '' + value_str = '' + for field in sync_idx_data: + if field == pkey_name: + continue + field_str += '`'+field+'`=\''+escape_string(str(sync_idx_data[field]))+'\',' + field_str = field_str.strip(',') + update_sql = update_sql+' set '+field_str+' where '+pkey_name+'=\''+str(sync_idx_data[pkey_name])+'\';' + print(update_sql) + r = local_db.execute(update_sql) + print(r) + + # 本地PK -> 查询主库 | 保证一致 + # local_select_data_len = len(local_select_data) + # if local_select_data_len > 0: + # for idx in range(local_select_data_len): + # if idx < skip_idx: + # continue + # local_idx_data = local_select_data[idx] + # print('local idx check', idx, skip_idx) + # local_inquery_sql = 'select * from ' + table_name+ ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name]) + # print(local_inquery_sql) + # sdata = sync_db.query(local_inquery_sql) + # sdata_len = len(sdata) + # print('sdata:',sdata,sdata_len) + # if sdata_len == 0: + # delete_sql = 'delete from ' + table_name + ' where ' +pkey_name+' = '+ str(local_idx_data[pkey_name]) + # print(delete_sql) + # r = local_db.execute(delete_sql) + # print(r) + # break + + + if is_break: + print("break all") + break + time.sleep(3) + print(f'data check cos:{time.time() - time_stats_s:.4f}s') + print("data supplementation completed") + mw.execShell('rm -rf '+tmp_dir) + return 'ok' ############### --- 重要 同步---- ########### @@ -2694,6 +2978,42 @@ def doFullSync(version=''): if mode == 'sync-user': return doFullSyncUser(version) +def isSimpleSyncCmd(sql): + new_sql = sql.lower() + if new_sql.find('master_auto_position') > 0: + return False + return True + +def getChannelNameForCmd(cmd): + cmd = cmd.lower() + cmd_arr = cmd.split('channel') + if len(cmd_arr) == 2: + cmd_channel_info = cmd_arr[1] + channel_name = cmd_channel_info.strip() + channel_name = channel_name.strip(';') + channel_name = channel_name.strip("'") + return channel_name + return '' + +def doFullSyncUserImportContentForChannel(file, channel_name): + # print(file, channel_name) + content = mw.readFile(file) + + content = content.replace('STOP SLAVE;', "STOP SLAVE for channel '{}';".format(channel_name)) + content = content.replace('START SLAVE;', "START SLAVE for channel '{}';".format(channel_name)) + + find_head = "CHANGE MASTER TO " + find_re = find_head+"(.*?);" + find_r = re.search(find_re, content, re.I|re.M) + if find_r: + find_rg = find_r.groups() + if len(find_rg)>0: + find_str = find_head+find_rg[0] + if find_str.lower().find('channel')==-1: + content = content.replace(find_str+';', find_str+" for channel '{}';".format(channel_name)) + + mw.writeFile(file,content) + return True def doFullSyncUser(version=''): args = getArgs() @@ -2727,8 +3047,14 @@ def doFullSyncUser(version=''): apass = data['pass'] port = data['port'] ip = data['ip'] + cmd = data['cmd'] + + channel_name = getChannelNameForCmd(cmd) + sync_mdb = getSyncMysqlDB(sync_db,sync_sign) bak_file = '/tmp/tmp.sql' + if os.path.exists(bak_file): + os.system("rm -rf " + bak_file) writeDbSyncStatus({'code': 0, 'msg': '开始同步...', 'progress': 0}) dmp_option = '' @@ -2736,32 +3062,88 @@ def doFullSyncUser(version=''): if mode == 'gtid': dmp_option = ' --set-gtid-purged=off ' - writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 20}) + time.sleep(1) + writeDbSyncStatus({'code': 1, 'msg': '正在停止从库...', 'progress': 15}) + mdb8 = ['8.0','8.1','8.2','8.3','8.4'] + if mw.inArray(mdb8,version): + db.query("stop slave user='{}' password='{}';".format(user, apass)) + else: + db.query("stop slave") + + time.sleep(1) + writeDbSyncStatus({'code': 2, 'msg': '远程导出数据...', 'progress': 20}) + + find_run_dump = mw.execShell('ps -ef | grep mysqldump | grep -v grep') + if find_run_dump[0] != "": + print("正在远程导出数据中,别着急...") + writeDbSyncStatus({'code': 3.1, 'msg': '正在远程导出数据中,别着急...', 'progress': 19}) + return False + + time_s = time.time() if not os.path.exists(bak_file): - dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump " + dmp_option + " --force --opt --default-character-set=utf8 --single-transaction -h" + ip + " -P" + \ - port + " -u" + user + " -p\"" + apass + \ - "\" --ssl-mode=DISABLED " + sync_db + " > " + bak_file - # print(dump_sql_data) - mw.execShell(dump_sql_data) + if isSimpleSyncCmd(cmd): + dmp_option += " --master-data=1 --apply-slave-statements --include-master-host-port " + else: + dmp_option += ' ' + + dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump --single-transaction --default-character-set=utf8mb4 --compress -q " + dmp_option + " -h" + \ + ip + " -P" + port + " -u" + user + ' -p"' + apass + '" --ssl-mode=DISABLED ' + sync_db + " > " + bak_file + print(dump_sql_data) + time_s = time.time() + r = mw.execShell(dump_sql_data) + print(r) + time_e = time.time() + export_cos = time_e - time_s + print("export cos:", export_cos) + + writeDbSyncStatus({'code': 3, 'msg': '导出耗时:'+str(int(export_cos))+'秒,正在到本地导入数据中...', 'progress': 40}) + + find_run_import = mw.execShell('ps -ef | grep mysql| grep '+ bak_file +' | grep -v grep') + if find_run_import[0] != "": + print("正在导入数据中,别着急...") + writeDbSyncStatus({'code': 4.1, 'msg': '正在导入数据中,别着急...', 'progress': 39}) + return False + - writeDbSyncStatus({'code': 2, 'msg': '本地导入数据...', 'progress': 40}) + time_s = time.time() if os.path.exists(bak_file): + + # 重置 + db.execute('reset master') + if channel_name != '': + doFullSyncUserImportContentForChannel(bak_file, channel_name) + pwd = pSqliteDb('config').where('id=?', (1,)).getField('mysql_root') sock = getSocketFile() + + if is_exist_pv: + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import + my_import_cmd = "pv -t -p " + bak_file + '|' + my_import_cmd + print(my_import_cmd) + os.system(my_import_cmd) + else: + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import + ' < ' + bak_file + print(my_import_cmd) + mw.execShell(my_import_cmd) + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + ' -uroot -p' + pwd + \ ' ' + sync_db_import + ' < ' + bak_file mw.execShell(my_import_cmd) - if version == '8.0': + if mw.inArray(mdb8, version): db.query("start slave user='{}' password='{}';".format(user, apass)) else: db.query("start slave") - writeDbSyncStatus({'code': 6, 'msg': '从库重启完成...', 'progress': 100}) + db.query("start all slaves") + time_all_e = time.time() + cos = time_all_e - time_all_s + writeDbSyncStatus({'code': 6, 'msg': '总耗时:'+str(int(cos))+'秒,从库重启完成...', 'progress': 100}) if os.path.exists(bak_file): os.system("rm -rf " + bak_file) + return True diff --git a/plugins/mysql/index.py b/plugins/mysql/index.py index f2e90c0eb..c9b538c25 100755 --- a/plugins/mysql/index.py +++ b/plugins/mysql/index.py @@ -3536,7 +3536,6 @@ def doFullSyncUser(version=''): # r = db.query(cmd) # print(r) - mdb8 = ['8.0','8.1','8.2','8.3','8.4'] if mw.inArray(mdb8,version): db.query("start slave user='{}' password='{}';".format(user, apass)) else: