diff --git a/plugins/mysql-yum/index.py b/plugins/mysql-yum/index.py index edbcd8119..7e0994cdd 100755 --- a/plugins/mysql-yum/index.py +++ b/plugins/mysql-yum/index.py @@ -3052,8 +3052,14 @@ def doFullSyncUser(version=''): apass = data['pass'] port = data['port'] ip = data['ip'] + cmd = data['cmd'] + + channel_name = getChannelNameForCmd(cmd) + sync_mdb = getSyncMysqlDB(sync_db,sync_sign) bak_file = '/tmp/tmp.sql' + if os.path.exists(bak_file): + os.system("rm -rf " + bak_file) writeDbSyncStatus({'code': 0, 'msg': '开始同步...', 'progress': 0}) dmp_option = '' @@ -3061,29 +3067,82 @@ def doFullSyncUser(version=''): if mode == 'gtid': dmp_option = ' --set-gtid-purged=off ' - writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 20}) + writeDbSyncStatus({'code': 1, 'msg': '远程导出数据...', 'progress': 15}) + + mdb8 = ['8.0','8.1','8.2','8.3','8.4'] + if mw.inArray(mdb8,version): + db.query("stop slave user='{}' password='{}';".format(user, apass)) + else: + db.query("stop slave") + + time.sleep(1) + writeDbSyncStatus({'code': 2, 'msg': '远程导出数据...', 'progress': 20}) + + find_run_dump = mw.execShell('ps -ef | grep mysqldump | grep -v grep') + if find_run_dump[0] != "": + print("正在远程导出数据中,别着急...") + writeDbSyncStatu if not os.path.exists(bak_file): - dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump " + dmp_option + " --force --opt --default-character-set=utf8 --single-transaction -h" + ip + " -P" + \ - port + " -u" + user + " -p\"" + apass + \ - "\" --ssl-mode=DISABLED " + sync_db + " > " + bak_file - # print(dump_sql_data) - mw.execShell(dump_sql_data) + + if isSimpleSyncCmd(cmd): + dmp_option += " --master-data=1 --apply-slave-statements --include-master-host-port " + else: + dmp_option += ' ' + + dump_sql_data = getServerDir() + "/bin/usr/bin/mysqldump --single-transaction --default-character-set=utf8mb4 --compress -q " + dmp_option + " -h" + \ + ip + " -P" + port + " -u" + user + ' -p"' + apass + '" --ssl-mode=DISABLED ' + sync_db + " > " + bak_file + print(dump_sql_data) + time_s = time.time() + r = mw.execShell(dump_sql_data) + print(r) + time_e = time.time() + export_cos = time_e - time_s + print("export cos:", export_cos) + + writeDbSyncStatus({'code': 3, 'msg': '导出耗时:'+str(int(export_cos))+'秒,正在到本地导入数据中...', 'progress': 40}) + + find_run_import = mw.execShell('ps -ef | grep mysql| grep '+ bak_file +' | grep -v grep') + if find_run_import[0] != "": + print("正在导入数据中,别着急...") + writeDbSyncStatus({'code': 4.1, 'msg': '正在导入数据中,别着急...', 'progress': 39}) + return False writeDbSyncStatus({'code': 2, 'msg': '本地导入数据...', 'progress': 40}) + time_s = time.time() if os.path.exists(bak_file): + # 重置 + db.execute('reset master') + if channel_name != '': + doFullSyncUserImportContentForChannel(bak_file, channel_name) + pwd = pSqliteDb('config').where('id=?', (1,)).getField('mysql_root') sock = getSocketFile() + + if is_exist_pv: + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import + my_import_cmd = "pv -t -p " + bak_file + '|' + my_import_cmd + print(my_import_cmd) + os.system(my_import_cmd) + else: + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + " -uroot -p'" + pwd + "' " + sync_db_import + ' < ' + bak_file + print(my_import_cmd) + mw.execShell(my_import_cmd) + my_import_cmd = getServerDir() + '/bin/usr/bin/mysql -S ' + sock + ' -uroot -p' + pwd + \ ' ' + sync_db_import + ' < ' + bak_file mw.execShell(my_import_cmd) - if version == '8.0': + + if mw.inArray(mdb8, version): db.query("start slave user='{}' password='{}';".format(user, apass)) else: db.query("start slave") - writeDbSyncStatus({'code': 6, 'msg': '从库重启完成...', 'progress': 100}) + db.query("start all slaves") + time_all_e = time.time() + cos = time_all_e - time_all_s + writeDbSyncStatus({'code': 6, 'msg': '总耗时:'+str(int(cos))+'秒,从库重启完成...', 'progress': 100}) if os.path.exists(bak_file): os.system("rm -rf " + bak_file)