Skip to content

Commit

Permalink
Merge pull request #48 from valmi-io/feat.new_prompts
Browse files Browse the repository at this point in the history
Added support for new prompts
  • Loading branch information
supradeep2819 authored Jun 19, 2024
2 parents 2fb4f0f + 6630fe2 commit 20e3b45
Show file tree
Hide file tree
Showing 15 changed files with 613 additions and 111 deletions.
18 changes: 18 additions & 0 deletions core/migrations/0033_prompt_time_grain_enabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.1.5 on 2024-06-17 11:51

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the boolean ``time_grain_enabled`` column to the Prompt model.

    Existing rows are backfilled with the default value ``False``.
    """

    dependencies = [
        ("core", "0032_auto_20240607_1146"),
    ]

    operations = [
        migrations.AddField(
            model_name="prompt",
            name="time_grain_enabled",
            field=models.BooleanField(default=False),
        ),
    ]
18 changes: 18 additions & 0 deletions core/migrations/0034_prompt_time_window_enabled.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 3.1.5 on 2024-06-18 11:44

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add the boolean ``time_window_enabled`` column to the Prompt model.

    Existing rows are backfilled with the default value ``True``.
    """

    dependencies = [
        ("core", "0033_prompt_time_grain_enabled"),
    ]

    operations = [
        migrations.AddField(
            model_name="prompt",
            name="time_window_enabled",
            field=models.BooleanField(default=True),
        ),
    ]
34 changes: 21 additions & 13 deletions core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,14 @@ class Credential(models.Model):

def __str__(self):
return f"{self.connector}: {self.connector_config} : {self.workspace}: {self.id} : {self.name}"


