diff --git a/src/Microsoft.Azure.WebJobs.Host/DefaultExtensionRegistry.cs b/src/Microsoft.Azure.WebJobs.Host/DefaultExtensionRegistry.cs index 76d9920e1..a55881672 100644 --- a/src/Microsoft.Azure.WebJobs.Host/DefaultExtensionRegistry.cs +++ b/src/Microsoft.Azure.WebJobs.Host/DefaultExtensionRegistry.cs @@ -11,13 +11,10 @@ namespace Microsoft.Azure.WebJobs.Host { internal class DefaultExtensionRegistry : IExtensionRegistry { - private readonly JobHostMetadataProvider _metadataProvider; - private ConcurrentDictionary> _registry = new ConcurrentDictionary>(); - public DefaultExtensionRegistry(JobHostMetadataProvider metadataProvider = null) + public DefaultExtensionRegistry() { - _metadataProvider = metadataProvider; } public void RegisterExtension(Type type, object instance) @@ -35,15 +32,6 @@ public void RegisterExtension(Type type, object instance) throw new ArgumentOutOfRangeException("instance"); } - if (_metadataProvider != null) - { - IExtensionConfigProvider extension = instance as IExtensionConfigProvider; - if (extension != null) - { - _metadataProvider.AddExtension(extension); - } - } - ConcurrentBag instances = _registry.GetOrAdd(type, (t) => new ConcurrentBag()); instances.Add(instance); } diff --git a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostConfigurationExtensions.cs b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostConfigurationExtensions.cs index 1f2c6630f..2d17ee7b0 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostConfigurationExtensions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostConfigurationExtensions.cs @@ -33,19 +33,6 @@ namespace Microsoft.Azure.WebJobs.Host.Executors { internal static class JobHostConfigurationExtensions { - // Do full initialization (both static and runtime). - // This can be called multiple times on a config. - public static async Task CreateAndLogHostStartedAsync( - this JobHostConfiguration config, - JobHost host, - CancellationToken shutdownToken, - CancellationToken cancellationToken) - { - JobHostContext context = await config.CreateJobHostContextAsync(host, shutdownToken, cancellationToken); - - return context; - } - // Static initialization. Returns a service provider with some new services initialized. // The new services: // - can retrieve static config like binders and converters; but the listeners haven't yet started. @@ -103,7 +90,8 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat services.AddService(trace); // Add built-in extensions - config.AddAttributesFromAssembly(typeof(TableAttribute).Assembly); + var metadataProvider = new JobHostMetadataProvider(); + metadataProvider.AddAttributesFromAssembly(typeof(TableAttribute).Assembly); var exts = config.GetExtensions(); bool builtinsAdded = exts.GetExtensions().OfType().Any(); @@ -121,9 +109,30 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat }; InvokeExtensionConfigProviders(context); + // After this point, all user configuration has been set. + if (singletonManager == null) { - singletonManager = new SingletonManager(storageAccountProvider, exceptionHandler, config.Singleton, trace, config.LoggerFactory, hostIdProvider, services.GetService()); + var logger = config.LoggerFactory?.CreateLogger(LogCategories.Singleton); + + IDistributedLockManager lockManager = services.GetService(); + if (lockManager == null) + { + lockManager = new BlobLeaseDistributedLockManager( + storageAccountProvider, + trace, + logger); + services.AddService(lockManager); + } + + singletonManager = new SingletonManager( + lockManager, + config.Singleton, + trace, + exceptionHandler, + config.LoggerFactory, + hostIdProvider, + services.GetService()); services.AddService(singletonManager); } @@ -138,6 +147,10 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat bindingProvider = DefaultBindingProvider.Create(nameResolver, config.LoggerFactory, storageAccountProvider, extensionTypeLocator, blobWrittenWatcherAccessor, extensions); services.AddService(bindingProvider); } + + var converterManager = (ConverterManager)config.ConverterManager; + metadataProvider.Initialize(bindingProvider, converterManager, exts); + services.AddService(metadataProvider); return services; } @@ -146,13 +159,13 @@ public static ServiceProviderWrapper CreateStaticServices(this JobHostConfigurat // This mainly means: // - indexing the functions // - spinning up the listeners (so connecting to the services) - private static async Task CreateJobHostContextAsync(this JobHostConfiguration config, JobHost host, CancellationToken shutdownToken, CancellationToken cancellationToken) + public static async Task CreateJobHostContextAsync( + this JobHostConfiguration config, + ServiceProviderWrapper services, // Results from first phase + JobHost host, + CancellationToken shutdownToken, + CancellationToken cancellationToken) { - // If we already initialized the services, get the previous initialization so that - // we don't double-execute the extension Initialize() methods. - var partialInit = config.TakeOwnershipOfPartialInitialization(); - var services = partialInit ?? config.CreateStaticServices(); - FunctionExecutor functionExecutor = services.GetService(); IFunctionIndexProvider functionIndexProvider = services.GetService(); ITriggerBindingProvider triggerBindingProvider = services.GetService(); @@ -364,7 +377,13 @@ private static async Task CreateJobHostContextAsync(this JobHost startupLogger?.LogInformation(msg); } - return new JobHostContext(functions, hostCallExecutor, listener, trace, functionEventCollector, loggerFactory); + return new JobHostContext( + functions, + hostCallExecutor, + listener, + trace, + functionEventCollector, + loggerFactory); } } diff --git a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContext.cs b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContext.cs index d8e314bdf..65068786b 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContext.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Executors/JobHostContext.cs @@ -19,7 +19,7 @@ internal sealed class JobHostContext : IDisposable private readonly TraceWriter _trace; private readonly IAsyncCollector _functionEventCollector; // optional private readonly ILoggerFactory _loggerFactory; - + private bool _disposed; public JobHostContext(IFunctionIndexLookup functionLookup, diff --git a/src/Microsoft.Azure.WebJobs.Host/Extensions/JobHostMetadataProvider.cs b/src/Microsoft.Azure.WebJobs.Host/Extensions/JobHostMetadataProvider.cs index 44e4a68af..b61900369 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Extensions/JobHostMetadataProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Extensions/JobHostMetadataProvider.cs @@ -20,22 +20,23 @@ internal class JobHostMetadataProvider : IJobHostMetadataProvider // Map of simple assembly name to assembly. private readonly Dictionary _resolvedAssemblies = new Dictionary(StringComparer.OrdinalIgnoreCase); - - private readonly JobHostConfiguration _config; - + private IBindingProvider _root; - public JobHostMetadataProvider(JobHostConfiguration config) + public JobHostMetadataProvider() { - _config = config; } - internal void Initialize(IBindingProvider bindingProvider) + internal void Initialize(IBindingProvider bindingProvider, ConverterManager converter, IExtensionRegistry extensionRegistry) { + foreach (var extension in extensionRegistry.GetExtensions()) + { + this.AddExtension(extension); + } + this._root = bindingProvider; - // Populate assembly resolution from converters. - var converter = this._config.GetService() as ConverterManager; + // Populate assembly resolution from converters. if (converter != null) { converter.AddAssemblies((type) => this.AddAssembly(type)); diff --git a/src/Microsoft.Azure.WebJobs.Host/GlobalSuppressions.cs b/src/Microsoft.Azure.WebJobs.Host/GlobalSuppressions.cs index 16d796993..471ef5bc5 100644 --- a/src/Microsoft.Azure.WebJobs.Host/GlobalSuppressions.cs +++ b/src/Microsoft.Azure.WebJobs.Host/GlobalSuppressions.cs @@ -81,4 +81,5 @@ [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1800:DoNotCastUnnecessarily", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#GetFromData(System.Collections.Generic.IReadOnlyDictionary`2)")] [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#UtcNow")] [assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#RandGuid")] -[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1014:MarkAssembliesWithClsCompliant")] \ No newline at end of file +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1014:MarkAssembliesWithClsCompliant")][assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1822:MarkMembersAsStatic", Scope = "member", Target = "Microsoft.Azure.WebJobs.Host.Bindings.SystemBindingData.#RandGuid")] +[assembly: System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1804:RemoveUnusedLocals", MessageId = "ignore", Scope = "member", Target = "Microsoft.Azure.WebJobs.JobHost.#EnsureHostStartedAsync(System.Threading.CancellationToken)")] \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/JobHost.cs b/src/Microsoft.Azure.WebJobs.Host/JobHost.cs index 5ed4accc9..e16d22eb7 100644 --- a/src/Microsoft.Azure.WebJobs.Host/JobHost.cs +++ b/src/Microsoft.Azure.WebJobs.Host/JobHost.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.Azure.WebJobs.Host; +using Microsoft.Azure.WebJobs.Host.Config; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Indexers; using Microsoft.Azure.WebJobs.Host.Listeners; @@ -35,19 +36,26 @@ public class JobHost : IDisposable private readonly WebJobsShutdownWatcher _shutdownWatcher; private readonly CancellationTokenSource _stoppingTokenSource; - private Task _contextTask; - private bool _contextTaskInitialized; - private object _contextTaskLock = new object(); - private JobHostContext _context; private IListener _listener; - private object _contextLock = new object(); + + // Null if we haven't yet started runtime initialization. + // Points to an incomplete task during initialization. + // Points to a completed task after initialization. + private Task _initializationRunning = null; + + // These are services that are accessible without starting the execution container. + // They include the initial set of JobHostConfiguration services as well as + // additional services created. + private ServiceProviderWrapper _services; private int _state; private Task _stopTask; - private object _stopTaskLock = new object(); private bool _disposed; + // Common lock to protect fields. + private object _lock = new object(); + private ILogger _logger; /// @@ -146,7 +154,7 @@ public Task StopAsync() } // Multiple threads may call StopAsync concurrently. Both need to return the same task instance. - lock (_stopTaskLock) + lock (_lock) { if (_stopTask == null) { @@ -336,37 +344,92 @@ private static IFunctionDefinition ResolveFunctionDefinition(MethodInfo method, return function; } - private async Task CreateContextAndLogHostStartedAsync(CancellationToken cancellationToken) + private void ThrowIfDisposed() { - JobHostContext context = await _config.CreateAndLogHostStartedAsync(this, _shutdownTokenSource.Token, cancellationToken); + if (_disposed) + { + throw new ObjectDisposedException(null); + } + } - lock (_contextLock) + // If multiple threads call this, only one should do the init. The rest should wait. + // When this task is signalled, _context is initialized. + private Task EnsureHostStartedAsync(CancellationToken cancellationToken) + { + if (_context != null) { - if (_context == null) + return Task.CompletedTask; + } + + TaskCompletionSource tsc = null; + + lock (_lock) + { + if (_initializationRunning == null) { - _context = context; - _listener = context.Listener; + // This thread wins the race and owns initialing. + tsc = new TaskCompletionSource(); + _initializationRunning = tsc.Task; } } - _logger = _context.LoggerFactory?.CreateLogger(LogCategories.Startup); + if (tsc != null) + { + // Ignore the return value and use tsc so that all threads are awaiting the same thing. + Task ignore = RuntimeInitAsync(cancellationToken, tsc); + } - return _context; + return _initializationRunning; } - private Task EnsureHostStartedAsync(CancellationToken cancellationToken) + // Caller gaurantees this is single-threaded. + // Set initializationTask when complete, many threads can wait on that. + // When complete, the fields should be initialized to allow runtime usage. + private async Task RuntimeInitAsync(CancellationToken cancellationToken, TaskCompletionSource initializationTask) { - return LazyInitializer.EnsureInitialized>(ref _contextTask, - ref _contextTaskInitialized, - ref _contextTaskLock, - () => CreateContextAndLogHostStartedAsync(cancellationToken)); + try + { + // Do real initialization + PopulateStaticServices(); + + JobHostContext context = await _config.CreateJobHostContextAsync(_services, this, _shutdownTokenSource.Token, cancellationToken); + + _context = context; + _listener = context.Listener; + _logger = _context.LoggerFactory?.CreateLogger(LogCategories.Startup); + + initializationTask.SetResult(true); + } + catch (Exception e) + { + initializationTask.SetException(e); + } } - private void ThrowIfDisposed() + // Ensure the static services are initialized. + // These are derived from the underlying JobHostConfiguration. + // Caller ensures this is single threaded. + private void PopulateStaticServices() { - if (_disposed) + if (this._services != null) { - throw new ObjectDisposedException(null); + return; // already Created + } + + var services = this._config.CreateStaticServices(); + + _services = services; + } + + /// + /// Get set of services. + /// + public IServiceProvider Services + { + get + { + PopulateStaticServices(); + return _services; } } } diff --git a/src/Microsoft.Azure.WebJobs.Host/JobHostConfiguration.cs b/src/Microsoft.Azure.WebJobs.Host/JobHostConfiguration.cs index 674588684..ee795d29e 100644 --- a/src/Microsoft.Azure.WebJobs.Host/JobHostConfiguration.cs +++ b/src/Microsoft.Azure.WebJobs.Host/JobHostConfiguration.cs @@ -31,12 +31,9 @@ public sealed class JobHostConfiguration : IServiceProvider private readonly JobHostBlobsConfiguration _blobsConfiguration = new JobHostBlobsConfiguration(); private readonly JobHostTraceConfiguration _traceConfiguration = new JobHostTraceConfiguration(); private readonly ConcurrentDictionary _services = new ConcurrentDictionary(); - private readonly JobHostMetadataProvider _metadataProvider; private string _hostId; - private ServiceProviderWrapper _partialInitServices; - /// /// Initializes a new instance of the class. /// @@ -66,8 +63,7 @@ public JobHostConfiguration(string dashboardAndStorageConnectionString) Aggregator = new FunctionResultAggregatorConfiguration(); // add our built in services here - _metadataProvider = new JobHostMetadataProvider(this); - IExtensionRegistry extensions = new DefaultExtensionRegistry(_metadataProvider); + IExtensionRegistry extensions = new DefaultExtensionRegistry(); ITypeLocator typeLocator = new DefaultTypeLocator(ConsoleProvider.Out, extensions); IConverterManager converterManager = new ConverterManager(); IWebJobsExceptionHandler exceptionHandler = new WebJobsExceptionHandler(); @@ -347,7 +343,7 @@ public object GetService(Type serviceType) } object service = null; - _services.TryGetValue(serviceType, out service); + _services.TryGetValue(serviceType, out service); return service; } @@ -404,44 +400,5 @@ public void AddExtension(IExtensionConfigProvider extension) var exts = this.GetExtensions(); exts.RegisterExtension(extension); } - - internal void AddAttributesFromAssembly(Assembly assembly) - { - _metadataProvider.AddAttributesFromAssembly(assembly); - } - - /// - /// Creates a for this configuration. - /// - /// The . - public IJobHostMetadataProvider CreateMetadataProvider() - { - var serviceProvider = this.CreateStaticServices(); - var bindingProvider = serviceProvider.GetService(); - - _metadataProvider.Initialize(bindingProvider); - - // Ensure all extensions have been called - - lock (this) - { - if (_partialInitServices == null) - { - _partialInitServices = serviceProvider; - } - } - - return _metadataProvider; - } - - internal ServiceProviderWrapper TakeOwnershipOfPartialInitialization() - { - lock (this) - { - var ctx = this._partialInitServices; - this._partialInitServices = null; - return ctx; - } - } } } diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/BlobLeaseDistributedLockManager.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/BlobLeaseDistributedLockManager.cs new file mode 100644 index 000000000..95a70dee1 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/BlobLeaseDistributedLockManager.cs @@ -0,0 +1,408 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Collections.Concurrent; +using System.Diagnostics; +using System.Globalization; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Executors; +using Microsoft.Azure.WebJobs.Host.Storage; +using Microsoft.Azure.WebJobs.Host.Storage.Blob; +using Microsoft.Azure.WebJobs.Host.Timers; +using Microsoft.Extensions.Logging; +using Microsoft.WindowsAzure.Storage; +using Microsoft.WindowsAzure.Storage.Blob; + +namespace Microsoft.Azure.WebJobs.Host +{ + // Provides a blob-leased based implementation + internal class BlobLeaseDistributedLockManager : IDistributedLockManager + { + internal const string FunctionInstanceMetadataKey = "FunctionInstance"; + + private readonly IStorageAccountProvider _accountProvider; + private readonly ConcurrentDictionary _lockDirectoryMap = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + private readonly TraceWriter _trace; + private readonly ILogger _logger; + + public BlobLeaseDistributedLockManager( + IStorageAccountProvider accountProvider, + TraceWriter trace, + ILogger logger) + { + _accountProvider = accountProvider; + _trace = trace; + _logger = logger; + } + + public Task RenewAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + SingletonLockHandle singletonLockHandle = (SingletonLockHandle)lockHandle; + return singletonLockHandle.RenewAsync(_trace, _logger, cancellationToken); + } + + public async Task ReleaseLockAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + SingletonLockHandle singletonLockHandle = (SingletonLockHandle)lockHandle; + await ReleaseLeaseAsync(singletonLockHandle.Blob, singletonLockHandle.LeaseId, cancellationToken); + } + + public async virtual Task GetLockOwnerAsync(string account, string lockId, CancellationToken cancellationToken) + { + IStorageBlobDirectory lockDirectory = GetLockDirectory(account); + IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId); + + await ReadLeaseBlobMetadata(lockBlob, cancellationToken); + + // if the lease is Available, then there is no current owner + // (any existing owner value is the last owner that held the lease) + if (lockBlob.Properties.LeaseState == LeaseState.Available && + lockBlob.Properties.LeaseStatus == LeaseStatus.Unlocked) + { + return null; + } + + string owner = string.Empty; + lockBlob.Metadata.TryGetValue(FunctionInstanceMetadataKey, out owner); + + return owner; + } + + public async Task TryLockAsync( + string account, + string lockId, + string lockOwnerId, + string proposedLeaseId, + TimeSpan lockPeriod, + CancellationToken cancellationToken) + { + IStorageBlobDirectory lockDirectory = GetLockDirectory(account); + IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId); + string leaseId = await TryAcquireLeaseAsync(lockBlob, lockPeriod, proposedLeaseId, cancellationToken); + + if (string.IsNullOrEmpty(leaseId)) + { + return null; + } + + if (!string.IsNullOrEmpty(lockOwnerId)) + { + await WriteLeaseBlobMetadata(lockBlob, leaseId, lockOwnerId, cancellationToken); + } + + SingletonLockHandle lockHandle = new SingletonLockHandle(leaseId, lockId, lockBlob, lockPeriod); + + return lockHandle; + } + + internal IStorageBlobDirectory GetLockDirectory(string accountName) + { + if (string.IsNullOrEmpty(accountName)) + { + accountName = ConnectionStringNames.Storage; + } + + IStorageBlobDirectory storageDirectory = null; + if (!_lockDirectoryMap.TryGetValue(accountName, out storageDirectory)) + { + Task task = _accountProvider.GetStorageAccountAsync(accountName, CancellationToken.None); + IStorageAccount storageAccount = task.Result; + // singleton requires block blobs, cannot be premium + storageAccount.AssertTypeOneOf(StorageAccountType.GeneralPurpose, StorageAccountType.BlobOnly); + IStorageBlobClient blobClient = storageAccount.CreateBlobClient(); + storageDirectory = blobClient.GetContainerReference(HostContainerNames.Hosts) + .GetDirectoryReference(HostDirectoryNames.SingletonLocks); + _lockDirectoryMap[accountName] = storageDirectory; + } + + return storageDirectory; + } + + private static async Task TryAcquireLeaseAsync( + IStorageBlockBlob blob, + TimeSpan leasePeriod, + string proposedLeaseId, + CancellationToken cancellationToken) + { + bool blobDoesNotExist = false; + try + { + // Optimistically try to acquire the lease. The blob may not yet + // exist. If it doesn't we handle the 404, create it, and retry below + return await blob.AcquireLeaseAsync(leasePeriod, proposedLeaseId, cancellationToken); + } + catch (StorageException exception) + { + if (exception.RequestInformation != null) + { + if (exception.RequestInformation.HttpStatusCode == 409) + { + return null; + } + else if (exception.RequestInformation.HttpStatusCode == 404) + { + blobDoesNotExist = true; + } + else + { + throw; + } + } + else + { + throw; + } + } + + if (blobDoesNotExist) + { + await TryCreateAsync(blob, cancellationToken); + + try + { + return await blob.AcquireLeaseAsync(leasePeriod, null, cancellationToken); + } + catch (StorageException exception) + { + if (exception.RequestInformation != null && + exception.RequestInformation.HttpStatusCode == 409) + { + return null; + } + else + { + throw; + } + } + } + + return null; + } + + private static async Task ReleaseLeaseAsync(IStorageBlockBlob blob, string leaseId, CancellationToken cancellationToken) + { + try + { + // Note that this call returns without throwing if the lease is expired. See the table at: + // http://msdn.microsoft.com/en-us/library/azure/ee691972.aspx + await blob.ReleaseLeaseAsync( + accessCondition: new AccessCondition { LeaseId = leaseId }, + options: null, + operationContext: null, + cancellationToken: cancellationToken); + } + catch (StorageException exception) + { + if (exception.RequestInformation != null) + { + if (exception.RequestInformation.HttpStatusCode == 404 || + exception.RequestInformation.HttpStatusCode == 409) + { + // if the blob no longer exists, or there is another lease + // now active, there is nothing for us to release so we can + // ignore + } + else + { + throw; + } + } + else + { + throw; + } + } + } + + private static async Task TryCreateAsync(IStorageBlockBlob blob, CancellationToken cancellationToken) + { + bool isContainerNotFoundException = false; + + try + { + await blob.UploadTextAsync(string.Empty, cancellationToken: cancellationToken); + return true; + } + catch (StorageException exception) + { + if (exception.RequestInformation != null) + { + if (exception.RequestInformation.HttpStatusCode == 404) + { + isContainerNotFoundException = true; + } + else if (exception.RequestInformation.HttpStatusCode == 409 || + exception.RequestInformation.HttpStatusCode == 412) + { + // The blob already exists, or is leased by someone else + return false; + } + else + { + throw; + } + } + else + { + throw; + } + } + + Debug.Assert(isContainerNotFoundException); + + var container = blob.Container; + try + { + await container.CreateIfNotExistsAsync(cancellationToken); + } + catch (StorageException exc) + when (exc.RequestInformation.HttpStatusCode == 409 && string.Compare("ContainerBeingDeleted", exc.RequestInformation.ExtendedErrorInformation?.ErrorCode) == 0) + { + throw new StorageException("The host container is pending deletion and currently inaccessible."); + } + + try + { + await blob.UploadTextAsync(string.Empty, cancellationToken: cancellationToken); + return true; + } + catch (StorageException exception) + { + if (exception.RequestInformation != null && + (exception.RequestInformation.HttpStatusCode == 409 || exception.RequestInformation.HttpStatusCode == 412)) + { + // The blob already exists, or is leased by someone else + return false; + } + else + { + throw; + } + } + } + + private static async Task WriteLeaseBlobMetadata(IStorageBlockBlob blob, string leaseId, string functionInstanceId, CancellationToken cancellationToken) + { + blob.Metadata.Add(FunctionInstanceMetadataKey, functionInstanceId); + + await blob.SetMetadataAsync( + accessCondition: new AccessCondition { LeaseId = leaseId }, + options: null, + operationContext: null, + cancellationToken: cancellationToken); + } + + private static async Task ReadLeaseBlobMetadata(IStorageBlockBlob blob, CancellationToken cancellationToken) + { + try + { + await blob.FetchAttributesAsync(cancellationToken); + } + catch (StorageException exception) + { + if (exception.RequestInformation != null && + exception.RequestInformation.HttpStatusCode == 404) + { + // the blob no longer exists + } + else + { + throw; + } + } + } + + internal class SingletonLockHandle : IDistributedLock + { + private readonly TimeSpan _leasePeriod; + + private DateTimeOffset _lastRenewal; + private TimeSpan _lastRenewalLatency; + + public SingletonLockHandle() + { + } + + public SingletonLockHandle(string leaseId, string lockId, IStorageBlockBlob blob, TimeSpan leasePeriod) + { + this.LeaseId = leaseId; + this.LockId = lockId; + this._leasePeriod = leasePeriod; + this.Blob = blob; + } + + public string LeaseId { get; internal set; } + public string LockId { get; internal set; } + public IStorageBlockBlob Blob { get; internal set; } + + public async Task RenewAsync(TraceWriter trace, ILogger logger, CancellationToken cancellationToken) + { + try + { + AccessCondition condition = new AccessCondition + { + LeaseId = this.LeaseId + }; + DateTimeOffset requestStart = DateTimeOffset.UtcNow; + await this.Blob.RenewLeaseAsync(condition, null, null, cancellationToken); + _lastRenewal = DateTime.UtcNow; + _lastRenewalLatency = _lastRenewal - requestStart; + + // The next execution should occur after a normal delay. + return true; + } + catch (StorageException exception) + { + if (exception.IsServerSideError()) + { + string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock renewal failed for blob '{0}' with error code {1}.", + this.LockId, FormatErrorCode(exception)); + trace.Warning(msg, source: TraceSource.Execution); + logger?.LogWarning(msg); + return false; // The next execution should occur more quickly (try to renew the lease before it expires). + } + else + { + // Log the details we've been accumulating to help with debugging this scenario + int leasePeriodMilliseconds = (int)_leasePeriod.TotalMilliseconds; + string lastRenewalFormatted = _lastRenewal.ToString("yyyy-MM-ddTHH:mm:ss.FFFZ", CultureInfo.InvariantCulture); + int millisecondsSinceLastSuccess = (int)(DateTime.UtcNow - _lastRenewal).TotalMilliseconds; + int lastRenewalMilliseconds = (int)_lastRenewalLatency.TotalMilliseconds; + + string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock renewal failed for blob '{0}' with error code {1}. The last successful renewal completed at {2} ({3} milliseconds ago) with a duration of {4} milliseconds. The lease period was {5} milliseconds.", + this.LockId, FormatErrorCode(exception), lastRenewalFormatted, millisecondsSinceLastSuccess, lastRenewalMilliseconds, leasePeriodMilliseconds); + trace.Error(msg); + logger?.LogError(msg); + + // If we've lost the lease or cannot re-establish it, we want to fail any + // in progress function execution + throw; + } + } + } + + private static string FormatErrorCode(StorageException exception) + { + int statusCode; + if (!exception.TryGetStatusCode(out statusCode)) + { + return "''"; + } + + string message = statusCode.ToString(CultureInfo.InvariantCulture); + + string errorCode = exception.GetErrorCode(); + + if (errorCode != null) + { + message += ": " + errorCode; + } + + return message; + } + } + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLock.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLock.cs new file mode 100644 index 000000000..bf4e30483 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLock.cs @@ -0,0 +1,20 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.Azure.WebJobs.Host +{ + /// + /// Handle for a lock returned by + /// + public interface IDistributedLock + { + /// + /// The Lock identity. + /// + string LockId { get; } + } +} \ No newline at end of file diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLockManager.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLockManager.cs new file mode 100644 index 000000000..7e2d77e89 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/IDistributedLockManager.cs @@ -0,0 +1,79 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Azure.WebJobs.Host.Executors; + +namespace Microsoft.Azure.WebJobs.Host +{ + /// + /// Manage distributed lock. A lock is specified by (account, lockId). Interface implementations should cooperate with + /// + /// + /// 1. Account can be null or it may be a storage account name intended for . + /// 2. lockId has the same naming restrictions as blobs. + /// + public interface IDistributedLockManager + { + /// + /// Try to acquire a lock specified by (account, lockId). + /// + /// optional. A string specifying the "account" to use. LockIds are scoped to an account. This value may come from + /// The name of the lock. + /// A string hint specifying who owns this lock. Only used for diagnostics. + /// Optional. This can allow the caller to immediately assume the lease. + /// If this call acquires the lock, then this becomes the leaseId. + /// If the lock is already held with this same leaseId, then this call will acquire the lock. + /// The caller is responsible for enforcing that any simultaneous callers have different lease ids. + /// + /// The length of the lease to acquire. The caller is responsible for Renewing the lease well before this time is up. + /// The exact value here is restricted based on the underlying implementation. + /// + /// Null if can't acquire the lock. This is common if somebody else holds it. + Task TryLockAsync( + string account, + string lockId, + string lockOwnerId, + string proposedLeaseId, + TimeSpan lockPeriod, + CancellationToken cancellationToken); + + /// + /// Called by the client to renew the lease. The timing internals here are determined by + /// + /// + /// + /// A hint used for determining the next time delay in calling Renew. + /// True means the next execution should occur at a normal delay. False means the next execution should occur quickly; use this in network error cases. + /// + /// If this throws an exception, the lease is cancelled. + /// + Task RenewAsync(IDistributedLock lockHandle, CancellationToken cancellationToken); + + /// + /// Get the owner for a given lock or null if not held. + /// This is used for diagnostics only. + /// The lock owner can change immediately after this function returns, so callers can't be guaranteed the owner still the same. + /// + /// optional. A string specifying the account to use. LockIds are scoped to an account + /// the name of the lock. + /// + /// + Task GetLockOwnerAsync( + string account, + string lockId, + CancellationToken cancellationToken); + + /// + /// Release a lock that was previously acquired via TryLockAsync. + /// + /// previously acquired handle to be released + /// + /// + Task ReleaseLockAsync( + IDistributedLock lockHandle, + CancellationToken cancellationToken); + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/RenewableLockHandle.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/RenewableLockHandle.cs new file mode 100644 index 000000000..24729c4e5 --- /dev/null +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/RenewableLockHandle.cs @@ -0,0 +1,27 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.Timers; + +namespace Microsoft.Azure.WebJobs.Host +{ + /// + /// Internal lock handle returned by + /// + internal class RenewableLockHandle + { + public RenewableLockHandle(IDistributedLock handle, ITaskSeriesTimer renewal) + { + this.InnerLock = handle; + this.LeaseRenewalTimer = renewal; + } + + // The inner lock for the underlying implementation. + // This is a pluggable implementation. + public IDistributedLock InnerLock { get; private set; } + + // Handle to a timer for renewing the lease. + // We handle the timer. + public ITaskSeriesTimer LeaseRenewalTimer { get; private set; } + } +} diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonConfiguration.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonConfiguration.cs index 81861d11c..137c6141e 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonConfiguration.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonConfiguration.cs @@ -8,6 +8,8 @@ namespace Microsoft.Azure.WebJobs.Host { /// /// Configuration options governing the lock functionality of . + /// The configuration needs to cooperate with the SDK's registered . + /// For example, this configuration determines the refresh frequently for calls on . /// public sealed class SingletonConfiguration { diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonListener.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonListener.cs index c08a39870..1408d21af 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonListener.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonListener.cs @@ -22,7 +22,7 @@ internal class SingletonListener : IListener private readonly TraceWriter _trace; private readonly ILogger _logger; private string _lockId; - private object _lockHandle; + private RenewableLockHandle _lockHandle; private bool _isListening; public SingletonListener(MethodInfo method, SingletonAttribute attribute, SingletonManager singletonManager, IListener innerListener, TraceWriter trace, ILoggerFactory loggerFactory) diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonLock.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonLock.cs index 4bca49553..5967bafd7 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonLock.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonLock.cs @@ -16,7 +16,7 @@ internal class SingletonLock private readonly string _functionInstanceId; private readonly SingletonAttribute _attribute; private readonly SingletonManager _singletonManager; - private object _lockHandle; + private RenewableLockHandle _lockHandle; public SingletonLock(string id, string functionInstanceId, SingletonAttribute attribute, SingletonManager manager) { diff --git a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonManager.cs b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonManager.cs index a000d0168..37f2b58b7 100644 --- a/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonManager.cs +++ b/src/Microsoft.Azure.WebJobs.Host/Singleton/SingletonManager.cs @@ -2,27 +2,19 @@ // Licensed under the MIT License. See License.txt in the project root for license information. using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.Globalization; using System.Linq; using System.Reflection; using System.Threading; using System.Threading.Tasks; -using Microsoft.Azure.WebJobs.Host.Bindings; using Microsoft.Azure.WebJobs.Host.Bindings.Path; using Microsoft.Azure.WebJobs.Host.Executors; using Microsoft.Azure.WebJobs.Host.Listeners; -using Microsoft.Azure.WebJobs.Host.Loggers; using Microsoft.Azure.WebJobs.Host.Protocols; -using Microsoft.Azure.WebJobs.Host.Storage; -using Microsoft.Azure.WebJobs.Host.Storage.Blob; using Microsoft.Azure.WebJobs.Host.Timers; using Microsoft.Azure.WebJobs.Logging; using Microsoft.Extensions.Logging; -using Microsoft.WindowsAzure.Storage; -using Microsoft.WindowsAzure.Storage.Blob; namespace Microsoft.Azure.WebJobs.Host { @@ -30,38 +22,51 @@ namespace Microsoft.Azure.WebJobs.Host /// Encapsulates and manages blob leases for Singleton locks. /// internal class SingletonManager - { - internal const string FunctionInstanceMetadataKey = "FunctionInstance"; + { private readonly INameResolver _nameResolver; - private readonly IWebJobsExceptionHandler _exceptionHandler; - private readonly SingletonConfiguration _config; - private readonly IStorageAccountProvider _accountProvider; + private readonly SingletonConfiguration _config; private readonly ILogger _logger; private readonly ILoggerFactory _loggerFactory; - private ConcurrentDictionary _lockDirectoryMap = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - private TimeSpan _minimumLeaseRenewalInterval = TimeSpan.FromSeconds(1); + private readonly IDistributedLockManager _lockManager; + private readonly IWebJobsExceptionHandler _exceptionHandler; + private TraceWriter _trace; private IHostIdProvider _hostIdProvider; private string _hostId; + private TimeSpan _minimumLeaseRenewalInterval = TimeSpan.FromSeconds(1); + // For mock testing only internal SingletonManager() { } - public SingletonManager(IStorageAccountProvider accountProvider, IWebJobsExceptionHandler exceptionHandler, SingletonConfiguration config, - TraceWriter trace, ILoggerFactory loggerFactory, IHostIdProvider hostIdProvider, INameResolver nameResolver = null) + public SingletonManager(IDistributedLockManager lockManager, SingletonConfiguration config, + TraceWriter trace, IWebJobsExceptionHandler exceptionHandler, ILoggerFactory loggerFactory, IHostIdProvider hostIdProvider, INameResolver nameResolver = null) { - _accountProvider = accountProvider; + _lockManager = lockManager; _nameResolver = nameResolver; - _exceptionHandler = exceptionHandler; _config = config; _trace = trace; _loggerFactory = loggerFactory; + _exceptionHandler = exceptionHandler; _logger = _loggerFactory?.CreateLogger(LogCategories.Singleton); _hostIdProvider = hostIdProvider; } + // for testing + internal TimeSpan MinimumLeaseRenewalInterval + { + get + { + return _minimumLeaseRenewalInterval; + } + set + { + _minimumLeaseRenewalInterval = value; + } + } + internal virtual SingletonConfiguration Config { get @@ -82,22 +87,9 @@ internal string HostId } } - // for testing - internal TimeSpan MinimumLeaseRenewalInterval - { - get - { - return _minimumLeaseRenewalInterval; - } - set - { - _minimumLeaseRenewalInterval = value; - } - } - - public async virtual Task LockAsync(string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken) + public async virtual Task LockAsync(string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken) { - object lockHandle = await TryLockAsync(lockId, functionInstanceId, attribute, cancellationToken); + RenewableLockHandle lockHandle = await TryLockAsync(lockId, functionInstanceId, attribute, cancellationToken); if (lockHandle == null) { @@ -110,13 +102,12 @@ public async virtual Task LockAsync(string lockId, string functionInstan return lockHandle; } - public async virtual Task TryLockAsync(string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken, bool retry = true) + public async virtual Task TryLockAsync(string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken, bool retry = true) { - IStorageBlobDirectory lockDirectory = GetLockDirectory(attribute.Account); - IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId); TimeSpan lockPeriod = GetLockPeriod(attribute, _config); - string leaseId = await TryAcquireLeaseAsync(lockBlob, lockPeriod, cancellationToken); - if (string.IsNullOrEmpty(leaseId) && retry) + IDistributedLock handle = await _lockManager.TryLockAsync(attribute.Account, lockId, functionInstanceId, null, lockPeriod, cancellationToken); + + if ((handle == null) && retry) { // Someone else has the lease. Continue trying to periodically get the lease for // a period of time @@ -125,55 +116,58 @@ public async virtual Task TryLockAsync(string lockId, string functionIns _config.LockAcquisitionTimeout; TimeSpan timeWaited = TimeSpan.Zero; - while (string.IsNullOrEmpty(leaseId) && (timeWaited < acquisitionTimeout)) + while ((handle == null) && (timeWaited < acquisitionTimeout)) { await Task.Delay(_config.LockAcquisitionPollingInterval); timeWaited += _config.LockAcquisitionPollingInterval; - leaseId = await TryAcquireLeaseAsync(lockBlob, lockPeriod, cancellationToken); + handle = await _lockManager.TryLockAsync(attribute.Account, lockId, functionInstanceId, null, lockPeriod, cancellationToken); } } - if (string.IsNullOrEmpty(leaseId)) + if (handle == null) { return null; } + var renewal = CreateLeaseRenewalTimer(lockPeriod, handle); + + // start the renewal timer, which ensures that we maintain our lease until + // the lock is released + renewal.Start(); + string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock acquired ({0})", lockId); _trace.Verbose(msg, source: TraceSource.Execution); _logger?.LogDebug(msg); - if (!string.IsNullOrEmpty(functionInstanceId)) - { - await WriteLeaseBlobMetadata(lockBlob, leaseId, functionInstanceId, cancellationToken); - } - - SingletonLockHandle lockHandle = new SingletonLockHandle - { - LeaseId = leaseId, - LockId = lockId, - Blob = lockBlob, - LeaseRenewalTimer = CreateLeaseRenewalTimer(lockBlob, leaseId, lockId, lockPeriod, _exceptionHandler) - }; + return new RenewableLockHandle(handle, renewal); + } - // start the renewal timer, which ensures that we maintain our lease until - // the lock is released - lockHandle.LeaseRenewalTimer.Start(); + private ITaskSeriesTimer CreateLeaseRenewalTimer(TimeSpan leasePeriod, IDistributedLock lockHandle) + { + // renew the lease when it is halfway to expiring + TimeSpan normalUpdateInterval = new TimeSpan(leasePeriod.Ticks / 2); - return lockHandle; + IDelayStrategy speedupStrategy = new LinearSpeedupStrategy(normalUpdateInterval, MinimumLeaseRenewalInterval); + ITaskSeriesCommand command = new RenewLeaseCommand(this._lockManager, lockHandle, speedupStrategy); + return new TaskSeriesTimer(command, this._exceptionHandler, Task.Delay(normalUpdateInterval)); } - public async virtual Task ReleaseLockAsync(object lockHandle, CancellationToken cancellationToken) + internal static TimeSpan GetLockPeriod(SingletonAttribute attribute, SingletonConfiguration config) { - SingletonLockHandle singletonLockHandle = (SingletonLockHandle)lockHandle; + return attribute.Mode == SingletonMode.Listener ? + config.ListenerLockPeriod : config.LockPeriod; + } - if (singletonLockHandle.LeaseRenewalTimer != null) + public async virtual Task ReleaseLockAsync(RenewableLockHandle handle, CancellationToken cancellationToken) + { + if (handle.LeaseRenewalTimer != null) { - await singletonLockHandle.LeaseRenewalTimer.StopAsync(cancellationToken); + await handle.LeaseRenewalTimer.StopAsync(cancellationToken); } + + await _lockManager.ReleaseLockAsync(handle.InnerLock, cancellationToken); - await ReleaseLeaseAsync(singletonLockHandle.Blob, singletonLockHandle.LeaseId, cancellationToken); - - string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock released ({0})", singletonLockHandle.LockId); + string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock released ({0})", handle.InnerLock.LockId); _trace.Verbose(msg, source: TraceSource.Execution); _logger?.LogDebug(msg); } @@ -307,346 +301,29 @@ internal static void ValidateSingletonAttribute(SingletonAttribute attribute, Si public async virtual Task GetLockOwnerAsync(SingletonAttribute attribute, string lockId, CancellationToken cancellationToken) { - IStorageBlobDirectory lockDirectory = GetLockDirectory(attribute.Account); - IStorageBlockBlob lockBlob = lockDirectory.GetBlockBlobReference(lockId); - - await ReadLeaseBlobMetadata(lockBlob, cancellationToken); - - // if the lease is Available, then there is no current owner - // (any existing owner value is the last owner that held the lease) - if (lockBlob.Properties.LeaseState == LeaseState.Available && - lockBlob.Properties.LeaseStatus == LeaseStatus.Unlocked) - { - return null; - } - - string owner = string.Empty; - lockBlob.Metadata.TryGetValue(FunctionInstanceMetadataKey, out owner); - - return owner; - } - - internal IStorageBlobDirectory GetLockDirectory(string accountName) - { - if (string.IsNullOrEmpty(accountName)) - { - accountName = ConnectionStringNames.Storage; - } - - IStorageBlobDirectory storageDirectory = null; - if (!_lockDirectoryMap.TryGetValue(accountName, out storageDirectory)) - { - Task task = _accountProvider.GetStorageAccountAsync(accountName, CancellationToken.None); - IStorageAccount storageAccount = task.Result; - // singleton requires block blobs, cannot be premium - storageAccount.AssertTypeOneOf(StorageAccountType.GeneralPurpose, StorageAccountType.BlobOnly); - IStorageBlobClient blobClient = storageAccount.CreateBlobClient(); - storageDirectory = blobClient.GetContainerReference(HostContainerNames.Hosts) - .GetDirectoryReference(HostDirectoryNames.SingletonLocks); - _lockDirectoryMap[accountName] = storageDirectory; - } - - return storageDirectory; - } - - internal static TimeSpan GetLockPeriod(SingletonAttribute attribute, SingletonConfiguration config) - { - return attribute.Mode == SingletonMode.Listener ? - config.ListenerLockPeriod : config.LockPeriod; - } - - private ITaskSeriesTimer CreateLeaseRenewalTimer(IStorageBlockBlob leaseBlob, string leaseId, string lockId, TimeSpan leasePeriod, - IWebJobsExceptionHandler exceptionHandler) - { - // renew the lease when it is halfway to expiring - TimeSpan normalUpdateInterval = new TimeSpan(leasePeriod.Ticks / 2); - - IDelayStrategy speedupStrategy = new LinearSpeedupStrategy(normalUpdateInterval, MinimumLeaseRenewalInterval); - ITaskSeriesCommand command = new RenewLeaseCommand(leaseBlob, leaseId, lockId, speedupStrategy, _trace, _logger, leasePeriod); - return new TaskSeriesTimer(command, exceptionHandler, Task.Delay(normalUpdateInterval)); - } - - private static async Task TryAcquireLeaseAsync(IStorageBlockBlob blob, TimeSpan leasePeriod, CancellationToken cancellationToken) - { - bool blobDoesNotExist = false; - try - { - // Optimistically try to acquire the lease. The blob may not yet - // exist. If it doesn't we handle the 404, create it, and retry below - return await blob.AcquireLeaseAsync(leasePeriod, null, cancellationToken); - } - catch (StorageException exception) - { - if (exception.RequestInformation != null) - { - if (exception.RequestInformation.HttpStatusCode == 409) - { - return null; - } - else if (exception.RequestInformation.HttpStatusCode == 404) - { - blobDoesNotExist = true; - } - else - { - throw; - } - } - else - { - throw; - } - } - - if (blobDoesNotExist) - { - await TryCreateAsync(blob, cancellationToken); - - try - { - return await blob.AcquireLeaseAsync(leasePeriod, null, cancellationToken); - } - catch (StorageException exception) - { - if (exception.RequestInformation != null && - exception.RequestInformation.HttpStatusCode == 409) - { - return null; - } - else - { - throw; - } - } - } - - return null; - } - - private static async Task ReleaseLeaseAsync(IStorageBlockBlob blob, string leaseId, CancellationToken cancellationToken) - { - try - { - // Note that this call returns without throwing if the lease is expired. See the table at: - // http://msdn.microsoft.com/en-us/library/azure/ee691972.aspx - await blob.ReleaseLeaseAsync( - accessCondition: new AccessCondition { LeaseId = leaseId }, - options: null, - operationContext: null, - cancellationToken: cancellationToken); - } - catch (StorageException exception) - { - if (exception.RequestInformation != null) - { - if (exception.RequestInformation.HttpStatusCode == 404 || - exception.RequestInformation.HttpStatusCode == 409) - { - // if the blob no longer exists, or there is another lease - // now active, there is nothing for us to release so we can - // ignore - } - else - { - throw; - } - } - else - { - throw; - } - } - } - - private static async Task TryCreateAsync(IStorageBlockBlob blob, CancellationToken cancellationToken) - { - bool isContainerNotFoundException = false; - - try - { - await blob.UploadTextAsync(string.Empty, cancellationToken: cancellationToken); - return true; - } - catch (StorageException exception) - { - if (exception.RequestInformation != null) - { - if (exception.RequestInformation.HttpStatusCode == 404) - { - isContainerNotFoundException = true; - } - else if (exception.RequestInformation.HttpStatusCode == 409 || - exception.RequestInformation.HttpStatusCode == 412) - { - // The blob already exists, or is leased by someone else - return false; - } - else - { - throw; - } - } - else - { - throw; - } - } - - Debug.Assert(isContainerNotFoundException); - await blob.Container.CreateIfNotExistsAsync(cancellationToken); - - try - { - await blob.UploadTextAsync(string.Empty, cancellationToken: cancellationToken); - return true; - } - catch (StorageException exception) - { - if (exception.RequestInformation != null && - (exception.RequestInformation.HttpStatusCode == 409 || exception.RequestInformation.HttpStatusCode == 412)) - { - // The blob already exists, or is leased by someone else - return false; - } - else - { - throw; - } - } - } - - private static async Task WriteLeaseBlobMetadata(IStorageBlockBlob blob, string leaseId, string functionInstanceId, CancellationToken cancellationToken) - { - blob.Metadata.Add(FunctionInstanceMetadataKey, functionInstanceId); - - await blob.SetMetadataAsync( - accessCondition: new AccessCondition { LeaseId = leaseId }, - options: null, - operationContext: null, - cancellationToken: cancellationToken); - } - - private static async Task ReadLeaseBlobMetadata(IStorageBlockBlob blob, CancellationToken cancellationToken) - { - try - { - await blob.FetchAttributesAsync(cancellationToken); - } - catch (StorageException exception) - { - if (exception.RequestInformation != null && - exception.RequestInformation.HttpStatusCode == 404) - { - // the blob no longer exists - } - else - { - throw; - } - } - } - - internal class SingletonLockHandle - { - public string LeaseId { get; set; } - public string LockId { get; set; } - public IStorageBlockBlob Blob { get; set; } - public ITaskSeriesTimer LeaseRenewalTimer { get; set; } + return await _lockManager.GetLockOwnerAsync(attribute.Account, lockId, cancellationToken); } internal class RenewLeaseCommand : ITaskSeriesCommand { - private readonly IStorageBlockBlob _leaseBlob; - private readonly string _leaseId; - private readonly string _lockId; + private readonly IDistributedLockManager _lockManager; + private readonly IDistributedLock _lock; private readonly IDelayStrategy _speedupStrategy; - private readonly TraceWriter _trace; - private readonly ILogger _logger; - private DateTimeOffset _lastRenewal; - private TimeSpan _lastRenewalLatency; - private TimeSpan _leasePeriod; - - public RenewLeaseCommand(IStorageBlockBlob leaseBlob, string leaseId, string lockId, IDelayStrategy speedupStrategy, TraceWriter trace, - ILogger logger, TimeSpan leasePeriod) + + public RenewLeaseCommand(IDistributedLockManager lockManager, IDistributedLock @lock, IDelayStrategy speedupStrategy) { - _lastRenewal = DateTimeOffset.UtcNow; - _leaseBlob = leaseBlob; - _leaseId = leaseId; - _lockId = lockId; + _lock = @lock; + _lockManager = lockManager; _speedupStrategy = speedupStrategy; - _trace = trace; - _logger = logger; - _leasePeriod = leasePeriod; } public async Task ExecuteAsync(CancellationToken cancellationToken) - { - TimeSpan delay; - - try - { - AccessCondition condition = new AccessCondition - { - LeaseId = _leaseId - }; - DateTimeOffset requestStart = DateTimeOffset.UtcNow; - await _leaseBlob.RenewLeaseAsync(condition, null, null, cancellationToken); - _lastRenewal = DateTime.UtcNow; - _lastRenewalLatency = _lastRenewal - requestStart; - - // The next execution should occur after a normal delay. - delay = _speedupStrategy.GetNextDelay(executionSucceeded: true); - } - catch (StorageException exception) - { - if (exception.IsServerSideError()) - { - // The next execution should occur more quickly (try to renew the lease before it expires). - delay = _speedupStrategy.GetNextDelay(executionSucceeded: false); - string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock renewal failed for blob '{0}' with error code {1}. Retry renewal in {2} milliseconds.", - _lockId, FormatErrorCode(exception), delay.TotalMilliseconds); - _trace.Warning(msg, source: TraceSource.Execution); - _logger?.LogWarning(msg); - } - else - { - // Log the details we've been accumulating to help with debugging this scenario - int leasePeriodMilliseconds = (int)_leasePeriod.TotalMilliseconds; - string lastRenewalFormatted = _lastRenewal.ToString("yyyy-MM-ddTHH:mm:ss.FFFZ", CultureInfo.InvariantCulture); - int millisecondsSinceLastSuccess = (int)(DateTime.UtcNow - _lastRenewal).TotalMilliseconds; - int lastRenewalMilliseconds = (int)_lastRenewalLatency.TotalMilliseconds; - - string msg = string.Format(CultureInfo.InvariantCulture, "Singleton lock renewal failed for blob '{0}' with error code {1}. The last successful renewal completed at {2} ({3} milliseconds ago) with a duration of {4} milliseconds. The lease period was {5} milliseconds.", - _lockId, FormatErrorCode(exception), lastRenewalFormatted, millisecondsSinceLastSuccess, lastRenewalMilliseconds, leasePeriodMilliseconds); - _trace.Error(msg); - _logger?.LogError(msg); - - // If we've lost the lease or cannot re-establish it, we want to fail any - // in progress function execution - throw; - } - } - return new TaskSeriesCommandResult(wait: Task.Delay(delay)); - } - - private static string FormatErrorCode(StorageException exception) - { - int statusCode; - if (!exception.TryGetStatusCode(out statusCode)) - { - return "''"; - } - - string message = statusCode.ToString(CultureInfo.InvariantCulture); - - string errorCode = exception.GetErrorCode(); - - if (errorCode != null) - { - message += ": " + errorCode; - } + { + // Exceptions wil propagate + bool executionSucceeded = await _lockManager.RenewAsync(_lock, cancellationToken); - return message; + TimeSpan delay = _speedupStrategy.GetNextDelay(executionSucceeded: true); + return new TaskSeriesCommandResult(wait: Task.Delay(delay)); } } } diff --git a/src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj b/src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj index d4554f3f1..010cb3864 100644 --- a/src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj +++ b/src/Microsoft.Azure.WebJobs.Host/WebJobs.Host.csproj @@ -532,6 +532,10 @@ + + + + diff --git a/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestHelpers.cs b/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestHelpers.cs index e837ce005..f5fd3097e 100644 --- a/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestHelpers.cs +++ b/test/Microsoft.Azure.WebJobs.Host.TestCommon/TestHelpers.cs @@ -142,6 +142,7 @@ public static void AddServices(this JobHostConfiguration config, params object[] typeof(IHostIdProvider), typeof(IQueueConfiguration), typeof(IExtensionRegistry), + typeof(IDistributedLockManager), typeof(IFunctionIndexProvider) // set to unit test indexing. }; @@ -221,5 +222,10 @@ public Task TryGetAccountAsync(string connectionStringName, Can return Task.FromResult(account); } } + + public static IJobHostMetadataProvider CreateMetadataProvider(this JobHost host) + { + return host.Services.GetService(); + } } } diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostConfigurationTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostConfigurationTests.cs index 06ba6ee9e..bf3125bff 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostConfigurationTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostConfigurationTests.cs @@ -16,6 +16,7 @@ using Microsoft.Azure.WebJobs.Host.TestCommon; using Newtonsoft.Json; using Xunit; +using Microsoft.Azure.WebJobs.Host.Config; namespace Microsoft.Azure.WebJobs.Host.UnitTests { @@ -425,6 +426,61 @@ public void JobHost_NoStorage_Succeeds() } } + [Fact] + public void TestServices() + { + // Test configuration similar to how ScriptRuntime works. + // - config is created and immediatey passed to a JobHost ctor + // - config is then initialized, including adding extensions + // - extensions may register their own services. + JobHostConfiguration config = new JobHostConfiguration(); + var host = new JobHost(config); + + var lockManager = config.GetService(); + Assert.Null(lockManager); // Not initialized yet. + + var nameResolver = new FakeNameResolver(); + config.AddExtension(new TestExtension()); // this extension will add services. + config.AddService(nameResolver); + + // Now succeeds when called on JobHost instead of Config object. + lockManager = host.Services.GetService(); + Assert.NotNull(lockManager); + Assert.IsType(lockManager); // verify it's our custom type + } + + // A test extension. This registers a new service in the initialization path. + class TestExtension : IExtensionConfigProvider + { + public void Initialize(ExtensionConfigContext context) + { + context.Config.AddService(new TestLockManager()); + } + } + + class TestLockManager : IDistributedLockManager + { + public Task GetLockOwnerAsync(string account, string lockId, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public Task ReleaseLockAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public Task RenewAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + + public Task TryLockAsync(string account, string lockId, string lockOwnerId, string proposedLeaseId, TimeSpan lockPeriod, CancellationToken cancellationToken) + { + throw new NotImplementedException(); + } + } + public class BasicTest { public static bool Called = false; diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostContextTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostContextTests.cs index 118a4b9bc..b385077e4 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostContextTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostContextTests.cs @@ -28,7 +28,7 @@ public void Dispose_Disposes() mockListener.Setup(p => p.Dispose()); mockLoggerFactory.Setup(p => p.Dispose()); - var context = new JobHostContext(mockLookup.Object, mockExecutor.Object, mockListener.Object, traceWriter, loggerFactory: mockLoggerFactory.Object); + var context = new JobHostContext(mockLookup.Object, mockExecutor.Object, mockListener.Object, traceWriter, null, loggerFactory: mockLoggerFactory.Object); Assert.Same(traceWriter, context.Trace); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostMetadataProviderTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostMetadataProviderTests.cs index 2a7912384..0b75f3175 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostMetadataProviderTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/JobHostMetadataProviderTests.cs @@ -27,11 +27,11 @@ public void Test() config.AddExtension(ext); - IJobHostMetadataProvider metadataProvider = config.CreateMetadataProvider(); + var host = new TestJobHost(config); + IJobHostMetadataProvider metadataProvider = host.CreateMetadataProvider(); Assert.Equal(1, ext._counter); // Callable - var host = new TestJobHost(config); host.Call("Test"); Assert.Equal(1, ext._counter); @@ -72,7 +72,8 @@ static T GetAttr(IJobHostMetadataProvider metadataProvider, object obj) where public void AttrBuilder() { JobHostConfiguration config = TestHelpers.NewConfig(); - var metadataProvider = config.CreateMetadataProvider(); + var host2 = new JobHost(config); + var metadataProvider = host2.CreateMetadataProvider(); // Blob var blobAttr = GetAttr(metadataProvider, new { path = "x" } ); @@ -138,7 +139,8 @@ public void AttrBuilder() public void DefaultTypeForTable() { JobHostConfiguration config = TestHelpers.NewConfig(); - var metadataProvider = config.CreateMetadataProvider(); + var host2 = new JobHost(config); + var metadataProvider = host2.CreateMetadataProvider(); var t1 = metadataProvider.GetDefaultType(new TableAttribute("table1"), FileAccess.Read, null); Assert.Equal(typeof(JArray), t1); @@ -155,7 +157,8 @@ public void DefaultTypeForTable() public void DefaultTypeForQueue() { JobHostConfiguration config = TestHelpers.NewConfig(); - var metadataProvider = config.CreateMetadataProvider(); + var host2 = new JobHost(config); + var metadataProvider = host2.CreateMetadataProvider(); var t1 = metadataProvider.GetDefaultType(new QueueAttribute("q"), FileAccess.Read, typeof(byte[])); Assert.Equal(typeof(byte[]), t1); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs index 8bb829f26..3e612e03e 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/PublicSurfaceTests.cs @@ -233,7 +233,9 @@ public void WebJobsHostPublicSurface_LimitedToSpecificTypes() "LogCategoryFilter", "LogCategories", "LoggingKeys", - "ScopeKeys" + "ScopeKeys", + "IDistributedLockManager", + "IDistributedLock" }; AssertPublicTypes(expected, assembly); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonEnd2EndTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonEnd2EndTests.cs new file mode 100644 index 000000000..edc53386f --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonEnd2EndTests.cs @@ -0,0 +1,100 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Microsoft.Azure.WebJobs.Host.TestCommon; +using System; +using System.Threading; +using Xunit; +using System.Threading.Tasks; +using System.Collections.Generic; + +namespace Microsoft.Azure.WebJobs.Host.UnitTests.Singleton +{ + public class SingletonEnd2EndTests + { + [Fact] + public async Task ValidateExclusion() + { + var core = new FakeSingletonManager(); + var host = TestHelpers.NewJobHost(core); + + var task1 = host.CallAsync("Func1", null); + var task2 = host.CallAsync("Func1", null); + + await Task.WhenAll(task1, task2); + } + + // Ensure singletons are serialized + public class Program + { + // Ensure exclusion + static int _counter = 0; + + [NoAutomaticTrigger] + [Singleton] // should ensure all instances of Func1 are exclusive. + public async Task Func1() + { + var newVal = Interlocked.Increment(ref _counter); + Assert.Equal(1, newVal); + + // Wait long enough that if singleton is not working, the other function would have started. + await Task.Delay(300); + + newVal = Interlocked.Decrement(ref _counter); + Assert.Equal(0, newVal); + } + } + + + internal class FakeSingletonManager : IDistributedLockManager + { + Dictionary _locks = new Dictionary(); + + public Task GetLockOwnerAsync(string account, string lockId, CancellationToken cancellationToken) + { + return Task.FromResult(null); + } + + public Task ReleaseLockAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + FakeLock x = (FakeLock)lockHandle; + lock (_locks) + { + _locks.Remove(x.LockId); + } + return Task.CompletedTask; + } + + public Task RenewAsync(IDistributedLock lockHandle, CancellationToken cancellationToken) + { + return Task.FromResult(true); + } + + public Task TryLockAsync(string account, string lockId, string lockOwnerId, string proposedLeaseId, TimeSpan lockPeriod, CancellationToken cancellationToken) + { + FakeLock entry = null; + lock (_locks) + { + if (!_locks.ContainsKey(lockId)) + { + entry = new FakeLock + { + LockId = lockId, + LockOwnerId = lockOwnerId + }; + _locks[lockId] = entry; + } + } + return Task.FromResult(entry); + } + + class FakeLock : IDistributedLock + { + public string LockId { get; set; } + public string LockOwnerId { get; set; } + + public Task LeaseLost { get { throw new NotImplementedException(); } } + } + } + } +} \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonListenerTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonListenerTests.cs index ac7b66d07..60ada5629 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonListenerTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonListenerTests.cs @@ -13,6 +13,7 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests.Singleton { + using SingletonLockHandle = BlobLeaseDistributedLockManager.SingletonLockHandle; public class SingletonListenerTests { private readonly string TestHostId = "testhostid"; @@ -44,7 +45,7 @@ public SingletonListenerTests() public async Task StartAsync_StartsListener_WhenLockAcquired() { CancellationToken cancellationToken = new CancellationToken(); - SingletonManager.SingletonLockHandle lockHandle = new SingletonManager.SingletonLockHandle(); + var lockHandle = new RenewableLockHandle(new SingletonLockHandle(), null); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, false)) .ReturnsAsync(lockHandle); _mockInnerListener.Setup(p => p.StartAsync(cancellationToken)).Returns(Task.FromResult(true)); @@ -62,7 +63,7 @@ public async Task StartAsync_DoesNotStartListener_WhenLockCannotBeAcquired() { CancellationToken cancellationToken = new CancellationToken(); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, false)) - .ReturnsAsync((object)null); + .ReturnsAsync((RenewableLockHandle)null); await _listener.StartAsync(cancellationToken); @@ -84,7 +85,7 @@ public async Task StartAsync_DoesNotStartLockTimer_WhenPollingIntervalSetToInfin CancellationToken cancellationToken = new CancellationToken(); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, true)) - .ReturnsAsync((object)null); + .ReturnsAsync((RenewableLockHandle)null); await _listener.StartAsync(cancellationToken); @@ -103,7 +104,7 @@ public async Task TryAcquireLock_WhenLockAcquired_StopsLockTimerAndStartsListene }; _listener.LockTimer.Start(); - SingletonManager.SingletonLockHandle lockHandle = new SingletonManager.SingletonLockHandle(); + RenewableLockHandle lockHandle = new RenewableLockHandle(new SingletonLockHandle(), null); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, CancellationToken.None, false)) .ReturnsAsync(lockHandle); @@ -124,7 +125,7 @@ public async Task TryAcquireLock_LockNotAcquired_DoesNotStopLockTimer() _listener.LockTimer.Start(); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, CancellationToken.None, false)) - .ReturnsAsync((object)null); + .ReturnsAsync((RenewableLockHandle)null); Assert.True(_listener.LockTimer.Enabled); @@ -145,7 +146,7 @@ public async Task StopAsync_WhenLockNotAcquired_StopsLockTimer() { CancellationToken cancellationToken = new CancellationToken(); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, false)) - .ReturnsAsync((object)null); + .ReturnsAsync((RenewableLockHandle)null); await _listener.StartAsync(cancellationToken); @@ -163,7 +164,7 @@ public async Task StopAsync_WhenLockNotAcquired_StopsLockTimer() public async Task StopAsync_WhenLockAcquired_ReleasesLock_AndStopsListener() { CancellationToken cancellationToken = new CancellationToken(); - SingletonManager.SingletonLockHandle lockHandle = new SingletonManager.SingletonLockHandle(); + var lockHandle = new RenewableLockHandle(new SingletonLockHandle(), null); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, false)) .ReturnsAsync(lockHandle); _mockInnerListener.Setup(p => p.StartAsync(cancellationToken)).Returns(Task.FromResult(true)); @@ -212,7 +213,7 @@ public void Dispose_DisposesListener() public async Task Dispose_WhenLockAcquired_ReleasesLock() { CancellationToken cancellationToken = new CancellationToken(); - SingletonManager.SingletonLockHandle lockHandle = new SingletonManager.SingletonLockHandle(); + var lockHandle = new RenewableLockHandle(new SingletonLockHandle(), null); _mockSingletonManager.Setup(p => p.TryLockAsync(_lockId, null, _attribute, cancellationToken, false)) .ReturnsAsync(lockHandle); _mockInnerListener.Setup(p => p.StartAsync(cancellationToken)).Returns(Task.FromResult(true)); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonLockTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonLockTests.cs index e88c93052..0f1265542 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonLockTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonLockTests.cs @@ -8,6 +8,8 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests.Singleton { + using SingletonLockHandle = BlobLeaseDistributedLockManager.SingletonLockHandle; + public class SingletonLockTests { private const string TestLockId = "testid"; @@ -32,7 +34,7 @@ public async Task AquireAsync_InvokesSingletonManager_WithExpectedValues() { CancellationToken cancellationToken = new CancellationToken(); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle handle = new SingletonManager.SingletonLockHandle(); + var handle = new RenewableLockHandle(new SingletonLockHandle(), null); Mock mockSingletonManager = new Mock(MockBehavior.Strict); mockSingletonManager.Setup(p => p.LockAsync(TestLockId, TestInstanceId, attribute, cancellationToken)).ReturnsAsync(handle); @@ -52,7 +54,7 @@ public async Task ReleaseAsync_InvokesSingletonManager_WithExpectedValues() { CancellationToken cancellationToken = new CancellationToken(); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle handle = new SingletonManager.SingletonLockHandle(); + var handle = new RenewableLockHandle(new SingletonLockHandle(), null); Mock mockSingletonManager = new Mock(MockBehavior.Strict); mockSingletonManager.Setup(p => p.LockAsync(TestLockId, TestInstanceId, attribute, cancellationToken)).ReturnsAsync(handle); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonManagerTests.cs b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonManagerTests.cs index f09da04fb..a1459a9b2 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonManagerTests.cs +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/Singleton/SingletonManagerTests.cs @@ -23,6 +23,27 @@ namespace Microsoft.Azure.WebJobs.Host.UnitTests.Singleton { + using SingletonLockHandle = BlobLeaseDistributedLockManager.SingletonLockHandle; + + internal static class Ext + { + // Wrapper to get the internal class. + public static async Task TryLockInternalAsync(this SingletonManager manager, string lockId, string functionInstanceId, SingletonAttribute attribute, CancellationToken cancellationToken, bool retry = true) + { + var handle = await manager.TryLockAsync(lockId, functionInstanceId, attribute, cancellationToken, retry); + return handle.GetInnerHandle(); + } + + public static SingletonLockHandle GetInnerHandle(this RenewableLockHandle handle) + { + if (handle == null) + { + return null; + } + return (SingletonLockHandle)handle.InnerLock; + } + } + public class SingletonManagerTests { private const string TestHostId = "testhost"; @@ -31,6 +52,7 @@ public class SingletonManagerTests private const string TestLeaseId = "testleaseid"; private const string Secondary = "SecondaryStorage"; + private BlobLeaseDistributedLockManager _core; private SingletonManager _singletonManager; private SingletonConfiguration _singletonConfig; private Mock _mockAccountProvider; @@ -89,7 +111,10 @@ public SingletonManagerTests() _loggerProvider = new TestLoggerProvider(filter.Filter); loggerFactory.AddProvider(_loggerProvider); - _singletonManager = new SingletonManager(_mockAccountProvider.Object, _mockExceptionDispatcher.Object, _singletonConfig, _trace, loggerFactory, new FixedHostIdProvider(TestHostId), _nameResolver); + var logger = loggerFactory?.CreateLogger(LogCategories.Singleton); + _core = new BlobLeaseDistributedLockManager(_mockAccountProvider.Object, _trace, logger); + + _singletonManager = new SingletonManager(_core, _singletonConfig, _trace, _mockExceptionDispatcher.Object, loggerFactory, new FixedHostIdProvider(TestHostId), _nameResolver); _singletonManager.MinimumLeaseRenewalInterval = TimeSpan.FromMilliseconds(250); } @@ -97,13 +122,13 @@ public SingletonManagerTests() [Fact] public void GetLockDirectory_HandlesMultipleAccounts() { - IStorageBlobDirectory directory = _singletonManager.GetLockDirectory(ConnectionStringNames.Storage); + IStorageBlobDirectory directory = _core.GetLockDirectory(ConnectionStringNames.Storage); Assert.Same(_mockBlobDirectory.Object, directory); - directory = _singletonManager.GetLockDirectory(null); + directory = _core.GetLockDirectory(null); Assert.Same(_mockBlobDirectory.Object, directory); - directory = _singletonManager.GetLockDirectory(Secondary); + directory = _core.GetLockDirectory(Secondary); Assert.Same(_mockSecondaryBlobDirectory.Object, directory); } @@ -133,12 +158,13 @@ public async Task TryLockAsync_CreatesBlob_WhenItDoesNotExist() _mockStorageBlob.Setup(p => p.ReleaseLeaseAsync(It.Is(q => q.LeaseId == TestLeaseId), null, null, cancellationToken)).Returns(Task.FromResult(true)); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle lockHandle = (SingletonManager.SingletonLockHandle)await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + RenewableLockHandle lockHandle = await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + var innerHandle = lockHandle.GetInnerHandle(); - Assert.Same(_mockStorageBlob.Object, lockHandle.Blob); - Assert.Equal(TestLeaseId, lockHandle.LeaseId); + Assert.Same(_mockStorageBlob.Object, innerHandle.Blob); + Assert.Equal(TestLeaseId, innerHandle.LeaseId); Assert.Equal(1, _mockStorageBlob.Object.Metadata.Keys.Count); - Assert.Equal(_mockStorageBlob.Object.Metadata[SingletonManager.FunctionInstanceMetadataKey], TestInstanceId); + Assert.Equal(_mockStorageBlob.Object.Metadata[BlobLeaseDistributedLockManager.FunctionInstanceMetadataKey], TestInstanceId); } [Fact] @@ -159,12 +185,13 @@ public async Task TryLockAsync_CreatesBlobLease_WithAutoRenewal() }).Returns(Task.FromResult(true)); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle lockHandle = (SingletonManager.SingletonLockHandle)await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + var lockHandle = await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + var innerHandle = lockHandle.GetInnerHandle(); - Assert.Same(_mockStorageBlob.Object, lockHandle.Blob); - Assert.Equal(TestLeaseId, lockHandle.LeaseId); + Assert.Same(_mockStorageBlob.Object, innerHandle.Blob); + Assert.Equal(TestLeaseId, innerHandle.LeaseId); Assert.Equal(1, _mockStorageBlob.Object.Metadata.Keys.Count); - Assert.Equal(_mockStorageBlob.Object.Metadata[SingletonManager.FunctionInstanceMetadataKey], TestInstanceId); + Assert.Equal(_mockStorageBlob.Object.Metadata[BlobLeaseDistributedLockManager.FunctionInstanceMetadataKey], TestInstanceId); // wait for enough time that we expect some lease renewals to occur int duration = 2000; @@ -212,10 +239,11 @@ public async Task TryLockAsync_WithContention_PollsForLease() }); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle lockHandle = (SingletonManager.SingletonLockHandle)await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + var lockHandle = await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken); + var innerHandle = lockHandle.GetInnerHandle(); Assert.NotNull(lockHandle); - Assert.Equal(TestLeaseId, lockHandle.LeaseId); + Assert.Equal(TestLeaseId, innerHandle.LeaseId); Assert.Equal(numRetries, count - 1); Assert.NotNull(lockHandle.LeaseRenewalTimer); @@ -236,8 +264,7 @@ public async Task TryLockAsync_WithContention_NoRetry_DoesNotPollForLease() }); SingletonAttribute attribute = new SingletonAttribute(); - SingletonManager.SingletonLockHandle lockHandle = (SingletonManager.SingletonLockHandle) - await _singletonManager.TryLockAsync(TestLockId, TestInstanceId, attribute, cancellationToken, retry: false); + SingletonLockHandle lockHandle = await _singletonManager.TryLockInternalAsync(TestLockId, TestInstanceId, attribute, cancellationToken, retry: false); Assert.Null(lockHandle); Assert.Equal(1, count); @@ -280,12 +307,14 @@ public async Task ReleaseLockAsync_StopsRenewalTimerAndReleasesLease() _mockStorageBlob.Setup(p => p.ReleaseLeaseAsync(It.Is(q => q.LeaseId == TestLeaseId), null, null, cancellationToken)).Returns(Task.FromResult(true)); - SingletonManager.SingletonLockHandle handle = new SingletonManager.SingletonLockHandle - { - Blob = _mockStorageBlob.Object, - LeaseId = TestLeaseId, - LeaseRenewalTimer = mockRenewalTimer.Object - }; + var handle = new RenewableLockHandle( + new SingletonLockHandle + { + Blob = _mockStorageBlob.Object, + LeaseId = TestLeaseId + }, + mockRenewalTimer.Object + ); await _singletonManager.ReleaseLockAsync(handle, cancellationToken); @@ -307,7 +336,7 @@ public async Task GetLockOwnerAsync_LeaseLocked_ReturnsOwner() string lockOwner = await _singletonManager.GetLockOwnerAsync(attribute, TestLockId, CancellationToken.None); Assert.Equal(null, lockOwner); - _mockBlobMetadata.Add(SingletonManager.FunctionInstanceMetadataKey, TestLockId); + _mockBlobMetadata.Add(BlobLeaseDistributedLockManager.FunctionInstanceMetadataKey, TestLockId); lockOwner = await _singletonManager.GetLockOwnerAsync(attribute, TestLockId, CancellationToken.None); Assert.Equal(TestLockId, lockOwner); diff --git a/test/Microsoft.Azure.WebJobs.Host.UnitTests/WebJobs.Host.UnitTests.csproj b/test/Microsoft.Azure.WebJobs.Host.UnitTests/WebJobs.Host.UnitTests.csproj index dbd7534f9..a0eefc936 100644 --- a/test/Microsoft.Azure.WebJobs.Host.UnitTests/WebJobs.Host.UnitTests.csproj +++ b/test/Microsoft.Azure.WebJobs.Host.UnitTests/WebJobs.Host.UnitTests.csproj @@ -317,6 +317,7 @@ +