-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
111 lines (92 loc) · 4.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import json
import requests
import pandas as pd
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# Base URL of the Cube API; endpoint paths such as "/meta" and "/load" are appended to it.
CUBE_API_URL = "http://localhost:4000/cubejs-api/v1"
def get_cube_meta() -> str:
    """Fetch the raw body of the Cube ``/meta`` endpoint.

    Issues a GET request against ``CUBE_API_URL + "/meta"`` and hands back
    the response text unparsed.
    """
    meta_response = requests.get(url=CUBE_API_URL + "/meta")
    return meta_response.text
def get_system_prompt() -> str:
    """Build the system prompt sent to the LLM.

    The prompt template contains the placeholder ``@CUBE_META_INFORMATION@``,
    which is substituted with the text returned by ``get_cube_meta()``.

    Returns:
        The prompt text with the cube meta information injected.
    """
    prompt_template = """
You are an expert in generating parameters for quering the cube semantic layer.
Generate json object to request data from cube semantic layer.
Do not assume existance of any field. If any unknown data is requested, just explain in reply why the user request cannot be translated to cube params.
If the user asks for current year data, generate parameters for until June of this current year 2024.
Do not give any assumed answers.
Do not join if not mentioned.
Here is the cube definition files used:
@CUBE_META_INFORMATION@
Recheck that you have generated properly.
Few examples:
user query: sales per person in this current year divided by month
json parameters: { "measures": [ "orders.total_sales" ], "timeDimensions": [ { "dimension": "orders.updated_at", "granularity": "month", "dateRange": "This year" } ], "order": { "orders.total_sales": "desc" }, "dimensions": [ "orders.email" ] }
user query: get me average order value grouped by city for this quarter, weekly.
json parameters: { "timeDimensions": [ { "dimension": "orders.updated_at", "granularity": "week", "dateRange": "This quarter" } ], "order": { "orders.updated_at": "asc" }, "dimensions": [ "orders.city" ], "measures": [ "orders.average_order_value" ] }
user query: get me average order value for each month in the year 2023 by city.
json parameters: {'timeDimensions': [{'dimension': 'orders.updated_at', 'granularity': 'month', 'dateRange': ['2023-01-01', '2023-12-31']}], 'dimensions': ['orders.city'], 'measures': ['orders.average_order_value']}
"""
    # NOTE(review): the last example above uses single-quoted keys/values,
    # which is not valid JSON; left untouched here, but worth confirming it
    # does not steer the model towards output that json.loads rejects.

    # str.replace returns a new string (str is immutable), so its result has
    # to be returned; calling it as a bare statement leaves the placeholder
    # in the prompt and the model never sees the cube definitions.
    return prompt_template.replace("@CUBE_META_INFORMATION@", get_cube_meta())
def get_data(params: dict) -> dict:
    """Run a query against the Cube ``/load`` endpoint.

    ``params`` is serialized with ``json.dumps`` and sent as the ``query``
    URL parameter of a GET request; the decoded JSON body of the response
    is returned.
    """
    load_endpoint = CUBE_API_URL + "/load"
    query_args = {'query': json.dumps(params)}
    return requests.get(url=load_endpoint, params=query_args).json()
def process_params(result):
    """Extract the JSON query parameters from an LLM reply.

    The text following the marker ``json parameters:`` in ``result.content``
    is decoded as JSON. On success the decoded value is returned; when the
    marker is missing or the text is not valid JSON, the problem is printed
    and the unmodified reply content is returned instead.
    """
    reply_text = result.content
    try:
        # Index 1 is the segment right after the first marker occurrence;
        # a reply without the marker raises IndexError here.
        parsed = json.loads(reply_text.split("json parameters:")[1])
    except (IndexError, json.JSONDecodeError) as ex:
        print(f"Error parsing parameters from LLM response: {ex}")
        print(f"Raw content: {reply_text}")
        return reply_text

    print(f"Parameters generated by LLM: {parsed}")
    return parsed
def fetch_data_from_source(params):
    """Fetch rows for ``params`` via ``get_data`` and wrap them in a DataFrame.

    Returns a ``pandas.DataFrame`` built from the ``'data'`` entry of the
    response when that entry is non-empty. If the entry is missing or empty,
    the string ``"No data found in response."`` is returned. Any exception
    raised along the way is printed and returned as an error message string.
    """
    try:
        rows = get_data(params).get('data')
        if not rows:
            print("No data found in response.")
            return "No data found in response."

        print("Data fetched from data source:")
        print(rows)
        return pd.DataFrame(rows)
    except Exception as ex:
        failure_message = f"Error fetching data: {ex}"
        print(failure_message)
        return failure_message
def initiate_llm(user_query: str):
    """Translate ``user_query`` into Cube query parameters and fetch the data.

    This is a generator:

    1. It first yields whatever ``process_params`` returns for the LLM reply:
       the parsed parameters on success, or the raw reply text when no JSON
       parameters could be extracted.
    2. If (and only if) non-empty parameters were parsed into a dict, it then
       yields the result of ``fetch_data_from_source`` for them.

    Args:
        user_query: The natural-language request passed to the model as the
            human message.
    """
    llm = ChatOpenAI(temperature=0, model='gpt-4o-mini')
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", "{system_template}"),
            ("human", "{input}"),
        ]
    )
    chain = prompt | llm
    result = chain.invoke(
        {
            "system_template": get_system_prompt(),
            "input": user_query,
        }
    )

    params = process_params(result)
    yield params

    # process_params falls back to the raw reply text when parsing fails.
    # That text is a truthy string, so a plain truthiness check would send
    # the model's prose to the data source as if it were a query. Only a
    # non-empty dict is a usable set of query parameters.
    if isinstance(params, dict) and params:
        yield fetch_data_from_source(params)