Commit 396700dd authored by chenzk's avatar chenzk
Browse files

v1.0

parents
Pipeline #2603 failed with stages
in 0 seconds
import os
import pytest
from dbgpt.configs.model_config import ROOT_PATH
from dbgpt.core import Chunk, HumanPromptTemplate, ModelMessage, ModelRequest
from dbgpt.model.proxy.llms.chatgpt import OpenAILLMClient
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.rag.retriever import RetrieverStrategy
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import EmbeddingAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
from dbgpt_ext.storage.graph_store.tugraph_store import TuGraphStoreConfig
from dbgpt_ext.storage.knowledge_graph.community_summary import (
CommunitySummaryKnowledgeGraph,
)
from dbgpt_ext.storage.knowledge_graph.knowledge_graph import (
BuiltinKnowledgeGraph,
)
"""GraphRAG example.
```
# Set LLM config (url/sk) in `.env`.
# Install pytest utils: `pip install pytest pytest-asyncio`
GRAPH_STORE_TYPE=TuGraph
TUGRAPH_HOST=127.0.0.1
TUGRAPH_PORT=7687
TUGRAPH_USERNAME=admin
TUGRAPH_PASSWORD=73@TuGraph
```
Examples:
.. code-block:: shell
pytest -s examples/rag/graph_rag_example.py
"""
# Shared LLM client and model used by every test in this example.
# assumes OPENAI_API_KEY / OPENAI_API_BASE are set in `.env` — see module docstring
llm_client = OpenAILLMClient()
model_name = "gpt-4o-mini"
@pytest.mark.asyncio
async def test_naive_graph_rag():
    """End-to-end GraphRAG over the mini corpus using the builtin graph store."""
    await __run_graph_rag(
        knowledge_graph=__create_naive_kg_connector(),
        knowledge_file="examples/test_files/graphrag-mini.md",
        chunk_strategy="CHUNK_BY_SIZE",
        question="What's the relationship between TuGraph and DB-GPT ?",
    )
@pytest.mark.asyncio
async def test_community_graph_rag():
    """End-to-end GraphRAG using the community-summary graph store."""
    await __run_graph_rag(
        knowledge_graph=__create_community_kg_connector(),
        knowledge_file="examples/test_files/graphrag-mini.md",
        chunk_strategy="CHUNK_BY_MARKDOWN_HEADER",
        question="What's the relationship between TuGraph and DB-GPT ?",
    )
def __create_naive_kg_connector():
    """Build a BuiltinKnowledgeGraph backed by TuGraph (no embeddings needed)."""
    store_config = TuGraphStoreConfig()
    return BuiltinKnowledgeGraph(
        config=store_config,
        name="naive_graph_rag_test",
        llm_client=llm_client,
        llm_model=model_name,
        embedding_fn=None,
    )
def __create_community_kg_connector():
    """Build a CommunitySummaryKnowledgeGraph with OpenAI embeddings."""
    store_config = TuGraphStoreConfig()
    return CommunitySummaryKnowledgeGraph(
        config=store_config,
        name="community_graph_rag_test",
        llm_client=llm_client,
        llm_model=model_name,
        embedding_fn=DefaultEmbeddingFactory.openai(),
    )
async def ask_chunk(chunk: Chunk, question) -> str:
    """Answer ``question`` with the LLM, using ``chunk.content`` as context.

    Raises:
        Exception: when the LLM call reports failure.
    """
    prompt = HumanPromptTemplate.from_template(
        "Based on the following [Context] {context}, answer [Question] {question}."
    )
    base_messages = prompt.format_messages(context=chunk.content, question=question)
    request = ModelRequest(
        model=model_name,
        messages=ModelMessage.from_base_messages(base_messages),
    )
    response = await llm_client.generate(request=request)
    if response.success:
        return response.text
    code = str(response.error_code)
    reason = response.text
    raise Exception(f"request llm failed ({code}) {reason}")
async def __run_graph_rag(knowledge_file, chunk_strategy, knowledge_graph, question):
    """Index ``knowledge_file`` into ``knowledge_graph``, then answer ``question``.

    Args:
        knowledge_file: path of the document to index, relative to ROOT_PATH.
        chunk_strategy: name of the chunking strategy to apply.
        knowledge_graph: graph index store that receives the chunks.
        question: natural-language question answered from the retrieved chunk.
    """
    # BUG FIX: the original called ``.format()`` on the joined path — a no-op for
    # brace-free paths and an IndexError/KeyError if the path ever contains "{".
    file_path = os.path.join(ROOT_PATH, knowledge_file)
    knowledge = KnowledgeFactory.from_file_path(file_path)
    try:
        chunk_parameters = ChunkParameters(chunk_strategy=chunk_strategy)
        # get embedding assembler
        assembler = await EmbeddingAssembler.aload_from_knowledge(
            knowledge=knowledge,
            chunk_parameters=chunk_parameters,
            index_store=knowledge_graph,
            retrieve_strategy=RetrieverStrategy.GRAPH,
        )
        await assembler.apersist()
        # get embeddings retriever
        retriever = assembler.as_retriever(1)
        chunks = await retriever.aretrieve_with_scores(question, score_threshold=0.3)
        # chat
        print(f"{await ask_chunk(chunks[0], question)}")
    finally:
        # Always drop the test graph, even when retrieval or chat fails.
        knowledge_graph.delete_vector_name(knowledge_graph.get_config().name)
import asyncio
import os
from dbgpt.configs.model_config import ROOT_PATH
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import EmbeddingAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
from dbgpt_ext.storage.full_text.elasticsearch import (
ElasticDocumentStore,
ElasticsearchStoreConfig,
)
"""Keyword rag example.
pre-requirements:
set your Elasticsearch environment.
Examples:
.. code-block:: shell
python examples/rag/keyword_rag_example.py
"""
def _create_es_connector():
    """Build the Elasticsearch document store used by this keyword-RAG demo."""
    es_config = ElasticsearchStoreConfig(
        uri="localhost",
        port="9200",
        user="elastic",
        password="dbgpt",
    )
    return ElasticDocumentStore(es_config, name="keyword_rag_test")
async def main():
    """Index an AWEL doc into Elasticsearch and run one keyword retrieval."""
    keyword_store = _create_es_connector()
    knowledge = KnowledgeFactory.from_file_path(
        os.path.join(ROOT_PATH, "docs/docs/awel/awel.md")
    )
    # Chunk by size and persist the chunks into the full-text index.
    assembler = EmbeddingAssembler.load_from_knowledge(
        knowledge=knowledge,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
        index_store=keyword_store,
    )
    assembler.persist()
    # Retrieve the 3 best-matching chunks for the sample query.
    retriever = assembler.as_retriever(3)
    chunks = await retriever.aretrieve_with_scores("what is awel talk about", 0.3)
    print(f"keyword rag example results:{chunks}")
