Skip to content

Commit

Permalink
Added e2e for remote Data node
Browse files Browse the repository at this point in the history
  • Loading branch information
qurname2 committed Aug 30, 2024
1 parent 2580781 commit 24bdf21
Showing 1 changed file with 89 additions and 26 deletions.
115 changes: 89 additions & 26 deletions test/e2e/ytsaurus_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,32 +225,32 @@ func deleteRemoteExecNodes(ctx context.Context, remoteExecNodes *ytv1.RemoteExec
}
}

// func runRemoteDataNodes(remoteDataNodes *ytv1.RemoteDataNodes) {
// Expect(k8sClient.Create(ctx, remoteDataNodes)).Should(Succeed())
// lookupKey := types.NamespacedName{Name: remoteDataNodes.Name, Namespace: remoteDataNodes.Namespace}
// Eventually(func() bool {
// createdYtsaurus := &ytv1.RemoteDataNodes{}
// err := k8sClient.Get(ctx, lookupKey, createdYtsaurus)
// return err == nil
// }, reactionTimeout, pollInterval).Should(BeTrue())
//
// By("Checking that remote data nodes state is equal to `Running`")
// Eventually(
// func() (*ytv1.RemoteDataNodes, error) {
// nodes := &ytv1.RemoteDataNodes{}
// err := k8sClient.Get(ctx, lookupKey, nodes)
// return nodes, err
// },
// reactionTimeout*2,
// pollInterval,
// ).Should(HaveField("Status.ReleaseStatus", ytv1.RemoteDataNodeReleaseStatusRunning))
//}
//
// func deleteRemoteDataNodes(ctx context.Context, remoteDataNodes *ytv1.RemoteDataNodes) {
// if err := k8sClient.Delete(ctx, remoteDataNodes); err != nil {
// log.Error(err, "Deleting remote ytsaurus failed")
// }
//}
func runRemoteDataNodes(remoteDataNodes *ytv1.RemoteDataNodes) {
Expect(k8sClient.Create(ctx, remoteDataNodes)).Should(Succeed())
lookupKey := types.NamespacedName{Name: remoteDataNodes.Name, Namespace: remoteDataNodes.Namespace}
Eventually(func() bool {
createdYtsaurus := &ytv1.RemoteDataNodes{}
err := k8sClient.Get(ctx, lookupKey, createdYtsaurus)
return err == nil
}, reactionTimeout, pollInterval).Should(BeTrue())

By("Checking that remote data nodes state is equal to `Running`")
Eventually(
func() (*ytv1.RemoteDataNodes, error) {
nodes := &ytv1.RemoteDataNodes{}
err := k8sClient.Get(ctx, lookupKey, nodes)
return nodes, err
},
reactionTimeout*2,
pollInterval,
).Should(HaveField("Status.ReleaseStatus", ytv1.RemoteDataNodeReleaseStatusRunning))
}

func deleteRemoteDataNodes(ctx context.Context, remoteDataNodes *ytv1.RemoteDataNodes) {
if err := k8sClient.Delete(ctx, remoteDataNodes); err != nil {
log.Error(err, "Deleting remote ytsaurus failed")
}
}

func runImpossibleUpdateAndRollback(ytsaurus *ytv1.Ytsaurus, ytClient yt.Client) {
name := types.NamespacedName{Name: ytsaurus.Name, Namespace: ytsaurus.Namespace}
Expand Down Expand Up @@ -784,6 +784,69 @@ var _ = Describe("Basic test for Ytsaurus controller", func() {
}
})

It("Should create ytsaurus with remote data nodes and write to a table", func() {
By("Creating a Ytsaurus resource")
ctx := context.Background()

namespace := "remotedata"

ytsaurus := testutil.CreateBaseYtsaurusResource(namespace)
// Ensure that no local data nodes exist, only remote ones (which will be created later).
ytsaurus.Spec.DataNodes = []ytv1.DataNodesSpec{}
g := ytconfig.NewGenerator(ytsaurus, "local")

remoteYtsaurus := &ytv1.RemoteYtsaurus{
ObjectMeta: metav1.ObjectMeta{
Name: testutil.RemoteResourceName,
Namespace: namespace,
},
Spec: ytv1.RemoteYtsaurusSpec{
MasterConnectionSpec: ytv1.MasterConnectionSpec{
CellTag: ytsaurus.Spec.PrimaryMasters.CellTag,
HostAddresses: []string{
"ms-0.masters.remotedata.svc.cluster.local",
},
},
},
}

remoteNodes := &ytv1.RemoteDataNodes{
ObjectMeta: metav1.ObjectMeta{Name: testutil.RemoteResourceName, Namespace: namespace},
Spec: ytv1.RemoteDataNodesSpec{
RemoteClusterSpec: &corev1.LocalObjectReference{
Name: testutil.RemoteResourceName,
},
CommonSpec: ytv1.CommonSpec{
CoreImage: testutil.CoreImageFirst,
},
DataNodesSpec: ytv1.DataNodesSpec{
InstanceSpec: testutil.CreateDataNodeInstanceSpec(1),
},
},
}

defer deleteYtsaurus(ctx, ytsaurus)
runYtsaurus(ytsaurus)

defer deleteRemoteYtsaurus(ctx, remoteYtsaurus)
createRemoteYtsaurus(remoteYtsaurus)

defer deleteRemoteDataNodes(ctx, remoteNodes)
runRemoteDataNodes(remoteNodes)

By("Creating ytsaurus client")
ytClient := getYtClient(g, namespace)
By("Create a chunk")
_, err := ytClient.CreateNode(ctx, ypath.Path("//tmp/a"), yt.NodeTable, nil)
Expect(err).Should(Succeed())

Eventually(func(g Gomega) {
writer, err := ytClient.WriteTable(ctx, ypath.Path("//tmp/a"), nil)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(writer.Write(testRow{A: "123"})).Should(Succeed())
g.Expect(writer.Commit()).Should(Succeed())
}, reactionTimeout, pollInterval).Should(Succeed())
})
It(
"Rpc proxies should require authentication",
func(ctx context.Context) {
Expand Down

0 comments on commit 24bdf21

Please sign in to comment.