[feature]: Introducing async add method aadd (proposal) #1923

Open · wants to merge 4 commits into base: main
14 changes: 14 additions & 0 deletions mem0/embeddings/base.py
@@ -2,6 +2,7 @@
from typing import Optional

from mem0.configs.embeddings.base import BaseEmbedderConfig
from mem0.utils.concurrency import run_in_executor


class EmbeddingBase(ABC):
@@ -16,6 +17,19 @@ def __init__(self, config: Optional[BaseEmbedderConfig] = None):
self.config = BaseEmbedderConfig()
else:
self.config = config

async def aembed(self, text):
"""Async version of the embed method.

The default implementation delegates to the synchronous embed method using
`run_in_executor`. Subclasses that can provide a truly async implementation
should override this method to avoid the executor overhead.
"""
return await run_in_executor(
None,
self.embed,
text
)

@abstractmethod
def embed(self, text):
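Note that `run_in_executor` is imported from `mem0.utils.concurrency`, but that module is not part of this diff. Below is a minimal sketch of what such a helper typically looks like, assuming it mirrors `asyncio`'s `loop.run_in_executor` and binds keyword arguments via `functools.partial`; everything beyond the import path is an assumption, not the PR's actual code.

# Hypothetical sketch of mem0/utils/concurrency.py -- not included in this diff.
import asyncio
from concurrent.futures import Executor
from functools import partial
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

async def run_in_executor(executor: Optional[Executor], func: Callable[..., T], *args, **kwargs) -> T:
    """Run a blocking callable in an executor without blocking the event loop.

    Passing executor=None uses the event loop's default ThreadPoolExecutor.
    """
    loop = asyncio.get_running_loop()
    # loop.run_in_executor only forwards positional args, so bind kwargs first.
    return await loop.run_in_executor(executor, partial(func, *args, **kwargs))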
17 changes: 16 additions & 1 deletion mem0/embeddings/openai.py
@@ -1,7 +1,7 @@
import os
from typing import Optional

from openai import OpenAI
from openai import AsyncOpenAI, OpenAI

from mem0.configs.embeddings.base import BaseEmbedderConfig
from mem0.embeddings.base import EmbeddingBase
@@ -17,6 +17,7 @@ def __init__(self, config: Optional[BaseEmbedderConfig] = None):
api_key = self.config.api_key or os.getenv("OPENAI_API_KEY")
base_url = self.config.openai_base_url or os.getenv("OPENAI_API_BASE")
self.client = OpenAI(api_key=api_key, base_url=base_url)
self.async_client = AsyncOpenAI(api_key=api_key, base_url=base_url)

def embed(self, text):
"""
@@ -30,3 +31,17 @@ def embed(self, text):
"""
text = text.replace("\n", " ")
return self.client.embeddings.create(input=[text], model=self.config.model).data[0].embedding

async def aembed(self, text):
"""
Asynchronously get the embedding for the given text using OpenAI.

Args:
text (str): The text to embed.

Returns:
list: The embedding vector.
"""
text = text.replace("\n", " ")
response = await self.async_client.embeddings.create(input=[text], model=self.config.model)
return response.data[0].embedding
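A quick usage sketch of the new async embedding path. The class name `OpenAIEmbedding` and the config field are assumptions, since the class declaration sits in the collapsed part of the diff; `OPENAI_API_KEY` is expected in the environment.

import asyncio

from mem0.configs.embeddings.base import BaseEmbedderConfig
from mem0.embeddings.openai import OpenAIEmbedding  # class name assumed

async def main():
    # Model name is illustrative.
    embedder = OpenAIEmbedding(BaseEmbedderConfig(model="text-embedding-3-small"))
    # aembed awaits the AsyncOpenAI client directly, so concurrent calls overlap.
    vectors = await asyncio.gather(
        embedder.aembed("first document"),
        embedder.aembed("second document"),
    )
    print(len(vectors), len(vectors[0]))

asyncio.run(main())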
14 changes: 14 additions & 0 deletions mem0/llms/base.py
@@ -2,6 +2,7 @@
from typing import Optional

from mem0.configs.llms.base import BaseLlmConfig
from mem0.utils.concurrency import run_in_executor


class LLMBase(ABC):
@@ -15,7 +16,20 @@ def __init__(self, config: Optional[BaseLlmConfig] = None):
self.config = BaseLlmConfig()
else:
self.config = config

async def agenerate_response(self, messages):
"""Async version of the generate_response method.

The default implementation delegates to the synchronous generate_response method
using `run_in_executor`. Subclasses that can provide a truly async implementation
should override this method to avoid the executor overhead.
"""
return await run_in_executor(
None,
self.generate_response,
messages
)

@abstractmethod
def generate_response(self, messages):
"""
59 changes: 58 additions & 1 deletion mem0/llms/openai.py
@@ -2,7 +2,7 @@
import os
from typing import Dict, List, Optional

from openai import OpenAI
from openai import AsyncOpenAI, OpenAI

from mem0.configs.llms.base import BaseLlmConfig
from mem0.llms.base import LLMBase
@@ -20,10 +20,15 @@ def __init__(self, config: Optional[BaseLlmConfig] = None):
api_key=os.environ.get("OPENROUTER_API_KEY"),
base_url=self.config.openrouter_base_url or os.getenv("OPENROUTER_API_BASE") or "https://openrouter.ai/api/v1",
)
self.async_client = AsyncOpenAI(
api_key=os.environ.get("OPENROUTER_API_KEY"),
base_url=self.config.openrouter_base_url or os.getenv("OPENROUTER_API_BASE") or "https://openrouter.ai/api/v1",
)
else:
api_key = self.config.api_key or os.getenv("OPENAI_API_KEY")
base_url = self.config.openai_base_url or os.getenv("OPENAI_API_BASE") or "https://api.openai.com/v1"
self.client = OpenAI(api_key=api_key, base_url=base_url)
self.async_client = AsyncOpenAI(api_key=api_key, base_url=base_url)

def _parse_response(self, response, tools):
"""
@@ -106,3 +111,55 @@ def generate_response(

response = self.client.chat.completions.create(**params)
return self._parse_response(response, tools)

async def agenerate_response(
self,
messages: List[Dict[str, str]],
response_format=None,
tools: Optional[List[Dict]] = None,
tool_choice: str = "auto",
):
"""
Asynchronously generate a response based on the given messages using OpenAI.

Args:
messages (list): List of message dicts containing 'role' and 'content'.
response_format (str or object, optional): Format of the response. Defaults to None.
tools (list, optional): List of tools that the model can call. Defaults to None.
tool_choice (str, optional): Tool choice method. Defaults to "auto".

Returns:
str: The generated response.
"""
params = {
"model": self.config.model,
"messages": messages,
"temperature": self.config.temperature,
"max_tokens": self.config.max_tokens,
"top_p": self.config.top_p,
}

if os.getenv("OPENROUTER_API_KEY"):
openrouter_params = {}
if self.config.models:
openrouter_params["models"] = self.config.models
openrouter_params["route"] = self.config.route
params.pop("model")

if self.config.site_url and self.config.app_name:
extra_headers = {
"HTTP-Referer": self.config.site_url,
"X-Title": self.config.app_name,
}
openrouter_params["extra_headers"] = extra_headers

params.update(**openrouter_params)

if response_format:
params["response_format"] = response_format
if tools: # TODO: Remove tools if no issues found with new memory addition logic
params["tools"] = tools
params["tool_choice"] = tool_choice

response = await self.async_client.chat.completions.create(**params)
return self._parse_response(response, tools)
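As a usage sketch (the class name `OpenAILLM` is an assumption, since its declaration is collapsed out of the diff; requires `OPENAI_API_KEY` in the environment):

import asyncio

from mem0.configs.llms.base import BaseLlmConfig
from mem0.llms.openai import OpenAILLM  # class name assumed

async def main():
    llm = OpenAILLM(BaseLlmConfig(model="gpt-4o-mini"))  # model name illustrative
    # Awaiting agenerate_response keeps the event loop free for other tasks.
    reply = await llm.agenerate_response(
        [{"role": "user", "content": "In one line, what is an embedding?"}]
    )
    print(reply)

asyncio.run(main())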
38 changes: 37 additions & 1 deletion mem0/llms/openai_structured.py
@@ -2,7 +2,7 @@
import os
from typing import Dict, List, Optional

from openai import OpenAI
from openai import AsyncOpenAI, OpenAI

from mem0.configs.llms.base import BaseLlmConfig
from mem0.llms.base import LLMBase
@@ -18,6 +18,7 @@ def __init__(self, config: Optional[BaseLlmConfig] = None):
api_key = self.config.api_key or os.getenv("OPENAI_API_KEY")
base_url = self.config.openai_base_url or os.getenv("OPENAI_API_BASE") or "https://api.openai.com/v1"
self.client = OpenAI(api_key=api_key, base_url=base_url)
self.async_client = AsyncOpenAI(api_key=api_key, base_url=base_url)

def _parse_response(self, response, tools):
"""
@@ -85,3 +86,38 @@ def generate_response(
response = self.client.beta.chat.completions.parse(**params)

return self._parse_response(response, tools)

async def agenerate_response(
self,
messages: List[Dict[str, str]],
response_format=None,
tools: Optional[List[Dict]] = None,
tool_choice: str = "auto",
):
"""
Asynchronously generate a structured response based on the given messages using OpenAI.

Args:
messages (list): List of message dicts containing 'role' and 'content'.
response_format (str or object, optional): Format of the response. Defaults to None.
tools (list, optional): List of tools that the model can call. Defaults to None.
tool_choice (str, optional): Tool choice method. Defaults to "auto".

Returns:
str: The generated response.
"""
params = {
"model": self.config.model,
"messages": messages,
"temperature": self.config.temperature,
}

if response_format:
params["response_format"] = response_format
if tools:
params["tools"] = tools
params["tool_choice"] = tool_choice

response = await self.async_client.beta.chat.completions.parse(**params)

return self._parse_response(response, tools)
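Since `beta.chat.completions.parse` accepts a Pydantic model as `response_format`, the async structured path can be exercised as below. The class name `OpenAIStructuredLLM` is an assumption (its declaration is collapsed out of the diff), and the model name is illustrative.

import asyncio

from pydantic import BaseModel

from mem0.configs.llms.base import BaseLlmConfig
from mem0.llms.openai_structured import OpenAIStructuredLLM  # class name assumed

class Fact(BaseModel):
    subject: str
    statement: str

async def main():
    llm = OpenAIStructuredLLM(BaseLlmConfig(model="gpt-4o-2024-08-06"))
    # Passing the Pydantic model asks the SDK to parse output into that schema.
    reply = await llm.agenerate_response(
        [{"role": "user", "content": "State one fact about Python."}],
        response_format=Fact,
    )
    print(reply)

asyncio.run(main())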