
Security News
OWASP 2025 Top 10 Adds Software Supply Chain Failures, Ranked Top Community Concern
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.
milvus-haystack
Advanced tools
pip install --upgrade pymilvus milvus-haystack
Use the MilvusDocumentStore in a Haystack pipeline as a quick start.
from haystack import Document
from milvus_haystack import MilvusDocumentStore
# Quick start: open a document store backed by a local database file.
document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    # NOTE(review): presumably discards any collection left over from an
    # earlier run, so the count printed below is reproducible -- confirm.
    drop_old=True,
)

# One sample document carrying metadata and a 128-dimensional embedding.
foo_document = Document(
    content="A Foo Document",
    meta={"page": "100", "chapter": "intro"},
    embedding=[-10.0] * 128,
)
documents = [foo_document]

# Persist the document, then check how many documents the store now holds.
document_store.write_documents(documents)
print(document_store.count_documents())  # 1
# Alternative connection targets. Each assignment rebinds `document_store`,
# so keep only the variant that matches your deployment.

# Variant 1: a local database file (same setup as the quick start).
local_connection = {"uri": "./milvus.db"}
document_store = MilvusDocumentStore(connection_args=local_connection, drop_old=True)

# Variant 2: a Milvus service reachable over HTTP on localhost, port 19530.
server_connection = {"uri": "http://localhost:19530"}
document_store = MilvusDocumentStore(connection_args=server_connection, drop_old=True)
from haystack.utils import Secret
# Variant 3: a hosted Zilliz Cloud endpoint, authenticated with an API key
# that is looked up from the ZILLIZ_CLOUD_API_KEY environment variable.
zilliz_connection = {
    "uri": "https://in03-ba4234asae.api.gcp-us-west1.zillizcloud.com",  # Your Public Endpoint
    "token": Secret.from_env_var("ZILLIZ_CLOUD_API_KEY"),  # API key, we recommend using the Secret class to load the token from env variable for security.
    "secure": True,
}
document_store = MilvusDocumentStore(connection_args=zilliz_connection, drop_old=True)
Prepare an OpenAI API key and set it as an environment variable:
export OPENAI_API_KEY=<your_api_key>
import glob
import os
from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever
# Index this very file by default so the example runs without extra data.
current_file_path = os.path.abspath(__file__)
file_paths = [current_file_path]  # You can replace it with your own file paths.

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)

# Indexing pipeline: Markdown -> two-sentence chunks -> OpenAI embeddings -> store.
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=2))
indexing_pipeline.add_component("embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.run({"converter": {"sources": file_paths}})

print("Number of documents:", document_store.count_documents())

question = "How to set the service uri with milvus lite?"  # You can replace it with your own question.

# Retrieval pipeline: embed the question, then fetch the 3 closest documents.
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
retrieval_pipeline.connect("embedder", "retriever")

retrieval_results = retrieval_pipeline.run({"embedder": {"text": question}})

# Print every retrieved chunk followed by a separator line. The loop body
# must be indented; without it the snippet does not parse.
for doc in retrieval_results["retriever"]["documents"]:
    print(doc.content)
    print("-" * 10)
from haystack.utils import Secret
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
# Prompt template: `query` and `documents` are filled in by the PromptBuilder
# at run time.
prompt_template = """Answer the following query based on the provided context. If the context does
not include an answer, reply with 'I don't know'.\n
Query: {{query}}
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Answer:
"""

# RAG pipeline: embed the question -> retrieve 3 documents -> build the
# prompt -> generate the answer.
rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", OpenAITextEmbedder())
rag_pipeline.add_component("retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component(
    "generator",
    OpenAIGenerator(
        # Reference the key by environment-variable name, the same way the
        # Zilliz Cloud token is loaded earlier in this document, rather than
        # reading its value with os.getenv and wrapping it in a token secret.
        api_key=Secret.from_env_var("OPENAI_API_KEY"),
        generation_kwargs={"temperature": 0},
    ),
)
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")

# The same question feeds both the embedder (for retrieval) and the prompt.
results = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question},
    }
)
print('RAG answer:', results["generator"]["replies"][0])
This example demonstrates the basic approach to sparse indexing and retrieval using Haystack's sparse embedders.
from haystack import Document, Pipeline
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
FastembedSparseDocumentEmbedder,
FastembedSparseTextEmbedder,
)
from milvus_haystack import MilvusDocumentStore, MilvusSparseEmbeddingRetriever
# Document store with a named sparse vector field.
document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    sparse_vector_field="sparse_vector",  # Specify a name of the sparse vector field to enable sparse retrieval.
    drop_old=True,
)

