From 891ea7190cba15c22a659d4889ab972a097daf34 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Fri, 22 Sep 2023 18:03:38 +0200 Subject: [PATCH] [vector-db-support] Add support for Milvus.io - part 1 (#470) --- examples/applications/query-milvus/.gitignore | 1 + examples/applications/query-milvus/README.md | 83 ++++++ .../applications/query-milvus/chatbot.yaml | 94 +++++++ .../query-milvus/configuration.yaml | 34 +++ .../applications/query-milvus/crawler.yaml | 97 +++++++ .../applications/query-milvus/gateways.yaml | 43 +++ .../query-milvus/write-to-database.yaml | 93 +++++++ examples/secrets/secrets.yaml | 7 + .../oss/streaming/ai/jstl/JstlEvaluator.java | 9 + .../oss/streaming/ai/jstl/JstlFunctions.java | 22 ++ .../langstream-vector-agents/pom.xml | 5 + .../agents/vector/InterpolationUtils.java | 93 +++++++ .../milvus/MilvusAssetsManagerProvider.java | 183 ++++++++++++ .../vector/milvus/MilvusDataSource.java | 143 ++++++++++ .../agents/vector/milvus/MilvusModel.java | 230 ++++++++++++++++ .../agents/vector/milvus/MilvusWriter.java | 138 ++++++++++ .../vector/pinecone/PineconeDataSource.java | 260 ++++++++---------- ...am.ai.agents.datasource.DataSourceProvider | 3 +- ....api.database.VectorDatabaseWriterProvider | 3 +- ...eam.api.runner.assets.AssetManagerProvider | 3 +- .../datasource/impl/MilvusDataSourceTest.java | 156 +++++++++++ .../test/resources/milvus-compose-test.yml | 76 +++++ .../impl/assets/MilvusAssetsProvider.java | 58 ++++ .../VectorDatabaseResourceProvider.java | 15 +- ...i.langstream.api.runtime.AssetNodeProvider | 3 +- .../kafka/MilvusVectorAssetQueryWriteIT.java | 173 ++++++++++++ 26 files changed, 1877 insertions(+), 148 deletions(-) create mode 100644 examples/applications/query-milvus/.gitignore create mode 100644 examples/applications/query-milvus/README.md create mode 100644 examples/applications/query-milvus/chatbot.yaml create mode 100644 examples/applications/query-milvus/configuration.yaml create mode 100644 examples/applications/query-milvus/crawler.yaml create mode 100644 examples/applications/query-milvus/gateways.yaml create mode 100644 examples/applications/query-milvus/write-to-database.yaml create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/InterpolationUtils.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusAssetsManagerProvider.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusDataSource.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusModel.java create mode 100644 langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java create mode 100644 langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/MilvusDataSourceTest.java create mode 100644 langstream-agents/langstream-vector-agents/src/test/resources/milvus-compose-test.yml create mode 100644 langstream-core/src/main/java/ai/langstream/impl/assets/MilvusAssetsProvider.java create mode 100644 langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java diff --git a/examples/applications/query-milvus/.gitignore b/examples/applications/query-milvus/.gitignore new file mode 100644 index 000000000..55dea2dd3 --- /dev/null +++ b/examples/applications/query-milvus/.gitignore @@ -0,0 +1 @@
+java/lib/* \ No newline at end of file diff --git a/examples/applications/query-milvus/README.md b/examples/applications/query-milvus/README.md new file mode 100644 index 000000000..b1c196bc8 --- /dev/null +++ b/examples/applications/query-milvus/README.md @@ -0,0 +1,83 @@ +# Indexing a Website + +This sample application shows how to use the WebCrawler Source Connector and Milvus.io as a Vector Database. + +## Prerequisites + +Create an S3 bucket; it will contain only a metadata file for the WebCrawler. + +Start a Milvus.io instance, for example using the Milvus Helm chart. + +The LangStream application will create a collection named "documents" in the "default" database for you, with the following structure: + +``` +documents ( + filename string, + chunk_id int, + num_tokens int, + language string, + text string +) +``` + + +## Configure access to the Vector Database + +Export the following environment variables to configure access to the database: + +```bash +export MILVUS_HOST=... +export MILVUS_PORT=... +export MILVUS_USERNAME=... +export MILVUS_PASSWORD=... +``` + + +The examples/secrets/secrets.yaml file resolves those environment variables for you. +When you go to production, you should create a dedicated secrets.yaml file for each environment. + +## Configure an S3 bucket to store the status of the Crawler + +The Web Crawling Source Connector requires an S3 bucket to store the status of the crawler. +It doesn't copy the contents of the web pages; it only stores some metadata. + +If you are using AWS S3, you can use the following environment variables: + +``` +export S3_BUCKET_NAME=... +export S3_ENDPOINT=https://s3.amazonaws.com +export S3_ACCESS_KEY=... +export S3_SECRET=... +``` + +The default configuration uses the internal MinIO service deployed in the local Kubernetes cluster; +this is useful for testing purposes only and works only when you have deployed LangStream locally. + + +## Configure the pipeline + +Edit the file `crawler.yaml` and configure the list of allowed web domains; this is required to keep the crawler from wandering outside the websites you own. +Configure the list of seed URLs, for instance with your home page. + +The default configuration in this example will crawl the LangStream website. + +## Deploy the LangStream application + +``` +./bin/langstream apps deploy test -app examples/applications/query-milvus -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml +``` + +## Talk with the Chat bot using the CLI +Since the application opens a gateway, we can use the gateway API to send and consume messages. + +``` +./bin/langstream gateway chat test -cg bot-output -pg user-input -p sessionId=$(uuidgen) +``` + +Responses are streamed to the answers-topic. If you want to inspect the history of the raw answers you can +consume from the log-topic using the llm-debug gateway: + +``` +./bin/langstream gateway consume test llm-debug +``` + diff --git a/examples/applications/query-milvus/chatbot.yaml b/examples/applications/query-milvus/chatbot.yaml new file mode 100644 index 000000000..d37a1c22f --- /dev/null +++ b/examples/applications/query-milvus/chatbot.yaml @@ -0,0 +1,94 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + - name: "log-topic" + creation-mode: create-if-not-exists +errors: + on-failure: "skip" +pipeline: + - name: "convert-to-structure" + type: "document-to-json" + input: "questions-topic" + configuration: + text-field: "question" + - name: "compute-embeddings" + type: "compute-ai-embeddings" + configuration: + model: "{{{secrets.open-ai.embeddings-model}}}" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.question_embeddings" + text: "{{% value.question }}" + flush-interval: 0 + - name: "lookup-related-documents-in-llm" + type: "query" + configuration: + datasource: "MilvusDatasource" + query: | + { + "collection-name": "documents", + "vectors": ?, + "top-k": 1, + "output-fields": ["text"] + } + fields: + - "value.question_embeddings" + output-field: "value.related_documents" + - name: "ai-chat-completions" + type: "ai-chat-completions" + + configuration: + model: "{{{secrets.open-ai.chat-completions-model}}}" # This needs to be set to the model deployment name, not the base name + # on the log-topic we add a field with the answer + completion-field: "value.answer" + # we are also logging the prompt we sent to the LLM + log-field: "value.prompt" + # here we configure the streaming behavior + # as soon as the LLM answers with a chunk we send it to the answers-topic + stream-to-topic: "answers-topic" + # on the streaming answer we send the answer as a whole message + # the 'value' syntax is used to refer to the whole value of the message + stream-response-completion-field: "value" + # we want to stream the answer as soon as we have 20 chunks + # in order to reduce latency for the first message the agent sends the first message + # with 1 chunk, then with 2 chunks... up to the min-chunks-per-message value + # eventually we want to send bigger messages to reduce the overhead of each message on the topic + min-chunks-per-message: 20 + messages: + - role: system + content: | + A user is going to ask you a question. The documents below may help you answer it. + Please try to leverage them in your answer as much as possible. + Take into consideration that the user is always asking questions about the LangStream project. + If you provide code or YAML snippets, please explicitly state that they are examples. + Do not provide information that is not related to the LangStream project.
+ + Documents: + {{%# value.related_documents}} + {{% text}} + {{%/ value.related_documents}} + - role: user + content: "{{% value.question}}" + - name: "cleanup-response" + type: "drop-fields" + output: "log-topic" + configuration: + fields: + - "question_embeddings" + - "related_documents" \ No newline at end of file diff --git a/examples/applications/query-milvus/configuration.yaml b/examples/applications/query-milvus/configuration.yaml new file mode 100644 index 000000000..b56d7b622 --- /dev/null +++ b/examples/applications/query-milvus/configuration.yaml @@ -0,0 +1,34 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +configuration: + resources: + - type: "open-ai-configuration" + name: "OpenAI Azure configuration" + configuration: + url: "{{ secrets.open-ai.url }}" + access-key: "{{ secrets.open-ai.access-key }}" + provider: "{{ secrets.open-ai.provider }}" + - type: "datasource" + name: "MilvusDatasource" + configuration: + service: "milvus" + username: "{{{ secrets.milvus.username }}}" + password: "{{{ secrets.milvus.password }}}" + host: "{{{ secrets.milvus.host }}}" + port: "{{{ secrets.milvus.port }}}" + diff --git a/examples/applications/query-milvus/crawler.yaml b/examples/applications/query-milvus/crawler.yaml new file mode 100644 index 000000000..9bec4b98d --- /dev/null +++ b/examples/applications/query-milvus/crawler.yaml @@ -0,0 +1,97 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +name: "Crawl a website" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +resources: + size: 2 +pipeline: + - name: "Crawl the WebSite" + type: "webcrawler-source" + configuration: + seed-urls: ["https://docs.langstream.ai/"] + allowed-domains: ["https://docs.langstream.ai"] + forbidden-paths: [] + min-time-between-requests: 500 + reindex-interval-seconds: 3600 + max-error-count: 5 + max-urls: 1000 + max-depth: 50 + handle-robots-file: true + user-agent: "" # this is computed automatically, but you can override it + scan-html-documents: true + http-timeout: 10000 + handle-cookies: true + max-unflushed-pages: 100 + bucketName: "{{{secrets.s3.bucket-name}}}" + endpoint: "{{{secrets.s3.endpoint}}}" + access-key: "{{{secrets.s3.access-key}}}" + secret-key: "{{{secrets.s3.secret}}}" + region: "{{{secrets.s3.region}}}" + - name: "Extract text" + type: "text-extractor" + - name: "Normalise text" + type: "text-normaliser" + configuration: + make-lowercase: true + trim-spaces: true + - name: "Detect language" + type: "language-detector" + configuration: + allowedLanguages: ["en", "fr"] + property: "language" + - name: "Split into chunks" + type: "text-splitter" + configuration: + splitter_type: "RecursiveCharacterTextSplitter" + chunk_size: 400 + separators: ["\n\n", "\n", " ", ""] + keep_separator: false + chunk_overlap: 100 + length_function: "cl100k_base" + - name: "Convert to structured data" + type: "document-to-json" + configuration: + text-field: text + copy-properties: true + - name: "prepare-structure" + type: "compute" + configuration: + fields: + - name: "value.filename" + expression: "properties.url" + type: STRING + - name: "value.chunk_id" + expression: "properties.chunk_id" + type: STRING + - name: "value.language" + expression: "properties.language" + type: STRING + - name: "value.chunk_num_tokens" + expression: "properties.chunk_num_tokens" + type: STRING + - name: "compute-embeddings" + id: "step1" + type: "compute-ai-embeddings" + output: "chunks-topic" + configuration: + model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model + embeddings-field: "value.embeddings_vector" + text: "{{% value.text }}" + batch-size: 10 + flush-interval: 500 \ No newline at end of file diff --git a/examples/applications/query-milvus/gateways.yaml b/examples/applications/query-milvus/gateways.yaml new file mode 100644 index 000000000..132788270 --- /dev/null +++ b/examples/applications/query-milvus/gateways.yaml @@ -0,0 +1,43 @@ +# +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +gateways: + - id: "user-input" + type: produce + topic: "questions-topic" + parameters: + - sessionId + produceOptions: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + - id: "bot-output" + type: consume + topic: "answers-topic" + parameters: + - sessionId + consumeOptions: + filters: + headers: + - key: langstream-client-session-id + valueFromParameters: sessionId + + + - id: "llm-debug" + type: consume + topic: "log-topic" \ No newline at end of file diff --git a/examples/applications/query-milvus/write-to-database.yaml b/examples/applications/query-milvus/write-to-database.yaml new file mode 100644 index 000000000..1c009cd79 --- /dev/null +++ b/examples/applications/query-milvus/write-to-database.yaml @@ -0,0 +1,93 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +name: "Write to Milvus" +topics: + - name: "chunks-topic" + creation-mode: create-if-not-exists +assets: + - name: "documents-table" + asset-type: "milvus-collection" + creation-mode: create-if-not-exists + config: + collection-name: "documents" + database-name: "default" + datasource: "MilvusDatasource" + create-statements: + - | + { + "collection-name": "documents", + "database-name": "default", + "dimension": 5, + "field-types": [ + { + "name": "id", + "primary-key": true, + "data-type": "Int64", + "auto-id": true + }, + { + "name": "filename", + "data-type": "string" + }, + { + "name": "name", + "data-type": "string" + }, + { + "name": "num_tokens", + "data-type": "Int32" + }, + { + "name": "language", + "data-type": "string" + }, + { + "name": "text", + "data-type": "string" + } + ] + } +pipeline: + - name: "Write to Milvus" + type: "vector-db-sink" + input: "chunks-topic" + resources: + size: 2 + configuration: + datasource: "MilvusDatasource" + collection-name: "documents" + fields: + - name: filename + expression: "value.filename" + - name: vector + expression: "fn:toListOfFloat(value.embeddings_vector)" + - name: name + expression: "value.name" + - name: language + expression: "value.language" + - name: text + expression: "value.text" + - name: num_tokens + expression: "value.chunk_num_tokens" \ No newline at end of file diff --git a/examples/secrets/secrets.yaml b/examples/secrets/secrets.yaml index 7a24daccf..162902d26 100644 --- a/examples/secrets/secrets.yaml +++ b/examples/secrets/secrets.yaml @@ -80,3 +80,10 @@ secrets: project-name: "${PINECONE_PROJECT_NAME:-}" environment: "${PINECONE_ENVIRONMENT:-asia-southeast1-gcp-free}" index-name: "${PINECONE_INDEX_NAME:-example-index}" + - name: milvus + id: milvus + data: + username: "${MILVUS_USERNAME:-}" + password: "${MILVUS_PASSWORD:-}" + host: "${MILVUS_HOST:-}" + port: "${MILVUS_PORT:-19530}" \ No newline at end of file diff --git
a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlEvaluator.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlEvaluator.java index 236f56c36..46c43c806 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlEvaluator.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlEvaluator.java @@ -105,6 +105,15 @@ private void registerFunctions() { this.expressionContext .getFunctionMapper() .mapFunction("fn", "toInt", JstlFunctions.class.getMethod("toInt", Object.class)); + this.expressionContext + .getFunctionMapper() + .mapFunction( + "fn", + "toListOfFloat", + JstlFunctions.class.getMethod("toListOfFloat", Object.class)); + this.expressionContext + .getFunctionMapper() + .mapFunction("fn", "toLong", JstlFunctions.class.getMethod("toLong", Object.class)); this.expressionContext .getFunctionMapper() .mapFunction( diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlFunctions.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlFunctions.java index d819b122d..2204d45cd 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlFunctions.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/jstl/JstlFunctions.java @@ -80,6 +80,28 @@ public static Object toInt(Object input) { return JstlTypeConverter.INSTANCE.coerceToInteger(input); } + public static Object toLong(Object input) { + if (input == null) { + return null; + } + return JstlTypeConverter.INSTANCE.coerceToLong(input); + } + + public static Object toListOfFloat(Object input) { + if (input == null) { + return null; + } + if (input instanceof Collection collection) { + List result = new ArrayList<>(collection.size()); + for (Object o : collection) { + result.add(JstlTypeConverter.INSTANCE.coerceToFloat(o)); + } + return result; + } else { + throw new IllegalArgumentException("Cannot convert " + input + " to list of float"); + } + } + public static List split(Object input, Object separatorExpression) { if (input == null) { return null; diff --git a/langstream-agents/langstream-vector-agents/pom.xml b/langstream-agents/langstream-vector-agents/pom.xml index c07543b10..6b21b1e0f 100644 --- a/langstream-agents/langstream-vector-agents/pom.xml +++ b/langstream-agents/langstream-vector-agents/pom.xml @@ -60,6 +60,11 @@ pinecone-client 0.2.3 + + io.milvus + milvus-sdk-java + 2.3.1 + org.projectlombok lombok diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/InterpolationUtils.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/InterpolationUtils.java new file mode 100644 index 000000000..0dede2807 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/InterpolationUtils.java @@ -0,0 +1,93 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class InterpolationUtils { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true); + + public static String interpolate(String query, List array) { + if (query == null || !query.contains("?")) { + return query; + } + for (Object value : array) { + int questionMark = query.indexOf("?"); + if (questionMark < 0) { + return query; + } + String valueAsString = convertValueToJson(value); + query = + query.substring(0, questionMark) + + valueAsString + + query.substring(questionMark + 1); + } + + return query; + } + + @SneakyThrows + private static String convertValueToJson(Object value) { + return MAPPER.writeValueAsString(value); + } + + public static final R buildObjectFromJson( + String json, Class jsonModel, List params) { + R parsedQuery; + try { + log.info("Query {}", json); + params.forEach( + param -> + log.info( + "Param {} {}", param, param != null ? param.getClass() : null)); + // interpolate the query + json = interpolate(json, params); + log.info("Interpolated query {}", json); + parsedQuery = MAPPER.readValue(json, jsonModel); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + log.info("Parsed query: {}", parsedQuery); + return parsedQuery; + } + + public static final R buildObjectFromJson( + String json, Class jsonModel, List params, ObjectMapper mapper) { + R parsedQuery; + try { + log.info("Query {}", json); + params.forEach( + param -> + log.info( + "Param {} {}", param, param != null ? param.getClass() : null)); + // interpolate the query + json = interpolate(json, params); + log.info("Interpolated query {}", json); + parsedQuery = mapper.readValue(json, jsonModel); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + log.info("Parsed query: {}", parsedQuery); + return parsedQuery; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusAssetsManagerProvider.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusAssetsManagerProvider.java new file mode 100644 index 000000000..55c07554b --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusAssetsManagerProvider.java @@ -0,0 +1,183 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.milvus; + +import static ai.langstream.agents.vector.InterpolationUtils.buildObjectFromJson; + +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.assets.AssetManagerProvider; +import ai.langstream.api.util.ConfigurationUtils; +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.DescribeCollectionResponse; +import io.milvus.param.R; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.DescribeCollectionParam; +import io.milvus.param.collection.DropCollectionParam; +import io.milvus.param.collection.HasCollectionParam; +import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MilvusAssetsManagerProvider implements AssetManagerProvider { + + @Override + public boolean supports(String assetType) { + return "milvus-collection".equals(assetType); + } + + @Override + public AssetManager createInstance(String assetType) { + + switch (assetType) { + case "milvus-collection": + return new MilvusCollectionAssetManager(); + default: + throw new IllegalArgumentException(); + } + } + + private static class MilvusCollectionAssetManager implements AssetManager { + + MilvusDataSource.MilvusQueryStepDataSource datasource; + AssetDefinition assetDefinition; + + @Override + public void initialize(AssetDefinition assetDefinition) throws Exception { + this.datasource = buildDataSource(assetDefinition); + this.assetDefinition = assetDefinition; + } + + @Override + public boolean assetExists() throws Exception { + String collectionName = getCollectionName(); + String databaseName = getDatabaseName(); + log.info( + "Checking is collection {} exists in database {}", + collectionName, + databaseName); + + MilvusServiceClient milvusClient = datasource.getMilvusClient(); + R hasCollection = + milvusClient.hasCollection( + HasCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .build()); + + if (hasCollection.getException() != null) { + throw hasCollection.getException(); + } + + if (hasCollection.getData() != null && hasCollection.getData()) { + log.info("Table {} exists", collectionName); + R describeCollectionResponseR = + milvusClient.describeCollection( + DescribeCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .build()); + log.info("Describe table result: {}", describeCollectionResponseR.getData()); + return true; + } else { + log.info("Table {} does not exist", collectionName); + return false; + } + } + + private String getCollectionName() { + return ConfigurationUtils.getString( + "collection-name", null, assetDefinition.getConfig()); + } + + private String getDatabaseName() { + return ConfigurationUtils.getString("database-name", null, assetDefinition.getConfig()); + } + + @Override + public void deployAsset() throws Exception { + List statements = + ConfigurationUtils.getList("create-statements", assetDefinition.getConfig()); + execStatements(statements); + } + + private void execStatements(List statements) { + MilvusServiceClient milvusClient = datasource.getMilvusClient(); + for (String statement : statements) { + log.info("Executing: {}", statement); + + if (statement.contains("fieldTypes")) { + 
CreateCollectionParam parsedQuery = + buildObjectFromJson( + statement, + CreateCollectionParam.Builder.class, + List.of(), + MilvusModel.getMapper()) + .build(); + milvusClient.createCollection(parsedQuery); + } else { + CreateSimpleCollectionParam parsedQuery = + buildObjectFromJson( + statement, + CreateSimpleCollectionParam.Builder.class, + List.of(), + MilvusModel.getMapper()) + .build(); + milvusClient.createCollection(parsedQuery); + } + } + } + + @Override + public boolean deleteAssetIfExists() throws Exception { + if (!assetExists()) { + return false; + } + MilvusServiceClient milvusClient = datasource.getMilvusClient(); + String collectionName = getCollectionName(); + String databaseName = getDatabaseName(); + milvusClient.dropCollection( + DropCollectionParam.newBuilder() + .withCollectionName(collectionName) + .withDatabaseName(databaseName) + .build()); + + return true; + } + + @Override + public void close() throws Exception { + if (datasource != null) { + datasource.close(); + } + } + } + + private static MilvusDataSource.MilvusQueryStepDataSource buildDataSource( + AssetDefinition assetDefinition) { + MilvusDataSource dataSource = new MilvusDataSource(); + Map datasourceDefinition = + ConfigurationUtils.getMap("datasource", Map.of(), assetDefinition.getConfig()); + Map configuration = + ConfigurationUtils.getMap("configuration", Map.of(), datasourceDefinition); + MilvusDataSource.MilvusQueryStepDataSource result = + dataSource.createDataSourceImplementation(configuration); + result.initialize(null); + return result; + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusDataSource.java new file mode 100644 index 000000000..a49a8c8c6 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusDataSource.java @@ -0,0 +1,143 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.agents.vector.milvus; + +import static ai.langstream.agents.vector.InterpolationUtils.buildObjectFromJson; + +import ai.langstream.ai.agents.datasource.DataSourceProvider; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.milvus.client.MilvusServiceClient; +import io.milvus.param.ConnectParam; +import io.milvus.param.R; +import io.milvus.param.highlevel.dml.SearchSimpleParam; +import io.milvus.param.highlevel.dml.response.SearchResponse; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import lombok.Data; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MilvusDataSource implements DataSourceProvider { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Override + public boolean supports(Map dataSourceConfig) { + return "milvus".equals(dataSourceConfig.get("service")); + } + + @Data + public static final class MilvusConfig { + + @JsonProperty(value = "user", required = true) + private String user = "default"; + + @JsonProperty(value = "password", required = true) + private String password; + + @JsonProperty(value = "host", required = true) + private String host; + + @JsonProperty(value = "port") + private int port = 19530; + } + + @Override + public MilvusQueryStepDataSource createDataSourceImplementation( + Map dataSourceConfig) { + + MilvusConfig clientConfig = MAPPER.convertValue(dataSourceConfig, MilvusConfig.class); + + return new MilvusQueryStepDataSource(clientConfig); + } + + public static class MilvusQueryStepDataSource implements QueryStepDataSource { + + private final MilvusConfig clientConfig; + @Getter private MilvusServiceClient milvusClient; + + public MilvusQueryStepDataSource(MilvusConfig clientConfig) { + this.clientConfig = clientConfig; + } + + @Override + public void initialize(Map config) { + this.milvusClient = + new MilvusServiceClient( + ConnectParam.newBuilder() + .withHost(clientConfig.host) + .withPort(clientConfig.port) + .withAuthorization(clientConfig.user, clientConfig.password) + .build()); + } + + @Override + public List> fetchData(String query, List params) { + try { + SearchSimpleParam searchParam = + buildObjectFromJson( + query, + SearchSimpleParam.Builder.class, + params, + MilvusModel.getMapper()) + .build(); + R respSearch = milvusClient.search(searchParam); + + if (respSearch.getException() != null) { + throw new RuntimeException(respSearch.getException()); + } + + SearchResponse data = respSearch.getData(); + + data.getRowRecords() + .forEach( + r -> { + log.info("Record {}", r); + }); + + return data.getRowRecords().stream() + .map( + r -> { + Map result = new HashMap<>(); + r.getFieldValues() + .forEach( + (k, v) -> { + result.put( + k, v == null ? 
null : v.toString()); + }); + return result; + }) + .toList(); + + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + if (milvusClient != null) { + milvusClient.close(); + } + } + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusModel.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusModel.java new file mode 100644 index 000000000..9dbd40147 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusModel.java @@ -0,0 +1,230 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.milvus; + +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import io.milvus.param.collection.CreateCollectionParam; +import io.milvus.param.collection.FieldType; +import io.milvus.param.dml.SearchParam; +import io.milvus.param.highlevel.collection.CreateSimpleCollectionParam; +import io.milvus.param.highlevel.dml.SearchSimpleParam; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MilvusModel { + + private static final ObjectMapper MAPPER = builderMapper(); + + private static ObjectMapper builderMapper() { + ObjectMapper mapper = new ObjectMapper(); + SimpleModule module = new SimpleModule(); + module.addDeserializer( + CreateSimpleCollectionParam.Builder.class, + new MilvusBuilderDeserializer<>( + CreateSimpleCollectionParam.Builder.class, + CreateSimpleCollectionParam::newBuilder)); + + module.addDeserializer( + CreateCollectionParam.Builder.class, + new MilvusBuilderDeserializer<>( + CreateCollectionParam.Builder.class, CreateCollectionParam::newBuilder)); + + module.addDeserializer( + SearchParam.Builder.class, + new MilvusBuilderDeserializer<>( + SearchParam.Builder.class, SearchParam::newBuilder)); + + module.addDeserializer( + FieldType.class, + new MultistepMilvusBuilderDeserializer<>( + FieldType.class, FieldType::newBuilder, FieldType.Builder::build)); + + module.addDeserializer( + SearchSimpleParam.Builder.class, + new MilvusBuilderDeserializer<>( + SearchSimpleParam.Builder.class, + SearchSimpleParam::newBuilder, + (key, value) -> { + switch (key) { + case "vectors": + { + if (value instanceof List list) { + List floatList = new ArrayList<>(); + for (Object n : list) 
{ + if (n instanceof Number number) { + floatList.add(number.floatValue()); + } else { + throw new IllegalArgumentException( + "Value " + + n + + " is not a number, it is not valid for the vectors field"); + } + } + return floatList; + } else { + return value; + } + } + default: + return value; + } + })); + mapper.registerModule(module); + return mapper; + } + + public static ObjectMapper getMapper() { + return MAPPER; + } + + public static String convertToJSONName(String camelCase) { + StringBuilder result = new StringBuilder(); + + for (int i = 0; i < camelCase.length(); i++) { + char currentChar = camelCase.charAt(i); + + // If the current character is uppercase and not the first character, add a hyphen + if (Character.isUpperCase(currentChar) && i > 0) { + result.append('-'); + } + + // Convert the current character to lowercase and add it to the result + result.append(Character.toLowerCase(currentChar)); + } + + return result.toString(); + } + + private static class MilvusBuilderDeserializer extends StdDeserializer { + + private final Supplier creator; + private final BiFunction fieldValueConverter; + + public MilvusBuilderDeserializer(Class vc, Supplier creator) { + super(vc); + this.creator = creator; + this.fieldValueConverter = (key, value) -> value; + } + + public MilvusBuilderDeserializer( + Class vc, + Supplier creator, + BiFunction fieldValueConverter) { + super(vc); + this.creator = creator; + this.fieldValueConverter = fieldValueConverter; + } + + @Override + public T deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + try { + JsonNode node = jp.getCodec().readTree(jp); + + T builder = creator.get(); + + applyProperties(jp, builder, node, fieldValueConverter); + return builder; + } catch (Exception e) { + throw new IOException(e); + } + } + } + + private static void applyProperties( + JsonParser jp, + T builder, + JsonNode node, + BiFunction fieldValueConverter) + throws JsonProcessingException, IllegalAccessException, InvocationTargetException { + Method[] methods = builder.getClass().getMethods(); + for (Method m : methods) { + String name = m.getName(); + log.info("Method name: {}", m.getName()); + String propertyName; + if (name.startsWith("with")) { + propertyName = name.substring(4); + } else { + propertyName = name; + } + log.info("Property name: {}", propertyName); + String jsonStilePropertyName = convertToJSONName(propertyName); + log.info("JSON Property name: {}", jsonStilePropertyName); + JsonNode jsonNode = node.get(jsonStilePropertyName); + if (jsonNode != null) { + Class parameterType = m.getParameterTypes()[0]; + Object value = jp.getCodec().treeToValue(jsonNode, parameterType); + log.info("raw value: {}", value.getClass()); + value = fieldValueConverter.apply(jsonStilePropertyName, value); + log.info("Applying value: {}", value); + m.invoke(builder, value); + } + } + } + + private static class MultistepMilvusBuilderDeserializer extends StdDeserializer { + + private final Supplier builderCreator; + private final Function builderCaller; + private final BiFunction fieldValueConverter; + + public MultistepMilvusBuilderDeserializer( + Class vc, Supplier builderCreator, Function builderCaller) { + super(vc); + this.builderCreator = builderCreator; + this.builderCaller = builderCaller; + this.fieldValueConverter = (key, value) -> value; + } + + public MultistepMilvusBuilderDeserializer( + Class vc, + Supplier builderCreator, + Function builderCaller, + BiFunction fieldValueConverter) { + super(vc); + 
this.builderCreator = builderCreator; + this.builderCaller = builderCaller; + this.fieldValueConverter = fieldValueConverter; + } + + @Override + public T deserialize(JsonParser jp, DeserializationContext ctxt) + throws IOException, JsonProcessingException { + try { + JsonNode node = jp.getCodec().readTree(jp); + R builder = builderCreator.get(); + applyProperties(jp, builder, node, fieldValueConverter); + return builderCaller.apply(builder); + } catch (Exception e) { + throw new IOException(e); + } + } + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java new file mode 100644 index 000000000..74017174a --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/milvus/MilvusWriter.java @@ -0,0 +1,138 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.milvus; + +import ai.langstream.ai.agents.GenAIToolKitAgent; +import ai.langstream.api.database.VectorDatabaseWriter; +import ai.langstream.api.database.VectorDatabaseWriterProvider; +import ai.langstream.api.runner.code.Record; +import ai.langstream.api.util.ConfigurationUtils; +import com.alibaba.fastjson.JSONObject; +import com.datastax.oss.streaming.ai.TransformContext; +import com.datastax.oss.streaming.ai.jstl.JstlEvaluator; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.MutationResult; +import io.milvus.param.R; +import io.milvus.param.dml.UpsertParam; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MilvusWriter implements VectorDatabaseWriterProvider { + + private static final ObjectMapper MAPPER = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + @Override + public boolean supports(Map dataSourceConfig) { + return "milvus".equals(dataSourceConfig.get("service")); + } + + @Override + public MilvusVectorDatabaseWriter createImplementation(Map datasourceConfig) { + return new MilvusVectorDatabaseWriter(datasourceConfig); + } + + public static class MilvusVectorDatabaseWriter implements VectorDatabaseWriter, AutoCloseable { + + private final MilvusDataSource.MilvusQueryStepDataSource dataSource; + private String collectionName; + private String databaseName; + private Map fields = new HashMap<>(); + + public MilvusVectorDatabaseWriter(Map datasourceConfig) { + MilvusDataSource dataSourceProvider = new MilvusDataSource(); + dataSource = dataSourceProvider.createDataSourceImplementation(datasourceConfig); + } + + @Override + public void close() throws Exception { + dataSource.close(); + } + + @Override + public void 
initialise(Map agentConfiguration) { + this.collectionName = + ConfigurationUtils.getString("collection-name", "", agentConfiguration); + this.databaseName = + ConfigurationUtils.getString("database-name", "", agentConfiguration); + List> fields = + (List>) + agentConfiguration.getOrDefault("fields", List.of()); + fields.forEach( + field -> { + this.fields.put( + field.get("name").toString(), + buildEvaluator(field, "expression", Object.class)); + }); + dataSource.initialize(null); + } + + @Override + public CompletableFuture upsert(Record record, Map context) { + CompletableFuture handle = new CompletableFuture<>(); + try { + TransformContext transformContext = + GenAIToolKitAgent.recordToTransformContext(record, true); + + MilvusServiceClient milvusClient = dataSource.getMilvusClient(); + + UpsertParam.Builder builder = UpsertParam.newBuilder(); + builder.withCollectionName(collectionName); + + if (databaseName != null && !databaseName.isEmpty()) { + // this doesn't work at the moment, see + // https://github.com/milvus-io/milvus-sdk-java/pull/644 + builder.withDatabaseName(databaseName); + } + + JSONObject row = new JSONObject(); + fields.forEach( + (name, evaluator) -> { + Object value = evaluator.evaluate(transformContext); + row.put(name, value); + }); + builder.withRows(List.of(row)); + UpsertParam upsert = builder.build(); + + R upsertResponse = milvusClient.upsert(upsert); + log.info("Result {}", upsertResponse); + if (upsertResponse.getException() != null) { + handle.completeExceptionally(upsertResponse.getException()); + } else { + handle.complete(null); + } + } catch (Exception e) { + handle.completeExceptionally(e); + } + return handle; + } + } + + private static JstlEvaluator buildEvaluator( + Map agentConfiguration, String param, Class type) { + String expression = agentConfiguration.getOrDefault(param, "").toString(); + if (expression == null || expression.isEmpty()) { + return null; + } + return new JstlEvaluator("${" + expression + "}", type); + } +} diff --git a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java index bae96e157..027ca6fb3 100644 --- a/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java +++ b/langstream-agents/langstream-vector-agents/src/main/java/ai/langstream/agents/vector/pinecone/PineconeDataSource.java @@ -15,6 +15,8 @@ */ package ai.langstream.agents.vector.pinecone; +import static ai.langstream.agents.vector.InterpolationUtils.buildObjectFromJson; + import ai.langstream.ai.agents.datasource.DataSourceProvider; import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; import com.fasterxml.jackson.annotation.JsonProperty; @@ -44,8 +46,8 @@ import java.util.Map; import java.util.stream.Collectors; import lombok.Data; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; @Slf4j public class PineconeDataSource implements DataSourceProvider { @@ -116,136 +118,16 @@ public void initialize(Map config) { @Override public List> fetchData(String query, List params) { try { - Query parsedQuery; - try { - log.info("Query {}", query); - params.forEach( - param -> - log.info( - "Param {} {}", - param, - param != null ? 
param.getClass() : null)); - // interpolate the query - query = interpolate(query, params); - log.info("Interpolated query {}", query); - - parsedQuery = MAPPER.readValue(query, Query.class); - } catch (Exception e) { - throw new RuntimeException(e); - } - log.info("Parsed query: {}", parsedQuery); - - QueryVector.Builder builder = QueryVector.newBuilder(); - - if (parsedQuery.vector != null) { - builder.addAllValues(parsedQuery.vector); - } - - if (parsedQuery.sparseVector != null) { - builder.setSparseValues( - SparseValues.newBuilder() - .addAllValues(parsedQuery.sparseVector.getValues()) - .addAllIndices(parsedQuery.sparseVector.getIndices()) - .build()); - } - - if (parsedQuery.filter != null && !parsedQuery.filter.isEmpty()) { - builder.setFilter(buildFilter(parsedQuery.filter)); - } - - if (parsedQuery.namespace != null) { - builder.setNamespace(parsedQuery.namespace); - } - - QueryVector queryVector = builder.build(); - QueryRequest.Builder requestBuilder = QueryRequest.newBuilder(); + Query parsedQuery = buildObjectFromJson(query, Query.class, params); - if (parsedQuery.namespace != null) { - requestBuilder.setNamespace(parsedQuery.namespace); - } - - QueryRequest batchQueryRequest = - requestBuilder - .addQueries(queryVector) - .setTopK(parsedQuery.topK) - .setIncludeMetadata(parsedQuery.includeMetadata) - .setIncludeValues(parsedQuery.includeValues) - .build(); + QueryRequest batchQueryRequest = mapQueryToQueryRequest(parsedQuery); List> results; if (clientConfig.getEndpoint() == null) { - - QueryResponse queryResponse = - connection.getBlockingStub().query(batchQueryRequest); - - if (log.isDebugEnabled()) { - log.debug("Query response: {}", queryResponse); - } - log.info("Query response: {}", queryResponse); - - results = new ArrayList<>(); - queryResponse - .getResultsList() - .forEach( - res -> - res.getMatchesList() - .forEach( - match -> { - String id = match.getId(); - Map row = - new HashMap<>(); - - if (parsedQuery.includeMetadata) { - // put all the metadata - if (match.getMetadata() - != null) { - match.getMetadata() - .getFieldsMap() - .forEach( - (key, - value) -> { - if (log - .isDebugEnabled()) { - log - .debug( - "Key: {}, value: {} {}", - key, - value, - value - != null - ? value - .getClass() - : null); - } - Object - converted = - valueToObject( - value); - row.put( - key, - converted - != null - ? 
converted - .toString() - : null); - }); - } - } - row.put("id", id); - results.add(row); - })); + results = executeQueryUsingClien(batchQueryRequest, parsedQuery); } else { - HttpClient client = HttpClient.newHttpClient(); - HttpRequest request = - HttpRequest.newBuilder(URI.create(clientConfig.getEndpoint())) - .POST( - HttpRequest.BodyPublishers.ofString( - batchQueryRequest.toString())) - .build(); - String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body(); - log.info("Mock result {}", body); - results = MAPPER.readValue(body, new TypeReference<>() {}); + results = executeQueryWithMockHttpService(batchQueryRequest); } return results; } catch (IOException | StatusRuntimeException | InterruptedException e) { @@ -253,28 +135,120 @@ public List> fetchData(String query, List params) { } } - static String interpolate(String query, List array) { - if (query == null || !query.contains("?")) { - return query; + private List> executeQueryWithMockHttpService( + QueryRequest batchQueryRequest) throws IOException, InterruptedException { + List> results; + HttpClient client = HttpClient.newHttpClient(); + HttpRequest request = + HttpRequest.newBuilder(URI.create(clientConfig.getEndpoint())) + .POST(HttpRequest.BodyPublishers.ofString(batchQueryRequest.toString())) + .build(); + String body = client.send(request, HttpResponse.BodyHandlers.ofString()).body(); + log.info("Mock result {}", body); + results = MAPPER.readValue(body, new TypeReference<>() {}); + return results; + } + + @NotNull + private List> executeQueryUsingClien( + QueryRequest batchQueryRequest, Query parsedQuery) { + List> results; + QueryResponse queryResponse = connection.getBlockingStub().query(batchQueryRequest); + + if (log.isDebugEnabled()) { + log.debug("Query response: {}", queryResponse); } - for (Object value : array) { - int questionMark = query.indexOf("?"); - if (questionMark < 0) { - return query; - } - Object valueAsString = convertValueToJson(value); - query = - query.substring(0, questionMark) - + valueAsString - + query.substring(questionMark + 1); + log.info("Query response: {}", queryResponse); + + results = new ArrayList<>(); + queryResponse + .getResultsList() + .forEach( + res -> + res.getMatchesList() + .forEach( + match -> { + String id = match.getId(); + Map row = new HashMap<>(); + + if (parsedQuery.includeMetadata) { + // put all the metadata + if (match.getMetadata() != null) { + match.getMetadata() + .getFieldsMap() + .forEach( + (key, value) -> { + if (log + .isDebugEnabled()) { + log.debug( + "Key: {}, value: {} {}", + key, + value, + value + != null + ? value + .getClass() + : null); + } + Object + converted = + valueToObject( + value); + row.put( + key, + converted + != null + ? 
converted + .toString() + : null); + }); + } + } + row.put("id", id); + results.add(row); + })); + return results; + } + + @NotNull + private QueryRequest mapQueryToQueryRequest(Query parsedQuery) { + QueryVector.Builder builder = QueryVector.newBuilder(); + + if (parsedQuery.vector != null) { + builder.addAllValues(parsedQuery.vector); } - return query; - } + if (parsedQuery.sparseVector != null) { + builder.setSparseValues( + SparseValues.newBuilder() + .addAllValues(parsedQuery.sparseVector.getValues()) + .addAllIndices(parsedQuery.sparseVector.getIndices()) + .build()); + } + + if (parsedQuery.filter != null && !parsedQuery.filter.isEmpty()) { + builder.setFilter(buildFilter(parsedQuery.filter)); + } + + if (parsedQuery.namespace != null) { + builder.setNamespace(parsedQuery.namespace); + } + + QueryVector queryVector = builder.build(); + QueryRequest.Builder requestBuilder = QueryRequest.newBuilder(); + + if (parsedQuery.namespace != null) { + requestBuilder.setNamespace(parsedQuery.namespace); + } - @SneakyThrows - private static String convertValueToJson(Object value) { - return MAPPER.writeValueAsString(value); + QueryRequest batchQueryRequest = + requestBuilder + .addQueries(queryVector) + .setTopK(parsedQuery.topK) + .setIncludeMetadata(parsedQuery.includeMetadata) + .setIncludeValues(parsedQuery.includeValues) + .build(); + return batchQueryRequest; } public static Object valueToObject(Value value) { diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider index d19f39a3b..697c1930e 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.ai.agents.datasource.DataSourceProvider @@ -1 +1,2 @@ -ai.langstream.agents.vector.pinecone.PineconeDataSource \ No newline at end of file +ai.langstream.agents.vector.pinecone.PineconeDataSource +ai.langstream.agents.vector.milvus.MilvusDataSource \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider index 78e07aa79..790ed3517 100644 --- a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.database.VectorDatabaseWriterProvider @@ -1,2 +1,3 @@ ai.langstream.agents.vector.pinecone.PineconeWriter -ai.langstream.agents.vector.cassandra.CassandraWriter \ No newline at end of file +ai.langstream.agents.vector.cassandra.CassandraWriter +ai.langstream.agents.vector.milvus.MilvusWriter \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider index ea494f999..a13622550 100644 --- 
a/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider +++ b/langstream-agents/langstream-vector-agents/src/main/resources/META-INF/services/ai.langstream.api.runner.assets.AssetManagerProvider @@ -1 +1,2 @@ -ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider \ No newline at end of file +ai.langstream.agents.vector.cassandra.CassandraAssetsManagerProvider +ai.langstream.agents.vector.milvus.MilvusAssetsManagerProvider \ No newline at end of file diff --git a/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/MilvusDataSourceTest.java b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/MilvusDataSourceTest.java new file mode 100644 index 000000000..9fe9f58d7 --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/test/java/ai/langstream/agents/vector/datasource/impl/MilvusDataSourceTest.java @@ -0,0 +1,156 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.agents.vector.datasource.impl; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; + +import ai.langstream.agents.vector.milvus.MilvusAssetsManagerProvider; +import ai.langstream.agents.vector.milvus.MilvusDataSource; +import ai.langstream.agents.vector.milvus.MilvusWriter; +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.runner.assets.AssetManager; +import ai.langstream.api.runner.code.SimpleRecord; +import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Slf4j +@Testcontainers +class MilvusDataSourceTest { + + // @Container + @Test + @Disabled() // "This test requires a running Milvus instance" + void testMilvusQuery() throws Exception { + + String collectionName = "book"; + String databaseName = "default"; + + int milvusPort = 19530; + String milvusHost = "localhost"; + log.info("Connecting to Milvus at {}:{}", milvusHost, milvusPort); + + MilvusDataSource dataSource = new MilvusDataSource(); + Map config = + Map.of( + "user", + "root", + "password", + "Milvus", + "host", + milvusHost, + "port", + milvusPort); + + MilvusAssetsManagerProvider assetsManagerProvider = new MilvusAssetsManagerProvider(); + AssetManager collectionManager = assetsManagerProvider.createInstance("milvus-collection"); + AssetDefinition assetDefinition = new AssetDefinition(); + assetDefinition.setAssetType("milvus-collection"); + assetDefinition.setConfig( + Map.of( + "collection-name", + collectionName, + "datasource", + Map.of("configuration", config), + "database-name", + databaseName, + "create-statements", + List.of( + """ + { + "collection-name": "%s", + 
"database-name": "%s", + "dimension": 5, + "field-types": [ + { + "name": "id", + "primary-key": true, + "data-type": "Int64" + }, + { + "name": "text", + "data-type": "string" + } + ] + } + """ + .formatted(collectionName, databaseName)))); + collectionManager.initialize(assetDefinition); + collectionManager.deleteAssetIfExists(); + assertFalse(collectionManager.assetExists()); + collectionManager.deployAsset(); + + try (QueryStepDataSource datasource = dataSource.createDataSourceImplementation(config); + MilvusWriter.MilvusVectorDatabaseWriter writer = + new MilvusWriter().createImplementation(config)) { + + datasource.initialize(null); + + writer.initialise( + Map.of( + "collection-name", + collectionName, + "fields", + List.of( + Map.of( + "name", + "vector", + "expression", + "fn:toListOfFloat(value.vector)"), + Map.of("name", "text", "expression", "value.text"), + Map.of("name", "id", "expression", "fn:toLong(key.id)")))); + + SimpleRecord record = + SimpleRecord.of( + "{\"id\": 10}", + """ + { + "vector": [1,2,3,4,5], + "text": "Lorem ipsum..." + } + """); + writer.upsert(record, Map.of()).get(); + + String query = + """ + { + "vectors": ?, + "topK": 5, + "vector-field-name": "vector", + "collection-name": "%s", + "database-name": "%s", + "consistency-level": "STRONG", + "params": "", + "output-fields": ["id", "distance", "text"] + } + """ + .formatted(collectionName, databaseName); + List params = List.of(List.of(1f, 2f, 3f, 4f, 5f)); + List> results = datasource.fetchData(query, params); + log.info("Results: {}", results); + + assertEquals(1, results.size()); + assertEquals("10", results.get(0).get("id")); + assertEquals("0.0", results.get(0).get("distance")); + assertEquals("Lorem ipsum...", results.get(0).get("text")); + } + } +} diff --git a/langstream-agents/langstream-vector-agents/src/test/resources/milvus-compose-test.yml b/langstream-agents/langstream-vector-agents/src/test/resources/milvus-compose-test.yml new file mode 100644 index 000000000..dba5796ce --- /dev/null +++ b/langstream-agents/langstream-vector-agents/src/test/resources/milvus-compose-test.yml @@ -0,0 +1,76 @@ +# +# Copyright DataStax, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +version: '3.5' + +services: + etcd: + image: quay.io/coreos/etcd:v3.5.5 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd + command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd + healthcheck: + test: ["CMD", "etcdctl", "endpoint", "health"] + interval: 30s + timeout: 20s + retries: 3 + + minio: + image: minio/minio:RELEASE.2023-03-20T20-16-18Z + environment: + MINIO_ACCESS_KEY: minioadmin + MINIO_SECRET_KEY: minioadmin + ports: + - "9001:9001" + - "9000:9000" + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data + command: minio server /minio_data --console-address ":9001" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 30s + timeout: 20s + retries: 3 + + milvus: + image: milvusdb/milvus:v2.3.0 + command: ["milvus", "run", "standalone"] + environment: + ETCD_ENDPOINTS: etcd:2379 + MINIO_ADDRESS: minio:9000 + volumes: + - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"] + interval: 30s + start_period: 90s + timeout: 20s + retries: 3 + ports: + - "19530:19530" + - "9091:9091" + depends_on: + - "etcd" + - "minio" + +networks: + default: + name: milvus diff --git a/langstream-core/src/main/java/ai/langstream/impl/assets/MilvusAssetsProvider.java b/langstream-core/src/main/java/ai/langstream/impl/assets/MilvusAssetsProvider.java new file mode 100644 index 000000000..f79f3d7be --- /dev/null +++ b/langstream-core/src/main/java/ai/langstream/impl/assets/MilvusAssetsProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package ai.langstream.impl.assets; + +import static ai.langstream.api.util.ConfigurationUtils.requiredField; +import static ai.langstream.api.util.ConfigurationUtils.requiredListField; +import static ai.langstream.api.util.ConfigurationUtils.requiredNonEmptyField; + +import ai.langstream.api.model.AssetDefinition; +import ai.langstream.api.util.ConfigurationUtils; +import ai.langstream.impl.common.AbstractAssetProvider; +import java.util.Map; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class MilvusAssetsProvider extends AbstractAssetProvider { + + public MilvusAssetsProvider() { + super(Set.of("milvus-collection")); + } + + @Override + protected void validateAsset(AssetDefinition assetDefinition, Map asset) { + Map configuration = ConfigurationUtils.getMap("config", null, asset); + requiredField(configuration, "datasource", describe(assetDefinition)); + final Map datasource = + ConfigurationUtils.getMap("datasource", Map.of(), configuration); + final Map datasourceConfiguration = + ConfigurationUtils.getMap("configuration", Map.of(), datasource); + switch (assetDefinition.getAssetType()) { + case "milvus-collection" -> { + requiredNonEmptyField(configuration, "collection-name", describe(assetDefinition)); + requiredListField(configuration, "create-statements", describe(assetDefinition)); + } + default -> throw new IllegalStateException( + "Unexpected value: " + assetDefinition.getAssetType()); + } + } + + @Override + protected boolean lookupResource(String fieldName) { + return "datasource".contains(fieldName); + } +} diff --git a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java index 9810f3726..f3fa1a3be 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/resources/VectorDatabaseResourceProvider.java @@ -44,7 +44,7 @@ public Map createImplementation( validateEnumField( configuration, "service", - Set.of("astra", "cassandra", "pinecone"), + Set.of("astra", "cassandra", "pinecone", "milvus"), describe(resource)); switch (service) { @@ -57,6 +57,9 @@ public Map createImplementation( case "pinecone": validatePineconeDatabaseResource(resource); break; + case "milvus": + validateMilvusDatabaseResource(resource); + break; default: throw new IllegalStateException(); } @@ -64,6 +67,16 @@ public Map createImplementation( return resource.configuration(); } + private void validateMilvusDatabaseResource(Resource resource) { + Map configuration = resource.configuration(); + + requiredNonEmptyField(configuration, "user", describe(resource)); + requiredNonEmptyField(configuration, "host", describe(resource)); + requiredNonEmptyField(configuration, "password", describe(resource)); + requiredNonEmptyField(configuration, "index-name", describe(resource)); + ConfigurationUtils.validateInteger(configuration, "port", 1, 300000, describe(resource)); + } + protected void validatePineconeDatabaseResource(Resource resource) { Map configuration = resource.configuration(); diff --git a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider index e52045974..e02682733 100644 --- a/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider +++ 
b/langstream-core/src/main/resources/META-INF/services/ai.langstream.api.runtime.AssetNodeProvider @@ -1 +1,2 @@ -ai.langstream.impl.assets.CassandraAssetsProvider \ No newline at end of file +ai.langstream.impl.assets.CassandraAssetsProvider +ai.langstream.impl.assets.MilvusAssetsProvider \ No newline at end of file diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java new file mode 100644 index 000000000..0bcf7b9c0 --- /dev/null +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/MilvusVectorAssetQueryWriteIT.java @@ -0,0 +1,173 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ai.langstream.kafka; + +import ai.langstream.AbstractApplicationRunner; +import java.util.List; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +@Slf4j +class MilvusVectorAssetQueryWriteIT extends AbstractApplicationRunner { + + @Test + @Disabled() // "This test requires a running Milvus instance" + public void testMilvus() throws Exception { + String tenant = "tenant"; + String[] expectedAgents = {"app-step1"}; + + String configuration = + """ + configuration: + resources: + - type: "vector-database" + name: "MilvusDatasource" + configuration: + service: "milvus" + host: "localhost" + port: 19530 + """; + + Map applicationWriter = + Map.of( + "configuration.yaml", + configuration, + "pipeline.yaml", + """ + assets: + - name: "documents-table" + asset-type: "milvus-collection" + creation-mode: create-if-not-exists + config: + collection-name: "documents" + database-name: "default" + datasource: "MilvusDatasource" + create-statements: + - | + { + "collection-name": "documents", + "database-name": "default", + "dimension": 5, + "field-types": [ + { + "name": "id", + "primary-key": true, + "data-type": "Int64" + }, + { + "name": "description", + "data-type": "string" + }, + { + "name": "name", + "data-type": "string" + } + ] + } + topics: + - name: "input-topic" + creation-mode: create-if-not-exists + pipeline: + - id: step1 + name: "Write a record to Milvus" + type: "vector-db-sink" + input: "input-topic" + configuration: + datasource: "MilvusDatasource" + collection-name: "documents" + fields: + - name: id + expression: "fn:toLong(value.documentId)" + - name: vector + expression: "fn:toListOfFloat(value.embeddings)" + - name: name + expression: "value.name" + - name: description + expression: "value.description" + """); + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, "app", applicationWriter, buildInstanceYaml(), expectedAgents)) { + try (KafkaProducer producer = createProducer(); ) { + + sendMessage( + "input-topic", 
+ "{\"documentId\":1, \"embeddings\":[0.1,0.2,0.3,0.4,0.5],\"name\":\"A name\",\"description\":\"A description\"}", + producer); + + executeAgentRunners(applicationRuntime); + + applicationDeployer.cleanup(tenant, applicationRuntime.implementation()); + } + } + + Map applicationReader = + Map.of( + "configuration.yaml", + configuration, + "pipeline.yaml", + """ + topics: + - name: "questions-topic" + creation-mode: create-if-not-exists + - name: "answers-topic" + creation-mode: create-if-not-exists + pipeline: + - id: step1 + name: "Read a record from Milvus" + type: "query-vector-db" + input: "questions-topic" + output: "answers-topic" + configuration: + datasource: "MilvusDatasource" + query: | + { + "collection-name": "documents", + "vectors": ?, + "top-k": 1 + } + only-first: true + output-field: "value.queryresult" + fields: + - "value.embeddings" + + """); + + try (ApplicationRuntime applicationRuntime = + deployApplication( + tenant, "app", applicationReader, buildInstanceYaml(), expectedAgents)) { + try (KafkaProducer producer = createProducer(); + KafkaConsumer consumer = createConsumer("answers-topic")) { + + sendMessage("questions-topic", "{\"embeddings\":[0.1,0.2,0.3,0.4,0.5]}", producer); + + executeAgentRunners(applicationRuntime); + + waitForMessages( + consumer, + List.of( + "{\"embeddings\":[0.1,0.2,0.3,0.4,0.5],\"queryresult\":{\"distance\":\"0.0\",\"id\":\"1\"}}")); + + applicationDeployer.cleanup(tenant, applicationRuntime.implementation()); + } + } + } +}
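
The `fetchData` refactor earlier in this patch splits the gRPC and mock-HTTP query paths into dedicated helpers and removes the private `interpolate`/`convertValueToJson` pair. Since every query JSON in this patch, Pinecone and Milvus alike, relies on `?` placeholders, here is a minimal self-contained sketch of the substitution behaviour visible in the removed code: each `?` is replaced, left to right, with the JSON encoding of the corresponding parameter. The class name and the `main` driver are illustrative only and are not part of the patch.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class PlaceholderInterpolation {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Replaces each '?' in the query, left to right, with the JSON encoding of the
     * corresponding parameter, mirroring the helper removed from PineconeDataSource.
     */
    static String interpolate(String query, List<Object> params) throws Exception {
        if (query == null || !query.contains("?")) {
            return query;
        }
        for (Object value : params) {
            int questionMark = query.indexOf('?');
            if (questionMark < 0) {
                return query;
            }
            String json = MAPPER.writeValueAsString(value);
            query = query.substring(0, questionMark) + json + query.substring(questionMark + 1);
        }
        return query;
    }

    public static void main(String[] args) throws Exception {
        String query = "{\"vectors\": ?, \"topK\": 5}";
        System.out.println(interpolate(query, List.of(List.of(1f, 2f, 3f, 4f, 5f))));
        // prints {"vectors": [1.0,2.0,3.0,4.0,5.0], "topK": 5}
    }
}
```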
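
Both `MilvusDataSourceTest` and `MilvusVectorAssetQueryWriteIT` are `@Disabled` because they need a live Milvus instance; the bundled compose file can provide one locally with `docker compose -f langstream-agents/langstream-vector-agents/src/test/resources/milvus-compose-test.yml up -d`. With that stack running, the datasource can also be exercised outside JUnit. The sketch below mirrors the query half of `MilvusDataSourceTest`: the configuration keys, credentials and query JSON are copied from the test, while the class name and the use of `var` for the result type are assumptions made for the example.

```java
import ai.langstream.agents.vector.milvus.MilvusDataSource;
import com.datastax.oss.streaming.ai.datasource.QueryStepDataSource;
import java.util.List;
import java.util.Map;

public class MilvusQuerySketch {
    public static void main(String[] args) throws Exception {
        // Connection settings as used by MilvusDataSourceTest; adjust to your instance.
        Map<String, Object> config =
                Map.of("user", "root", "password", "Milvus", "host", "localhost", "port", 19530);

        MilvusDataSource provider = new MilvusDataSource();
        try (QueryStepDataSource datasource = provider.createDataSourceImplementation(config)) {
            datasource.initialize(null);

            // Same query JSON contract exercised by the test: a single '?' placeholder
            // that receives the search vector, plus collection/database settings.
            String query =
                    """
                    {
                      "vectors": ?,
                      "topK": 5,
                      "vector-field-name": "vector",
                      "collection-name": "book",
                      "database-name": "default",
                      "consistency-level": "STRONG",
                      "params": "",
                      "output-fields": ["id", "distance", "text"]
                    }
                    """;
            List<Object> params = List.of(List.of(1f, 2f, 3f, 4f, 5f));
            var results = datasource.fetchData(query, params);
            results.forEach(row -> System.out.println(row.get("id") + " -> " + row.get("text")));
        }
    }
}
```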
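
In `MilvusVectorAssetQueryWriteIT`, the `query-vector-db` step lists `value.embeddings` under `fields`, and the query template carries a single `?`; the evaluated field value is what fills that placeholder before the search is executed, following the same left-to-right substitution sketched above. The hypothetical snippet below shows the request body the step ends up with for the record sent to `questions-topic`; the class name is illustrative only.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;

public class QueryBindingExample {
    public static void main(String[] args) throws Exception {
        // The embeddings come from the message the integration test sends to questions-topic.
        List<Float> embeddings = List.of(0.1f, 0.2f, 0.3f, 0.4f, 0.5f);

        // Query template from the query-vector-db step; its single '?' receives the
        // JSON-encoded value of "value.embeddings".
        String template = "{\"collection-name\": \"documents\", \"vectors\": ?, \"top-k\": 1}";
        String bound = template.replace("?", new ObjectMapper().writeValueAsString(embeddings));

        System.out.println(bound);
        // {"collection-name": "documents", "vectors": [0.1,0.2,0.3,0.4,0.5], "top-k": 1}
    }
}
```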