Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Simple LogRecordProcessor and refactored shutdown and forceFlush #136

Draft
wants to merge 10 commits into
base: add-log-record-exporters
Choose a base branch
from
10 changes: 9 additions & 1 deletion api/src/OpenTelemetry/Exporter/LogRecord.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,18 @@ module OpenTelemetry.Exporter.LogRecord (
logRecordExporterExport,
logRecordExporterForceFlush,
logRecordExporterShutdown,
ExportResult (..),
FlushResult (..),
flushErrorHandler,
takeWorstFlushResult,
exportResultToFlushResult,
ShutdownResult (..),
shutdownErrorHandler,
takeWorstShutdownResult,
flushResultToShutdownResult,
) where

import OpenTelemetry.Internal.Common.Types
import OpenTelemetry.Internal.Logs.Types (
LogRecordExporter,
LogRecordExporterArguments (..),
Expand All @@ -16,5 +25,4 @@ import OpenTelemetry.Internal.Logs.Types (
logRecordExporterShutdown,
mkLogRecordExporter,
)
import OpenTelemetry.Processor.LogRecord (ShutdownResult (..))

88 changes: 82 additions & 6 deletions api/src/OpenTelemetry/Internal/Common/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@ module OpenTelemetry.Internal.Common.Types (
AnyValue (..),
ToValue (..),
ShutdownResult (..),
shutdownErrorHandler,
takeWorstShutdownResult,
flushResultToShutdownResult,
FlushResult (..),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a driveby request on my part, but it would be nice to reexport FlushResult so that callers of forceFlushTracerProvider can actually use the type.

flushErrorHandler,
takeWorstFlushResult,
exportResultToFlushResult,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Helper functions to convert from ExportResult -> FlushResult and FlushResult -> ShutdownResult. These are used to propagate Failures and Timeouts up the chain of export to forceFlush to shutdown.

ExportResult (..),
) where

import Control.Exception (SomeException)
import Control.Exception (Exception, SomeException)
import Data.ByteString (ByteString)
import Data.Data (Data)
import Data.Foldable (fold, toList)
import qualified Data.HashMap.Strict as H
import Data.Hashable (Hashable)
import Data.Int (Int64)
Expand Down Expand Up @@ -159,22 +166,91 @@ instance ToValue AnyValue where
toValue = id


data ShutdownResult = ShutdownSuccess | ShutdownFailure | ShutdownTimeout
data ShutdownResult
= ShutdownSuccess
| ShutdownTimeout
| ShutdownError [SomeException]


{- | (<>) concatenates the error lists if both arguments are @ShutdownError@, otherwise it returns the worst of the two in this order
- @ShutdownError@
- @ShutdownTimeout@
- @ShutdownSuccess@
-}
instance Semigroup ShutdownResult where
ShutdownError l <> ShutdownError r = ShutdownError (l <> r)
ShutdownError es <> _ = ShutdownError es
_ <> ShutdownError es = ShutdownError es
ShutdownTimeout <> _ = ShutdownTimeout
_ <> ShutdownTimeout = ShutdownTimeout
ShutdownSuccess <> ShutdownSuccess = ShutdownSuccess


-- | mempty is ShutdownSuccess
instance Monoid ShutdownResult where
mempty = ShutdownSuccess


shutdownErrorHandler :: SomeException -> IO ShutdownResult
shutdownErrorHandler = pure . ShutdownError . pure
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

used with handle and catch from Control.Exception



flushResultToShutdownResult :: FlushResult -> ShutdownResult
flushResultToShutdownResult FlushSuccess = ShutdownSuccess
flushResultToShutdownResult FlushTimeout = ShutdownTimeout
flushResultToShutdownResult (FlushError es) = ShutdownError es


takeWorstShutdownResult :: (Foldable t) => t ShutdownResult -> ShutdownResult
takeWorstShutdownResult = fold
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Developers may be able to use fold instead of this function, but I think takeWorsShutdownResult better communicates the meaning of the action and fold might be less well known



-- | The outcome of a call to @OpenTelemetry.Trace.forceFlush@ or @OpenTelemetry.Logs.forceFlush@
data FlushResult
= -- | One or more spans or @LogRecord@s did not export from all associated exporters
= -- | Flushing spans or @LogRecord@s to all associated exporters succeeded.
FlushSuccess
| -- | One or more spans or @LogRecord@s did not export from all associated exporters
-- within the alotted timeframe.
FlushTimeout
| -- | Flushing spans or @LogRecord@s to all associated exporters succeeded.
FlushSuccess
| -- | One or more exporters failed to successfully export one or more
-- unexported spans or @LogRecord@s.
FlushError
FlushError [SomeException]
deriving (Show)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd recommend tweaking FlushError a bit here to carry exceptions with it:

| FlushError [SomeException]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.



{- | (<>) concatenates the error lists if both arguments are @FlushError@, otherwise it returns the worst of the two in this order
- @FlushError@
- @FlushTimeout@
- @FlushSuccess@
-}
instance Semigroup FlushResult where
FlushError l <> FlushError r = FlushError (l <> r)
FlushError es <> _ = FlushError es
_ <> FlushError es = FlushError es
FlushTimeout <> _ = FlushTimeout
_ <> FlushTimeout = FlushTimeout
FlushSuccess <> FlushSuccess = FlushSuccess


-- | mempty is FlushSuccess
instance Monoid FlushResult where
mempty = FlushSuccess


flushErrorHandler :: SomeException -> IO FlushResult
flushErrorHandler = pure . FlushError . pure


takeWorstFlushResult :: (Foldable t) => t FlushResult -> FlushResult
takeWorstFlushResult = fold


exportResultToFlushResult :: ExportResult -> FlushResult
exportResultToFlushResult Success = FlushSuccess
exportResultToFlushResult (Failure mErr) = FlushError $ toList mErr


data ExportResult
= Success
| Failure (Maybe SomeException)
deriving (Show)
56 changes: 36 additions & 20 deletions api/src/OpenTelemetry/Internal/Logs/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module OpenTelemetry.Internal.Logs.Core (

import Control.Applicative
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Control.Monad.Trans.Maybe
Expand Down Expand Up @@ -108,41 +109,56 @@ setGlobalLoggerProvider :: (MonadIO m) => LoggerProvider -> m ()
setGlobalLoggerProvider = liftIO . writeIORef globalLoggerProvider


defaultShutdownTimeout :: Int
defaultShutdownTimeout = 5_000_000


{- | This method provides a way for provider to do any cleanup required.

This will also trigger shutdowns on all internal processors.
-}
shutdownLoggerProvider :: (MonadIO m) => LoggerProvider -> m ()
shutdownLoggerProvider LoggerProvider {loggerProviderProcessors} = liftIO $ do
asyncShutdownResults <- V.forM loggerProviderProcessors $ \processor -> do
logRecordProcessorShutdown processor
mapM_ wait asyncShutdownResults
shutdownLoggerProvider
:: (MonadIO m)
=> Maybe Int
-- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s)
-> LoggerProvider
-> m ShutdownResult
-- ^ Result that denotes whether the shutdown action succeeded, failed, or timed out.
shutdownLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = liftIO $ do
mresult <-
timeout (fromMaybe defaultShutdownTimeout mtimeout) $
takeWorstShutdownResult
<$> forConcurrently
loggerProviderProcessors
(handle shutdownErrorHandler . logRecordProcessorShutdown)

case mresult of
Nothing -> pure ShutdownTimeout
Just res -> pure res


defaultForceFlushTimeout :: Int
defaultForceFlushTimeout = 5_000_000


{- | This method provides a way for provider to immediately export all @LogRecord@s that have not yet
been exported for all the internal processors.
-}
forceFlushLoggerProvider
:: (MonadIO m)
=> LoggerProvider
-> Maybe Int
=> Maybe Int
-- ^ Optional timeout in microseconds, defaults to 5,000,000 (5s)
-> LoggerProvider
-> m FlushResult
-- ^ Result that denotes whether the flush action succeeded, failed, or timed out.
forceFlushLoggerProvider LoggerProvider {loggerProviderProcessors} mtimeout = liftIO $ do
jobs <- V.forM loggerProviderProcessors $ \processor -> async $ do
logRecordProcessorForceFlush processor
forceFlushLoggerProvider mtimeout LoggerProvider {loggerProviderProcessors} = liftIO $ do
mresult <-
timeout (fromMaybe 5_000_000 mtimeout) $
V.foldM
( \status action -> do
res <- waitCatch action
pure $! case res of
Left _err -> FlushError
Right _ok -> status
)
FlushSuccess
jobs
timeout (fromMaybe defaultForceFlushTimeout mtimeout) $
takeWorstFlushResult
<$> forConcurrently
loggerProviderProcessors
(handle flushErrorHandler . logRecordProcessorForceFlush)

case mresult of
Nothing -> pure FlushTimeout
Just res -> pure res
Expand Down
15 changes: 7 additions & 8 deletions api/src/OpenTelemetry/Internal/Logs/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ module OpenTelemetry.Internal.Logs.Types (
) where

import Control.Concurrent (MVar, newMVar, withMVar)
import Control.Concurrent.Async
import Data.Function (on)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as H
Expand All @@ -33,7 +32,7 @@ import Data.Text (Text)
import Data.Vector (Vector)
import OpenTelemetry.Common (Timestamp, TraceFlags)
import OpenTelemetry.Context.Types (Context)
import OpenTelemetry.Internal.Common.Types (ExportResult, InstrumentationLibrary, ShutdownResult)
import OpenTelemetry.Internal.Common.Types
import OpenTelemetry.Internal.Trace.Id (SpanId, TraceId)
import OpenTelemetry.LogAttributes
import OpenTelemetry.Resource (MaterializedResources)
Expand All @@ -43,9 +42,9 @@ import OpenTelemetry.Resource (MaterializedResources)
data LogRecordExporterArguments = LogRecordExporterArguments
{ logRecordExporterArgumentsExport :: Vector ReadableLogRecord -> IO ExportResult
-- ^ See @logRecordExporterExport@ for documentation
, logRecordExporterArgumentsForceFlush :: IO ()
, logRecordExporterArgumentsForceFlush :: IO FlushResult
-- ^ See @logRecordExporterArgumentsForceFlush@ for documentation
, logRecordExporterArgumentsShutdown :: IO ()
, logRecordExporterArgumentsShutdown :: IO ShutdownResult
-- ^ See @logRecordExporterArgumentsShutdown@ for documentation
}

Expand Down Expand Up @@ -97,7 +96,7 @@ the process after an invocation, but before the exporter exports the ReadlableLo
ForceFlush SHOULD complete or abort within some timeout. ForceFlush can be implemented as a blocking API or an asynchronous API which
notifies the caller via a callback or an event. OpenTelemetry SDK authors MAY decide if they want to make the flush timeout configurable.
-}
logRecordExporterForceFlush :: LogRecordExporter -> IO ()
logRecordExporterForceFlush :: LogRecordExporter -> IO FlushResult
logRecordExporterForceFlush = flip withMVar logRecordExporterArgumentsForceFlush . unExporter


