From 83b4622c0054dfee24523fd6328d652f6a7386f8 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Tue, 11 May 2021 23:34:22 +0100 Subject: [PATCH] PM-3146: HotStuff Service Tests (#27) * PM-3146: Test FiberMap. * PM-3146: Test FiberSet. * PM-3146: Test Pipe. * PM-3146: Test Network. * PM-3146: Test MessageStash. * PM-3146: Rename to MessageStashSpec. * PM-3146: No need for Task for MessageStashSpec --- build.sc | 2 + .../src/io/iohk/metronome/core/PipeSpec.scala | 27 +++ .../metronome/core/fibers/FiberMapSpec.scala | 160 ++++++++++++++++++ .../metronome/core/fibers/FiberSetSpec.scala | 83 +++++++++ .../hotstuff/service/MessageStashSpec.scala | 111 ++++++++++++ .../hotstuff/service/NetworkSpec.scala | 107 ++++++++++++ 6 files changed, 490 insertions(+) create mode 100644 metronome/core/test/src/io/iohk/metronome/core/PipeSpec.scala create mode 100644 metronome/core/test/src/io/iohk/metronome/core/fibers/FiberMapSpec.scala create mode 100644 metronome/core/test/src/io/iohk/metronome/core/fibers/FiberSetSpec.scala create mode 100644 metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/MessageStashSpec.scala create mode 100644 metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/NetworkSpec.scala diff --git a/build.sc b/build.sc index bf5dc5a2..5628395e 100644 --- a/build.sc +++ b/build.sc @@ -158,6 +158,8 @@ class MetronomeModule(val crossScalaVersion: String) extends CrossScalaModule { ivy"com.chuusai::shapeless:${VersionOf.shapeless}", ivy"io.monix::monix:${VersionOf.monix}" ) + + object test extends TestModule } /** Storage abstractions, e.g. a generic key-value store. */ diff --git a/metronome/core/test/src/io/iohk/metronome/core/PipeSpec.scala b/metronome/core/test/src/io/iohk/metronome/core/PipeSpec.scala new file mode 100644 index 00000000..5f24205e --- /dev/null +++ b/metronome/core/test/src/io/iohk/metronome/core/PipeSpec.scala @@ -0,0 +1,27 @@ +package io.iohk.metronome.core + +import org.scalatest.flatspec.AsyncFlatSpec +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import org.scalatest.matchers.should.Matchers + +class PipeSpec extends AsyncFlatSpec with Matchers { + + behavior of "Pipe" + + it should "send messages between the sides" in { + val test = for { + pipe <- Pipe[Task, String, Int] + _ <- pipe.left.send("foo") + _ <- pipe.left.send("bar") + _ <- pipe.right.send(1) + rs <- pipe.right.receive.take(2).toListL + ls <- pipe.left.receive.headOptionL + } yield { + rs shouldBe List("foo", "bar") + ls shouldBe Some(1) + } + + test.runToFuture + } +} diff --git a/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberMapSpec.scala b/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberMapSpec.scala new file mode 100644 index 00000000..575e2eab --- /dev/null +++ b/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberMapSpec.scala @@ -0,0 +1,160 @@ +package io.iohk.metronome.core.fibers + +import cats.effect.concurrent.Ref +import monix.eval.Task +import monix.execution.atomic.AtomicInt +import monix.execution.Scheduler.Implicits.global +import org.scalatest.{Inspectors, Inside} +import org.scalatest.compatible.Assertion +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should.Matchers +import scala.util.Random +import scala.concurrent.duration._ +import monix.execution.BufferCapacity + +class FiberMapSpec extends AsyncFlatSpec with Matchers with Inside { + + def test(t: Task[Assertion]) = + t.timeout(10.seconds).runToFuture + + def testMap(f: FiberMap[Task, String] => Task[Assertion]) = test { + FiberMap[Task, String]().use(f) + } + + behavior of "FiberMap" + + it should "process tasks in the order they are submitted" in testMap { + fiberMap => + val stateRef = Ref.unsafe[Task, Map[String, Vector[Int]]](Map.empty) + + val keys = List("a", "b", "c") + + val valueMap = keys.map { + _ -> Random.shuffle(Range(0, 10).toVector) + }.toMap + + val tasks = for { + k <- keys + v <- valueMap(k) + } yield (k, v) + + def append(k: String, v: Int): Task[Unit] = + stateRef.update { state => + state.updated(k, state.getOrElse(k, Vector.empty) :+ v) + } + + for { + handles <- Task.traverse(tasks) { case (k, v) => + // This is a version that wouldn't preserve the order: + // append(k, v).start.map(_.join) + fiberMap.submit(k)(append(k, v)) + } + _ <- Task.parTraverse(handles)(identity) + state <- stateRef.get + } yield { + Inspectors.forAll(keys) { k => + state(k) shouldBe valueMap(k) + } + } + } + + it should "process tasks concurrently across keys" in testMap { fiberMap => + val running = AtomicInt(0) + val maxRunning = AtomicInt(0) + + val keys = List("a", "b") + val tasks = List.fill(10)(keys).flatten + + for { + handles <- Task.traverse(tasks) { k => + val task = for { + r <- Task(running.incrementAndGet()) + _ <- Task(maxRunning.getAndTransform(m => math.max(m, r))) + _ <- Task.sleep(20.millis) // Increase chance for overlap. + _ <- Task(running.decrement()) + } yield () + + fiberMap.submit(k)(task) + } + _ <- Task.parTraverse(handles)(identity) + } yield { + running.get() shouldBe 0 + maxRunning.get() shouldBe keys.size + } + } + + it should "return a value we can wait on" in testMap { fiberMap => + for { + task <- fiberMap.submit("foo")(Task("spam")) + value <- task + } yield { + value shouldBe "spam" + } + } + + it should "reject new submissions after shutdown" in test { + FiberMap[Task, String]().allocated.flatMap { case (fiberMap, release) => + for { + _ <- fiberMap.submit("foo")(Task("alpha")) + _ <- release + r <- fiberMap.submit("foo")(Task(2)).attempt + } yield { + inside(r) { case Left(ex) => + ex shouldBe a[IllegalStateException] + ex.getMessage should include("shut down") + } + } + } + } + + it should "reject new submissions for keys that hit their capacity limit" in test { + FiberMap[Task, String](BufferCapacity.Bounded(capacity = 1)).use { + fiberMap => + def trySubmit(k: String) = + fiberMap.submit(k)(Task.never).attempt + + for { + _ <- trySubmit("foo") + _ <- trySubmit("foo") + r3 <- trySubmit("foo") + r4 <- trySubmit("bar") + } yield { + inside(r3) { case Left(ex) => + ex shouldBe a[FiberMap.QueueFullException] + } + r4.isRight shouldBe true + } + } + } + + it should "cancel and raise errors in already submitted tasks after shutdown" in test { + FiberMap[Task, String]().allocated.flatMap { case (fiberMap, release) => + for { + r <- fiberMap.submit("foo")(Task.never) + _ <- release + r <- r.attempt + } yield { + inside(r) { case Left(ex) => + ex shouldBe a[RuntimeException] + ex.getMessage should include("shut down") + } + } + } + } + + it should "keep processing even if a task fails" in testMap { fiberMap => + for { + t1 <- fiberMap.submit("foo")( + Task.raiseError(new RuntimeException("Boom!")) + ) + t2 <- fiberMap.submit("foo")(Task(2)) + r1 <- t1.attempt + r2 <- t2 + } yield { + inside(r1) { case Left(ex) => + ex.getMessage shouldBe "Boom!" + } + r2 shouldBe 2 + } + } +} diff --git a/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberSetSpec.scala b/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberSetSpec.scala new file mode 100644 index 00000000..422b85c9 --- /dev/null +++ b/metronome/core/test/src/io/iohk/metronome/core/fibers/FiberSetSpec.scala @@ -0,0 +1,83 @@ +package io.iohk.metronome.core.fibers + +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import monix.execution.atomic.AtomicInt +import org.scalatest.compatible.Assertion +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.Inside +import scala.concurrent.duration._ + +class FiberSetSpec extends AsyncFlatSpec with Matchers with Inside { + + def test(t: Task[Assertion]) = + t.timeout(10.seconds).runToFuture + + behavior of "FiberSet" + + it should "reject new submissions after shutdown" in test { + FiberSet[Task].allocated.flatMap { case (fiberSet, release) => + for { + _ <- fiberSet.submit(Task("foo")) + _ <- release + r <- fiberSet.submit(Task("bar")).attempt + } yield { + inside(r) { case Left(ex) => + ex shouldBe a[IllegalStateException] + ex.getMessage should include("shut down") + } + } + } + } + + it should "cancel and raise errors in already submitted tasks after shutdown" in test { + FiberSet[Task].allocated.flatMap { case (fiberSet, release) => + for { + r <- fiberSet.submit(Task.never) + _ <- release + r <- r.attempt + } yield { + inside(r) { case Left(ex) => + ex shouldBe a[RuntimeException] + ex.getMessage should include("shut down") + } + } + } + } + + it should "return a value we can wait on" in test { + FiberSet[Task].use { fiberSet => + for { + task <- fiberSet.submit(Task("spam")) + value <- task + } yield { + value shouldBe "spam" + } + } + } + + it should "process tasks concurrently" in test { + FiberSet[Task].use { fiberSet => + val running = AtomicInt(0) + val maxRunning = AtomicInt(0) + + for { + handles <- Task.traverse(1 to 10) { _ => + val task = for { + r <- Task(running.incrementAndGet()) + _ <- Task(maxRunning.getAndTransform(m => math.max(m, r))) + _ <- Task.sleep(20.millis) // Increase chance for overlap. + _ <- Task(running.decrement()) + } yield () + + fiberSet.submit(task) + } + _ <- Task.parTraverse(handles)(identity) + } yield { + running.get() shouldBe 0 + maxRunning.get() should be > 1 + } + } + } +} diff --git a/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/MessageStashSpec.scala b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/MessageStashSpec.scala new file mode 100644 index 00000000..1ef30214 --- /dev/null +++ b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/MessageStashSpec.scala @@ -0,0 +1,111 @@ +package io.iohk.metronome.hotstuff.service + +import io.iohk.metronome.hotstuff.consensus.basic.Agreement +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import io.iohk.metronome.hotstuff.consensus.basic.{ + ProtocolError, + Event, + Message, + Phase, + QuorumCertificate +} +import io.iohk.metronome.hotstuff.consensus.ViewNumber +import io.iohk.metronome.crypto.GroupSignature + +class MessageStashSpec extends AnyFlatSpec with Matchers { + import ConsensusService.MessageStash + + object TestAgreement extends Agreement { + override type Block = Nothing + override type Hash = Int + override type PSig = Nothing + override type GSig = Int + override type PKey = String + override type SKey = Nothing + } + type TestAgreement = TestAgreement.type + + "MessageStash" should behave like { + + val emptyStash = MessageStash.empty[TestAgreement] + + val error = ProtocolError.TooEarly[TestAgreement]( + Event.MessageReceived[TestAgreement]( + "Alice", + Message.NewView( + ViewNumber(10), + QuorumCertificate[TestAgreement]( + Phase.Prepare, + ViewNumber(9), + 123, + GroupSignature(456) + ) + ) + ), + expectedInViewNumber = ViewNumber(11), + expectedInPhase = Phase.Prepare + ) + val errorSlotKey = (error.expectedInViewNumber, error.expectedInPhase) + + it should "stash errors" in { + emptyStash.slots shouldBe empty + + val stash = emptyStash.stash(error) + + stash.slots should contain key errorSlotKey + stash.slots(errorSlotKey) should contain key error.event.sender + stash.slots(errorSlotKey)(error.event.sender) shouldBe error.event.message + } + + it should "stash only the last message from a sender" in { + val error2 = error.copy(event = + error.event.copy(message = + Message.NewView( + ViewNumber(10), + QuorumCertificate[TestAgreement]( + Phase.Prepare, + ViewNumber(8), + 122, + GroupSignature(455) + ) + ) + ) + ) + val stash = emptyStash.stash(error).stash(error2) + + stash.slots(errorSlotKey)( + error.event.sender + ) shouldBe error2.event.message + } + + it should "unstash due errors" in { + val errors = List( + error, + error.copy( + expectedInPhase = Phase.PreCommit + ), + error.copy( + expectedInViewNumber = error.expectedInViewNumber.next + ), + error.copy( + expectedInViewNumber = error.expectedInViewNumber.next, + expectedInPhase = Phase.Commit + ), + error.copy( + expectedInViewNumber = error.expectedInViewNumber.next.next + ) + ) + + val stash0 = errors.foldLeft(emptyStash)(_ stash _) + + val (stash1, unstashed) = stash0.unstash( + errors(2).expectedInViewNumber, + errors(2).expectedInPhase + ) + + stash1.slots.keySet should have size 2 + unstashed should have size 3 + } + } +} diff --git a/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/NetworkSpec.scala b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/NetworkSpec.scala new file mode 100644 index 00000000..82145acc --- /dev/null +++ b/metronome/hotstuff/service/test/src/io/iohk/metronome/hotstuff/service/NetworkSpec.scala @@ -0,0 +1,107 @@ +package io.iohk.metronome.hotstuff.service + +import cats.effect.Resource +import cats.effect.concurrent.Ref +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import org.scalatest.flatspec.AsyncFlatSpec +import org.scalatest.matchers.should.Matchers +import io.iohk.metronome.hotstuff.consensus.basic.Agreement +import io.iohk.metronome.networking.ConnectionHandler.MessageReceived +import monix.tail.Iterant + +class NetworkSpec extends AsyncFlatSpec with Matchers { + + sealed trait TestMessage + case class TestFoo(foo: String) extends TestMessage + case class TestBar(bar: Int) extends TestMessage + + object TestAgreement extends Agreement { + override type Block = Nothing + override type Hash = Nothing + override type PSig = Nothing + override type GSig = Nothing + override type PKey = String + override type SKey = Nothing + } + type TestAgreement = TestAgreement.type + + type TestKeyAndMessage = (TestAgreement.PKey, TestMessage) + type TestMessageReceived = MessageReceived[TestAgreement.PKey, TestMessage] + + class TestNetwork( + outbox: Vector[TestKeyAndMessage], + val inbox: Ref[Task, Vector[ + MessageReceived[TestAgreement.PKey, TestMessage] + ]] + ) extends Network[Task, TestAgreement, TestMessage] { + + override def incomingMessages: Iterant[Task, TestMessageReceived] = + Iterant.fromIndexedSeq { + outbox.map { case (sender, message) => + MessageReceived(sender, message) + } + } + + override def sendMessage( + recipient: TestAgreement.PKey, + message: TestMessage + ): Task[Unit] = + inbox.update(_ :+ MessageReceived(recipient, message)) + } + + object TestNetwork { + def apply(outbox: Vector[TestKeyAndMessage]) = + Ref + .of[Task, Vector[TestMessageReceived]](Vector.empty) + .map(new TestNetwork(outbox, _)) + } + + behavior of "splitter" + + it should "split and merge messages" in { + val messages = Vector( + "Alice" -> TestFoo("spam"), + "Bob" -> TestBar(42), + "Charlie" -> TestFoo("eggs") + ) + val resources = for { + network <- Resource.liftF(TestNetwork(messages)) + (fooNetwork, barNetwork) <- Network + .splitter[Task, TestAgreement, TestMessage, String, Int](network)( + split = { + case TestFoo(msg) => Left(msg) + case TestBar(msg) => Right(msg) + }, + merge = { + case Left(msg) => TestFoo(msg) + case Right(msg) => TestBar(msg) + } + ) + } yield (network, fooNetwork, barNetwork) + + val test = resources.use { case (network, fooNetwork, barNetwork) => + for { + fms <- fooNetwork.incomingMessages.take(2).toListL + bms <- barNetwork.incomingMessages.take(1).toListL + _ <- barNetwork.sendMessage("Dave", 123) + _ <- fooNetwork.sendMessage("Eve", "Adam") + _ <- barNetwork.sendMessage("Fred", 456) + nms <- network.inbox.get + } yield { + fms shouldBe List( + MessageReceived("Alice", "spam"), + MessageReceived("Charlie", "eggs") + ) + bms shouldBe List(MessageReceived("Bob", 42)) + nms shouldBe List( + MessageReceived("Dave", TestBar(123)), + MessageReceived("Eve", TestFoo("Adam")), + MessageReceived("Fred", TestBar(456)) + ) + } + } + + test.runToFuture + } +}