Skip to content

Commit

Permalink
graph backend KIKIMR-18277 (#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat authored Dec 28, 2023
1 parent d0fceb7 commit e05b692
Show file tree
Hide file tree
Showing 72 changed files with 2,413 additions and 49 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ __pycache__/
*_pb2.py
*_pb2_grpc.py
*_pb2.pyi
*.pb.h
*.pb.cc

# MacOS specific
.DS_Store
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ struct TKikimrEvents : TEvents {
ES_DB_METADATA_CACHE,
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
};
};

Expand Down
7 changes: 7 additions & 0 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/helpers/pool_stats_collector.h>

#include <ydb/core/graph/api/service.h>
#include <ydb/core/graph/api/events.h>

namespace NKikimr {

// Periodically collects stats from executor threads and exposes them as mon counters
Expand Down Expand Up @@ -44,11 +47,15 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
double cpuUsage = 0;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
cpuUsage += pool.Usage;
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));

ctx.Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvSendMetrics("cpu_usage", cpuUsage));
}

private:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/base/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ PEERDIR(
ydb/core/base/services
ydb/core/debug
ydb/core/erasure
ydb/core/graph/api
ydb/core/protos
ydb/core/protos/out
ydb/core/scheme
Expand Down
1 change: 1 addition & 0 deletions ydb/core/cms/console/console_tenants_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ class TSubDomainManip : public TActorBootstrapped<TSubDomainManip> {
subdomain.SetName(Subdomain.second);
if (Tenant->IsExternalSubdomain) {
subdomain.SetExternalSchemeShard(true);
subdomain.SetGraphShard(true);
if (Tenant->IsExternalHive) {
subdomain.SetExternalHive(true);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ union TBasicKikimrServicesMask {
// next 64 flags

bool EnableDatabaseMetadataCache:1;
bool EnableGraphService:1;
};

struct {
Expand Down
16 changes: 15 additions & 1 deletion ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/wilson/wilson_uploader.h>

#include <ydb/core/graph/api/service.h>
#include <ydb/core/graph/api/shard.h>

#include <library/cpp/logger/global/global.h>
#include <library/cpp/logger/log.h>

Expand Down Expand Up @@ -1021,7 +1024,7 @@ void TLocalServiceInitializer::InitializeServices(
addToLocalConfig(TTabletTypes::ReplicationController, &NReplication::CreateController, TMailboxType::ReadAsFilled, appData->UserPoolId);
addToLocalConfig(TTabletTypes::BlobDepot, &NBlobDepot::CreateBlobDepot, TMailboxType::ReadAsFilled, appData->UserPoolId);
addToLocalConfig(TTabletTypes::StatisticsAggregator, &NStat::CreateStatisticsAggregator, TMailboxType::ReadAsFilled, appData->UserPoolId);

addToLocalConfig(TTabletTypes::GraphShard, &NGraph::CreateGraphShard, TMailboxType::ReadAsFilled, appData->UserPoolId);

TTenantPoolConfig::TPtr tenantPoolConfig = new TTenantPoolConfig(Config.GetTenantPoolConfig(), localConfig);
if (!tenantPoolConfig->IsEnabled && !tenantPoolConfig->StaticSlots.empty())
Expand Down Expand Up @@ -2666,5 +2669,16 @@ void TDatabaseMetadataCacheInitializer::InitializeServices(NActors::TActorSystem
TActorSetupCmd(CreateDatabaseMetadataCache(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId));
}

TGraphServiceInitializer::TGraphServiceInitializer(const TKikimrRunConfig& runConfig)
: IKikimrServicesInitializer(runConfig)
{
}

void TGraphServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) {
setup->LocalServices.emplace_back(
NGraph::MakeGraphServiceId(),
TActorSetupCmd(NGraph::CreateGraphService(appData->TenantName), TMailboxType::HTSwap, appData->UserPoolId));
}

} // namespace NKikimrServicesInitializers
} // namespace NKikimr
8 changes: 8 additions & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,5 +605,13 @@ class TDatabaseMetadataCacheInitializer : public IKikimrServicesInitializer {

void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

class TGraphServiceInitializer : public IKikimrServicesInitializer {
public:
TGraphServiceInitializer(const TKikimrRunConfig& runConfig);

void InitializeServices(NActors::TActorSystemSetup* setup, const NKikimr::TAppData* appData) override;
};

} // namespace NKikimrServicesInitializers
} // namespace NKikimr
4 changes: 4 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1614,6 +1614,10 @@ TIntrusivePtr<TServiceInitializersList> TKikimrRunner::CreateServiceInitializers
sil->AddServiceInitializer(new TDatabaseMetadataCacheInitializer(runConfig));
}

if (serviceMask.EnableGraphService) {
sil->AddServiceInitializer(new TGraphServiceInitializer(runConfig));
}

return sil;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ PEERDIR(
ydb/core/formats
ydb/core/fq/libs/init
ydb/core/fq/libs/logs
ydb/core/graph/service
ydb/core/graph/shard
ydb/core/grpc_services
ydb/core/grpc_services/base
ydb/core/grpc_services/auth_processor
Expand Down
48 changes: 48 additions & 0 deletions ydb/core/graph/api/events.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once

#include <ydb/core/base/events.h>
#include <ydb/core/graph/protos/graph.pb.h>

namespace NKikimr {
namespace NGraph {

struct TEvGraph {
enum EEv {
// requests
EvSendMetrics = EventSpaceBegin(TKikimrEvents::ES_GRAPH),
EvGetMetrics,
EvMetricsResult,
EvEnd
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH), "expect EvEnd < EventSpaceEnd(TKikimrEvents::ES_GRAPH)");

struct TEvSendMetrics : TEventPB<TEvSendMetrics, NKikimrGraph::TEvSendMetrics, EvSendMetrics> {
TEvSendMetrics() = default;

TEvSendMetrics(const TString& name, double value) {
NKikimrGraph::TMetric* metric = Record.AddMetrics();
metric->SetName(name);
metric->SetValue(value);
}
};

struct TEvGetMetrics : TEventPB<TEvGetMetrics, NKikimrGraph::TEvGetMetrics, EvGetMetrics> {
TEvGetMetrics() = default;

TEvGetMetrics(const NKikimrGraph::TEvGetMetrics& request)
: TEventPB<TEvGetMetrics, NKikimrGraph::TEvGetMetrics, EvGetMetrics>(request)
{}
};

struct TEvMetricsResult : TEventPB<TEvMetricsResult, NKikimrGraph::TEvMetricsResult, EvMetricsResult> {
TEvMetricsResult() = default;

TEvMetricsResult(NKikimrGraph::TEvMetricsResult&& result)
: TEventPB<TEvMetricsResult, NKikimrGraph::TEvMetricsResult, EvMetricsResult>(std::move(result))
{}
};
};

} // NGraph
} // NKikimr
18 changes: 18 additions & 0 deletions ydb/core/graph/api/service.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include <ydb/library/actors/core/actor.h>

namespace NKikimr {
namespace NGraph {

using namespace NActors;

inline TActorId MakeGraphServiceId(ui32 node = 0) {
char x[12] = {'g','r','a','p','h','s', 'v', 'c'};
return TActorId(node, TStringBuf(x, 12));
}

IActor* CreateGraphService(const TString& database);

} // NGraph
} // NKikimr
14 changes: 14 additions & 0 deletions ydb/core/graph/api/shard.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <ydb/library/actors/core/actor.h>
#include <ydb/core/base/blobstorage.h>

namespace NKikimr {
namespace NGraph {

using namespace NActors;

IActor* CreateGraphShard(const TActorId& tablet, TTabletStorageInfo* info);

} // NGraph
} // NKikimr
18 changes: 18 additions & 0 deletions ydb/core/graph/api/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
LIBRARY()

OWNER(
xenoxeno
g:kikimr
)

SRCS(
events.h
service.h
shard.h
)

PEERDIR(
ydb/core/graph/protos
)

END()
31 changes: 31 additions & 0 deletions ydb/core/graph/protos/graph.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
syntax = "proto3";

package NKikimrGraph;

option java_package = "ru.yandex.kikimr.proto";

message TMetric {
string Name = 1;
double Value = 2;
}

message TEvSendMetrics {
repeated TMetric Metrics = 1;
}

message TEvGetMetrics {
optional uint64 TimeFrom = 1;
optional uint64 TimeTo = 2;
repeated string Metrics = 3;
optional uint32 MaxPoints = 4;
}

message TMetricData {
repeated double Values = 1 [packed = true];
}

message TEvMetricsResult {
repeated uint64 Time = 1 [packed = true];
repeated TMetricData Data = 2;
string Error = 3;
}
14 changes: 14 additions & 0 deletions ydb/core/graph/protos/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
PROTO_LIBRARY()

OWNER(
xenoxeno
g:kikimr
)

SRCS(
graph.proto
)

EXCLUDE_TAGS(GO_PROTO)

END()
25 changes: 25 additions & 0 deletions ydb/core/graph/service/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#if defined BLOG_D || defined BLOG_I || defined BLOG_ERROR || defined BLOG_TRACE
#error log macro definition clash
#endif

#include <util/generic/string.h>
#include <ydb/library/actors/core/log.h>

namespace NKikimr {
namespace NGraph {

TString GetLogPrefix();

}
}

#define BLOG_D(stream) ALOG_DEBUG(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_I(stream) ALOG_INFO(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_W(stream) ALOG_WARN(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_NOTICE(stream) ALOG_NOTICE(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_ERROR(stream) ALOG_ERROR(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_CRIT(stream) ALOG_CRIT(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define BLOG_TRACE(stream) ALOG_TRACE(NKikimrServices::GRAPH, GetLogPrefix() << stream)
#define Y_ENSURE_LOG(cond, stream) if (!(cond)) { BLOG_ERROR("Failed condition \"" << #cond << "\" " << stream); }
Loading

0 comments on commit e05b692

Please sign in to comment.