Skip to content

基于FreeSql多库分布式事务、跨库查询、跨库分页查询、跨库增删改等功能

Notifications You must be signed in to change notification settings

d4ilys/SharingCore

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

63 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

目录

前言

话说2021年开始了一个基于ASP.NET Core 微服务的项目,谈到微服务 多库环境下 分布式事务、分库分表这些问题都是逃不开的,于是首先从ORM开始调研,需要考虑到一些重要的因素 功能强大、支持多种数据库(并且行为一致,防止出现换库的情况)、支持分库分表 等等,这时候第一时间就想到了 FreeSql ,FreeSql的架构设计非常好,每一种支持的数据库都有对应的Provider实现 做到行为一致,而且支持CodeFirst和DbFirst,分库分表FreeSql也有比较简单切有效的方案,本人也经常向FreeSql的作者叶老板请教学习,非常佩服他的技术与人品,也非常感谢他能做出这么好的ORM框架。

分布式事务

既然分库了 分布式事务怎么处理,说到分布式事务 常见的解决方案有TCC/SAGA/消息队列最终一致性,在.NET生态中有基于消息队列实现的分布式事务 CAP ,TCC和SAGA调研了很久没有发现有比较成熟的实现,那么就决定使用CAP(最终一致性事务) 由于项目持续的改版,业务的实时性变得越来越高,基于消息队列的这种最终一致性或者说异步事务的方案 越来越不适合我们的项目,这时候就需要同步的事务方案,TCC/SAGE又没有太好的解决方案(我真的没有找到。。),于是想着自己设计一个,基于FreeSql实现事务管理器。

想要的效果:和单库事务一样,出现错误回滚 但是问题来了 多库呢?不同的数据库呢?

  • 在多库事务的开启时,每个库管理开启自己的事务
  • 如果某一个库事务开启后的操作出现异常,则回滚全部数据库事务
  • 在多库事务提交时,每个库的事务统一提交
  • 记录日志,第一个执行Common的数据库称之为主库,会自动创建一个日志表,用于记录多库事务的信息、执行的SQL、业务模块 用于人工介入或者事务补偿
  • 如果主库(第一个库)Common成功后,其他某一个库可能由于网络原因、数据库宕机 无法Common事务,导致数据不一致,这时候要根据日志进行事务补偿或者人工介入,例如 存在三个库(订单库、物流库、商品库) 订单库就是主库(会记录日志) 在Common事务时,如果订单库(主库)Common失败,则(订单库、物流库、商品库)事务全部回滚,如果订单库(主库)Common成功,但是物流库由于其他原因无法Common成功 则会被日志记录并跳过,然后再去Common 商品库 以及其他库..

跨库查询/跨库分页查询

通过时间分片定位、事件委托、分页算法实现跨库分页查询

1.appsettings.json配置

{
   "SharingCore":{
   "ShowSqlLog": true,   //是否显示SQL日志
   //数据库配置信息
   "DatabaseInfo": [
    {
      "Key": "sharingcore_basics", //数据库名即可
      "Identification": "sharingcore_basics",
      "DataType": "MySql",
      "ConnectString": "Data Source=host;Port=Port;User ID=root;Password=123;Initial Catalog=sharingcore_basics;Charset=utf8;SslMode=none;AllowLoadLocalInfile=true;",
      "Slaves": [
      ]
    },
    //业务库2022、分库
    {
      "Key": "sharingcore_business_2022",
      "Identification": "sharingcore_business", //除去日期的标识
      "DataType": "MySql",
      "ConnectString": "Data Source=host;Port=Port;User ID=root;Password=123;Initial Catalog=sharingcore_business_2022;Charset=utf8;SslMode=none;AllowLoadLocalInfile=true;",
      "Slaves": [
      ]
    },
    //业务库2023、分库
    {
      "Key": "sharingcore_business_2023",
      "Identification": "sharingcore_business",
      "DataType": "MySql",
      "ConnectString": "Data Source=host;Port=Port;User ID=root;Password=123;Initial Catalog=sharingcore_business_2022;Charset=utf8;SslMode=none;AllowLoadLocalInfile=true;",
      "Slaves": [
      ]
    },
    //日志时序数据库 不分库
    {
      "Key": "sharingcore_log",
      "Identification": "sharingcore_log",
      "DataType": "questdb",
      "ConnectString": "host=host;port=8812;username=admin;password=quest;database=qdb;ServerCompatibilityMode=NoTypeLoading;",
      "Slaves": [
      ]
    }
  ]
 }
}

2.初始化数据库

创建SharingCoreDbs扩展方法

/// <summary>
/// 基础库
/// </summary>
/// <param name="dbs"></param>
/// <returns></returns>
public static string Basics(this SharingCoreDbs dbs) => "sharingcore_basics";

