Skip to content

Commit

Permalink
lmdb: transactions improvements
Browse files Browse the repository at this point in the history
* Remove `remove_by_id` method
* Add `Lmdb::delete` method
* Re-use read transaction in some method instead of create a new one

Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
  • Loading branch information
yukibtc committed Nov 20, 2024
1 parent 99695b0 commit 5e0896f
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 124 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
* sdk: cleanup `Client` methods ([Yuki Kishimoto])
* relay-builder: port selection by using random port generation ([Yuki Kishimoto])
* lmdb: optimize vector initialization in unit tests ([Xiao Yu])
* lmdb: commit also read txn and force sync after delete operations ([Yuki Kishimoto])
* lmdb: commit also read txn ([Yuki Kishimoto])
* lmdb: transactions improvements ([Yuki Kishimoto])
* nwc: increase default timeout to 60 secs ([Yuki Kishimoto])
* book: convert JS snippets to TypeScript ([Yuki Kishimoto])
* book: use `pyright` to check python snippets ([Yuki Kishimoto])
Expand Down
163 changes: 72 additions & 91 deletions crates/nostr-lmdb/src/store/lmdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,6 @@ impl Lmdb {
})
}

pub(crate) fn force_sync(&self) -> Result<(), Error> {
self.env.force_sync()?;
Ok(())
}

// pub(crate) fn close(self) -> Result<(), Error> {
// self.env.force_sync()?;
// let closing_event = self.env.prepare_for_closing();
// closing_event.wait();
// Ok(())
// }

