當前要聞:Mysql到TiDB遷移,雙寫數據庫兜底方案

2022-12-27 12:14:22 來源:51CTO博客

作者:京東零售 石磊

TiDB 作為開源 NewSQL 數據庫的典型代表之一,同樣支持 SQL,支持事務 ACID 特性。在通訊協議上,TiDB 選擇與 MySQL 完全兼容,并盡可能兼容 MySQL 的語法。因此,基于 MySQL 數據庫開發的系統,大多數可以平滑遷移至 TiDB,而幾乎不用修改代碼。對用戶來說,遷移成本極低,過渡自然。


(資料圖片僅供參考)

然而,仍有一些 MySQL 的特性和行為,TiDB 目前暫時不支持或表現與 MySQL 有差異。除此之外,TiDB 提供了一些擴展語法和功能,為用戶提供更多的便利。

TiDB 仍處在快速發展的道路上,對 MySQL 功能和行為的支持方面,正按 路線圖 的規劃在前行。

兼容策略

先從總體上概括 TiDB 和 MySQL 兼容策略,如下表:

通訊協議

SQL語法

功能和行為

完全兼容

兼容絕大多數

兼容大多數

截至 4.0 版本,TiDB 與 MySQL 的區別總結如下表:

MySQL

TiDB

隔離級別

支持讀未提交、讀已提交、可重復讀、串行化,默認為可重復讀

樂觀事務支持快照隔離,悲觀事務支持快照隔離和讀已提交

鎖機制

悲觀鎖

樂觀鎖、悲觀鎖

存儲過程

支持

不支持

觸發器

支持

不支持

事件

支持

不支持

自定義函數

支持

不支持

窗口函數

支持

部分支持

JSON

支持

不支持部分 MySQL 8.0 新增的函數

外鍵約束

支持

忽略外鍵約束

字符集

只支持 ascii、latin1、binary、utf8、utf8mb4

增加/刪除主鍵

支持

通過??alter-primary-key??配置開關提供

CREATE TABLE tblName AS SELECT stmt

支持

不支持

CREATE TEMPORARY TABLE

支持

TiDB 忽略 TEMPORARY 關鍵字,按照普通表創建

DML affected rows

支持

不支持

AutoRandom 列屬性

不支持

支持

Sequence 序列生成器

不支持

支持

三種方案比較

雙寫方案:同時往mysql和tidb寫入數據,兩個數據庫數據完全保持同步

?優點:此方案最安全,作為兜底方案不需擔心數據庫回滾問題,因為數據完全一致,可以無縫回滾到mysql

?缺點:新方案,調研方案實現,成本較高

讀寫分離:數據寫入mysql,從tidb讀,具體方案是切換到線上以后,保持讀寫分離一周時間左右,這一周時間用來確定tidb數據庫沒有問題,再把寫操作也切換到tidb

?優點: 切換過程,mysql和tidb數據保持同步,滿足數據回滾到mysql方案

?缺點:mysql和tidb數據庫同步存在延時,對部分寫入數據要求實時查詢的會導致查詢失敗,同時一旦整體切換到tidb,無法回切到mysql

直接切換:直接一步切換到tidb

?優點:切換過程最簡單,成本最低

?缺點:此方案沒有兜底方案,切換到tidb,無法再回切到mysql或者同步數據回mysql風險較大,無法保證數據是否可用

Django雙寫mysql與tidb策略

settings.py中新增配置
# Dev Database settingsDATABASES = {    "default": {        "ENGINE": "django.db.backends.mysql",        "NAME": "name",        "USER": "root",        "PASSWORD": "123456",        "HOST": "db",    },    "replica": {        "ENGINE": "django.db.backends.mysql",        "NAME": "name",        "USER": "root",        "PASSWORD": "123456",        "HOST": "db",    },    "bak": {        "ENGINE": "django.db.backends.mysql",        "NAME": "name",        "USER": "root",        "PASSWORD": "123456",        "HOST": "db",    },}# 多重寫入數據庫配置MULTI_WRITE_DB = "bak"

雙寫中間件 basemodel.py