/// <summary>
/// 主业务库
/// </summary>
/// <param name="dbs"></param>
/// <returns></returns>
[Database(Name = "sharingcore_business_{yyyy}", Separate = "createtime=2022-01-01(1 year)")]
public static string Business(this SharingCoreDbs dbs) => "sharingcore_business";

/// <summary>
/// 日志库
/// </summary>
/// <param name="dbs"></param>
/// <returns></returns>
public static string Logs(this SharingCoreDbs dbs) => "sharingcore_log";

可以创建GlobalUsings.cs更方面

global using static SharingCore.MultiDatabase.Wrapper.SharingCores;
global using SharingCore;

ASP.NET Core 6.0/7.0 Program.cs中

var builder = WebApplication.CreateBuilder(args).InjectSharingCore(); //注入
//简单方式
services.AddSharingCore();
//复杂构建
services.AddSharingCore(options =>
{
    options.DBConfigKey = "CustomDbConfig"; //指定配置文件中的KEY,如不指定 默认为 SharingCore
    options.DemandLoading = true; //按需加载
    options.IdleTimeout = TimeSpan.FromSeconds(20);

    #region 所有数据库自定义配置

        //设置所有库的过滤器
        options.CustomAllDatabaseSettings.FreeSqlFilter<FreeSqlFilter>(f => f.isDelete == 0);
    //FreeSqlBuilder时候每个库可以扩展
    options.CustomAllDatabaseSettings.FreeSqlBuilderInject = builder =>
        builder.UseQuestDbRestAPI("192.168.0.1:9001", "admin", "123");

    #endregion

        #region 单个数据库自定义配置

        var orderCustomDatabaseSettings = new CustomDatabaseSettings();
    //设置所有库的过滤器
    orderCustomDatabaseSettings.FreeSqlFilter<FreeSqlFilter>(f => f.isDelete == 0);
    //FreeSqlBuilder时候每个库可以扩展
    orderCustomDatabaseSettings.FreeSqlBuilderInject = builder =>
        builder.UseNoneCommandParameter(false);
    options.CustomDatabaseSettings.Add("order", orderCustomDatabaseSettings);

    #endregion
});
  • 按需加载:例如 配置文件中有30个数据库但是不同的工作服务中会用到不同的数据库 并不想全都加载,这时候可以每一个服务自定义SharingCoreDbs扩展方法 来控制加载数据库,SharingCore会根据扩展方法进行加载数据库

3.获取IFreeSql操作对象

//不分库
var Logs = Dbs.Logs().GetFreeSql().Ado.Query<string>("select 1"); 
//不分库
var Basics = Dbs.Basics().GetFreeSql().Ado.Query<string>("select 1");
//通过年定位库
var Business_2022 = Dbs.Business().GetFreeSql("2022").Ado.Query<string>("select 1"); 
//直接获取当前年数据库
var Business_2023 = Dbs.Business().GetNowFreeSql().Ado.Query<string>("select 1"); 

5.跨库分页查询

var result = SharingFeatures.QueryPageList(query =>
{
    var result = query.Db.Select<order>().PageCore(query, out var count)
     			.ToListCore(o => o, query, count);
                    return new QueryFuncResult<order>(result, count);
},param => param.Init(Dbs.Business(), 10, page, DateTime.Parse("2022-12-28"),DateTime.Parse("2023-01-04")),out var total);
Console.WriteLine($"总条数:{total},查询条数:{result.Count}");

6. 跨库增删改

//只会往当前年库里插入
var executeAffrows = Business_Now.Insert(new order
{
    commodity_name = "iwatch",
    order_time = DateTime.Now,
    buyer_name = "张三"
}).ExecuteAffrows();
Console.WriteLine(executeAffrows);

//通过日期范围进行插入 
SharingFeatures.NoQuery<order>(noQuery =>
    {
        noQuery.Db.Insert(new order
            {
                commodity_name = "iwatch",
                order_time = DateTime.Now,
                buyer_name = "张三"
            })
            .WithTransaction(noQuery.Transaction) //可以保证跨库事务
            .ExecuteAffrows();
    },
    param => param.Init(Dbs.Business(), DateTime.Parse("2023-02-03"),
        DateTime.Parse("2023-02-03")), //只会写入到2023年的库
    //事务补偿
    (logId, dbWarp, exception) => { });