# Script entry point: run the async demo in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
"""Rag Metadata Properties filter example.
pre-requirements:
make sure you have set your embedding model path in your example code.
Examples:
.. code-block:: shell
python examples/rag/metadata_filter_example.py
"""
import asyncio
import os
from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH, ROOT_PATH
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.storage.vector_store.filters import MetadataFilter, MetadataFilters
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import EmbeddingAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
def _create_vector_connector():
    """Create a Chroma store backed by the local text2vec embedding model."""
    embedder = DefaultEmbeddingFactory(
        default_model_name=os.path.join(MODEL_PATH, "text2vec-large-chinese"),
    ).create()
    chroma_config = ChromaVectorConfig(persist_path=PILOT_PATH)
    return ChromaStore(chroma_config, name="embedding_rag_test", embedding_fn=embedder)
async def main():
    """Demo: embedding retrieval restricted by a markdown-header metadata filter."""
    file_path = os.path.join(ROOT_PATH, "docs/docs/awel/awel.md")
    knowledge = KnowledgeFactory.from_file_path(file_path)
    vector_store = _create_vector_connector()
    chunk_parameters = ChunkParameters(chunk_strategy="CHUNK_BY_MARKDOWN_HEADER")
    # get embedding assembler
    assembler = EmbeddingAssembler.load_from_knowledge(
        knowledge=knowledge,
        chunk_parameters=chunk_parameters,
        index_store=vector_store,
    )
    assembler.persist()
    # get embeddings retriever
    retriever = assembler.as_retriever(3)
    # Only return chunks whose "Header2" metadata equals "AWEL Design".
    metadata_filter = MetadataFilter(key="Header2", value="AWEL Design")
    filters = MetadataFilters(filters=[metadata_filter])
    chunks = await retriever.aretrieve_with_scores(
        "what is awel talk about", 0.0, filters
    )
    print(f"embedding rag example results:{chunks}")
    # BUG FIX: the store is created with name "embedding_rag_test", but the
    # original deleted "metadata_rag_test", so the index was never cleaned up.
    vector_store.delete_vector_name("embedding_rag_test")
# Script entry point: run the async demo in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
"""A RAG example using the OpenAPIEmbeddings.
Example:
Test with `OpenAI embeddings
<https://platform.openai.com/docs/api-reference/embeddings/create>`_.
.. code-block:: shell
export API_SERVER_BASE_URL=${OPENAI_API_BASE:-"https://api.openai.com/v1"}
export API_SERVER_API_KEY="${OPENAI_API_KEY}"
export API_SERVER_EMBEDDINGS_MODEL="text-embedding-ada-002"
python examples/rag/rag_embedding_api_example.py
Test with DB-GPT `API Server
<https://docs.dbgpt.site/docs/installation/advanced_usage/OpenAI_SDK_call#start-apiserver>`_.
.. code-block:: shell
export API_SERVER_BASE_URL="http://localhost:8100/api/v1"
export API_SERVER_API_KEY="your_api_key"
export API_SERVER_EMBEDDINGS_MODEL="text2vec"
python examples/rag/rag_embedding_api_example.py
"""
import asyncio
import os
from typing import Optional
from dbgpt.configs.model_config import PILOT_PATH, ROOT_PATH
from dbgpt.rag.embedding import OpenAPIEmbeddings
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import EmbeddingAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
def _create_embeddings(
    api_url: Optional[str] = None,
    api_key: Optional[str] = None,
    model_name: Optional[str] = None,
) -> OpenAPIEmbeddings:
    """Create an OpenAPI embeddings client, falling back to env configuration.

    Args:
        api_url: full embeddings endpoint; defaults to
            ``$API_SERVER_BASE_URL/embeddings``.
        api_key: API key; defaults to the ``API_SERVER_API_KEY`` env var.
        model_name: embedding model; defaults to ``API_SERVER_EMBEDDINGS_MODEL``
            or "text2vec".
    """
    if not api_url:
        api_server_base_url = os.getenv(
            "API_SERVER_BASE_URL", "http://localhost:8100/api/v1/"
        )
        # BUG FIX: strip the trailing "/" so the default base URL does not
        # produce ".../api/v1//embeddings".
        api_url = f"{api_server_base_url.rstrip('/')}/embeddings"
    if not api_key:
        api_key = os.getenv("API_SERVER_API_KEY")
    if not model_name:
        model_name = os.getenv("API_SERVER_EMBEDDINGS_MODEL", "text2vec")
    return OpenAPIEmbeddings(api_url=api_url, api_key=api_key, model_name=model_name)
def _create_vector_connector():
    """Create a Chroma store whose embeddings come from the OpenAPI endpoint."""
    return ChromaStore(
        ChromaVectorConfig(persist_path=PILOT_PATH),
        name="embedding_rag_test",
        embedding_fn=_create_embeddings(),
    )
async def main():
    """Index an AWEL doc and retrieve with OpenAPI-served embeddings."""
    file_path = os.path.join(ROOT_PATH, "docs/docs/awel/awel.md")
    knowledge = KnowledgeFactory.from_file_path(file_path)
    vector_store = _create_vector_connector()
    chunk_parameters = ChunkParameters(chunk_strategy="CHUNK_BY_SIZE")
    # get embedding assembler
    assembler = EmbeddingAssembler.load_from_knowledge(
        knowledge=knowledge,
        chunk_parameters=chunk_parameters,
        index_store=vector_store,
    )
    assembler.persist()
    # get embeddings retriever
    retriever = assembler.as_retriever(3)
    chunks = await retriever.aretrieve_with_scores("what is awel talk about", 0.3)
    print(f"embedding rag example results:{chunks}")
    # BUG FIX: the store was created with name "embedding_rag_test"; the original
    # deleted "embedding_api_rag_test", which does not exist.
    vector_store.delete_vector_name("embedding_rag_test")
# Script entry point: run the async demo in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
import asyncio
import os
from typing import Optional
from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH, ROOT_PATH
from dbgpt.core import Embeddings
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.rag.evaluation import RetrieverEvaluator
from dbgpt.rag.evaluation.retriever import (
RetrieverHitRateMetric,
RetrieverMRRMetric,
RetrieverSimilarityMetric,
)
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import EmbeddingAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
from dbgpt_ext.rag.operators import EmbeddingRetrieverOperator
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
def _create_embeddings(
    model_name: Optional[str] = "text2vec-large-chinese",
) -> Embeddings:
    """Load the local embedding model named ``model_name`` from MODEL_PATH."""
    model_path = os.path.join(MODEL_PATH, model_name)
    return DefaultEmbeddingFactory(default_model_name=model_path).create()
def _create_vector_connector(embeddings: Optional[Embeddings] = None):
    """Create a Chroma vector store for the evaluation example.

    Args:
        embeddings: embedding function to use; when ``None`` a default local
            model is created via ``_create_embeddings()``.

    BUG FIX: ``main()`` calls this with an ``embeddings`` argument, but the
    original definition accepted no parameters and raised TypeError. The new
    parameter defaults to ``None``, so zero-argument callers still work.
    """
    config = ChromaVectorConfig(
        persist_path=PILOT_PATH,
    )
    return ChromaStore(
        config,
        name="embedding_rag_test",
        embedding_fn=embeddings or _create_embeddings(),
    )
