Skip to content

Commit

Permalink
fix proxy impl conflict
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <[email protected]>
  • Loading branch information
shaoting-huang committed Oct 23, 2024
1 parent 1d61b60 commit b0375b8
Show file tree
Hide file tree
Showing 17 changed files with 680 additions and 22 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -274,3 +274,5 @@ replace (
)

exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b

replace github.com/milvus-io/milvus-proto/go-api/v2 => /home/shaoting/workspace/milvus-proto/go-api
33 changes: 18 additions & 15 deletions internal/distributed/proxy/httpserver/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (
// v2
const (
// --- category ---
CollectionCategory = "/collections/"
EntityCategory = "/entities/"
PartitionCategory = "/partitions/"
UserCategory = "/users/"
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
ImportJobCategory = "/jobs/import/"
CollectionCategory = "/collections/"
EntityCategory = "/entities/"
PartitionCategory = "/partitions/"
UserCategory = "/users/"
RoleCategory = "/roles/"
IndexCategory = "/indexes/"
AliasCategory = "/aliases/"
ImportJobCategory = "/jobs/import/"
PrivilegeGroupCategory = "/privilege_groups/"

ListAction = "list"
HasAction = "has"
Expand All @@ -37,13 +38,15 @@ const (
AdvancedSearchAction = "advanced_search"
HybridSearchAction = "hybrid_search"

UpdatePasswordAction = "update_password"
GrantRoleAction = "grant_role"
RevokeRoleAction = "revoke_role"
GrantPrivilegeAction = "grant_privilege"
RevokePrivilegeAction = "revoke_privilege"
AlterAction = "alter"
GetProgressAction = "get_progress" // deprecated, keep it for compatibility, use `/v2/vectordb/jobs/import/describe` instead
UpdatePasswordAction = "update_password"
GrantRoleAction = "grant_role"
RevokeRoleAction = "revoke_role"
GrantPrivilegeAction = "grant_privilege"
RevokePrivilegeAction = "revoke_privilege"
AlterAction = "alter"
GetProgressAction = "get_progress" // deprecated, keep it for compatibility, use `/v2/vectordb/jobs/import/describe` instead
AddPrivilegesToGroupAction = "add_privileges_to_group"
DropPrivilegesFromGroupAction = "drop_privileges_from_group"
)

const (
Expand Down
81 changes: 81 additions & 0 deletions internal/distributed/proxy/httpserver/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ func (h *HandlersV2) RegisterRoutesToV2(router gin.IRouter) {
router.POST(RoleCategory+GrantPrivilegeAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.addPrivilegeToRole))))
router.POST(RoleCategory+RevokePrivilegeAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.removePrivilegeFromRole))))

// privilege group
router.POST(PrivilegeGroupCategory+CreateAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.createPrivilegeGroup))))
router.POST(PrivilegeGroupCategory+DropAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.dropPrivilegeGroup))))
router.POST(PrivilegeGroupCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.listPrivilegeGroups))))
router.POST(PrivilegeGroupCategory+AddPrivilegesToGroupAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.addPrivilegesToGroup))))
router.POST(PrivilegeGroupCategory+DropPrivilegesFromGroupAction, timeoutMiddleware(wrapperPost(func() any { return &GrantReq{} }, wrapperTraceLog(h.dropPrivilegesFromGroup))))

router.POST(IndexCategory+ListAction, timeoutMiddleware(wrapperPost(func() any { return &CollectionNameReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.listIndexes)))))
router.POST(IndexCategory+DescribeAction, timeoutMiddleware(wrapperPost(func() any { return &IndexReq{} }, wrapperTraceLog(h.wrapperCheckDatabase(h.describeIndex)))))

Expand Down Expand Up @@ -1760,6 +1767,80 @@ func (h *HandlersV2) removePrivilegeFromRole(ctx context.Context, c *gin.Context
return h.operatePrivilegeToRole(ctx, c, anyReq.(*GrantReq), milvuspb.OperatePrivilegeType_Revoke, dbName)
}