Expand All @@ -109,7 +108,7 @@ allowed and SHOULD return a Failure result.
Shutdown SHOULD NOT block indefinitely (e.g. if it attempts to flush the data and the destination is unavailable).
OpenTelemetry SDK authors MAY decide if they want to make the shutdown timeout configurable.
-}
logRecordExporterShutdown :: LogRecordExporter -> IO ()
logRecordExporterShutdown :: LogRecordExporter -> IO ShutdownResult
logRecordExporterShutdown = flip withMVar logRecordExporterArgumentsShutdown . unExporter


Expand Down Expand Up @@ -143,7 +142,7 @@ data LogRecordProcessor = LogRecordProcessor
-- ^ Called when a LogRecord is emitted. This method is called synchronously on the thread that emitted the LogRecord, therefore it SHOULD NOT block or throw exceptions.
--
-- A LogRecordProcessor may freely modify logRecord for the duration of the OnEmit call. If logRecord is needed after OnEmit returns (i.e. for asynchronous processing) only reads are permitted.
, logRecordProcessorShutdown :: IO (Async ShutdownResult)
, logRecordProcessorShutdown :: IO ShutdownResult
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes more sense to me for the LoggerProvider or a Processor to internally implement or choose not to implement concurrency than for it to be enforced by the types. I imagine there are situations where one would not want something to be asynchronous. Removing Async from the signature also allows combinators like withAsync and mapConcurrently to be used.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least for tracing, the docs say this:

Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event. OpenTelemetry client authors can decide if they want to make the shutdown timeout configurable.

