Skip to content

Commit

Permalink
Add consensusj-analytics and build with JDK 11
Browse files Browse the repository at this point in the history
* Add consensusj-analytics module which requires JDK 11
* Disable asciidoclet :-(
* Drop modularity (for now) in consensusj-jsonrpc-cli
  • Loading branch information
msgilligan committed Feb 26, 2021
1 parent a6ade0d commit e51c0de
Show file tree
Hide file tree
Showing 22 changed files with 646 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gradle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macOS-latest, windows-latest]
java: ['9']
java: ['11']
fail-fast: false
name: ${{ matrix.os }} JDK ${{ matrix.java }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/regtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
java: [ '9' ]
java: [ '11' ]
fail-fast: false
name: ${{ matrix.os }} JDK ${{ matrix.java }}
steps:
Expand Down
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ script:
jdk:
- openjdk8
- openjdk9
#- openjdk11 OpenJDK 11 disabled until we upgrade AsciiDoclet
- openjdk11
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public Options options() {
* main method for bitcoinj-cli tool.
*
* See {@link BitcoinRpcCliOptions} for options and https://bitcoin.org/en/developer-reference#bitcoin-core-apis[Bitcoin Core JSON-RPC API]
* for the methods and parameters. Users can use `-?` to get general help or `help <command>` to get help
* for the methods and parameters. Users can use `-?` to get general help or {@code help _command_} to get help
* on a specific command.
*
* @param args options, JSON-RPC method, JSON-RPC parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ public Block getBlock(Integer index) throws JsonRpcStatusException, IOException
*
* @param generate turn generation on or off
* @param genproclimit Generation is limited to [genproclimit] processors, -1 is unlimited
* @return List<Sha256Hash> list containing block header hashes of the generated blocks or empty list
* @return list containing {@link Sha256Hash} block header hashes of the generated blocks or empty list
* @throws JsonRpcStatusException JSON RPC status exception
* @throws IOException network error
*
Expand Down Expand Up @@ -500,7 +500,7 @@ public SignedRawTransaction signRawTransactionWithWallet(String unsignedTransact
}

/**
* Get raw transaction info as hex->bitcoinj or verbose (json->POJO)
* Get raw transaction info as hex (conv to bitcoinj) or verbose (json POJO)
* @param txid Transaction ID/hash
* @param verbose `true` to return JSON transaction
* @return RawTransactionInfo if verbose, otherwise Transaction
Expand Down Expand Up @@ -980,7 +980,7 @@ public List<ChainTip> getChainTips() throws JsonRpcStatusException, IOException
/**
* Attempt to add or remove a node from the addnode list, or to try a connection to a node once.
*
* @param node node to add as a string in the form of <IP address>:<port>
* @param node node to add as a string in the form of {@code IP_address:port}
* @param command `add`, `remove`, or `onetry`
* @throws JsonRpcStatusException JSON RPC status exception
* @throws IOException network error
Expand Down
30 changes: 30 additions & 0 deletions consensusj-analytics/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id 'java-library'
}

sourceCompatibility = 11
targetCompatibility = 11

dependencies {
api "io.reactivex.rxjava3:rxjava:${rxJavaVersion}"
api "${bitcoinjGroup}:${bitcoinjArtifact}:${bitcoinjVersion}"

// For annotations on TokenRichList
//api "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
//api "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}"

}

ext.moduleName = 'org.consensusj.analytics'

tasks.withType(JavaCompile) {
options.compilerArgs << '-parameters' // Required for Jackson ParameterNamesModule
}

jar {
inputs.property("moduleName", moduleName)
manifest {
attributes 'Automatic-Module-Name': moduleName,
'Implementation-Version': archiveVersion.get()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.consensusj.analytics.service;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;

/**
* Interface for reactive rich list service
*
* @param <N> Numeric type for balances
* @param <ID> Type for currency identifiers (e.g. String, Omni Currency ID, etc)
*/
public interface RichListService<N extends Number & Comparable<? super N>, ID> {

/**
* Return a single rich list
*
* @param currencyID The currency ID
* @param numEntries The requested number of entries in the list
* @return An RxJava single for lazy access to the response
*/
Single<TokenRichList<N, ID>> richList(ID currencyID, int numEntries);

/**
* Get a continuous stream of rich list updates
*
* @param currencyID The currency ID
* @param numEntries The requested number of entries in each list
* @return An RxJava Observable for lazy access to the stream
*/
Observable<TokenRichList<N, ID>> richListUpdates(ID currencyID, int numEntries);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.consensusj.analytics.service;

import org.bitcoinj.core.Address;
import org.bitcoinj.core.Sha256Hash;

import java.util.List;

/**
* Response format for Omni rich list queries.
*
* TODO: Jackson annotations?
*/
public class TokenRichList<N extends Number & Comparable<? super N>, ID> {
private final int blockHeight;
private final Sha256Hash blockHash;
private final long timestamp;
private final ID currencyID;
private final List<TokenBalancePair<N>> richList;
private final N otherBalanceTotal;

public TokenRichList(int blockHeight,
Sha256Hash blockHash,
long timestamp,
ID currencyID,
List<TokenBalancePair<N>> richList,
N otherBalanceTotal) {
this.blockHeight = blockHeight;
this.blockHash = blockHash;
this.timestamp = timestamp;
this.currencyID = currencyID;
this.richList = List.copyOf(richList);
this.otherBalanceTotal = otherBalanceTotal;
}

public static class TokenBalancePair<N extends Number & Comparable<? super N>> {
private final Address address;
private final N balance;

public TokenBalancePair(Address address, N balance) {
this.address = address;
this.balance = balance;
}

public Address getAddress() {
return address;
}

public N getBalance() {
return balance;
}
}

public int getBlockHeight() {
return blockHeight;
}

public Sha256Hash getBlockHash() {
return blockHash;
}

public long getTimestamp() {
return timestamp;
}

public ID getCurrencyID() {
return currencyID;
}

public List<TokenBalancePair<N>> getRichList() {
return richList;
}

public N getOtherBalanceTotal() {
return otherBalanceTotal;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.consensusj.analytics.util.collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractQueue;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* Accumulates the {@code n} largest slices of a stream of objects of type {@code <E>}. Uses a {@link PriorityQueue}
* implementation of {@link AbstractQueue} internally.
* <p>
* This class is <b>not</b> thread-safe and is <b>not</b> designed for use with a {@link java.util.stream.Collector.Characteristics#CONCURRENT}
* {@link java.util.stream.Collector}. Switching the {@link AbstractQueue} implementation to {@link java.util.concurrent.PriorityBlockingQueue}
* will work (with reduced performance) but the implementations of {@link #accumulate(Object)}, {@link #combine(LargestSliceAccumulator)},
* and {@link #drain()} are not thread-safe. Even if this class were thread-safe, enabling {@code CONCURRENT}
* in a {@code Collector} using this class would <b>reduce</b> performance because multiple threads would be trying to merge Objects
* into the same accumulator. The optimal way to use this class is with one accumulator per thread
* and this is what Collector will do without the {@code CONCURRENT} flag.
*
* @param <E> must be comparable because it is the second sort field
* @param <N> numeric type for Slice Size
*/
public final class LargestSliceAccumulator<E, N extends Number & Comparable<? super N>> {
private static final Logger log = LoggerFactory.getLogger(LargestSliceAccumulator.class);
private final int n; // Maximum number of slices to track
private final Function<E, N> sliceSizeExtractor;
private final BinaryOperator<N> additionOperator;
private final AbstractQueue<E> sliceQueue;
private final Comparator<E> comparator;
private N otherTotal;

/**
* Construct
* @param n maximum number of keys (addresses) to track
* @param sliceSizeExtractor Function to compute the slice size
* @param zero The value zero for type N
* @param additionOperator binary addition operator for type N
*/
public LargestSliceAccumulator(int n,
Function<E, N> sliceSizeExtractor,
N zero,
BinaryOperator<N> additionOperator) {
if (n < 1) {
throw new IllegalArgumentException("parameter must be 1 or greater");
}
this.n = n;
this.sliceSizeExtractor = sliceSizeExtractor;
this.additionOperator = additionOperator;
this.otherTotal = zero;
this.comparator = Comparator.comparing(sliceSizeExtractor);
log.trace("Creating accum queue");
sliceQueue = new PriorityQueue<>(n, Comparator.comparing(sliceSizeExtractor));
}

/**
* Add a new slice to the accumulator
*
* @param newSlice slice to accumulate
*/
void accumulate(E newSlice) {
//log.trace("accumulating slice of size: {}", sliceSizeExtractor.apply(newSlice).doubleValue());
sliceQueue.add(newSlice);
drain(); // Remove extra elements.
}

/**
* Combine two accumulators
*
* @param other the other accumulator
* @return the combined accumulator (this)
*/
LargestSliceAccumulator<E, N> combine(LargestSliceAccumulator<E, N > other) {
other.sliceQueue.forEach(this::accumulate);
otherTotal = plus(otherTotal, other.otherTotal);
return this;
}

// It's simpler (and possibly faster) to let the queue do the size comparison
// and just remove extra elements rather than implement our own checks _before_
// adding to the queue.
private void drain() {
while (sliceQueue.size() > n) {
E removed = sliceQueue.poll();
if (removed != null) {
otherTotal = plus(otherTotal, sliceSizeExtractor.apply(removed));
}
}
}

/**
* Sort the sliceQueue and return as a {@link List}.
* Normally, This should only be called by the Collector finisher function.
*
* @return List of slices sorted by their slice size (based on extractor)
*/
List<E> getSortedSliceList() {
// E[] tmp = (E[]) sliceQueue.toArray(); // Copies to an array
// Arrays.sort(tmp, comparator); // Sorts the array
// return List.of(tmp); // Copy into a list
return sliceQueue.stream()
.sorted(comparator)
.collect(Collectors.toUnmodifiableList());
}

/**
* Get the total "slice size" of processed elements that are not saved in this accumulator.
*
* @return total slice size of other elements.
*/
N getTotalOther() {
return otherTotal;
}

private N plus(N n1, N n2) {
return additionOperator.apply(n1, n2);
}
}
Loading

0 comments on commit e51c0de

Please sign in to comment.