diff --git a/rucat_job/src/rdd/dependency.rs b/rucat_job/src/rdd/dependency.rs new file mode 100644 index 0000000..1c90443 --- /dev/null +++ b/rucat_job/src/rdd/dependency.rs @@ -0,0 +1,20 @@ +use super::rdd::RDD; + +pub(super) struct Dependency { + rdd: RDD, + dependency_type: DependencyVariant, +} + +impl Dependency { + pub fn get_rdd(&self) -> &RDD { + &self.rdd + } +} + +/// [Dependency] can be converted to [RDD] +pub(super) enum DependencyVariant { + Narrow, + Shuffle, + OneToOne, + Range, +} diff --git a/rucat_job/src/rdd/mod.rs b/rucat_job/src/rdd/mod.rs index 7dcb093..3d79b8e 100644 --- a/rucat_job/src/rdd/mod.rs +++ b/rucat_job/src/rdd/mod.rs @@ -1,3 +1,4 @@ +mod dependency; mod deterministic_level; mod map_partition_rdd; mod partition; diff --git a/rucat_job/src/rdd/partition.rs b/rucat_job/src/rdd/partition.rs index 7c0613e..71a89de 100644 --- a/rucat_job/src/rdd/partition.rs +++ b/rucat_job/src/rdd/partition.rs @@ -7,7 +7,7 @@ pub(super) enum Partition { impl Partition { /// Get the identifier for a partition in an RDD - fn index(&self) -> PartitionIndex { + pub fn index(&self) -> PartitionIndex { match self { Partition::Dummy => 0, } diff --git a/rucat_job/src/rdd/rdd.rs b/rucat_job/src/rdd/rdd.rs index ad0bd56..32c845d 100644 --- a/rucat_job/src/rdd/rdd.rs +++ b/rucat_job/src/rdd/rdd.rs @@ -1,17 +1,15 @@ -use std::task::Context; - -use super::{partition::PartitionIndex, storage_level::StorageLevel}; +use super::{dependency::Dependency, partition::PartitionIndex, storage_level::StorageLevel}; use crate::{rdd::partition::Partition, task_context::TaskContext}; /// Element types of RDD -enum RDDElem { +pub(super) enum RDDElem { U8(u8), } /// define RDD as (Dependencies, RDDVariant) where /// RDDVariant is the enum of all kinds of RDDs. /// Unlilke the RDD in Spark, we don't define the context as an argument of RDD as it is `global` -struct RDD { +pub(super) struct RDD { dependencies: Vec, storage_level: Option, rdd_core: RDDVariant, @@ -22,8 +20,28 @@ impl RDD { &self.dependencies } - fn compute(&self, split: Partition, context: Context) -> impl Iterator { - std::iter::empty() + /// the first parent RDD + fn get_first_parent(&self) -> Option<&RDD> { + self.dependencies.first().map(|d| d.get_rdd()) + } + + fn compute(&self, split: Partition, context: TaskContext) -> impl Iterator { + match &self.rdd_core { + RDDVariant::MapPartitionRDD { f } => f( + context, + split.index(), + Box::new(self.get_first_parent().unwrap().into_iter()), + ), + } + } +} + +impl IntoIterator for &RDD { + type Item = RDDElem; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + todo!() } } @@ -34,17 +52,8 @@ enum RDDVariant { dyn Fn( TaskContext, PartitionIndex, - dyn Iterator, - ) -> dyn Iterator, + Box>, + ) -> Box>, >, }, } - -/// [Dependency] cannnot be generic because there is no way for RDD to know the type of its each [Dependency] -/// [Dependency] can be converted to [RDD] -enum Dependency { - Narrow, - Shuffle, - OneToOne, - Range, -} diff --git a/rucat_job/src/stage.rs b/rucat_job/src/stage.rs index 200d585..45eab18 100644 --- a/rucat_job/src/stage.rs +++ b/rucat_job/src/stage.rs @@ -1,6 +1,6 @@ use serde_traitobject as st; -use crate::task::{SubDataTrait, SubExecute, SubResultTrait, Task}; +use crate::task::{SubDataTrait, SubExecute, SubResultTrait}; /// Driver will call `get_data` and `split`, and send `Task` to workers. /// Workers simplify `SubTask`s to `SubResult`s and send them back to the driver.