I can appreciate the argument in the other direction, but would prefer to keep Async here to stay consistent with the SpanProcessor API https://hackage.haskell.org/package/hs-opentelemetry-api-0.1.0.0/docs/OpenTelemetry-Processor.html#t:Processor

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason I made the change is that having Async in the signature made it much more difficult to propagate and combine ExportResults, FlushResults, and ShutdownResults. I also figured that it would be good if either shutdown, forceFlush, and export were either all asynchronous or all synchronous.

Also, the way I rewrote shutdownLoggerProvider seems cleaner and more abstract (using forConcurrently) than the way it was written for shutdownTracerProvider before. Plus, people defining a LogRecordProcessor no longer have to worry about the asynchronous part because it is handled by the LoggerProvider.

Let me know your thoughts.

-- ^ Shuts down the processor. Called when SDK is shut down. This is an opportunity for processor to do any cleanup required.
--
-- Shutdown SHOULD be called only once for each LogRecordProcessor instance. After the call to Shutdown, subsequent calls to OnEmit are not allowed. SDKs SHOULD ignore these calls gracefully, if possible.
Expand All @@ -154,7 +153,7 @@ data LogRecordProcessor = LogRecordProcessor
--
-- Shutdown SHOULD complete or abort within some timeout. Shutdown can be implemented as a blocking API or an asynchronous API which notifies the caller via a callback or an event.
-- OpenTelemetry SDK authors can decide if they want to make the shutdown timeout configurable.
, logRecordProcessorForceFlush :: IO ()
, logRecordProcessorForceFlush :: IO FlushResult
-- ^ This is a hint to ensure that any tasks associated with LogRecords for which the LogRecordProcessor had already received events prior to the call to ForceFlush SHOULD be completed
-- as soon as possible, preferably before returning from this method.
--
Expand Down
5 changes: 5 additions & 0 deletions api/src/OpenTelemetry/Logs/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ module OpenTelemetry.Logs.Core (
setGlobalLoggerProvider,
getGlobalLoggerProvider,
shutdownLoggerProvider,
ShutdownResult (..),
forceFlushLoggerProvider,
FlushResult (..),

-- * @Logger@ operations
InstrumentationLibrary (..),
Expand All @@ -16,10 +18,13 @@ module OpenTelemetry.Logs.Core (

-- * @LogRecord@ operations
ReadableLogRecord,
mkReadableLogRecord,
ReadWriteLogRecord,
IsReadableLogRecord (..),
IsReadWriteLogRecord (..),
ImmutableLogRecord (..),
LogRecordArguments (..),
emptyLogRecordArguments,
AnyValue (..),
ToValue (..),
SeverityNumber (..),
Expand Down
6 changes: 6 additions & 0 deletions api/src/OpenTelemetry/Processor/LogRecord.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
module OpenTelemetry.Processor.LogRecord (
LogRecordProcessor (..),
FlushResult (..),
flushErrorHandler,
takeWorstFlushResult,
ShutdownResult (..),
shutdownErrorHandler,
takeWorstShutdownResult,
flushResultToShutdownResult,
) where

import OpenTelemetry.Internal.Common.Types
Expand Down
2 changes: 1 addition & 1 deletion api/src/OpenTelemetry/Trace/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ forceFlushTracerProvider TracerProvider {..} mtimeout = liftIO $ do
( \status action -> do
res <- waitCatch action
pure $! case res of
Left _err -> FlushError
Left err -> FlushError $ pure err
Right _ok -> status
)
FlushSuccess
Expand Down
1 change: 1 addition & 0 deletions sdk/hs-opentelemetry-sdk.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ test-suite hs-opentelemetry-sdk-test
other-modules:
OpenTelemetry.BaggageSpec
OpenTelemetry.ContextSpec
OpenTelemetry.LogRecordProcessorSpec
OpenTelemetry.ResourceSpec
OpenTelemetry.TraceSpec
Paths_hs_opentelemetry_sdk
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/OpenTelemetry/Processor/Batch/Span.hs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ batchProcessor BatchTimeoutConfig {..} exporter = liftIO $ do
ShutdownTimeout
Just er ->
case er of
Left _ ->
ShutdownFailure
Left e ->
ShutdownError $ pure e
Right _ ->
ShutdownSuccess
}
Expand Down
Loading