async def main():
    """Evaluate the embedding retriever with hit-rate, MRR and similarity metrics."""
    file_path = os.path.join(ROOT_PATH, "docs/docs/awel/awel.md")
    knowledge = KnowledgeFactory.from_file_path(file_path)
    embeddings = _create_embeddings()
    # NOTE(review): requires `_create_vector_connector` to accept an embeddings
    # argument — the zero-parameter definition above would raise TypeError here;
    # confirm the intended signature.
    vector_connector = _create_vector_connector(embeddings)
    chunk_parameters = ChunkParameters(chunk_strategy="CHUNK_BY_MARKDOWN_HEADER")
    # get embedding assembler
    assembler = EmbeddingAssembler.load_from_knowledge(
        knowledge=knowledge,
        chunk_parameters=chunk_parameters,
        index_store=vector_connector,
    )
    assembler.persist()
    # Ground-truth dataset: one query and the context expected to support it.
    dataset = [
        {
            "query": "what is awel talk about",
            "contexts": [
                "# What is AWEL? \n\nAgentic Workflow Expression Language(AWEL) is a "
                "set of intelligent agent workflow expression language specially "
                "designed for large model application\ndevelopment. It provides great "
                "functionality and flexibility. Through the AWEL API, you can focus on "
                "the development of business logic for LLMs applications\nwithout "
                "paying attention to cumbersome model and environment details.\n\nAWEL "
                "adopts a layered API design. AWEL's layered API design architecture is "
                "shown in the figure below."
            ],
        },
    ]
    # The evaluator drives an EmbeddingRetrieverOperator over the same store.
    evaluator = RetrieverEvaluator(
        operator_cls=EmbeddingRetrieverOperator,
        embeddings=embeddings,
        operator_kwargs={
            "top_k": 5,
            "index_store": vector_connector,
        },
    )
    metrics = [
        RetrieverHitRateMetric(),
        RetrieverMRRMetric(),
        RetrieverSimilarityMetric(embeddings=embeddings),
    ]
    results = await evaluator.evaluate(dataset, metrics)
    # One result list per dataset entry; one entry per metric.
    for result in results:
        for metric in result:
            print("Metric:", metric.metric_name)
            print("Question:", metric.query)
            print("Score:", metric.score)
    print(f"Results:\n{results}")


# Script entry point: run the evaluation in a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
"""Query rewrite example.
pre-requirements:
1. install openai python sdk
```
pip install openai
```
2. set openai key and base
```
export OPENAI_API_KEY={your_openai_key}
export OPENAI_API_BASE={your_openai_base}
```
or
```
import os
os.environ["OPENAI_API_KEY"] = {your_openai_key}
os.environ["OPENAI_API_BASE"] = {your_openai_base}
```
Examples:
.. code-block:: shell
python examples/rag/rewrite_rag_example.py
"""
import asyncio
from dbgpt.model.proxy import OpenAILLMClient
from dbgpt.rag.retriever import QueryRewrite
async def main():
    """Rewrite one query into ``nums`` sub-queries using the OpenAI proxy model."""
    rewriter = QueryRewrite(
        llm_client=OpenAILLMClient(),
        model_name="gpt-3.5-turbo",
    )
    return await rewriter.rewrite(
        origin_query="compare steve curry and lebron james", nums=1
    )
if __name__ == "__main__":
    # Run the rewrite and show the generated sub-queries.
    output = asyncio.run(main())
    print(f"output: \n\n{output}")
"""AWEL: Simple rag db schema embedding operator example
if you not set vector_store_connector, it will return all tables schema in database.
```
retriever_task = DBSchemaRetrieverOperator(
connector=_create_temporary_connection()
)
```
if you set vector_store_connector, it will retrieve the top-k most similar table schemas from the database.
```
retriever_task = DBSchemaRetrieverOperator(
connector=_create_temporary_connection(),
top_k=1,
index_store=vector_store_connector
)
```
Examples:
.. code-block:: shell
curl --location 'http://127.0.0.1:5555/api/v1/awel/trigger/examples/rag/dbschema' \
--header 'Content-Type: application/json' \
--data '{"query": "what is user name?"}'
"""
import os
from typing import Dict, List
from dbgpt._private.pydantic import BaseModel, Field
from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH
from dbgpt.core import Chunk
from dbgpt.core.awel import DAG, HttpTrigger, JoinOperator, MapOperator
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt_ext.datasource.rdbms.conn_sqlite import SQLiteTempConnector
from dbgpt_ext.rag.operators import DBSchemaAssemblerOperator
from dbgpt_ext.rag.operators.db_schema import DBSchemaRetrieverOperator
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
def _create_vector_connector():
    """Create the Chroma store that will hold the demo DB's schema documents."""
    embedding_fn = DefaultEmbeddingFactory(
        default_model_name=os.path.join(MODEL_PATH, "text2vec-large-chinese"),
    ).create()
    return ChromaStore(
        ChromaVectorConfig(persist_path=PILOT_PATH),
        name="embedding_rag_test",
        embedding_fn=embedding_fn,
    )
def _create_temporary_connection():
    """Create a throw-away SQLite database pre-populated with a ``user`` table."""
    schema = {
        "user": {
            "columns": {
                "id": "INTEGER PRIMARY KEY",
                "name": "TEXT",
                "age": "INTEGER",
            },
            "data": [
                (1, "Tom", 10),
                (2, "Jerry", 16),
                (3, "Jack", 18),
                (4, "Alice", 20),
                (5, "Bob", 22),
            ],
        }
    }
    connection = SQLiteTempConnector.create_temporary_db()
    connection.create_temp_tables(schema)
    return connection
def _join_fn(chunks: List[Chunk], query: str) -> str:
    """Log the retrieved schema chunks and pass the query through unchanged."""
    schema_info = [chunk.content for chunk in chunks]
    print(f"db schema info is {schema_info}")
    return query
class TriggerReqBody(BaseModel):
    # HTTP trigger payload: the user's natural-language query.
    query: str = Field(..., description="User query")
class RequestHandleOperator(MapOperator[TriggerReqBody, Dict]):
    """Convert the HTTP trigger body into the plain dict used downstream."""

    def __init__(self, **kwargs):
        # Stateless; only forwards DAG wiring kwargs to MapOperator.
        super().__init__(**kwargs)

    async def map(self, input_value: TriggerReqBody) -> Dict:
        params = {
            "query": input_value.query,
        }
        print(f"Receive input value: {input_value}")
        return params