func (h *HandlersV2) createPrivilegeGroup(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PrivilegeGroupReq)
req := &milvuspb.CreatePrivilegeGroupRequest{
GroupName: &milvuspb.PrivilegeGroupEntity{Name: httpReq.PrivilegeGroupName},
Privileges: httpReq.Privileges,
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/CreatePrivilegeGroup", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.CreatePrivilegeGroup(reqCtx, req.(*milvuspb.CreatePrivilegeGroupRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) dropPrivilegeGroup(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PrivilegeGroupReq)
req := &milvuspb.DropPrivilegeGroupRequest{
GroupName: &milvuspb.PrivilegeGroupEntity{Name: httpReq.PrivilegeGroupName},
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropPrivilegeGroup", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropPrivilegeGroup(reqCtx, req.(*milvuspb.DropPrivilegeGroupRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) listPrivilegeGroups(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
req := &milvuspb.ListPrivilegeGroupsRequest{}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/ListPrivilegeGroups", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.ListPrivilegeGroups(reqCtx, req.(*milvuspb.ListPrivilegeGroupsRequest))
})
if err == nil {
groupNames := []string{}
for _, group := range resp.(*milvuspb.ListPrivilegeGroupsResponse).Groups {
groupNames = append(groupNames, group.Name)
}
HTTPReturn(c, http.StatusOK, wrapperReturnList(groupNames))
}
return resp, err
}

func (h *HandlersV2) addPrivilegesToGroup(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PrivilegeGroupReq)
req := &milvuspb.AddPrivilegesToGroupRequest{
GroupName: &milvuspb.PrivilegeGroupEntity{Name: httpReq.PrivilegeGroupName},
Privileges: httpReq.Privileges,
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/AddPrivilegesToGroup", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.AddPrivilegesToGroup(reqCtx, req.(*milvuspb.AddPrivilegesToGroupRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) dropPrivilegesFromGroup(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
httpReq := anyReq.(*PrivilegeGroupReq)
req := &milvuspb.DropPrivilegesFromGroupRequest{
GroupName: &milvuspb.PrivilegeGroupEntity{Name: httpReq.PrivilegeGroupName},
Privileges: httpReq.Privileges,
}
resp, err := wrapperProxy(ctx, c, req, h.checkAuth, false, "/milvus.proto.milvus.MilvusService/DropPrivilegesFromGroups", func(reqCtx context.Context, req any) (interface{}, error) {
return h.proxy.DropPrivilegesFromGroup(reqCtx, req.(*milvuspb.DropPrivilegesFromGroupRequest))
})
if err == nil {
HTTPReturn(c, http.StatusOK, wrapperReturnDefault())
}
return resp, err
}

func (h *HandlersV2) listIndexes(ctx context.Context, c *gin.Context, anyReq any, dbName string) (interface{}, error) {
collectionGetter, _ := anyReq.(requestutil.CollectionNameGetter)
indexNames := []string{}
Expand Down
7 changes: 7 additions & 0 deletions internal/distributed/proxy/httpserver/request_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/gin-gonic/gin"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/pkg/util/merr"
)

Expand Down Expand Up @@ -273,6 +274,12 @@ func (req *RoleReq) GetRoleName() string {
return req.RoleName
}

type PrivilegeGroupReq struct {
PrivilegeGroupName string `json:"privilegeGroupName" binding:"required"`
Privileges []*milvuspb.PrivilegeEntity `json:"privileges"`
DbName string `json:"dbName"`
}

