Introduction

Let's say you need to perform some work continuously and push the results to an Orleans cluster, either via streaming or direct grain calls. Typically you'd create a class that implements IHostedService or extends BackgroundService, do some processing, and use the IClusterClient to call into the Orleans cluster.

You also decide that this component doesn't need to expose any API, so you create a Worker Service project, either because you want to avoid the overhead of ASP.NET Core, or because you want to deploy it as its own process with its own dedicated resources.

Below is sample code showing how you would do it:

await Host
    .CreateDefaultBuilder(args)
    .UseOrleansClient(client =>
        client.UseLocalhostClustering())
    .ConfigureServices(services =>
        services.AddHostedService<Worker>())
    .Build()
    .RunAsync();

public class Worker : BackgroundService
{
    private readonly IClusterClient clusterClient;

    public Worker(IClusterClient clusterClient)
        => this.clusterClient = clusterClient;

    protected override async Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        var grain = clusterClient.GetGrain<IMyGrain>("grain-id");

        while (!cancellationToken.IsCancellationRequested)
        {
            string result = await DoSomeWork();
            await grain.Push(result);
        }
    }
}

If this code were hosted in the same process as the grain code (the silo), the client could be obtained directly from the hosting application's dependency injection container.

But it's not! This means the IClusterClient is an external client. Hence, it acts as a connector to the cluster, which comes with its own challenges (mainly connection problems).

When deployed, you'd obviously configure the ClusterOptions for the client to connect to the cluster.
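For illustration, a deployment-style configuration might look like the sketch below. The cluster/service ids and gateway endpoints are placeholder values of mine, and I'm assuming static gateway clustering here; production setups often use a membership provider (Azure Table, ADO.NET, etc.) instead.

```csharp
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Orleans.Configuration;

await Host
    .CreateDefaultBuilder(args)
    .UseOrleansClient(client =>
        client
            .Configure<ClusterOptions>(options =>
            {
                options.ClusterId = "my-cluster"; // must match the silos' ClusterId
                options.ServiceId = "my-service"; // must match the silos' ServiceId
            })
            // Hypothetical gateway endpoints for this sketch.
            .UseStaticClustering(
                new IPEndPoint(IPAddress.Parse("10.0.0.10"), 30000),
                new IPEndPoint(IPAddress.Parse("10.0.0.11"), 30000)))
    .ConfigureServices(services =>
        services.AddHostedService<Worker>())
    .Build()
    .RunAsync();
```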

When would connection issues arise? Well, I can think of three cases:

  1. At application startup.
  2. Intermittently while issuing requests.
  3. Loss of connection to all gateways.

Orleans already retries requests in case of intermittent failures (the network is not reliable, after all), so we'll ignore that case and focus on the other two.

At application startup

Behind the scenes, an external cluster client is itself a BackgroundService, started by the application host like every other hosted service. If the cluster is unavailable at that point for whatever reason, we can instruct Orleans to handle it by executing connection attempts with retries, via an implementation of IClientConnectionRetryFilter.


await Host
    .CreateDefaultBuilder(args)
    .UseOrleansClient(client =>
        client
            .UseLocalhostClustering()
            .UseConnectionRetryFilter<ClientConnectRetryFilter>()) // <- this
    .ConfigureServices(services =>
        services.AddHostedService<Worker>())
    .Build()
    .RunAsync();

// source: https://learn.microsoft.com/en-us/dotnet/orleans/host/client
public class ClientConnectRetryFilter : IClientConnectionRetryFilter
{
private int _retryCount = 0;
private const int MaxRetry = 5;
private const int Delay = 1_500;

public async Task<bool> ShouldRetryConnectionAttempt(
Exception exception,
CancellationToken cancellationToken)
{
if (_retryCount >= MaxRetry)
{
return false;
}

if (!cancellationToken.IsCancellationRequested &&
exception is SiloUnavailableException)
{
await Task.Delay(++_retryCount * Delay, cancellationToken);
return true;
}

return false;
}
}

Loss of connection to all gateways

This one is more interesting, because the client can lose its connection to all gateways (which are essentially proxies to the silos in a given cluster). This could happen for one reason or another, but a common one is an in-place deployment of new silo code. And the IClientConnectionRetryFilter approach above won't help here, because it is only executed while the cluster client is being started as a BackgroundService.

While this is happening, the client loses all connections to the cluster and a ConnectionFailedException is raised, which by default (in .NET 6 and later) stops the host and exits the application. So we'd want to wrap the logic of ExecuteAsync in a try/catch block like this:

protected override async Task ExecuteAsync(
    CancellationToken cancellationToken)
{
    var grain = clusterClient.GetGrain<IMyGrain>("grain-id");

    while (!cancellationToken.IsCancellationRequested)
    {
        try
        {
            string result = await DoSomeWork();
            await grain.Push(result);
        }
        catch (ConnectionFailedException)
        {
            await Task.Delay(delayInMilliseconds);
        }
        catch (Exception)
        {
            // maybe log it?!
            throw;
        }
    }
}

The question is: what value should we set delayInMilliseconds to? We really have no idea when the cluster will be up again!

And by the way: no, you don't need a new grain reference, the one created above will keep working. And yes, it's the same for a stream reference.

Solution

Orleans has two delegates that inform us when there are connection issues:

  • ConnectionToClusterLostHandler - When the connection to the cluster is lost.
  • GatewayCountChangedHandler - When the gateway count has changed from a previous count.

We can use these to build logic that gives us the necessary knowledge to continue our work, without having to retry like crazy inside our worker. Orleans exposes extension methods for registering these delegates: AddClusterConnectionLostHandler(handler) and AddGatewayCountChangedHandler(handler).

So we could simply define two methods for them:

await Host
    .CreateDefaultBuilder(args)
    .UseOrleansClient(client =>
        client.UseLocalhostClustering()
            .UseConnectionRetryFilter<ClientConnectRetryFilter>()
            .AddClusterConnectionLostHandler(ConnectionHandler)
            .AddGatewayCountChangedHandler(GatewayHandler))
    .ConfigureServices(services =>
        services.AddHostedService<Worker>())
    .Build()
    .RunAsync();

void GatewayHandler( // <- this
    object sender, GatewayCountChangedEventArgs e)
{
    ...
}

void ConnectionHandler( // <- this
    object sender, EventArgs e)
{
    ...
}

But how on earth are we supposed to use these from the Worker?

We can do the following:

  1. Create a new class, let's name it ClusterConnectionStatusProvider.
  2. Add these handler methods to it.
  3. Register that as a singleton.
  4. Register the delegates as singletons.

In C#, we can register a delegate using a factory method that resolves the type which provides the method, and then returns the method!

await Host
    .CreateDefaultBuilder(args)
    .UseOrleansClient(client =>
        client.UseLocalhostClustering()
            //.AddClusterConnectionLostHandler(ConnectionHandler) // <- remove
            //.AddGatewayCountChangedHandler(GatewayHandler) // <- remove
            .UseConnectionRetryFilter<ClientConnectRetryFilter>())
    .ConfigureServices(services =>
    {
        services.AddSingleton<ClusterConnectionStatusProvider>();
        services.AddSingleton<ConnectionToClusterLostHandler>(sp =>
        {
            var statusProvider = sp.GetRequiredService<ClusterConnectionStatusProvider>();
            return statusProvider.OnClusterConnectionLost;
        });
        services.AddSingleton<GatewayCountChangedHandler>(sp =>
        {
            var statusProvider = sp.GetRequiredService<ClusterConnectionStatusProvider>();
            return statusProvider.OnGatewayCountChanged;
        });
        services.AddHostedService<Worker>();
    })
    .Build()
    .RunAsync();

public class ClusterConnectionStatusProvider
{
    public bool IsConnected { get; private set; }

    public void OnClusterConnectionLost(object sender, EventArgs e)
        => IsConnected = false;

    public void OnGatewayCountChanged(object sender, GatewayCountChangedEventArgs e)
        => IsConnected = e.NumberOfConnectedGateways > 0;
}
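One caveat worth noting: the two handlers run on Orleans' own threads, while the Worker reads IsConnected from its loop. Reads and writes of a bool are atomic in .NET, but a volatile backing field guarantees the worker thread always observes the latest value. A small variation of my own (not something the Orleans API requires) could look like this:

```csharp
// Variation of the provider above with a volatile backing field,
// so the worker thread sees writes made by Orleans' handler threads
// without relying on incidental memory-barrier effects.
public class ClusterConnectionStatusProvider
{
    private volatile bool isConnected;

    public bool IsConnected => isConnected;

    public void OnClusterConnectionLost(object sender, EventArgs e)
        => isConnected = false;

    public void OnGatewayCountChanged(object sender, GatewayCountChangedEventArgs e)
        => isConnected = e.NumberOfConnectedGateways > 0;
}
```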

Now we can inject the ClusterConnectionStatusProvider into the Worker, because it's a singleton. When a ConnectionFailedException is raised, we can check its IsConnected property to know when to continue with the work, as opposed to blindly retrying.

public class Worker : BackgroundService
{
    private readonly IClusterClient clusterClient;
    private readonly ClusterConnectionStatusProvider statusProvider;

    public Worker(
        IClusterClient clusterClient,
        ClusterConnectionStatusProvider statusProvider)
    {
        this.clusterClient = clusterClient;
        this.statusProvider = statusProvider;
    }

    protected override async Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        var grain = clusterClient.GetGrain<IMyGrain>("grain-id");

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                string result = await DoSomeWork();
                await grain.Push(result);
            }
            catch (ConnectionFailedException)
            {
                while (!statusProvider.IsConnected &&
                    !cancellationToken.IsCancellationRequested)
                {
                    // Small delay to avoid a hot spin while waiting.
                    await Task.Delay(100, cancellationToken);
                }
            }
            catch (Exception)
            {
                // maybe log it?!
                throw;
            }
        }
    }
}

Note the inner while loop on statusProvider.IsConnected: we wait there until it returns true, but we also check whether cancellation has been requested, because in that case there is no point in hogging the app until a connection to the cluster has been re-established.

Inside the inner loop, a short await Task.Delay keeps the wait from spinning the CPU; bump it to something like a second if you also want to log that we are waiting for the client to reconnect. Either way, the delegates will inform us when the connection is back.
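If you'd rather not poll at all, a sketch of an event-based alternative (class and method names are mine) is to expose a Task that the gateway handler completes once at least one gateway is back, and have the worker await it. This assumes .NET 6+ for Task.WaitAsync:

```csharp
// Wire OnClusterConnectionLost / OnGatewayCountChanged to the same
// AddClusterConnectionLostHandler / AddGatewayCountChangedHandler
// registrations as before; the worker then awaits WaitForReconnectAsync
// inside its ConnectionFailedException handler instead of looping.
public class ClusterConnectionSignal
{
    private volatile TaskCompletionSource reconnected = CreateTcs();

    private static TaskCompletionSource CreateTcs() =>
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public void OnClusterConnectionLost(object sender, EventArgs e)
    {
        // Arm a fresh TCS so subsequent waits block again.
        if (reconnected.Task.IsCompleted)
            reconnected = CreateTcs();
    }

    public void OnGatewayCountChanged(object sender, GatewayCountChangedEventArgs e)
    {
        if (e.NumberOfConnectedGateways > 0)
            reconnected.TrySetResult();
    }

    // Usage in the worker:
    //   catch (ConnectionFailedException)
    //   {
    //       await signal.WaitForReconnectAsync(cancellationToken);
    //   }
    public Task WaitForReconnectAsync(CancellationToken cancellationToken) =>
        reconnected.Task.WaitAsync(cancellationToken);
}
```

The upside is that the worker resumes the instant a gateway reconnects; the trade-off is a bit more state to reason about around re-arming the TaskCompletionSource.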


If you found this article helpful please give it a share in your favorite forums 😉.