# DAG: HTTP-triggered example that indexes the temp DB's schema into the vector
# store and then retrieves the most similar table schema for the incoming query.
with DAG("simple_rag_db_schema_example") as dag:
    trigger = HttpTrigger(
        "/examples/rag/dbschema", methods="POST", request_body=TriggerReqBody
    )
    request_handle_task = RequestHandleOperator()
    query_operator = MapOperator(lambda request: request["query"])
    index_store = _create_vector_connector()
    connector = _create_temporary_connection()
    # Writes the schema of `connector` into the vector store.
    assembler_task = DBSchemaAssemblerOperator(
        connector=connector,
        index_store=index_store,
    )
    join_operator = JoinOperator(combine_function=_join_fn)
    # NOTE(review): a second, separate temp DB is created here; only the shared
    # vector index is queried, so this looks intentional — confirm.
    retriever_task = DBSchemaRetrieverOperator(
        connector=_create_temporary_connection(),
        top_k=1,
        index_store=index_store,
    )
    result_parse_task = MapOperator(lambda chunks: [chunk.content for chunk in chunks])
    # Branch 1 indexes the schema; branch 2 extracts the query; they join, then retrieve.
    trigger >> assembler_task >> join_operator
    trigger >> request_handle_task >> query_operator >> join_operator
    join_operator >> retriever_task >> result_parse_task

if __name__ == "__main__":
    if dag.leaf_nodes[0].dev_mode:
        # Development mode, you can run the dag locally for debugging.
        from dbgpt.core.awel import setup_dev_environment

        setup_dev_environment([dag], port=5555)
    else:
        # In production the DAG is registered and served by the DB-GPT server.
        pass
"""AWEL: Simple rag embedding operator example.
Examples:
pre-requirements:
python examples/awel/simple_rag_embedding_example.py
.. code-block:: shell
curl --location --request POST 'http://127.0.0.1:5555/api/v1/awel/trigger/examples/rag/embedding' \
--header 'Content-Type: application/json' \
--data-raw '{
"url": "https://docs.dbgpt.site/docs/latest/awel/"
}'
"""
import os
from typing import Dict, List
from dbgpt._private.pydantic import BaseModel, Field
from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH
from dbgpt.core.awel import DAG, HttpTrigger, MapOperator
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt.rag.knowledge import KnowledgeType
from dbgpt_ext.rag.operators import EmbeddingAssemblerOperator, KnowledgeOperator
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
def _create_vector_connector():
    """Chroma vector store for the embedding DAG, using the local text2vec model."""
    model_path = os.path.join(MODEL_PATH, "text2vec-large-chinese")
    return ChromaStore(
        ChromaVectorConfig(persist_path=PILOT_PATH),
        name="embedding_rag_test",
        embedding_fn=DefaultEmbeddingFactory(default_model_name=model_path).create(),
    )
class TriggerReqBody(BaseModel):
    # HTTP trigger payload: URL of the document to embed.
    url: str = Field(..., description="url")
class RequestHandleOperator(MapOperator[TriggerReqBody, Dict]):
    """Convert the HTTP trigger body into the plain dict used downstream."""

    def __init__(self, **kwargs):
        # Stateless; only forwards DAG wiring kwargs to MapOperator.
        super().__init__(**kwargs)

    async def map(self, input_value: TriggerReqBody) -> Dict:
        params = {
            "url": input_value.url,
        }
        print(f"Receive input value: {input_value}")
        return params