# Build one Document per sample sentence.
sample_texts = [
    "My name is Wolfgang and I live in Berlin",
    "I saw a black horse running",
    "Germany has many big cities",
    "full text search is supported by Milvus.",
]
documents = [Document(content=text) for text in sample_texts]

# Indexing pipeline: sparse-embed the documents, then write them to the store.
sparse_document_embedder = FastembedSparseDocumentEmbedder()
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_document_embedder", sparse_document_embedder)
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_document_embedder", "writer")
indexing_pipeline.run({"sparse_document_embedder": {"documents": documents}})

# Retrieval pipeline: sparse-embed the query and hand it to the retriever.
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("sparse_text_embedder", FastembedSparseTextEmbedder())
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "sparse_retriever.query_sparse_embedding")

query = "who supports full text search?"
retrieval_inputs = {"sparse_text_embedder": {"text": query}}
result = retrieval_pipeline.run(retrieval_inputs)

# Show the first retrieved document.
print(result["sparse_retriever"]["documents"][0])
# Document(id=..., content: 'full text search is supported by Milvus.', sparse_embedding: vector with 48 non-zero elements)
Milvus provides a built-in BM25 function that can generate sparse vectors directly from text fields. This approach simplifies the pipeline construction compared to using Haystack's sparse embedders. The main differences are:
- No sparse embedder components are needed in the indexing and retrieval pipelines.
- A BM25BuiltInFunction is specified in the document store, with some field specification parameters.

Below is a complete example using Milvus' built-in BM25 function. The code with + signs shows the simplified approach using Milvus' built-in functionality, while the code with - signs shows the original approach that requires explicit sparse embedding:
# Diff against the sparse retrieval example: lines marked "+" are added and
# lines marked "-" are removed. Names that are not defined here (Pipeline,
# DocumentWriter, DuplicatePolicy, documents, ...) come from that example.
+ from milvus_haystack.function import BM25BuiltInFunction
+
# NOTE(review): this listing connects to http://localhost:19530 instead of the
# local ./milvus.db file used by the previous example -- presumably the
# built-in function needs a Milvus server; confirm.
document_store = MilvusDocumentStore(
connection_args={"uri": "http://localhost:19530"},
sparse_vector_field="sparse_vector",
text_field="text",
+ builtin_function=[
+ BM25BuiltInFunction( # The BM25 function converts the text into a sparse vector.
+ input_field_names="text", output_field_names="sparse_vector",
+ )
+ ],
drop_old=True,
)
- sparse_document_embedder = FastembedSparseDocumentEmbedder()
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
- indexing_pipeline.add_component("sparse_document_embedder", sparse_document_embedder)
indexing_pipeline.add_component("writer", writer)
- indexing_pipeline.connect("sparse_document_embedder", "writer")
- indexing_pipeline.run({"sparse_document_embedder": {"documents": documents}})
# Documents are passed straight to the writer; no embedder component runs first.
+ indexing_pipeline.run({"writer": {"documents": documents}})
retrieval_pipeline = Pipeline()
- retrieval_pipeline.add_component("sparse_text_embedder", FastembedSparseTextEmbedder())
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
- retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "sparse_retriever.query_sparse_embedding")
query = "who supports full text search?"
- result = retrieval_pipeline.run({"sparse_text_embedder": {"text": query}})
# The raw query string is given to the retriever as `query_text` instead of a
# precomputed sparse embedding.
+ result = retrieval_pipeline.run({"sparse_retriever": {"query_text": query}})
print(result["sparse_retriever"]["documents"][0])
This example demonstrates the basic approach to hybrid retrieval, combining OpenAI dense embeddings with Haystack's sparse embedders.
from haystack import Document, Pipeline
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack_integrations.components.embedders.fastembed import (
FastembedSparseDocumentEmbedder,
FastembedSparseTextEmbedder,
)
from milvus_haystack import MilvusDocumentStore, MilvusHybridRetriever
# Document store with a named sparse vector field next to the dense one.
document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
    sparse_vector_field="sparse_vector",  # Specify a name of the sparse vector field to enable hybrid retrieval.
)

# Build one Document per sample sentence.
sample_texts = [
    "My name is Wolfgang and I live in Berlin",
    "I saw a black horse running",
    "Germany has many big cities",
    "full text search is supported by Milvus.",
]
documents = [Document(content=text) for text in sample_texts]

# Indexing pipeline: sparse embedding -> dense embedding -> write to the store.
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("sparse_doc_embedder", FastembedSparseDocumentEmbedder())
indexing_pipeline.add_component("dense_doc_embedder", OpenAIDocumentEmbedder())
indexing_pipeline.add_component("writer", writer)
indexing_pipeline.connect("sparse_doc_embedder", "dense_doc_embedder")
indexing_pipeline.connect("dense_doc_embedder", "writer")
indexing_pipeline.run({"sparse_doc_embedder": {"documents": documents}})

