Skip to content

Commit

Permalink
Iterate
Browse files Browse the repository at this point in the history
  • Loading branch information
serefyarar committed Jul 14, 2024
1 parent bda22d4 commit 5c58285
Show file tree
Hide file tree
Showing 25 changed files with 464 additions and 1,598 deletions.
5 changes: 4 additions & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@
"http-proxy-middleware": "^3.0.0",
"jade": "^1.11.0",
"joi": "^17.8.3",
"knex": "^3.1.0",
"lodash": "^4.17.21",
"moment": "^2.29.4",
"moralis": "^2.18.1",
"morgan": "~1.9.1",
"multer": "^1.4.5-lts.1",
"node-fetch": "^3.3.0",
"nodemon": "^3.0.1",
"pg": "^8.11.3",
"objection": "^3.1.4",
"pg": "^8.12.0",
"pgvector": "^0.2.0",
"redis": "^4.6.5",
"striptags": "^3.2.0",
"uint8arrays": "^4.0.3",
Expand Down
26 changes: 13 additions & 13 deletions api/src/agents/basic_assistant.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,23 @@ export const handleUserMessage = async (
? messages.filter((_, index) => index <= questionIndex)
: messages;

const transformedHistory = chat_history
//.filter((m) => m.id !== message.id)
.map((c) => {
return {
id: c.id,
role: c.role,
content: c.content,
};
});
const chatRequest = {
indexIds: reqIndexIds,
input: {
question: messages[questionIndex].content,
chat_history: chat_history
.filter((m) => m.id !== message.id)
.map((c) => {
return {
id: c.id,
role: c.role,
content: c.content,
};
}),
},
messages: transformedHistory,
basePrompt: "seref/first-system",
};

let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
`${process.env.LLM_INDEXER_HOST}/chat/external`,
chatRequest,
{
responseType: "stream",
Expand Down
14 changes: 9 additions & 5 deletions api/src/agents/basic_subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@ export const handleNewItemEvent = async (

const { indexIds, messages } = subscription;
const chatRequest = {
basePrompt: "seref/index-relevancy-check",
indexIds,
input: {
information: JSON.stringify(item),
chat_history: messages.map((c) => {
messages: [
...messages.map((c) => {
return {
id: c.id,
role: c.role,
content: c.content,
};
}),
},
{
role: "system",
content: `New information: ${JSON.stringify(item)}`,
},
],
};
try {
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
`${process.env.LLM_INDEXER_HOST}/chat/external`,
chatRequest,
{
responseType: "text",
Expand Down
10 changes: 2 additions & 8 deletions api/src/controllers/conversation.js
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,8 @@ export const refreshSummary = async (req, res, next) => {
const response = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/external`,
{
inputs: {
chat_history: conversation.messages,
},
prompt: `Given the following conversation titles, generate one and unique title.
The title should be clear, descriptive, and relevant to the topic at hand.
Write title from the chat_history in maximum 10 words. Dont add "title" or "conversation", just use content.
Chat_history: {chat_history}
`,
basePrompt: "seref/index-conversation-summary",
messages: [...conversation.messages],
},
{
responseType: "text",
Expand Down
192 changes: 75 additions & 117 deletions api/src/controllers/discovery.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,115 +5,92 @@ import { CeramicClient } from "@ceramicnetwork/http-client";
import RedisClient from "../clients/redis.js";
import { flattenSources } from "../utils/helpers.js";
import { DIDService } from "../services/did.js";
import { searchItems } from "../language/search_item.js";
import { DIDSession } from "did-session";

const redis = RedisClient.getInstance();
const pubSubClient = RedisClient.getPubSubInstance();

const ceramic = new CeramicClient(process.env.CERAMIC_HOST);

export const chat = async (req, res, next) => {
export const search = async (req, res, next) => {
/*
const searchRequest = {
indexIds: req.body.indexIds,
query: req.body.query,
page: req.body.page || 1,
limit: req.body.limit || 10,
filters: req.body.filters || [],
};
*/
const { vector, sources, ...rest } = req.body;
const definition = req.app.get("runtimeDefinition");
const didService = new DIDService(definition);
const reqIndexIds = await flattenSources(sources, didService);

const resp = await searchItems({
indexIds: reqIndexIds,
vector,
});

let ceramicResp = await ceramic.multiQuery(
resp.map((doc) => {
return {
streamId: doc.item_id,
};
}),
);

ceramicResp = Object.values(ceramicResp).map((doc) => {
return {
id: doc.id.toString(),
controllerDID: doc.state.metadata.controllers[0],
...doc.content,
};
});

return res.status(200).json(ceramicResp);
};

export const completions = async (req, res, next) => {
const definition = req.app.get("runtimeDefinition");
const { id, messages, sources, ...rest } = req.body;
const { messages, prompt, sources, ...rest } = req.body;

const didService = new DIDService(definition);
const reqIndexIds = await flattenSources(sources, didService);

const payload = {
indexIds: reqIndexIds,
};
if (messages) {
payload.messages = messages;
} else if (prompt) {
payload.prompt = prompt;
}

try {
const chatRequest = {
indexIds: reqIndexIds,
input: {
question: messages.at(-1).content,
chat_history: [...messages.slice(0, -1)],
},
model_args: {
...rest,
},
};
let resp = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/stream`,
chatRequest,
`${process.env.LLM_INDEXER_HOST}/chat/external`,
payload,
{
responseType: "stream",
},
);
res.set(resp.headers);

let cmdMode = false;
let inferredCmd = "";
resp.data.on("data", (chunk) => {
const plainText = chunk.toString();
if (plainText.includes("<<")) {
cmdMode = true;
} else if (plainText.includes(">>")) {
cmdMode = false;
} else if (cmdMode) {
inferredCmd += plainText;
} else {
res.write(chunk);
}
res.write(chunk);
});

resp.data.on("end", async () => {
if (inferredCmd) {
await redis.hSet(
`subscriptions`,
id,
JSON.stringify({
indexIds: reqIndexIds,
messages,
}),
);
}
console.log("Stream ended", inferredCmd);
res.end();
});
} catch (error) {
// Handle the exception
console.error("An error occurred:", error.message);
res.status(500).json({ error: "Internal Server Error" });
}
};

/**
 * Ask the LLM indexer to classify whether `payload` is relevant to `query`.
 *
 * @param {string} query - the subscriber's standing question.
 * @param {*} payload - newly published item; serialized into the prompt.
 * @returns {Promise<string|false|undefined>} the model's relevancy summary,
 *   `false` when the model answered with the NOT_RELEVANT sentinel (or sent
 *   no data), or `undefined` when the HTTP call itself failed.
 */
const handleMessage = async (query, payload) => {
  try {
    const chatRequest = {
      // Prompt instructs the model to either summarize the relevancy or emit
      // the NOT_RELEVANT sentinel that is checked below.
      // NOTE(review): the literal whitespace inside this prompt is part of the
      // runtime string and appears mangled by the diff view — verify against
      // the original file before relying on exact formatting.
      prompt: `
Think step by step.
For the given information, determine whether the new field might be relevant to the user's intent based on the question. You are a relevancy extractor whose task is to classify whether the new information is relevant or not.
If the information field might be NOT relevant, respond with "NOT_RELEVANT", and don't output anything else. If the information field might be relevant and new to the question, summarize the relevancy reason based on the information field.
Do not add any HTML tags or title fields. Use links when useful. "Cast" means Farcaster Cast, so write your message accordingly.
----------------
QUESTION: {query}
----------------
INFORMATION: {information}
`,
      // {query} / {information} placeholders are filled server-side from here.
      inputs: {
        query,
        information: JSON.stringify(payload),
      },
    };

    // This endpoint returns plain text, not a stream.
    const resp = await axios.post(
      `${process.env.LLM_INDEXER_HOST}/chat/external`,
      chatRequest,
      {
        responseType: "text",
      },
    );

    // Any answer that does not contain the sentinel is treated as a
    // relevancy summary and returned to the caller verbatim.
    if (resp.data && !resp.data.includes("NOT_RELEVANT")) {
      return resp.data;
    } else {
      console.log(`Not relevant`);
      return false;
    }
  } catch (error) {
    // Transport errors are swallowed: the caller receives `undefined`, which
    // its falsy check treats the same as "not relevant".
    console.error("An error occurred:", error.message);
    //throw new Error("Internal Server Error");
  }
};

export const updates = async (req, res, next) => {
const definition = req.app.get("runtimeDefinition");
const { query, sources } = req.query;
Expand All @@ -132,7 +109,6 @@ export const updates = async (req, res, next) => {
const reqIndexChannels = reqIndexIds.map((id) => `indexStream:${id}`);

await pubSubClient.subscribe(reqIndexChannels, async (payload, channel) => {
const response = await handleMessage(query, payload);
const indexId = channel.replace(`indexStream:`, "");
if (response) {
res.write(
Expand All @@ -154,32 +130,6 @@ export const updates = async (req, res, next) => {
});
};

/**
 * Proxy a search query to the LLM indexer's /search/query endpoint and pipe
 * the upstream response stream straight back to the client.
 *
 * Body fields `page`, `limit`, and `filters` fall back to 1, 10, and []
 * respectively when absent or falsy.
 */
export const search = async (req, res, next) => {
  try {
    const { indexIds, query, page, limit, filters } = req.body;

    const upstream = await axios.post(
      `${process.env.LLM_INDEXER_HOST}/search/query`,
      {
        indexIds,
        query,
        page: page || 1,
        limit: limit || 10,
        filters: filters || [],
      },
      { responseType: "stream" },
    );

    // Mirror the indexer's headers, then stream its body through untouched.
    res.set(upstream.headers);
    upstream.data.pipe(res);
  } catch (error) {
    // Handle the exception
    console.error("An error occurred:", error);
    res.status(500).json({ error: "Internal Server Error" });
  }
};

export const questions = async (req, res, next) => {
try {
const definition = req.app.get("runtimeDefinition");
Expand All @@ -188,6 +138,13 @@ export const questions = async (req, res, next) => {
const didService = new DIDService(definition);
const reqIndexIds = await flattenSources(sources, didService);

const items = await searchItems({
indexIds: reqIndexIds,
});
if (items.length === 0) {
return res.status(200).json({ questions: [] });
}

const sourcesHash = ethers.utils.keccak256(
ethers.utils.toUtf8Bytes(JSON.stringify(reqIndexIds)),
);
Expand All @@ -196,33 +153,34 @@ export const questions = async (req, res, next) => {

if (questionCache) {
let resp = JSON.parse(questionCache);
if (resp.questions) {
resp.questions = resp.questions.slice(0, 4);
}

return res.status(200).json(resp);
return res.status(200).json({ questions: resp });
}

try {
let response = await axios.post(
`${process.env.LLM_INDEXER_HOST}/chat/questions`,
`${process.env.LLM_INDEXER_HOST}/chat/external`,
{
indexIds: reqIndexIds,
basePrompt: "seref/question_generation_prompt-new",
},
);
redis.set(`questions:${sourcesHash}`, JSON.stringify(response.data), {
EX: 86400,
});

if (response.data.questions) {
response.data.questions = response.data.questions.slice(0, 4);
let questions = response.data.split(`\n`).filter((q) => q.length > 0);
if (questions && questions.length > 0) {
questions = questions.slice(0, 4);
redis.set(`questions:${sourcesHash}`, JSON.stringify(questions), {
EX: 86400,
});
} else {
questions = [];
}

res.status(200).json(response.data);
res.status(200).json({ questions });
} catch (error) {
return res.status(400).json({ error: error.message });
}
} catch (error) {
res.status(500).json({ error: error.message });
}
};

export const chat = async (req, res, next) => {};
20 changes: 20 additions & 0 deletions api/src/language/search_item.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pgvector from "pgvector/knex";
import pkg from "knex";
const { knex } = pkg;

// Module-level Knex client (connection pool) for the Postgres vector store.
// NOTE(review): assumes POSTGRES_CONNECTION_STRING is set by the time this
// module is first imported — confirm env loading order.
const cli = knex({
  client: "pg",
  connection: process.env.POSTGRES_CONNECTION_STRING,
});

/**
 * Fetch up to `limit` embedding rows for the given indexes.
 *
 * @param {Object} params
 * @param {string[]} params.indexIds - index ids to search within.
 * @param {number[]} [params.vector] - optional query embedding; when given,
 *   rows are ordered by pgvector distance (`<=>`) to it, nearest first.
 * @param {number} [params.limit=10] - maximum number of rows to return
 *   (previously hard-coded to 10; the default preserves that behavior).
 * @returns {Promise<Object[]>} matching rows from `index_embeddings`.
 */
export const searchItems = async (params) => {
  const { indexIds, vector, limit = 10 } = params;

  // No indexes means no possible matches — skip the database round trip.
  if (!indexIds || indexIds.length === 0) {
    return [];
  }

  let query = cli("index_embeddings").select("*").whereIn("index_id", indexIds);

  if (vector) {
    // `??` binds the column identifier, `?` the pgvector-encoded embedding.
    query = query.orderByRaw("?? <=> ?", ["vector", pgvector.toSql(vector)]);
  }

  return query.limit(limit);
};
Loading

0 comments on commit 5c58285

Please sign in to comment.