class ResultOperator(MapOperator):
    """Terminal operator that reports how many chunks were embedded."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def map(self, chunks: List) -> str:
        count = len(chunks)
        message = f"embedding success, there are {count} chunks."
        print(message)
        return message
# DAG: fetch a URL, chunk + embed its content into Chroma, report the chunk count.
with DAG("simple_sdk_rag_embedding_example") as dag:
    trigger = HttpTrigger(
        "/examples/rag/embedding", methods="POST", request_body=TriggerReqBody
    )
    request_handle_task = RequestHandleOperator()
    # Builds URL-type knowledge from the extracted url string.
    knowledge_operator = KnowledgeOperator(knowledge_type=KnowledgeType.URL.name)
    vector_store = _create_vector_connector()
    url_parser_operator = MapOperator(map_function=lambda x: x["url"])
    embedding_operator = EmbeddingAssemblerOperator(
        index_store=vector_store,
    )
    output_task = ResultOperator()
    (
        trigger
        >> request_handle_task
        >> url_parser_operator
        >> knowledge_operator
        >> embedding_operator
        >> output_task
    )

if __name__ == "__main__":
    if dag.leaf_nodes[0].dev_mode:
        # Development mode, you can run the dag locally for debugging.
        from dbgpt.core.awel import setup_dev_environment

        setup_dev_environment([dag], port=5555)
    else:
        # When deployed, the DB-GPT server schedules this DAG; nothing to do here.
        pass
"""AWEL: Simple rag embedding operator example
pre-requirements:
1. install openai python sdk
```
pip install openai
```
2. set openai key and base
```
export OPENAI_API_KEY={your_openai_key}
export OPENAI_API_BASE={your_openai_base}
```
3. make sure you have vector store.
if there are no data in vector store, please run examples/awel/simple_rag_embedding_example.py
ensure your embedding model in DB-GPT/models/.
Examples:
.. code-block:: shell
DBGPT_SERVER="http://127.0.0.1:5555"
curl -X POST $DBGPT_SERVER/api/v1/awel/trigger/examples/rag/retrieve \
-H "Content-Type: application/json" -d '{ \
"query": "what is awel talk about?"
}'
"""
import os
from typing import Dict, List
from dbgpt._private.pydantic import BaseModel, Field
from dbgpt.configs.model_config import MODEL_PATH, PILOT_PATH
from dbgpt.core import Chunk
from dbgpt.core.awel import DAG, HttpTrigger, JoinOperator, MapOperator
from dbgpt.model.proxy import OpenAILLMClient
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt_ext.rag.operators import (
EmbeddingRetrieverOperator,
QueryRewriteOperator,
RerankOperator,
)
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
class TriggerReqBody(BaseModel):
    # HTTP trigger payload: the user's natural-language query.
    query: str = Field(..., description="User query")
class RequestHandleOperator(MapOperator[TriggerReqBody, Dict]):
    """Convert the HTTP trigger body into the plain dict used downstream."""

    def __init__(self, **kwargs):
        # Stateless; only forwards DAG wiring kwargs to MapOperator.
        super().__init__(**kwargs)

    async def map(self, input_value: TriggerReqBody) -> Dict:
        params = {
            "query": input_value.query,
        }
        print(f"Receive input value: {input_value}")
        return params
def _context_join_fn(context_dict: Dict, chunks: List[Chunk]) -> Dict:
    """Merge retrieved chunk contents into ``context_dict`` under "context".

    Args:
        context_dict (Dict): prompt-parameter dict to enrich
        chunks (List[Chunk]): retrieved chunks

    Returns:
        Dict: the same dict, with a newline-joined "context" entry added
    """
    contents = (chunk.content for chunk in chunks)
    context_dict["context"] = "\n".join(contents)
    return context_dict
def _create_vector_connector():
    """Create the Chroma store queried by the retriever operators below."""
    store_config = ChromaVectorConfig(persist_path=PILOT_PATH)
    embedding_fn = DefaultEmbeddingFactory(
        default_model_name=os.path.join(MODEL_PATH, "text2vec-large-chinese"),
    ).create()
    return ChromaStore(
        store_config, name="embedding_rag_test", embedding_fn=embedding_fn
    )
# DAG: retrieve context, rewrite the query with that context, retrieve again, rerank.
with DAG("simple_sdk_rag_retriever_example") as dag:
    vector_store = _create_vector_connector()
    trigger = HttpTrigger(
        "/examples/rag/retrieve", methods="POST", request_body=TriggerReqBody
    )
    request_handle_task = RequestHandleOperator()
    query_parser = MapOperator(map_function=lambda x: x["query"])
    context_join_operator = JoinOperator(combine_function=_context_join_fn)
    rewrite_operator = QueryRewriteOperator(llm_client=OpenAILLMClient())
    # First retriever: fetch the context used for the query rewrite.
    retriever_context_operator = EmbeddingRetrieverOperator(
        top_k=3,
        index_store=vector_store,
    )
    # Second retriever: run the rewritten query against the same store.
    retriever_operator = EmbeddingRetrieverOperator(
        top_k=3,
        index_store=vector_store,
    )
    rerank_operator = RerankOperator()
    # NOTE(review): model_parse_task is built but never wired into the DAG — confirm.
    model_parse_task = MapOperator(lambda out: out.to_dict())
    trigger >> request_handle_task >> context_join_operator
    (
        trigger
        >> request_handle_task
        >> query_parser
        >> retriever_context_operator
        >> context_join_operator
    )
    context_join_operator >> rewrite_operator >> retriever_operator >> rerank_operator

if __name__ == "__main__":
    if dag.leaf_nodes[0].dev_mode:
        # Development mode, you can run the dag locally for debugging.
        from dbgpt.core.awel import setup_dev_environment

        setup_dev_environment([dag], port=5555)
    else:
        # Served by the DB-GPT server in non-dev deployments.
        pass
"""Summary extractor example.
pre-requirements:
1. install openai python sdk
```
pip install openai
```
2. set openai key and base
```
export OPENAI_API_KEY={your_openai_key}
export OPENAI_API_BASE={your_openai_base}
```
or
```
import os
os.environ["OPENAI_API_KEY"] = {your_openai_key}
os.environ["OPENAI_API_BASE"] = {your_openai_base}
```
Examples:
.. code-block:: shell
python examples/rag/summary_extractor_example.py
"""
import asyncio
import os
from dbgpt.configs.model_config import ROOT_PATH
from dbgpt.model.proxy import OpenAILLMClient
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.assembler import SummaryAssembler
from dbgpt_ext.rag.knowledge import KnowledgeFactory
async def main():
    """Summarize an AWEL doc with the OpenAI proxy model."""
    knowledge = KnowledgeFactory.from_file_path(
        os.path.join(ROOT_PATH, "docs/docs/awel/awel.md")
    )
    assembler = SummaryAssembler.load_from_knowledge(
        knowledge=knowledge,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
        llm_client=OpenAILLMClient(),
        model_name="gpt-3.5-turbo",
    )
    return await assembler.generate_summary()
if __name__ == "__main__":
    # Run the summarization and print the result.
    output = asyncio.run(main())
    print(f"output: \n\n{output}")
import asyncio
import json
import shutil
import pandas as pd
from dbgpt.configs.model_config import PILOT_PATH
from dbgpt.core import (
ChatPromptTemplate,
HumanPromptTemplate,
SQLOutputParser,
SystemPromptTemplate,
)
from dbgpt.core.awel import (
DAG,
BranchOperator,
InputOperator,
InputSource,
JoinOperator,
MapOperator,
is_empty_data,
)
from dbgpt.core.operators import PromptBuilderOperator, RequestBuilderOperator
from dbgpt.datasource.operators import DatasourceOperator
from dbgpt.model.operators import LLMOperator
from dbgpt.model.proxy import OpenAILLMClient
from dbgpt.rag.embedding import DefaultEmbeddingFactory
from dbgpt_ext.datasource.rdbms.conn_sqlite import SQLiteTempConnector
from dbgpt_ext.rag import ChunkParameters
from dbgpt_ext.rag.operators import DBSchemaAssemblerOperator
from dbgpt_ext.rag.operators.db_schema import DBSchemaRetrieverOperator
from dbgpt_ext.storage.vector_store.chroma_store import ChromaStore, ChromaVectorConfig
# --- One-off module setup: embeddings, LLM client, demo database, vector store ---

# Delete old vector store directory(/tmp/awel_with_data_vector_store)
shutil.rmtree("/tmp/awel_with_data_vector_store", ignore_errors=True)

embeddings = DefaultEmbeddingFactory.openai()

# Here we use the openai LLM model, if you want to use other models, you can replace
# it according to the previous example.
llm_client = OpenAILLMClient()

# Temporary SQLite DB with a small ``user`` table that the generated SQL runs on.
db_conn = SQLiteTempConnector.create_temporary_db()
db_conn.create_temp_tables(
    {
        "user": {
            "columns": {
                "id": "INTEGER PRIMARY KEY",
                "name": "TEXT",
                "age": "INTEGER",
            },
            "data": [
                (1, "Tom", 10),
                (2, "Jerry", 16),
                (3, "Jack", 18),
                (4, "Alice", 20),
                (5, "Bob", 22),
            ],
        }
    }
)

# Chroma store that receives the DB schema documents indexed below.
config = ChromaVectorConfig(
    persist_path=PILOT_PATH,
    name="db_schema_vector_store",
    embedding_fn=embeddings,
)
vector_store = ChromaStore(config)

# Catalogue of chart types the LLM may choose from for rendering results.
antv_charts = [
    {"response_line_chart": "used to display comparative trend analysis data"},
    {
        "response_pie_chart": "suitable for scenarios such as proportion and distribution statistics"
    },
    {
        "response_table": "suitable for display with many display columns or non-numeric columns"
    },
    # {"response_data_text":" the default display method, suitable for single-line or simple content display"},
    {
        "response_scatter_plot": "Suitable for exploring relationships between variables, detecting outliers, etc."
    },
    {
        "response_bubble_chart": "Suitable for relationships between multiple variables, highlighting outliers or special situations, etc."
    },
    {
        "response_donut_chart": "Suitable for hierarchical structure representation, category proportion display and highlighting key categories, etc."
    },
    {
        "response_area_chart": "Suitable for visualization of time series data, comparison of multiple groups of data, analysis of data change trends, etc."
    },
    {
        "response_heatmap": "Suitable for visual analysis of time series data, large-scale data sets, distribution of classified data, etc."
    },
]
# One "name:description" line per chart, interpolated into the prompt below.
display_type = "\n".join(
    f"{key}:{value}" for dict_item in antv_charts for key, value in dict_item.items()
)
# Text-to-SQL system prompt; the {placeholders} are filled at prompt-build time.
system_prompt = """You are a database expert. Please answer the user's question based on the database selected by the user and some of the available table structure definitions of the database.
Database name:
{db_name}
Table structure definition:
{table_info}
Constraint:
1.Please understand the user's intention based on the user's question, and use the given table structure definition to create a grammatically correct {dialect} sql. If sql is not required, answer the user's question directly..
2.Always limit the query to a maximum of {top_k} results unless the user specifies in the question the specific number of rows of data he wishes to obtain.
3.You can only use the tables provided in the table structure information to generate sql. If you cannot generate sql based on the provided table structure, please say: "The table structure information provided is not enough to generate sql queries." It is prohibited to fabricate information at will.
4.Please be careful not to mistake the relationship between tables and columns when generating SQL.
5.Please check the correctness of the SQL and ensure that the query performance is optimized under correct conditions.
6.Please choose the best one from the display methods given below for data rendering, and put the type name into the name parameter value that returns the required format. If you cannot find the most suitable one, use 'Table' as the display method.
the available data display methods are as follows: {display_type}
User Question:
{user_input}
Please think step by step and respond according to the following JSON format:
{response}
Ensure the response is correct json and can be parsed by Python json.loads.
"""
# JSON shape the model must reply with (thoughts + sql + chosen display type).
RESPONSE_FORMAT_SIMPLE = {
    "thoughts": "thoughts summary to say to user",
    "sql": "SQL Query to run",
    "display_type": "Data display method",
}
prompt = ChatPromptTemplate(
    messages=[
        SystemPromptTemplate.from_template(
            system_prompt,
            response_format=json.dumps(
                RESPONSE_FORMAT_SIMPLE, ensure_ascii=False, indent=4
            ),
        ),
        HumanPromptTemplate.from_template("{user_input}"),
    ]
)
class TwoSumOperator(MapOperator[pd.DataFrame, int]):
    """Sum the ``age`` column of the SQL query-result DataFrame."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def map(self, df: pd.DataFrame) -> int:
        # Offload the pandas work so the event loop is not blocked.
        return await self.blocking_func_to_async(self._two_sum, df)

    def _two_sum(self, df: pd.DataFrame) -> int:
        # NOTE(review): pandas returns a numpy integer here, not a builtin int —
        # confirm downstream parity checks tolerate it.
        return df["age"].sum()
