Skip to content

Commit

Permalink
CR feedback.
Browse files Browse the repository at this point in the history
Fix JobHostConfiguration problem, needed so that Script can get access to the new IDistributedLockManager
Resolves #1201

lease decoupled from storage
Resolves #1183

Create a new IDistributedLockManager interface that SingletonManager layers on top of it.
Create a default implementation based on the existing blob-lease code today.
See new Singleton tests for how this could be swapped out.
  • Loading branch information
MikeStall committed Jul 12, 2017
1 parent c320218 commit 1d6e30f
Show file tree
Hide file tree
Showing 26 changed files with 995 additions and 549 deletions.
14 changes: 1 addition & 13 deletions src/Microsoft.Azure.WebJobs.Host/DefaultExtensionRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ namespace Microsoft.Azure.WebJobs.Host
{
internal class DefaultExtensionRegistry : IExtensionRegistry
{
private readonly JobHostMetadataProvider _metadataProvider;

private ConcurrentDictionary<Type, ConcurrentBag<object>> _registry = new ConcurrentDictionary<Type, ConcurrentBag<object>>();

public DefaultExtensionRegistry(JobHostMetadataProvider metadataProvider = null)
public DefaultExtensionRegistry()
{
_metadataProvider = metadataProvider;
}

public void RegisterExtension(Type type, object instance)
Expand All @@ -35,15 +32,6 @@ public void RegisterExtension(Type type, object instance)
throw new ArgumentOutOfRangeException("instance");
}

if (_metadataProvider != null)
{
IExtensionConfigProvider extension = instance as IExtensionConfigProvider;
if (extension != null)
{
_metadataProvider.AddExtension(extension);
}
}

ConcurrentBag<object> instances = _registry.GetOrAdd(type, (t) => new ConcurrentBag<object>());
instances.Add(instance);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,6 @@ namespace Microsoft.Azure.WebJobs.Host.Executors
{
internal static class JobHostConfigurationExtensions
{
// Do full initialization (both static and runtime).
// This can be called multiple times on a config.
public static async Task<JobHostContext> CreateAndLogHostStartedAsync(
this JobHostConfiguration config,
JobHost host,
CancellationToken shutdownToken,
CancellationToken cancellationToken)
{
JobHostContext context = await config.CreateJobHostContextAsync(host, shutdownToken, cancellationToken);

return context;
}

// Static initialization. Returns a service provider with some new services initialized.
// The new services:
// - can retrieve static config like binders and converters; but the listeners haven't yet started.
Expand Down Expand Up @@ -103,7 +90,8 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat
services.AddService<TraceWriter>(trace);

// Add built-in extensions
config.AddAttributesFromAssembly(typeof(TableAttribute).Assembly);
var metadataProvider = new JobHostMetadataProvider();
metadataProvider.AddAttributesFromAssembly(typeof(TableAttribute).Assembly);

var exts = config.GetExtensions();
bool builtinsAdded = exts.GetExtensions<IExtensionConfigProvider>().OfType<TableExtension>().Any();
Expand All @@ -121,9 +109,30 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat
};
InvokeExtensionConfigProviders(context);

// After this point, all user configuration has been set.

if (singletonManager == null)
{
singletonManager = new SingletonManager(storageAccountProvider, exceptionHandler, config.Singleton, trace, config.LoggerFactory, hostIdProvider, services.GetService<INameResolver>());
var logger = config.LoggerFactory?.CreateLogger(LogCategories.Singleton);

IDistributedLockManager lockManager = services.GetService<IDistributedLockManager>();
if (lockManager == null)
{
lockManager = new BlobLeaseDistributedLockManager(
storageAccountProvider,
trace,
logger);
services.AddService<IDistributedLockManager>(lockManager);
}

singletonManager = new SingletonManager(
lockManager,
config.Singleton,
trace,
exceptionHandler,
config.LoggerFactory,
hostIdProvider,
services.GetService<INameResolver>());
services.AddService<SingletonManager>(singletonManager);
}

Expand All @@ -138,6 +147,10 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat
bindingProvider = DefaultBindingProvider.Create(nameResolver, config.LoggerFactory, storageAccountProvider, extensionTypeLocator, blobWrittenWatcherAccessor, extensions);
services.AddService<IBindingProvider>(bindingProvider);
}

var converterManager = (ConverterManager)config.ConverterManager;
metadataProvider.Initialize(bindingProvider, converterManager, exts);
services.AddService<IJobHostMetadataProvider>(metadataProvider);

return services;
}
Expand All @@ -146,13 +159,13 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat
// This mainly means:
// - indexing the functions
// - spinning up the listeners (so connecting to the services)
private static async Task<JobHostContext> CreateJobHostContextAsync(this JobHostConfiguration config, JobHost host, CancellationToken shutdownToken, CancellationToken cancellationToken)
public static async Task<JobHostContext> CreateJobHostContextAsync(
this JobHostConfiguration config,
ServiceProviderWrapper services, // Results from first phase
JobHost host,
CancellationToken shutdownToken,
CancellationToken cancellationToken)
{
// If we already initialized the services, get the previous initialization so that
// we don't double-execute the extension Initialize() methods.
var partialInit = config.TakeOwnershipOfPartialInitialization();
var services = partialInit ?? config.CreateStaticServices();

FunctionExecutor functionExecutor = services.GetService<FunctionExecutor>();
IFunctionIndexProvider functionIndexProvider = services.GetService<IFunctionIndexProvider>();
ITriggerBindingProvider triggerBindingProvider = services.GetService<ITriggerBindingProvider>();
Expand Down Expand Up @@ -364,7 +377,13 @@ private static async Task<JobHostContext> CreateJobHostContextAsync(this JobHost
startupLogger?.LogInformation(msg);
}

return new JobHostContext(functions, hostCallExecutor, listener, trace, functionEventCollector, loggerFactory);
return new JobHostContext(
functions,
hostCallExecutor,
listener,
trace,
functionEventCollector,
loggerFactory);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal sealed class JobHostContext : IDisposable
private readonly TraceWriter _trace;
private readonly IAsyncCollector<FunctionInstanceLogEntry> _functionEventCollector; // optional
private readonly ILoggerFactory _loggerFactory;

private bool _disposed;

public JobHostContext(IFunctionIndexLookup functionLookup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@ internal class JobHostMetadataProvider : IJobHostMetadataProvider

// Map of simple assembly name to assembly.
private readonly Dictionary<string, Assembly> _resolvedAssemblies = new Dictionary<string, Assembly>(StringComparer.OrdinalIgnoreCase);

private readonly JobHostConfiguration _config;


private IBindingProvider _root;

public JobHostMetadataProvider(JobHostConfiguration config)
public JobHostMetadataProvider()
{
_config = config;
}

internal void Initialize(IBindingProvider bindingProvider)
internal void Initialize(IBindingProvider bindingProvider, ConverterManager converter, IExtensionRegistry extensionRegistry)
{
foreach (var extension in extensionRegistry.GetExtensions<IExtensionConfigProvider>())
{
this.AddExtension(extension);
}

this._root = bindingProvider;

// Populate assembly resolution from converters.
var converter = this._config.GetService<IConverterManager>() as ConverterManager;
// Populate assembly resolution from converters.
if (converter != null)
{
converter.AddAssemblies((type) => this.AddAssembly(type));
Expand Down
3 changes: 2 additions & 1 deletion src/Microsoft.Azure.WebJobs.Host/GlobalSuppressions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,5 @@
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1800:DoNotCastUnnecessarily", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#GetFromData(System.Collections.Generic.IReadOnlyDictionary`2<System.String,System.Object>)")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#UtcNow")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#RandGuid")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1014:MarkAssembliesWithClsCompliant")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1014:MarkAssembliesWithClsCompliant")][assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#RandGuid")]
[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignore", Scope = "member", Target = "Microsoft.Azure.WebJobs.JobHost.#EnsureHostStartedAsync(System.Threading.CancellationToken)")]
109 changes: 86 additions & 23 deletions src/Microsoft.Azure.WebJobs.Host/JobHost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Indexers;
using Microsoft.Azure.WebJobs.Host.Listeners;
Expand All @@ -35,19 +36,26 @@ public class JobHost : IDisposable
private readonly WebJobsShutdownWatcher _shutdownWatcher;
private readonly CancellationTokenSource _stoppingTokenSource;

private Task<JobHostContext> _contextTask;
private bool _contextTaskInitialized;
private object _contextTaskLock = new object();

private JobHostContext _context;
private IListener _listener;
private object _contextLock = new object();

// Null if we haven't yet started runtime initialization.
// Points to an incomplete task during initialization.
// Points to a completed task after initialization.
private Task _initializationRunning = null;

// These are services that are accessible without starting the execution container.
// They include the initial set of JobHostConfiguration services as well as
// additional services created.
private ServiceProviderWrapper _services;

private int _state;
private Task _stopTask;
private object _stopTaskLock = new object();
private bool _disposed;

// Common lock to protect fields.
private object _lock = new object();

private ILogger _logger;

/// <summary>
Expand Down Expand Up @@ -146,7 +154,7 @@ public Task StopAsync()
}

// Multiple threads may call StopAsync concurrently. Both need to return the same task instance.
lock (_stopTaskLock)
lock (_lock)
{
if (_stopTask == null)
{
Expand Down Expand Up @@ -336,37 +344,92 @@ private static IFunctionDefinition ResolveFunctionDefinition(MethodInfo method,
return function;
}

private async Task<JobHostContext> CreateContextAndLogHostStartedAsync(CancellationToken cancellationToken)
private void ThrowIfDisposed()
{
JobHostContext context = await _config.CreateAndLogHostStartedAsync(this, _shutdownTokenSource.Token, cancellationToken);
if (_disposed)
{
throw new ObjectDisposedException(null);
}
}

lock (_contextLock)
// If multiple threads call this, only one should do the init. The rest should wait.
// When this task is signalled, _context is initialized.
private Task EnsureHostStartedAsync(CancellationToken cancellationToken)
{
if (_context != null)
{
if (_context == null)
return Task.CompletedTask;
}

TaskCompletionSource<bool> tsc = null;

lock (_lock)
{
if (_initializationRunning == null)
{
_context = context;
_listener = context.Listener;
// This thread wins the race and owns initialing.
tsc = new TaskCompletionSource<bool>();
_initializationRunning = tsc.Task;
}
}

_logger = _context.LoggerFactory?.CreateLogger(LogCategories.Startup);
if (tsc != null)
{
// Ignore the return value and use tsc so that all threads are awaiting the same thing.
Task ignore = RuntimeInitAsync(cancellationToken, tsc);
}

return _context;
return _initializationRunning;
}

private Task EnsureHostStartedAsync(CancellationToken cancellationToken)
// Caller gaurantees this is single-threaded.
// Set initializationTask when complete, many threads can wait on that.
// When complete, the fields should be initialized to allow runtime usage.
private async Task RuntimeInitAsync(CancellationToken cancellationToken, TaskCompletionSource<bool> initializationTask)
{
return LazyInitializer.EnsureInitialized<Task<JobHostContext>>(ref _contextTask,
ref _contextTaskInitialized,
ref _contextTaskLock,
() => CreateContextAndLogHostStartedAsync(cancellationToken));
try
{
// Do real initialization
PopulateStaticServices();

JobHostContext context = await _config.CreateJobHostContextAsync(_services, this, _shutdownTokenSource.Token, cancellationToken);

_context = context;
_listener = context.Listener;
_logger = _context.LoggerFactory?.CreateLogger(LogCategories.Startup);

initializationTask.SetResult(true);
}
catch (Exception e)
{
initializationTask.SetException(e);
}
}

private void ThrowIfDisposed()
// Ensure the static services are initialized.
// These are derived from the underlying JobHostConfiguration.
// Caller ensures this is single threaded.
private void PopulateStaticServices()
{
if (_disposed)
if (this._services != null)
{
throw new ObjectDisposedException(null);
return; // already Created
}

var services = this._config.CreateStaticServices();

_services = services;
}

/// <summary>
/// Get set of services.
/// </summary>
public IServiceProvider Services
{
get
{
PopulateStaticServices();
return _services;
}
}
}
Expand Down
Loading

0 comments on commit 1d6e30f

Please sign in to comment.