Skip to content

Commit

Permalink
feat: added MT support for regulation worker (#2831)
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-malani authored Jan 6, 2023
1 parent 333310d commit f1d3d03
Show file tree
Hide file tree
Showing 24 changed files with 563 additions and 397 deletions.
8 changes: 4 additions & 4 deletions config/backend-config/backend-config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package backendconfig

//go:generate mockgen -destination=../../mocks/config/backend-config/mock_backendconfig.go -package=mock_backendconfig github.com/rudderlabs/rudder-server/config/backend-config BackendConfig
//go:generate mockgen -destination=./mock_workspaceconfig.go -package=backendconfig -source=./backend-config.go workspaceConfig
// go:generate mockgen -destination=../../mocks/config/backend-config/mock_backendconfig.go -package=mock_backendconfig github.com/rudderlabs/rudder-server/config/backend-config BackendConfig
// go:generate mockgen -destination=./mock_workspaceconfig.go -package=backendconfig -source=./backend-config.go workspaceConfig

import (
"context"
Expand Down Expand Up @@ -63,7 +63,7 @@ type workspaceConfig interface {
SetUp() error
// Deprecated: use Identity() instead.
AccessToken() string
Get(context.Context, string) (map[string]ConfigT, error)
Get(context.Context) (map[string]ConfigT, error)
Identity() identity.Identifier
}

Expand Down Expand Up @@ -169,7 +169,7 @@ func (bc *backendConfigImpl) configUpdate(ctx context.Context, workspaces string
}
}()

sourceJSON, err = bc.workspaceConfig.Get(ctx, workspaces)
sourceJSON, err = bc.workspaceConfig.Get(ctx)
if err != nil {
statConfigBackendError.Increment()
pkgLogger.Warnf("Error fetching config from backend: %v", err)
Expand Down
20 changes: 10 additions & 10 deletions config/backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestConfigUpdate(t *testing.T) {
defer ctrl.Finish()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx), workspaces).Return(map[string]ConfigT{}, fakeError).Times(1)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{}, fakeError).Times(1)

bc := &backendConfigImpl{
workspaceConfig: wc,
Expand All @@ -216,7 +216,7 @@ func TestConfigUpdate(t *testing.T) {
defer ctrl.Finish()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx), workspaces).Return(map[string]ConfigT{workspaces: sampleBackendConfig}, nil).Times(1)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{workspaces: sampleBackendConfig}, nil).Times(1)

bc := &backendConfigImpl{
workspaceConfig: wc,
Expand All @@ -238,9 +238,9 @@ func TestConfigUpdate(t *testing.T) {
defer cancel()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx), workspaces).Return(map[string]ConfigT{workspaces: sampleBackendConfig}, nil).Times(1)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{workspaces: sampleBackendConfig}, nil).Times(1)

pubSub := pubsub.PublishSubscriber{}
var pubSub pubsub.PublishSubscriber
bc := &backendConfigImpl{
eb: &pubSub,
workspaceConfig: wc,
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestCache(t *testing.T) {
mockID = &mockIdentifier{key: workspaces, token: workspaceToken}
)
wc.EXPECT().Identity().Return(mockID).Times(1)
wc.EXPECT().Get(gomock.Any(), workspaces).
wc.EXPECT().Get(gomock.Any()).
Return(
map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig},
nil,
Expand Down Expand Up @@ -440,13 +440,13 @@ func TestCache(t *testing.T) {
defer cancel()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx), workspaces).Return(map[string]ConfigT{}, errors.New("control plane down")).Times(1)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{}, errors.New("control plane down")).Times(1)
sampleBackendConfigBytes, _ := json.Marshal(map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig})
unmarshalledConfig := make(map[string]ConfigT)
err = json.Unmarshal(sampleBackendConfigBytes, &unmarshalledConfig)
require.NoError(t, err)
cacheStore.EXPECT().Get(gomock.Eq(ctx)).Return(sampleBackendConfigBytes, nil).Times(1)
pubSub := pubsub.PublishSubscriber{}
var pubSub pubsub.PublishSubscriber
bc := &backendConfigImpl{
eb: &pubSub,
workspaceConfig: wc,
Expand Down Expand Up @@ -474,9 +474,9 @@ func TestCache(t *testing.T) {
defer cancel()

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Get(gomock.Eq(ctx), workspaces).Return(map[string]ConfigT{}, errors.New("control plane down")).Times(1)
wc.EXPECT().Get(gomock.Eq(ctx)).Return(map[string]ConfigT{}, errors.New("control plane down")).Times(1)
cacheStore.EXPECT().Get(gomock.Eq(ctx)).Return([]byte{}, sql.ErrNoRows).Times(1)
pubSub := pubsub.PublishSubscriber{}
var pubSub pubsub.PublishSubscriber
bc := &backendConfigImpl{
eb: &pubSub,
workspaceConfig: wc,
Expand All @@ -501,7 +501,7 @@ func TestCache(t *testing.T) {

wc := NewMockworkspaceConfig(ctrl)
wc.EXPECT().Identity().Return(mockID).Times(1)
wc.EXPECT().Get(gomock.Any(), workspaces).Return(map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig}, nil).AnyTimes()
wc.EXPECT().Get(gomock.Any()).Return(map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig}, nil).AnyTimes()
bc := &backendConfigImpl{
workspaceConfig: wc,
eb: pubsub.New(),
Expand Down
16 changes: 8 additions & 8 deletions config/backend-config/mock_workspaceconfig.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions config/backend-config/namespace_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ func (nc *namespaceConfig) SetUp() (err error) {
}

// Get returns sources from the workspace
func (nc *namespaceConfig) Get(ctx context.Context, workspaces string) (map[string]ConfigT, error) {
return nc.getFromAPI(ctx, workspaces)
func (nc *namespaceConfig) Get(ctx context.Context) (map[string]ConfigT, error) {
return nc.getFromAPI(ctx)
}

// getFromApi gets the workspace config from api
func (nc *namespaceConfig) getFromAPI(ctx context.Context, _ string) (map[string]ConfigT, error) {
func (nc *namespaceConfig) getFromAPI(ctx context.Context) (map[string]ConfigT, error) {
config := make(map[string]ConfigT)
if nc.namespace == "" {
return config, fmt.Errorf("namespace is not configured")
Expand Down
11 changes: 5 additions & 6 deletions config/backend-config/namespace_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ func Test_Namespace_Get(t *testing.T) {
logger.Reset()

var (
namespace = "free-us-1"
workspaceID1 = "2CCgbmvBSa8Mv81YaIgtR36M7aW"
cpRouterURL = "mockCpRouterURL"
namespace = "free-us-1"
cpRouterURL = "mockCpRouterURL"
)

be := &backendConfigServer{
Expand All @@ -69,7 +68,7 @@ func Test_Namespace_Get(t *testing.T) {
}
require.NoError(t, client.SetUp())

c, err := client.Get(context.Background(), workspaceID1)
c, err := client.Get(context.Background())
require.NoError(t, err)
require.Len(t, c, 2)

Expand All @@ -89,7 +88,7 @@ func Test_Namespace_Get(t *testing.T) {

require.NoError(t, client.SetUp())

c, err := client.Get(context.Background(), "")
c, err := client.Get(context.Background())
require.EqualError(t, err, `backend config request failed with 401: {"message":"Unauthorized"}`) // Unauthorized
require.Empty(t, c)
})
Expand All @@ -105,7 +104,7 @@ func Test_Namespace_Get(t *testing.T) {

require.NoError(t, client.SetUp())

c, err := client.Get(context.Background(), workspaceID1)
c, err := client.Get(context.Background())
require.EqualError(t, err, "backend config request failed with 404")
require.Empty(t, c)
})
Expand Down
2 changes: 1 addition & 1 deletion config/backend-config/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (*NOOP) SetUp() error {
return nil
}

func (*NOOP) Get(_ context.Context, _ string) (map[string]ConfigT, error) {
func (*NOOP) Get(_ context.Context) (map[string]ConfigT, error) {
return map[string]ConfigT{}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions config/backend-config/single_workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func (wc *singleWorkspaceConfig) AccessToken() string {
}

// Get returns sources from the workspace
func (wc *singleWorkspaceConfig) Get(ctx context.Context, workspace string) (map[string]ConfigT, error) {
func (wc *singleWorkspaceConfig) Get(ctx context.Context) (map[string]ConfigT, error) {
if configFromFile {
return wc.getFromFile()
} else {
return wc.getFromAPI(ctx, workspace)
return wc.getFromAPI(ctx)
}
}

// getFromApi gets the workspace config from api
func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context, _ string) (map[string]ConfigT, error) {
func (wc *singleWorkspaceConfig) getFromAPI(ctx context.Context) (map[string]ConfigT, error) {
config := make(map[string]ConfigT)
if wc.configBackendURL == nil {
return config, fmt.Errorf("single workspace: config backend url is nil")
Expand Down
6 changes: 3 additions & 3 deletions config/backend-config/single_workspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSingleWorkspaceGetFromAPI(t *testing.T) {
token: token,
configBackendURL: parsedSrvURL,
}
conf, err := wc.getFromAPI(context.Background(), "")
conf, err := wc.getFromAPI(context.Background())
require.NoError(t, err)
require.Equal(t, map[string]ConfigT{sampleWorkspaceID: sampleBackendConfig}, conf)

Expand All @@ -59,7 +59,7 @@ func TestSingleWorkspaceGetFromAPI(t *testing.T) {
token: "testToken",
configBackendURL: configBackendURL,
}
conf, err := wc.getFromAPI(context.Background(), "")
conf, err := wc.getFromAPI(context.Background())
require.ErrorContains(t, err, "unsupported protocol scheme")
require.Equal(t, map[string]ConfigT{}, conf)
})
Expand All @@ -69,7 +69,7 @@ func TestSingleWorkspaceGetFromAPI(t *testing.T) {
token: "testToken",
configBackendURL: nil,
}
conf, err := wc.getFromAPI(context.Background(), "")
conf, err := wc.getFromAPI(context.Background())
require.ErrorContains(t, err, "config backend url is nil")
require.Equal(t, map[string]ConfigT{}, conf)
})
Expand Down
8 changes: 4 additions & 4 deletions mocks/config/backend-config/mock_backendconfig.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions regulation-worker/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<!-- regulation-worker needs config-backend-url & config-backend-token as env.
So, if running it locally, store `CONFIG_BACKEND_URL`, `CONFIG_BACKEND_TOKEN` &
`DEST_TRANSFORM_URL` variables with url & token in ./cmd/.env file-->
Set the following appropriately before running regulation-worker:-

1. CONFIG_BACKEND_URL
2. WORKSPACE_TOKEN (workspace secret: only required for single-tenant)
3. WORKSPACE_NAMESPACE (namespace secret: only required for multi-tenant)
4. DEST_TRANSFORM_URL (transformer url required to make downstream API call to destionations of API type.)
59 changes: 30 additions & 29 deletions regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,58 +19,59 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/delete/batch"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/delete/kvstore"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/destination"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

var pkgLogger = logger.NewLogger().Child("regulation-worker")

func init() {
initialize.Init()
backendconfig.Init()
oauth.Init()
}

func main() {
pkgLogger.Info("starting regulation-worker")
ctx, cancel := context.WithCancel(context.Background())

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
pkgLogger.Info("Starting regulation-worker")
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
err := Run(ctx)
if ctx.Err() == nil {
cancel()
close(c)
}()
Run(ctx)
}
if err != nil {
pkgLogger.Errorf("Running regulation worker: %v", err)
os.Exit(1)
}
}

func Run(ctx context.Context) {
func Run(ctx context.Context) error {
admin.Init()
stats.Default.Start(ctx)
backendconfig.Init()
if err := backendconfig.Setup(nil); err != nil {
panic(fmt.Errorf("error while setting up backend config: %v", err))
return fmt.Errorf("setting up backend config: %w", err)
}
dest := &destination.DestMiddleware{
dest := &destination.DestinationConfig{
Dest: backendconfig.DefaultBackendConfig,
}
workspaceId, err := dest.GetWorkspaceId(ctx)

deploymentType, err := deployment.GetFromEnv()
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
return fmt.Errorf("getting deployment type: %w", err)
}
pkgLogger.Infof("Running regulation worker in %s mode", deploymentType)
backendconfig.DefaultBackendConfig.StartWithIDs(ctx, "")
backendconfig.DefaultBackendConfig.WaitForConfig(ctx)
identity := backendconfig.DefaultBackendConfig.Identity()
dest.Start(ctx)

// setting up oauth
OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete))

svc := service.JobSvc{
API: &client.JobAPI{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
URLPrefix: config.MustGetString("CONFIG_BACKEND_URL"),
WorkspaceToken: config.MustGetString("CONFIG_BACKEND_TOKEN"),
WorkspaceID: workspaceId,
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
URLPrefix: config.MustGetString("CONFIG_BACKEND_URL"),
Identity: identity,
},
DestDetail: dest,
Deleter: delete.NewRouter(
Expand All @@ -93,9 +94,9 @@ func Run(ctx context.Context) {
return l.Loop(ctx)
})()
if err != nil && !errors.Is(err, context.Canceled) {
pkgLogger.Errorf("error: %v", err)
panic(err)
return fmt.Errorf("error: %v", err)
}
return nil
}

func withLoop(svc service.JobSvc) *service.Looper {
Expand Down
Loading

0 comments on commit f1d3d03

Please sign in to comment.