diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 32dbd016c..c7eac74e3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,6 +117,24 @@ jobs: RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug testArtifactOutput: sdk-java-kafka-next-gen-integration-test-report + sdk-java-invocation-status-killed: + name: Run SDK-Java integration tests with InvocationStatusKilled + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + secrets: inherit + needs: docker + uses: restatedev/sdk-java/.github/workflows/integration.yaml@main + with: + restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + envVars: | + RESTATE_WORKER__EXPERIMENTAL_FEATURE_INVOCATION_STATUS_KILLED=true + RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug + testArtifactOutput: sdk-java-invocation-status-killed-integration-test-report + sdk-python: name: Run SDK-Python integration tests permissions: diff --git a/Cargo.lock b/Cargo.lock index c87af8888..4242a86d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6994,6 +6994,7 @@ dependencies = [ "codederror", "derive_builder", "derive_more", + "enumset", "futures", "googletest", "humantime", diff --git a/cli/src/clients/admin_interface.rs b/cli/src/clients/admin_interface.rs index b86265a63..8463f9279 100644 --- a/cli/src/clients/admin_interface.rs +++ b/cli/src/clients/admin_interface.rs @@ -41,7 +41,12 @@ pub trait AdminClientInterface { async fn purge_invocation(&self, id: &str) -> reqwest::Result>; - async fn cancel_invocation(&self, id: &str, kill: bool) -> reqwest::Result>; + async fn cancel_invocation( + &self, + id: &str, + kill: bool, + retry: bool, + ) -> reqwest::Result>; async fn patch_state( &self, @@ -132,7 +137,12 @@ impl AdminClientInterface for AdminClient { self.run(reqwest::Method::DELETE, url).await } - async fn cancel_invocation(&self, id: &str, kill: bool) -> reqwest::Result> { + async fn cancel_invocation( + &self, + id: &str, + kill: bool, + retry: bool, + ) -> reqwest::Result> { let mut url = self .base_url .join(&format!("/invocations/{}", id)) @@ -140,7 +150,12 @@ impl AdminClientInterface for AdminClient { url.set_query(Some(&format!( "mode={}", - if kill { "kill" } else { "cancel" } + match (kill, retry) { + (false, false) => "cancel", + (false, true) => "cancel-and-restart", + (true, false) => "kill", + (true, true) => "kill-and-restart", + } ))); self.run(reqwest::Method::DELETE, url).await diff --git a/cli/src/clients/datafusion_helpers.rs b/cli/src/clients/datafusion_helpers.rs index f7b781bf0..b72013e0e 100644 --- a/cli/src/clients/datafusion_helpers.rs +++ b/cli/src/clients/datafusion_helpers.rs @@ -128,6 +128,7 @@ pub enum InvocationState { Running, Suspended, BackingOff, + Killed, Completed, } @@ -142,6 +143,7 @@ impl FromStr for InvocationState { "suspended" => Self::Suspended, "backing-off" => Self::BackingOff, "completed" => Self::Completed, + "killed" => Self::Killed, _ => Self::Unknown, }) } @@ -157,6 +159,7 @@ impl Display for InvocationState { InvocationState::Running => write!(f, "running"), InvocationState::Suspended => write!(f, "suspended"), InvocationState::BackingOff => write!(f, "backing-off"), + InvocationState::Killed => write!(f, "killed"), InvocationState::Completed => write!(f, "completed"), } } diff --git 
a/cli/src/commands/invocations/cancel.rs b/cli/src/commands/invocations/cancel.rs index f4683864a..3bd3c1eda 100644 --- a/cli/src/commands/invocations/cancel.rs +++ b/cli/src/commands/invocations/cancel.rs @@ -37,6 +37,9 @@ pub struct Cancel { /// Ungracefully kill the invocation and its children #[clap(long)] kill: bool, + /// After cancelling/killing, restart the invocation using the same input. + #[clap(long, alias = "retry")] + restart: bool, } pub async fn run_cancel(State(env): State, opts: &Cancel) -> Result<()> { @@ -67,16 +70,19 @@ pub async fn run_cancel(State(env): State, opts: &Cancel) -> Result<()> // Get the invocation and confirm let prompt = format!( "Are you sure you want to {} these invocations?", - if opts.kill { - Styled(Style::Danger, "kill") - } else { - Styled(Style::Warn, "cancel") - }, + match (opts.kill, opts.restart) { + (false, false) => Styled(Style::Warn, "cancel"), + (false, true) => Styled(Style::Warn, "cancel and restart"), + (true, false) => Styled(Style::Danger, "kill"), + (true, true) => Styled(Style::Danger, "kill and restart"), + } ); confirm_or_exit(&prompt)?; for inv in invocations { - let result = client.cancel_invocation(&inv.id, opts.kill).await?; + let result = client + .cancel_invocation(&inv.id, opts.kill, opts.restart) + .await?; let _ = result.success_or_error()?; } diff --git a/cli/src/ui/invocations.rs b/cli/src/ui/invocations.rs index d1af069b5..0ba03447e 100644 --- a/cli/src/ui/invocations.rs +++ b/cli/src/ui/invocations.rs @@ -113,6 +113,7 @@ pub fn invocation_status_style(status: InvocationState) -> Style { InvocationState::Suspended => DStyle::new().dim(), InvocationState::BackingOff => DStyle::new().red(), InvocationState::Completed => DStyle::new().blue(), + InvocationState::Killed => DStyle::new().red(), } } diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index c5e793403..5761ab8fc 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -31,6 +31,10 @@ pub enum DeletionMode { Kill, #[serde(alias = "purge")] Purge, + #[serde(alias = "kill-and-restart")] + KillAndRestart, + #[serde(alias = "cancel-and-restart")] + CancelAndRestart, } #[derive(Debug, Default, Deserialize, JsonSchema)] pub struct DeleteInvocationParams { @@ -90,6 +94,12 @@ pub async fn delete_invocation( Command::TerminateInvocation(InvocationTermination::kill(invocation_id)) } DeletionMode::Purge => Command::PurgeInvocation(PurgeInvocationRequest { invocation_id }), + DeletionMode::CancelAndRestart => { + Command::TerminateInvocation(InvocationTermination::cancel_and_restart(invocation_id)) + } + DeletionMode::KillAndRestart => { + Command::TerminateInvocation(InvocationTermination::kill_and_restart(invocation_id)) + } }; let partition_key = invocation_id.partition_key(); diff --git a/crates/invoker-api/src/handle.rs b/crates/invoker-api/src/handle.rs index 747efda88..385053cbf 100644 --- a/crates/invoker-api/src/handle.rs +++ b/crates/invoker-api/src/handle.rs @@ -60,6 +60,8 @@ pub trait InvokerHandle { &mut self, partition_leader_epoch: PartitionLeaderEpoch, invocation_id: InvocationId, + // If true, acknowledge the abort. 
This will generate a Failed effect + acknowledge: bool, ) -> impl Future> + Send; fn register_partition( diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index 63845f5f7..d0d554dc1 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -128,6 +128,7 @@ pub mod test_util { &mut self, _partition_leader_epoch: PartitionLeaderEpoch, _invocation_id: InvocationId, + _acknowledge: bool, ) -> Result<(), NotRunningError> { Ok(()) } diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 2eabaef42..d717f2c50 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -45,6 +45,7 @@ pub(crate) enum InputCommand { Abort { partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, }, /// Command used to clean up internal state when a partition leader is going away @@ -129,11 +130,13 @@ impl restate_invoker_api::InvokerHandle for InvokerHandle { &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) -> Result<(), NotRunningError> { self.input .send(InputCommand::Abort { partition, invocation_id, + acknowledge, }) .map_err(|_| NotRunningError) } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 933ed28ea..6f773c92d 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -61,6 +61,7 @@ pub use input_command::ChannelStatusReader; pub use input_command::InvokerHandle; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_types::deployment::PinnedDeployment; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::invocation::InvocationTarget; use restate_types::schema::service::ServiceMetadataResolver; @@ -351,8 +352,8 @@ where self.handle_register_partition(partition, partition_key_range, storage_reader, sender); }, - InputCommand::Abort { partition, invocation_id } => { - self.handle_abort_invocation(partition, invocation_id); + InputCommand::Abort { partition, invocation_id, acknowledge } => { + self.handle_abort_invocation(partition, invocation_id, acknowledge).await; } InputCommand::AbortAllPartition { partition } => { self.handle_abort_partition(partition); @@ -808,12 +809,13 @@ where restate.invoker.partition_leader_epoch = ?partition, ) )] - fn handle_abort_invocation( + async fn handle_abort_invocation( &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) { - if let Some((_, _, mut ism)) = self + if let Some((tx, _, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -823,6 +825,14 @@ where ism.abort(); self.quota.unreserve_slot(); self.status_store.on_end(&partition, &invocation_id); + if acknowledge { + let _ = tx + .send(Effect { + invocation_id, + kind: EffectKind::Failed(KILLED_INVOCATION_ERROR), + }) + .await; + } } else { trace!("Ignoring Abort command because there is no matching partition/invocation"); } @@ -1415,7 +1425,9 @@ mod tests { assert_eq!(*available_slots, 1); // Abort the invocation - service_inner.handle_abort_invocation(MOCK_PARTITION, invocation_id); + service_inner + .handle_abort_invocation(MOCK_PARTITION, invocation_id, false) + .await; // Check the quota let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota); diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 
992c4fe14..ae6cdd19a 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -17,11 +17,11 @@ use futures::Stream; use futures_util::stream; use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ - InvocationStatus, InvocationStatusTable, InvocationStatusV1, ReadOnlyInvocationStatusTable, + InvocationStatus, InvocationStatusTable, InvocationStatusV1, + InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; -use restate_types::invocation::InvocationTarget; use restate_types::storage::StorageCodec; use std::ops::RangeInclusive; use tracing::trace; @@ -169,7 +169,7 @@ fn delete_invocation_status(storage: &mut S, invocation_id: &I fn invoked_invocations( storage: &mut S, partition_key_range: RangeInclusive, -) -> Vec> { +) -> Vec> { let _x = RocksDbPerfGuard::new("invoked-invocations"); let mut invocations = storage.for_each_key_value_in_place( FullScanPartitionKeyRange::(partition_key_range.clone()), @@ -239,12 +239,16 @@ fn all_invocation_status( fn read_invoked_v1_full_invocation_id( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { +) -> Result> { let invocation_id = invocation_id_from_v1_key_bytes(&mut k)?; let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; if let InvocationStatus::Invoked(invocation_meta) = invocation_status.0 { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: true, + })) } else { Ok(None) } @@ -253,13 +257,23 @@ fn read_invoked_v1_full_invocation_id( fn read_invoked_full_invocation_id( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { +) -> Result> { // TODO this can be improved by simply parsing InvocationTarget and the Status enum let invocation_id = invocation_id_from_key_bytes(&mut k)?; let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; if let InvocationStatus::Invoked(invocation_meta) = invocation_status { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: true, + })) + } else if let InvocationStatus::Killed(invocation_meta) = invocation_status { + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: false, + })) } else { Ok(None) } @@ -274,9 +288,9 @@ impl ReadOnlyInvocationStatusTable for PartitionStore { get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { stream::iter(invoked_invocations( self, self.partition_key_range().clone(), @@ -300,9 +314,9 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> { try_migrate_and_get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { stream::iter(invoked_invocations( self, self.partition_key_range().clone(), diff --git a/crates/partition-store/src/journal_table/mod.rs 
b/crates/partition-store/src/journal_table/mod.rs index ccf3ccb36..d7628c8b6 100644 --- a/crates/partition-store/src/journal_table/mod.rs +++ b/crates/partition-store/src/journal_table/mod.rs @@ -25,7 +25,7 @@ use restate_types::identifiers::{ }; use restate_types::storage::StorageCodec; use std::io::Cursor; -use std::ops::RangeInclusive; +use std::ops::{Range, RangeInclusive}; define_table_key!( Journal, @@ -121,14 +121,14 @@ fn all_journals( })) } -fn delete_journal( +fn delete_journal_range( storage: &mut S, invocation_id: &InvocationId, - journal_length: EntryIndex, + journal_range: Range, ) { let mut key = write_journal_entry_key(invocation_id, 0); let k = &mut key; - for journal_index in 0..journal_length { + for journal_index in journal_range { k.journal_index = Some(journal_index); storage.delete_key(k); } @@ -201,10 +201,14 @@ impl<'a> JournalTable for PartitionStoreTransaction<'a> { put_journal_entry(self, invocation_id, journal_index, journal_entry) } - async fn delete_journal(&mut self, invocation_id: &InvocationId, journal_length: EntryIndex) { + async fn delete_journal_range( + &mut self, + invocation_id: &InvocationId, + journal_range: Range, + ) { self.assert_partition_key(invocation_id); - let _x = RocksDbPerfGuard::new("delete-journal"); - delete_journal(self, invocation_id, journal_length) + let _x = RocksDbPerfGuard::new("delete-journal-range"); + delete_journal_range(self, invocation_id, journal_range) } } diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index ed662ae9d..7855822fc 100644 --- a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -22,7 +22,8 @@ use googletest::prelude::*; use once_cell::sync::Lazy; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, InvocationStatusV1, - JournalMetadata, ReadOnlyInvocationStatusTable, StatusTimestamps, + InvokedOrKilledInvocationStatusLite, JournalMetadata, ReadOnlyInvocationStatusTable, + StatusTimestamps, }; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey}; @@ -91,6 +92,21 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus { source: Source::Ingress(*RPC_REQUEST_ID), completion_retention_duration: Duration::ZERO, idempotency_key: None, + restart_when_completed: false, + }) +} + +fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus { + InvocationStatus::Killed(InFlightInvocationMetadata { + invocation_target, + journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), + pinned_deployment: None, + response_sinks: HashSet::new(), + timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)), + source: Source::Ingress(*RPC_REQUEST_ID), + completion_retention_duration: Duration::ZERO, + idempotency_key: None, + restart_when_completed: false, }) } @@ -105,6 +121,7 @@ fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus { source: Source::Ingress(*RPC_REQUEST_ID), completion_retention_duration: Duration::ZERO, idempotency_key: None, + restart_when_completed: false, }, waiting_for_completed_entries: HashSet::default(), } @@ -131,7 +148,7 @@ async fn populate_data(txn: &mut T) { txn.put_invocation_status( &INVOCATION_ID_4, - &invoked_status(INVOCATION_TARGET_4.clone()), + 
&killed_status(INVOCATION_TARGET_4.clone()), ) .await; @@ -154,22 +171,34 @@ async fn verify_point_lookups(txn: &mut T) { txn.get_invocation_status(&INVOCATION_ID_4) .await .expect("should not fail"), - invoked_status(INVOCATION_TARGET_4.clone()) + killed_status(INVOCATION_TARGET_4.clone()) ); } async fn verify_all_svc_with_status_invoked(txn: &mut T) { let actual = txn - .all_invoked_invocations() + .all_invoked_or_killed_invocations() .try_collect::>() .await .unwrap(); assert_that!( actual, unordered_elements_are![ - eq((*INVOCATION_ID_1, INVOCATION_TARGET_1.clone())), - eq((*INVOCATION_ID_2, INVOCATION_TARGET_2.clone())), - eq((*INVOCATION_ID_4, INVOCATION_TARGET_4.clone())) + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_1, + invocation_target: INVOCATION_TARGET_1.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_2, + invocation_target: INVOCATION_TARGET_2.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_4, + invocation_target: INVOCATION_TARGET_4.clone(), + is_invoked: false, + }), ] ); } diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 7d793769f..babc960f1 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -119,6 +119,7 @@ message InvocationStatusV2 { INBOXED = 2; INVOKED = 3; SUSPENDED = 4; + KILLED = 6; COMPLETED = 5; } @@ -152,6 +153,7 @@ message InvocationStatusV2 { uint32 journal_length = 14; optional string deployment_id = 15; optional dev.restate.service.protocol.ServiceProtocolVersion service_protocol_version = 16; + bool restart_when_completed = 23; // Suspended repeated uint32 waiting_for_completed_entries = 17; @@ -518,14 +520,29 @@ message OutboxMessage { ResponseResult response_result = 3; } + // TODO remove this in Restate 1.3 message OutboxKill { InvocationId invocation_id = 1; } + // TODO remove this in Restate 1.3 message OutboxCancel { InvocationId invocation_id = 1; } + message OutboxTermination { + enum TerminationFlavor { + UNKNOWN = 0; + KILL = 1; + KILL_AND_RESTART = 2; + CANCEL = 3; + CANCEL_AND_RESTART = 4; + } + + InvocationId invocation_id = 1; + TerminationFlavor flavor = 2; + } + message AttachInvocationRequest { oneof query { InvocationId invocation_id = 1; @@ -542,6 +559,7 @@ message OutboxMessage { OutboxKill kill = 4; OutboxCancel cancel = 5; AttachInvocationRequest attach_invocation_request = 6; + OutboxTermination termination = 7; } } diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index c052e900c..a68e2595b 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -189,6 +189,7 @@ pub enum InvocationStatus { metadata: InFlightInvocationMetadata, waiting_for_completed_entries: HashSet, }, + Killed(InFlightInvocationMetadata), Completed(CompletedInvocation), /// Service instance is currently not invoked #[default] @@ -203,6 +204,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.invocation_target), InvocationStatus::Invoked(metadata) => Some(&metadata.invocation_target), InvocationStatus::Suspended { metadata, .. 
} => Some(&metadata.invocation_target), + InvocationStatus::Killed(metadata) => Some(&metadata.invocation_target), InvocationStatus::Completed(completed) => Some(&completed.invocation_target), _ => None, } @@ -215,6 +217,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.source), InvocationStatus::Invoked(metadata) => Some(&metadata.source), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.source), + InvocationStatus::Killed(metadata) => Some(&metadata.source), InvocationStatus::Completed(completed) => Some(&completed.source), _ => None, } @@ -227,6 +230,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => metadata.metadata.idempotency_key.as_ref(), InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(), + InvocationStatus::Killed(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(), _ => None, } @@ -237,6 +241,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(metadata.journal_metadata), _ => None, } } @@ -246,6 +251,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&metadata.journal_metadata), _ => None, } } @@ -255,6 +261,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&mut metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&mut metadata.journal_metadata), _ => None, } } @@ -264,6 +271,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -273,6 +281,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -282,6 +291,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -295,6 +305,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&mut metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&mut metadata.response_sinks), _ => None, } } @@ -306,6 +317,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&metadata.response_sinks), InvocationStatus::Suspended { metadata, .. 
} => Some(&metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&metadata.response_sinks), _ => None, } } @@ -317,6 +329,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&metadata.timestamps), InvocationStatus::Completed(completed) => Some(&completed.timestamps), _ => None, } @@ -329,6 +342,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Completed(completed) => Some(&mut completed.timestamps), _ => None, } @@ -459,6 +473,9 @@ pub struct InFlightInvocationMetadata { /// If zero, the invocation completion will not be retained. pub completion_retention_duration: Duration, pub idempotency_key: Option, + + /// When the invocation completes, restart it. + pub restart_when_completed: bool, } impl InFlightInvocationMetadata { @@ -482,6 +499,7 @@ impl InFlightInvocationMetadata { completion_retention_duration: pre_flight_invocation_metadata .completion_retention_duration, idempotency_key: pre_flight_invocation_metadata.idempotency_key, + restart_when_completed: false, }, InvocationInput { argument: pre_flight_invocation_metadata.argument, @@ -550,15 +568,23 @@ impl CompletedInvocation { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct InvokedOrKilledInvocationStatusLite { + pub invocation_id: InvocationId, + pub invocation_target: InvocationTarget, + /// If true, original status is Invoked, otherwise is Killed + pub is_invoked: bool, +} + pub trait ReadOnlyInvocationStatusTable { fn get_invocation_status( &mut self, invocation_id: &InvocationId, ) -> impl Future> + Send; - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send; + ) -> impl Stream> + Send; fn all_invocation_statuses( &self, @@ -602,6 +628,7 @@ mod test_util { source: Source::Ingress(PartitionProcessorRpcRequestId::default()), completion_retention_duration: Duration::ZERO, idempotency_key: None, + restart_when_completed: false, } } } diff --git a/crates/storage-api/src/journal_table/mod.rs b/crates/storage-api/src/journal_table/mod.rs index 47763c14a..dd6482c6d 100644 --- a/crates/storage-api/src/journal_table/mod.rs +++ b/crates/storage-api/src/journal_table/mod.rs @@ -14,7 +14,7 @@ use restate_types::identifiers::{EntryIndex, InvocationId, JournalEntryId, Parti use restate_types::journal::enriched::EnrichedRawEntry; use restate_types::journal::{CompletionResult, EntryType}; use std::future::Future; -use std::ops::RangeInclusive; +use std::ops::{Range, RangeInclusive}; /// Different types of journal entries persisted by the runtime #[derive(Debug, Clone, PartialEq, Eq)] @@ -78,5 +78,13 @@ pub trait JournalTable: ReadOnlyJournalTable { &mut self, invocation_id: &InvocationId, journal_length: EntryIndex, + ) -> impl Future + Send { + self.delete_journal_range(invocation_id, 0..journal_length) + } + + fn delete_journal_range( + &mut self, + invocation_id: &InvocationId, + journal_range: Range, ) -> impl Future + Send; } diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index 
e255824b1..fd74897e1 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -91,7 +91,8 @@ pub mod v1 { use crate::storage::v1::journal_entry::completion_result::{Empty, Failure, Success}; use crate::storage::v1::journal_entry::{completion_result, CompletionResult, Entry, Kind}; use crate::storage::v1::outbox_message::{ - OutboxCancel, OutboxKill, OutboxServiceInvocation, OutboxServiceInvocationResponse, + outbox_termination, OutboxCancel, OutboxKill, OutboxServiceInvocation, + OutboxServiceInvocationResponse, OutboxTermination, }; use crate::storage::v1::service_invocation_response_sink::{ Ingress, PartitionProcessor, ResponseSink, @@ -349,6 +350,7 @@ pub mod v1 { journal_length, deployment_id, service_protocol_version, + restart_when_completed, waiting_for_completed_entries, result, } = value; @@ -443,6 +445,7 @@ pub mod v1 { .unwrap_or_default() .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), + restart_when_completed, }, )) } @@ -465,12 +468,36 @@ pub mod v1 { .unwrap_or_default() .try_into()?, idempotency_key: idempotency_key.map(ByteString::from), + restart_when_completed, }, waiting_for_completed_entries: waiting_for_completed_entries .into_iter() .collect(), }, ), + invocation_status_v2::Status::Killed => { + Ok(crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + journal_metadata: crate::invocation_status_table::JournalMetadata { + length: journal_length, + span_context: expect_or_fail!(span_context)?.try_into()?, + }, + pinned_deployment: derive_pinned_deployment( + deployment_id, + service_protocol_version, + )?, + source, + completion_retention_duration: completion_retention_duration + .unwrap_or_default() + .try_into()?, + idempotency_key: idempotency_key.map(ByteString::from), + restart_when_completed, + }, + )) + } invocation_status_v2::Status::Completed => { Ok(crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { @@ -545,6 +572,7 @@ pub mod v1 { journal_length: 0, deployment_id: None, service_protocol_version: None, + restart_when_completed: false, waiting_for_completed_entries: vec![], result: None, }, @@ -597,6 +625,7 @@ pub mod v1 { journal_length: 0, deployment_id: None, service_protocol_version: None, + restart_when_completed: false, waiting_for_completed_entries: vec![], result: None, }, @@ -610,6 +639,7 @@ pub mod v1 { source, completion_retention_duration, idempotency_key, + restart_when_completed, }, ) => { let (deployment_id, service_protocol_version) = match pinned_deployment { @@ -658,6 +688,7 @@ pub mod v1 { journal_length: journal_metadata.length, deployment_id, service_protocol_version, + restart_when_completed, waiting_for_completed_entries: vec![], result: None, } @@ -673,6 +704,7 @@ pub mod v1 { source, completion_retention_duration, idempotency_key, + restart_when_completed, }, waiting_for_completed_entries, } => { @@ -722,12 +754,77 @@ pub mod v1 { journal_length: journal_metadata.length, deployment_id, service_protocol_version, + restart_when_completed, waiting_for_completed_entries: waiting_for_completed_entries .into_iter() .collect(), result: None, } } + crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + invocation_target, + journal_metadata, + pinned_deployment, + response_sinks, + timestamps, + source, + completion_retention_duration, + 
idempotency_key, + restart_when_completed, + }, + ) => { + let (deployment_id, service_protocol_version) = match pinned_deployment { + None => (None, None), + Some(pinned_deployment) => ( + Some(pinned_deployment.deployment_id.to_string()), + Some(pinned_deployment.service_protocol_version.as_repr()), + ), + }; + + InvocationStatusV2 { + status: invocation_status_v2::Status::Killed.into(), + invocation_target: Some(invocation_target.into()), + source: Some(source.into()), + span_context: Some(journal_metadata.span_context.into()), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { + timestamps.inboxed_transition_time() + } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { + timestamps.running_transition_time() + } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), + response_sinks: response_sinks + .into_iter() + .map(|s| ServiceInvocationResponseSink::from(Some(s))) + .collect(), + argument: None, + headers: vec![], + execution_time: None, + completion_retention_duration: Some( + completion_retention_duration.into(), + ), + idempotency_key: idempotency_key.map(|key| key.to_string()), + inbox_sequence_number: None, + journal_length: journal_metadata.length, + deployment_id, + service_protocol_version, + restart_when_completed, + waiting_for_completed_entries: vec![], + result: None, + } + } crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { invocation_target, @@ -767,6 +864,7 @@ pub mod v1 { journal_length: 0, deployment_id: None, service_protocol_version: None, + restart_when_completed: false, waiting_for_completed_entries: vec![], result: Some(response_result.into()), }, @@ -856,6 +954,9 @@ pub mod v1 { crate::invocation_status_table::InvocationStatus::Scheduled(_) => { panic!("Unexpected conversion to old InvocationStatus when using Scheduled variant. This is a bug in the table implementation.") } + crate::invocation_status_table::InvocationStatus::Killed(_) => { + panic!("Unexpected conversion to old InvocationStatus when using Killed variant. This is a bug in the table implementation.") + } }; InvocationStatus { @@ -951,6 +1052,7 @@ pub mod v1 { source, completion_retention_duration: completion_retention_time, idempotency_key, + restart_when_completed: false, }) } } @@ -966,6 +1068,7 @@ pub mod v1 { source, completion_retention_duration: completion_retention_time, idempotency_key, + .. } = value; let (deployment_id, service_protocol_version) = match pinned_deployment { @@ -1061,6 +1164,7 @@ pub mod v1 { source: caller, completion_retention_duration: completion_retention_time, idempotency_key, + restart_when_completed: false, }, waiting_for_completed_entries, )) @@ -2553,6 +2657,41 @@ pub mod v1 { ), ) } + outbox_message::OutboxMessage::Termination(outbox_termination) => { + crate::outbox_table::OutboxMessage::InvocationTermination( + InvocationTermination { + invocation_id: restate_types::identifiers::InvocationId::try_from( + outbox_termination + .invocation_id + .ok_or(ConversionError::missing_field("invocation_id"))?, + )?, + flavor: match outbox_termination::TerminationFlavor::try_from( + outbox_termination.flavor, + ) + .map_err(|e| ConversionError::invalid_data(e))? 
+ { + outbox_termination::TerminationFlavor::Unknown => { + return Err(ConversionError::UnexpectedEnumVariant( + "termination flavor", + outbox_termination.flavor, + )) + } + outbox_termination::TerminationFlavor::Kill => { + TerminationFlavor::Kill + } + outbox_termination::TerminationFlavor::KillAndRestart => { + TerminationFlavor::KillAndRestart + } + outbox_termination::TerminationFlavor::Cancel => { + TerminationFlavor::Cancel + } + outbox_termination::TerminationFlavor::CancelAndRestart => { + TerminationFlavor::CancelAndRestart + } + }, + }, + ) + } outbox_message::OutboxMessage::AttachInvocationRequest( outbox_message::AttachInvocationRequest { block_on_inflight, @@ -2637,6 +2776,24 @@ pub mod v1 { )), }) } + TerminationFlavor::KillAndRestart => { + outbox_message::OutboxMessage::Termination(OutboxTermination { + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), + flavor: outbox_termination::TerminationFlavor::KillAndRestart + .into(), + }) + } + TerminationFlavor::CancelAndRestart => { + outbox_message::OutboxMessage::Termination(OutboxTermination { + invocation_id: Some(InvocationId::from( + invocation_termination.invocation_id, + )), + flavor: outbox_termination::TerminationFlavor::CancelAndRestart + .into(), + }) + } }, crate::outbox_table::OutboxMessage::AttachInvocation( restate_types::invocation::AttachInvocationRequest { diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 67a7484b9..bb8e584c4 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -49,7 +49,7 @@ impl TimerKey { } } - fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + pub fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, kind: TimerKeyKind::NeoInvoke { invocation_uuid }, diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index b61b2c2ff..2d010deea 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -83,6 +83,10 @@ pub(crate) fn append_invocation_status_row( row.status("suspended"); fill_in_flight_invocation_metadata(&mut row, output, metadata); } + InvocationStatus::Killed(metadata) => { + row.status("killed"); + fill_in_flight_invocation_metadata(&mut row, output, metadata); + } InvocationStatus::Free => { row.status("free"); } diff --git a/crates/storage-query-datafusion/src/invocation_status/schema.rs b/crates/storage-query-datafusion/src/invocation_status/schema.rs index 3391728dc..76780c2f5 100644 --- a/crates/storage-query-datafusion/src/invocation_status/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_status/schema.rs @@ -21,7 +21,7 @@ define_table!(sys_invocation_status( /// [Invocation ID](/operate/invocation#invocation-identifier). 
id: DataType::LargeUtf8, - /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `completed` + /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `killed` or `completed` status: DataType::LargeUtf8, /// If `status = 'completed'`, this contains either `success` or `failure` diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 187f1818b..7e0ecada1 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -51,6 +51,9 @@ pub struct WorkerOptions { #[cfg_attr(feature = "schemars", schemars(skip))] experimental_feature_disable_idempotency_table: bool, + #[cfg_attr(feature = "schemars", schemars(skip))] + experimental_feature_invocation_status_killed: bool, + pub storage: StorageOptions, pub invoker: InvokerOptions, @@ -88,6 +91,10 @@ impl WorkerOptions { pub fn experimental_feature_disable_idempotency_table(&self) -> bool { self.experimental_feature_disable_idempotency_table } + + pub fn experimental_feature_invocation_status_killed(&self) -> bool { + self.experimental_feature_invocation_status_killed + } } impl Default for WorkerOptions { @@ -97,6 +104,7 @@ impl Default for WorkerOptions { num_timers_in_memory_limit: None, cleanup_interval: Duration::from_secs(60 * 60).into(), experimental_feature_disable_idempotency_table: false, + experimental_feature_invocation_status_killed: false, storage: StorageOptions::default(), invoker: Default::default(), max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"), diff --git a/crates/types/src/invocation.rs b/crates/types/src/invocation.rs index d90c6f48d..6f2144f2a 100644 --- a/crates/types/src/invocation.rs +++ b/crates/types/src/invocation.rs @@ -773,15 +773,35 @@ impl InvocationTermination { flavor: TerminationFlavor::Cancel, } } + + pub const fn kill_and_restart(invocation_id: InvocationId) -> Self { + Self { + invocation_id, + flavor: TerminationFlavor::KillAndRestart, + } + } + + pub const fn cancel_and_restart(invocation_id: InvocationId) -> Self { + Self { + invocation_id, + flavor: TerminationFlavor::CancelAndRestart, + } + } } -/// Flavor of the termination. Can be kill (hard stop) or graceful cancel. +/// Flavor of the termination. #[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize)] pub enum TerminationFlavor { /// hard termination, no clean up Kill, + /// hard termination with restart afterward using same input + #[serde(alias = "kill-and-restart")] + KillAndRestart, /// graceful termination allowing the invocation to clean up Cancel, + /// graceful termination allowing the invocation to clean up with restart afterward using same input + #[serde(alias = "cancel-and-restart")] + CancelAndRestart, } /// Message to purge an invocation. 
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 214383ac3..1161043f8 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -51,6 +51,7 @@ codederror = { workspace = true } derive_builder = { workspace = true } derive_more = { workspace = true } futures = { workspace = true } +enumset = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index bc2c8c493..b2169eb27 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -173,9 +173,9 @@ mod tests { use restate_core::{TaskKind, TestCoreEnvBuilder}; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, + InvokedOrKilledInvocationStatusLite, }; use restate_types::identifiers::{InvocationId, InvocationUuid}; - use restate_types::invocation::InvocationTarget; use restate_types::partition_table::{FindPartition, PartitionTable}; use restate_types::Version; use std::future::Future; @@ -194,9 +194,9 @@ mod tests { std::future::pending() } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send + ) -> impl Stream> + Send { todo!(); #[allow(unreachable_code)] diff --git a/crates/worker/src/partition/leadership.rs b/crates/worker/src/partition/leadership.rs index d8be3056b..d8863c139 100644 --- a/crates/worker/src/partition/leadership.rs +++ b/crates/worker/src/partition/leadership.rs @@ -8,6 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use futures::future::OptionFuture; +use futures::stream::FuturesUnordered; +use futures::{stream, FutureExt, StreamExt, TryStreamExt}; +use metrics::counter; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; @@ -19,11 +23,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; use std::time::{Duration, SystemTime}; - -use futures::future::OptionFuture; -use futures::stream::FuturesUnordered; -use futures::{stream, FutureExt, StreamExt, TryStreamExt}; -use metrics::counter; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info, instrument, trace, warn}; @@ -37,10 +36,13 @@ use restate_errors::NotRunningError; use restate_invoker_api::InvokeInputJournal; use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; -use restate_storage_api::invocation_status_table::ReadOnlyInvocationStatusTable; +use restate_storage_api::invocation_status_table::{ + InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, +}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::timer_table::{TimerKey, TimerTable}; use restate_timer::TokioClock; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ InvocationId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; @@ -63,6 +65,7 @@ use crate::partition::cleaner::Cleaner; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::shuffle::{HintSender, OutboxReaderError, Shuffle, ShuffleMetadata}; use crate::partition::state_machine::Action; +use crate::partition::types::{InvokerEffect, InvokerEffectKind}; use crate::partition::{respond_to_rpc, 
shuffle}; const BATCH_READY_UP_TO: usize = 10; @@ -94,6 +97,9 @@ pub(crate) enum ActionEffect { AwaitingRpcSelfProposeDone, } +type InvokerStream = + stream::Chain>, ReceiverStream>; + pub(crate) struct LeaderState { leader_epoch: LeaderEpoch, shuffle_hint_tx: HintSender, @@ -107,7 +113,7 @@ pub(crate) struct LeaderState { >, awaiting_rpc_self_propose: FuturesUnordered, - invoker_stream: ReceiverStream, + invoker_stream: InvokerStream, shuffle_stream: ReceiverStream, pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>, cleaner_task_id: TaskId, @@ -362,7 +368,7 @@ where async fn become_leader(&mut self, partition_store: &mut PartitionStore) -> Result<(), Error> { if let State::Candidate { leader_epoch, .. } = self.state { - let invoker_rx = Self::resume_invoked_invocations( + let invoker_stream = Self::resume_invoked_invocations( &mut self.invoker_tx, (self.partition_processor_metadata.partition_id, leader_epoch), self.partition_processor_metadata @@ -437,7 +443,7 @@ where self_proposer, awaiting_rpc_actions: Default::default(), awaiting_rpc_self_propose: Default::default(), - invoker_stream: ReceiverStream::new(invoker_rx), + invoker_stream, shuffle_stream: ReceiverStream::new(shuffle_rx), pending_cleanup_timers_to_schedule: Default::default(), }); @@ -454,9 +460,11 @@ where partition_key_range: RangeInclusive, partition_store: &mut PartitionStore, channel_size: usize, - ) -> Result, Error> { + ) -> Result { let (invoker_tx, invoker_rx) = mpsc::channel(channel_size); + let mut killed_invocations_effects = vec![]; + invoker_handle .register_partition( partition_leader_epoch, @@ -468,27 +476,43 @@ where .map_err(Error::Invoker)?; { - let invoked_invocations = partition_store.all_invoked_invocations(); + let invoked_invocations = partition_store.all_invoked_or_killed_invocations(); tokio::pin!(invoked_invocations); let mut count = 0; - while let Some(invocation_id_and_target) = invoked_invocations.next().await { - let (invocation_id, invocation_target) = invocation_id_and_target?; - invoker_handle - .invoke( - partition_leader_epoch, + while let Some(invoked_invocation) = invoked_invocations.next().await { + let InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target, + is_invoked, + } = invoked_invocation?; + if is_invoked { + invoker_handle + .invoke( + partition_leader_epoch, + invocation_id, + invocation_target, + InvokeInputJournal::NoCachedJournal, + ) + .await + .map_err(Error::Invoker)?; + } else { + // For killed invocations, there's no need to go through the invoker + // We simply return here the effect as if the invoker produced that. 
+ killed_invocations_effects.push(InvokerEffect { invocation_id, - invocation_target, - InvokeInputJournal::NoCachedJournal, - ) - .await - .map_err(Error::Invoker)?; + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + }); + } count += 1; } debug!("Leader partition resumed {} invocations", count); } - Ok(invoker_rx) + Ok( + futures::stream::iter(killed_invocations_effects) + .chain(ReceiverStream::new(invoker_rx)), + ) } async fn become_follower(&mut self) { @@ -639,8 +663,11 @@ where .notify_completion(partition_leader_epoch, invocation_id, completion) .await .map_err(Error::Invoker)?, - Action::AbortInvocation(invocation_id) => invoker_tx - .abort_invocation(partition_leader_epoch, invocation_id) + Action::AbortInvocation { + invocation_id, + acknowledge, + } => invoker_tx + .abort_invocation(partition_leader_epoch, invocation_id, acknowledge) .await .map_err(Error::Invoker)?, Action::IngressResponse { diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 9e7e00099..dbcbfb684 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -15,6 +15,7 @@ use std::time::{Duration, Instant}; use anyhow::Context; use assert2::let_assert; +use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; use metrics::{counter, histogram}; use tokio::sync::{mpsc, watch}; @@ -41,6 +42,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::{StorageError, Transaction}; use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::config::WorkerOptions; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; @@ -68,7 +70,7 @@ use crate::metric_definitions::{ }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata}; -use crate::partition::state_machine::{ActionCollector, StateMachine}; +use crate::partition::state_machine::{ActionCollector, ExperimentalFeature, StateMachine}; mod cleaner; pub mod invoker_storage_reader; @@ -91,6 +93,7 @@ pub(super) struct PartitionProcessorBuilder { num_timers_in_memory_limit: Option, disable_idempotency_table: bool, + invocation_status_killed: bool, cleanup_interval: Duration, channel_size: usize, max_command_batch_size: usize, @@ -126,6 +129,7 @@ where status, num_timers_in_memory_limit: options.num_timers_in_memory_limit(), disable_idempotency_table: options.experimental_feature_disable_idempotency_table(), + invocation_status_killed: options.experimental_feature_invocation_status_killed(), cleanup_interval: options.cleanup_interval(), channel_size: options.internal_queue_length(), max_command_batch_size: options.max_command_batch_size(), @@ -148,6 +152,7 @@ where num_timers_in_memory_limit, cleanup_interval, disable_idempotency_table, + invocation_status_killed, channel_size, max_command_batch_size, invoker_tx, @@ -162,6 +167,7 @@ where &mut partition_store, partition_key_range.clone(), disable_idempotency_table, + invocation_status_killed, ) .await?; @@ -211,6 +217,7 @@ where partition_store: &mut PartitionStore, partition_key_range: RangeInclusive, disable_idempotency_table: bool, + invocation_status_killed: bool, ) -> Result, StorageError> where Codec: RawEntryCodec + Default + Debug, @@ -219,12 +226,22 @@ where let outbox_seq_number = 
partition_store.get_outbox_seq_number().await?; let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?; + let experimental_features = if disable_idempotency_table { + ExperimentalFeature::DisableIdempotencyTable.into() + } else { + EnumSet::empty() + } | if invocation_status_killed { + ExperimentalFeature::InvocationStatusKilled.into() + } else { + EnumSet::empty() + }; + let state_machine = StateMachine::new( inbox_seq_number, outbox_seq_number, outbox_head_seq_number, partition_key_range, - disable_idempotency_table, + experimental_features, ); Ok(state_machine) @@ -693,6 +710,14 @@ where completion_expiry_time, })) } + InvocationStatus::Killed(_) => { + Ok(PartitionProcessorRpcResponse::Output(InvocationOutput { + request_id, + response: IngressResponseResult::Failure(KILLED_INVOCATION_ERROR), + invocation_id: Some(invocation_id), + completion_expiry_time: None, + })) + } _ => Ok(PartitionProcessorRpcResponse::NotReady), } } diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 723c8b1a2..67dffe1a6 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -47,7 +47,10 @@ pub enum Action { invocation_id: InvocationId, completion: Completion, }, - AbortInvocation(InvocationId), + AbortInvocation { + invocation_id: InvocationId, + acknowledge: bool, + }, IngressResponse { request_id: PartitionProcessorRpcRequestId, invocation_id: Option, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 9677fae80..fd3a6249b 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -18,6 +18,7 @@ pub use actions::{Action, ActionCollector}; use assert2::let_assert; use bytes::Bytes; use bytestring::ByteString; +use enumset::EnumSet; use futures::{StreamExt, TryStreamExt}; use metrics::{histogram, Histogram}; use restate_invoker_api::InvokeInputJournal; @@ -45,10 +46,9 @@ use restate_storage_api::Result as StorageResult; use restate_tracing_instrumentation as instrumentation; use restate_types::deployment::PinnedDeployment; use restate_types::errors::{ - InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, - ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, - NOT_FOUND_INVOCATION_ERROR, NOT_READY_INVOCATION_ERROR, - WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, + InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, + CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR, + NOT_READY_INVOCATION_ERROR, WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, }; use restate_types::identifiers::{ EntryIndex, InvocationId, PartitionKey, PartitionProcessorRpcRequestId, ServiceId, @@ -87,9 +87,19 @@ use std::fmt::{Debug, Formatter}; use std::marker::PhantomData; use std::ops::RangeInclusive; use std::time::Instant; -use tracing::error; +use tracing::{error, info}; use utils::SpanExt; +#[derive(Debug, Hash, enumset::EnumSetType, strum::Display)] +pub enum ExperimentalFeature { + /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. + /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. 
+ DisableIdempotencyTable, + /// If true, kill should wait for end signal from invoker, in order to implement the restart functionality. + /// This is enabled by experimental_feature_kill_and_restart. + InvocationStatusKilled, +} + pub struct StateMachine { // initialized from persistent storage inbox_seq_number: MessageIndex, @@ -100,9 +110,8 @@ pub struct StateMachine { partition_key_range: RangeInclusive, latency: Histogram, - /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. - /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. - disable_idempotency_table: bool, + /// Enabled experimental features. + experimental_features: EnumSet, _codec: PhantomData, } @@ -160,7 +169,7 @@ impl StateMachine { outbox_seq_number: MessageIndex, outbox_head_seq_number: Option, partition_key_range: RangeInclusive, - disable_idempotency_table: bool, + experimental_features: EnumSet, ) -> Self { let latency = histogram!(crate::metric_definitions::PARTITION_HANDLE_INVOKER_EFFECT_COMMAND); @@ -170,7 +179,7 @@ impl StateMachine { outbox_head_seq_number, partition_key_range, latency, - disable_idempotency_table, + experimental_features, _codec: PhantomData, } } @@ -287,11 +296,11 @@ impl StateMachine { } Command::Timer(timer) => self.on_timer(&mut ctx, timer).await, Command::TerminateInvocation(invocation_termination) => { - self.try_terminate_invocation(&mut ctx, invocation_termination) + self.on_terminate_invocation(&mut ctx, invocation_termination) .await } Command::PurgeInvocation(purge_invocation_request) => { - self.try_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) + self.on_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) .await } Command::PatchState(mutation) => { @@ -482,7 +491,11 @@ impl StateMachine { // Store the invocation id mapping if we have to and continue the processing // TODO get rid of this code when we remove the usage of the virtual object table for workflows - if is_workflow_run && !self.disable_idempotency_table { + if is_workflow_run + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { ctx.storage .put_virtual_object_status( &service_invocation @@ -494,7 +507,11 @@ impl StateMachine { .await; } // TODO get rid of this code when we remove the idempotency table - if has_idempotency_key && !self.disable_idempotency_table { + if has_idempotency_key + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { Self::do_store_idempotency_id( ctx, service_invocation @@ -564,6 +581,17 @@ impl StateMachine { } } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + service_invocation.response_sink.take().into_iter(), + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. 
let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -810,7 +838,7 @@ impl StateMachine { Ok(()) } - async fn try_terminate_invocation< + async fn on_terminate_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -824,16 +852,24 @@ impl StateMachine { ctx: &mut StateMachineApplyContext<'_, State>, InvocationTermination { invocation_id, - flavor: termination_flavor, + flavor, }: InvocationTermination, ) -> Result<(), Error> { - match termination_flavor { - TerminationFlavor::Kill => self.try_kill_invocation(ctx, invocation_id).await, - TerminationFlavor::Cancel => self.try_cancel_invocation(ctx, invocation_id).await, + match flavor { + TerminationFlavor::Kill => self.on_kill_invocation(ctx, invocation_id).await, + TerminationFlavor::Cancel => self.on_cancel_invocation(ctx, invocation_id).await, + TerminationFlavor::KillAndRestart => { + self.on_kill_and_restart_invocation(ctx, invocation_id) + .await + } + TerminationFlavor::CancelAndRestart => { + self.on_cancel_and_restart_invocation(ctx, invocation_id) + .await + } } } - async fn try_kill_invocation< + async fn on_kill_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -841,6 +877,7 @@ impl StateMachine { + StateTable + JournalTable + OutboxTable + + TimerTable + FsmTable, >( &mut self, @@ -850,8 +887,13 @@ impl StateMachine { let status = ctx.get_invocation_status(&invocation_id).await?; match status { - InvocationStatus::Invoked(metadata) | InvocationStatus::Suspended { metadata, .. } => { - self.kill_invocation(ctx, invocation_id, metadata).await?; + InvocationStatus::Invoked(metadata) => { + self.kill_invoked_invocation(ctx, invocation_id, metadata, false) + .await?; + } + InvocationStatus::Suspended { metadata, .. } => { + self.kill_suspended_invocation(ctx, invocation_id, metadata, false) + .await?; } InvocationStatus::Inboxed(inboxed) => { self.terminate_inboxed_invocation( @@ -862,7 +904,23 @@ impl StateMachine { ) .await? } - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Kill, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received kill command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received kill command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -870,14 +928,14 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; Ok(()) } - async fn try_cancel_invocation< + async fn on_cancel_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -928,7 +986,23 @@ impl StateMachine { ) .await? 
} - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Cancel, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received cancel command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received cancel command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -936,7 +1010,159 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + // TODO + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + } + }; + + Ok(()) + } + + async fn on_kill_and_restart_invocation< + State: VirtualObjectStatusTable + + InvocationStatusTable + + InboxTable + + FsmTable + + StateTable + + JournalTable + + OutboxTable + + TimerTable + + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + ) -> Result<(), Error> { + let status = ctx.get_invocation_status(&invocation_id).await?; + + match status { + InvocationStatus::Invoked(metadata) => { + self.kill_invoked_invocation(ctx, invocation_id, metadata, true) + .await?; + } + InvocationStatus::Suspended { metadata, .. } => { + self.kill_suspended_invocation(ctx, invocation_id, metadata, true) + .await?; + } + InvocationStatus::Inboxed(_) => { + info!("Received kill and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed."); + } + InvocationStatus::Scheduled(_) => { + info!("Received kill and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed."); + } + InvocationStatus::Killed(mut metadata) => { + if !metadata.restart_when_completed { + // Update the restart_when_completed. + // This might happen if the user sent in a quick succession two kill commands, the first without retry and the second with retry + metadata.restart_when_completed = true; + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata)) + .await; + } + + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + info!("Received kill and restart command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { + trace!("Received kill and restart command for unknown invocation with id '{invocation_id}'."); + // We still try to send the abort signal to the invoker, + // as it might be the case that previously the user sent an abort signal + // but some message was still between the invoker/PP queues. 
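Note: both restart flavours reuse the same guarded flag update when the invocation is already in the Killed status: set `restart_when_completed` once and persist the status only if it actually changed. An illustrative reduction of that guard, using a toy metadata type rather than the real `InFlightInvocationMetadata`:

// Toy metadata type; only the flag relevant to the restart request is shown.
struct Metadata {
    restart_when_completed: bool,
}

/// Returns true when the status must be written back to storage.
fn mark_restart_requested(metadata: &mut Metadata) -> bool {
    if metadata.restart_when_completed {
        // A restart was already requested (e.g. two *-and-restart commands in
        // quick succession); avoid a redundant storage write.
        return false;
    }
    metadata.restart_when_completed = true;
    true
}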
+ // This can happen because the invoke/resume and the abort invoker messages end up in different queues, + // and the abort message can overtake the invoke/resume. + // Consequently the invoker might have not received the abort and the user tried to send it again. + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + } + }; + + Ok(()) + } + + async fn on_cancel_and_restart_invocation< + State: VirtualObjectStatusTable + + InvocationStatusTable + + InboxTable + + FsmTable + + StateTable + + JournalTable + + OutboxTable + + TimerTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + ) -> Result<(), Error> { + let status = ctx.get_invocation_status(&invocation_id).await?; + + match status { + InvocationStatus::Invoked(mut metadata) => { + self.cancel_journal_leaves( + ctx, + invocation_id, + InvocationStatusProjection::Invoked, + metadata.journal_metadata.length, + ) + .await?; + if !metadata.restart_when_completed { + // Update the restart_when_completed. + metadata.restart_when_completed = true; + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Invoked(metadata)) + .await; + } + } + InvocationStatus::Suspended { + mut metadata, + waiting_for_completed_entries, + } => { + if self + .cancel_journal_leaves( + ctx, + invocation_id, + InvocationStatusProjection::Suspended(waiting_for_completed_entries), + metadata.journal_metadata.length, + ) + .await? + { + metadata.restart_when_completed = true; + Self::do_resume_service(ctx, invocation_id, metadata).await?; + } + } + InvocationStatus::Inboxed(_) => { + info!("Received cancel and restart command for invocation '{invocation_id}' in inboxed state. Ignoring it as it was not yet executed."); + } + InvocationStatus::Scheduled(_) => { + info!("Received cancel and restart command for invocation '{invocation_id}' in scheduled state. Ignoring it as it was not yet executed."); + } + InvocationStatus::Killed(mut metadata) => { + if !metadata.restart_when_completed { + // Update the restart_when_completed. + // This might happen if the user sent in a quick succession two kill commands, the first without retry and the second with retry + metadata.restart_when_completed = true; + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata)) + .await; + } + + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received cancel command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { + trace!("Received cancel command for unknown invocation with id '{invocation_id}'."); + // We still try to send the abort signal to the invoker, + // as it might be the case that previously the user sent an abort signal + // but some message was still between the invoker/PP queues. + // This can happen because the invoke/resume and the abort invoker messages end up in different queues, + // and the abort message can overtake the invoke/resume. + // Consequently the invoker might have not received the abort and the user tried to send it again. 
+ // TODO + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; @@ -953,8 +1179,10 @@ impl StateMachine { inboxed_invocation: InboxedInvocation, ) -> Result<(), Error> { let error = match termination_flavor { - TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, - TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => { + CANCELED_INVOCATION_ERROR + } }; let InboxedInvocation { @@ -1002,7 +1230,69 @@ impl StateMachine { Ok(()) } - async fn kill_invocation< + async fn terminate_scheduled_invocation< + State: InvocationStatusTable + TimerTable + OutboxTable + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + termination_flavor: TerminationFlavor, + invocation_id: InvocationId, + scheduled_invocation: ScheduledInvocation, + ) -> Result<(), Error> { + let error = match termination_flavor { + TerminationFlavor::Kill | TerminationFlavor::KillAndRestart => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel | TerminationFlavor::CancelAndRestart => { + CANCELED_INVOCATION_ERROR + } + }; + + let ScheduledInvocation { + metadata: + PreFlightInvocationMetadata { + response_sinks, + span_context, + invocation_target, + execution_time, + .. + }, + } = scheduled_invocation; + + // Reply back to callers with error, and publish end trace + self.send_response_to_sinks( + ctx, + response_sinks, + &error, + Some(invocation_id), + None, + Some(&invocation_target), + ) + .await?; + + // Delete timer + if let Some(execution_time) = execution_time { + Self::do_delete_timer( + ctx, + TimerKey::neo_invoke(execution_time.as_u64(), invocation_id.invocation_uuid()), + ) + .await?; + } else { + warn!("Scheduled invocations must always have an execution time."); + } + Self::do_free_invocation(ctx, invocation_id).await; + + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target, + span_context, + MillisSinceEpoch::now(), + Err((error.code(), error.to_string())), + ); + + Ok(()) + } + + async fn kill_invoked_invocation< State: InboxTable + VirtualObjectStatusTable + InvocationStatusTable @@ -1015,14 +1305,78 @@ impl StateMachine { &mut self, ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, - metadata: InFlightInvocationMetadata, + mut metadata: InFlightInvocationMetadata, + restart: bool, ) -> Result<(), Error> { self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) .await?; - self.fail_invocation(ctx, invocation_id, metadata, KILLED_INVOCATION_ERROR) + if self + .experimental_features + .contains(ExperimentalFeature::InvocationStatusKilled) + { + debug_if_leader!( + ctx.is_leader, + restate.invocation.id = %invocation_id, + "Effect: Store killed invocation" + ); + + metadata.restart_when_completed = restart; + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata)) + .await; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } else { + if restart { + // Kill and restart won't work without ExperimentalFeature::InvocationStatusKilled + warn!("Ignoring the kill and restart command for '{invocation_id}' and simply executing kill, as this command is not implemented yet") + } + + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) .await?; - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + 
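Note: taken together, the handlers above resolve a plain kill differently for every invocation status. A rough summary as a self-contained sketch; the status and outcome types are simplified stand-ins, and `acknowledge` mirrors the new boolean on the abort action (when set, the state machine waits in the Killed status for a terminal effect from the invoker, per the feature comment above):

// Simplified stand-ins for the real status and outcome types.
enum Status { Invoked, Suspended, Inboxed, Scheduled, Killed, Completed, Free }

enum KillOutcome {
    /// Abort at the invoker; with InvocationStatusKilled enabled the status is
    /// parked as Killed until the invoker reports a terminal effect.
    AbortRunning,
    /// The invoker already reported a terminal signal, so end right away.
    EndNow,
    /// Not started yet: reply to callers with the killed error and free the status.
    FailBeforeStart,
    /// Re-send the abort signal; `acknowledge` asks for a terminal effect back.
    ResendAbort { acknowledge: bool },
    /// Only a purge can remove a completed invocation.
    Ignore,
}

fn kill_outcome(status: &Status) -> KillOutcome {
    match status {
        Status::Invoked => KillOutcome::AbortRunning,
        Status::Suspended => KillOutcome::EndNow,
        Status::Inboxed | Status::Scheduled => KillOutcome::FailBeforeStart,
        Status::Killed => KillOutcome::ResendAbort { acknowledge: true },
        Status::Free => KillOutcome::ResendAbort { acknowledge: false },
        Status::Completed => KillOutcome::Ignore,
    }
}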
Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + } + Ok(()) + } + + async fn kill_suspended_invocation< + State: InboxTable + + VirtualObjectStatusTable + + InvocationStatusTable + + VirtualObjectStatusTable + + StateTable + + JournalTable + + OutboxTable + + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + mut metadata: InFlightInvocationMetadata, + restart: bool, + ) -> Result<(), Error> { + self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) + .await?; + + if restart { + metadata.restart_when_completed = true + } + + // No need to go through the Killed state when we're suspended, + // because it means we already got a terminal state from the invoker. + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); Ok(()) } @@ -1179,7 +1533,7 @@ impl StateMachine { } } - async fn try_purge_invocation< + async fn on_purge_invocation< State: InvocationStatusTable + IdempotencyTable + VirtualObjectStatusTable @@ -1276,7 +1630,7 @@ impl StateMachine { self.on_service_invocation(ctx, service_invocation).await } Timer::CleanInvocationStatus(invocation_id) => { - self.try_purge_invocation(ctx, invocation_id).await + self.on_purge_invocation(ctx, invocation_id).await } Timer::NeoInvoke(invocation_id) => self.on_neo_invoke_timer(ctx, invocation_id).await, } @@ -1355,17 +1709,7 @@ impl StateMachine { let status = ctx .get_invocation_status(&invoker_effect.invocation_id) .await?; - - match status { - InvocationStatus::Invoked(invocation_metadata) => { - self.on_invoker_effect(ctx, invoker_effect, invocation_metadata) - .await? - } - _ => { - trace!("Received invoker effect for unknown service invocation. Ignoring the effect and aborting."); - Self::do_send_abort_invocation_to_invoker(ctx, invoker_effect.invocation_id); - } - }; + self.on_invoker_effect(ctx, invoker_effect, status).await?; self.latency.record(start.elapsed()); Ok(()) @@ -1388,8 +1732,29 @@ impl StateMachine { invocation_id, kind, }: InvokerEffect, - invocation_metadata: InFlightInvocationMetadata, + invocation_status: InvocationStatus, ) -> Result<(), Error> { + let is_status_invoked = matches!(invocation_status, InvocationStatus::Invoked(_)); + let is_status_killed = matches!(invocation_status, InvocationStatus::Killed(_)); + + if !is_status_invoked && !is_status_killed { + trace!("Received invoker effect for invocation not in invoked nor killed status. Ignoring the effect."); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + return Ok(()); + } + if is_status_killed + && !matches!(kind, InvokerEffectKind::Failed(_) | InvokerEffectKind::End) + { + warn!( + "Received non terminal invoker effect for killed invocation. Ignoring the effect." + ); + return Ok(()); + } + + let invocation_metadata = invocation_status + .into_invocation_metadata() + .expect("Must be present if status is killed or invoked"); + match kind { InvokerEffectKind::PinnedDeployment(pinned_deployment) => { Self::do_store_pinned_deployment( @@ -1447,12 +1812,27 @@ impl StateMachine { } } InvokerEffectKind::End => { - self.end_invocation(ctx, invocation_id, invocation_metadata) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + if is_status_killed { + // It doesn't matter that the invocation successfully completed, we return failed anyway in this case. 
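Note: while an invocation sits in the Killed status, only a terminal invoker effect moves it forward; everything else is dropped. A stand-alone sketch of that filter, with a simplified effect enum in place of `InvokerEffectKind`:

// Simplified stand-in for InvokerEffectKind.
enum EffectKind {
    PinnedDeployment,
    JournalEntry,
    Suspended,
    End,
    Failed,
}

/// Decide whether an invoker effect should be applied for the current status.
fn accept_effect(status_invoked: bool, status_killed: bool, kind: &EffectKind) -> bool {
    if !status_invoked && !status_killed {
        // Unknown or already-terminated invocation: ignore the effect
        // (and re-send the abort to the invoker, as above).
        return false;
    }
    if status_killed {
        // Killed invocations only accept the terminal effects that complete
        // the kill procedure.
        return matches!(kind, EffectKind::End | EffectKind::Failed);
    }
    true
}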
+ Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + } else { + None + }, + ) + .await?; } InvokerEffectKind::Failed(e) => { - self.fail_invocation(ctx, invocation_id, invocation_metadata, e) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + Some(ResponseResult::Failure(e)), + ) + .await?; } } @@ -1472,26 +1852,25 @@ impl StateMachine { ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, invocation_metadata: InFlightInvocationMetadata, + // If given, this will override any Output Entry available in the journal table + response_result_override: Option, ) -> Result<(), Error> { + if invocation_metadata.restart_when_completed { + return self + .restart_invocation(ctx, invocation_id, invocation_metadata) + .await; + } + + let invocation_target = invocation_metadata.invocation_target.clone(); let journal_length = invocation_metadata.journal_metadata.length; let completion_retention_time = invocation_metadata.completion_retention_duration; - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Ok(()), - ); - - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - // If there are any response sinks, or we need to store back the completed status, // we need to find the latest output entry if !invocation_metadata.response_sinks.is_empty() || !completion_retention_time.is_zero() { - let result = if let Some(output_entry) = self + let response_result = if let Some(response_result) = response_result_override { + response_result + } else if let Some(output_entry) = self .read_last_output_entry(ctx, &invocation_id, journal_length) .await? 
{ @@ -1506,21 +1885,44 @@ impl StateMachine { self.send_response_to_sinks( ctx, invocation_metadata.response_sinks.clone(), - result.clone(), + response_result.clone(), Some(invocation_id), None, Some(&invocation_metadata.invocation_target), ) .await?; + // Notify invocation result + self.notify_invocation_result( + ctx, + invocation_id, + invocation_metadata.invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + unsafe { invocation_metadata.timestamps.creation_time() }, + match &response_result { + ResponseResult::Success(_) => Ok(()), + ResponseResult::Failure(err) => Err((err.code(), err.message().to_owned())), + }, + ); + // Store the completed status, if needed if !completion_retention_time.is_zero() { let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( invocation_metadata, - result, + response_result, ); Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; } + } else { + // Just notify Ok, no need to read the output entry + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + unsafe { invocation_metadata.timestamps.creation_time() }, + Ok(()), + ); } // If no retention, immediately cleanup the invocation status @@ -1529,65 +1931,52 @@ impl StateMachine { } Self::do_drop_journal(ctx, invocation_id, journal_length).await; + // Consume inbox and move on + Self::consume_inbox(ctx, &invocation_target).await?; + Ok(()) } - async fn fail_invocation< + async fn restart_invocation< State: InboxTable + VirtualObjectStatusTable - + InvocationStatusTable - + VirtualObjectStatusTable - + StateTable + JournalTable + OutboxTable - + FsmTable, + + FsmTable + + InvocationStatusTable + + StateTable, >( &mut self, ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, - invocation_metadata: InFlightInvocationMetadata, - error: InvocationError, + mut invocation_metadata: InFlightInvocationMetadata, ) -> Result<(), Error> { - let journal_length = invocation_metadata.journal_metadata.length; + info!("Restarting invocation"); - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Err((error.code(), error.to_string())), + // We need to cleanup the journal except the first item. 
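Note: `end_invocation` now takes an optional `response_result_override`, which wins over whatever output entry is found in the journal; this is how a kill still reports the killed error even when the handler produced output. A small sketch of that precedence with a toy result type (the fallback used when neither source is present is not reproduced here):

#[derive(Clone, Debug, PartialEq)]
enum ToyResponseResult {
    Success(Vec<u8>),
    Failure(String),
}

/// The override (e.g. the killed error) takes precedence over the journal's
/// last output entry.
fn resolve_response(
    response_result_override: Option<ToyResponseResult>,
    last_output_entry: Option<ToyResponseResult>,
) -> Option<ToyResponseResult> {
    response_result_override.or(last_output_entry)
}

fn main() {
    let killed = ToyResponseResult::Failure("killed".to_owned());
    let output = ToyResponseResult::Success(b"ok".to_vec());
    assert_eq!(
        resolve_response(Some(killed.clone()), Some(output)),
        Some(killed)
    );
}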
+ let journal_length = invocation_metadata.journal_metadata.length; + debug_if_leader!( + ctx.is_leader, + restate.journal.length = journal_length, + "Effect: Drop journal except first entry" ); + ctx.storage + .delete_journal_range(&invocation_id, 1..journal_length) + .await; - let response_result = ResponseResult::from(error); + // Let's reset a bunch of parameters in the InFlightInvocationMetadata + invocation_metadata.journal_metadata.length = 1; + invocation_metadata.pinned_deployment = None; + invocation_metadata.restart_when_completed = false; - // Send responses out - self.send_response_to_sinks( + Self::invoke( ctx, - invocation_metadata.response_sinks.clone(), - response_result.clone(), - Some(invocation_id), - None, - Some(&invocation_metadata.invocation_target), + invocation_id, + invocation_metadata, + InvokeInputJournal::NoCachedJournal, ) .await?; - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - - // Store the completed status or free it - if !invocation_metadata.completion_retention_duration.is_zero() { - let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( - invocation_metadata, - response_result, - ); - Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; - } else { - Self::do_free_invocation(ctx, invocation_id).await; - } - - Self::do_drop_journal(ctx, invocation_id, journal_length).await; - Ok(()) } @@ -2785,6 +3174,17 @@ impl StateMachine { .await?; } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + vec![attach_invocation_request.response_sink], + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. 
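Note: the restart path above keeps only the input entry (journal index 0), drops everything after it, clears the pinned deployment so the retry can land on a newer deployment, and resets the restart flag. A minimal in-memory sketch of that reset, assuming a Vec-backed journal (the real code deletes a key range in the journal table instead):

// Toy in-flight representation: entry 0 is the input entry.
struct InFlight {
    journal_entries: Vec<Vec<u8>>,
    pinned_deployment: Option<String>,
    restart_when_completed: bool,
}

fn reset_for_restart(inflight: &mut InFlight) {
    // Keep only the input entry; the restarted attempt replays from scratch.
    inflight.journal_entries.truncate(1);
    // Let the next attempt pick a deployment again.
    inflight.pinned_deployment = None;
    // The restart request has been honoured.
    inflight.restart_when_completed = false;
}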
let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -3487,11 +3887,14 @@ impl StateMachine { fn do_send_abort_invocation_to_invoker( ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, + acknowledge: bool, ) { debug_if_leader!(ctx.is_leader, restate.invocation.id = %invocation_id, "Effect: Send abort command to invoker"); - ctx.action_collector - .push(Action::AbortInvocation(invocation_id)); + ctx.action_collector.push(Action::AbortInvocation { + invocation_id, + acknowledge, + }); } async fn do_mutate_state( diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index 7cfe80c80..bbfb15af3 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -25,11 +25,13 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_and_complete_idempotent_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -59,7 +61,7 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -131,13 +133,13 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] async fn start_and_complete_idempotent_invocation_neo_table( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -167,7 +169,7 @@ async fn start_and_complete_idempotent_invocation_neo_table( ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -243,11 +245,13 @@ async fn start_and_complete_idempotent_invocation_neo_table( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn complete_already_completed_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn complete_already_completed_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = 
ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); @@ -302,13 +306,13 @@ async fn complete_already_completed_invocation(#[case] disable_idempotency_table } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] async fn attach_with_service_invocation_command_while_executing( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -399,16 +403,16 @@ async fn attach_with_service_invocation_command_while_executing( } #[rstest] -#[case(true, true)] -#[case(true, false)] -#[case(false, true)] -#[case(false, false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), true)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), false)] +#[case(EnumSet::empty(), true)] +#[case(EnumSet::empty(), false)] #[tokio::test] async fn attach_with_send_service_invocation( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, #[case] use_same_request_id: bool, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -524,11 +528,13 @@ async fn attach_with_send_service_invocation( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_inboxed_with_send_service_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let request_id_1 = PartitionProcessorRpcRequestId::default(); @@ -620,11 +626,11 @@ async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_command(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_command(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let completion_retention = Duration::from_secs(60) * 60 * 24; @@ -773,11 +779,13 @@ async fn attach_command_without_blocking_inflight() { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn purge_completed_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_idempotent_invocation( + 
#[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index f90560495..9669bc85b 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -10,21 +10,29 @@ use super::{fixtures, matchers, *}; +use crate::partition::state_machine::tests::matchers::{invoked, killed, suspended}; use assert2::assert; use assert2::let_assert; use googletest::any; use prost::Message; +use restate_storage_api::invocation_status_table::JournalMetadata; use restate_storage_api::journal_table::JournalTable; use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_types::identifiers::EntryIndex; use restate_types::invocation::TerminationFlavor; use restate_types::journal::enriched::EnrichedEntryHeader; use restate_types::service_protocol; +use rstest::rstest; use test_log::test; -#[test(tokio::test)] -async fn kill_inboxed_invocation() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[tokio::test] +async fn kill_inboxed_invocation( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -76,41 +84,385 @@ async fn kill_inboxed_invocation() -> anyhow::Result<()> { // assert that invocation status was removed assert!(let InvocationStatus::Free = current_invocation_status); - fn outbox_message_matcher( - caller_id: InvocationId, - ) -> impl Matcher { - pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - restate_types::invocation::InvocationResponse { - id: eq(caller_id), - entry_index: eq(0), - result: eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) - } - )) + assert_that!( + actions, + contains( + matchers::actions::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) ) - } + ); + + let outbox_message = test_env.storage().get_next_outbox_message(0).await?; + assert_that!( + outbox_message, + some(( + ge(0), + matchers::outbox::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) + )) + ); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Cancel)] +#[tokio::test] +async fn terminate_scheduled_invocation( + #[case] experimental_features: EnumSet, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_id = InvocationId::mock_random(); + let rpc_id = PartitionProcessorRpcRequestId::new(); + + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + execution_time: 
Some(MillisSinceEpoch::MAX), + response_sink: Some(ServiceInvocationResponseSink::ingress(rpc_id)), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Scheduled(_) = current_invocation_status); + + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; assert_that!( actions, - contains(pat!(Action::NewOutboxMessage { - message: outbox_message_matcher(caller_id) + contains(pat!(Action::IngressResponse { + request_id: eq(rpc_id), + invocation_id: some(eq(invocation_id)), + response: eq(IngressResponseResult::Failure(match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + _ => panic!("Unexpected termination flavor"), + })) })) ); - let outbox_message = test_env.storage().get_next_outbox_message(0).await?; + // assert that invocation status was removed + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Free = current_invocation_status); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)] +#[tokio::test] +async fn terminate_and_restart_scheduled_invocation_has_no_effect( + #[case] experimental_features: EnumSet, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_id = InvocationId::mock_random(); + + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + execution_time: Some(MillisSinceEpoch::MAX), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let scheduled_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Scheduled(_) = scheduled_invocation_status); + + // This has no effect + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!(actions, empty()); + + // Invocation status didn't change at all + assert_eq!( + scheduled_invocation_status, + test_env + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap() + ); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::KillAndRestart)] +#[case(EnumSet::empty(), TerminationFlavor::CancelAndRestart)] +#[tokio::test] +async fn terminate_and_restart_inboxed_invocation_has_no_effect( + #[case] experimental_features: EnumSet, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_target = InvocationTarget::mock_virtual_object(); + let invocation_id = InvocationId::mock_generate(&invocation_target); + + // First invocation takes the lock + let _ = test_env + 
.apply(Command::Invoke(ServiceInvocation { + invocation_id: InvocationId::mock_generate(&invocation_target), + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + // This invocation will be inboxed + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + invocation_target: invocation_target.clone(), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let inboxed_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Inboxed(_) = inboxed_invocation_status); + + // This has no effect + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!(actions, empty()); + + // Invocation status didn't change at all + assert_eq!( + inboxed_invocation_status, + test_env + .storage() + .get_invocation_status(&invocation_id) + .await + .unwrap() + ); + + test_env.shutdown().await; + Ok(()) +} + +#[tokio::test] +async fn kill_and_restart_when_invoked() -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features( + ExperimentalFeature::InvocationStatusKilled.into(), + ) + .await; + + let invocation_id = InvocationId::mock_random(); + + // Start invocation and pin the deployment and add one entry (doesn't matter which one) + let _ = test_env + .apply_multiple([ + Command::Invoke(ServiceInvocation { + invocation_id, + ..ServiceInvocation::mock() + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { + deployment_id: Default::default(), + service_protocol_version: Default::default(), + }), + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState), + }, + }), + ]) + .await; assert_that!( - outbox_message, - some((ge(0), outbox_message_matcher(caller_id))) + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + invoked() + ); + + // First we should transition to killed status + let _ = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: TerminationFlavor::KillAndRestart, + })) + .await; + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + killed() + ); + + // Now send the Failed invoker effect to complete the kill procedure + let actions = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + })) + .await; + + // Should have restarted the invocation + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(invocation_id)) + ); + + // We should be back to invoked state with a reset journal and reset deployment id + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Invoked(pat!( + InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(1) }), + pinned_deployment: none(), + restart_when_completed: eq(false) + } + ))) ); test_env.shutdown().await; Ok(()) } -#[test(tokio::test)] -async fn kill_call_tree() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[tokio::test] +async fn kill_and_restart_when_suspended() -> anyhow::Result<()> { + let mut test_env = 
TestEnv::create_with_experimental_features( + ExperimentalFeature::InvocationStatusKilled.into(), + ) + .await; + + let invocation_id = InvocationId::mock_random(); + + // Start invocation, pin the deployment, add one completable entry, suspend + let _ = test_env + .apply_multiple([ + Command::Invoke(ServiceInvocation { + invocation_id, + ..ServiceInvocation::mock() + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::PinnedDeployment(PinnedDeployment { + deployment_id: Default::default(), + service_protocol_version: Default::default(), + }), + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 1, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::Awakeable( + AwakeableEntry { result: None }, + )), + }, + }), + Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Suspended { + waiting_for_completed_entries: [1].into(), + }, + }), + ]) + .await; + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + suspended() + ); + + // This should immediately restart + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: TerminationFlavor::KillAndRestart, + })) + .await; + + // Should have restarted the invocation + assert_that!( + actions, + contains(matchers::actions::invoke_for_id(invocation_id)) + ); + + // We should be back to invoked state with a reset journal and reset deployment id + assert_that!( + test_env + .storage() + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Invoked(pat!( + InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(1) }), + pinned_deployment: none(), + restart_when_completed: eq(false) + } + ))) + ); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[tokio::test] +async fn kill_call_tree( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let call_invocation_id = InvocationId::mock_random(); let background_call_invocation_id = InvocationId::mock_random(); @@ -170,31 +522,23 @@ async fn kill_call_tree() -> anyhow::Result<()> { ))) .await; - // Invocation should be gone - assert_that!( - test_env - .storage - .get_invocation_status(&invocation_id) - .await?, - pat!(InvocationStatus::Free) - ); - assert_that!( - test_env - .storage - .get_journal(&invocation_id, 4) - .try_collect::>() - .await?, - empty() - ); + let abort_command_matcher = + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(true) + }) + } else { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(false) + }) + }; assert_that!( actions, all!( - contains(pat!(Action::AbortInvocation(eq(invocation_id)))), - contains(pat!(Action::Invoke { - invocation_id: eq(enqueued_invocation_id_on_same_target), - invocation_target: eq(invocation_target) - })), + contains(abort_command_matcher), contains(matchers::actions::terminate_invocation( call_invocation_id, TerminationFlavor::Kill @@ -213,6 +557,92 @@ async fn kill_call_tree() -> anyhow::Result<()> { }))) ) ); + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // We don't pop the inbox yet, 
but only after invocation ends + assert_that!( + actions, + not(contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + ))) + ) + } else { + // Inbox should have been popped + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + )) + ) + }; + + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // A couple of new expectations here: + // * the invocation status is now in killed state + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed { .. }) + ); + + // * No new journal entries will be accepted! + let _ = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 4, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState), + }, + })) + .await; + // Journal entry was ignored (journal length == 4) + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed(pat!(InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(4) }) + }))) + ); + + // Now send the Failed invoker effect + let actions = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + })) + .await; + + // The inbox is popped after the invoker sends failed + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target + )) + ); + } + + // Invocation should be finally gone + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Free) + ); + assert_that!( + test_env + .storage + .get_journal(&invocation_id, 4) + .try_collect::>() + .await?, + empty() + ); test_env.shutdown().await; Ok(()) diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index ccc769ff9..7da00cb25 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -11,6 +11,7 @@ use bytes::Bytes; use bytestring::ByteString; use googletest::prelude::*; +use restate_storage_api::invocation_status_table::InvocationStatus; use restate_storage_api::timer_table::{TimerKey, TimerKeyKind}; use restate_types::errors::codes; use restate_types::identifiers::EntryIndex; @@ -52,7 +53,7 @@ pub mod actions { use crate::partition::state_machine::Action; use restate_types::identifiers::InvocationId; - use restate_types::invocation::{InvocationResponse, ResponseResult}; + use restate_types::invocation::{InvocationTarget, ResponseResult}; pub fn invoke_for_id(invocation_id: InvocationId) -> impl Matcher { pat!(Action::Invoke { @@ -60,6 +61,16 @@ pub mod actions { }) } + pub fn invoke_for_id_and_target( + invocation_id: InvocationId, + invocation_target: InvocationTarget, + ) -> impl Matcher { + pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invocation_target: eq(invocation_target) + }) + } + pub fn delete_sleep_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Action::DeleteTimer { timer_key: pat!(TimerKey { @@ -109,19 +120,51 @@ pub mod actions { response_result_matcher: impl Matcher + 'static, ) -> impl Matcher { pat!(Action::NewOutboxMessage { - message: 
pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - InvocationResponse { - id: eq(caller_invocation_id), - entry_index: eq(caller_entry_index), - result: response_result_matcher - } - )) + message: outbox::invocation_response_to_partition_processor( + caller_invocation_id, + caller_entry_index, + response_result_matcher ) }) } } +pub mod outbox { + use super::*; + + use restate_storage_api::outbox_table::OutboxMessage; + use restate_types::identifiers::InvocationId; + use restate_types::invocation::{InvocationResponse, ResponseResult}; + + pub fn invocation_response_to_partition_processor( + caller_invocation_id: InvocationId, + caller_entry_index: EntryIndex, + response_result_matcher: impl Matcher + 'static, + ) -> impl Matcher { + pat!( + restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( + InvocationResponse { + id: eq(caller_invocation_id), + entry_index: eq(caller_entry_index), + result: response_result_matcher + } + )) + ) + } +} + +pub fn invoked() -> impl Matcher { + pat!(InvocationStatus::Invoked { .. }) +} + +pub fn suspended() -> impl Matcher { + pat!(InvocationStatus::Suspended { .. }) +} + +pub fn killed() -> impl Matcher { + pat!(InvocationStatus::Killed { .. }) +} + pub fn completion( entry_index: EntryIndex, completion_result: CompletionResult, diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 9e68666ce..7881b0459 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -27,7 +27,6 @@ use ::tracing::info; use bytes::Bytes; use bytestring::ByteString; use futures::{StreamExt, TryStreamExt}; -use googletest::matcher::Matcher; use googletest::{all, assert_that, pat, property}; use restate_core::{task_center, TaskCenter, TaskCenterBuilder}; use restate_invoker_api::{EffectKind, InvokeInputJournal}; @@ -84,16 +83,18 @@ impl TestEnv { } pub async fn create() -> Self { - Self::create_with_options(false).await + Self::create_with_experimental_features(Default::default()).await } - pub async fn create_with_options(disable_idempotency_table: bool) -> Self { + pub async fn create_with_experimental_features( + experimental_features: EnumSet, + ) -> Self { Self::create_with_state_machine(StateMachine::new( 0, /* inbox_seq_number */ 0, /* outbox_seq_number */ None, /* outbox_head_seq_number */ PartitionKey::MIN..=PartitionKey::MAX, - disable_idempotency_table, + experimental_features, )) .await } @@ -967,7 +968,7 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { outbox_tail_index, Some(outbox_head_index), PartitionKey::MIN..=PartitionKey::MAX, - false, + EnumSet::empty(), )) .await; diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 8c9c60b5b..dd94e28ac 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -20,11 +20,11 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn start_workflow_method(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_workflow_method(#[case] experimental_features: EnumSet) { + let mut test_env = 
TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -52,7 +52,7 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { ); // Assert service is locked only if we enable the idempotency table - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -184,11 +184,11 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_by_workflow_key(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -322,11 +322,11 @@ async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn purge_completed_workflow(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_workflow(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_random();
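Note: the tests throughout this diff replace the old boolean parameter with rstest cases carrying an EnumSet of experimental features, so new combinations can be added without touching test signatures. A self-contained example of the same pattern on a toy feature enum (not the real test environment):

use enumset::{EnumSet, EnumSetType};
use rstest::rstest;

#[derive(EnumSetType, Debug)]
enum Feature {
    DisableIdempotencyTable,
    InvocationStatusKilled,
}

#[rstest]
#[case(Feature::DisableIdempotencyTable.into())]
#[case(EnumSet::empty())]
fn branches_on_feature_set(#[case] features: EnumSet<Feature>) {
    if features.contains(Feature::DisableIdempotencyTable) {
        // Path exercised with the feature enabled.
        assert!(!features.is_empty());
    } else {
        // Path exercised with all experimental features off.
        assert!(features.is_empty());
    }
}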