Init
This commit is contained in:
102
Prefab.Web/Workers/DataSeeder.cs
Normal file
102
Prefab.Web/Workers/DataSeeder.cs
Normal file
@@ -0,0 +1,102 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Prefab.Web.Data;
|
||||
|
||||
namespace Prefab.Web.Workers;
|
||||
|
||||
/// <summary>
|
||||
/// Background service for seeding data. Collects seed tasks from a channel and executes them concurrently.
|
||||
/// </summary>
|
||||
public class DataSeeder(IServiceProvider serviceProvider, IConfiguration configuration, ILogger<DataSeeder> logger) : BackgroundService
|
||||
{
|
||||
private readonly int _workerCount = configuration.GetValue("DataSeederWorkerCount", 4);
|
||||
|
||||
private bool _databaseIsAvailable;
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
if (_databaseIsAvailable)
|
||||
{
|
||||
var tasks = new List<Task>();
|
||||
|
||||
for (var i = 0; i < _workerCount; i++)
|
||||
{
|
||||
tasks.Add(Task.Run(async () =>
|
||||
{
|
||||
await foreach (var seedTask in Prefab.Data.Seeder.Extensions.Channel.Reader.ReadAllAsync(stoppingToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
await seedTask(serviceProvider, stoppingToken);
|
||||
logger.LogInformation("Seed task executed successfully.");
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Error executing seed task.");
|
||||
// Optionally, add retry logic here.
|
||||
}
|
||||
}
|
||||
}, stoppingToken));
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
}
|
||||
else
|
||||
{
|
||||
await WaitForDatabaseToAvailable(stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task WaitForDatabaseToAvailable(CancellationToken stoppingToken)
|
||||
{
|
||||
using var scope = serviceProvider.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDb>();
|
||||
var contextName = db.GetType().Name;
|
||||
var loggerHasLoggedWaitingOnDatabaseMessage = false;
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (await db.Database.CanConnectAsync(stoppingToken))
|
||||
{
|
||||
if (loggerHasLoggedWaitingOnDatabaseMessage)
|
||||
{
|
||||
logger.LogInformation("Database connectivity confirmed for context {Context}.", contextName);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
logger.LogInformation("Applying migrations for context {Context}...", contextName);
|
||||
await db.Database.MigrateAsync(stoppingToken);
|
||||
logger.LogInformation("Migrations applied for context {Context}.", contextName);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogError(ex, "Failed to apply migrations for context {Context}. Retrying shortly.", contextName);
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
|
||||
continue;
|
||||
}
|
||||
|
||||
_databaseIsAvailable = true;
|
||||
|
||||
return;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
logger.LogDebug(ex, "Database connectivity probe failed for context {Context}.", contextName);
|
||||
}
|
||||
|
||||
if (!loggerHasLoggedWaitingOnDatabaseMessage)
|
||||
{
|
||||
logger.LogInformation("Waiting for database availability before applying migrations for context {Context}...", contextName);
|
||||
loggerHasLoggedWaitingOnDatabaseMessage = true;
|
||||
}
|
||||
|
||||
await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user