langchain-elasticsearch
This package contains the LangChain integration with Elasticsearch.
Installation
pip install -U langchain-elasticsearch
Elasticsearch setup
Elastic Cloud
You need a running Elasticsearch deployment. The easiest way to start one is through Elastic Cloud.
You can sign up for a free trial.
- Create a deployment
- Get your Cloud ID:
- In the Elastic Cloud console, click "Manage" next to your deployment
- Copy the Cloud ID and paste it into the
es_cloud_id
parameter below
- Create an API key:
- In the Elastic Cloud console, click "Open" next to your deployment
- In the left-hand side menu, go to "Stack Management", then to "API Keys"
- Click "Create API key"
- Enter a name for the API key and click "Create"
- Copy the API key and paste it into the
es_api_key
parameter below
Elastic Cloud
Alternatively, you can run Elasticsearch via Docker as described in the docs.
Usage
ElasticsearchStore
The ElasticsearchStore
class exposes Elasticsearch as a vector store.
from langchain_elasticsearch import ElasticsearchStore
embeddings = ...
vectorstore = ElasticsearchStore(
es_cloud_id="your-cloud-id",
es_api_key="your-api-key",
index_name="your-index-name",
embeddings=embeddings,
)
ElasticsearchRetriever
The ElasticsearchRetriever
class can be user to implement more complex queries.
This can be useful for power users and necessary if data was ingested outside of LangChain
(for example using a web crawler).
def fuzzy_query(search_query: str) -> Dict:
return {
"query": {
"match": {
text_field: {
"query": search_query,
"fuzziness": "AUTO",
}
},
},
}
fuzzy_retriever = ElasticsearchRetriever.from_es_params(
es_cloud_id="your-cloud-id",
es_api_key="your-api-key",
index_name="your-index-name",
body_func=fuzzy_query,
content_field=text_field,
)
fuzzy_retriever.get_relevant_documents("fooo")
ElasticsearchEmbeddings
The ElasticsearchEmbeddings
class provides an interface to generate embeddings using a model
deployed in an Elasticsearch cluster.
from langchain_elasticsearch import ElasticsearchEmbeddings
embeddings = ElasticsearchEmbeddings.from_credentials(
model_id="your-model-id",
input_field="your-input-field",
es_cloud_id="your-cloud-id",
es_api_key="your-api-key",
)
ElasticsearchChatMessageHistory
The ElasticsearchChatMessageHistory
class stores chat histories in Elasticsearch.
from langchain_elasticsearch import ElasticsearchChatMessageHistory
chat_history = ElasticsearchChatMessageHistory(
index="your-index-name",
session_id="your-session-id",
es_cloud_id="your-cloud-id",
es_api_key="your-api-key",
)
ElasticsearchCache
A caching layer for LLMs that uses Elasticsearch.
Simple example:
from elasticsearch import Elasticsearch
from langchain.globals import set_llm_cache
from langchain_elasticsearch import ElasticsearchCache
es_client = Elasticsearch(hosts="http://localhost:9200")
set_llm_cache(
ElasticsearchCache(
es_connection=es_client,
index_name="llm-chat-cache",
metadata={"project": "my_chatgpt_project"},
)
)
The index_name
parameter can also accept aliases. This allows to use the
ILM: Manage the index lifecycle
that we suggest to consider for managing retention and controlling cache growth.
Look at the class docstring for all parameters.
Index the generated text
The cached data won't be searchable by default.
The developer can customize the building of the Elasticsearch document in order to add indexed text fields,
where to put, for example, the text generated by the LLM.
This can be done by subclassing end overriding methods.
The new cache class can be applied also to a pre-existing cache index:
import json
from typing import Any, Dict, List
from elasticsearch import Elasticsearch
from langchain.globals import set_llm_cache
from langchain_core.caches import RETURN_VAL_TYPE
from langchain_elasticsearch import ElasticsearchCache
class SearchableElasticsearchCache(ElasticsearchCache):
@property
def mapping(self) -> Dict[str, Any]:
mapping = super().mapping
mapping["mappings"]["properties"]["parsed_llm_output"] = {
"type": "text",
"analyzer": "english",
}
return mapping
def build_document(
self, prompt: str, llm_string: str, return_val: RETURN_VAL_TYPE
) -> Dict[str, Any]:
body = super().build_document(prompt, llm_string, return_val)
body["parsed_llm_output"] = self._parse_output(body["llm_output"])
return body
@staticmethod
def _parse_output(data: List[str]) -> List[str]:
return [
json.loads(output)["kwargs"]["message"]["kwargs"]["content"]
for output in data
]
es_client = Elasticsearch(hosts="http://localhost:9200")
set_llm_cache(
SearchableElasticsearchCache(es_connection=es_client, index_name="llm-chat-cache")
)
When overriding the mapping and the document building,
please only make additive modifications, keeping the base mapping intact.