From e404a2ffb4c136901e57259d3360674bb92826a9 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Tue, 9 Jul 2024 16:58:18 -0700 Subject: [PATCH 01/10] Added Simple LogRecordProcessor and refactored shutdown and forceFlush --- .../OpenTelemetry/Internal/Common/Types.hs | 43 +++++++++++++ api/src/OpenTelemetry/Internal/Logs/Core.hs | 56 ++++++++++------ api/src/OpenTelemetry/Internal/Logs/Types.hs | 15 ++--- sdk/hs-opentelemetry-sdk.cabal | 1 + .../Processor/Simple/LogRecord.hs | 64 ++++++++++++++++++- .../OpenTelemetry/LogRecordProcessorSpec.hs | 59 +++++++++++++++++ 6 files changed, 209 insertions(+), 29 deletions(-) create mode 100644 sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs diff --git a/api/src/OpenTelemetry/Internal/Common/Types.hs b/api/src/OpenTelemetry/Internal/Common/Types.hs index c604bc14..3f7f9482 100644 --- a/api/src/OpenTelemetry/Internal/Common/Types.hs +++ b/api/src/OpenTelemetry/Internal/Common/Types.hs @@ -10,7 +10,13 @@ module OpenTelemetry.Internal.Common.Types ( AnyValue (..), ToValue (..), ShutdownResult (..), + takeWorseShutdownResult, + takeWorstShutdownResult, + flushResultToShutdownResult, FlushResult (..), + takeWorseFlushResult, + takeWorstFlushResult, + exportResultToFlushResult, ExportResult (..), ) where @@ -162,6 +168,25 @@ instance ToValue AnyValue where data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout +flushResultToShutdownResult :: FlushResult -> ShutdownResult +flushResultToShutdownResult FlushSuccess = ShutdownSuccess +flushResultToShutdownResult FlushTimeout = ShutdownTimeout +flushResultToShutdownResult FlushError = ShutdownFailure + + +-- | Returns @ShutdownFailure@ if either argument is @ShutdownFailure@, @ShutdownTimeout@ if either argument is @ShutdownTimeout@, and @ShutdownSuccess@ otherwise. +takeWorseShutdownResult :: ShutdownResult -> ShutdownResult -> ShutdownResult +takeWorseShutdownResult ShutdownFailure _ = ShutdownFailure +takeWorseShutdownResult _ ShutdownFailure = ShutdownFailure +takeWorseShutdownResult ShutdownTimeout _ = ShutdownTimeout +takeWorseShutdownResult _ ShutdownTimeout = ShutdownTimeout +takeWorseShutdownResult _ _ = ShutdownSuccess + + +takeWorstShutdownResult :: (Foldable t) => t ShutdownResult -> ShutdownResult +takeWorstShutdownResult = foldr takeWorseShutdownResult ShutdownSuccess + + -- | The outcome of a call to @OpenTelemetry.Trace.forceFlush@ or @OpenTelemetry.Logs.forceFlush@ data FlushResult = -- | One or more spans or @LogRecord@s did not export from all associated exporters @@ -175,6 +200,24 @@ data FlushResult deriving (Show) +-- | Returns @FlushError@ if either argument is @FlushError@, @FlushTimeout@ if either argument is @FlushTimeout@, and @FlushSuccess@ otherwise. +takeWorseFlushResult :: FlushResult -> FlushResult -> FlushResult +takeWorseFlushResult FlushError _ = FlushError +takeWorseFlushResult _ FlushError = FlushError +takeWorseFlushResult FlushTimeout _ = FlushTimeout +takeWorseFlushResult _ FlushTimeout = FlushTimeout +takeWorseFlushResult _ _ = FlushSuccess + + +takeWorstFlushResult :: (Foldable t) => t FlushResult -> FlushResult +takeWorstFlushResult = foldr takeWorseFlushResult FlushSuccess + + +exportResultToFlushResult :: ExportResult -> FlushResult +exportResultToFlushResult Success = FlushSuccess +exportResultToFlushResult (Failure _) = FlushError + + data ExportResult = Success | Failure (Maybe SomeException) diff --git a/api/src/OpenTelemetry/Internal/Logs/Core.hs b/api/src/OpenTelemetry/Internal/Logs/Core.hs index f37f4fdf..0e5a04b8 100644 --- a/api/src/OpenTelemetry/Internal/Logs/Core.hs +++ b/api/src/OpenTelemetry/Internal/Logs/Core.hs @@ -21,6 +21,7 @@ module OpenTelemetry.Internal.Logs.Core ( import Control.Applicative import Control.Concurrent.Async +import Control.Exception import Control.Monad import Control.Monad.Trans import Control.Monad.Trans.Maybe @@ -108,15 +109,36 @@ setGlobalLoggerProvider :: (MonadIO m) => LoggerProvider -> m () setGlobalLoggerProvider = liftIO . writeIORef globalLoggerProvider +defaultShutdownTimeout :: Int +defaultShutdownTimeout = 5_000_000 + + {- | This method provides a way for provider to do any cleanup required. This will also trigger shutdowns on all internal processors. -} -shutdownLoggerProvider :: (MonadIO m) => LoggerProvider -> m () -shutdownLoggerProvider LoggerProvider {loggerProviderProcessors} = liftIO $ do - asyncShutdownResults <- V.forM loggerProviderProcessors $ \processor -> do - logRecordProcessorShutdown processor - mapM_ wait asyncShutdownResults +shutdownLoggerProvider + :: (MonadIO m) + => Maybe Int + -- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s) + -> LoggerProvider + -> m ShutdownResult + -- ^ Result that denotes whether the shutdown action succeeded, failed, or timed out. +shutdownLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = liftIO $ do + mresult <- + timeout (fromMaybe defaultShutdownTimeout mtimeout) $ + takeWorstShutdownResult + <$> forConcurrently + loggerProviderProcessors + (\lrp -> logRecordProcessorShutdown lrp `catch` \(SomeException _) -> pure ShutdownFailure) + + case mresult of + Nothing -> pure ShutdownTimeout + Just res -> pure res + + +defaultForceFlushTimeout :: Int +defaultForceFlushTimeout = 5_000_000 {- | This method provides a way for provider to immediately export all @LogRecord@s that have not yet @@ -124,25 +146,19 @@ shutdownLoggerProvider LoggerProvider {loggerProviderProcessors} = liftIO $ do -} forceFlushLoggerProvider :: (MonadIO m) - => LoggerProvider - -> Maybe Int + => Maybe Int -- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s) + -> LoggerProvider -> m FlushResult -- ^ Result that denotes whether the flush action succeeded, failed, or timed out. -forceFlushLoggerProvider LoggerProvider {loggerProviderProcessors} mtimeout = liftIO $ do - jobs <- V.forM loggerProviderProcessors $ \processor -> async $ do - logRecordProcessorForceFlush processor +forceFlushLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = liftIO $ do mresult <- - timeout (fromMaybe 5_000_000 mtimeout) $ - V.foldM - ( \status action -> do - res <- waitCatch action - pure $! case res of - Left _err -> FlushError - Right _ok -> status - ) - FlushSuccess - jobs + timeout (fromMaybe defaultForceFlushTimeout mtimeout) $ + takeWorstFlushResult + <$> forConcurrently + loggerProviderProcessors + (\lrp -> logRecordProcessorForceFlush lrp `catch` \(SomeException _) -> pure FlushError) + case mresult of Nothing -> pure FlushTimeout Just res -> pure res diff --git a/api/src/OpenTelemetry/Internal/Logs/Types.hs b/api/src/OpenTelemetry/Internal/Logs/Types.hs index 17bc56de..e689a59f 100644 --- a/api/src/OpenTelemetry/Internal/Logs/Types.hs +++ b/api/src/OpenTelemetry/Internal/Logs/Types.hs @@ -24,7 +24,6 @@ module OpenTelemetry.Internal.Logs.Types ( ) where import Control.Concurrent (MVar, newMVar, withMVar) -import Control.Concurrent.Async import Data.Function (on) import Data.HashMap.Strict (HashMap) import qualified Data.HashMap.Strict as H @@ -33,7 +32,7 @@ import Data.Text (Text) import Data.Vector (Vector) import OpenTelemetry.Common (Timestamp, TraceFlags) import OpenTelemetry.Context.Types (Context) -import OpenTelemetry.Internal.Common.Types (ExportResult, InstrumentationLibrary, ShutdownResult) +import OpenTelemetry.Internal.Common.Types import OpenTelemetry.Internal.Trace.Id (SpanId, TraceId) import OpenTelemetry.LogAttributes import OpenTelemetry.Resource (MaterializedResources) @@ -43,9 +42,9 @@ import OpenTelemetry.Resource (MaterializedResources) data LogRecordExporterArguments = LogRecordExporterArguments { logRecordExporterArgumentsExport :: Vector ReadableLogRecord -> IO ExportResult -- ^ See @logRecordExporterExport@ for documentation - , logRecordExporterArgumentsForceFlush :: IO () + , logRecordExporterArgumentsForceFlush :: IO FlushResult -- ^ See @logRecordExporterArgumentsForceFlush@ for documentation - , logRecordExporterArgumentsShutdown :: IO () + , logRecordExporterArgumentsShutdown :: IO ShutdownResult -- ^ See @logRecordExporterArgumentsShutdown@ for documentation } @@ -97,7 +96,7 @@ the process after an invocation, but before the exporter exports the ReadlableLo ForceFlush SHOULD complete or abort within some timeout. ForceFlush can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry SDK authors MAY decide if they want to make the flush timeout configurable. -} -logRecordExporterForceFlush :: LogRecordExporter -> IO () +logRecordExporterForceFlush :: LogRecordExporter -> IO FlushResult logRecordExporterForceFlush = flip withMVar logRecordExporterArgumentsForceFlush . unExporter @@ -109,7 +108,7 @@ allowed and SHOULD return a Failure result. Shutdown SHOULD NOT block indefinitely (e.g. if it attempts to flush the data and the destination is unavailable). OpenTelemetry SDK authors MAY decide if they want to make the shutdown timeout configurable. -} -logRecordExporterShutdown :: LogRecordExporter -> IO () +logRecordExporterShutdown :: LogRecordExporter -> IO ShutdownResult logRecordExporterShutdown = flip withMVar logRecordExporterArgumentsShutdown . unExporter @@ -143,7 +142,7 @@ data LogRecordProcessor = LogRecordProcessor -- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions. -- -- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted. - , logRecordProcessorShutdown :: IO (Async ShutdownResult) + , logRecordProcessorShutdown :: IO ShutdownResult -- ^ Shuts down the processor. Called when SDK is shut down. This is an opportunity for processor to do any cleanup required. -- -- Shutdown SHOULD be called only once for each LogRecordProcessor instance. After the call to Shutdown, subsequent calls to OnEmit are not allowed. SDKs SHOULD ignore these calls gracefully, if possible. @@ -154,7 +153,7 @@ data LogRecordProcessor = LogRecordProcessor -- -- Shutdown SHOULD complete or abort within some timeout. Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. -- OpenTelemetry SDK authors can decide if they want to make the shutdown timeout configurable. - , logRecordProcessorForceFlush :: IO () + , logRecordProcessorForceFlush :: IO FlushResult -- ^ This is a hint to ensure that any tasks associated with LogRecords for which the LogRecordProcessor had already received events prior to the call to ForceFlush SHOULD be completed -- as soon as possible, preferably before returning from this method. -- diff --git a/sdk/hs-opentelemetry-sdk.cabal b/sdk/hs-opentelemetry-sdk.cabal index 9cf0ab8d..e8e08d06 100644 --- a/sdk/hs-opentelemetry-sdk.cabal +++ b/sdk/hs-opentelemetry-sdk.cabal @@ -105,6 +105,7 @@ test-suite hs-opentelemetry-sdk-test other-modules: OpenTelemetry.BaggageSpec OpenTelemetry.ContextSpec + OpenTelemetry.LogRecordProcessorSpec OpenTelemetry.ResourceSpec OpenTelemetry.TraceSpec Paths_hs_opentelemetry_sdk diff --git a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs index fa520756..350e5376 100644 --- a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs +++ b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs @@ -1,2 +1,64 @@ -module OpenTelemetry.Processor.Simple.LogRecord () where +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE ScopedTypeVariables #-} +module OpenTelemetry.Processor.Simple.LogRecord ( + simpleProcessor, +) where + +import Control.Concurrent.Async (async, cancel) +import Control.Concurrent.Chan.Unagi +import Control.Exception +import Control.Monad (forever) +import qualified Data.Vector as V +import OpenTelemetry.Internal.Common.Types +import OpenTelemetry.Internal.Logs.Types + + +{- | This is an implementation of LogRecordProcessor which passes finished logs and passes the export-friendly ReadableLogRecord +representation to the configured LogRecordExporter, as soon as they are finished. +-} +simpleProcessor :: LogRecordExporter -> IO LogRecordProcessor +simpleProcessor exporter = do + (inChan :: InChan ReadWriteLogRecord, outChan :: OutChan ReadWriteLogRecord) <- newChan + exportWorker <- async $ forever $ do + bracket + (readChan outChan) + (writeChan inChan) + exportSingleLogRecord + + let logRecordProcessorForceFlush = + ( do + chanFlushRes <- + takeWorstFlushResult + . fmap exportResultToFlushResult + <$> forceFlushOutChan outChan [] + + exporterFlushRes <- logRecordExporterForceFlush exporter + + pure $ takeWorseFlushResult exporterFlushRes chanFlushRes + ) + `catch` \(SomeException _) -> pure FlushError + + pure $ + LogRecordProcessor + { logRecordProcessorOnEmit = \lr _ -> writeChan inChan lr + , logRecordProcessorShutdown = mask $ \restore -> do + cancel exportWorker + flushResult <- restore logRecordProcessorForceFlush + + shutdownResult <- logRecordExporterShutdown exporter + + pure $ takeWorseShutdownResult shutdownResult $ flushResultToShutdownResult flushResult + , logRecordProcessorForceFlush + } + where + forceFlushOutChan outChan acc = do + (Element m, _) <- tryReadChan outChan + mlr <- m + case mlr of + Nothing -> pure acc + Just lr -> do + res <- exportSingleLogRecord lr + forceFlushOutChan outChan (res : acc) + + exportSingleLogRecord = logRecordExporterExport exporter . V.singleton . mkReadableLogRecord diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs new file mode 100644 index 00000000..c28bd986 --- /dev/null +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -0,0 +1,59 @@ +{-# LANGUAGE NamedFieldPuns #-} + +module OpenTelemetry.LogRecordProcessorSpec where + +import Data.IORef +import qualified Data.Vector as V +import OpenTelemetry.Exporter.LogRecord +import OpenTelemetry.Internal.Common.Types +import OpenTelemetry.Logs.Core +import Test.Hspec + + +getTestExporter :: IO (IORef Int, LogRecordExporter) +getTestExporter = do + numExportsRef <- newIORef 0 + shutdownRef <- newIORef False + + let logRecordExporterExportInternal logRecordsByLibrary = do + shutdown <- readIORef shutdownRef + if shutdown + then pure (Failure Nothing) + else do + let numLogRecords = foldr (\lrs n -> n + V.length lrs) 0 logRecordsByLibrary + modifyIORef numExportsRef (+ numLogRecords) + + pure Success + + logRecordExporterForceFlushInternal = pure FlushSuccess + + logRecordExporterShutdownInternal = pure ShutdownSuccess + + testExporter <- + mkLogRecordExporter $ + LogRecordExporterInternal + { logRecordExporterExportInternal + , logRecordExporterForceFlushInternal + , logRecordExporterShutdownInternal + } + pure + ( numExportsRef + , testExporter + ) + + +spec :: Spec +spec = describe "LogRecordProcessor" $ do + describe "Simple Processor" $ do + it "Sends LogRecords to the Exporter" $ do + (numExportsRef, testExporter) <- getTestExporter + + let lp = createLoggerProvider [testExporter] emptyLoggerProviderOptions + l = makeLogger lp + + pending + + it "Force flushes correctly" $ do + pending + it "Shuts down correctly" $ do + pending From 77a9b64ea145e24156c226a9d69511c7869c924d Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Tue, 9 Jul 2024 17:05:42 -0700 Subject: [PATCH 02/10] working on tests --- sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index c28bd986..bae8a1f5 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -1,4 +1,5 @@ {-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} module OpenTelemetry.LogRecordProcessorSpec where @@ -6,6 +7,8 @@ import Data.IORef import qualified Data.Vector as V import OpenTelemetry.Exporter.LogRecord import OpenTelemetry.Internal.Common.Types +import OpenTelemetry.LogRecordExporter +import OpenTelemetry.LogRecordProcessor.Simple import OpenTelemetry.Logs.Core import Test.Hspec @@ -47,9 +50,12 @@ spec = describe "LogRecordProcessor" $ do describe "Simple Processor" $ do it "Sends LogRecords to the Exporter" $ do (numExportsRef, testExporter) <- getTestExporter + processor <- simpleProcessor testExporter - let lp = createLoggerProvider [testExporter] emptyLoggerProviderOptions - l = makeLogger lp + let lp = createLoggerProvider [processor] emptyLoggerProviderOptions + l = makeLogger lp "Test Library" + + emitLogRecord l (emptyLogRecordArguments "something") pending From a3fbc19d4da296add8b7c93e64c4609dec023a65 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Tue, 9 Jul 2024 17:40:26 -0700 Subject: [PATCH 03/10] Finished writing tests --- .../OpenTelemetry/Internal/Common/Types.hs | 1 + api/src/OpenTelemetry/Logs/Core.hs | 1 + .../OpenTelemetry/LogRecordProcessorSpec.hs | 74 +++++++++++++++++-- sdk/test/Spec.hs | 2 + 4 files changed, 71 insertions(+), 7 deletions(-) diff --git a/api/src/OpenTelemetry/Internal/Common/Types.hs b/api/src/OpenTelemetry/Internal/Common/Types.hs index 3f7f9482..e01e8048 100644 --- a/api/src/OpenTelemetry/Internal/Common/Types.hs +++ b/api/src/OpenTelemetry/Internal/Common/Types.hs @@ -221,3 +221,4 @@ exportResultToFlushResult (Failure _) = FlushError data ExportResult = Success | Failure (Maybe SomeException) + deriving (Show) diff --git a/api/src/OpenTelemetry/Logs/Core.hs b/api/src/OpenTelemetry/Logs/Core.hs index 9a19e335..ffad2f11 100644 --- a/api/src/OpenTelemetry/Logs/Core.hs +++ b/api/src/OpenTelemetry/Logs/Core.hs @@ -20,6 +20,7 @@ module OpenTelemetry.Logs.Core ( IsReadableLogRecord (..), IsReadWriteLogRecord (..), LogRecordArguments (..), + emptyLogRecordArguments, AnyValue (..), ToValue (..), SeverityNumber (..), diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index bae8a1f5..95b0de2a 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -1,15 +1,19 @@ +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} module OpenTelemetry.LogRecordProcessorSpec where +import qualified Data.HashMap.Strict as H import Data.IORef import qualified Data.Vector as V +import qualified OpenTelemetry.Context as Context import OpenTelemetry.Exporter.LogRecord import OpenTelemetry.Internal.Common.Types -import OpenTelemetry.LogRecordExporter -import OpenTelemetry.LogRecordProcessor.Simple import OpenTelemetry.Logs.Core +import OpenTelemetry.Processor.LogRecord +import OpenTelemetry.Processor.Simple.LogRecord +import System.IO.Unsafe import Test.Hspec @@ -30,6 +34,34 @@ getTestExporter = do logRecordExporterForceFlushInternal = pure FlushSuccess + logRecordExporterShutdownInternal = do + writeIORef shutdownRef True + pure ShutdownSuccess + testExporter <- + mkLogRecordExporter $ + LogRecordExporterInternal + { logRecordExporterExportInternal + , logRecordExporterForceFlushInternal + , logRecordExporterShutdownInternal + } + pure + ( numExportsRef + , testExporter + ) + + +getTestExporterWithoutShutdown :: IO (IORef Int, LogRecordExporter) +getTestExporterWithoutShutdown = do + numExportsRef <- newIORef 0 + + let logRecordExporterExport logRecordsByLibrary = do + let numLogRecords = foldr (\lrs n -> n + V.length lrs) 0 logRecordsByLibrary + modifyIORef numExportsRef (+ numLogRecords) + + pure Success + + logRecordExporterForceFlushInternal = pure FlushSuccess + logRecordExporterShutdownInternal = pure ShutdownSuccess testExporter <- @@ -55,11 +87,39 @@ spec = describe "LogRecordProcessor" $ do let lp = createLoggerProvider [processor] emptyLoggerProviderOptions l = makeLogger lp "Test Library" - emitLogRecord l (emptyLogRecordArguments "something") + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments - pending + -- WARNING: There might be a better way to ensure exporting than forceFlush + forceFlushLoggerProvider Nothing lp - it "Force flushes correctly" $ do - pending + numExports <- readIORef numExportsRef + numExports `shouldBe` 3 it "Shuts down correctly" $ do - pending + (numExportsRef, testExporter) <- getTestExporter + (numExportsNoShutdownRef, testExporterNoShutdown) <- getTestExporterWithoutShutdown + processor <- simpleProcessor testExporter + processorNoShutdown <- simpleProcessor testExporterNoShutdown + + let lp = createLoggerProvider [processor, processorNoShutdown] emptyLoggerProviderOptions + l = makeLogger lp "Test Library" + + emitLogRecord l (emptyLogRecordArguments "something") + emitLogRecord l (emptyLogRecordArguments "another thing") + emitLogRecord l (emptyLogRecordArguments "a third thing") + + -- WARNING: There might be a better way to ensure exporting than forceFlush + shutdownLoggerProvider Nothing lp + + numExports <- readIORef numExportsRef + numExports `shouldBe` 3 + exportRes <- logRecordExporterExport testExporter H.empty + exportRes `shouldSatisfy` \case + Success -> False + Failure _ -> True + + lr <- emitLogRecord l (emptyLogRecordArguments ("a bad one" :: String)) + logRecordProcessorOnEmit processorNoShutdown lr Context.empty + numExportsNoShutdown <- readIORef numExportsNoShutdownRef + numExportsNoShutdown `shouldBe` 3 diff --git a/sdk/test/Spec.hs b/sdk/test/Spec.hs index 49b35ea7..4a9d2801 100644 --- a/sdk/test/Spec.hs +++ b/sdk/test/Spec.hs @@ -1,5 +1,6 @@ import qualified OpenTelemetry.BaggageSpec as BaggageSpec import qualified OpenTelemetry.ContextSpec as ContextSpec +import qualified OpenTelemetry.LogRecordProcessorSpec as LogRecordProcessorSpec import qualified OpenTelemetry.ResourceSpec as ResourceSpec import OpenTelemetry.Trace (initializeGlobalTracerProvider) import qualified OpenTelemetry.TraceSpec as TraceSpec @@ -14,3 +15,4 @@ main = do ContextSpec.spec TraceSpec.spec ResourceSpec.spec + LogRecordProcessorSpec.spec From fa407781e7da13c5bc3f731ed447d50e10a8d465 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Wed, 10 Jul 2024 10:07:41 -0700 Subject: [PATCH 04/10] Fixed infinite loop because of bracket --- sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs | 2 +- sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs index 350e5376..9e63cf2f 100644 --- a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs +++ b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs @@ -21,7 +21,7 @@ simpleProcessor :: LogRecordExporter -> IO LogRecordProcessor simpleProcessor exporter = do (inChan :: InChan ReadWriteLogRecord, outChan :: OutChan ReadWriteLogRecord) <- newChan exportWorker <- async $ forever $ do - bracket + bracketOnError (readChan outChan) (writeChan inChan) exportSingleLogRecord diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index 95b0de2a..bdaa4278 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -114,6 +114,7 @@ spec = describe "LogRecordProcessor" $ do numExports <- readIORef numExportsRef numExports `shouldBe` 3 + exportRes <- logRecordExporterExport testExporter H.empty exportRes `shouldSatisfy` \case Success -> False From b9df7ce0f5c06277eb92815a3a3ff19d2ee356ac Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Wed, 10 Jul 2024 12:49:37 -0700 Subject: [PATCH 05/10] Removed HashMap from logRecordExporterExport and updated tests --- sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs | 6 ++---- sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs | 13 +++++-------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs index 9e63cf2f..e3acb96b 100644 --- a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs +++ b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs @@ -24,7 +24,7 @@ simpleProcessor exporter = do bracketOnError (readChan outChan) (writeChan inChan) - exportSingleLogRecord + (logRecordExporterExport exporter . V.singleton . mkReadableLogRecord) let logRecordProcessorForceFlush = ( do @@ -58,7 +58,5 @@ simpleProcessor exporter = do case mlr of Nothing -> pure acc Just lr -> do - res <- exportSingleLogRecord lr + res <- logRecordExporterExport exporter $ V.singleton $ mkReadableLogRecord lr forceFlushOutChan outChan (res : acc) - - exportSingleLogRecord = logRecordExporterExport exporter . V.singleton . mkReadableLogRecord diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index bdaa4278..b49e623c 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -4,7 +4,6 @@ module OpenTelemetry.LogRecordProcessorSpec where -import qualified Data.HashMap.Strict as H import Data.IORef import qualified Data.Vector as V import qualified OpenTelemetry.Context as Context @@ -22,13 +21,12 @@ getTestExporter = do numExportsRef <- newIORef 0 shutdownRef <- newIORef False - let logRecordExporterExportInternal logRecordsByLibrary = do + let logRecordExporterExportInternal logRecords = do shutdown <- readIORef shutdownRef if shutdown then pure (Failure Nothing) else do - let numLogRecords = foldr (\lrs n -> n + V.length lrs) 0 logRecordsByLibrary - modifyIORef numExportsRef (+ numLogRecords) + modifyIORef numExportsRef $ (+) $ V.length logRecords pure Success @@ -54,9 +52,8 @@ getTestExporterWithoutShutdown :: IO (IORef Int, LogRecordExporter) getTestExporterWithoutShutdown = do numExportsRef <- newIORef 0 - let logRecordExporterExport logRecordsByLibrary = do - let numLogRecords = foldr (\lrs n -> n + V.length lrs) 0 logRecordsByLibrary - modifyIORef numExportsRef (+ numLogRecords) + let logRecordExporterExport logRecords = do + modifyIORef numExportsRef $ (+) $ V.length $ logRecords pure Success @@ -115,7 +112,7 @@ spec = describe "LogRecordProcessor" $ do numExports <- readIORef numExportsRef numExports `shouldBe` 3 - exportRes <- logRecordExporterExport testExporter H.empty + exportRes <- logRecordExporterExport testExporter V.empty exportRes `shouldSatisfy` \case Success -> False Failure _ -> True From 9b6b234a0a168bedba3eb9900ed807eae4d5afb2 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Mon, 15 Jul 2024 16:02:39 -0700 Subject: [PATCH 06/10] Fixed Simple Processor to work with upstream changes --- api/src/OpenTelemetry/Exporter/LogRecord.hs | 10 +++++++++- api/src/OpenTelemetry/Logs/Core.hs | 1 + api/src/OpenTelemetry/Processor/LogRecord.hs | 6 ++++++ .../OpenTelemetry/Processor/Simple/LogRecord.hs | 13 +++++++------ .../OpenTelemetry/LogRecordProcessorSpec.hs | 17 +++++++++-------- 5 files changed, 32 insertions(+), 15 deletions(-) diff --git a/api/src/OpenTelemetry/Exporter/LogRecord.hs b/api/src/OpenTelemetry/Exporter/LogRecord.hs index 817c1c02..9cca9d2e 100644 --- a/api/src/OpenTelemetry/Exporter/LogRecord.hs +++ b/api/src/OpenTelemetry/Exporter/LogRecord.hs @@ -5,9 +5,18 @@ module OpenTelemetry.Exporter.LogRecord ( logRecordExporterExport, logRecordExporterForceFlush, logRecordExporterShutdown, + ExportResult (..), + FlushResult (..), + takeWorseFlushResult, + takeWorstFlushResult, + exportResultToFlushResult, ShutdownResult (..), + takeWorseShutdownResult, + takeWorstShutdownResult, + flushResultToShutdownResult, ) where +import OpenTelemetry.Internal.Common.Types import OpenTelemetry.Internal.Logs.Types ( LogRecordExporter, LogRecordExporterArguments (..), @@ -16,5 +25,4 @@ import OpenTelemetry.Internal.Logs.Types ( logRecordExporterShutdown, mkLogRecordExporter, ) -import OpenTelemetry.Processor.LogRecord (ShutdownResult (..)) diff --git a/api/src/OpenTelemetry/Logs/Core.hs b/api/src/OpenTelemetry/Logs/Core.hs index ffad2f11..cb9fef91 100644 --- a/api/src/OpenTelemetry/Logs/Core.hs +++ b/api/src/OpenTelemetry/Logs/Core.hs @@ -16,6 +16,7 @@ module OpenTelemetry.Logs.Core ( -- * @LogRecord@ operations ReadableLogRecord, + mkReadableLogRecord, ReadWriteLogRecord, IsReadableLogRecord (..), IsReadWriteLogRecord (..), diff --git a/api/src/OpenTelemetry/Processor/LogRecord.hs b/api/src/OpenTelemetry/Processor/LogRecord.hs index 4338e777..26e8b8f2 100644 --- a/api/src/OpenTelemetry/Processor/LogRecord.hs +++ b/api/src/OpenTelemetry/Processor/LogRecord.hs @@ -1,6 +1,12 @@ module OpenTelemetry.Processor.LogRecord ( LogRecordProcessor (..), + FlushResult (..), + takeWorseFlushResult, + takeWorstFlushResult, ShutdownResult (..), + takeWorseShutdownResult, + takeWorstShutdownResult, + flushResultToShutdownResult, ) where import OpenTelemetry.Internal.Common.Types diff --git a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs index e3acb96b..ad881d20 100644 --- a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs +++ b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs @@ -10,8 +10,9 @@ import Control.Concurrent.Chan.Unagi import Control.Exception import Control.Monad (forever) import qualified Data.Vector as V -import OpenTelemetry.Internal.Common.Types -import OpenTelemetry.Internal.Logs.Types +import OpenTelemetry.Exporter.LogRecord +import OpenTelemetry.Logs.Core +import OpenTelemetry.Processor.LogRecord {- | This is an implementation of LogRecordProcessor which passes finished logs and passes the export-friendly ReadableLogRecord @@ -19,12 +20,12 @@ representation to the configured LogRecordExporter, as soon as they are finished -} simpleProcessor :: LogRecordExporter -> IO LogRecordProcessor simpleProcessor exporter = do - (inChan :: InChan ReadWriteLogRecord, outChan :: OutChan ReadWriteLogRecord) <- newChan + (inChan :: InChan ReadableLogRecord, outChan :: OutChan ReadableLogRecord) <- newChan exportWorker <- async $ forever $ do bracketOnError (readChan outChan) (writeChan inChan) - (logRecordExporterExport exporter . V.singleton . mkReadableLogRecord) + (logRecordExporterExport exporter . V.singleton) let logRecordProcessorForceFlush = ( do @@ -41,7 +42,7 @@ simpleProcessor exporter = do pure $ LogRecordProcessor - { logRecordProcessorOnEmit = \lr _ -> writeChan inChan lr + { logRecordProcessorOnEmit = \lr _ -> writeChan inChan $ mkReadableLogRecord lr , logRecordProcessorShutdown = mask $ \restore -> do cancel exportWorker flushResult <- restore logRecordProcessorForceFlush @@ -58,5 +59,5 @@ simpleProcessor exporter = do case mlr of Nothing -> pure acc Just lr -> do - res <- logRecordExporterExport exporter $ V.singleton $ mkReadableLogRecord lr + res <- logRecordExporterExport exporter $ V.singleton lr forceFlushOutChan outChan (res : acc) diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index b49e623c..dcbc4db9 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -5,6 +5,7 @@ module OpenTelemetry.LogRecordProcessorSpec where import Data.IORef +import qualified Data.Text as T import qualified Data.Vector as V import qualified OpenTelemetry.Context as Context import OpenTelemetry.Exporter.LogRecord @@ -52,7 +53,7 @@ getTestExporterWithoutShutdown :: IO (IORef Int, LogRecordExporter) getTestExporterWithoutShutdown = do numExportsRef <- newIORef 0 - let logRecordExporterExport logRecords = do + let logRecordExporterExportInternal logRecords = do modifyIORef numExportsRef $ (+) $ V.length $ logRecords pure Success @@ -84,9 +85,9 @@ spec = describe "LogRecordProcessor" $ do let lp = createLoggerProvider [processor] emptyLoggerProviderOptions l = makeLogger lp "Test Library" - emitLogRecord l emptyLogRecordArguments - emitLogRecord l emptyLogRecordArguments - emitLogRecord l emptyLogRecordArguments + emitLogRecord l (emptyLogRecordArguments ("something" :: T.Text)) + emitLogRecord l (emptyLogRecordArguments ("another thing" :: T.Text)) + emitLogRecord l (emptyLogRecordArguments ("a third thing" :: T.Text)) -- WARNING: There might be a better way to ensure exporting than forceFlush forceFlushLoggerProvider Nothing lp @@ -102,9 +103,9 @@ spec = describe "LogRecordProcessor" $ do let lp = createLoggerProvider [processor, processorNoShutdown] emptyLoggerProviderOptions l = makeLogger lp "Test Library" - emitLogRecord l (emptyLogRecordArguments "something") - emitLogRecord l (emptyLogRecordArguments "another thing") - emitLogRecord l (emptyLogRecordArguments "a third thing") + emitLogRecord l (emptyLogRecordArguments ("something" :: T.Text)) + emitLogRecord l (emptyLogRecordArguments ("another thing" :: T.Text)) + emitLogRecord l (emptyLogRecordArguments ("a third thing" :: T.Text)) -- WARNING: There might be a better way to ensure exporting than forceFlush shutdownLoggerProvider Nothing lp @@ -117,7 +118,7 @@ spec = describe "LogRecordProcessor" $ do Success -> False Failure _ -> True - lr <- emitLogRecord l (emptyLogRecordArguments ("a bad one" :: String)) + lr <- emitLogRecord l (emptyLogRecordArguments ("a bad one" :: T.Text)) logRecordProcessorOnEmit processorNoShutdown lr Context.empty numExportsNoShutdown <- readIORef numExportsNoShutdownRef numExportsNoShutdown `shouldBe` 3 From 6a12572a0cf26dc58255f2053188d8c46e93e63b Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Mon, 22 Jul 2024 12:41:12 -0700 Subject: [PATCH 07/10] Updated processor tests with emptyLogRecordArguments updates --- sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index dcbc4db9..07fe9d12 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -85,9 +85,9 @@ spec = describe "LogRecordProcessor" $ do let lp = createLoggerProvider [processor] emptyLoggerProviderOptions l = makeLogger lp "Test Library" - emitLogRecord l (emptyLogRecordArguments ("something" :: T.Text)) - emitLogRecord l (emptyLogRecordArguments ("another thing" :: T.Text)) - emitLogRecord l (emptyLogRecordArguments ("a third thing" :: T.Text)) + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments -- WARNING: There might be a better way to ensure exporting than forceFlush forceFlushLoggerProvider Nothing lp @@ -103,9 +103,9 @@ spec = describe "LogRecordProcessor" $ do let lp = createLoggerProvider [processor, processorNoShutdown] emptyLoggerProviderOptions l = makeLogger lp "Test Library" - emitLogRecord l (emptyLogRecordArguments ("something" :: T.Text)) - emitLogRecord l (emptyLogRecordArguments ("another thing" :: T.Text)) - emitLogRecord l (emptyLogRecordArguments ("a third thing" :: T.Text)) + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments + emitLogRecord l emptyLogRecordArguments -- WARNING: There might be a better way to ensure exporting than forceFlush shutdownLoggerProvider Nothing lp @@ -118,7 +118,7 @@ spec = describe "LogRecordProcessor" $ do Success -> False Failure _ -> True - lr <- emitLogRecord l (emptyLogRecordArguments ("a bad one" :: T.Text)) + lr <- emitLogRecord l emptyLogRecordArguments logRecordProcessorOnEmit processorNoShutdown lr Context.empty numExportsNoShutdown <- readIORef numExportsNoShutdownRef numExportsNoShutdown `shouldBe` 3 From 7c35830c42633faf993139968529d9117cae68a7 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Wed, 24 Jul 2024 15:36:29 -0700 Subject: [PATCH 08/10] Export ImmutableLogRecord from Core.hs --- api/src/OpenTelemetry/Logs/Core.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/api/src/OpenTelemetry/Logs/Core.hs b/api/src/OpenTelemetry/Logs/Core.hs index cb9fef91..16bdb6fb 100644 --- a/api/src/OpenTelemetry/Logs/Core.hs +++ b/api/src/OpenTelemetry/Logs/Core.hs @@ -20,6 +20,7 @@ module OpenTelemetry.Logs.Core ( ReadWriteLogRecord, IsReadableLogRecord (..), IsReadWriteLogRecord (..), + ImmutableLogRecord (..), LogRecordArguments (..), emptyLogRecordArguments, AnyValue (..), From 58f8451236c1187f6b73786c53fb9fc4fd173058 Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Wed, 31 Jul 2024 11:22:25 -0700 Subject: [PATCH 09/10] Fixed to work with changes to LogRecordExporterArguments --- .../OpenTelemetry/LogRecordProcessorSpec.hs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs index 07fe9d12..b0bba6a2 100644 --- a/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs +++ b/sdk/test/OpenTelemetry/LogRecordProcessorSpec.hs @@ -22,7 +22,7 @@ getTestExporter = do numExportsRef <- newIORef 0 shutdownRef <- newIORef False - let logRecordExporterExportInternal logRecords = do + let logRecordExporterArgumentsExport logRecords = do shutdown <- readIORef shutdownRef if shutdown then pure (Failure Nothing) @@ -31,17 +31,17 @@ getTestExporter = do pure Success - logRecordExporterForceFlushInternal = pure FlushSuccess + logRecordExporterArgumentsForceFlush = pure FlushSuccess - logRecordExporterShutdownInternal = do + logRecordExporterArgumentsShutdown = do writeIORef shutdownRef True pure ShutdownSuccess testExporter <- mkLogRecordExporter $ - LogRecordExporterInternal - { logRecordExporterExportInternal - , logRecordExporterForceFlushInternal - , logRecordExporterShutdownInternal + LogRecordExporterArguments + { logRecordExporterArgumentsExport + , logRecordExporterArgumentsForceFlush + , logRecordExporterArgumentsShutdown } pure ( numExportsRef @@ -53,21 +53,21 @@ getTestExporterWithoutShutdown :: IO (IORef Int, LogRecordExporter) getTestExporterWithoutShutdown = do numExportsRef <- newIORef 0 - let logRecordExporterExportInternal logRecords = do + let logRecordExporterArgumentsExport logRecords = do modifyIORef numExportsRef $ (+) $ V.length $ logRecords pure Success - logRecordExporterForceFlushInternal = pure FlushSuccess + logRecordExporterArgumentsForceFlush = pure FlushSuccess - logRecordExporterShutdownInternal = pure ShutdownSuccess + logRecordExporterArgumentsShutdown = pure ShutdownSuccess testExporter <- mkLogRecordExporter $ - LogRecordExporterInternal - { logRecordExporterExportInternal - , logRecordExporterForceFlushInternal - , logRecordExporterShutdownInternal + LogRecordExporterArguments + { logRecordExporterArgumentsExport + , logRecordExporterArgumentsForceFlush + , logRecordExporterArgumentsShutdown } pure ( numExportsRef From 9250ec2c58c19350da558640a86a432f06a67b8c Mon Sep 17 00:00:00 2001 From: evanlauer1 Date: Wed, 31 Jul 2024 12:37:58 -0700 Subject: [PATCH 10/10] Added SomeException to Flush and Shutdown Results, made Monoid instances, and made error handlers --- api/src/OpenTelemetry/Exporter/LogRecord.hs | 4 +- .../OpenTelemetry/Internal/Common/Types.hs | 88 +++++++++++++------ api/src/OpenTelemetry/Internal/Logs/Core.hs | 4 +- api/src/OpenTelemetry/Logs/Core.hs | 2 + api/src/OpenTelemetry/Processor/LogRecord.hs | 4 +- api/src/OpenTelemetry/Trace/Core.hs | 2 +- sdk/src/OpenTelemetry/Processor/Batch/Span.hs | 4 +- .../Processor/Simple/LogRecord.hs | 11 ++- 8 files changed, 76 insertions(+), 43 deletions(-) diff --git a/api/src/OpenTelemetry/Exporter/LogRecord.hs b/api/src/OpenTelemetry/Exporter/LogRecord.hs index 9cca9d2e..eac155f3 100644 --- a/api/src/OpenTelemetry/Exporter/LogRecord.hs +++ b/api/src/OpenTelemetry/Exporter/LogRecord.hs @@ -7,11 +7,11 @@ module OpenTelemetry.Exporter.LogRecord ( logRecordExporterShutdown, ExportResult (..), FlushResult (..), - takeWorseFlushResult, + flushErrorHandler, takeWorstFlushResult, exportResultToFlushResult, ShutdownResult (..), - takeWorseShutdownResult, + shutdownErrorHandler, takeWorstShutdownResult, flushResultToShutdownResult, ) where diff --git a/api/src/OpenTelemetry/Internal/Common/Types.hs b/api/src/OpenTelemetry/Internal/Common/Types.hs index e01e8048..b866c1e7 100644 --- a/api/src/OpenTelemetry/Internal/Common/Types.hs +++ b/api/src/OpenTelemetry/Internal/Common/Types.hs @@ -10,19 +10,20 @@ module OpenTelemetry.Internal.Common.Types ( AnyValue (..), ToValue (..), ShutdownResult (..), - takeWorseShutdownResult, + shutdownErrorHandler, takeWorstShutdownResult, flushResultToShutdownResult, FlushResult (..), - takeWorseFlushResult, + flushErrorHandler, takeWorstFlushResult, exportResultToFlushResult, ExportResult (..), ) where -import Control.Exception (SomeException) +import Control.Exception (Exception, SomeException) import Data.ByteString (ByteString) import Data.Data (Data) +import Data.Foldable (fold, toList) import qualified Data.HashMap.Strict as H import Data.Hashable (Hashable) import Data.Int (Int64) @@ -165,57 +166,88 @@ instance ToValue AnyValue where toValue = id -data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout +data ShutdownResult + = ShutdownSuccess + | ShutdownTimeout + | ShutdownError [SomeException] + + +{- | (<>) concatenates the error lists if both arguments are @ShutdownError@, otherwise it returns the worst of the two in this order +- @ShutdownError@ +- @ShutdownTimeout@ +- @ShutdownSuccess@ +-} +instance Semigroup ShutdownResult where + ShutdownError l <> ShutdownError r = ShutdownError (l <> r) + ShutdownError es <> _ = ShutdownError es + _ <> ShutdownError es = ShutdownError es + ShutdownTimeout <> _ = ShutdownTimeout + _ <> ShutdownTimeout = ShutdownTimeout + ShutdownSuccess <> ShutdownSuccess = ShutdownSuccess + + +-- | mempty is ShutdownSuccess +instance Monoid ShutdownResult where + mempty = ShutdownSuccess + + +shutdownErrorHandler :: SomeException -> IO ShutdownResult +shutdownErrorHandler = pure . ShutdownError . pure flushResultToShutdownResult :: FlushResult -> ShutdownResult flushResultToShutdownResult FlushSuccess = ShutdownSuccess flushResultToShutdownResult FlushTimeout = ShutdownTimeout -flushResultToShutdownResult FlushError = ShutdownFailure - - --- | Returns @ShutdownFailure@ if either argument is @ShutdownFailure@, @ShutdownTimeout@ if either argument is @ShutdownTimeout@, and @ShutdownSuccess@ otherwise. -takeWorseShutdownResult :: ShutdownResult -> ShutdownResult -> ShutdownResult -takeWorseShutdownResult ShutdownFailure _ = ShutdownFailure -takeWorseShutdownResult _ ShutdownFailure = ShutdownFailure -takeWorseShutdownResult ShutdownTimeout _ = ShutdownTimeout -takeWorseShutdownResult _ ShutdownTimeout = ShutdownTimeout -takeWorseShutdownResult _ _ = ShutdownSuccess +flushResultToShutdownResult (FlushError es) = ShutdownError es takeWorstShutdownResult :: (Foldable t) => t ShutdownResult -> ShutdownResult -takeWorstShutdownResult = foldr takeWorseShutdownResult ShutdownSuccess +takeWorstShutdownResult = fold -- | The outcome of a call to @OpenTelemetry.Trace.forceFlush@ or @OpenTelemetry.Logs.forceFlush@ data FlushResult - = -- | One or more spans or @LogRecord@s did not export from all associated exporters + = -- | Flushing spans or @LogRecord@s to all associated exporters succeeded. + FlushSuccess + | -- | One or more spans or @LogRecord@s did not export from all associated exporters -- within the alotted timeframe. FlushTimeout - | -- | Flushing spans or @LogRecord@s to all associated exporters succeeded. - FlushSuccess | -- | One or more exporters failed to successfully export one or more -- unexported spans or @LogRecord@s. - FlushError + FlushError [SomeException] deriving (Show) --- | Returns @FlushError@ if either argument is @FlushError@, @FlushTimeout@ if either argument is @FlushTimeout@, and @FlushSuccess@ otherwise. -takeWorseFlushResult :: FlushResult -> FlushResult -> FlushResult -takeWorseFlushResult FlushError _ = FlushError -takeWorseFlushResult _ FlushError = FlushError -takeWorseFlushResult FlushTimeout _ = FlushTimeout -takeWorseFlushResult _ FlushTimeout = FlushTimeout -takeWorseFlushResult _ _ = FlushSuccess +{- | (<>) concatenates the error lists if both arguments are @FlushError@, otherwise it returns the worst of the two in this order +- @FlushError@ +- @FlushTimeout@ +- @FlushSuccess@ +-} +instance Semigroup FlushResult where + FlushError l <> FlushError r = FlushError (l <> r) + FlushError es <> _ = FlushError es + _ <> FlushError es = FlushError es + FlushTimeout <> _ = FlushTimeout + _ <> FlushTimeout = FlushTimeout + FlushSuccess <> FlushSuccess = FlushSuccess + + +-- | mempty is FlushSuccess +instance Monoid FlushResult where + mempty = FlushSuccess + + +flushErrorHandler :: SomeException -> IO FlushResult +flushErrorHandler = pure . FlushError . pure takeWorstFlushResult :: (Foldable t) => t FlushResult -> FlushResult -takeWorstFlushResult = foldr takeWorseFlushResult FlushSuccess +takeWorstFlushResult = fold exportResultToFlushResult :: ExportResult -> FlushResult exportResultToFlushResult Success = FlushSuccess -exportResultToFlushResult (Failure _) = FlushError +exportResultToFlushResult (Failure mErr) = FlushError $ toList mErr data ExportResult diff --git a/api/src/OpenTelemetry/Internal/Logs/Core.hs b/api/src/OpenTelemetry/Internal/Logs/Core.hs index 0e5a04b8..88521383 100644 --- a/api/src/OpenTelemetry/Internal/Logs/Core.hs +++ b/api/src/OpenTelemetry/Internal/Logs/Core.hs @@ -130,7 +130,7 @@ shutdownLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = lift takeWorstShutdownResult <$> forConcurrently loggerProviderProcessors - (\lrp -> logRecordProcessorShutdown lrp `catch` \(SomeException _) -> pure ShutdownFailure) + (handle shutdownErrorHandler . logRecordProcessorShutdown) case mresult of Nothing -> pure ShutdownTimeout @@ -157,7 +157,7 @@ forceFlushLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = li takeWorstFlushResult <$> forConcurrently loggerProviderProcessors - (\lrp -> logRecordProcessorForceFlush lrp `catch` \(SomeException _) -> pure FlushError) + (handle flushErrorHandler . logRecordProcessorForceFlush) case mresult of Nothing -> pure FlushTimeout diff --git a/api/src/OpenTelemetry/Logs/Core.hs b/api/src/OpenTelemetry/Logs/Core.hs index 16bdb6fb..7a36fb3c 100644 --- a/api/src/OpenTelemetry/Logs/Core.hs +++ b/api/src/OpenTelemetry/Logs/Core.hs @@ -7,7 +7,9 @@ module OpenTelemetry.Logs.Core ( setGlobalLoggerProvider, getGlobalLoggerProvider, shutdownLoggerProvider, + ShutdownResult (..), forceFlushLoggerProvider, + FlushResult (..), -- * @Logger@ operations InstrumentationLibrary (..), diff --git a/api/src/OpenTelemetry/Processor/LogRecord.hs b/api/src/OpenTelemetry/Processor/LogRecord.hs index 26e8b8f2..b56be241 100644 --- a/api/src/OpenTelemetry/Processor/LogRecord.hs +++ b/api/src/OpenTelemetry/Processor/LogRecord.hs @@ -1,10 +1,10 @@ module OpenTelemetry.Processor.LogRecord ( LogRecordProcessor (..), FlushResult (..), - takeWorseFlushResult, + flushErrorHandler, takeWorstFlushResult, ShutdownResult (..), - takeWorseShutdownResult, + shutdownErrorHandler, takeWorstShutdownResult, flushResultToShutdownResult, ) where diff --git a/api/src/OpenTelemetry/Trace/Core.hs b/api/src/OpenTelemetry/Trace/Core.hs index 6698c6a9..32a64e03 100644 --- a/api/src/OpenTelemetry/Trace/Core.hs +++ b/api/src/OpenTelemetry/Trace/Core.hs @@ -871,7 +871,7 @@ forceFlushTracerProvider TracerProvider {..} mtimeout = liftIO $ do ( \status action -> do res <- waitCatch action pure $! case res of - Left _err -> FlushError + Left err -> FlushError $ pure err Right _ok -> status ) FlushSuccess diff --git a/sdk/src/OpenTelemetry/Processor/Batch/Span.hs b/sdk/src/OpenTelemetry/Processor/Batch/Span.hs index 24e4f1f4..26a87dac 100644 --- a/sdk/src/OpenTelemetry/Processor/Batch/Span.hs +++ b/sdk/src/OpenTelemetry/Processor/Batch/Span.hs @@ -343,8 +343,8 @@ batchProcessor BatchTimeoutConfig {..} exporter = liftIO $ do ShutdownTimeout Just er -> case er of - Left _ -> - ShutdownFailure + Left e -> + ShutdownError $ pure e Right _ -> ShutdownSuccess } diff --git a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs index ad881d20..87bd3450 100644 --- a/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs +++ b/sdk/src/OpenTelemetry/Processor/Simple/LogRecord.hs @@ -28,7 +28,8 @@ simpleProcessor exporter = do (logRecordExporterExport exporter . V.singleton) let logRecordProcessorForceFlush = - ( do + handle flushErrorHandler $ + do chanFlushRes <- takeWorstFlushResult . fmap exportResultToFlushResult @@ -36,20 +37,18 @@ simpleProcessor exporter = do exporterFlushRes <- logRecordExporterForceFlush exporter - pure $ takeWorseFlushResult exporterFlushRes chanFlushRes - ) - `catch` \(SomeException _) -> pure FlushError + pure $ exporterFlushRes <> chanFlushRes pure $ LogRecordProcessor { logRecordProcessorOnEmit = \lr _ -> writeChan inChan $ mkReadableLogRecord lr - , logRecordProcessorShutdown = mask $ \restore -> do + , logRecordProcessorShutdown = handle shutdownErrorHandler $ mask $ \restore -> do cancel exportWorker flushResult <- restore logRecordProcessorForceFlush shutdownResult <- logRecordExporterShutdown exporter - pure $ takeWorseShutdownResult shutdownResult $ flushResultToShutdownResult flushResult + pure $ (shutdownResult <>) $ flushResultToShutdownResult flushResult , logRecordProcessorForceFlush } where