SharingFeatures.NoQuery<order>(noQuery =>
    {
        noQuery.Db.Insert(new order
            {
                commodity_name = "iwatch",
                order_time = DateTime.Now,
                buyer_name = "张三"
            })
            .WithTransaction(noQuery.Transaction) //可以保证跨库事务
            .ExecuteAffrows();
        var next = new Random().Next(2);
        if (next == 1)
        {
            throw new Exception();
        }
    },
    param => param.Init(Dbs.Business(), DateTime.Parse("2022-02-03"),
        DateTime.Parse("2023-02-03")), //2022和2023年库均写入
    // 事务补偿
    (logId, dbWarp, exception) => { })

7.跨库并行查询(不分页)

var list = await SharingCore.QueryAsync(query =>
{
      var list = query.Db.Select<order>()
      .Where(o => o.order_time.Value.BetweenEnd(query.StartTime, query.EndTime)).ToList();
       return list;
}, query => query.Init(Dbs.Business(), DateTime.Parse("2022-02-01"), DateTime.Parse("2023-05-01")));
Console.WriteLine(list.Count);

8.跨库ToOne查询

var list = await SharingCore.QueryToOneAsync(query =>
{
      var list = query.Db.Select<order>()
      .Where(o => o.id == 199).ToList();
      return list;
}, query => query.Init(Dbs.Business(), DateTime.Parse("2022-02-01"), DateTime.Parse("2023-05-01")));
Console.WriteLine(list.Count);

9.跨库Any查询

var list = await SharingCore.QueryAnyAsync(func =>
{
    var list = func.Db.Select<vehicleLudan>()
        .Where(l => l.dateLudan.Value.BetweenEnd(func.StartTime, func.EndTime))
        .Where(l => l.id = "1")
        .ToList();
    return list;
}, query => query.Init(DBAll.Business, start, end));

10.分布式事务、多库事务

var businessWarp = Dbs.Business().GetNowDbWarp();
var basicsWarp = Dbs.Basics().GetDbWarp();
using (var tran = SharingCore.Transaction(businessWarp, basicsWarp))
{
    //监听到有提交失败的库时,启动事务补偿
    tran.OnCommitFail += TransactionCompensation;
    try
    {
        tran.BeginTran();
        var orderData = new order 
        {
            buyer_name = $"事务{i}",
            commodity_name = "事务",
            order_time = DateTime.Now
        };
        var r1 = tran.Orm1.Insert(orderData).ExecuteAffrows();
		var userData = new users
        {
            name = $"事务{i}",
            password = "123",
            username = "tom"
        };
        var r2 = tran.Orm2.Insert<users>(userData).ExecuteAffrows();
        
        //随机发生异常,如果是普通异常,并且Commit之前,都可以正常回滚
        if (new Random().Next(5) == 1)
        {
            throw new Exception("业务中发生异常,所有事务都要回滚~");
        }
		//日志,用于记录日志信息,进行事务补偿
        var log = new multi_transaction_log()
        {
            content = $"{i}分布式事务测试...",
        };
        //提交事务并返回结果
        var result = tran.Commit(log);
        Console.WriteLine(result);
    }
    catch
    {
        //发生普通异常 直接回滚
        tran.Rellback();
    }
}

   //如果第一个库提交成功,其他库提交的过程中失败,那么将这里进行事务补偿
void TransactionCompensation(string logId, DbWarp dbWarp, Exception ex)
{
    //日志中有记录SQL
    var id = Convert.ToInt64(logId);
    //这里的DBWarp是日志存储所在的数据库
    var log = dbWarp.Instance.Select<multi_transaction_log>().Where(b => b.id ==
                                                                    id).ToOne();
    //拿到多库事务执行的信息
    var log_result = JsonConvert.DeserializeObject<List<TransactionsResult>>
        (log.result_msg);
    foreach (var transactionsResult in log_result)
    {
        //拿到失败Common失败的数据库
        if (transactionsResult.Successful == false)
        {
            //失败数据库的KEY
            var failDb = transactionsResult.Key;
            //获取数据库操作对象准备执行事务补偿
            var tempDb = failDb.GetFreeSql();
            //拿到在分布式事务中这个库所Common失败的SQL
            var sqls = log.exec_sql;
            var sqlsDic = JsonConvert.DeserializeObject<Dictionary<string,
            List<string>>>(sqls);
            var sqlsList = sqlsDic[failDb];
            //事务补偿,执行在Common时没有成功的SQL
            tempDb.Transaction(() => {
                foreach (var noQuerySql in sqlsList)
                {
                    tempDb.Ado.ExecuteNonQuery(noQuerySql);
                }

                if (dbWarp.Instance.Delete<multi_transaction_log>().Where(m => m.id == id).ExecuteAffrows() == 0)
                {
                    throw new Exception("如果删除日志失败,回滚SQL..");
                }
            });
            Console.WriteLine("事务补偿成功...");
        }
    }
}

About

基于FreeSql多库分布式事务、跨库查询、跨库分页查询、跨库增删改等功能

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages