Skip to content

Commit

Permalink
refactor: remove job repository
Browse files Browse the repository at this point in the history
  • Loading branch information
batrov committed Dec 7, 2023
1 parent e6109d3 commit e840cfc
Show file tree
Hide file tree
Showing 16 changed files with 181 additions and 205 deletions.
6 changes: 0 additions & 6 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,10 @@ func runServer(ctx context.Context, cfg *Config) error {
return fmt.Errorf("create new lineage repository: %w", err)
}

jobRepository, err := postgres.NewJobRepository(pgClient)
if err != nil {
return fmt.Errorf("create new job repository: %w", err)
}

wrkr, err := initAssetWorker(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: discoveryRepository,
AssetRepo: assetRepository,
JobRepo: jobRepository,
Logger: logger,
})
if err != nil {
Expand Down
6 changes: 0 additions & 6 deletions cli/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,11 @@ func runWorker(ctx context.Context, cfg *Config) error {
return fmt.Errorf("create new asset repository: %w", err)
}

jobRepository, err := postgres.NewJobRepository(pgClient)
if err != nil {
return fmt.Errorf("create new job repository: %w", err)
}

mgr, err := workermanager.New(ctx, workermanager.Deps{
Config: cfg.Worker,
DiscoveryRepo: elasticsearch.NewDiscoveryRepository(esClient, logger, cfg.Elasticsearch.RequestTimeout,
strings.Split(cfg.ColSearchExclusionKeywords, ",")),
AssetRepo: assetRepository,
JobRepo: jobRepository,
Logger: logger,
})
if err != nil {
Expand Down
24 changes: 0 additions & 24 deletions core/job/job.go

This file was deleted.

31 changes: 0 additions & 31 deletions core/job/service.go

This file was deleted.

4 changes: 2 additions & 2 deletions internal/store/elasticsearch/discovery_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName strin
return nil, err
}

err = repo.updateAlias(ctx, backupIndexName, "universe")
err = repo.updateAlias(ctx, backupIndexName, defaultSearchIndex)
if err != nil {
return nil, err
}
Expand All @@ -101,7 +101,7 @@ func (repo *DiscoveryRepository) SyncAssets(ctx context.Context, indexName strin
}

cleanup := func() error {
err = repo.updateAlias(ctx, indexName, "universe")
err = repo.updateAlias(ctx, indexName, defaultSearchIndex)
if err != nil {
return err
}
Expand Down
49 changes: 0 additions & 49 deletions internal/store/postgres/job_model.go

This file was deleted.

63 changes: 0 additions & 63 deletions internal/store/postgres/job_repository.go

This file was deleted.

2 changes: 1 addition & 1 deletion internal/workermanager/discovery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (m *Manager) SyncAssets(ctx context.Context, job worker.JobSpec) error {
const batchSize = 1000
service := string(job.Payload)

jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
jobs, err := m.worker.GetSyncJobsByService(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get sync jobs by service: %w", err)
}
Expand Down
23 changes: 4 additions & 19 deletions internal/workermanager/in_situ_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,20 @@ package workermanager
import (
"context"
"fmt"
"time"
"sync"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/core/job"
)

type InSituWorker struct {
discoveryRepo DiscoveryRepository
jobRepo job.Repository
assetRepo asset.Repository
mutex sync.Mutex
}

func NewInSituWorker(deps Deps) *InSituWorker {
return &InSituWorker{
discoveryRepo: deps.DiscoveryRepo,
jobRepo: deps.JobRepo,
assetRepo: deps.AssetRepo,
}
}
Expand All @@ -40,22 +38,9 @@ func (m *InSituWorker) EnqueueDeleteAssetJob(ctx context.Context, urn string) er

func (m *InSituWorker) EnqueueSyncAssetJob(ctx context.Context, service string) error {
const batchSize = 1000
jobs, err := m.jobRepo.GetSyncJobsByService(ctx, service)
if err != nil {
return fmt.Errorf("sync asset: get sync jobs by service: %w", err)
}

if len(jobs) > 0 {
return nil // mark job as done if there's earlier job with same service
}

jobID, err := m.jobRepo.Insert(ctx, jobSyncAsset, ([]byte)(service), time.Now().UTC())
if err != nil {
return fmt.Errorf("sync asset: insert job queue: %w", err)
}
defer func() {
_ = m.jobRepo.Delete(ctx, jobID)
}()
m.mutex.Lock()
defer m.mutex.Unlock()

cleanup, err := m.discoveryRepo.SyncAssets(ctx, service)
if err != nil {
Expand Down
55 changes: 55 additions & 0 deletions internal/workermanager/mocks/worker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 1 addition & 4 deletions internal/workermanager/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/goto/compass/core/asset"
"github.com/goto/compass/core/job"
"github.com/goto/compass/pkg/worker"
"github.com/goto/compass/pkg/worker/pgq"
"github.com/goto/compass/pkg/worker/workermw"
Expand All @@ -26,7 +25,6 @@ type Manager struct {
jobManagerPort int
discoveryRepo DiscoveryRepository
assetRepo asset.Repository
jobRepo job.Repository
logger log.Logger
}

Expand All @@ -36,6 +34,7 @@ type Worker interface {
Register(typ string, h worker.JobHandler) error
Run(ctx context.Context) error
Enqueue(ctx context.Context, jobs ...worker.JobSpec) error
GetSyncJobsByService(ctx context.Context, service string) ([]worker.Job, error)
}

type Config struct {
Expand All @@ -51,7 +50,6 @@ type Deps struct {
Config Config
DiscoveryRepo DiscoveryRepository
AssetRepo asset.Repository
JobRepo job.Repository
Logger log.Logger
}

Expand All @@ -78,7 +76,6 @@ func New(ctx context.Context, deps Deps) (*Manager, error) {
jobManagerPort: cfg.JobManagerPort,
discoveryRepo: deps.DiscoveryRepo,
assetRepo: deps.AssetRepo,
jobRepo: deps.JobRepo,
logger: deps.Logger,
}, nil
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/worker/job_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ type JobProcessor interface {
// Stats returns the job statistics by job type. It includes the number of
// active and dead jobs.
Stats(ctx context.Context) ([]JobTypeStats, error)

//
GetSyncJobsByService(ctx context.Context, service string) ([]Job, error)
}

// JobTypeStats is the statistics for the job type with number of active and
Expand Down
Loading

0 comments on commit e840cfc

Please sign in to comment.