Skip to content

Commit

Permalink
use a shared db for content dvms
Browse files Browse the repository at this point in the history
  • Loading branch information
believethehype committed May 17, 2024
1 parent 8171706 commit f5548c6
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 90 deletions.
31 changes: 21 additions & 10 deletions nostr_dvm/tasks/content_discovery_currently_popular.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,19 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c

self.last_schedule = Timestamp.now().as_secs()

if self.options.get("db_name"):
self.db_name = self.options.get("db_name")
if self.options.get("db_since"):
self.db_since = int(self.options.get("db_since"))


use_logger = False
if use_logger:
init_logger(LogLevel.DEBUG)

self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()

if not self.personalized:
self.result = self.calculate_result(self.request_form)

Expand Down Expand Up @@ -117,16 +125,16 @@ def calculate_result(self, request_form):
# Negentropy reconciliation
# Query events from database
timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
since = Timestamp.from_secs(timestamp_hour_ago)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(lasthour)
filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)
events = cli.database().query([filter1])
ns.finallist = {}
for event in events:
if event.created_at().as_secs() > timestamp_hour_ago:
filt = Filter().kinds([definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_REACTION,
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(lasthour)
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)
Expand Down Expand Up @@ -156,7 +164,8 @@ def schedule(self, dvm_config):
return 0
else:
if Timestamp.now().as_secs() >= self.last_schedule + dvm_config.SCHEDULE_UPDATES_SECONDS:
self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()
self.last_schedule = Timestamp.now().as_secs()
self.result = self.calculate_result(self.request_form)
return 1
Expand Down Expand Up @@ -193,11 +202,12 @@ def sync_db(self):
# We build an example here that we can call by either calling this file directly from the main directory,
# or by adding it to our playground. You can call the example and adjust it to your needs or redefine it in the
# playground or elsewhere
def build_example(name, identifier, admin_config, processing_msg=None):
def build_example(name, identifier, admin_config, options, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
# dvm_config.SUBSCRIPTION_REQUIRED = True
# dvm_config.SUBSCRIPTION_DAILY_COST = 1
Expand Down Expand Up @@ -234,14 +244,15 @@ def build_example(name, identifier, admin_config, processing_msg=None):
#admin_config.REBROADCAST_NIP89 = False

return DicoverContentCurrentlyPopular(name=name, dvm_config=dvm_config, nip89config=nip89config,
admin_config=admin_config)
admin_config=admin_config, options=options)


def build_example_subscription(name, identifier, admin_config, processing_msg=None):
def build_example_subscription(name, identifier, admin_config, options, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.SCHEDULE_UPDATES_SECONDS = 180 # Every 3 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
# dvm_config.SUBSCRIPTION_DAILY_COST = 1
dvm_config.FIX_COST = 0
Expand Down Expand Up @@ -295,7 +306,7 @@ def build_example_subscription(name, identifier, admin_config, processing_msg=No
# admin_config.PRIVKEY = dvm_config.PRIVATE_KEY

return DicoverContentCurrentlyPopular(name=name, dvm_config=dvm_config, nip89config=nip89config,
nip88config=nip88config,
nip88config=nip88config, options=options,
admin_config=admin_config)


Expand Down
45 changes: 27 additions & 18 deletions nostr_dvm/tasks/content_discovery_currently_popular_followers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,17 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c

self.last_schedule = Timestamp.now().as_secs()

if self.options.get("db_name"):
self.db_name = self.options.get("db_name")
if self.options.get("db_since"):
self.db_since = int(self.options.get("db_since"))

use_logger = False
if use_logger:
init_logger(LogLevel.DEBUG)

self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()

def is_input_supported(self, tags, client=None, dvm_config=None):
for tag in tags:
Expand Down Expand Up @@ -101,7 +107,7 @@ def process(self, request_form):
cli = ClientBuilder().database(database).signer(signer).opts(opts).build()
cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nos.lol")
cli.add_relay("wss://pablof7z.nostr1.com")
cli.add_relay("wss://nostr.mom")

ropts = RelayOptions().ping(False)
cli.add_relay_with_opts("wss://nostr.band", ropts)
Expand All @@ -115,8 +121,8 @@ def process(self, request_form):

# Negentropy reconciliation
# Query events from database
timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
timestamp_since = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_since)


result_list = []
Expand All @@ -136,18 +142,18 @@ def process(self, request_form):
following = PublicKey.parse(tag.as_vec()[1])
followings.append(following)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).authors(followings).since(lasthour)
filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).authors(followings).since(since)
events = cli.database().query([filter1])

