Skip to content

Commit

Permalink
imp: add support for custom executors (#728)
Browse files Browse the repository at this point in the history
  • Loading branch information
omarabid authored Nov 13, 2024
1 parent 57681d9 commit 6f30dc5
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ bytes = "1.0.1"
chrono = { version = "0.4.19", default-features = false, features = [
"serde",
"clock",
"wasmbind"
] }
web-time = { version = "1.1.0", features = ["serde"] }
cfg-if = "1.0.0"
either = "1.8.0"
futures = { version = "0.3.15" }
Expand Down
2 changes: 1 addition & 1 deletion src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use jsonwebtoken::{Algorithm, EncodingKey, Header};
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::time::{Duration, SystemTime};
use web_time::{Duration, SystemTime};

use snafu::*;

Expand Down
66 changes: 65 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,13 @@ use http_body_util::BodyExt;
use service::middleware::auth_header::AuthHeaderLayer;
use std::convert::{Infallible, TryInto};
use std::fmt;
use std::future::Future;
use std::io::Write;
use std::marker::PhantomData;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use web_time::Duration;

use http::{header::HeaderName, StatusCode};
use hyper::{Request, Response};
Expand Down Expand Up @@ -385,6 +387,7 @@ pub struct OctocrabBuilder<Svc, Config, Auth, LayerReady> {
auth: Auth,
config: Config,
_layer_ready: PhantomData<LayerReady>,
executor: Option<Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) -> ()>>,
}

//Indicates weather the builder supports config
Expand All @@ -407,6 +410,7 @@ impl OctocrabBuilder<NoSvc, NoConfig, NoAuth, NotLayerReady> {
auth: NoAuth {},
config: NoConfig {},
_layer_ready: PhantomData,
executor: None,
}
}
}
Expand All @@ -424,6 +428,29 @@ impl<Config, Auth> OctocrabBuilder<NoSvc, Config, Auth, NotLayerReady> {
auth: self.auth,
config: self.config,
_layer_ready: PhantomData,
executor: None,
}
}
}

impl<Svc, Config, Auth, B> OctocrabBuilder<Svc, Config, Auth, LayerReady>
where
Svc: Service<Request<OctoBody>, Response = Response<B>> + Send + 'static,
Svc::Future: Send + 'static,
Svc::Error: Into<BoxError>,
B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
{
pub fn with_executor(
self,
executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) -> ()>,
) -> OctocrabBuilder<Svc, Config, Auth, LayerReady> {
OctocrabBuilder {
service: self.service,
auth: self.auth,
config: self.config,
_layer_ready: PhantomData,
executor: Some(executor),
}
}
}
Expand All @@ -445,12 +472,14 @@ where
service: stack,
auth,
config,
executor,
..
} = self;
OctocrabBuilder {
service: layer.layer(stack),
auth,
config,
executor,
_layer_ready: PhantomData,
}
}
Expand All @@ -467,6 +496,7 @@ impl<Svc, Auth, LayerState> OctocrabBuilder<Svc, NoConfig, Auth, LayerState> {
OctocrabBuilder {
service: self.service,
auth: self.auth,
executor: self.executor,
config,
_layer_ready: PhantomData,
}
Expand All @@ -490,6 +520,10 @@ where
.layer(self.service)
.map_err(|e| e.into());

if let Some(executor) = self.executor {
return Ok(Octocrab::new_with_executor(service, self.auth, executor));
}

Ok(Octocrab::new(service, self.auth))
}
}
Expand All @@ -500,6 +534,7 @@ impl<Svc, Config, LayerState> OctocrabBuilder<Svc, Config, NoAuth, LayerState> {
service: self.service,
auth,
config: self.config,
executor: self.executor,
_layer_ready: PhantomData,
}
}
Expand Down Expand Up @@ -796,6 +831,10 @@ impl OctocrabBuilder<NoSvc, DefaultOctocrabBuilderConfig, NoAuth, NotLayerReady>

let client = AuthHeaderLayer::new(auth_header, base_uri, upload_uri).layer(client);

if let Some(executor) = self.executor {
return Ok(Octocrab::new_with_executor(client, auth_state, executor));
}

Ok(Octocrab::new(client, auth_state))
}
}
Expand Down Expand Up @@ -1003,6 +1042,31 @@ impl Octocrab {
}
}

/// Creates a new `Octocrab` with a custom executor
fn new_with_executor<S>(
service: S,
auth_state: AuthState,
executor: Box<dyn Fn(Pin<Box<dyn Future<Output = ()>>>) -> ()>,
) -> Self
where
S: Service<Request<OctoBody>, Response = Response<BoxBody<Bytes, crate::Error>>>
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError>,
{
// Use Buffer pair to return the background worker
let (service, worker) = Buffer::pair(BoxService::new(service.map_err(Into::into)), 1024);

// Execute the background worker with the custom executor
executor(Box::pin(worker));

Self {
client: service,
auth_state,
}
}

/// Returns a new `Octocrab` based on the current builder but
/// authorizing via a specific installation ID.
/// Typically you will first construct an `Octocrab` using
Expand Down

0 comments on commit 6f30dc5

Please sign in to comment.