diff --git a/test/e2e/ytsaurus_controller_test.go b/test/e2e/ytsaurus_controller_test.go index b6a493c0..93d990a1 100644 --- a/test/e2e/ytsaurus_controller_test.go +++ b/test/e2e/ytsaurus_controller_test.go @@ -225,32 +225,32 @@ func deleteRemoteExecNodes(ctx context.Context, remoteExecNodes *ytv1.RemoteExec } } -// func runRemoteDataNodes(remoteDataNodes *ytv1.RemoteDataNodes) { -// Expect(k8sClient.Create(ctx, remoteDataNodes)).Should(Succeed()) -// lookupKey := types.NamespacedName{Name: remoteDataNodes.Name, Namespace: remoteDataNodes.Namespace} -// Eventually(func() bool { -// createdYtsaurus := &ytv1.RemoteDataNodes{} -// err := k8sClient.Get(ctx, lookupKey, createdYtsaurus) -// return err == nil -// }, reactionTimeout, pollInterval).Should(BeTrue()) -// -// By("Checking that remote data nodes state is equal to `Running`") -// Eventually( -// func() (*ytv1.RemoteDataNodes, error) { -// nodes := &ytv1.RemoteDataNodes{} -// err := k8sClient.Get(ctx, lookupKey, nodes) -// return nodes, err -// }, -// reactionTimeout*2, -// pollInterval, -// ).Should(HaveField("Status.ReleaseStatus", ytv1.RemoteDataNodeReleaseStatusRunning)) -//} -// -// func deleteRemoteDataNodes(ctx context.Context, remoteDataNodes *ytv1.RemoteDataNodes) { -// if err := k8sClient.Delete(ctx, remoteDataNodes); err != nil { -// log.Error(err, "Deleting remote ytsaurus failed") -// } -//} +func runRemoteDataNodes(remoteDataNodes *ytv1.RemoteDataNodes) { + Expect(k8sClient.Create(ctx, remoteDataNodes)).Should(Succeed()) + lookupKey := types.NamespacedName{Name: remoteDataNodes.Name, Namespace: remoteDataNodes.Namespace} + Eventually(func() bool { + createdYtsaurus := &ytv1.RemoteDataNodes{} + err := k8sClient.Get(ctx, lookupKey, createdYtsaurus) + return err == nil + }, reactionTimeout, pollInterval).Should(BeTrue()) + + By("Checking that remote data nodes state is equal to `Running`") + Eventually( + func() (*ytv1.RemoteDataNodes, error) { + nodes := &ytv1.RemoteDataNodes{} + err := k8sClient.Get(ctx, lookupKey, nodes) + return nodes, err + }, + reactionTimeout*2, + pollInterval, + ).Should(HaveField("Status.ReleaseStatus", ytv1.RemoteDataNodeReleaseStatusRunning)) +} + +func deleteRemoteDataNodes(ctx context.Context, remoteDataNodes *ytv1.RemoteDataNodes) { + if err := k8sClient.Delete(ctx, remoteDataNodes); err != nil { + log.Error(err, "Deleting remote ytsaurus failed") + } +} func runImpossibleUpdateAndRollback(ytsaurus *ytv1.Ytsaurus, ytClient yt.Client) { name := types.NamespacedName{Name: ytsaurus.Name, Namespace: ytsaurus.Namespace} @@ -784,6 +784,69 @@ var _ = Describe("Basic test for Ytsaurus controller", func() { } }) + It("Should create ytsaurus with remote data nodes and write to a table", func() { + By("Creating a Ytsaurus resource") + ctx := context.Background() + + namespace := "remotedata" + + ytsaurus := testutil.CreateBaseYtsaurusResource(namespace) + // Ensure that no local data nodes exist, only remote ones (which will be created later). + ytsaurus.Spec.DataNodes = []ytv1.DataNodesSpec{} + g := ytconfig.NewGenerator(ytsaurus, "local") + + remoteYtsaurus := &ytv1.RemoteYtsaurus{ + ObjectMeta: metav1.ObjectMeta{ + Name: testutil.RemoteResourceName, + Namespace: namespace, + }, + Spec: ytv1.RemoteYtsaurusSpec{ + MasterConnectionSpec: ytv1.MasterConnectionSpec{ + CellTag: ytsaurus.Spec.PrimaryMasters.CellTag, + HostAddresses: []string{ + "ms-0.masters.remotedata.svc.cluster.local", + }, + }, + }, + } + + remoteNodes := &ytv1.RemoteDataNodes{ + ObjectMeta: metav1.ObjectMeta{Name: testutil.RemoteResourceName, Namespace: namespace}, + Spec: ytv1.RemoteDataNodesSpec{ + RemoteClusterSpec: &corev1.LocalObjectReference{ + Name: testutil.RemoteResourceName, + }, + CommonSpec: ytv1.CommonSpec{ + CoreImage: testutil.CoreImageFirst, + }, + DataNodesSpec: ytv1.DataNodesSpec{ + InstanceSpec: testutil.CreateDataNodeInstanceSpec(1), + }, + }, + } + + defer deleteYtsaurus(ctx, ytsaurus) + runYtsaurus(ytsaurus) + + defer deleteRemoteYtsaurus(ctx, remoteYtsaurus) + createRemoteYtsaurus(remoteYtsaurus) + + defer deleteRemoteDataNodes(ctx, remoteNodes) + runRemoteDataNodes(remoteNodes) + + By("Creating ytsaurus client") + ytClient := getYtClient(g, namespace) + By("Create a chunk") + _, err := ytClient.CreateNode(ctx, ypath.Path("//tmp/a"), yt.NodeTable, nil) + Expect(err).Should(Succeed()) + + Eventually(func(g Gomega) { + writer, err := ytClient.WriteTable(ctx, ypath.Path("//tmp/a"), nil) + g.Expect(err).ShouldNot(HaveOccurred()) + g.Expect(writer.Write(testRow{A: "123"})).Should(Succeed()) + g.Expect(writer.Commit()).Should(Succeed()) + }, reactionTimeout, pollInterval).Should(Succeed()) + }) It( "Rpc proxies should require authentication", func(ctx context.Context) {