Skip to content

Commit

Permalink
修复点赞和关注的并发不一致问题,取消ETCD限流配置
Browse files Browse the repository at this point in the history
  • Loading branch information
0202zc committed Feb 26, 2023
1 parent ec84317 commit b60b62e
Show file tree
Hide file tree
Showing 14 changed files with 278 additions and 65 deletions.
3 changes: 1 addition & 2 deletions cmd/comment/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
Expand Down Expand Up @@ -47,7 +46,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
3 changes: 1 addition & 2 deletions cmd/favorite/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
Expand Down Expand Up @@ -49,7 +48,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
3 changes: 2 additions & 1 deletion cmd/favorite/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
amqp "github.com/rabbitmq/amqp091-go"
"strings"
"time"
)

// FavoriteServiceImpl implements the last service interface defined in the IDL.
Expand All @@ -39,7 +40,7 @@ func (s *FavoriteServiceImpl) FavoriteAction(ctx context.Context, req *favorite.
VideoID: uint(req.VideoId),
UserID: uint(userID),
ActionType: uint(req.ActionType),
//CreatedAt: time.Now(),
CreatedAt: uint(time.Now().UnixMilli()),
}
jsonFC, _ := json.Marshal(fc)
fmt.Println("Publish new message: ", fc)
Expand Down
23 changes: 22 additions & 1 deletion cmd/favorite/service/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,50 @@ import (
const frequency = 10

// 点赞服务消息队列消费者
func consume() {
func consume() error {
msgs, err := FavoriteMq.ConsumeSimple()
if err != nil {
fmt.Println(err.Error())
logger.Errorf("FavoriteMQ Err: %s", err.Error())
}
// 将消息队列的消息全部取出
for msg := range msgs {
//err := redis.LockByMutex(context.Background(), redis.FavoriteMutex)
//if err != nil {
// logger.Errorf("Redis mutex lock error: %s", err.Error())
// return err
//}
fc := new(redis.FavoriteCache)
// 解析json
if err = json.Unmarshal(msg.Body, &fc); err != nil {
logger.Errorf("json unmarshal error: %s", err.Error())
fmt.Println("json unmarshal error:" + err.Error())
//err = redis.UnlockByMutex(context.Background(), redis.FavoriteMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
continue
}
fmt.Printf("==> Get new message: %v\n", fc)
// 将结构体存入redis
if err = redis.UpdateFavorite(context.Background(), fc); err != nil {
logger.Errorf("json unmarshal error: %s", err.Error())
fmt.Println("json unmarshal error:" + err.Error())
//err = redis.UnlockByMutex(context.Background(), redis.FavoriteMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
continue
}
//err = redis.UnlockByMutex(context.Background(), redis.FavoriteMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
}
return nil
}

// gocron定时任务,每隔一段时间就让Consumer消费消息队列的所有消息
Expand Down
3 changes: 1 addition & 2 deletions cmd/message/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
3 changes: 1 addition & 2 deletions cmd/relation/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"net"

"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"

"github.com/bytedance-youthcamp-jbzx/tiktok/cmd/relation/service"
Expand Down Expand Up @@ -49,7 +48,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
3 changes: 2 additions & 1 deletion cmd/relation/service/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
amqp "github.com/rabbitmq/amqp091-go"
"strings"
"time"
)

// RelationServiceImpl implements the last service interface defined in the IDL.
Expand Down Expand Up @@ -67,7 +68,7 @@ func (s *RelationServiceImpl) RelationAction(ctx context.Context, req *relation.
UserID: uint(userID),
ToUserID: uint(toUserID),
ActionType: uint(req.ActionType),
//CreatedAt: time.Now(),
CreatedAt: uint(time.Now().UnixMilli()),
}
jsonRc, _ := json.Marshal(relationCache)
if err = RelationMq.PublishSimple(ctx, jsonRc); err != nil {
Expand Down
20 changes: 20 additions & 0 deletions cmd/relation/service/timer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,40 @@ func consume() error {
}
// 将消息队列的消息全部取出
for msg := range msgs {
//err := redis.LockByMutex(context.Background(), redis.RelationMutex)
//if err != nil {
// logger.Errorf("Redis mutex lock error: %s", err.Error())
// return err
//}
rc := new(redis.RelationCache)
// 解析json
if err = json.Unmarshal(msg.Body, &rc); err != nil {
fmt.Println("json unmarshal error:" + err.Error())
logger.Errorf("RelationMQ Err: %s", err.Error())
//err = redis.UnlockByMutex(context.Background(), redis.RelationMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
continue
}
fmt.Printf("==> Get new message: %v\n", rc)
// 将结构体存入redis
if err = redis.UpdateRelation(context.Background(), rc); err != nil {
fmt.Println("add to redis error:" + err.Error())
logger.Errorf("RelationMQ Err: %s", err.Error())
//err = redis.UnlockByMutex(context.Background(), redis.RelationMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
continue
}
//err = redis.UnlockByMutex(context.Background(), redis.RelationMutex)
//if err != nil {
// logger.Errorf("Redis mutex unlock error: %s", err.Error())
// return err
//}
}
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/user/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
//server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
3 changes: 1 addition & 2 deletions cmd/video/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/middleware"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/viper"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/cloudwego/kitex/pkg/limit"
"github.com/cloudwego/kitex/pkg/rpcinfo"
"github.com/cloudwego/kitex/server"
)
Expand Down Expand Up @@ -48,7 +47,7 @@ func main() {
server.WithMiddleware(middleware.CommonMiddleware),
server.WithMiddleware(middleware.ServerMiddleware),
server.WithRegistry(r),
server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
//server.WithLimit(&limit.Option{MaxConnections: 1000, MaxQPS: 100}),
server.WithMuxTransport(),
// server.WithSuite(tracing.NewServerSuite()),
server.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
Expand Down
52 changes: 37 additions & 15 deletions dal/redis/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package redis
import (
"context"
"errors"
"fmt"
"github.com/bytedance-youthcamp-jbzx/tiktok/dal/db"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/gocron"
"github.com/bytedance-youthcamp-jbzx/tiktok/pkg/zap"
"github.com/go-redsync/redsync/v4"
"strconv"
"strings"
"time"
)

const frequency = 10
Expand Down Expand Up @@ -42,6 +44,19 @@ func deleteKeys(ctx context.Context, key string, mutex *redsync.Mutex) error {
return nil
}

func setKey(ctx context.Context, key string, value string, expireTime time.Duration, mutex *redsync.Mutex) error {
fmt.Println(key, " => ", value)
_, err := GetRedisHelper().Set(ctx, key, value, expireTime).Result()
errUnlock := UnlockByMutex(ctx, mutex)
if errUnlock != nil {
return errors.New("unlock failed: " + errUnlock.Error())
}
if err != nil {
return errors.New("Redis set key failed: " + err.Error())
}
return nil
}

func FavoriteMoveToDB() error {
logger := zap.InitLogger()

Expand All @@ -52,12 +67,17 @@ func FavoriteMoveToDB() error {
return err
}
for _, key := range keys {
actionType, err := GetRedisHelper().Get(ctx, key).Result()
LockByMutex(ctx, FavoriteMutex)
res, err := GetRedisHelper().Get(ctx, key).Result()
UnlockByMutex(ctx, FavoriteMutex)
if err != nil {
logger.Errorln(err.Error())
return err
}
// 拆分得key
// 拆分得 value
vSplit := strings.Split(res, "::")
_, redisAt := vSplit[0], vSplit[1]
// 拆分得 key
kSplit := strings.Split(key, "::")
vid, uid := kSplit[1], kSplit[3]
videoID, err := strconv.ParseInt(vid, 10, 64)
Expand All @@ -83,7 +103,7 @@ func FavoriteMoveToDB() error {
return err
}
if v == nil || u == nil {
delErr := deleteKeys(ctx, key, favoriteMutex)
delErr := deleteKeys(ctx, key, FavoriteMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -96,7 +116,7 @@ func FavoriteMoveToDB() error {
if err != nil {
logger.Errorln(err.Error())
return err
} else if favorite == nil && actionType == "1" {
} else if favorite == nil && redisAt == "1" {
// 数据库中没有该点赞记录,且最终状态为点赞,则插入数据库
video, err := db.GetVideoById(ctx, videoID)
if err != nil {
Expand All @@ -105,7 +125,7 @@ func FavoriteMoveToDB() error {
}
err = db.CreateVideoFavorite(ctx, userID, videoID, int64(video.AuthorID))
// 插入后,删除Redis中对应记录
delErr := deleteKeys(ctx, key, favoriteMutex)
delErr := deleteKeys(ctx, key, FavoriteMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -114,7 +134,7 @@ func FavoriteMoveToDB() error {
logger.Errorln(err.Error())
return err
}
} else if favorite != nil && actionType == "2" {
} else if favorite != nil && redisAt == "2" {
// 数据库中有该点赞记录,且最终状态为取消点赞,则从数据库中删除该记录
video, err := db.GetVideoById(ctx, videoID)
if err != nil {
Expand All @@ -123,7 +143,7 @@ func FavoriteMoveToDB() error {
}
err = db.DelFavoriteByUserVideoID(ctx, userID, videoID, int64(video.AuthorID))
// 插入后,删除Redis中对应记录
delErr := deleteKeys(ctx, key, favoriteMutex)
delErr := deleteKeys(ctx, key, FavoriteMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -135,7 +155,7 @@ func FavoriteMoveToDB() error {
} else {
// 其他情况
// 插入后,删除Redis中对应记录
delErr := deleteKeys(ctx, key, favoriteMutex)
delErr := deleteKeys(ctx, key, FavoriteMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -155,7 +175,9 @@ func RelationMoveToDB() error {
return err
}
for _, key := range keys {
actionType, err := GetRedisHelper().Get(ctx, key).Result()
res, err := GetRedisHelper().Get(ctx, key).Result()
vSplit := strings.Split(res, "::")
_, redisAt := vSplit[0], vSplit[1]
if err != nil {
logger.Errorln(err.Error())
return err
Expand Down Expand Up @@ -186,7 +208,7 @@ func RelationMoveToDB() error {
return err
}
if u == nil || tu == nil {
delErr := deleteKeys(ctx, key, favoriteMutex)
delErr := deleteKeys(ctx, key, RelationMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -199,11 +221,11 @@ func RelationMoveToDB() error {
if err != nil {
logger.Errorln(err.Error())
return err
} else if relation == nil && actionType == "1" {
} else if relation == nil && redisAt == "1" {
// 数据库中没有该关注记录,且最终状态为关注,则插入数据库
err = db.CreateRelation(ctx, userID, toUserID)
// 插入后,删除Redis中对应记录
delErr := deleteKeys(ctx, key, relationMutex)
delErr := deleteKeys(ctx, key, RelationMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -212,11 +234,11 @@ func RelationMoveToDB() error {
logger.Errorln(err.Error())
return err
}
} else if relation != nil && actionType == "2" {
} else if relation != nil && redisAt == "2" {
// 数据库中有该关注记录,且最终状态为取消关注,则从数据库中删除该记录
err = db.DelRelationByUserIDs(ctx, userID, toUserID)
// 删除Redis中对应记录
delErr := deleteKeys(ctx, key, relationMutex)
delErr := deleteKeys(ctx, key, RelationMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand All @@ -227,7 +249,7 @@ func RelationMoveToDB() error {
}
}
// 删除Redis中对应记录
delErr := deleteKeys(ctx, key, relationMutex)
delErr := deleteKeys(ctx, key, RelationMutex)
if delErr != nil {
logger.Errorln(delErr.Error())
return delErr
Expand Down
Loading

0 comments on commit b60b62e

Please sign in to comment.