ns.finallist = {}
for event in events:
if event.created_at().as_secs() > timestamp_hour_ago:
filt = Filter().kinds(
[definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REACTION, definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(lasthour)
reactions = cli.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)
#if event.created_at().as_secs() > timestamp_since:
filt = Filter().kinds(
[definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REACTION, definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)



Expand Down Expand Up @@ -177,7 +183,8 @@ def schedule(self, dvm_config):

else:
if Timestamp.now().as_secs() >= self.last_schedule + dvm_config.SCHEDULE_UPDATES_SECONDS:
self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()
self.last_schedule = Timestamp.now().as_secs()
return 1

Expand Down Expand Up @@ -211,11 +218,12 @@ def sync_db(self):
# We build an example here that we can call by either calling this file directly from the main directory,
# or by adding it to our playground. You can call the example and adjust it to your needs or redefine it in the
# playground or elsewhere
def build_example(name, identifier, admin_config, processing_msg=None):
def build_example(name, identifier, admin_config, options, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
# dvm_config.SUBSCRIPTION_REQUIRED = True
# dvm_config.SUBSCRIPTION_DAILY_COST = 1
Expand Down Expand Up @@ -250,15 +258,16 @@ def build_example(name, identifier, admin_config, processing_msg=None):
# admin_config.UPDATE_PROFILE = False
# admin_config.REBROADCAST_NIP89 = False

return DicoverContentCurrentlyPopularFollowers(name=name, dvm_config=dvm_config, nip89config=nip89config,
return DicoverContentCurrentlyPopularFollowers(name=name, dvm_config=dvm_config, nip89config=nip89config, options=options,
admin_config=admin_config)


def build_example_subscription(name, identifier, admin_config, processing_msg=None):
def build_example_subscription(name, identifier, admin_config, options, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.SCHEDULE_UPDATES_SECONDS = 180 # Every 3 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
# dvm_config.SUBSCRIPTION_DAILY_COST = 1
dvm_config.FIX_COST = 0
Expand Down Expand Up @@ -305,7 +314,7 @@ def build_example_subscription(name, identifier, admin_config, processing_msg=No
# admin_config.PRIVKEY = dvm_config.PRIVATE_KEY

return DicoverContentCurrentlyPopularFollowers(name=name, dvm_config=dvm_config, nip89config=nip89config,
nip88config=nip88config,
nip88config=nip88config, options=options,
admin_config=admin_config)


Expand Down
35 changes: 26 additions & 9 deletions nostr_dvm/tasks/content_discovery_currently_popular_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ def __init__(self, name, dvm_config: DVMConfig, nip89config: NIP89Config, nip88c
if use_logger:
init_logger(LogLevel.DEBUG)

self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()
if not self.personalized:
self.result = self.calculate_result(self.request_form)

Expand Down Expand Up @@ -149,8 +150,10 @@ def calculate_result(self, request_form):

# Negentropy reconciliation
# Query events from database
timestamp_since = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_since)

filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE)
filter1 = Filter().kind(definitions.EventDefinitions.KIND_NOTE).since(since)

events = cli.database().query([filter1])
print(len(events))
Expand All @@ -160,10 +163,11 @@ def calculate_result(self, request_form):
if all(ele in event.content().lower() for ele in self.must_list):
if any(ele in event.content().lower() for ele in self.search_list):
if not any(ele in event.content().lower() for ele in self.avoid_list):

filt = Filter().kinds(
[definitions.EventDefinitions.KIND_ZAP, definitions.EventDefinitions.KIND_REACTION,
definitions.EventDefinitions.KIND_REPOST,
definitions.EventDefinitions.KIND_NOTE]).event(event.id())
definitions.EventDefinitions.KIND_NOTE]).event(event.id()).since(since)
reactions = cli.database().query([filt])
if len(reactions) >= self.min_reactions:
ns.finallist[event.id().to_hex()] = len(reactions)
Expand All @@ -183,7 +187,8 @@ def schedule(self, dvm_config):
return 0
else:
if Timestamp.now().as_secs() >= self.last_schedule + dvm_config.SCHEDULE_UPDATES_SECONDS:
self.sync_db()
if self.dvm_config.UPDATE_DATABASE:
self.sync_db()
self.last_schedule = Timestamp.now().as_secs()
self.result = self.calculate_result(self.request_form)
#print(self.result)
Expand All @@ -198,12 +203,22 @@ def sync_db(self):
cli = ClientBuilder().signer(signer).database(database).opts(opts).build()

cli.add_relay("wss://relay.damus.io")
cli.add_relay("wss://nostr.oxtr.dev")
cli.add_relay("wss://relay.nostr.net")
cli.add_relay("wss://relay.nostr.bg")
cli.add_relay("wss://nostr.wine")
cli.add_relay("wss://nostr21.com")

#RELAY_LIST = [ "wss://nostr.wine",
# , "wss://relay.nostr.bg",
# , "wss://relay.nostr.net"
# ]
cli.connect()

timestamp_hour_ago = Timestamp.now().as_secs() - self.db_since
lasthour = Timestamp.from_secs(timestamp_hour_ago)
timestamp_since = Timestamp.now().as_secs() - self.db_since
since = Timestamp.from_secs(timestamp_since)

filter1 = Filter().kinds([definitions.EventDefinitions.KIND_NOTE, definitions.EventDefinitions.KIND_REACTION, definitions.EventDefinitions.KIND_ZAP]).since(lasthour) # Notes, reactions, zaps
filter1 = Filter().kinds([definitions.EventDefinitions.KIND_NOTE, definitions.EventDefinitions.KIND_REACTION, definitions.EventDefinitions.KIND_ZAP]).since(since) # Notes, reactions, zaps

# filter = Filter().author(keys.public_key())
print("[" + self.dvm_config.IDENTIFIER + "] Syncing notes of the last " + str(self.db_since) + " seconds.. this might take a while..")
Expand All @@ -218,11 +233,12 @@ def sync_db(self):
# We build an example here that we can call by either calling this file directly from the main directory,
# or by adding it to our playground. You can call the example and adjust it to your needs or redefine it in the
# playground or elsewhere
def build_example(name, identifier, admin_config, options, image, description, processing_msg=None):
def build_example(name, identifier, admin_config, options, image, description, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
# dvm_config.SUBSCRIPTION_REQUIRED = True
# dvm_config.SUBSCRIPTION_DAILY_COST = 1
Expand Down Expand Up @@ -258,11 +274,12 @@ def build_example(name, identifier, admin_config, options, image, description, p
admin_config=admin_config, options=options)


def build_example_subscription(name, identifier, admin_config, options, image, description, processing_msg=None):
def build_example_subscription(name, identifier, admin_config, options, image, description, processing_msg=None, update_db=True):
dvm_config = build_default_config(identifier)
dvm_config.USE_OWN_VENV = False
dvm_config.SHOWLOG = True
dvm_config.SCHEDULE_UPDATES_SECONDS = 600 # Every 10 minutes
dvm_config.UPDATE_DATABASE = update_db
# Activate these to use a subscription based model instead
dvm_config.FIX_COST = 0
dvm_config.CUSTOM_PROCESSING_MESSAGE = processing_msg
Expand Down
3 changes: 2 additions & 1 deletion nostr_dvm/utils/dvmconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class DVMConfig:

RELAY_LIST = ["wss://relay.damus.io", "wss://nos.lol", "wss://nostr.wine",
"wss://nostr.mom", "wss://nostr.oxtr.dev", "wss://relay.nostr.bg",
"wss://relay.f7z.io", "wss://pablof7z.nostr1.com", "wss://relay.nostr.net", "wss://140.f7z.io",
"wss://relay.f7z.io", "wss://relay.nostr.net"
]

RELAY_TIMEOUT = 5
Expand All @@ -38,6 +38,7 @@ class DVMConfig:
SEND_FEEDBACK_EVENTS = True
SHOW_RESULT_BEFORE_PAYMENT: bool = False # if this is true show results even when not paid right after autoprocess
SCHEDULE_UPDATES_SECONDS = 0
UPDATE_DATABASE = True # DVMs that use a db manage their db by default. If a dvm should use the same db as another DVM, deactive it for those who do.
CUSTOM_PROCESSING_MESSAGE = None


Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from setuptools import setup, find_packages

VERSION = '0.4.0'
VERSION = '0.4.1'
DESCRIPTION = 'A framework to build and run Nostr NIP90 Data Vending Machines'
LONG_DESCRIPTION = ('A framework to build and run Nostr NIP90 Data Vending Machines. See the github repository for more information')

Expand Down
Loading

0 comments on commit f5548c6

Please sign in to comment.