using Prefab.Catalog.Api.Data;

namespace Prefab.Catalog.Api.Workers;

// NOTE(review): this copy of the file appears to have lost its generic type arguments
// (e.g. the logger parameter and the GetRequiredService call below). The constructor
// signature is left as found; confirm whether the logger was declared as ILogger<DataSeeder>,
// since the default logging registration only provides ILogger<T>.

/// <summary>
/// Background service for seeding data. Waits until the database accepts connections, then
/// starts a configurable number of workers that read seed tasks from the shared seeder channel
/// and execute them concurrently.
/// </summary>
/// <param name="serviceProvider">
/// Provider passed to every seed task and used to create the scope for the connectivity probe.
/// </param>
/// <param name="configuration">
/// Configuration source; <c>DataSeederWorkerCount</c> sets the number of workers (default 4).
/// </param>
/// <param name="logger">Logger for seeding progress and failures.</param>
public class DataSeeder(IServiceProvider serviceProvider, IConfiguration configuration, ILogger logger) : BackgroundService
{
    private readonly int _workerCount = configuration.GetValue("DataSeederWorkerCount", 4);

    /// <summary>
    /// Waits for the database, then runs the seed workers until the channel completes or the
    /// host requests shutdown.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await WaitForDatabaseAsync(stoppingToken);

        // A non-positive count would start no workers and silently leave every queued seed
        // task unprocessed, so fall back to a single worker and make the misconfiguration visible.
        var workerCount = _workerCount;
        if (workerCount < 1)
        {
            logger.LogWarning(
                "DataSeederWorkerCount is {WorkerCount}; running a single seed worker instead.",
                workerCount);
            workerCount = 1;
        }

        var tasks = new List<Task>(workerCount);
        for (var i = 0; i < workerCount; i++)
        {
            // Task.Run keeps the workers off the caller's thread even if the first read
            // from the channel completes synchronously.
            tasks.Add(Task.Run(() => RunWorkerAsync(stoppingToken), stoppingToken));
        }

        await Task.WhenAll(tasks);
    }

    /// <summary>
    /// Reads seed tasks from the shared channel and executes them one at a time. A failing
    /// seed task is logged and skipped; cancellation of <paramref name="stoppingToken"/> ends the worker.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    private async Task RunWorkerAsync(CancellationToken stoppingToken)
    {
        await foreach (var seedTask in Prefab.Data.Seeder.Extensions.Channel.Reader.ReadAllAsync(stoppingToken))
        {
            try
            {
                await seedTask(serviceProvider, stoppingToken);
                logger.LogInformation("Seed task executed successfully.");
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host shutdown is not a seed failure. Rethrow so the worker stops immediately
                // instead of logging an error and continuing to drain already-buffered tasks.
                throw;
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error executing seed task.");
                // Optionally, add retry logic here.
            }
        }
    }

    /// <summary>
    /// Probes the database once per second until a connection succeeds.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="stoppingToken"/> was signalled before the database became reachable.
    /// </exception>
    private async Task WaitForDatabaseAsync(CancellationToken stoppingToken)
    {
        var hasLoggedWait = false;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                using var scope = serviceProvider.CreateScope();

                // NOTE(review): the type argument of GetRequiredService (presumably the
                // application's DbContext from Prefab.Catalog.Api.Data) is missing in this
                // copy of the file and cannot be recovered from here — restore it.
                var db = scope.ServiceProvider.GetRequiredService();

                if (await db.Database.CanConnectAsync(stoppingToken))
                {
                    // Only announce success if we previously announced that we were waiting.
                    if (hasLoggedWait)
                    {
                        logger.LogInformation("Database connectivity confirmed. Starting seed workers.");
                    }

                    return;
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown requested mid-probe: propagate instead of treating it as a failed probe.
                throw;
            }
            catch (Exception ex)
            {
                logger.LogDebug(ex, "Database connectivity probe failed; retrying.");
            }

            if (!hasLoggedWait)
            {
                logger.LogInformation("Waiting for database availability before processing seed tasks...");
                hasLoggedWait = true;
            }

            await Task.Delay(TimeSpan.FromSeconds(1), stoppingToken);
        }

        // The loop only exits here when shutdown was requested; never report the database
        // as ready in that case, so no workers are started on a cancelled token.
        stoppingToken.ThrowIfCancellationRequested();
    }
}