type GrantReq struct {
RoleName string `json:"roleName" binding:"required"`
ObjectType string `json:"objectType" binding:"required"`
Expand Down
20 changes: 20 additions & 0 deletions internal/distributed/proxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,26 @@ func (s *Server) RestoreRBAC(ctx context.Context, req *milvuspb.RestoreRBACMetaR
return s.proxy.RestoreRBAC(ctx, req)
}

func (s *Server) CreatePrivilegeGroup(ctx context.Context, req *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
return s.proxy.CreatePrivilegeGroup(ctx, req)
}

func (s *Server) DropPrivilegeGroup(ctx context.Context, req *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) {
return s.proxy.DropPrivilegeGroup(ctx, req)
}

func (s *Server) ListPrivilegeGroups(ctx context.Context, req *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) {
return s.proxy.ListPrivilegeGroups(ctx, req)
}

func (s *Server) addPrivilegesToGroup(ctx context.Context, req *milvuspb.AddPrivilegesToGroupRequest) (*commonpb.Status, error) {
return s.proxy.AddPrivilegesToGroup(ctx, req)
}

func (s *Server) dropPrivilegesFromGroup(ctx context.Context, req *milvuspb.DropPrivilegesFromGroupRequest) (*commonpb.Status, error) {
return s.proxy.DropPrivilegesFromGroup(ctx, req)
}

func (s *Server) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.RefreshPolicyInfoCacheRequest) (*commonpb.Status, error) {
return s.proxy.RefreshPolicyInfoCache(ctx, req)
}
Expand Down
20 changes: 20 additions & 0 deletions internal/distributed/rootcoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,3 +550,23 @@ func (s *Server) BackupRBAC(ctx context.Context, request *milvuspb.BackupRBACMet
func (s *Server) RestoreRBAC(ctx context.Context, request *milvuspb.RestoreRBACMetaRequest) (*commonpb.Status, error) {
return s.rootCoord.RestoreRBAC(ctx, request)
}

func (s *Server) CreatePrivilegeGroup(ctx context.Context, request *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) {
return s.rootCoord.CreatePrivilegeGroup(ctx, request)
}

func (s *Server) DropPrivilegeGroup(ctx context.Context, request *milvuspb.DropPrivilegeGroupRequest) (*commonpb.Status, error) {
return s.rootCoord.DropPrivilegeGroup(ctx, request)
}

func (s *Server) ListPrivilegeGroups(ctx context.Context, request *milvuspb.ListPrivilegeGroupsRequest) (*milvuspb.ListPrivilegeGroupsResponse, error) {
return s.rootCoord.ListPrivilegeGroups(ctx, request)
}

func (s *Server) addPrivilegesToGroup(ctx context.Context, request *milvuspb.AddPrivilegesToGroupRequest) (*commonpb.Status, error) {
return s.rootCoord.AddPrivilegesToGroup(ctx, request)
}

func (s *Server) dropPrivilegesFromGroup(ctx context.Context, request *milvuspb.DropPrivilegesFromGroupRequest) (*commonpb.Status, error) {
return s.rootCoord.DropPrivilegesFromGroup(ctx, request)
}
5 changes: 5 additions & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ type RootCoordCatalog interface {
BackupRBAC(ctx context.Context, tenant string) (*milvuspb.RBACMeta, error)
RestoreRBAC(ctx context.Context, tenant string, meta *milvuspb.RBACMeta) error

GetPrivilegeGroup(ctx context.Context, groupName string) ([]*milvuspb.PrivilegeEntity, error)
DropPrivilegeGroup(ctx context.Context, groupName string) error
AlterPrivilegeGroup(ctx context.Context, groupName string, privileges []*milvuspb.PrivilegeEntity) error
ListPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupEntity, error)

Close()
}

Expand Down
81 changes: 81 additions & 0 deletions internal/metastore/kv/rootcoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,87 @@ func (kc *Catalog) RestoreRBAC(ctx context.Context, tenant string, meta *milvusp
return err
}

func (kc *Catalog) GetPrivilegeGroup(ctx context.Context, groupName string) ([]*milvuspb.PrivilegeEntity, error) {
k := BuildPrivilegeGroupkey(groupName)
data, err := kc.Txn.Load(k)
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return nil, fmt.Errorf("privilege group [%s] does not exist", groupName)
}
log.Error("failed to load privilege group", zap.String("group", groupName), zap.Error(err))
return nil, err
}
var privilegeNames []string
err = json.Unmarshal([]byte(data), &privilegeNames)
if err != nil {
log.Error("failed to unmarshal privilege group data", zap.String("group", groupName), zap.Error(err))
return nil, err
}

var privileges []*milvuspb.PrivilegeEntity
for _, name := range privilegeNames {
privileges = append(privileges, &milvuspb.PrivilegeEntity{Name: name})
}
return privileges, nil
}

