
Overview

MarkFAquaQ edited this page Jan 25, 2021 · 13 revisions

Chronicle Queue to kdb+ Adapter

What is Chronicle Queue?

Chronicle Queue is a persisted, low-latency messaging framework that stores messages in memory-mapped files. The memory-mapped file is also used for exceptionally fast inter-process communication (IPC) without affecting system performance. This is especially useful when large amounts of data need to be transferred: it is ideal for moving data between processes very quickly, either on the same server or across the network. There is no Garbage Collection (GC) overhead, as everything is done off heap.

More info: https://chronicle.software/libraries/queue/

What is kdb+ Time Series Database?

Kdb+, from Kx, is / has:

  • a high-performance cross-platform historical time-series columnar database
  • an in-memory compute engine
  • a real-time streaming processor
  • an expressive query and programming language called q

More info: https://code.kx.com/q/

Project goal

Create an "adapter" to allow messages arriving on a Chronicle Queue to be inserted into a table in kdb+.

Scenario

  • "Quotes added as messages to a Chronicle Queue should be replicated as entries in the quote table in a kdb+ database."
  • Create an Adapter to enable this
    • Quote >> Chronicle Queue >> Adapter >> kdb+

Components

  • Producer
    • Creates quotes and places them as new messages on a named Chronicle Queue.
  • Chronicle Queue
    • A file-based queue called "quote", addressed via a filesystem location, e.g. "C:\ChronicleQueue\Producer\quote"
  • Adapter
    • Application to read from the queue and write to a kdb+ database table.
  • kdb+
    • Destination database containing quote database table and supporting functions to allow data to be added.

"Quote" message

Messages representing a Quote will be randomly generated by the Producer and added to the Quote queue in the following format:

Name     Value
time     2020.01.24+14:00:16.083Z
sym      VOD.L
bid      152
bsize    42035
ask      152
assize   48514
bex      XLON
aex      XLON

Producer

The Producer will be configured to connect to a specified queue (the quote queue) and write a stream of quotes to it.

The quotes will be randomly generated as per the format above and will be added to the queue using a Chronicle Queue Appender.

Quotes will then be written to the queue as a "self-describing message". E.g.:

// Write one quote as a self-describing "quote" document via the Chronicle Queue appender
appender.writeDocument(w -> w.write("quote").marshallable(
		m -> m.write("time").dateTime(LocalDateTime.now())
				.write("sym").text("VOD.L")
				.write("bid").float64(152)
				.write("bsize").float64(42035)
				.write("ask").float64(152)
				.write("assize").float64(48514)
				.write("bex").text("XLON")   // exchange codes are written as text
				.write("aex").text("XLON")
));

Adapter process

Adapter application

The Adapter for our scenario has been built as a Java Spring Boot (CommandLineRunner) application that is packaged as a jar file. The application implements the steps below using vendor-specific libraries (Chronicle and kdb+) as well as other third-party open source libraries.

1. Connect to data source i.e. Chronicle Queue

When the application is started, use the Chronicle Queue Java library to connect to a queue.

This library can be added to a maven project pom.xml file as a dependency:

<dependency>
	<groupId>net.openhft</groupId>
	<artifactId>chronicle-queue</artifactId>
	<version>5.20.111</version>
</dependency>

The data source / queue should be identified via an external properties file. Chronicle Queues are typically addressed via filesystem location e.g. "C:\ChronicleQueue\Producer\quote"

chronicle.source=C:\\ChronicleQueue\\Producer\\quote

2. Check queue for new messages

Once connected to the data source, use a Chronicle Queue Tailer to read the queue and check for new messages of the configured "type". The tailer should be named, based on configuration in an external properties file. The tailer should read forward from the last message read when re-started.

adapter.tailerName=quoteTailer
adapter.messageType=quote
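In the Spring Boot application these values would most likely be injected (for example with `@Value`). The sketch below instead uses plain `java.util.Properties` to show which keys the Adapter reads and that the doubled backslashes in the file become single backslashes once loaded. The class name `AdapterSourceConfig` and the "fail if a key is missing" rule are assumptions for illustration.

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Properties;

// Hypothetical holder for the Adapter's source-side settings.
final class AdapterSourceConfig {
    final String chronicleSource; // chronicle.source
    final String tailerName;      // adapter.tailerName
    final String messageType;     // adapter.messageType

    private AdapterSourceConfig(String chronicleSource, String tailerName, String messageType) {
        this.chronicleSource = chronicleSource;
        this.tailerName = tailerName;
        this.messageType = messageType;
    }

    // Loads the settings; Properties.load unescapes "\\" in the file to a single "\".
    static AdapterSourceConfig load(Reader reader) throws IOException {
        Properties props = new Properties();
        props.load(reader);
        return new AdapterSourceConfig(
                require(props, "chronicle.source"),
                require(props, "adapter.tailerName"),
                require(props, "adapter.messageType"));
    }

    // Fails fast when a mandatory key is missing or blank.
    private static String require(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value.trim();
    }
}
```

The loaded path would then be handed to the Chronicle Queue library, e.g. `SingleChronicleQueueBuilder.single(cfg.chronicleSource).build()`, and `tailerName` used to create a named tailer. Recent Chronicle Queue releases expose named tailers through `createTailer(String)`; confirm this against the version in use.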

3. Read message data ( -> chronicle obj)

Unmarshal each quote message read by the tailer into a POJO representing the Chronicle Queue quote message. A simple Builder component creates an instance of this "Chronicle Quote Message" POJO for each message received.
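A minimal sketch of such a Builder, assuming the POJO simply mirrors the fields of the self-describing message written by the Producer. `ChronicleQuoteMessage` and its Builder are illustrative names, not necessarily those used in the project, and treating `time` and `sym` as mandatory is an assumption.

```java
import java.time.LocalDateTime;

// Hypothetical POJO for one "quote" message read from the Chronicle Queue.
// Field names mirror the self-describing message written by the Producer.
final class ChronicleQuoteMessage {
    private final LocalDateTime time;
    private final String sym;
    private final double bid, bsize, ask, assize;
    private final String bex, aex;

    private ChronicleQuoteMessage(Builder b) {
        this.time = b.time;
        this.sym = b.sym;
        this.bid = b.bid;
        this.bsize = b.bsize;
        this.ask = b.ask;
        this.assize = b.assize;
        this.bex = b.bex;
        this.aex = b.aex;
    }

    static Builder builder() { return new Builder(); }

    LocalDateTime getTime() { return time; }
    String getSym() { return sym; }
    double getBid() { return bid; }
    double getBsize() { return bsize; }
    double getAsk() { return ask; }
    double getAssize() { return assize; }
    String getBex() { return bex; }
    String getAex() { return aex; }

    // Fluent builder: one setter per message field, then build().
    static final class Builder {
        private LocalDateTime time;
        private String sym;
        private double bid, bsize, ask, assize;
        private String bex, aex;

        Builder time(LocalDateTime v) { this.time = v; return this; }
        Builder sym(String v) { this.sym = v; return this; }
        Builder bid(double v) { this.bid = v; return this; }
        Builder bsize(double v) { this.bsize = v; return this; }
        Builder ask(double v) { this.ask = v; return this; }
        Builder assize(double v) { this.assize = v; return this; }
        Builder bex(String v) { this.bex = v; return this; }
        Builder aex(String v) { this.aex = v; return this; }

        // Assumption: a quote without time or sym is rejected.
        ChronicleQuoteMessage build() {
            if (time == null || sym == null) {
                throw new IllegalStateException("time and sym are required");
            }
            return new ChronicleQuoteMessage(this);
        }
    }
}
```

In the tailer loop, the Builder setters would be fed from the Wire read calls that mirror the Producer's writes (`text()` for `sym`, `float64()` for the prices and sizes).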

4. Do mapping (chronicle obj -> kdb obj)

MapStruct allows simple or very complex mappings between source and destination POJOs to be coded quickly as an interface. The mapping is defined in the interface, and when the application is built, code is automatically generated to implement it. Use MapStruct to map the source Chronicle Quote Message POJO to the destination kdb+ Quote POJO.

Maven dependency to include MapStruct (the mapstruct-processor artifact must also be configured as an annotation processor for the implementation to be generated):

<dependency>
	<groupId>org.mapstruct</groupId>
	<artifactId>mapstruct</artifactId>
	<version>1.4.1.Final</version>
</dependency>

Reference guide: https://mapstruct.org/documentation/stable/reference/html/
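With MapStruct the implementation is generated from a `@Mapper` interface, and fields whose names differ are declared with annotations such as `@Mapping(source = "assize", target = "asize")` and `@Mapping(source = "time", target = "chrontime")` (a conversion method may be needed for the timestamp type). The hand-written sketch below shows roughly what that mapping amounts to. Assumptions: all class names are hypothetical, and the destination uses `java.sql.Timestamp` for `chrontime` because the kdb+ Java API maps that type to a kdb+ timestamp. The `time` column is not mapped, since kdb+ fills it in on insert.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hand-written equivalent of a MapStruct mapper (hypothetical names throughout).
final class QuoteMapper {

    // Minimal stand-in for the Chronicle-side POJO.
    static final class ChronicleQuote {
        final LocalDateTime time;
        final String sym;
        final double bid, bsize, ask, assize;
        final String bex, aex;

        ChronicleQuote(LocalDateTime time, String sym, double bid, double bsize,
                       double ask, double assize, String bex, String aex) {
            this.time = time; this.sym = sym; this.bid = bid; this.bsize = bsize;
            this.ask = ask; this.assize = assize; this.bex = bex; this.aex = aex;
        }
    }

    // Destination POJO: one field per kdb+ quote column except "time", which kdb+ sets.
    static final class KdbQuote {
        final Timestamp chrontime;
        final String sym;
        final double bid, bsize, ask, asize;
        final String bex, aex;

        KdbQuote(Timestamp chrontime, String sym, double bid, double bsize,
                 double ask, double asize, String bex, String aex) {
            this.chrontime = chrontime; this.sym = sym; this.bid = bid; this.bsize = bsize;
            this.ask = ask; this.asize = asize; this.bex = bex; this.aex = aex;
        }
    }

    private QuoteMapper() {}

    // Same-named fields are copied; time -> chrontime and assize -> asize are renamed.
    static KdbQuote toKdb(ChronicleQuote src) {
        return new KdbQuote(
                Timestamp.valueOf(src.time), // keeps nanosecond precision
                src.sym, src.bid, src.bsize, src.ask,
                src.assize,                  // message field "assize" -> kdb+ column "asize"
                src.bex, src.aex);
    }
}
```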

5. Add kdb msg to current kdb envelope

Rather than send messages to kdb+ one at a time, it is more efficient to batch them together into a single call in a structured Object array. To facilitate this, the application will manage an "envelope" containing the combined data of one or more messages.
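One way to hold an envelope is column-wise, with one array per kdb+ column, so that the result of `toObjectArray()` can be passed directly as the data argument of the kdb+ call. This sketch assumes the kdb+ Java API's type mappings (`Timestamp[]` to a timestamp list, `String[]` to a symbol list, `double[]` to a float list); `KdbQuoteEnvelope` and its other methods are hypothetical.

```java
import java.sql.Timestamp;
import java.util.Arrays;

// Hypothetical column-wise envelope: one array per kdb+ quote column (excluding "time").
final class KdbQuoteEnvelope {
    private final Timestamp[] chrontime;
    private final String[] sym;
    private final double[] bid, bsize, ask, asize;
    private final String[] bex, aex;
    private int count;

    KdbQuoteEnvelope(int capacity) { // capacity would come from kdb.envelope.size
        if (capacity <= 0) throw new IllegalArgumentException("capacity must be positive");
        chrontime = new Timestamp[capacity];
        sym = new String[capacity];
        bid = new double[capacity];
        bsize = new double[capacity];
        ask = new double[capacity];
        asize = new double[capacity];
        bex = new String[capacity];
        aex = new String[capacity];
    }

    // Appends one quote across the column arrays; returns false if the envelope is full.
    boolean add(Timestamp t, String s, double bidPx, double bidSize,
                double askPx, double askSize, String bidEx, String askEx) {
        if (isFull()) return false;
        chrontime[count] = t;
        sym[count] = s;
        bid[count] = bidPx;
        bsize[count] = bidSize;
        ask[count] = askPx;
        asize[count] = askSize;
        bex[count] = bidEx;
        aex[count] = askEx;
        count++;
        return true;
    }

    boolean isFull() { return count == chrontime.length; }
    boolean isEmpty() { return count == 0; }
    int size() { return count; }

    // Columns trimmed to the rows added, ordered
    // [chrontime[], sym[], bid[], bsize[], ask[], asize[], bex[], aex[]].
    Object[] toObjectArray() {
        return new Object[] {
            Arrays.copyOf(chrontime, count), Arrays.copyOf(sym, count),
            Arrays.copyOf(bid, count), Arrays.copyOf(bsize, count),
            Arrays.copyOf(ask, count), Arrays.copyOf(asize, count),
            Arrays.copyOf(bex, count), Arrays.copyOf(aex, count)
        };
    }

    // Empties the envelope after a send, dropping object references.
    void reset() {
        Arrays.fill(chrontime, 0, count, null);
        Arrays.fill(sym, 0, count, null);
        Arrays.fill(bex, 0, count, null);
        Arrays.fill(aex, 0, count, null);
        count = 0;
    }
}
```

Keeping the data column-oriented means kdb+ receives one list per column, which matches the single-element lists seen later in the kdb+ console output when an envelope holds one message.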

6. Send data to destination ( -> kdb+)

Batching methods

The envelope size will be limited by another configuration parameter:

kdb.envelope.size=100

An envelope may not always fill up, which would leave messages that have already been read from the source queue "stuck" in the Adapter. To get around this, a Timer tracks the time elapsed since a message was last added to the envelope.

kdb.envelope.waitTime=100

This parameter sets the wait time; if it is exceeded, the current envelope is sent to kdb+ and then reset.
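The size-or-timeout rule can be sketched as a small policy object. This assumes `kdb.envelope.waitTime` is in milliseconds (the unit is not stated) and that a timer thread polls the policy while the tailer thread adds messages. `EnvelopeFlushPolicy` is a hypothetical name, and the clock is injected so the behaviour can be tested deterministically.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical flush policy for kdb.envelope.size / kdb.envelope.waitTime.
// Methods are synchronized because the timer thread and the tailer thread may both call them.
final class EnvelopeFlushPolicy {
    private final int maxSize;
    private final long waitNanos;
    private final LongSupplier nanoClock; // e.g. System::nanoTime in production
    private int pending;
    private long lastAddNanos;

    EnvelopeFlushPolicy(int maxSize, long waitMillis, LongSupplier nanoClock) {
        if (maxSize <= 0 || waitMillis < 0) throw new IllegalArgumentException("invalid limits");
        this.maxSize = maxSize;
        this.waitNanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
        this.nanoClock = nanoClock;
    }

    // Records one message added to the envelope; true means "full, send now".
    synchronized boolean onAdd() {
        pending++;
        lastAddNanos = nanoClock.getAsLong();
        return pending >= maxSize;
    }

    // Polled by the timer; true means a partial envelope has waited at least
    // waitTime since the last message was added.
    synchronized boolean shouldFlushOnTimeout() {
        return pending > 0 && nanoClock.getAsLong() - lastAddNanos >= waitNanos;
    }

    // Call once the envelope has been sent to kdb+ and reset.
    synchronized void onFlushed() {
        pending = 0;
    }
}
```

With `kdb.envelope.size=100` and `kdb.envelope.waitTime=100`, a steady stream would be sent in batches of 100, while a lone message would be sent roughly 100 ms after it was added, plus the timer's polling interval.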

kdb+

kdb+ provides a Java API in a single source file (c.java) on GitHub. Including it in a development project is therefore straightforward: add the file to the other source code under the package kx, and make sure it is properly imported and referenced by other classes.

Reference: https://code.kx.com/q/wp/java-api/

The Adapter will use this API to connect to (or check a connection is already open to) a kdb+ database defined in external config e.g.:

kdb.host=localhost
kdb.port=5000
kdb.login=username:password
kdb.connection-enabled=true
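A sketch of the "connect, or reuse a connection that is already open" behaviour described above. The opener and the health check are passed in, so nothing here depends on the kx classes; with the kdb+ Java API the opener would presumably be something like `() -> new c(host, port, login)`. `ReusableConnection` and the health-check predicate are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical wrapper that opens a connection lazily and reuses it while it looks healthy.
final class ReusableConnection<T> {
    private final Callable<T> opener;     // creates a new connection (may throw)
    private final Predicate<T> isHealthy; // hypothetical "still connected?" check
    private T connection;
    private int opens;                    // how many connections have been opened

    ReusableConnection(Callable<T> opener, Predicate<T> isHealthy) {
        this.opener = opener;
        this.isHealthy = isHealthy;
    }

    // Returns the current connection, opening a new one if there is none or it is unhealthy.
    synchronized T get() throws Exception {
        if (connection == null || !isHealthy.test(connection)) {
            connection = opener.call();
            opens++;
        }
        return connection;
    }

    // Forget the current connection, e.g. after a send fails with an IOException.
    synchronized void invalidate() {
        connection = null;
    }

    synchronized int openCount() {
        return opens;
    }
}
```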

In the kdb+ database there is a quote table defined as:

.schema.addschema ([]table:`quote;col:`time`chrontime`sym`bid`bsize`ask`asize`bex`aex;coltype:`timestamp`timestamp`symbol`float`float`float`float`symbol`symbol;isnested:000000000b);

and a function ".u.upd" to insert data into the database.

The destination table and function are also specified in the external config file:

kdb.destination=quote
kdb.destination.function=.u.upd

To insert entries into the quote table, the adapter uses one of the asynchronous "ks" methods of the kdb+ Java API to send all messages in the current envelope to kdb+ in a single call:

kdbConnection.ks(kdbMethod, destinationTable, kdbEnvelope.toObjectArray());

The kdb+ message data is formatted as an Object[] of column arrays, similar to this:

[chrontime[],sym[],bid[],bsize[],ask[],asize[],bex[],aex[]]

Running the process

Start the Producer

Once the Producer application is running, message production can be started and stopped via HTTP endpoints, which allows some control and helps with testing. The endpoints can be accessed via the built-in Swagger page, directly by URL, or with a tool like Postman, e.g.:

  • Start message production: http://localhost:9090/producer/quoteLoader?Command%3A%20start%2Fstop=start
  • Stop message production: http://localhost:9090/producer/quoteLoader?Command%3A%20start%2Fstop=stop

The Producer console showing messages being added to the queue:

2021-01-20 15:50:16.199  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187963] / (1 written)
2021-01-20 15:50:16.305  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187964] / (2 written)
2021-01-20 15:50:16.417  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187965] / (3 written)
2021-01-20 15:50:16.528  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187966] / (4 written)
2021-01-20 15:50:16.643  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187967] / (5 written)
2021-01-20 15:50:16.755  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187968] / (6 written)
2021-01-20 15:50:16.860  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187969] / (7 written)
2021-01-20 15:50:16.972  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187970] / (8 written)
2021-01-20 15:50:17.082  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187971] / (9 written)
2021-01-20 15:50:17.193  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187972] / (10 written)
2021-01-20 15:50:17.305  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187973] / (11 written)
2021-01-20 15:50:17.416  INFO 17636 --- [nio-9090-exec-8] c.c.d.p.controllers.ProducerController   : *** Quote Message written to index [80088255187974] / (12 written)
......

Start kdb+ instance

On Windows, open a Command Prompt or PowerShell window and run the following command:

q dummytp.q -p 5000 -u 1

This should start kdb+ and display a q prompt similar to the one below:

KDB+ 4.0 2020.05.04 Copyright (C) 1993-2020 Kx Systems
w64/ 4(16)core 16080MB xx yy-zz 172.20.64.1 EXPIRE 2021.11.18 [email protected] KOD #1234567

q)

Check that the quote table exists and is empty:

q)quote
time chrontime sym bid bsize ask asize bex aex
----------------------------------------------
q)

Start the Adapter

Once started, the adapter will connect to the kdb+ database first and then the Chronicle queue.

....
2021-01-20 15:49:40.509  INFO 22676 --- [  restartedMain] net.openhft.chronicle.core.Jvm           : Chronicle core loaded from file:/C:/Users/MarkF/.m2/repository/net/openhft/chronicle-core/2.20.116/chronicle-core-2.20.116.jar
2021-01-20 15:49:40.712  WARN 22676 --- [  restartedMain] .ReflectionBasedByteBufferCleanerService : Make sure you have set the command line option "--illegal-access=permit --add-exports java.base/jdk.internal.ref=ALL-UNNAMED" to enable ReflectionBasedByteBufferCleanerService
2021-01-20 15:49:40.716  WARN 22676 --- [  restartedMain] net.openhft.chronicle.bytes.MappedFile   : Took 27 ms to add mapping for C:\ChronicleQueue\Producer\quote\metadata.cq4t
2021-01-20 15:49:41.149  INFO 22676 --- [  restartedMain] c.k.a.chronicle.ChronicleKdbAdapter      : Tailer starting at index: 80088255187963
2021-01-20 15:50:16.177  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Still connected to Kdb server
2021-01-20 15:50:16.179  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Persisted message to Kdb
2021-01-20 15:50:16.180  INFO 22676 --- [  restartedMain] c.k.a.chronicle.ChronicleKdbAdapter      : Processed message @ index: 80088255187963
2021-01-20 15:50:16.306  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Still connected to Kdb server
2021-01-20 15:50:16.307  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Persisted message to Kdb
2021-01-20 15:50:16.308  INFO 22676 --- [  restartedMain] c.k.a.chronicle.ChronicleKdbAdapter      : Processed message @ index: 80088255187964
2021-01-20 15:50:16.418  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Still connected to Kdb server
2021-01-20 15:50:16.418  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Persisted message to Kdb
2021-01-20 15:50:16.418  INFO 22676 --- [  restartedMain] c.k.a.chronicle.ChronicleKdbAdapter      : Processed message @ index: 80088255187965
2021-01-20 15:50:16.530  INFO 22676 --- [  restartedMain] com.kdb.adapter.kdb.KdbConnector         : *** Still connected to Kdb server
....

Each message read is added to kdb+ and will show in the kdb+ console as follows:

....
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.162380300;,`JUVE.MI;,1231f;,5596f;,123..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.304377500;,`JUVE.MI;,1236f;,1756f;,123..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.416378800;,`JUVE.MI;,1236f;,10074f;,12..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.528382000;,`VOD.L;,153f;,4009f;,153f;,..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.640377200;,`JUVE.MI;,1235f;,42974f;,12..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.751386200;,`JUVE.MI;,1234f;,13624f;,12..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.860383100;,`VOD.L;,153f;,22700f;,153f;..
insert successful
received message: "quote"
received message: (".u.upd";`quote;(,2021.01.20D15:50:16.971379100;,`HEIN.AS;,102f;,25877f;,102..
insert successful
....

A simple check of the quote table shows that the data has been added:

q)quote
time                          chrontime                     sym     bid  bsiz..
-----------------------------------------------------------------------------..
2021.01.20D15:50:16.201095000 2021.01.20D15:50:16.162380300 JUVE.MI 1231 5596..
2021.01.20D15:50:16.310367000 2021.01.20D15:50:16.304377500 JUVE.MI 1236 1756..
2021.01.20D15:50:16.419537000 2021.01.20D15:50:16.416378800 JUVE.MI 1236 1007..
2021.01.20D15:50:16.531943000 2021.01.20D15:50:16.528382000 VOD.L   153  4009..
2021.01.20D15:50:16.646396000 2021.01.20D15:50:16.640377200 JUVE.MI 1235 4297..
2021.01.20D15:50:16.755128000 2021.01.20D15:50:16.751386200 JUVE.MI 1234 1362..
2021.01.20D15:50:16.864050000 2021.01.20D15:50:16.860383100 VOD.L   153  2270..
2021.01.20D15:50:16.976238000 2021.01.20D15:50:16.971379100 HEIN.AS 102  2587..
2021.01.20D15:50:17.085610000 2021.01.20D15:50:17.082375700 HEIN.AS 101  2271..
2021.01.20D15:50:17.198911000 2021.01.20D15:50:17.193383600 VOD.L   151  3711..
2021.01.20D15:50:17.316188000 2021.01.20D15:50:17.305379200 HEIN.AS 103  8677..
2021.01.20D15:50:17.419329000 2021.01.20D15:50:17.416375700 VOD.L   151  3894..
2021.01.20D15:50:17.540995000 2021.01.20D15:50:17.535379900 VOD.L   153  2080..
2021.01.20D15:50:17.648976000 2021.01.20D15:50:17.644907400 HEIN.AS 102  4721..
2021.01.20D15:50:17.768347000 2021.01.20D15:50:17.760911300 JUVE.MI 1239 3485..
2021.01.20D15:50:17.882014000 2021.01.20D15:50:17.873906000 JUVE.MI 1237 5590..
2021.01.20D15:50:17.988313000 2021.01.20D15:50:17.983929900 VOD.L   154  6214..
2021.01.20D15:50:18.107047000 2021.01.20D15:50:18.099918800 HEIN.AS 102  4223..

NOTE: The "time" column is the insertion time generated by kdb+, and the "chrontime" column is the timestamp value from each message generated by the Producer application.

Additional info

The adapter application exposes some additional information, enabled using Spring Boot Actuator.

General Application status

This endpoint reports the overall state of the Java application (e.g. UP) along with component health details such as disk space.

http://localhost:8080/adapter/health

{"status":"UP","components":{"diskSpace":{"status":"UP","details":{"total":255482392576,"free":28916776960,"threshold":10485760,"exists":true}},"ping":{"status":"UP"}}}

Adapter specific information

This endpoint reports information specific to the adapter instance: the name of the Tailer, the queue being processed, the kdb+ table being written to, and the index (position in the Chronicle Queue) of the last message read from the queue.

http://localhost:8080/adapter/status

{"message":"Adapter info: Tailer [quoteTailer] processing queue [C:\\ChronicleQueue\\Producer\\quote] writing to KDB [quote] Last message index [80066780334716]"}

Summary

As a reminder, the adapter scenario that was used was:

  • "Quotes added as messages to a Chronicle Queue should be replicated as entries in the quote table in a kdb+ database."
  • Create an Adapter to enable this
    • Quote >> Chronicle Queue >> Adapter >> kdb+

We have now enabled this by developing an adapter that processes quote messages from a configurable source (a Chronicle Queue) and writes each message as a new entry in a configurable destination table in a kdb+ database.

Further considerations

Whilst some generic aspects of the adapter are configurable, e.g. source and destination details, some parts of the solution are difficult to make generic.

The scenario was based on our quote messages. The messages were generated in a specific, known format and added to the queue in a specific manner, i.e. as self-describing messages. This allowed the adapter to map known fields from the source message to known fields in the destination object. Finally, the kdb+ table used was structured specifically for quote data.

The completed quote adapter example should be considered a base framework from which further message-specific adapters from Chronicle Queue into kdb+ can be developed. That means for every adapter required there will

Possible next steps

Develop the reverse scenario, i.e. quote >> kdb+ >> adapter >> Chronicle queue