def branch_even(x: int) -> bool:
    """Return True when ``x`` is an even integer."""
    return (x & 1) == 0


def branch_odd(x: int) -> bool:
    """Return True when ``x`` is an odd integer (complement of branch_even)."""
    return not branch_even(x)
class DataDecisionOperator(BranchOperator[int, int]):
    """Route an int to the odd- or even-named downstream task by parity."""

    def __init__(self, odd_task_name: str, even_task_name: str, **kwargs):
        super().__init__(**kwargs)
        # Names of the downstream tasks this operator can activate.
        self.odd_task_name = odd_task_name
        self.even_task_name = even_task_name

    async def branches(self):
        # Map predicate -> task name; AWEL runs the task whose predicate is True.
        return {branch_even: self.even_task_name, branch_odd: self.odd_task_name}
class OddOperator(MapOperator[int, str]):
    """Format and print the odd-branch message."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def map(self, x: int) -> str:
        message = f"{x} is odd"
        print(message)
        return message
class EvenOperator(MapOperator[int, str]):
    """Format and print the even-branch message."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    async def map(self, x: int) -> str:
        message = f"{x} is even"
        print(message)
        return message
class MergeOperator(JoinOperator[str]):
    """Join the two parity branches, keeping whichever produced real data."""

    def __init__(self, **kwargs):
        super().__init__(combine_function=self.merge_func, **kwargs)

    async def merge_func(self, odd: str, even: str) -> str:
        # Exactly one branch ran; the skipped side arrives as empty data.
        return odd if not is_empty_data(odd) else even
# DAG: one-shot indexing of the demo database's schema into the vector store.
with DAG("load_schema_dag") as load_schema_dag:
    input_task = InputOperator.dummy_input()
    # Load database schema to vector store
    assembler_task = DBSchemaAssemblerOperator(
        connector=db_conn,
        index_store=vector_store,
        chunk_parameters=ChunkParameters(chunk_strategy="CHUNK_BY_SIZE"),
    )
    input_task >> assembler_task

# Run the indexing DAG once, up front, and show the persisted chunks.
chunks = asyncio.run(assembler_task.call())
print(chunks)
# DAG: user question -> schema retrieval -> text-to-SQL -> run SQL -> parity demo.
with DAG("chat_data_dag") as chat_data_dag:
    input_task = InputOperator(input_source=InputSource.from_callable())
    # Retrieve the most relevant table schema for the user input.
    retriever_task = DBSchemaRetrieverOperator(
        top_k=1,
        index_store=vector_store,
    )
    content_task = MapOperator(lambda cks: [c.content for c in cks])
    merge_task = JoinOperator(
        lambda table_info, ext_dict: {"table_info": table_info, **ext_dict}
    )
    prompt_task = PromptBuilderOperator(prompt)
    req_build_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = LLMOperator(llm_client=llm_client)
    sql_parse_task = SQLOutputParser()
    db_query_task = DatasourceOperator(connector=db_conn)
    # user_input -> schema retrieval -> chunk contents -> merged prompt params
    (
        input_task
        >> MapOperator(lambda x: x["user_input"])
        >> retriever_task
        >> content_task
        >> merge_task
    )
    input_task >> merge_task
    merge_task >> prompt_task >> req_build_task >> llm_task >> sql_parse_task
    sql_parse_task >> MapOperator(lambda x: x["sql"]) >> db_query_task
    two_sum_task = TwoSumOperator()
    decision_task = DataDecisionOperator(
        odd_task_name="odd_task", even_task_name="even_task"
    )
    odd_task = OddOperator(task_name="odd_task")
    even_task = EvenOperator(task_name="even_task")
    # NOTE(review): rebinding ``merge_task`` shadows the JoinOperator above; the
    # earlier node is already wired into the DAG, so this is legal but confusing.
    merge_task = MergeOperator()
    # Sum the ages, branch on parity, and merge whichever branch produced data.
    db_query_task >> two_sum_task >> decision_task
    decision_task >> odd_task >> merge_task
    decision_task >> even_task >> merge_task

