Skip to content

Commit

Permalink
Merge pull request #244 from atais/cluster-methods
Browse files Browse the repository at this point in the history
Added many cluster methods & improved testing
  • Loading branch information
debasishg authored Aug 29, 2019
2 parents c519398 + 7248446 commit 8ac0ebc
Show file tree
Hide file tree
Showing 44 changed files with 1,087 additions and 332 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
target
project/boot
*.swp
.idea
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
language: scala
dist: xenial
scala:
- 2.12.8
- 2.11.12
- 2.13.0
script:
- travis_retry sbt "++${TRAVIS_SCALA_VERSION}" test
- sbt +test
jdk:
- openjdk8
- openjdk11
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.redis
import com.redis.api.BaseApi
import com.redis.serialization._

trait Operations extends BaseApi {
trait BaseOperations extends BaseApi {
self: Redis =>

override def sort[A](key: String,
Expand Down Expand Up @@ -121,7 +121,8 @@ trait Operations extends BaseApi {
override def scan[A](cursor: Int, pattern: Any = "*", count: Int = 10)(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])] =
send("SCAN", cursor :: ((x: List[Any]) => if (pattern == "*") x else "match" :: pattern :: x) (if (count == 10) Nil else List("count", count)))(asPair)

override def ping: Option[String] = send("PING")(asString)
override def ping: Option[String] =
send("PING")(asString)

override def watch(key: Any, keys: Any*)(implicit format: Format): Boolean =
send("WATCH", key :: keys.toList)(asBoolean)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/redis/RedisClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ trait Redis extends IO with Protocol {
}

trait RedisCommand extends Redis
with Operations
with BaseOperations
with GeoOperations
with NodeOperations
with StringOperations
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/com/redis/api/BaseApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ trait BaseApi {
*/
def ping: Option[String]

protected val pong: Option[String] = Some("PONG")

/**
* Marks the given keys to be watched for conditional execution of a transaction.
*/
Expand Down
15 changes: 10 additions & 5 deletions src/main/scala/com/redis/api/EvalApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,22 @@ trait EvalApi {
/**
* evaluates lua code on the server.
*/
def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int]

def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]
def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]]

def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])(implicit format: Format, parse: Parse[A]): Option[A]
def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A]

def scriptLoad(luaCode: String): Option[String]

Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/com/redis/api/HashApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ trait HashApi {
/**
* Incrementally iterate hash fields and associated values (since 2.8)
*/
def hscan[A](key: Any, cursor: Int, pattern: Any = "*", count: Int = 10)(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])]
def hscan[A](key: Any, cursor: Int, pattern: Any = "*", count: Int = 10)
(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])]


}
84 changes: 60 additions & 24 deletions src/main/scala/com/redis/cluster/BaseOps.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.redis.cluster

import com.redis.RedisClientPool
import com.redis.api.BaseApi
import com.redis.serialization.{Format, Parse}

Expand All @@ -16,11 +17,37 @@ trait BaseOps extends BaseApi {
}
}

override def rename(oldkey: Any, newkey: Any)(implicit format: Format): Boolean =
processForKey(oldkey)(_.rename(oldkey, newkey))
override def rename(oldkey: Any, newkey: Any)(implicit format: Format): Boolean = {
val oldNode: RedisClientPool = nodeForKey(oldkey)
val newNode: RedisClientPool = nodeForKey(newkey)
if (oldNode == newNode) {
oldNode.withClient(_.rename(oldkey, newkey))
} else if (oldNode.withClient(_.exists(oldkey))) {
val value = oldNode.withClient(_.get(oldkey))
oldNode.withClient(_.del(oldkey))
newNode.withClient(_.set(newkey, value))
} else {
throw new RuntimeException("ERR no such key")
}
}

override def renamenx(oldkey: Any, newkey: Any)(implicit format: Format): Boolean =
processForKey(oldkey)(_.renamenx(oldkey, newkey))
override def renamenx(oldkey: Any, newkey: Any)(implicit format: Format): Boolean = {
val oldNode: RedisClientPool = nodeForKey(oldkey)
val newNode: RedisClientPool = nodeForKey(newkey)
if (oldNode == newNode) {
oldNode.withClient(_.renamenx(oldkey, newkey))
} else if (oldNode.withClient(_.exists(oldkey))) {
if (newNode.withClient(_.exists(newkey))) {
false
} else {
val value = oldNode.withClient(_.get(oldkey))
oldNode.withClient(_.del(oldkey))
newNode.withClient(_.set(newkey, value))
}
} else {
throw new RuntimeException("ERR no such key")
}
}

