Added LoadBalancer and static Functions

This commit is contained in:
Elias Stepanik 2023-06-02 21:54:46 +02:00
parent 6515f62d05
commit 347b3c36f4
14 changed files with 341 additions and 34 deletions

View File

@ -12,11 +12,14 @@ public class FunctionController : ControllerBase
{
private readonly ILogger<TestController> _logger;
private readonly FunctionManager _functionManager;
private readonly ILoadManager _loadManager;
public FunctionController(ILogger<TestController> logger, FunctionManager functionManager)
public FunctionController(ILogger<TestController> logger, FunctionManager functionManager, ILoadManager loadManager)
{
_logger = logger;
_functionManager = functionManager;
_loadManager = loadManager;
}
[HttpPost("{functionName}/edit")]
@ -30,7 +33,7 @@ public class FunctionController : ControllerBase
[HttpPost("{functionName}")]
public async Task<IActionResult> RunFunctionPost(string functionName,[FromBody] string text)
{
var responseContext = await _functionManager.RunInstance(functionName,HttpMethod.Post, text);
var responseContext = await _loadManager.HandleRequest(functionName,HttpMethod.Post, text);
if (responseContext.IsSuccessStatusCode)
{
@ -47,7 +50,7 @@ public class FunctionController : ControllerBase
[HttpGet("{functionName}")]
public async Task<IActionResult> RunFunctionGet(string functionName)
{
var responseContext = await _functionManager.RunInstance(functionName,HttpMethod.Get);
var responseContext = await _loadManager.HandleRequest(functionName,HttpMethod.Get);
if (responseContext.IsSuccessStatusCode)
{
@ -65,7 +68,7 @@ public class FunctionController : ControllerBase
[HttpPatch("{functionName}")]
public async Task<IActionResult> RunFunctionPatch(string functionName,[FromBody] string text)
{
var responseContext = await _functionManager.RunInstance(functionName,HttpMethod.Patch, text);
var responseContext = await _loadManager.HandleRequest(functionName,HttpMethod.Patch, text);
if (responseContext.IsSuccessStatusCode)
{
@ -83,7 +86,7 @@ public class FunctionController : ControllerBase
[HttpPut("{functionName}")]
public async Task<IActionResult> RunFunctionPut(string functionName,[FromBody] string text)
{
var responseContext = await _functionManager.RunInstance(functionName,HttpMethod.Put, text);
var responseContext = await _loadManager.HandleRequest(functionName,HttpMethod.Put, text);
if (responseContext.IsSuccessStatusCode)
{
@ -101,7 +104,7 @@ public class FunctionController : ControllerBase
[HttpDelete("{functionName}")]
public async Task<IActionResult> RunFunctionDelete(string functionName,[FromBody] string text)
{
var responseContext = await _functionManager.RunInstance(functionName,HttpMethod.Delete, text);
var responseContext = await _loadManager.HandleRequest(functionName,HttpMethod.Delete, text);
if (responseContext.IsSuccessStatusCode)
{

View File

@ -23,5 +23,5 @@ public class Function
public List<Environment> EnvironmentVariables { get; set; } = null!;
public List<Instance> Instances { get; set; } = null!;
public List<Instance?> Instances { get; set; } = null!;
}

View File

@ -0,0 +1,6 @@
namespace Functions.Data;
public class InstanceRuntimeInfo
{
public Instance? Instance { get; set; }
}

View File

@ -32,11 +32,12 @@ var serverVersion = new MySqlServerVersion(new Version(8, 0, 31));
builder.Services.AddDbContextFactory<FunctionsContext>(
dbContextOptions => dbContextOptions
.UseMySql(connectionString.ToString(), serverVersion)
.LogTo(Console.WriteLine, LogLevel.Debug)
.LogTo(Console.WriteLine, LogLevel.Error)
.EnableSensitiveDataLogging()
.EnableDetailedErrors());
builder.Services.AddTransient<ILoadManager,LoadManager>();
builder.Services.AddSingleton<TimerManager>();
builder.Services.AddTransient<IDockerManager,DockerManager>();
builder.Services.AddSingleton<INativeCommandWrapper,NativeCommandWrapper>();
builder.Services.AddSingleton<FunctionManager>();
@ -63,4 +64,7 @@ var dbFactory = app.Services.GetRequiredService<IDbContextFactory<FunctionsConte
var db = await dbFactory.CreateDbContextAsync();
await db.Database.MigrateAsync();
var loadManager = app.Services.GetRequiredService<ILoadManager>();
loadManager.Start();
app.Run();

View File

@ -117,6 +117,20 @@ public class DockerManager : IDockerManager
return false;
}
public async Task<ContainerStatsResponse> GetLoad(string containerId)
{
ContainerStatsParameters parameters = new ContainerStatsParameters()
{
Stream = false,
};
var response = new ContainerStatsResponse();
IProgress<ContainerStatsResponse> progress = new Progress<ContainerStatsResponse>(stats => { response = stats; });
await _docker.Containers.GetContainerStatsAsync(containerId,parameters, progress);
return response;
}
}
public class ContainerResponse

View File

@ -40,14 +40,14 @@ public class FunctionManager
var function = db.Functions.Include(s => s.Instances).Include(s => s.EnvironmentVariables).First(s => s.Name.Equals(functionName));
foreach (var functionInstance in function.Instances)
{
_dockerManager.DeleteContainer(functionInstance.InstanceId);
DeleteInstance(functionInstance);
}
db.Functions.Remove(function);
await db.SaveChangesAsync();
await db.DisposeAsync();
}
public async Task<HttpResponseMessage> RunInstance(string functionName, HttpMethod method, string body = "")
public async Task<InstanceRuntimeInfo> RunInstance(string functionName)
{
var db = await _dbContextFactory.CreateDbContextAsync();
var function = db.Functions.Include(s => s.Instances).Include(s => s.EnvironmentVariables).First(s => s.Name.Equals(functionName));
@ -61,10 +61,16 @@ public class FunctionManager
_dockerManager.ConnectNetwork(_configuration["AppConfig:FuctionNetworkName"] ?? throw new InvalidOperationException(), instance.InstanceId);
_dockerManager.StartContainer(instance.InstanceId);
//TODO: If not started delete instance
//Send Request to Container
return new InstanceRuntimeInfo()
{
Instance = instance
};
}
public async Task<HttpResponseMessage> SendRequest(Instance? instance,string function, HttpMethod method, string body = "")
{
if (method.Equals(HttpMethod.Post))
{
var message = await _externalEndpointManager.Post(instance.Name, body);
@ -86,39 +92,35 @@ public class FunctionManager
{
var message = await _externalEndpointManager.Put(instance.Name, body);
return await HandleError(message, instance);
}
if (method.Equals(HttpMethod.Delete))
{
var message = await _externalEndpointManager.Delete(instance.Name);
return await HandleError(message, instance);
}
return new HttpResponseMessage(HttpStatusCode.BadRequest);
}
private async Task<HttpResponseMessage> HandleError(HttpResponseMessage message, Instance instance)
private async Task<HttpResponseMessage> HandleError(HttpResponseMessage message, Instance? instance)
{
var db = await _dbContextFactory.CreateDbContextAsync();
if (!message.IsSuccessStatusCode)
{
_dockerManager.DeleteContainer(instance.InstanceId);
var i = db.Instances.First(s => s.InstanceId.Equals(instance.InstanceId));
db.Instances.Remove(i);
await db.SaveChangesAsync();
DeleteInstance(instance);
return new HttpResponseMessage(HttpStatusCode.BadRequest);
}
return message;
}
public async void DeleteInstance(Instance? instance)
{
var db = await _dbContextFactory.CreateDbContextAsync();
_dockerManager.DeleteContainer(instance.InstanceId);
var temp_i = db.Instances.First(s => s.InstanceId.Equals(instance.InstanceId));
db.Instances.Remove(temp_i);
await db.SaveChangesAsync();
await db.DisposeAsync();
return message;
}

View File

@ -13,4 +13,5 @@ public interface IDockerManager
public void DeleteContainer(string containerId);
public void CreateNetwork(string name);
public Task<bool> IsRunning(string containerId);
public Task<ContainerStatsResponse> GetLoad(string containerId);
}

View File

@ -0,0 +1,8 @@
namespace Functions.Services.Interfaces;
public interface ILoadManager
{
public Task<HttpResponseMessage> HandleRequest(string functionName, HttpMethod method, string body = "");
public void Update();
public void Start();
}

View File

@ -0,0 +1,187 @@
using Functions.Data;
using Functions.Data.DB;
using Functions.Services.Interfaces;
using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
namespace Functions.Services;
public class LoadManager : ILoadManager
{
private readonly FunctionManager _functionManager;
private readonly IDbContextFactory<FunctionsContext> _dbContextFactory;
private readonly IDockerManager _dockerManager;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<LoadManager> _logger;
public LoadManager(FunctionManager functionManager, IDbContextFactory<FunctionsContext> dbContextFactory, IDockerManager dockerManager,IServiceProvider serviceProvider, ILogger<LoadManager> logger)
{
_functionManager = functionManager;
_dbContextFactory = dbContextFactory;
_dockerManager = dockerManager;
_serviceProvider = serviceProvider;
_logger = logger;
}
public async Task<HttpResponseMessage> HandleRequest(string functionName, HttpMethod method, string body = "")
{
var db = await _dbContextFactory.CreateDbContextAsync();
var function = await db.Functions.Include(s => s.Instances).FirstAsync(x => x.Name.Equals(functionName));
if (function.Instances.Count == 0)
{
await _functionManager.RunInstance(function.Name);
}
foreach (var dbFunction in db.Functions.Include(s => s.Instances))
{
if (dbFunction.Instances.Count == 0)
{
await _functionManager.RunInstance(dbFunction.Name);
}
}
var instance = await GetLowestLoadInstance(functionName);
var responseMessage = await _functionManager.SendRequest(instance, functionName, method, body);
return responseMessage;
}
private async Task<bool> IsOverloaded(string containerId)
{
var load = await _dockerManager.GetLoad(containerId);
if (JsonConvert.SerializeObject(load).Equals("{}"))
{
return false;
}
try
{
var usedMemory = load.MemoryStats.Usage;
var availableMemory = load.MemoryStats.Limit;
// ReSharper disable once PossibleLossOfFraction
var memoryUsage = (usedMemory / availableMemory) * 100.0;
var cpuDelta = load.CPUStats.CPUUsage.TotalUsage - load.PreCPUStats.CPUUsage.TotalUsage;
var systemCpuDelta = load.CPUStats.SystemUsage - load.PreCPUStats.CPUUsage.TotalUsage;
var numberCpus = load.CPUStats.OnlineCPUs;
// ReSharper disable once PossibleLossOfFraction
var cpuUsage = (cpuDelta / systemCpuDelta) * numberCpus * 100.0;
if (cpuUsage > 80)
{
return true;
}
if(memoryUsage > 80)
{
return true;
}
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
}
return false;
}
private async Task<Instance?> GetLowestLoadInstance(string functionName)
{
Instance? lowestUsageCpuInstance = null;
double lowestUsageCpu = double.MaxValue;
Instance? lowestUsageMemoryInstance = null;
double lowestUsageMemory = double.MaxValue;
var db = await _dbContextFactory.CreateDbContextAsync();
try
{
foreach (var function in db.Functions.Include(s => s.Instances))
{
foreach (var functionInstance in function.Instances)
{
var load = await _dockerManager.GetLoad(functionInstance.InstanceId);
var usedMemory = load.MemoryStats.Usage;
var availableMemory = load.MemoryStats.Limit;
var cpuDelta = load.CPUStats.CPUUsage.TotalUsage - load.PreCPUStats.CPUUsage.TotalUsage;
var systemCpuDelta = load.CPUStats.SystemUsage - load.PreCPUStats.CPUUsage.TotalUsage;
var numberCpus = load.CPUStats.OnlineCPUs;
// ReSharper disable once PossibleLossOfFraction
//TODO: Later
var cpuUsage = (cpuDelta / systemCpuDelta) * numberCpus * 100.0;
// ReSharper disable once PossibleLossOfFraction
var memoryUsage = (usedMemory / availableMemory) * 100.0;
if (cpuUsage <= lowestUsageCpu)
{
lowestUsageCpu = cpuUsage;
lowestUsageCpuInstance = functionInstance;
}
if (memoryUsage <= lowestUsageMemory)
{
lowestUsageMemory = memoryUsage;
lowestUsageMemoryInstance = functionInstance;
}
}
}
if (lowestUsageCpu <= lowestUsageMemory)
{
return lowestUsageCpuInstance;
}
else
{
return lowestUsageMemoryInstance;
}
}
catch (Exception e)
{
_logger.LogError(e, e.Message);
}
Random rd = new Random();
var randomInstance = await db.Functions.Include(s => s.Instances).ToListAsync();
await db.DisposeAsync();
return randomInstance.First(s => s.Name.Equals(functionName)).Instances[rd.Next(0,randomInstance.Count)];
}
public async void Update()
{
var db = await _dbContextFactory.CreateDbContextAsync();
foreach (var function in db.Functions.Include(s => s.Instances))
{
if (function.Instances.Count != 0)
{
foreach (var functionInstance in function.Instances)
{
if (await IsOverloaded(functionInstance!.InstanceId))
{
await _functionManager.RunInstance(function.Name);
}
else if (!await IsOverloaded(functionInstance.InstanceId) && function.Instances.Count > 1)
{
_functionManager.DeleteInstance(functionInstance);
}
}
}
else
{
await _functionManager.RunInstance(function.Name);
}
}
await db.DisposeAsync();
}
public async void Start()
{
var db = await _dbContextFactory.CreateDbContextAsync();
foreach (var function in db.Functions.Include(s => s.Instances))
{
await _functionManager.RunInstance(function.Name);
}
await db.DisposeAsync();
_serviceProvider.GetRequiredService<TimerManager>().StartExecuting();
}
}

View File

@ -0,0 +1,62 @@
using System.Net;
using System.Timers;
using Functions.Services.Interfaces;
using Microsoft.EntityFrameworkCore;
using Timer = System.Timers.Timer;
namespace Functions.Services;
public class JobExecutedEventArgs : EventArgs {}
public class TimerManager : IDisposable
{
private readonly ILoadManager _loadManager;
public TimerManager(ILoadManager loadManager)
{
_loadManager = loadManager;
}
public event EventHandler<JobExecutedEventArgs> JobExecuted;
void OnJobExecuted()
{
JobExecuted?.Invoke(this, new JobExecutedEventArgs());
}
System.Timers.Timer _timer;
bool _running;
public void StartExecuting()
{
if (!_running)
{
//TimerLogic();
//initiate a timer
_timer = new Timer();
_timer.Interval = 1000;
//_Timer.Interval = 60000;
_timer.AutoReset = true;
_timer.Enabled = true;
_timer.Elapsed += HandleTimer;
_running = true;
}
}
void HandleTimer(object source, ElapsedEventArgs e)
{
_loadManager.Update();
//Execute required job
//notify any subscibers to the event
OnJobExecuted();
}
public void Dispose()
{
if (_running)
{
//clear up the timer
_timer.Dispose();
}
}
}

View File

@ -2,7 +2,8 @@
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
"Microsoft.AspNetCore": "Warning",
"Microsoft.EntityFrameworkCore.Database": "Error"
}
},
"AllowedHosts": "*",

View File

@ -1,3 +1,5 @@
using Docker.DotNet;
using Docker.DotNet.Models;
using Microsoft.AspNetCore.Mvc;
namespace TestFunction.Controllers;
@ -31,15 +33,9 @@ public class TestController : ControllerBase
}
[HttpPost]
public IEnumerable<WeatherForecast> Post()
public async Task<ContainerStatsResponse> Post(string id)
{
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
TemperatureC = Random.Shared.Next(-20, 55),
Summary = Summaries[Random.Shared.Next(Summaries.Length)]
})
.ToArray();
return await Test.GetLoad(id);
}
[HttpPatch]

22
TestFunction/Test.cs Normal file
View File

@ -0,0 +1,22 @@
using Docker.DotNet;
using Docker.DotNet.Models;
namespace TestFunction;
public class Test
{
public static async Task<ContainerStatsResponse> GetLoad(string containerId)
{
ContainerStatsParameters parameters = new ContainerStatsParameters()
{
Stream = false,
};
var response = new ContainerStatsResponse();
IProgress<ContainerStatsResponse> progress = new Progress<ContainerStatsResponse>(stats => { response = stats; });
var _docker = new DockerClientConfiguration().CreateClient();
await _docker.Containers.GetContainerStatsAsync(containerId,parameters, progress);
return response;
}
}

View File

@ -9,6 +9,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Docker.DotNet" Version="3.125.15" />
<PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.5" />
<PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
</ItemGroup>