# Drive the whole chain by calling the final merge node with the prompt params.
final_result = asyncio.run(
    merge_task.call(
        {
            "user_input": "Query the name and age of users younger than 18 years old",
            "db_name": "user_management",
            "dialect": "SQLite",
            "top_k": 1,
            "display_type": display_type,
            "response": json.dumps(
                RESPONSE_FORMAT_SIMPLE, ensure_ascii=False, indent=4
            ),
        }
    )
)
print("The final result is:")
print(final_result)
import asyncio
from dbgpt.core import BaseOutputParser
from dbgpt.core.awel import DAG
from dbgpt.core.operators import (
BaseLLMOperator,
PromptBuilderOperator,
RequestBuilderOperator,
)
from dbgpt.model.proxy import OpenAILLMClient
# Minimal AWEL pipeline: build prompt -> build model request -> LLM -> parse.
with DAG("simple_sdk_llm_example_dag") as dag:
    build_prompt = PromptBuilderOperator(
        "Write a SQL of {dialect} to query all data of {table_name}."
    )
    build_request = RequestBuilderOperator(model="gpt-3.5-turbo")
    call_llm = BaseLLMOperator(OpenAILLMClient())
    parse_output = BaseOutputParser()
    build_prompt >> build_request >> call_llm >> parse_output

if __name__ == "__main__":
    # Trigger the DAG from its terminal operator; upstream tasks run first.
    call_data = {"dialect": "mysql", "table_name": "user"}
    result = asyncio.run(parse_output.call(call_data=call_data))
    print(f"output: \n\n{result}")
import asyncio
import json
from typing import Dict, List
from dbgpt.core import SQLOutputParser
from dbgpt.core.awel import (
DAG,
InputOperator,
JoinOperator,
MapOperator,
SimpleCallDataInputSource,
)
from dbgpt.core.operators import (
BaseLLMOperator,
PromptBuilderOperator,
RequestBuilderOperator,
)
from dbgpt.datasource.operators.datasource_operator import DatasourceOperator
from dbgpt.model.proxy import OpenAILLMClient
from dbgpt.rag.operators.datasource import DatasourceRetrieverOperator
from dbgpt_ext.datasource.rdbms.conn_sqlite import SQLiteTempConnector
def _create_temporary_connection():
    """Build an in-memory SQLite connection pre-loaded with a ``user`` table.

    Returns:
        SQLiteTempConnector: a temporary database holding five sample users.
    """
    # Table definition and seed rows for the throwaway database.
    schema = {
        "user": {
            "columns": {
                "id": "INTEGER PRIMARY KEY",
                "name": "TEXT",
                "age": "INTEGER",
            },
            "data": [
                (1, "Tom", 10),
                (2, "Jerry", 16),
                (3, "Jack", 18),
                (4, "Alice", 20),
                (5, "Bob", 22),
            ],
        }
    }
    connection = SQLiteTempConnector.create_temporary_db()
    connection.create_temp_tables(schema)
    return connection
def _sql_prompt() -> str:
"""This is a prompt template for SQL generation.
Format of arguments:
{db_name}: database name
{table_info}: table structure information
{dialect}: database dialect
{top_k}: maximum number of results
{user_input}: user question
{response}: response format
Returns:
str: prompt template
"""
return """Please answer the user's question based on the database selected by the user and some of the available table structure definitions of the database.
Database name:
{db_name}
Table structure definition:
{table_info}
Constraint:
1.Please understand the user's intention based on the user's question, and use the given table structure definition to create a grammatically correct {dialect} sql. If sql is not required, answer the user's question directly..
2.Always limit the query to a maximum of {top_k} results unless the user specifies in the question the specific number of rows of data he wishes to obtain.
3.You can only use the tables provided in the table structure information to generate sql. If you cannot generate sql based on the provided table structure, please say: "The table structure information provided is not enough to generate sql queries." It is prohibited to fabricate information at will.
4.Please be careful not to mistake the relationship between tables and columns when generating SQL.
5.Please check the correctness of the SQL and ensure that the query performance is optimized under correct conditions.
User Question:
{user_input}
Please think step by step and respond according to the following JSON format:
{response}
Ensure the response is correct json and can be parsed by Python json.loads.
"""
def _join_func(query_dict: Dict, db_summary: List[str]):
"""Join function for JoinOperator.
Build the format arguments for the prompt template.
Args:
query_dict (Dict): The query dict from DAG input.
db_summary (List[str]): The table structure information from DatasourceRetrieverOperator.
Returns:
Dict: The query dict with the format arguments.
"""
default_response = {
"thoughts": "thoughts summary to say to user",
"sql": "SQL Query to run",
}
response = json.dumps(default_response, ensure_ascii=False, indent=4)
query_dict["table_info"] = db_summary
query_dict["response"] = response
return query_dict
class SQLResultOperator(JoinOperator[Dict]):
    """Merge the SQL result and the model result."""

    def __init__(self, **kwargs):
        """Register the private combiner as this join's combine function."""
        super().__init__(combine_function=self._combine_result, **kwargs)

    def _combine_result(self, query_df, parsed: Dict) -> Dict:
        # Attach the executed query's result frame to the model's parsed
        # output under "data_df", mutating and returning the same dict.
        parsed["data_df"] = query_df
        return parsed
# End-to-end Text2SQL pipeline over a throwaway SQLite database.
with DAG("simple_sdk_llm_sql_example") as dag:
    db_connection = _create_temporary_connection()
    input_task = InputOperator(input_source=SimpleCallDataInputSource())
    retriever_task = DatasourceRetrieverOperator(connector=db_connection)
    # Merge the input data and the table structure information.
    prompt_input_task = JoinOperator(combine_function=_join_func)
    prompt_task = PromptBuilderOperator(_sql_prompt())
    model_pre_handle_task = RequestBuilderOperator(model="gpt-3.5-turbo")
    llm_task = BaseLLMOperator(OpenAILLMClient())
    out_parse_task = SQLOutputParser()
    # Pull just the generated SQL string out of the parsed model response.
    sql_parse_task = MapOperator(map_function=lambda x: x["sql"])
    db_query_task = DatasourceOperator(connector=db_connection)
    sql_result_task = SQLResultOperator()

    # Two inputs feed the prompt builder: raw call data and schema summary.
    input_task >> prompt_input_task
    input_task >> retriever_task >> prompt_input_task
    (
        prompt_input_task
        >> prompt_task
        >> model_pre_handle_task
        >> llm_task
        >> out_parse_task
        >> sql_parse_task
        >> db_query_task
        >> sql_result_task
    )
    # Second input to SQLResultOperator: the parsed model output itself,
    # so the final dict carries both the model answer and the query data.
    out_parse_task >> sql_result_task

if __name__ == "__main__":
    input_data = {
        "db_name": "test_db",
        "dialect": "sqlite",
        "top_k": 5,
        "user_input": "What is the name and age of the user with age less than 18",
    }
    # Drive the DAG from its terminal node; AWEL resolves upstream tasks.
    output = asyncio.run(sql_result_task.call(call_data=input_data))
    print(f"\nthoughts: {output.get('thoughts')}\n")
    print(f"sql: {output.get('sql')}\n")
    print(f"result data:\n{output.get('data_df')}")
