Skip to content

Commit

Permalink
rdd refactor
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Jan 29, 2024
1 parent 8b7160f commit 56bab2d
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 20 deletions.
20 changes: 20 additions & 0 deletions rucat_job/src/rdd/dependency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use super::rdd::RDD;

/// A link from an RDD to another [`RDD`] it depends on, tagged with the kind of dependency.
///
/// `RDD` keeps these in its `dependencies` list and reads the wrapped RDD back through
/// `get_rdd`.
pub(super) struct Dependency {
/// The RDD this dependency points at; borrowed out by `get_rdd`.
rdd: RDD,
/// Which kind of dependency this is (see [`DependencyVariant`]).
dependency_type: DependencyVariant,
}

impl Dependency {
    /// Borrows the [`RDD`] stored in this dependency.
    pub fn get_rdd(&self) -> &RDD {
        // Destructure the borrow; the remaining fields are not needed here.
        let Self { rdd, .. } = self;
        rdd
    }
}

/// The kind of a [Dependency], stored in its `dependency_type` field.
///
/// NOTE(review): the variant names appear to mirror Spark's dependency classes
/// (narrow, shuffle, one-to-one, range) — confirm the intended semantics of each.
pub(super) enum DependencyVariant {
Narrow,
Shuffle,
OneToOne,
Range,
}
1 change: 1 addition & 0 deletions rucat_job/src/rdd/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod dependency;
mod deterministic_level;
mod map_partition_rdd;
mod partition;
Expand Down
2 changes: 1 addition & 1 deletion rucat_job/src/rdd/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub(super) enum Partition {

impl Partition {
/// Get the identifier for a partition in an RDD
fn index(&self) -> PartitionIndex {
pub fn index(&self) -> PartitionIndex {
match self {
Partition::Dummy => 0,
}
Expand Down
45 changes: 27 additions & 18 deletions rucat_job/src/rdd/rdd.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use std::task::Context;

use super::{partition::PartitionIndex, storage_level::StorageLevel};
use super::{dependency::Dependency, partition::PartitionIndex, storage_level::StorageLevel};
use crate::{rdd::partition::Partition, task_context::TaskContext};

/// Element types of RDD
enum RDDElem {
pub(super) enum RDDElem {
U8(u8),
}

/// define RDD as (Dependencies, RDDVariant) where
/// RDDVariant is the enum of all kinds of RDDs.
/// Unlike the RDD in Spark, we don't define the context as an argument of RDD as it is `global`
struct RDD {
pub(super) struct RDD {
dependencies: Vec<Dependency>,
storage_level: Option<StorageLevel>,
rdd_core: RDDVariant,
Expand All @@ -22,8 +20,28 @@ impl RDD {
&self.dependencies
}

fn compute(&self, split: Partition, context: Context) -> impl Iterator<Item = RDDElem> {
std::iter::empty()
/// Returns the RDD behind the first entry of `dependencies`,
/// or `None` when this RDD has no dependencies at all.
fn get_first_parent(&self) -> Option<&RDD> {
    let head = self.dependencies.first()?;
    Some(head.get_rdd())
}

/// Computes the elements of partition `split` of this RDD.
///
/// For a `MapPartitionRDD` this calls the stored function `f` with the task
/// `context`, the index of `split`, and a boxed iterator over the first parent RDD.
///
/// # Panics
///
/// Panics if this RDD has no dependencies, since a `MapPartitionRDD` needs a parent
/// to read its input from. It also panics for as long as `IntoIterator for &RDD`
/// remains a `todo!()` stub.
fn compute(&self, split: Partition, context: TaskContext) -> impl Iterator<Item = RDDElem> {
    match &self.rdd_core {
        RDDVariant::MapPartitionRDD { f } => {
            // A missing parent is a construction bug, not a recoverable condition,
            // so fail loudly with the violated invariant spelled out.
            let parent = self
                .get_first_parent()
                .expect("MapPartitionRDD must have a parent RDD to compute from");
            f(context, split.index(), Box::new(parent.into_iter()))
        }
    }
}
}

/// Lets a borrowed [RDD] be used where an iterator of [RDDElem] is expected
/// (`compute` boxes the result of `into_iter()` on the first parent).
///
/// NOTE: not implemented yet — `into_iter` is a `todo!()` stub and panics when called.
impl IntoIterator for &RDD {
type Item = RDDElem;
// Declared as an owning `Vec` iterator; nothing in this impl produces one yet.
type IntoIter = std::vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
// Placeholder: panics with "not yet implemented" until the real iteration is written.
todo!()
}
}

Expand All @@ -34,17 +52,8 @@ enum RDDVariant {
dyn Fn(
TaskContext,
PartitionIndex,
dyn Iterator<Item = RDDElem>,
) -> dyn Iterator<Item = RDDElem>,
Box<dyn Iterator<Item = RDDElem>>,
) -> Box<dyn Iterator<Item = RDDElem>>,
>,
},
}

/// [Dependency] cannot be generic because there is no way for an RDD to know the type of each of its [Dependency]s.
/// [Dependency] can be converted to [RDD]
enum Dependency {
Narrow,
Shuffle,
OneToOne,
Range,
}
2 changes: 1 addition & 1 deletion rucat_job/src/stage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde_traitobject as st;

use crate::task::{SubDataTrait, SubExecute, SubResultTrait, Task};
use crate::task::{SubDataTrait, SubExecute, SubResultTrait};

/// Driver will call `get_data` and `split`, and send `Task` to workers.
/// Workers simplify `SubTask`s to `SubResult`s and send them back to the driver.
Expand Down

0 comments on commit 56bab2d

Please sign in to comment.