class StorageCredentials(models.Model):
id = models.UUIDField(primary_key=True, editable=False, default=uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
workspace = models.ForeignKey(to=Workspace, on_delete=models.CASCADE, related_name="storage_credentials")
connector_config = models.JSONField(blank=False, null=False)


class Source(models.Model):
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
Expand Down Expand Up @@ -135,28 +136,34 @@ class Connector(models.Model):


class Package(models.Model):
name = models.CharField(primary_key=True,max_length=256, null=False, blank=False)
name = models.CharField(primary_key=True, max_length=256, null=False, blank=False)
scopes = ArrayField(models.CharField(max_length=64), blank=True, default=list)
gated = models.BooleanField(null=False, blank = False, default=True)
gated = models.BooleanField(null=False, blank=False, default=True)


class Prompt(models.Model):
id = models.UUIDField(primary_key=True, editable=False, default=uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
name = models.CharField(max_length=256, null=False, blank=False,unique=True)
description = models.CharField(max_length=1000, null=False, blank=False,default="aaaaaa")
type = models.CharField(null=False, blank = False,max_length=256, default="SRC_SHOPIFY")
name = models.CharField(max_length=256, null=False, blank=False, unique=True)
description = models.CharField(max_length=1000, null=False, blank=False, default="aaaaaa")
type = models.CharField(null=False, blank=False, max_length=256, default="SRC_SHOPIFY")
# spec = models.JSONField(blank=False, null=True)
filters = models.JSONField(default=dict)
operators = models.JSONField(default={
'string': ["=", "!=", "IN", "NOT IN"],
'integer': ["=", ">", "<", ">=", "<=", "!="]
})
query = models.CharField(max_length=1000,null=False, blank=False,default="query")
package_id = models.CharField(null=False, blank = False,max_length=20,default="P0")
gated = models.BooleanField(null=False, blank = False, default=True)
query = models.CharField(max_length=1000, null=False, blank=False, default="query")
package_id = models.CharField(null=False, blank=False, max_length=20, default="P0")
gated = models.BooleanField(null=False, blank=False, default=True)
time_grain_enabled = models.BooleanField(null=False, blank=False, default=False)
time_window_enabled = models.BooleanField(null=False, blank=False, default=True)


class SourceAccessInfo(models.Model):
source = models.ForeignKey(to=Source, on_delete=models.CASCADE, related_name="source_access_info",primary_key=True)
storage_credentials = models.ForeignKey(to=StorageCredentials,on_delete=models.CASCADE,related_name="source_access_info")
source = models.ForeignKey(to=Source, on_delete=models.CASCADE, related_name="source_access_info", primary_key=True)
storage_credentials = models.ForeignKey(
to=StorageCredentials, on_delete=models.CASCADE, related_name="source_access_info")


class Account(models.Model):
id = models.UUIDField(primary_key=True, editable=False, default=uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
Expand All @@ -171,14 +178,15 @@ class Explore(models.Model):
created_at = models.DateTimeField(default=timezone.now)
updated_at = models.DateTimeField(auto_now=True)
id = models.UUIDField(primary_key=True, editable=False, default=uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"))
name = models.CharField(max_length=256, null=False, blank=False,default="aaaaaa")
name = models.CharField(max_length=256, null=False, blank=False, default="aaaaaa")
workspace = models.ForeignKey(to=Workspace, on_delete=models.CASCADE, related_name="explore_workspace")
prompt = models.ForeignKey(to=Prompt, on_delete=models.CASCADE, related_name="explore_prompt")
sync = models.ForeignKey(to=Sync, on_delete=models.CASCADE, related_name="explore_sync")
ready = models.BooleanField(null=False, blank = False, default=False)
ready = models.BooleanField(null=False, blank=False, default=False)
account = models.ForeignKey(to=Account, on_delete=models.CASCADE, related_name="explore_account")
spreadsheet_url = models.URLField(null=True, blank=True, default="https://example.com")


class ValmiUserIDJitsuApiToken(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE, primary_key=True)
api_token = models.CharField(max_length=256, blank=True, null=True)
Expand Down
35 changes: 14 additions & 21 deletions core/routes/engine_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,21 @@
"""

import json
import logging
from typing import Dict, List

from decouple import Csv, config
# from opentelemetry import trace
from ninja import Router

from core.schemas.schemas import ConnectorSchema, DetailSchema, PackageSchema, PromptSchema, SyncSchema

from ..models import (
Connector,
Package,
Prompt,
Sync,
OAuthApiKeys
)
import json

from opentelemetry.metrics import get_meter_provider
# from opentelemetry import trace
from django.db import connection

from core.schemas.schemas import (ConnectorSchema, DetailSchema, PackageSchema,
PromptSchema, SyncSchema)
from valmi_app_backend.utils import replace_values_in_json

from ..models import Connector, OAuthApiKeys, Package, Prompt, Sync

router = Router()

# Get an instance of a logger
Expand Down Expand Up @@ -103,8 +96,8 @@ def create_connector(request, payload: PromptSchema):
logger.debug(data)
try:
logger.debug("creating prompt")
prompts = Prompt.objects.create(**data)
return (200, prompts)
prompt = Prompt.objects.create(**data)
return (200, prompt)
except Exception as ex:
logger.debug(f"prompt not created. Attempting to update.")
# Prompt.objects.filter(name=data['name']) will only return one item as name is unique for every prompt
Expand All @@ -116,11 +109,11 @@ def create_connector(request, payload: PromptSchema):
elif rows_updated == 1:
logger.debug(f"prompt updated")
else:
logger.debug(f"something went wrong while creating/updating prompt. message: {ex}")
finally:
if connection.queries:
last_query = connection.queries[-1]
logger.debug(f"last executed SQL: {last_query['sql']}")
msg = f"something went wrong while creating/updating prompt. message: {ex}"
logger.debug(msg)
return (400, {"detail": msg})
return (200, data)



@router.post("/packages/create", response={200: PackageSchema, 400: DetailSchema})
Expand Down
23 changes: 15 additions & 8 deletions core/routes/explore_api.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import asyncio
import logging
from typing import List
import uuid
from typing import List

from decouple import config
from core.models import Account, Explore, Prompt, Workspace
from core.schemas.schemas import DetailSchema, SyncStartStopSchemaIn
from core.schemas.explore import ExploreSchema, ExploreSchemaIn, ExploreSchemaOut
from ninja import Router

from core.models import Account, Explore, Prompt, Workspace
from core.schemas.explore import (ExploreSchema, ExploreSchemaIn,
ExploreSchemaOut)
from core.schemas.schemas import DetailSchema, SyncStartStopSchemaIn
from core.services.explore import ExploreService
from core.services.prompts import PromptService

logger = logging.getLogger(__name__)

router = Router()
Expand All @@ -32,6 +32,7 @@ def get_explores(request, workspace_id):
explore.sync_id = str(explore.sync.id)
latest_sync_info = ExploreService.get_latest_sync_info(explore.sync.id)
logger.debug(latest_sync_info)
        # since explores are created with full_refresh=True, enable only if the previous sync succeeded
# checking whether run is created for explore or not
if latest_sync_info.found == False:
explore.enabled = False
Expand All @@ -46,6 +47,8 @@ def get_explores(request, workspace_id):
else:
explore.last_sync_result = latest_sync_info.status.upper()
explore.sync_state = 'IDLE'
if latest_sync_info.status == 'success':
explore.last_sync_succeeded_at = latest_sync_info.created_at
explore.last_sync_created_at = latest_sync_info.created_at
# adding last successful sync info
last_successful_sync_info = PromptService.is_sync_finished(explore.sync.id)
Expand Down Expand Up @@ -89,21 +92,24 @@ def create_explore(request, workspace_id, payload: ExploreSchemaIn):
account = Account.objects.create(**account_info)
data["account"] = account
# create source

source = ExploreService.create_source(
table_name, data["prompt_id"], data["schema_id"], data["time_window"], data["filters"], workspace_id, account)
table_name, data["prompt_id"], data["schema_id"], data["time_window"], data["filters"], data["time_grain"], workspace_id, account)
# create destination
spreadsheet_title = f"valmi.io {prompt.name} sheet"
destination_data = ExploreService.create_destination(
spreadsheet_title, data["name"], data["sheet_url"], workspace_id, account)
spreadsheet_url = destination_data[0]
destination = destination_data[1]
# create sync
sync = ExploreService.create_sync(source, destination, workspace_id)
# creating the sync
sync = ExploreService.create_sync(data["name"], source, destination, workspace_id)
logger.debug("After sync")
# creating explore
del data["schema_id"]
del data["filters"]
del data["time_window"]
del data["sheet_url"]
del data["time_grain"]
# data["name"] = f"valmiio {prompt.name}"
data["sync"] = sync
data["ready"] = False
Expand All @@ -128,3 +134,4 @@ def get_explore_by_id(request, workspace_id, explore_id):
except Exception:
logger.exception("explore listing error")
return (500, {"detail": "The explore cannot be fetched."})
return (500, {"detail": "The explore cannot be fetched."})
20 changes: 13 additions & 7 deletions core/routes/prompt_api.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
from decimal import Decimal
import json
import logging
from decimal import Decimal
from typing import List

import psycopg2
Expand All @@ -10,7 +10,7 @@

from core.models import (Credential, Prompt, Source, SourceAccessInfo,
StorageCredentials, Sync)
from core.schemas.prompt import PromptPreviewSchemaIn, TableInfo
from core.schemas.prompt import PromptPreviewSchemaIn, TableInfo, TimeGrain
from core.schemas.schemas import (DetailSchema, PromptByIdSchema,
PromptSchemaOut)
from core.services.prompts import PromptService
Expand Down Expand Up @@ -52,19 +52,23 @@ def get_prompt_by_id(request, workspace_id, prompt_id):
if source_access_info := info.source_access_info.first():
storage_id = source_access_info.storage_credentials.id
if storage_id not in schemas:
logger.debug(info.credential.name)
schema = {
"id": str(storage_id),
"name": source_access_info.storage_credentials.connector_config["schema"],
"sources": [],
}
schemas[storage_id] = schema
formatted_output = info.credential.created_at.strftime('%B %d %Y %H:%M')
schemas[storage_id]["sources"].append({
"name": f"{info.credential.name}${info.credential.created_at}",
"name": f"{info.credential.name}@{formatted_output}",
"id": str(info.id),
})
# Convert schemas dictionary to a list (optional)
final_schemas = list(schemas.values())
prompt.schemas = final_schemas
if prompt.time_grain_enabled:
prompt.time_grain = TimeGrain.members()
logger.debug(prompt)
return prompt
except Exception:
Expand All @@ -75,6 +79,8 @@ def get_prompt_by_id(request, workspace_id, prompt_id):
def custom_serializer(obj):
if isinstance(obj, datetime.datetime):
return obj.isoformat()
if isinstance(obj, datetime.date):
return obj.isoformat()
if isinstance(obj, Decimal):
return str(obj)

Expand All @@ -83,7 +89,7 @@ def custom_serializer(obj):
def preview_data(request, workspace_id, prompt_id, prompt_req: PromptPreviewSchemaIn):
try:
prompt = Prompt.objects.get(id=prompt_id)

        # checking whether the prompt is enabled or not
if not PromptService.is_enabled(workspace_id, prompt):
detail_message = f"The prompt is not enabled. Please add '{prompt.type}' connector"
Expand All @@ -93,16 +99,16 @@ def preview_data(request, workspace_id, prompt_id, prompt_req: PromptPreviewSche
sync_id = sync.id
        # checking whether the sync has finished or not (from Shopify to DB)
latest_sync_info = PromptService.is_sync_finished(sync_id)
# if latest_sync_info.found == False or latest_sync_info.status == 'running':
# return 400, {"detail": "The sync is not finished. Please wait for the sync to finish."}
if latest_sync_info.found == False:
return 400, {"detail": "The sync is not finished. Please wait for the sync to finish."}
storage_credentials = StorageCredentials.objects.get(id=prompt_req.schema_id)
schema_name = storage_credentials.connector_config["schema"]
table_info = TableInfo(
tableSchema=schema_name,
query=prompt.query
)

query = PromptService().build(table_info, prompt_req.time_window, prompt_req.filters)
query = PromptService().build(table_info, prompt_req.time_window, prompt_req.filters, prompt_req.time_grain)
query = query + " limit 10"
logger.debug(query)
host = storage_credentials.connector_config.get('host')
Expand Down
12 changes: 6 additions & 6 deletions core/routes/workspace_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ def create_sync(request, workspace_id, payload: SyncSchemaIn):
del data["destination_id"]
schedule = {}
if len(data["schedule"]) == 0:
schedule["run_interval"] = 3600000
schedule["run_interval"] = 86400000
data["schedule"] = schedule
data["workspace"] = Workspace.objects.get(id=workspace_id)
data["id"] = uuid.uuid4()
Expand Down Expand Up @@ -338,12 +338,12 @@ def create_sync(request, workspace_id, payload: SyncSchemaInWithSourcePayload):
stream["destination_sync_mode"] = "append_dedup"
# creating source credential
source_credential_payload = CredentialSchemaIn(
name="shopify", account=data["account"], connector_type=data["source"]["type"],
name=data["name"], account=data["account"], connector_type=data["source"]["type"],
connector_config=data["source"]["config"])
source_credential = create_credential(request, workspace_id, source_credential_payload)
# creating source
source_payload = SourceSchemaIn(
name="shopify", credential_id=source_credential.id, catalog=catalog)
name=data["name"], credential_id=source_credential.id, catalog=catalog)
source = create_source(request, workspace_id, source_payload)
workspace = Workspace.objects.get(id=workspace_id)
# creating default warehouse
Expand All @@ -352,17 +352,17 @@ def create_sync(request, workspace_id, payload: SyncSchemaInWithSourcePayload):
SourceAccessInfo.objects.create(**source_access_info)
# creating destination credential
destination_credential_payload = CredentialSchemaIn(
name="VALMI_ENGINE", account=data["account"], connector_type="DEST_POSTGRES-DEST", connector_config=storage_credentials.connector_config)
name="VALMI_DATA_STORE", account=data["account"], connector_type="DEST_POSTGRES-DEST", connector_config=storage_credentials.connector_config)
destination_credential = create_credential(request, workspace_id, destination_credential_payload)
# creating destination
destination_payload = DestinationSchemaIn(
name="VALMI_ENGINE", credential_id=destination_credential.id, catalog=catalog)
name="VALMI_DATA_STORE", credential_id=destination_credential.id, catalog=catalog)
destination = create_destination(request, workspace_id, destination_payload)
data["source"] = Source.objects.get(id=source.id)
data["destination"] = Destination.objects.get(id=destination.id)
del data["account"]
if data["schedule"] is None:
schedule = {"run_interval": 3600000}
schedule = {"run_interval": 86400000}
data["schedule"] = schedule
data["workspace"] = Workspace.objects.get(id=workspace_id)
if data["ui_state"] is None:
Expand Down
Loading

0 comments on commit 20e3b45

Please sign in to comment.