Skip to content
This repository has been archived by the owner on Mar 11, 2020. It is now read-only.

Commit

Permalink
Removed uses of the default Task pool. Fixes #25
Browse files Browse the repository at this point in the history
  • Loading branch information
runarorama committed Jan 29, 2016
1 parent c819af9 commit c0f32e6
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 24 deletions.
10 changes: 5 additions & 5 deletions core/src/main/scala/knobs/IORef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ sealed abstract class IORef[A] {
}

object IORef {
def apply[A](value: A): Task[IORef[A]] = Task(new IORef[A] {
def apply[A](value: A): Task[IORef[A]] = Task.delay(new IORef[A] {
val ref = new AtomicReference(value)
def read = Task(ref.get)
def write(value: A) = Task(ref.set(value))
def read = Task.delay(ref.get)
def write(value: A) = Task.delay(ref.set(value))
def atomicModify[B](f: A => (A, B)) = for {
a <- read
(a2, b) = f(a)
p <- Task(ref.compareAndSet(a, a2))
r <- if (p) Task(b) else atomicModify(f)
p <- Task.delay(ref.compareAndSet(a, a2))
r <- if (p) Task.now(b) else atomicModify(f)
} yield r
})
}
2 changes: 1 addition & 1 deletion core/src/main/scala/knobs/MutableConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ case class MutableConfig(root: String, base: BaseConfig) {
*/
def require[A:Configured](name: Name): Task[A] = for {
v <- lookup(name)
r <- v.map(Task(_)).getOrElse(Task.fail(KeyError(name)))
r <- v.map(Task.now(_)).getOrElse(Task.fail(KeyError(name)))
} yield r

/**
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/knobs/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ object Resource {
def resolve(r: File, child: String): File =
new File(resolveName(r.getPath, child))
def load(path: Worth[File]) =
loadFile(path, Task(scala.io.Source.fromFile(path.worth).mkString))
loadFile(path, Task.delay(scala.io.Source.fromFile(path.worth).mkString))
override def shows(r: File) = r.toString
def watch(path: Worth[File]) = for {
ds <- load(path)
Expand All @@ -229,7 +229,7 @@ object Resource {
implicit def uriResource: Resource[URI] = new Resource[URI] {
def resolve(r: URI, child: String): URI = r resolve new URI(child)
def load(path: Worth[URI]) =
loadFile(path, Task(scala.io.Source.fromURL(path.worth.toURL).mkString + "\n"))
loadFile(path, Task.delay(scala.io.Source.fromURL(path.worth.toURL).mkString + "\n"))
override def shows(r: URI) = r.toString
}

Expand All @@ -240,9 +240,9 @@ object Resource {
ClassPath(resolveName(r.s, child), r.loader)
def load(path: Worth[ClassPath]) = {
val r = path.worth
loadFile(path, Task(r.loader.getResourceAsStream(r.s)) flatMap { x =>
loadFile(path, Task.delay(r.loader.getResourceAsStream(r.s)) flatMap { x =>
if (x == null) Task.fail(new java.io.FileNotFoundException(r.s + " not found on classpath"))
else Task(scala.io.Source.fromInputStream(x).mkString)
else Task.delay(scala.io.Source.fromInputStream(x).mkString)
})
}
override def shows(r: ClassPath) = {
Expand All @@ -263,13 +263,13 @@ object Resource {
def load(path: Worth[Pattern]) = {
val pat = path.worth
for {
ds <- Task(sys.props.toMap.filterKeys(pat matches _).map {
ds <- Task.delay(sys.props.toMap.filterKeys(pat matches _).map {
case (k, v) => Bind(k, ConfigParser.value.parse(v).toOption.getOrElse(CfgText(v)))
})
r <- (ds.isEmpty, path) match {
case (true, Required(_)) =>
Task.fail(new ConfigError(path.worth, s"Required system properties $pat not present."))
case _ => Task(ds.toList)
case _ => Task.now(ds.toList)
}
} yield r
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/knobs/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ package object knobs {
s <- IORef(Map[Pattern, List[ChangeHandler]]())
bc = BaseConfig(paths = p, cfgMap = m, subs = s)
ticks = mergeN(Process.emitAll(loaded.values.map(_._2).toSeq))
// TODO: Give these errors to a proper error handling mechanism
_ <- Task(ticks.evalMap(_ => bc.reload).run.runAsync(_.fold(_.printStackTrace, _ => ())))
_ <- Task.async[Unit](k => ticks.evalMap(_ => bc.reload).run.runAsync(k))
} yield bc

private [knobs] def addDot(p: String): String =
Expand Down Expand Up @@ -155,7 +154,7 @@ package object knobs {
Task.fail(new Exception(s"type error: $name must be a number or a string"))
case _ => for {
// added because lots of sys-admins think software is case unaware. Doh!
e <- Task(sys.props.get(name) orElse sys.env.get(name) orElse sys.env.get(name.toLowerCase))
e <- Task.delay(sys.props.get(name) orElse sys.env.get(name) orElse sys.env.get(name.toLowerCase))
r <- e.map(Task.now).getOrElse(
Task.fail(ConfigError(f, s"no such variable $name")))
} yield r
Expand Down Expand Up @@ -218,10 +217,10 @@ package object knobs {
a(n, v).attempt.flatMap {
case -\/(e) =>
Task.fail(new Exception(s"A ChangeHandler threw an exception for ${(p, n)}", e))
case _ => Task(())
case _ => Task.now(())
}

subs.foldLeft(Task(())) {
subs.foldLeft(Task.now(())) {
case (next, (p@Exact(n), acts)) => for {
_ <- next
v = after get n
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/knobs/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object Test extends Properties("Knobs") {

lazy val interpTest: Task[Prop] =
withLoad(List(Required(ClassPathResource("pathological.cfg")))) { cfg => for {
home <- Task(sys.env.get("HOME"))
home <- Task.delay(sys.env.get("HOME"))
cfgHome <- cfg.lookup[String]("ba")
} yield cfgHome == home }

Expand Down
8 changes: 4 additions & 4 deletions zookeeper/src/main/scala/knobs/Zookeeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ object ZooKeeper {
cfg <- knobs.loadImmutable(config)
loc = cfg.require[String]("zookeeper.connection-string")
path = cfg.require[String]("zookeeper.path-to-config")
c <- Task(CuratorFrameworkFactory.newClient(loc, retryPolicy))
_ <- Task(c.start)
c <- Task.delay(CuratorFrameworkFactory.newClient(loc, retryPolicy))
_ <- Task.delay(c.start)
} yield (Watched(ZNode(c, path)), c)
}

Expand Down Expand Up @@ -139,7 +139,7 @@ object ZooKeeper {
p <- doZK(config)
(box, c) = p
_ <- k(box)
_ <- Task(c.close)
_ <- Task.delay(c.close)
} yield ()

/**
Expand Down Expand Up @@ -177,7 +177,7 @@ object ZooKeeper {

protected def unsafe(config: List[KnobsResource] = defaultCfg): (ResourceBox, Task[Unit]) = {
val (box, c) = doZK(config).run
(box, Task(c.close))
(box, Task.delay(c.close))
}

}
4 changes: 2 additions & 2 deletions zookeeper/src/test/scala/knobs/ZookeeperTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ object ZooKeeperTests extends Properties("ZooKeeper") {
n1 <- cfg.require[Int]("foo")
_ <- cfg.subscribe(Exact("foo"), {
case ("foo", Some(CfgNumber(n))) =>
ref.write(n.toInt).flatMap(_ => Task(latch.countDown))
case _ => Task(latch.countDown)
ref.write(n.toInt).flatMap(_ => Task.delay(latch.countDown))
case _ => Task.delay(latch.countDown)
})
_ <- Task {
c.setData.forPath("/knobs.cfg", "foo = 20\n".toArray.map(_.toByte))
Expand Down

0 comments on commit c0f32e6

Please sign in to comment.