/// Get a read transaction
#[inline]
pub(crate) fn read_txn(&self) -> Result<RoTxn, Error> {
Expand Down Expand Up @@ -194,37 +182,35 @@ impl Lmdb {
self.ac_index.put(txn, &ac_index_key, id)?;

for tag in event.tags.iter() {
if let Some(tag_name) = tag.single_letter_tag() {
if let Some(tag_value) = tag.content() {
// Index by tag (with created_at and id)
let tc_index_key: Vec<u8> = index::make_tc_index_key(
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.tc_index.put(txn, &tc_index_key, id)?;

// Index by author and tag (with created_at and id)
let atc_index_key: Vec<u8> = index::make_atc_index_key(
&event.pubkey.to_bytes(),
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.atc_index.put(txn, &atc_index_key, id)?;

// Index by kind and tag (with created_at and id)
let ktc_index_key: Vec<u8> = index::make_ktc_index_key(
event.kind.as_u16(),
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.ktc_index.put(txn, &ktc_index_key, id)?;
}
if let (Some(tag_name), Some(tag_value)) = (tag.single_letter_tag(), tag.content()) {
// Index by author and tag (with created_at and id)
let atc_index_key: Vec<u8> = index::make_atc_index_key(
&event.pubkey.to_bytes(),
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.atc_index.put(txn, &atc_index_key, id)?;

// Index by kind and tag (with created_at and id)
let ktc_index_key: Vec<u8> = index::make_ktc_index_key(
event.kind.as_u16(),
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.ktc_index.put(txn, &ktc_index_key, id)?;

// Index by tag (with created_at and id)
let tc_index_key: Vec<u8> = index::make_tc_index_key(
&tag_name,
tag_value,
&event.created_at,
event.id.as_bytes(),
);
self.tc_index.put(txn, &tc_index_key, id)?;
}
}

Expand All @@ -233,6 +219,19 @@ impl Lmdb {

/// Remove the event
pub(crate) fn remove(&self, txn: &mut RwTxn, event: &DatabaseEvent) -> Result<(), Error> {
self.events.delete(txn, event.id())?;

let ci_index_key: Vec<u8> = index::make_ci_index_key(&event.created_at, event.id());
self.ci_index.delete(txn, &ci_index_key)?;

let akc_index_key: Vec<u8> =
index::make_akc_index_key(event.author(), event.kind, &event.created_at, event.id());
self.akc_index.delete(txn, &akc_index_key)?;

let ac_index_key: Vec<u8> =
index::make_ac_index_key(event.author(), &event.created_at, event.id());
self.ac_index.delete(txn, &ac_index_key)?;

for tag in event.iter_tags() {
if let Some((tag_name, tag_value)) = tag.extract() {
// Index by author and tag (with created_at and id)
Expand Down Expand Up @@ -262,19 +261,6 @@ impl Lmdb {
}
}

let ac_index_key: Vec<u8> =
index::make_ac_index_key(event.author(), &event.created_at, event.id());
self.ac_index.delete(txn, &ac_index_key)?;

let ci_index_key: Vec<u8> = index::make_ci_index_key(&event.created_at, event.id());
self.ci_index.delete(txn, &ci_index_key)?;

let akc_index_key: Vec<u8> =
index::make_akc_index_key(event.author(), event.kind, &event.created_at, event.id());
self.akc_index.delete(txn, &akc_index_key)?;

self.events.delete(txn, event.id())?;

Ok(())
}

Expand Down Expand Up @@ -324,6 +310,14 @@ impl Lmdb {
Ok(output)
}

pub fn delete(&self, read_txn: &RoTxn, txn: &mut RwTxn, filter: Filter) -> Result<(), Error> {
let events = self.single_filter_query(read_txn, filter)?;
for event in events.into_iter() {
self.remove(txn, &event)?;
}
Ok(())
}

/// Find all events that match the filter
fn single_filter_query<'a>(
&self,
Expand Down Expand Up @@ -627,24 +621,11 @@ impl Lmdb {
Ok(None)
}

/// Remove an event by ID
pub fn remove_by_id(
&self,
read_txn: &RoTxn,
write_txn: &mut RwTxn,
event_id: &[u8],
) -> Result<(), Error> {
if let Some(event) = self.get_event_by_id(read_txn, event_id)? {
self.remove(write_txn, &event)?;
}

Ok(())
}

// Remove all replaceable events with the matching author-kind
// Kind must be a replaceable (not parameterized replaceable) event kind
pub fn remove_replaceable(
&self,
read_txn: &RoTxn,
txn: &mut RwTxn,
coordinate: &Coordinate,
until: Timestamp,
Expand All @@ -653,9 +634,8 @@ impl Lmdb {
return Err(Error::WrongEventKind);
}

let read_txn = self.read_txn()?;
let iter = self.akc_iter(
&read_txn,
read_txn,
&coordinate.public_key.to_bytes(),
coordinate.kind.as_u16(),
Timestamp::zero(),
Expand All @@ -664,7 +644,10 @@ impl Lmdb {

for result in iter {
let (_key, id) = result?;
self.remove_by_id(&read_txn, txn, id)?;

if let Some(event) = self.get_event_by_id(read_txn, id)? {
self.remove(txn, &event)?;
}
}

Ok(())
Expand All @@ -674,6 +657,7 @@ impl Lmdb {
// Kind must be a parameterized-replaceable event kind
pub fn remove_parameterized_replaceable(
&self,
read_txn: &RoTxn,
txn: &mut RwTxn,
coordinate: &Coordinate,
until: Timestamp,
Expand All @@ -682,9 +666,8 @@ impl Lmdb {
return Err(Error::WrongEventKind);
}

let read_txn = self.read_txn()?;
let iter = self.atc_iter(
&read_txn,
read_txn,
&coordinate.public_key.to_bytes(),
&SingleLetterTag::lowercase(Alphabet::D),
&coordinate.identifier,
Expand All @@ -696,12 +679,10 @@ impl Lmdb {
let (_key, id) = result?;

// Our index doesn't have Kind embedded, so we have to check it
let event = self
.get_event_by_id(&read_txn, id)?
.ok_or(Error::NotFound)?;
let event = self.get_event_by_id(read_txn, id)?.ok_or(Error::NotFound)?;

if event.kind == coordinate.kind.as_u16() {
self.remove_by_id(&read_txn, txn, id)?;
self.remove(txn, &event)?;
}
}

Expand Down Expand Up @@ -750,8 +731,8 @@ impl Lmdb {
let start_prefix = index::make_ci_index_key(until, &EVENT_ID_ALL_ZEROS);
let end_prefix = index::make_ci_index_key(since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.ci_index.range(txn, &range)?)
}
Expand All @@ -772,8 +753,8 @@ impl Lmdb {
);
let end_prefix = index::make_tc_index_key(tag_name, tag_value, since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.tc_index.range(txn, &range)?)
}
Expand All @@ -788,8 +769,8 @@ impl Lmdb {
let start_prefix = index::make_ac_index_key(author, &until, &EVENT_ID_ALL_ZEROS);
let end_prefix = index::make_ac_index_key(author, &since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.ac_index.range(txn, &range)?)
}
Expand All @@ -805,8 +786,8 @@ impl Lmdb {
let start_prefix = index::make_akc_index_key(author, kind, &until, &EVENT_ID_ALL_ZEROS);
let end_prefix = index::make_akc_index_key(author, kind, &since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.akc_index.range(txn, &range)?)
}
Expand All @@ -830,8 +811,8 @@ impl Lmdb {
let end_prefix: Vec<u8> =
index::make_atc_index_key(author, tag_name, tag_value, since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.atc_index.range(txn, &range)?)
}
Expand All @@ -855,8 +836,8 @@ impl Lmdb {
let end_prefix =
index::make_ktc_index_key(kind, tag_name, tag_value, since, &EVENT_ID_ALL_255);
let range = (
Bound::Included(&*start_prefix),
Bound::Excluded(&*end_prefix),
Bound::Included(start_prefix.as_slice()),
Bound::Excluded(end_prefix.as_slice()),
);
Ok(self.ktc_index.range(txn, &range)?)
}
Expand Down
Loading

0 comments on commit 5e0896f

Please sign in to comment.