override def dbsize: Option[Long] = {
val r = onAllConns(_.dbsize).flatten
Expand All @@ -30,10 +57,14 @@ trait BaseOps extends BaseApi {
override def exists(key: Any)(implicit format: Format): Boolean =
processForKey(key)(_.exists(key))

override def del(key: Any, keys: Any*)(implicit format: Format): Option[Long] = {
val r = (key :: keys.toList).groupBy(nodeForKey)
.flatMap { case (r, keys) => r.withClient(_.del(keys.head, keys.tail)) }
if (r.isEmpty) None else Some(r.sum)
override def del(key: Any, keys: Any*)(implicit format: Format): Option[Long] = Some {
(key :: keys.toList)
.groupBy(nodeForKey)
.foldLeft(0L) { case (t, (n, ks)) =>
n.withClient { client =>
client.del(ks.head, ks.tail: _*).map(t +).getOrElse(t)
}
}
}

override def getType(key: Any)(implicit format: Format): Option[String] =
Expand Down Expand Up @@ -66,29 +97,34 @@ trait BaseOps extends BaseApi {
override def quit: Boolean =
onAllConns(_.quit) forall (_ == true)

// todo: implement
override def time[A](implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] = ???

// todo: implement
override def randomkey[A](implicit parse: Parse[A]): Option[A] = ???
override def time[A](implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
randomNode().withClient(_.time)

// todo: implement
override def select(index: Int): Boolean = ???
override def randomkey[A](implicit parse: Parse[A]): Option[A] =
onAllConns(_.randomkey).flatten.headOption

// todo: implement
override def move(key: Any, db: Int)(implicit format: Format): Boolean = ???
override def select(index: Int): Boolean =
onAllConns(_.select(index)).forall(_ == true)

// todo: implement
override def auth(secret: Any)(implicit format: Format): Boolean = ???
override def move(key: Any, db: Int)(implicit format: Format): Boolean =
processForKey(key)(_.move(key, db))

// todo: implement
override def persist(key: Any)(implicit format: Format): Boolean = ???
override def auth(secret: Any)(implicit format: Format): Boolean =
onAllConns(_.auth(secret)).forall(_ == true)

// todo: implement
override def scan[A](cursor: Int, pattern: Any, count: Int)(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])] = ???
override def persist(key: Any)(implicit format: Format): Boolean =
processForKey(key)(_.persist(key))

// todo: implement
override def ping: Option[String] = ???
override def scan[A](cursor: Int, pattern: Any, count: Int)
(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])] = ???

override def ping: Option[String] =
if (onAllConns(_.ping).forall(_ == pong)) {
pong
} else {
None
}

// todo: implement
override def watch(key: Any, keys: Any*)(implicit format: Format): Boolean = ???
Expand Down
73 changes: 73 additions & 0 deletions src/main/scala/com/redis/cluster/EvalOps.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package com.redis.cluster

import com.redis.api.EvalApi
import com.redis.serialization.{Format, Parse}

trait EvalOps extends EvalApi {
self: RedisClusterOps =>

// todo: broken output
override def evalMultiBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
processForKeys(keys)(gkeys => rc => rc.evalMultiBulk(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalBulk[A](luaCode: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalBulk(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalInt(luaCode: String, keys: List[Any], args: List[Any]): Option[Int] =
processForKeys(keys)(gkeys => rc => rc.evalInt(luaCode, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalMultiSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[List[Option[A]]] =
processForKeys(keys)(gkeys => rc => rc.evalMultiSHA(shahash, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalSHA[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalSHA(shahash, gkeys, args))
.flatten.headOption

// todo: broken output
override def evalSHABulk[A](shahash: String, keys: List[Any], args: List[Any])
(implicit format: Format, parse: Parse[A]): Option[A] =
processForKeys(keys)(gkeys => rc => rc.evalSHABulk(shahash, gkeys, args))
.flatten.headOption

override def scriptLoad(luaCode: String): Option[String] = {
val r = onAllConns(_.scriptLoad(luaCode))
oneCommonAnswerOr(r)(orError("ScriptLoad")).flatten
}

private val scriptExistsNot = Some(0)

override def scriptExists(shahash: String): Option[Int] = {
val r = onAllConns(_.scriptExists(shahash))
oneCommonAnswerOr(r)(_.find(_ == scriptExistsNot)).flatten
}

override def scriptFlush: Option[String] = {
val r = onAllConns(_.scriptFlush)
oneCommonAnswerOr(r)(orError("ScriptFlush")).flatten
}

private def orError[A](method: String)(r: Iterable[A]): Option[A] =
throw new IllegalStateException(s"Various values returned while $method from various instances: ${r.mkString(",")}")

protected def oneCommonAnswerOr[A](r: Iterable[A])(moreResultHandler: Iterable[A] => Option[A]): Option[A] = {
val distinct = r.toSeq.distinct
if (distinct.size > 1) {
moreResultHandler(distinct)
} else {
r.headOption
}
}

}
8 changes: 4 additions & 4 deletions src/main/scala/com/redis/cluster/HashOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ trait HashOps extends HashApi {
override def hgetall1[K, V](key: Any)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]): Option[Map[K, V]] =
processForKey(key)(_.hgetall1[K, V](key))

// todo: implement
override def hsetnx(key: Any, field: Any, value: Any)(implicit format: Format): Boolean = ???
override def hsetnx(key: Any, field: Any, value: Any)(implicit format: Format): Boolean =
processForKey(key)(_.hsetnx(key, field, value))

// todo: implement
override def hincrbyfloat(key: Any, field: Any, value: Float)(implicit format: Format): Option[Float] = ???
override def hincrbyfloat(key: Any, field: Any, value: Float)(implicit format: Format): Option[Float] =
processForKey(key)(_.hincrbyfloat(key, field, value))

// todo: implement
override def hscan[A](key: Any, cursor: Int, pattern: Any, count: Int)(implicit format: Format, parse: Parse[A]): Option[(Option[Int], Option[List[Option[A]]])] = ???
Expand Down
8 changes: 4 additions & 4 deletions src/main/scala/com/redis/cluster/ListOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ trait ListOps extends ListApi {
override def brpop[K, V](timeoutInSeconds: Int, key: K, keys: K*)(implicit format: Format, parseK: Parse[K], parseV: Parse[V]): Option[(K, V)] =
inSameNode((key :: keys.toList): _*) { n => n.brpop[K, V](timeoutInSeconds, key, keys: _*) }

// todo: implement
override def lpushx(key: Any, value: Any)(implicit format: Format): Option[Long] = ???
override def lpushx(key: Any, value: Any)(implicit format: Format): Option[Long] =
processForKey(key)(_.lpushx(key, value))

// todo: implement
override def rpushx(key: Any, value: Any)(implicit format: Format): Option[Long] = ???
override def rpushx(key: Any, value: Any)(implicit format: Format): Option[Long] =
processForKey(key)(_.rpushx(key, value))

}
22 changes: 14 additions & 8 deletions src/main/scala/com/redis/cluster/NodeOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@ trait NodeOps extends NodeApi {
override def bgrewriteaof: Boolean =
onAllConns(_.bgrewriteaof) forall (_ == true)

// todo: implement
override def lastsave: Option[Long] = ???

// todo: implement
override def info: Option[String] = ???

// todo: implement
override def monitor: Boolean = ???
override def lastsave: Option[Long] =
onAllConns(_.lastsave).max

override def info: Option[String] = {
val e = onAllConns(_.info)
if (e.isEmpty) {
None
} else {
Some(e.flatten.mkString(","))
}
}

override def monitor: Boolean =
onAllConns(_.monitor).forall(_ == true)

// todo: implement
override def slaveof(options: Any): Boolean = ???
Expand Down
6 changes: 5 additions & 1 deletion src/main/scala/com/redis/cluster/RedisCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ abstract class RedisCluster(hosts: ClusterNode*)
with SetOps
with SortedSetOps
// with GeoOps todo: implement GeoApi
// with EvalOps todo: implement EvalApi
with EvalOps
// with HyperLogLogOps todo: implement HyperLogLogApi
with HashOps {

Expand Down Expand Up @@ -115,4 +115,8 @@ abstract class RedisCluster(hosts: ClusterNode*)

def close(): Unit = hr.cluster.map(_.close)

override protected[cluster] def randomNode(): RedisClientPool = {
val rni = r.nextInt(hr.cluster.size)
hr.cluster(rni)
}
}
Loading

0 comments on commit 8ac0ebc

Please sign in to comment.