import copyimport loggingimport tracebackfrom django.db import models, transaction, routerfrom django.db.models.deletion import Collectorfrom django.db.models import sqlfrom django.db.models.sql.constants import CURSORfrom jcdp.settings import MULTI_WRITE_DB, DATABASESmulti_write_db = MULTI_WRITE_DB# 重寫QuerySetclass BaseQuerySet(models.QuerySet):    def create(self, **kwargs):        return super().create(**kwargs)    def update(self, **kwargs):        try:            rows = super().update(**kwargs)            if multi_write_db in DATABASES:                self._for_write = True                query = self.query.chain(sql.UpdateQuery)                query.add_update_values(kwargs)                with transaction.mark_for_rollback_on_error(using=multi_write_db):                    query.get_compiler(multi_write_db).execute_sql(CURSOR)        except Exception:            logging.error(traceback.format_exc())            raise        return rows    def delete(self):        try:            deleted, _rows_count = super().delete()            if multi_write_db in DATABASES:                del_query = self._chain()                del_query._for_write = True                del_query.query.select_for_update = False                del_query.query.select_related = False                collector = Collector(using=multi_write_db)                collector.collect(del_query)                collector.delete()        except Exception:            logging.error(traceback.format_exc())            raise        return deleted, _rows_count    def raw(self, raw_query, params=None, translations=None, using=None):        try:            qs = super().raw(raw_query, params=params, translations=translations, using=using)            if multi_write_db in DATABASES:                super().raw(raw_query, params=params, translations=translations, using=multi_write_db)        except Exception:            logging.error(traceback.format_exc())            raise        return qs    def bulk_create(self, objs, batch_size=None, ignore_conflicts=False):        try:            for obj in objs:                obj.save()        except Exception:            logging.error(traceback.format_exc())            raise        # objs = super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)        # if multi_write_db in DATABASES:        #     self._db = multi_write_db        #     super().bulk_create(objs, batch_size=batch_size, ignore_conflicts=ignore_conflicts)        return objs    def bulk_update(self, objs, fields, batch_size=None):        try:            super().bulk_update(objs, fields, batch_size=batch_size)            if multi_write_db in DATABASES:                self._db = multi_write_db                super().bulk_update(objs, fields, batch_size=batch_size)        except Exception:            logging.error(traceback.format_exc())            raiseclass BaseManager(models.Manager):    _queryset_class = BaseQuerySetclass BaseModel(models.Model):    objects = BaseManager()    class Meta:        abstract = True    def delete(            self, using=None, *args, **kwargs    ):        try:            instance = copy.deepcopy(self)            super().delete(using=using, *args, **kwargs)            if multi_write_db in DATABASES:                super(BaseModel, instance).delete(using=multi_write_db, *args, **kwargs)        except Exception:            logging.error(traceback.format_exc())            raise    def save_base(self, raw=False, force_insert=False,                  force_update=False, using=None, update_fields=None):        try:            using = using or router.db_for_write(self.__class__, instance=self)            assert not (force_insert and (force_update or update_fields))            assert update_fields is None or update_fields            cls = self.__class__            # Skip proxies, but keep the origin as the proxy model.            if cls._meta.proxy:                cls = cls._meta.concrete_model            meta = cls._meta            # A transaction isn"t needed if one query is issued.            if meta.parents:                context_manager = transaction.atomic(using=using, savepoint=False)            else:                context_manager = transaction.mark_for_rollback_on_error(using=using)            with context_manager:                parent_inserted = False                if not raw:                    parent_inserted = self._save_parents(cls, using, update_fields)                self._save_table(                    raw, cls, force_insert or parent_inserted,                    force_update, using, update_fields,                )            if multi_write_db in DATABASES:                super().save_base(raw=raw,                                  force_insert=raw,                                  force_update=force_update,                                  using=multi_write_db,                                  update_fields=update_fields)            # Store the database on which the object was saved            self._state.db = using            # Once saved, this is no longer a to-be-added instance.            self._state.adding = False        except Exception:            logging.error(traceback.format_exc())            raise

上述配置完成以后,在每個應用的models.py中引用新的BaseModel類作為模型基類即可實現雙寫目的

class DirectoryStructure(BaseModel):    """    目錄結構    """    view = models.CharField(max_length=128, db_index=True)  # 視圖名稱 eg:部門視圖 項目視圖    sub_view = models.CharField(max_length=128, unique=True, db_index=True)  # 子視圖名稱    sub_view_num = models.IntegerField()  # 子視圖順序號

注:目前該方法尚不支持多對多模型的雙寫情景,如有業務需求,還需重寫ManyToManyField類,方法參考猴子補丁方式

遷移數據庫過程踩坑記錄

TIDB配置項差異:確認數據庫配置:ONLY_FULL_GROUP_BY 禁用 (mysql默認禁用)

TIDB不支持事務savepoint,代碼中需要顯式關閉savepoint=False

TIDB由于是分布式數據庫,對于自增主鍵字段的自增策略與mysq有差異,若業務代碼會與主鍵id關聯,需要注意

標簽: 視圖名稱 寫入數據 沒有問題

上一篇:學習下Redis內存模型
下一篇:嵌入式:ARM嵌入式系統開發流程概述