Skip to content

Commit

Permalink
PM-3146: HotStuff Service Tests (#27)
Browse files Browse the repository at this point in the history
* PM-3146: Test FiberMap.

* PM-3146: Test FiberSet.

* PM-3146: Test Pipe.

* PM-3146: Test Network.

* PM-3146: Test MessageStash.

* PM-3146: Rename to MessageStashSpec.

* PM-3146: No need for Task for MessageStashSpec
  • Loading branch information
aakoshh authored May 11, 2021
1 parent a33267d commit 83b4622
Show file tree
Hide file tree
Showing 6 changed files with 490 additions and 0 deletions.
2 changes: 2 additions & 0 deletions build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ class MetronomeModule(val crossScalaVersion: String) extends CrossScalaModule {
ivy"com.chuusai::shapeless:${VersionOf.shapeless}",
ivy"io.monix::monix:${VersionOf.monix}"
)

object test extends TestModule
}

/** Storage abstractions, e.g. a generic key-value store. */
Expand Down
27 changes: 27 additions & 0 deletions metronome/core/test/src/io/iohk/metronome/core/PipeSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.iohk.metronome.core

import org.scalatest.flatspec.AsyncFlatSpec
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.scalatest.matchers.should.Matchers

class PipeSpec extends AsyncFlatSpec with Matchers {

behavior of "Pipe"

it should "send messages between the sides" in {
val test = for {
pipe <- Pipe[Task, String, Int]
_ <- pipe.left.send("foo")
_ <- pipe.left.send("bar")
_ <- pipe.right.send(1)
rs <- pipe.right.receive.take(2).toListL
ls <- pipe.left.receive.headOptionL
} yield {
rs shouldBe List("foo", "bar")
ls shouldBe Some(1)
}

test.runToFuture
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.iohk.metronome.core.fibers

import cats.effect.concurrent.Ref
import monix.eval.Task
import monix.execution.atomic.AtomicInt
import monix.execution.Scheduler.Implicits.global
import org.scalatest.{Inspectors, Inside}
import org.scalatest.compatible.Assertion
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.util.Random
import scala.concurrent.duration._
import monix.execution.BufferCapacity

class FiberMapSpec extends AsyncFlatSpec with Matchers with Inside {

def test(t: Task[Assertion]) =
t.timeout(10.seconds).runToFuture

def testMap(f: FiberMap[Task, String] => Task[Assertion]) = test {
FiberMap[Task, String]().use(f)
}

behavior of "FiberMap"

it should "process tasks in the order they are submitted" in testMap {
fiberMap =>
val stateRef = Ref.unsafe[Task, Map[String, Vector[Int]]](Map.empty)

val keys = List("a", "b", "c")

val valueMap = keys.map {
_ -> Random.shuffle(Range(0, 10).toVector)
}.toMap

val tasks = for {
k <- keys
v <- valueMap(k)
} yield (k, v)

def append(k: String, v: Int): Task[Unit] =
stateRef.update { state =>
state.updated(k, state.getOrElse(k, Vector.empty) :+ v)
}

for {
handles <- Task.traverse(tasks) { case (k, v) =>
// This is a version that wouldn't preserve the order:
// append(k, v).start.map(_.join)
fiberMap.submit(k)(append(k, v))
}
_ <- Task.parTraverse(handles)(identity)
state <- stateRef.get
} yield {
Inspectors.forAll(keys) { k =>
state(k) shouldBe valueMap(k)
}
}
}

it should "process tasks concurrently across keys" in testMap { fiberMap =>
val running = AtomicInt(0)
val maxRunning = AtomicInt(0)

val keys = List("a", "b")
val tasks = List.fill(10)(keys).flatten

for {
handles <- Task.traverse(tasks) { k =>
val task = for {
r <- Task(running.incrementAndGet())
_ <- Task(maxRunning.getAndTransform(m => math.max(m, r)))
_ <- Task.sleep(20.millis) // Increase chance for overlap.
_ <- Task(running.decrement())
} yield ()

fiberMap.submit(k)(task)
}
_ <- Task.parTraverse(handles)(identity)
} yield {
running.get() shouldBe 0
maxRunning.get() shouldBe keys.size
}
}

it should "return a value we can wait on" in testMap { fiberMap =>
for {
task <- fiberMap.submit("foo")(Task("spam"))
value <- task
} yield {
value shouldBe "spam"
}
}

it should "reject new submissions after shutdown" in test {
FiberMap[Task, String]().allocated.flatMap { case (fiberMap, release) =>
for {
_ <- fiberMap.submit("foo")(Task("alpha"))
_ <- release
r <- fiberMap.submit("foo")(Task(2)).attempt
} yield {
inside(r) { case Left(ex) =>
ex shouldBe a[IllegalStateException]
ex.getMessage should include("shut down")
}
}
}
}

it should "reject new submissions for keys that hit their capacity limit" in test {
FiberMap[Task, String](BufferCapacity.Bounded(capacity = 1)).use {
fiberMap =>
def trySubmit(k: String) =
fiberMap.submit(k)(Task.never).attempt

for {
_ <- trySubmit("foo")
_ <- trySubmit("foo")
r3 <- trySubmit("foo")
r4 <- trySubmit("bar")
} yield {
inside(r3) { case Left(ex) =>
ex shouldBe a[FiberMap.QueueFullException]
}
r4.isRight shouldBe true
}
}
}

it should "cancel and raise errors in already submitted tasks after shutdown" in test {
FiberMap[Task, String]().allocated.flatMap { case (fiberMap, release) =>
for {
r <- fiberMap.submit("foo")(Task.never)
_ <- release
r <- r.attempt
} yield {
inside(r) { case Left(ex) =>
ex shouldBe a[RuntimeException]
ex.getMessage should include("shut down")
}
}
}
}

it should "keep processing even if a task fails" in testMap { fiberMap =>
for {
t1 <- fiberMap.submit("foo")(
Task.raiseError(new RuntimeException("Boom!"))
)
t2 <- fiberMap.submit("foo")(Task(2))
r1 <- t1.attempt
r2 <- t2
} yield {
inside(r1) { case Left(ex) =>
ex.getMessage shouldBe "Boom!"
}
r2 shouldBe 2
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package io.iohk.metronome.core.fibers

import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.execution.atomic.AtomicInt
import org.scalatest.compatible.Assertion
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.Inside
import scala.concurrent.duration._

class FiberSetSpec extends AsyncFlatSpec with Matchers with Inside {

def test(t: Task[Assertion]) =
t.timeout(10.seconds).runToFuture

behavior of "FiberSet"

it should "reject new submissions after shutdown" in test {
FiberSet[Task].allocated.flatMap { case (fiberSet, release) =>
for {
_ <- fiberSet.submit(Task("foo"))
_ <- release
r <- fiberSet.submit(Task("bar")).attempt
} yield {
inside(r) { case Left(ex) =>
ex shouldBe a[IllegalStateException]
ex.getMessage should include("shut down")
}
}
}
}

it should "cancel and raise errors in already submitted tasks after shutdown" in test {
FiberSet[Task].allocated.flatMap { case (fiberSet, release) =>
for {
r <- fiberSet.submit(Task.never)
_ <- release
r <- r.attempt
} yield {
inside(r) { case Left(ex) =>
ex shouldBe a[RuntimeException]
ex.getMessage should include("shut down")
}
}
}
}

it should "return a value we can wait on" in test {
FiberSet[Task].use { fiberSet =>
for {
task <- fiberSet.submit(Task("spam"))
value <- task
} yield {
value shouldBe "spam"
}
}
}

it should "process tasks concurrently" in test {
FiberSet[Task].use { fiberSet =>
val running = AtomicInt(0)
val maxRunning = AtomicInt(0)

for {
handles <- Task.traverse(1 to 10) { _ =>
val task = for {
r <- Task(running.incrementAndGet())
_ <- Task(maxRunning.getAndTransform(m => math.max(m, r)))
_ <- Task.sleep(20.millis) // Increase chance for overlap.
_ <- Task(running.decrement())
} yield ()

fiberSet.submit(task)
}
_ <- Task.parTraverse(handles)(identity)
} yield {
running.get() shouldBe 0
maxRunning.get() should be > 1
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.iohk.metronome.hotstuff.service

import io.iohk.metronome.hotstuff.consensus.basic.Agreement
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import io.iohk.metronome.hotstuff.consensus.basic.{
ProtocolError,
Event,
Message,
Phase,
QuorumCertificate
}
import io.iohk.metronome.hotstuff.consensus.ViewNumber
import io.iohk.metronome.crypto.GroupSignature

class MessageStashSpec extends AnyFlatSpec with Matchers {
import ConsensusService.MessageStash

object TestAgreement extends Agreement {
override type Block = Nothing
override type Hash = Int
override type PSig = Nothing
override type GSig = Int
override type PKey = String
override type SKey = Nothing
}
type TestAgreement = TestAgreement.type

"MessageStash" should behave like {

val emptyStash = MessageStash.empty[TestAgreement]

val error = ProtocolError.TooEarly[TestAgreement](
Event.MessageReceived[TestAgreement](
"Alice",
Message.NewView(
ViewNumber(10),
QuorumCertificate[TestAgreement](
Phase.Prepare,
ViewNumber(9),
123,
GroupSignature(456)
)
)
),
expectedInViewNumber = ViewNumber(11),
expectedInPhase = Phase.Prepare
)
val errorSlotKey = (error.expectedInViewNumber, error.expectedInPhase)

it should "stash errors" in {
emptyStash.slots shouldBe empty

val stash = emptyStash.stash(error)

stash.slots should contain key errorSlotKey
stash.slots(errorSlotKey) should contain key error.event.sender
stash.slots(errorSlotKey)(error.event.sender) shouldBe error.event.message
}

it should "stash only the last message from a sender" in {
val error2 = error.copy(event =
error.event.copy(message =
Message.NewView(
ViewNumber(10),
QuorumCertificate[TestAgreement](
Phase.Prepare,
ViewNumber(8),
122,
GroupSignature(455)
)
)
)
)
val stash = emptyStash.stash(error).stash(error2)

stash.slots(errorSlotKey)(
error.event.sender
) shouldBe error2.event.message
}

it should "unstash due errors" in {
val errors = List(
error,
error.copy(
expectedInPhase = Phase.PreCommit
),
error.copy(
expectedInViewNumber = error.expectedInViewNumber.next
),
error.copy(
expectedInViewNumber = error.expectedInViewNumber.next,
expectedInPhase = Phase.Commit
),
error.copy(
expectedInViewNumber = error.expectedInViewNumber.next.next
)
)

val stash0 = errors.foldLeft(emptyStash)(_ stash _)

val (stash1, unstashed) = stash0.unstash(
errors(2).expectedInViewNumber,
errors(2).expectedInPhase
)

stash1.slots.keySet should have size 2
unstashed should have size 3
}
}
}
Loading

0 comments on commit 83b4622

Please sign in to comment.