func (kc *Catalog) DropPrivilegeGroup(ctx context.Context, groupName string) error {
k := BuildPrivilegeGroupkey(groupName)
err := kc.Txn.Remove(k)
if err != nil {
log.Warn("fail to drop privilege group", zap.String("key", k), zap.Error(err))
return err
}
return nil
}

func (kc *Catalog) AlterPrivilegeGroup(ctx context.Context, groupName string, privileges []*milvuspb.PrivilegeEntity) error {
var privilegeNames []string
for _, privilege := range privileges {
privilegeNames = append(privilegeNames, privilege.Name)
}

privilegeData, err := json.Marshal(privilegeNames)
if err != nil {
log.Error("failed to marshal privileges", zap.String("group", groupName), zap.Error(err))
return err
}
k := BuildPrivilegeGroupkey(groupName)
hasKey, err := kc.Txn.Has(k)
if hasKey {
err = kc.Txn.Remove(k)
}
err = kc.Txn.Save(k, string(privilegeData))
if err != nil {
log.Warn("fail to put privilege group", zap.String("key", k), zap.Error(err))
return err
}
return nil
}

func (kc *Catalog) ListPrivilegeGroups(ctx context.Context) ([]*milvuspb.PrivilegeGroupEntity, error) {
var privilegeGroups []*milvuspb.PrivilegeGroupEntity

keys, _, err := kc.Txn.LoadWithPrefix(PrivilegeGroupPrefix)
if err != nil {
log.Error("failed to list privilege groups", zap.String("prefix", PrivilegeGroupPrefix), zap.Error(err))
return nil, err
}

for _, key := range keys {
groupName := typeutil.AfterN(key, PrivilegeGroupPrefix+"/", "/")
if len(groupName) != 1 {
log.Warn("invalid privilege group key", zap.String("string", key), zap.String("sub_string", PrivilegeGroupPrefix))
continue
}

privilegeGroups = append(privilegeGroups, &milvuspb.PrivilegeGroupEntity{
Name: groupName[0],
})
}
return privilegeGroups, nil
}

func (kc *Catalog) Close() {
// do nothing
}
Expand Down
7 changes: 7 additions & 0 deletions internal/metastore/kv/rootcoord/rootcoord_constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ const (

// GranteeIDPrefix prefix for mapping among privilege and grantor
GranteeIDPrefix = ComponentPrefix + CommonCredentialPrefix + "/grantee-id"

// PrivilegeGroupPrefix prefix for privilege group
PrivilegeGroupPrefix = ComponentPrefix + "/privilege-group"
)

func BuildDatabasePrefixWithDBID(dbID int64) string {
Expand All @@ -70,3 +73,7 @@ func getDatabasePrefix(dbID int64) string {
}
return CollectionMetaPrefix
}

func BuildPrivilegeGroupkey(groupName string) string {
return fmt.Sprintf("%s/%s", PrivilegeGroupPrefix, groupName)
}
5 changes: 5 additions & 0 deletions internal/proto/root_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ service RootCoord {
rpc ListPolicy(internal.ListPolicyRequest) returns (internal.ListPolicyResponse) {}
rpc BackupRBAC(milvus.BackupRBACMetaRequest) returns (milvus.BackupRBACMetaResponse){}
rpc RestoreRBAC(milvus.RestoreRBACMetaRequest) returns (common.Status){}
rpc CreatePrivilegeGroup(milvus.CreatePrivilegeGroupRequest) returns (common.Status) {}
rpc DropPrivilegeGroup(milvus.DropPrivilegeGroupRequest) returns (common.Status) {}
rpc ListPrivilegeGroups(milvus.ListPrivilegeGroupsRequest) returns (milvus.ListPrivilegeGroupsResponse) {}
rpc AddPrivilegesToGroup(milvus.AddPrivilegesToGroupRequest) returns (common.Status) {}
rpc DropPrivilegesFromGroup(milvus.DropPrivilegesFromGroupRequest) returns (common.Status) {}

rpc CheckHealth(milvus.CheckHealthRequest) returns (milvus.CheckHealthResponse) {}

Expand Down
Loading

0 comments on commit b0375b8

Please sign in to comment.