Skip to content

Commit

Permalink
replace CqrsError with anyhow::Error
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrea Pavoni committed Nov 27, 2024
1 parent 7e1a80a commit 5bb5b93
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 72 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ chrono = { version = "0.4", features = ["serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
uuid = { version = "1.4.1", features = ["serde", "v4"] }
anyhow = "1.0.93"
12 changes: 5 additions & 7 deletions examples/lib/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

use std::collections::HashMap;

use anyhow::{anyhow, Error};
use async_trait::async_trait;

use mini_cqrs_es::{CqrsError, Event, EventStore, Uuid};
use mini_cqrs_es::{Event, EventStore, Uuid};

// Event Store
#[derive(Clone)]
Expand All @@ -24,7 +25,7 @@ impl InMemoryEventStore {

#[async_trait]
impl EventStore for InMemoryEventStore {
async fn save_events(&mut self, aggregate_id: Uuid, events: &[Event]) -> Result<(), CqrsError> {
async fn save_events(&mut self, aggregate_id: Uuid, events: &[Event]) -> Result<(), Error> {
if let Some(current_events) = self.events.get_mut(&aggregate_id) {
current_events.extend(events.to_vec());
} else {
Expand All @@ -34,14 +35,11 @@ impl EventStore for InMemoryEventStore {
Ok(())
}

async fn load_events(&self, aggregate_id: Uuid) -> Result<Vec<Event>, CqrsError> {
async fn load_events(&self, aggregate_id: Uuid) -> Result<Vec<Event>, Error> {
if let Some(events) = self.events.get(&aggregate_id) {
Ok(events.to_vec())
} else {
Err(CqrsError::new(format!(
"No events for aggregate id `{}`",
aggregate_id
)))
Err(anyhow!("No events for aggregate id `{}`", aggregate_id))
}
}
}
26 changes: 12 additions & 14 deletions examples/lib/common_game.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@

use std::{collections::HashMap, sync::Arc};

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;

use mini_cqrs_es::{
make_event_consumers_group, wrap_event, Aggregate, AggregateSnapshot, Command, CqrsError,
Event, EventConsumer, EventConsumersGroup, EventPayload, ModelReader, QueriesRunner, Query,
make_event_consumers_group, wrap_event, Aggregate, AggregateSnapshot, Command, Event,
EventConsumer, EventConsumersGroup, EventPayload, ModelReader, QueriesRunner, Query,
Repository, SnapshotStore, Uuid,
};

Expand Down Expand Up @@ -42,7 +43,7 @@ impl<A> SnapshotStore for InMemorySnapshotStore<A>
where
A: Aggregate,
{
async fn save_snapshot<T>(&mut self, snapshot: AggregateSnapshot<T>) -> Result<(), CqrsError>
async fn save_snapshot<T>(&mut self, snapshot: AggregateSnapshot<T>) -> Result<(), Error>
where
T: Aggregate + Clone,
{
Expand All @@ -54,7 +55,7 @@ where
Ok(())
}

async fn load_snapshot<T>(&self, aggregate_id: Uuid) -> Result<AggregateSnapshot<T>, CqrsError>
async fn load_snapshot<T>(&self, aggregate_id: Uuid) -> Result<AggregateSnapshot<T>, Error>
where
T: Aggregate + Clone,
{
Expand All @@ -63,10 +64,7 @@ where

Ok(AggregateSnapshot::new(&aggregate, None))
} else {
Err(CqrsError::new(format!(
"No snapshot for aggregate id `{}`",
aggregate_id
)))
Err(anyhow!("No snapshot for aggregate id `{}`", aggregate_id))
}
}
}
Expand All @@ -83,12 +81,12 @@ pub struct CmdStartGame {
impl Command for CmdStartGame {
type Aggregate = GameAggregate;

async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, CqrsError> {
async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, Error> {
if aggregate.status != GameStatus::Playing {
return Err(CqrsError::new(format!(
return Err(anyhow!(
"Game is already finished with state {:?}",
aggregate.status
)));
));
}

let res = vec![GameEvent::GameStarted {
Expand All @@ -112,7 +110,7 @@ pub struct CmdAttackPlayer {
impl Command for CmdAttackPlayer {
type Aggregate = GameAggregate;

async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, CqrsError> {
async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, Error> {
let mut player = if aggregate.player_1.id == self.attacker.id {
aggregate.player_1.clone()
} else {
Expand Down Expand Up @@ -316,7 +314,7 @@ impl GetGameQuery {

#[async_trait]
impl Query for GetGameQuery {
type Output = Result<Option<GameModel>, CqrsError>;
type Output = Result<Option<GameModel>, Error>;

async fn apply(&self) -> Self::Output {
let result: Option<GameModel> = self.repo.lock().await.get_game(self.aggregate_id).await;
Expand Down Expand Up @@ -359,7 +357,7 @@ impl ModelReader for GameView {
type Repo = InMemoryRepository;
type Model = GameModel;

async fn update(&mut self, data: Self::Model) -> Result<(), CqrsError> {
async fn update(&mut self, data: Self::Model) -> Result<(), Error> {
self.repo
.lock()
.await
Expand Down
13 changes: 7 additions & 6 deletions src/aggregate/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::{Aggregate, AggregateSnapshot, CqrsError, EventStore, SnapshotStore, Uuid};
use crate::{Aggregate, AggregateSnapshot, EventStore, SnapshotStore, Uuid};
use anyhow::Error;
use async_trait::async_trait;

/// The `AggregateManager` trait defines the behavior for loading and storing the state of aggregates.
Expand All @@ -9,12 +10,12 @@ use async_trait::async_trait;
#[async_trait]
pub trait AggregateManager: Clone + Send + Sync {
/// Loads an aggregate from the event store.
async fn load<A>(&mut self, aggregate_id: Uuid) -> Result<A, CqrsError>
async fn load<A>(&mut self, aggregate_id: Uuid) -> Result<A, Error>
where
A: Aggregate + Clone;

/// Stores an aggregate to the event store.
async fn store<A>(&mut self, _aggregate: &A) -> Result<(), CqrsError>
async fn store<A>(&mut self, _aggregate: &A) -> Result<(), Error>
where
A: Aggregate + Clone,
{
Expand Down Expand Up @@ -47,7 +48,7 @@ impl<'a, ES> AggregateManager for SimpleAggregateManager<'a, ES>
where
ES: EventStore + Clone,
{
async fn load<A: Aggregate>(&mut self, aggregate_id: Uuid) -> Result<A, CqrsError> {
async fn load<A: Aggregate>(&mut self, aggregate_id: Uuid) -> Result<A, Error> {
let mut aggregate = A::default();
aggregate.set_aggregate_id(aggregate_id);

Expand Down Expand Up @@ -87,7 +88,7 @@ impl<SS> AggregateManager for SnapshotAggregateManager<SS>
where
SS: SnapshotStore + Clone + Send + Sync,
{
async fn load<A>(&mut self, aggregate_id: Uuid) -> Result<A, CqrsError>
async fn load<A>(&mut self, aggregate_id: Uuid) -> Result<A, Error>
where
A: Aggregate + Clone,
{
Expand All @@ -100,7 +101,7 @@ where
}
}

async fn store<A>(&mut self, aggregate: &A) -> Result<(), CqrsError>
async fn store<A>(&mut self, aggregate: &A) -> Result<(), Error>
where
A: Aggregate + Clone,
{
Expand Down
7 changes: 4 additions & 3 deletions src/aggregate/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;

use crate::{Aggregate, CqrsError, Uuid};
use crate::{Aggregate, Uuid};
use anyhow::Error;

/// The `SnapshotStore` trait defines the behavior for storing and loading aggregate snapshots.
///
Expand All @@ -12,12 +13,12 @@ use crate::{Aggregate, CqrsError, Uuid};
#[async_trait]
pub trait SnapshotStore {
/// Saves an aggregate snapshot to the snapshot store.
async fn save_snapshot<T>(&mut self, aggregate: AggregateSnapshot<T>) -> Result<(), CqrsError>
async fn save_snapshot<T>(&mut self, aggregate: AggregateSnapshot<T>) -> Result<(), Error>
where
T: Aggregate;

/// Loads an aggregate snapshot from the snapshot store.
async fn load_snapshot<T>(&self, aggregate_id: Uuid) -> Result<AggregateSnapshot<T>, CqrsError>
async fn load_snapshot<T>(&self, aggregate_id: Uuid) -> Result<AggregateSnapshot<T>, Error>
where
T: Aggregate;
}
Expand Down
5 changes: 3 additions & 2 deletions src/command.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Error;
use async_trait::async_trait;

use crate::{Aggregate, CqrsError, Event};
use crate::{Aggregate, Event};

/// The `Command` trait defines the behavior of a command in a Command-Query Responsibility Segregation (CQRS) application.
///
Expand Down Expand Up @@ -39,5 +40,5 @@ pub trait Command {
type Aggregate: Aggregate;

/// Handles the command and returns a list of events.
async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, CqrsError>;
async fn handle(&self, aggregate: &Self::Aggregate) -> Result<Vec<Event>, Error>;
}
12 changes: 7 additions & 5 deletions src/cqrs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::{
query::QueriesRunner, Aggregate, AggregateManager, Command, CqrsError, EventConsumersGroup,
EventStore, Uuid,
query::QueriesRunner, Aggregate, AggregateManager, Command, EventConsumersGroup, EventStore,
Uuid,
};
use anyhow::Error;

/// The `Cqrs` struct represents the main entry point of a Command-Query Responsibility Segregation (CQRS) application.
///
Expand All @@ -12,7 +13,8 @@ use crate::{
/// Here's an example of how to create and use a `Cqrs` instance in a CQRS application:
///
/// ```rust
/// use mini_cqrs_es::{Cqrs, AggregateManager, EventStore, EventConsumersGroup, Command, CqrsError, Uuid};
/// use mini_cqrs_es::{Cqrs, AggregateManager, EventStore, EventConsumersGroup, Command, Uuid};
/// use anyhow::Error;
///
/// // Define custom aggregate manager, event store, and event consumers.
/// struct MyAggregateManager;
Expand All @@ -38,7 +40,7 @@ use crate::{
/// impl Command for MyCommand {
/// type Aggregate = MyAggregate; // Replace with your own aggregate type
///
/// async fn handle(&self, aggregate: &MyAggregate) -> Result<Vec<Event>, CqrsError> {
/// async fn handle(&self, aggregate: &MyAggregate) -> Result<Vec<Event>, Error> {
/// // Implement command handling logic
/// unimplemented!()
/// }
Expand Down Expand Up @@ -90,7 +92,7 @@ where
}

/// Executes a command on an aggregate.
pub async fn execute<C>(&mut self, aggregate_id: Uuid, command: &C) -> Result<Uuid, CqrsError>
pub async fn execute<C>(&mut self, aggregate_id: Uuid, command: &C) -> Result<Uuid, Error>
where
C: Command,
{
Expand Down
26 changes: 0 additions & 26 deletions src/error.rs

This file was deleted.

11 changes: 6 additions & 5 deletions src/events.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::Error;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use crate::{CqrsError, Uuid};
use crate::Uuid;

/// The `Event` struct represents a change to the state of an aggregate in a CQRS application.
///
Expand All @@ -13,7 +14,7 @@ use crate::{CqrsError, Uuid};
///
/// You can extract the payload from an event using the `get_payload` method, which deserializes the payload data into a specific
/// type that implements the `EventPayload` trait.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
/// The ID of the event.
pub id: String,
Expand Down Expand Up @@ -97,7 +98,7 @@ pub trait EventPayload<Evt = Self>: Serialize + DeserializeOwned + Clone + ToStr
/// associated with specific aggregate IDs.
#[async_trait]
pub trait EventStore: Send + Sync {
async fn save_events(&mut self, aggregate_id: Uuid, events: &[Event]) -> Result<(), CqrsError>;
async fn save_events(&mut self, aggregate_id: Uuid, events: &[Event]) -> Result<(), Error>;

async fn load_events(&self, aggregate_id: Uuid) -> Result<Vec<Event>, CqrsError>;
async fn load_events(&self, aggregate_id: Uuid) -> Result<Vec<Event>, Error>;
}
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ mod aggregate;
mod command;
mod consumer;
mod cqrs;
mod error;
mod events;
mod query;
mod repository;
Expand All @@ -30,7 +29,6 @@ pub use aggregate::{
pub use command::Command;
pub use consumer::{EventConsumer, EventConsumersGroup};
pub use cqrs::Cqrs;
pub use error::CqrsError;
pub use events::{Event, EventPayload, EventStore};
pub use query::{ModelReader, QueriesRunner, Query};
pub use repository::Repository;
Expand Down
5 changes: 3 additions & 2 deletions src/query.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use anyhow::Error;
use async_trait::async_trait;

use crate::{CqrsError, Repository};
use crate::Repository;

/// The `ModelReader` trait defines the behavior of a read model, it' responsible for updating read models with data extracted from events.
///
Expand All @@ -14,7 +15,7 @@ pub trait ModelReader: Send {
type Model: Send + Sync + Clone + 'static;

/// Updates the read model with the provided data.
async fn update(&mut self, data: Self::Model) -> Result<(), CqrsError>;
async fn update(&mut self, data: Self::Model) -> Result<(), Error>;
}

/// The `QueriesRunner` trait defines the behavior for executing queries on read models.
Expand Down

0 comments on commit 5bb5b93

Please sign in to comment.