# TuGraph DB项目生态图谱
Entities:
(TuGraph-family/tugraph-db#github_repo)
(vesoft-inc/nebula#github_repo)
(PaddlePaddle/Paddle#github_repo)
(apache/brpc#github_repo)
(TuGraph-family/tugraph-web#github_repo)
(TuGraph-family/tugraph-db-client-java#github_repo)
(alibaba/GraphScope#github_repo)
(ClickHouse/ClickHouse#github_repo)
(TuGraph-family/fma-common#github_repo)
(vesoft-inc/nebula-docs-cn#github_repo)
(eosphoros-ai/DB-GPT#github_repo)
(eosphoros-ai#github_organization)
(yandex#github_organization)
(alibaba#github_organization)
(TuGraph-family#github_organization)
(baidu#github_organization)
(apache#github_organization)
(vesoft-inc#github_organization)
Relationships:
(TuGraph-family/tugraph-db#common_developer#vesoft-inc/nebula#common_developer count 10)
(TuGraph-family/tugraph-db#common_developer#PaddlePaddle/Paddle#common_developer count 9)
(TuGraph-family/tugraph-db#common_developer#apache/brpc#common_developer count 7)
(TuGraph-family/tugraph-db#common_developer#TuGraph-family/tugraph-web#common_developer count 7)
(TuGraph-family/tugraph-db#common_developer#TuGraph-family/tugraph-db-client-java#common_developer count 7)
(TuGraph-family/tugraph-db#common_developer#alibaba/GraphScope#common_developer count 6)
(TuGraph-family/tugraph-db#common_developer#ClickHouse/ClickHouse#common_developer count 6)
(TuGraph-family/tugraph-db#common_developer#TuGraph-family/fma-common#common_developer count 6)
(TuGraph-family/tugraph-db#common_developer#vesoft-inc/nebula-docs-cn#common_developer count 6)
(TuGraph-family/tugraph-db#common_developer#eosphoros-ai/DB-GPT#common_developer count 6)
(eosphoros-ai/DB-GPT#belong_to#eosphoros-ai#belong_to)
(ClickHouse/ClickHouse#belong_to#yandex#belong_to)
(alibaba/GraphScope#belong_to#alibaba#belong_to)
(TuGraph-family/tugraph-db#belong_to#TuGraph-family#belong_to)
(TuGraph-family/tugraph-web#belong_to#TuGraph-family#belong_to)
(TuGraph-family/fma-common#belong_to#TuGraph-family#belong_to)
(TuGraph-family/tugraph-db-client-java#belong_to#TuGraph-family#belong_to)
(PaddlePaddle/Paddle#belong_to#baidu#belong_to)
(apache/brpc#belong_to#apache#belong_to)
(vesoft-inc/nebula#belong_to#vesoft-inc#belong_to)
(vesoft-inc/nebula-docs-cn#belong_to#vesoft-inc#belong_to)
# DB-GPT项目生态图谱
Entities:
(eosphoros-ai/DB-GPT#github_repo)
(chatchat-space/Langchain-Chatchat#github_repo)
(hiyouga/LLaMA-Factory#github_repo)
(lm-sys/FastChat#github_repo)
(langchain-ai/langchain#github_repo)
(eosphoros-ai/DB-GPT-Hub#github_repo)
(THUDM/ChatGLM-6B#github_repo)
(langgenius/dify#github_repo)
(vllm-project/vllm#github_repo)
(QwenLM/Qwen#github_repo)
(PaddlePaddle/PaddleOCR#github_repo)
(vllm-project#github_organization)
(eosphoros-ai#github_organization)
(PaddlePaddle#github_organization)
(QwenLM#github_organization)
(THUDM#github_organization)
(lm-sys#github_organization)
(chatchat-space#github_organization)
(langchain-ai#github_organization)
(langgenius#github_organization)
Relationships:
(eosphoros-ai/DB-GPT#common_developer#chatchat-space/Langchain-Chatchat#common_developer count 82)
(eosphoros-ai/DB-GPT#common_developer#hiyouga/LLaMA-Factory#common_developer count 45)
(eosphoros-ai/DB-GPT#common_developer#lm-sys/FastChat#common_developer count 39)
(eosphoros-ai/DB-GPT#common_developer#langchain-ai/langchain#common_developer count 37)
(eosphoros-ai/DB-GPT#common_developer#eosphoros-ai/DB-GPT-Hub#common_developer count 37)
(eosphoros-ai/DB-GPT#common_developer#THUDM/ChatGLM-6B#common_developer count 31)
(eosphoros-ai/DB-GPT#common_developer#langgenius/dify#common_developer count 30)
(eosphoros-ai/DB-GPT#common_developer#vllm-project/vllm#common_developer count 27)
(eosphoros-ai/DB-GPT#common_developer#QwenLM/Qwen#common_developer count 26)
(eosphoros-ai/DB-GPT#common_developer#PaddlePaddle/PaddleOCR#common_developer count 24)
(vllm-project/vllm#belong_to#vllm-project#belong_to)
(eosphoros-ai/DB-GPT#belong_to#eosphoros-ai#belong_to)
(eosphoros-ai/DB-GPT-Hub#belong_to#eosphoros-ai#belong_to)
(PaddlePaddle/PaddleOCR#belong_to#PaddlePaddle#belong_to)
(QwenLM/Qwen#belong_to#QwenLM#belong_to)
(THUDM/ChatGLM-6B#belong_to#THUDM#belong_to)
(lm-sys/FastChat#belong_to#lm-sys#belong_to)
(chatchat-space/Langchain-Chatchat#belong_to#chatchat-space#belong_to)
(langchain-ai/langchain#belong_to#langchain-ai#belong_to)
(langgenius/dify#belong_to#langgenius#belong_to)
# TuGraph简介
TuGraph图数据库由蚂蚁集团与清华大学联合研发,构建了一套包含图存储、图计算、图学习、图研发平台的完善的图技术体系,支持海量多源的关联数据的实时处理,显著提升数据分析效率,支撑了蚂蚁支付、安全、社交、公益、数据治理等300多个场景应用。拥有业界领先规模的图集群,解决了图数据分析面临的大数据量、高吞吐率和低延迟等重大挑战,是蚂蚁集团金融风控能力的重要基础设施,显著提升了欺诈洗钱等金融风险的实时识别能力和审理分析效率,并面向金融、工业、政务服务等行业客户。TuGraph产品家族中,开源产品包括:TuGraph DB、TuGraph Analytics、OSGraph、ChatTuGraph等。内源产品包括:GeaBase、GeaFlow、GeaLearn、GeaMaker等。
# DB-GPT简介
DB-GPT是一个开源的AI原生数据应用开发框架(AI Native Data App Development framework with AWEL(Agentic Workflow Expression Language) and Agents)。目的是构建大模型领域的基础设施,通过开发多模型管理(SMMF)、Text2SQL效果优化、RAG框架以及优化、Multi-Agents框架协作、AWEL(智能体工作流编排)等多种技术能力,让围绕数据库构建大模型应用更简单,更方便。
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment