From dab5d6101270f9d811dacf7db25cc0dd43720a1c Mon Sep 17 00:00:00 2001 From: Matt LaPaglia Date: Tue, 22 Jun 2021 10:39:00 -0400 Subject: [PATCH] better thinking --- .../Hydration/HydrationService.cs | 2 -- OpenAlprWebhookProcessor/Startup.cs | 2 ++ .../WebhookProcessor/GroupWebhookHandler.cs | 21 +++++++++++++++- .../OpenAlprAgentScraper.cs | 25 +++++++++++++------ .../WebhookProcessor/WebhookController.cs | 2 ++ 5 files changed, 41 insertions(+), 11 deletions(-) diff --git a/OpenAlprWebhookProcessor/Hydration/HydrationService.cs b/OpenAlprWebhookProcessor/Hydration/HydrationService.cs index d9d7a0cd..ce5dcfc4 100644 --- a/OpenAlprWebhookProcessor/Hydration/HydrationService.cs +++ b/OpenAlprWebhookProcessor/Hydration/HydrationService.cs @@ -66,8 +66,6 @@ private async Task StartHydrationAsync() } await _processorHub.Clients.All.ScrapeFinished(); - - _logger.LogInformation("Finished OpenALPR Agent scrape."); } } } diff --git a/OpenAlprWebhookProcessor/Startup.cs b/OpenAlprWebhookProcessor/Startup.cs index aa06979d..b4ef0c04 100644 --- a/OpenAlprWebhookProcessor/Startup.cs +++ b/OpenAlprWebhookProcessor/Startup.cs @@ -198,6 +198,8 @@ public void ConfigureServices(IServiceCollection services) { options.SchedulePollingInterval = TimeSpan.FromSeconds(1); }); + + services.AddMemoryCache(); } public void Configure( diff --git a/OpenAlprWebhookProcessor/WebhookProcessor/GroupWebhookHandler.cs b/OpenAlprWebhookProcessor/WebhookProcessor/GroupWebhookHandler.cs index 76880e01..c3a5d60e 100644 --- a/OpenAlprWebhookProcessor/WebhookProcessor/GroupWebhookHandler.cs +++ b/OpenAlprWebhookProcessor/WebhookProcessor/GroupWebhookHandler.cs @@ -43,6 +43,7 @@ public GroupWebhookHandler( public async Task HandleWebhookAsync( Webhook webhook, bool isBulkImport, + List previouslyProcessedGroups, CancellationToken cancellationToken) { var camera = await _processorContext.Cameras @@ -67,7 +68,25 @@ public async Task HandleWebhookAsync( return; } - var alreadyProcessed = await _processorContext.PlateGroups.AnyAsync(x => x.OpenAlprUuid == webhook.Group.BestUuid); + var alreadyProcessed = false; + + if (previouslyProcessedGroups != null) + { + alreadyProcessed = previouslyProcessedGroups + .Intersect(webhook.Group.Uuids) + .Any(); + + if (!alreadyProcessed) + { + previouslyProcessedGroups.AddRange(webhook.Group.Uuids); + } + } + else + { + alreadyProcessed = await _processorContext.PlateGroups.Select(x => x.OpenAlprUuid) + .Intersect(webhook.Group.Uuids) + .AnyAsync(cancellationToken); + } if (alreadyProcessed) { diff --git a/OpenAlprWebhookProcessor/WebhookProcessor/OpenAlprAgentScraper/OpenAlprAgentScraper.cs b/OpenAlprWebhookProcessor/WebhookProcessor/OpenAlprAgentScraper/OpenAlprAgentScraper.cs index 12057c27..5c94036b 100644 --- a/OpenAlprWebhookProcessor/WebhookProcessor/OpenAlprAgentScraper/OpenAlprAgentScraper.cs +++ b/OpenAlprWebhookProcessor/WebhookProcessor/OpenAlprAgentScraper/OpenAlprAgentScraper.cs @@ -4,6 +4,7 @@ using OpenAlprWebhookProcessor.WebhookProcessor.OpenAlprWebhook; using System; using System.Collections.Generic; +using System.Linq; using System.Net.Http; using System.Text.Json; using System.Text.RegularExpressions; @@ -76,34 +77,43 @@ public async Task ScrapeAgentAsync(CancellationToken cancellationToken) foreach (var metadata in metaDatasToQuery) { - _logger.LogInformation("querying: " + metadata.Key); - var newGroup = await _httpClient.GetAsync( agent.EndpointUrl + metadataUrl.Replace("{0}", metadata.Key), cancellationToken); + var group = await JsonSerializer.DeserializeAsync( + await newGroup.Content.ReadAsStreamAsync(cancellationToken), + cancellationToken: cancellationToken); + + _logger.LogInformation("date: " + DateTimeOffset.FromUnixTimeMilliseconds(group.EpochStart).ToString() + " querying: " + metadata.Key); + if (!newGroup.IsSuccessStatusCode) { _logger.LogError("Unable to parse Group with Id: " + metadata.Key); continue; } - var newGroupResult = await newGroup.Content.ReadAsStringAsync(cancellationToken); - + var previouslyProcessedGroups = await _processorContext.PlateGroups + .Select(x => x.OpenAlprUuid) + .ToListAsync(cancellationToken); try { await _groupWebhookHandler.HandleWebhookAsync( new Webhook { - Group = JsonSerializer.Deserialize(newGroupResult) + Group = group, }, true, + previouslyProcessedGroups, cancellationToken); } catch { _logger.LogError("Failed to parse bulk import request."); } + + agent.LastSuccessfulScrapeEpoch = group.EpochStart; + await _processorContext.SaveChangesAsync(cancellationToken); } lastSuccessfulScrape = lastSuccessfulScrape.AddMinutes(minutesToScrape); @@ -112,10 +122,9 @@ await _groupWebhookHandler.HandleWebhookAsync( { lastSuccessfulScrape = startDate; } - - agent.LastSuccessfulScrapeEpoch = lastSuccessfulScrape.ToUnixTimeMilliseconds(); - await _processorContext.SaveChangesAsync(cancellationToken); } + + _logger.LogInformation("Finished OpenALPR Agent scrape."); } private async Task GetEarliestGroupEpochAsync( diff --git a/OpenAlprWebhookProcessor/WebhookProcessor/WebhookController.cs b/OpenAlprWebhookProcessor/WebhookProcessor/WebhookController.cs index b09f2e36..09a48b31 100644 --- a/OpenAlprWebhookProcessor/WebhookProcessor/WebhookController.cs +++ b/OpenAlprWebhookProcessor/WebhookProcessor/WebhookController.cs @@ -48,6 +48,7 @@ public async Task Post(CancellationToken cancellationToken) await _groupWebhookHandler.HandleWebhookAsync( alertGroupResult, false, + null, cancellationToken); } else if (rawWebhook.Contains("alpr_group")) @@ -61,6 +62,7 @@ await _groupWebhookHandler.HandleWebhookAsync( await _groupWebhookHandler.HandleWebhookAsync( groupResult, false, + null, cancellationToken); } else if (rawWebhook.Contains("alpr_results"))