Asyncify the Tree Building #975

Open: wants to merge 13 commits into base: master
4 changes: 2 additions & 2 deletions codex/erasure/erasure.nim
@@ -322,7 +322,7 @@ proc encodeData(
return failure("Unable to store block!")
idx.inc(params.steps)

- without tree =? CodexTree.init(cids[]), err:
+ without tree =? (await CodexTree.init(cids[])), err:
return failure(err)

without treeCid =? tree.rootCid, err:
@@ -441,7 +441,7 @@ proc decode*(
finally:
decoder.release()

- without tree =? CodexTree.init(cids[0..<encoded.originalBlocksCount]), err:
+ without tree =? (await CodexTree.init(cids[0..<encoded.originalBlocksCount])), err:
return failure(err)

without treeCid =? tree.rootCid, err:
39 changes: 25 additions & 14 deletions codex/merkletree/codex/codex.nim
@@ -145,16 +145,18 @@
mhash.coder(@x & @y & @[ key.byte ], digest)
success digest

- func init*(
+ proc init*(
Contributor:

I think we might need to avoid making this init async so we can avoid using waitFor later on... instead this is a good pattern to follow:
https://github.com/codex-storage/nim-ethers/blob/80b2ead97ce32cca74cef4a4501ab97106cb578b/ethers/providers/jsonrpc.nim#L50-L86

Then later, we can use CodexTree.init synchronously, and when needed, we can let layers = await tree.layers
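
A minimal sketch of this suggestion, under stated assumptions: the constructor stays synchronous and only starts the asynchronous work, storing the resulting future in a field that callers await when they need the layers. `LazyTree`, `buildLayers` and `layersFut` are hypothetical names, the integer "hash" is a stand-in for the real compression function, and `std/asyncdispatch` is used instead of Chronos purely to keep the example self-contained.

```nim
import std/asyncdispatch

type
  LazyTree = object
    leaves: seq[int]
    # Started by `init`, awaited on demand (hypothetical field).
    layersFut: Future[seq[seq[int]]]

proc buildLayers(leaves: seq[int]): Future[seq[seq[int]]] {.async.} =
  ## Toy tree builder: a parent is the sum of its two children.
  var layers = @[leaves]
  var current = leaves
  while current.len > 1:
    var parents: seq[int] = @[]
    var i = 0
    while i < current.len:
      # An odd node is paired with zero, standing in for the tree's zero value.
      let right = if i + 1 < current.len: current[i + 1] else: 0
      parents.add(current[i] + right)
      i += 2
    layers.add(parents)
    current = parents
    await sleepAsync(1) # yield to the dispatcher between layers
  return layers

proc init(_: type LazyTree, leaves: seq[int]): LazyTree =
  ## Synchronous constructor: kicks off the build, does not wait for it.
  LazyTree(leaves: leaves, layersFut: buildLayers(leaves))

proc layers(tree: LazyTree): Future[seq[seq[int]]] =
  ## Accessor returning the stored future; callers `await` it.
  tree.layersFut

proc main() {.async.} =
  let tree = LazyTree.init(@[1, 2, 3])
  let built = await tree.layers
  doAssert built == @[@[1, 2, 3], @[3, 3], @[6]]

waitFor main()
```

With this shape only the code that reads the layers has to be async; construction sites stay unchanged.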

Contributor Author:

I like this pattern. If I applied it, the SlotBuilder verifyTree and verifyRoot funcs would have to become async procs in order to await the internal field. This would propagate async further through the codebase. Is this desirable? Or is there a way around this?

_: type CodexTree,
mcodec: MultiCodec = Sha256HashCodec,
- leaves: openArray[ByteHash]): ?!CodexTree =
+ leaves: seq[ByteHash]): Future[?!CodexTree] {.async.} =

if leaves.len == 0:
return failure "Empty leaves"

+ without mhash =? mcodec.mhash(), error:
+ return failure error

let
- mhash = ? mcodec.mhash()
Member:

What the heck did this = ? even do?

Member:

🟡 OK this raises an exception, so this change is actually modifying behavior. I suppose this is OK if it doesn't break expectations at callers, but we could simply preserve this behavior and propagate the exception using a Chronos raises clause. I see you've done this change in several places, and I'm assuming you did the due diligence to ensure this doesn't cause surprises, so I won't block your PR for this.

Contributor:

> we could simply preserve this behavior and propagate the exception using a Chronos raises clause

That would actually change the behaviour. = ? will either assign a value to mhash or it will return a Result with an error to the closure. By raising an exception, and specifying that exception type in {.async: (raises: [...]).}, we're not returning a Result, but raising an exception.

Imo, the proposed change here does not alter the behaviour.
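
The distinction being argued here can be sketched without any library. This is an illustration only: `Res` and `tryGet` are hypothetical stand-ins for a Result type and the `?` operator, not the actual questionable or results implementation. The point is that a `?`-style operator makes the enclosing proc return the error as a value, whereas raising changes how callers observe the failure.

```nim
type
  Res = object
    ## Hypothetical, simplified stand-in for a Result type.
    ok: bool
    value: int
    error: string

template tryGet(r: Res): int =
  ## Stand-in for `?`: unwrap on success, otherwise make the
  ## *enclosing* proc return the failed Res to its caller.
  let tmp = r
  if not tmp.ok:
    return tmp
  tmp.value

proc parsePositive(n: int): Res =
  if n > 0: Res(ok: true, value: n)
  else: Res(ok: false, error: "not positive")

proc doubledViaResult(n: int): Res =
  # On failure nothing is raised; the error travels back as a value.
  let v = tryGet(parsePositive(n))
  Res(ok: true, value: v * 2)

proc doubledViaRaise(n: int): int =
  # Alternative style: the failure becomes an exception instead.
  let r = parsePositive(n)
  if not r.ok:
    raise newException(ValueError, r.error)
  r.value * 2

doAssert doubledViaResult(2).value == 4
doAssert not doubledViaResult(-1).ok
doAssertRaises(ValueError):
  discard doubledViaRaise(-1)
```

Under this reading, replacing `let x = ? f()` with `without x =? f(), error: return failure error` keeps the value-returning behaviour, while moving to a raises clause would not.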

compressor = proc(x, y: seq[byte], key: ByteTreeKey): ?!ByteHash {.noSideEffect.} =
compress(x, y, key, mhash)
Zero: ByteHash = newSeq[byte](mhash.size)
@@ -165,33 +167,42 @@
var
self = CodexTree(mcodec: mcodec, compress: compressor, zero: Zero)

- self.layers = ? merkleTreeWorker(self, leaves, isBottomLayer = true)
+ without layers =? (await merkleTreeWorker(self, leaves, isBottomLayer = true)), error:
+ return failure error
+
+ self.layers = layers
success self

- func init*(
+ proc init*(
_: type CodexTree,
- leaves: openArray[MultiHash]): ?!CodexTree =
+ leaves: seq[MultiHash]): Future[?!CodexTree] {.async.} =

if leaves.len == 0:
return failure "Empty leaves"

let
mcodec = leaves[0].mcodec
- leaves = leaves.mapIt( it.digestBytes )
+ leafBytes = leaves.mapIt( it.digestBytes )

- CodexTree.init(mcodec, leaves)
+ await CodexTree.init(mcodec, leafBytes)

- func init*(
+ proc init*(
_: type CodexTree,
- leaves: openArray[Cid]): ?!CodexTree =
+ leaves: seq[Cid]): Future[?!CodexTree] {.async.} =

if leaves.len == 0:
return failure "Empty leaves"

- let
- mcodec = (? leaves[0].mhash.mapFailure).mcodec
- leaves = leaves.mapIt( (? it.mhash.mapFailure).digestBytes )
+ without mhash =? leaves[0].mhash.mapFailure, error:
+ return failure error
+
+ var hashes = newSeq[seq[byte]]()
+ for leaf in leaves:
+ without hash =? leaf.mhash.mapFailure, error:
+ return failure error
+ hashes.add(hash.digestBytes)
Comment on lines +196 to +203
Contributor @emizzle, Nov 4, 2024:


Did this really need a rewrite? In other words, was there a compilation issue that forced this re-write?

Contributor Author @benbierens, Nov 4, 2024:


I couldn't figure out a cleaner way to unroll the results. If you've got one, I'm eager to learn.


- CodexTree.init(mcodec, leaves)
+ await CodexTree.init(mhash.mcodec, hashes)

proc fromNodes*(
_: type CodexTree,
20 changes: 14 additions & 6 deletions codex/merkletree/merkletree.nim
@@ -11,6 +11,7 @@

import std/bitops

+ import pkg/chronos
import pkg/questionable/results

import ../errors
@@ -120,10 +121,10 @@ func reconstructRoot*[H, K](proof: MerkleProof[H, K], leaf: H): ?!H =
func verify*[H, K](proof: MerkleProof[H, K], leaf: H, root: H): ?!bool =
success bool(root == ? proof.reconstructRoot(leaf))

- func merkleTreeWorker*[H, K](
+ proc merkleTreeWorker*[H, K](
self: MerkleTree[H, K],
- xs: openArray[H],
- isBottomLayer: static bool): ?!seq[seq[H]] =
+ xs: seq[H],
+ isBottomLayer: static bool): Future[?!seq[seq[H]]] {.async.} =

let a = low(xs)
let b = high(xs)
@@ -145,9 +146,16 @@ func merkleTreeWorker*[H, K](

for i in 0..<halfn:
const key = when isBottomLayer: K.KeyBottomLayer else: K.KeyNone
- ys[i] = ? self.compress( xs[a + 2 * i], xs[a + 2 * i + 1], key = key )
+ without y =? self.compress( xs[a + 2 * i], xs[a + 2 * i + 1], key = key ), error:
+ return failure error
+ ys[i] = y
+ await sleepAsync(1.micros) # cooperative scheduling
if isOdd:
const key = when isBottomLayer: K.KeyOddAndBottomLayer else: K.KeyOdd
- ys[halfn] = ? self.compress( xs[n], self.zero, key = key )
+ without y =? self.compress( xs[n], self.zero, key = key ), error:
+ return failure error
+ ys[halfn] = y

- success @[ @xs ] & ? self.merkleTreeWorker(ys, isBottomLayer = false)
+ without v =? (await self.merkleTreeWorker(ys, isBottomLayer = false)), error:
+ return failure error
+ success @[ @xs ] & v
15 changes: 9 additions & 6 deletions codex/merkletree/poseidon2.nim
@@ -11,6 +11,7 @@

import std/sequtils

+ import pkg/chronos
import pkg/poseidon2
import pkg/constantine/math/io/io_fields
import pkg/constantine/platforms/abstractions
@@ -67,9 +68,9 @@ converter toKey*(key: PoseidonKeysEnum): Poseidon2Hash =
of KeyOdd: KeyOddF
of KeyOddAndBottomLayer: KeyOddAndBottomLayerF

- func init*(
+ proc init*(
_: type Poseidon2Tree,
- leaves: openArray[Poseidon2Hash]): ?!Poseidon2Tree =
+ leaves: seq[Poseidon2Hash]): Future[?!Poseidon2Tree] {.async.} =

if leaves.len == 0:
return failure "Empty leaves"
@@ -83,13 +84,15 @@ func init*(
var
self = Poseidon2Tree(compress: compressor, zero: Poseidon2Zero)

- self.layers = ? merkleTreeWorker(self, leaves, isBottomLayer = true)
+ without l =? (await merkleTreeWorker(self, leaves, isBottomLayer = true)), err:
+ return failure err
+ self.layers = l
success self

- func init*(
+ proc init*(
_: type Poseidon2Tree,
- leaves: openArray[array[31, byte]]): ?!Poseidon2Tree =
- Poseidon2Tree.init(
+ leaves: seq[array[31, byte]]): Future[?!Poseidon2Tree] {.async.} =
+ await Poseidon2Tree.init(
leaves.mapIt( Poseidon2Hash.fromBytes(it) ))

proc fromNodes*(
24 changes: 16 additions & 8 deletions codex/node.nim
@@ -339,7 +339,7 @@
finally:
await stream.close()

- without tree =? CodexTree.init(cids), err:
+ without tree =? (await CodexTree.init(cids)), err:
return failure(err)

without treeCid =? tree.rootCid(CIDv1, dataCodec), err:
@@ -446,19 +446,23 @@
self.taskpool)

without encoded =? (await erasure.encode(manifest, ecK, ecM)), error:
- trace "Unable to erasure code dataset"
+ error "Unable to erasure code dataset"
return failure(error)

without builder =? Poseidon2Builder.new(self.networkStore.localStore, encoded), err:
- trace "Unable to create slot builder"
+ error "Unable to create slot builder"
return failure(err)

+ if err =? (await builder.init()).errorOption:
+ error "Failed to initialize slot builder"
+ return failure(err)
+
without verifiable =? (await builder.buildManifest()), err:
- trace "Unable to build verifiable manifest"
+ error "Unable to build verifiable manifest"
return failure(err)

without manifestBlk =? await self.storeManifest(verifiable), err:
- trace "Unable to store verifiable manifest"
+ error "Unable to store verifiable manifest"
return failure(err)

let
@@ -550,17 +554,21 @@
trace "Received a request to store a slot"

without cid =? Cid.init(request.content.cid).mapFailure, err:
- trace "Unable to parse Cid", cid
+ error "Unable to parse Cid", cid
return failure(err)

without manifest =? (await self.fetchManifest(cid)), err:
- trace "Unable to fetch manifest for cid", cid, err = err.msg
+ error "Unable to fetch manifest for cid", cid, err = err.msg
return failure(err)

without builder =? Poseidon2Builder.new(
self.networkStore, manifest, manifest.verifiableStrategy
), err:
- trace "Unable to create slots builder", err = err.msg
+ error "Unable to create slots builder", err = err.msg
return failure(err)

+ if err =? (await builder.init()).errorOption:
+ error "Failed to initialize slot builder"
+ return failure(err)

let
50 changes: 30 additions & 20 deletions codex/slots/builder/builder.nim
@@ -161,8 +161,7 @@
if blk.isEmpty:
success (self.emptyBlock, self.emptyDigestTree)
else:
- without tree =?
- T.digestTree(blk.data, self.cellSize.int), err:
+ without tree =? (await T.digestTree(blk.data, self.cellSize.int)), err:
error "Failed to create digest for block", err = err.msg
return failure(err)

@@ -213,7 +212,7 @@
error "Failed to select slot blocks", err = err.msg
return failure(err)

- T.init(cellHashes)
+ await T.init(cellHashes)

proc buildSlot*[T, H](
self: SlotsBuilder[T, H],
@@ -251,8 +250,8 @@

tree.root()

- func buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: openArray[H]): ?!T =
- T.init(@slotRoots)
+ proc buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: seq[H]): Future[?!T] {.async.} =
+ await T.init(@slotRoots)

proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
## Build all slot trees and store them in the block store.
@@ -272,7 +271,7 @@
return failure(err)
slotRoot

- without tree =? self.buildVerifyTree(self.slotRoots) and root =? tree.root, err:
+ without tree =? (await self.buildVerifyTree(self.slotRoots)) and root =? tree.root, err:
error "Failed to build slot roots tree", err = err.msg
return failure(err)

@@ -305,6 +304,29 @@
self.cellSize,
self.strategy.strategyType)

+ proc init*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} =
Member:

🟡 Hm... I think the fact that you need to call new and then init is not expected. According to the Status style guide and to some of our own code (e.g. here), init is supposed to return a constructed object (and not void), so it'd actually replace new in this case?

Contributor:

Agree that an init call can be confusing when not used for construction. In general, Codex follows the convention of using init for constructing object types and new for constructing ref types, as per @dryajov's comment: https://discord.com/channels/895609329053474826/895622391349272637/1086322836001534072

Maybe an easy fix is to call this initialise instead?
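
A minimal sketch of the convention described above, with hypothetical types (`CellSize`, `Builder`): `init` constructs and returns an object type, `new` constructs a ref type, and the post-construction step gets a distinct name such as `initialise` so it is not mistaken for a constructor. In the PR that step would be an async proc returning a Result; it is kept synchronous here for brevity.

```nim
type
  CellSize = object      # value type: constructed with `init`
    bytes: int
  Builder = ref object   # ref type: constructed with `new`
    cellSize: CellSize
    ready: bool

proc init(_: type CellSize, bytes: int): CellSize =
  ## Returns the constructed object, as the convention expects of `init`.
  CellSize(bytes: bytes)

proc new(_: type Builder, cellSize: CellSize): Builder =
  ## Returns a constructed ref; no expensive work happens here.
  Builder(cellSize: cellSize, ready: false)

proc initialise(self: Builder) =
  ## Second-phase setup under a name that does not suggest construction.
  self.ready = true

let builder = Builder.new(CellSize.init(2048))
doAssert not builder.ready
builder.initialise()
doAssert builder.ready
```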

Contributor:

A lot of this seems like it was replacing let x = ? someResultFunc()... with without x =? someResultFunc(), which doesn't seem necessary to me. In other words, it feels like we should make the switch to async without changing any other semantics if possible. It's easy to get into trouble modifying lines "while we're here" as there can be unintended consequences, especially in nim and especially with this many changes in a PR.

Contributor Author:

I wouldn't have changed them if I didn't have to. = ? doesn't compile in async procs.

+ without emptyTree =? (await T.digestTree(self.emptyBlock, self.cellSize.int)), err:
+ return failure err
+ self.emptyDigestTree = emptyTree
+
+ if self.manifest.verifiable:
+ without tree =? (await self.buildVerifyTree(self.slotRoots)), err:
+ return failure err
+
+ without verifyRoot =? tree.root, err:
+ return failure err
+
+ without expectedRoot =? self.manifest.verifyRoot.fromVerifyCid(), err:
+ return failure err
+
+ if verifyRoot != expectedRoot:
+ return failure "Existing slots root doesn't match reconstructed root."

+ self.verifiableTree = some tree
+
+ trace "Slots builder initialized"
+ success()

proc new*[T, H](
_: type SlotsBuilder[T, H],
store: BlockStore,
@@ -346,7 +368,6 @@
numBlocksTotal = numSlotBlocksTotal * manifest.numSlots # number of blocks per slot

emptyBlock = newSeq[byte](manifest.blockSize.int)
- emptyDigestTree = ? T.digestTree(emptyBlock, cellSize.int)

strategy = ? strategy.init(
0,
@@ -372,24 +393,13 @@
strategy: strategy,
cellSize: cellSize,
emptyBlock: emptyBlock,
- numSlotBlocks: numSlotBlocksTotal,
- emptyDigestTree: emptyDigestTree)
+ numSlotBlocks: numSlotBlocksTotal)

if manifest.verifiable:
if manifest.slotRoots.len == 0 or
manifest.slotRoots.len != manifest.numSlots:
return failure "Manifest is verifiable but slot roots are missing or invalid."

- let
- slotRoots = manifest.slotRoots.mapIt( (? it.fromSlotCid() ))
- tree = ? self.buildVerifyTree(slotRoots)
- expectedRoot = ? manifest.verifyRoot.fromVerifyCid()
- verifyRoot = ? tree.root
-
- if verifyRoot != expectedRoot:
- return failure "Existing slots root doesn't match reconstructed root."
-
- self.slotRoots = slotRoots
- self.verifiableTree = some tree
+ self.slotRoots = self.manifest.slotRoots.mapIt( (? it.fromSlotCid() ))

success self
4 changes: 4 additions & 0 deletions codex/slots/proofs/prover.nim
@@ -66,6 +66,10 @@
error "Unable to create slots builder", err = err.msg
return failure(err)

+ if err =? (await builder.init()).errorOption:
+ error "Failed to initialize builder", err = err.msg
+ return failure(err)

without sampler =? AnySampler.new(slotIdx, self.store, builder), err:
error "Unable to create data sampler", err = err.msg
return failure(err)