Make Bounce Queue a TSA Action #272

Open
MHillyer opened this issue Sep 11, 2024 · 1 comment
Labels
enhancement New feature or request

Comments

@MHillyer
Collaborator

No description provided.

@MHillyer MHillyer converted this from a draft issue Sep 11, 2024
@MHillyer MHillyer added this to the Fall 24 Release milestone Sep 11, 2024
@MHillyer MHillyer added the enhancement New feature or request label Sep 11, 2024
@wez wez removed their assignment Sep 16, 2024
@wez
Collaborator

wez commented Sep 16, 2024

We already have some suspension actions in the Action enum:

Suspend,
SetConfig(EgressPathConfigValue),
SuspendTenant,
SuspendCampaign,

The implementation of them is here:

Action::Suspend => {
    create_ready_q_suspension(&rule_hash, m, &record, &source)?;
}
Action::SuspendTenant => {
    create_tenant_suspension(&rule_hash, m, &record, false)?;
}
Action::SuspendCampaign => {
    create_tenant_suspension(&rule_hash, m, &record, true)?;
}

which ultimately shows up via websocket:

async fn process_suspension_subscription_inner(mut socket: WebSocket) -> anyhow::Result<()> {
    let mut rx = SUSPENSION_TX.tx.subscribe();

    // send the current set of suspensions first
    {
        let suspensions = do_get_suspension().await?;
        for record in &suspensions.ready_q {
            let json = serde_json::to_string(&SuspensionEntry::ReadyQ(record.clone()))?;
            socket.send(Message::Text(json)).await?;
        }
    }

    // then wait for more to show up
    loop {
        let event = rx.recv().await?;
        let json = serde_json::to_string(&event)?;
        socket.send(Message::Text(json)).await?;
    }
}

async fn process_suspension_subscription(socket: WebSocket) {
    if let Err(err) = process_suspension_subscription_inner(socket).await {
        tracing::error!("error in websocket: {err:#}");
    }
}

pub async fn subscribe_suspension_v1(
    _: TrustedIpRequired,
    ws: WebSocketUpgrade,
) -> impl IntoResponse {
    ws.on_upgrade(|socket| process_suspension_subscription(socket))
}

and on the client side:

local function process_suspension_subscriptions(url)
  -- Generate the websocket URL from the user-provided HTTP URL
  local endpoint =
    string.format('%s/subscribe_suspension_v1', url):gsub('^http', 'ws')
  local stream, response = kumo.http.connect_websocket(endpoint)

  -- Loop and consume all suspensions from the host; the initial
  -- connection will pre-populate the stream with any current
  -- suspensions, and then will later deliver any subsequently
  -- generated suspension events in realtime.
  while true do
    local data = kumo.json_parse(stream:recv())
    if data.ReadyQ then
      apply_ready_q_suspension(data.ReadyQ)
    elseif data.SchedQ then
      apply_sched_q_suspension(data.SchedQ)
    end
  end
end

kumo.on('kumo.tsa.suspension.subscriber', function(args)
  local url = args[1]

  -- If we encounter an error (likely cause: tsa-daemon restarting),
  -- then we'll try again after a short sleep
  while true do
    local status, err = pcall(process_suspension_subscriptions, url)
    print('TSA Error, will retry in 30 seconds', status, err)
    kumo.sleep(30)
  end
end)

which ultimately calls through to:

module.set(
    "suspend",
    lua.create_function(move |lua, request: Value| {
        let request: SuspendV1Request = lua.from_value(request)?;
        let duration = request.duration();
        let id = Uuid::new_v4();
        let entry = AdminSuspendEntry {
            id,
            campaign: request.campaign,
            tenant: request.tenant,
            domain: request.domain,
            reason: request.reason,
            expires: Instant::now() + duration,
        };
        AdminSuspendEntry::add(entry);
        lua.to_value(&id)
    })?,
)?;

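The goal here is to add Bounce equivalents of each of these pieces: the action variants, their implementation, the websocket subscription, and the client-side handler.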
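
As a starting point for discussion, here is a rough, self-contained sketch of what the action side might look like. Everything Bounce-related in it is an assumption rather than existing code: the variant names Bounce, BounceTenant and BounceCampaign, the MatchedRecord and BounceEntry types, the create_bounce helper, and the choice to always scope by domain. The existing variants are reduced to the three suspension actions to keep the example short, and the real implementation would presumably reuse the record and rule types that the suspension helpers already take.

```rust
use std::time::Duration;

/// Trimmed-down copy of the action set. The three Bounce variants are
/// assumed additions; their names do not come from the codebase.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Action {
    Suspend,
    SuspendTenant,
    SuspendCampaign,
    Bounce,
    BounceTenant,
    BounceCampaign,
}

/// Hypothetical stand-in for the fields of the log record that a rule matched.
#[derive(Debug, Clone)]
struct MatchedRecord {
    domain: String,
    tenant: Option<String>,
    campaign: Option<String>,
}

/// Hypothetical bounce request, loosely shaped like the fields that the
/// "suspend" binding copies into AdminSuspendEntry.
#[derive(Debug, Clone, PartialEq)]
struct BounceEntry {
    rule_hash: String,
    domain: Option<String>,
    tenant: Option<String>,
    campaign: Option<String>,
    reason: String,
    duration: Duration,
}

/// Build a bounce entry for the Bounce* actions and return None for the
/// suspension actions. Scoping by domain in every case is an assumption.
fn create_bounce(
    action: Action,
    rule_hash: &str,
    record: &MatchedRecord,
    reason: &str,
    duration: Duration,
) -> Option<BounceEntry> {
    let (tenant, campaign) = match action {
        Action::Bounce => (None, None),
        Action::BounceTenant => (record.tenant.clone(), None),
        Action::BounceCampaign => (record.tenant.clone(), record.campaign.clone()),
        Action::Suspend | Action::SuspendTenant | Action::SuspendCampaign => return None,
    };
    Some(BounceEntry {
        rule_hash: rule_hash.to_string(),
        domain: Some(record.domain.clone()),
        tenant,
        campaign,
        reason: reason.to_string(),
        duration,
    })
}
```

The tenant and campaign variants share one code path and differ only in whether the campaign is carried over, which mirrors how SuspendTenant and SuspendCampaign both call create_tenant_suspension with a boolean flag. An entry like this could then be published over a bounce subscription endpoint and applied on the client in the same way as the suspension events.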