# Retrieval pipeline: the question is embedded twice (sparse and dense) and
# both embeddings are fed to a single hybrid retriever.
hybrid_retriever = MilvusHybridRetriever(
    document_store=document_store,
    # reranker=WeightedRanker(0.5, 0.5),  # Default is RRFRanker()
)

retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component(
    "sparse_text_embedder",
    FastembedSparseTextEmbedder(model="prithvida/Splade_PP_en_v1"),
)
retrieval_pipeline.add_component("dense_text_embedder", OpenAITextEmbedder())
retrieval_pipeline.add_component("retriever", hybrid_retriever)
retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "retriever.query_sparse_embedding")
retrieval_pipeline.connect("dense_text_embedder.embedding", "retriever.query_embedding")

question = "who supports full text search?"
retrieval_inputs = {
    "dense_text_embedder": {"text": question},
    "sparse_text_embedder": {"text": question},
}
results = retrieval_pipeline.run(retrieval_inputs)

# Show the first retrieved document.
print(results["retriever"]["documents"][0])
# Document(id=..., content: 'full text search is supported by Milvus.', embedding: vector of size 1536, sparse_embedding: vector with 48 non-zero elements)
Milvus provides a built-in BM25 function that can generate sparse vectors directly from text fields. This approach simplifies the pipeline construction compared to using Haystack's sparse embedders, making it a useful complement to semantic search. The main differences are:
- No sparse embedder components are needed in the indexing and retrieval pipelines.
- A BM25BuiltInFunction is specified in the document store, with some field specification parameters.

Below is a complete example using Milvus' built-in BM25 function for hybrid retrieval. The code with + signs shows the simplified approach using Milvus' built-in functionality, while the code with - signs shows the original approach that requires explicit sparse embedding:
# Diff listing: lines marked "+" are added and lines marked "-" are removed.
# NOTE(review): although this section is about hybrid retrieval, the listing
# uses MilvusSparseEmbeddingRetriever and the component names of the sparse
# example, not the MilvusHybridRetriever / dense embedders of the hybrid
# example above -- confirm whether it should be adapted.
+ from milvus_haystack.function import BM25BuiltInFunction
+
document_store = MilvusDocumentStore(
connection_args={"uri": "http://localhost:19530"},
sparse_vector_field="sparse_vector",
text_field="text",
+ builtin_function=[
+ BM25BuiltInFunction( # The BM25 function converts the text into a sparse vector.
+ input_field_names="text", output_field_names="sparse_vector",
+ )
+ ],
drop_old=True,
)
- sparse_document_embedder = FastembedSparseDocumentEmbedder()
writer = DocumentWriter(document_store=document_store, policy=DuplicatePolicy.NONE)
indexing_pipeline = Pipeline()
- indexing_pipeline.add_component("sparse_document_embedder", sparse_document_embedder)
indexing_pipeline.add_component("writer", writer)
- indexing_pipeline.connect("sparse_document_embedder", "writer")
- indexing_pipeline.run({"sparse_document_embedder": {"documents": documents}})
# Documents are passed straight to the writer; no sparse embedder runs first.
+ indexing_pipeline.run({"writer": {"documents": documents}})
retrieval_pipeline = Pipeline()
- retrieval_pipeline.add_component("sparse_text_embedder", FastembedSparseTextEmbedder())
retrieval_pipeline.add_component("sparse_retriever", MilvusSparseEmbeddingRetriever(document_store=document_store))
- retrieval_pipeline.connect("sparse_text_embedder.sparse_embedding", "sparse_retriever.query_sparse_embedding")
query = "who supports full text search?"
- result = retrieval_pipeline.run({"sparse_text_embedder": {"text": query}})
# The raw query string is given to the retriever as `query_text` instead of a
# precomputed sparse embedding.
+ result = retrieval_pipeline.run({"sparse_retriever": {"query_text": query}})
print(result["sparse_retriever"]["documents"][0])
milvus-haystack is distributed under the terms of the Apache-2.0 license.
FAQs
Unknown package
We found that milvus-haystack demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 3 open source maintainers collaborating on the project.
Did you know?

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Security News
OWASP’s 2025 Top 10 introduces Software Supply Chain Failures as a new category, reflecting rising concern over dependency and build system risks.

Research
/Security News
Socket researchers discovered nine malicious NuGet packages that use time-delayed payloads to crash applications and corrupt industrial control systems.

Security News
Socket CTO Ahmad Nassri discusses why supply chain attacks now target developer machines and what AI means for the future of enterprise security.