Skip to content
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,28 @@ using (var tentaclePolling = new HalibutRuntime(services, Certificates.Alice))
}
```

Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.
Notice that while the configuration code changed, the request/response code didn't apart from the endpoint. Logically, the Octopus is still the request/response client, and the Tentacle is still the request/response server, even though the transport layer has Octopus as the TCP listener and Tentacle as the TCP client polling for work.

## RPC over Redis

Halibut supports executing RPC commands between nodes using a shared Redis queue as the communication mechanism. In this mode, nodes do not communicate directly with each other - all communication flows through Redis. This is particularly useful for scenarios where multiple nodes can all access a shared Redis instance but cannot establish direct network connections to each other.

### How it works

When using RPC over Redis:

- The client queues RPC requests in Redis
- The server polls Redis for pending requests
- The server processes requests and writes responses back to Redis
- The client retrieves responses from Redis

This decoupled communication model allows nodes behind firewalls, in different networks, or with restricted connectivity to communicate as long as they can all reach the shared Redis instance.

### Usage

See the [SimpleLocalExecutionExample](source/Halibut.Tests/LocalExecutionModeFixture.cs) test for a complete example of how to set up and use RPC over Redis.

For more detailed information about the Redis queue implementation, refer to the [Redis Queue documentation](docs/RedisQueue.md).

## Failure modes

Expand Down
46 changes: 46 additions & 0 deletions source/Halibut.Tests/HalibutExamplesFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using Halibut.ServiceModel;
using Halibut.Tests.Support;
using Halibut.Tests.TestServices;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;

namespace Halibut.Tests
{
public class HalibutExamplesFixture : BaseTest
{
[Test]
public async Task SimplePollingExample()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this since we lack simple examples of how to use halibit.

{
var services = GetDelegateServiceFactory();
await using (var client = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.Octopus)
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
.Build())
await using (var pollingService = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.TentaclePolling)
.WithServiceFactory(services)
.WithHalibutTimeoutsAndLimits(new HalibutTimeoutsAndLimitsForTestsBuilder().Build())
.Build())
{
var octopusPort = client.Listen();
client.Trust(Certificates.TentaclePollingPublicThumbprint);

pollingService.Poll(new Uri("poll://alice"), new ServiceEndPoint("https://localhost:" + octopusPort, Certificates.OctopusPublicThumbprint, client.TimeoutsAndLimits), CancellationToken);

var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(new ServiceEndPoint("poll://alice", null, client.TimeoutsAndLimits));

await echo.SayHelloAsync("World");
}
}

static DelegateServiceFactory GetDelegateServiceFactory()
{
var services = new DelegateServiceFactory();
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
return services;
}
}
}
103 changes: 103 additions & 0 deletions source/Halibut.Tests/RPCOverQueueExecutionModeFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#if NET8_0_OR_GREATER
using System;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Halibut.Diagnostics;
using Halibut.Logging;
using Halibut.Queue;
using Halibut.Queue.Redis;
using Halibut.Queue.Redis.RedisDataLossDetection;
using Halibut.Queue.Redis.RedisHelpers;
using Halibut.ServiceModel;
using Halibut.Tests.Queue.Redis.Utils;
using Halibut.Tests.Support;
using Halibut.Tests.Support.Logging;
using Halibut.Tests.TestServices;
using Halibut.Tests.TestServices.Async;
using Halibut.TestUtils.Contracts;
using NUnit.Framework;
using DisposableCollection = Halibut.Util.DisposableCollection;

namespace Halibut.Tests
{
public class RPCOverQueueExecutionModeFixture : BaseTest
{
[RedisTest]
[Test]
public async Task SimpleRPCOverQueueExecutionExample()
{
var services = GetDelegateServiceFactory();
var timeoutsAndLimits = new HalibutTimeoutsAndLimitsForTestsBuilder().Build();

var logFactory = new CachingLogFactory(new TestContextLogCreator("", LogLevel.Trace));

var log = new TestContextLogCreator("Redis", LogLevel.Fatal);

var preSharedGuid = Guid.NewGuid();

await using var disposables = new DisposableCollection();

await using var client = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.Octopus)
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
.Build();

await using var worker = new HalibutRuntimeBuilder()
.WithServerCertificate(Certificates.TentaclePolling)
.WithServiceFactory(services)
.WithPendingRequestQueueFactory(RedisFactory(preSharedGuid, disposables, log, logFactory))
.WithHalibutTimeoutsAndLimits(timeoutsAndLimits)
.Build();

// Start worker polling for local://test-worker
using var workerCts = new CancellationTokenSource();
var pollingTask = Task.Run(async () =>
{
await worker.PollForRPCOverQueueAsync(new Uri("local://test-worker"), workerCts.Token);
}, workerCts.Token);

// Client creates proxy to local://test-worker and makes request
var echo = client.CreateAsyncClient<IEchoService, IAsyncClientEchoService>(
new ServiceEndPoint("local://test-worker", null, client.TimeoutsAndLimits));

var result = await echo.SayHelloAsync("World");
result.Should().Be("World...");

await workerCts.CancelAsync();

await pollingTask;
}

Func<QueueMessageSerializer, IPendingRequestQueueFactory> RedisFactory(
Guid preSharedGuid,
DisposableCollection disposables,
TestContextLogCreator log,
CachingLogFactory logFactory)
{
return msgSer =>
{
var redisFacade = RedisFacadeBuilder.CreateRedisFacade(prefix: preSharedGuid);
disposables.AddAsyncDisposable(redisFacade);
var watchForRedisLosingAllItsData = new WatchForRedisLosingAllItsData(redisFacade, log.CreateNewForPrefix("watcher"));
disposables.AddAsyncDisposable(watchForRedisLosingAllItsData);

return new RedisPendingRequestQueueFactory(msgSer,
new InMemoryStoreDataStreamsForDistributedQueues(),
watchForRedisLosingAllItsData,
new HalibutRedisTransport(redisFacade),
new HalibutTimeoutsAndLimitsForTestsBuilder().Build(),
logFactory);
};
}

static DelegateServiceFactory GetDelegateServiceFactory()
{
var services = new DelegateServiceFactory();
services.Register<IEchoService, IAsyncEchoService>(() => new AsyncEchoService());
return services;
}
}
}
#endif
7 changes: 6 additions & 1 deletion source/Halibut/Diagnostics/InMemoryConnectionLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

namespace Halibut.Diagnostics
{
public static class InMemoryConnectionLogLimits
{
public static readonly int MaxLogEventsStored = 100;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be used in Octopus to limit returned logs in multi node setups.

}

internal class InMemoryConnectionLog : ILog
{
readonly string endpoint;
Expand Down Expand Up @@ -57,7 +62,7 @@ void WriteInternal(LogEvent logEvent)

events.Enqueue(logEvent);

while (events.Count > 100 && events.TryDequeue(out _)) { }
while (events.Count > InMemoryConnectionLogLimits.MaxLogEventsStored && events.TryDequeue(out _)) { }
}

static LogLevel GetLogLevel(LogEvent logEvent)
Expand Down
51 changes: 51 additions & 0 deletions source/Halibut/HalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace Halibut
{
public class HalibutRuntime : IHalibutRuntime
{
public const string QueueEndpointScheme = "local";
public static readonly string DefaultFriendlyHtmlPageContent = "<html><body><p>Hello!</p></body></html>";
readonly ConcurrentDictionary<Uri, IPendingRequestQueue> queues = new();
readonly IPendingRequestQueueFactory queueFactory;
Expand Down Expand Up @@ -199,6 +200,55 @@ public void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken c
pollingClients.Add(new PollingClient(subscription, client, HandleIncomingRequestAsync, log, cancellationToken, pollingReconnectRetryPolicy));
}

public async Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken)
{
if (queueOnlyEndpoint.Scheme.ToLowerInvariant() != QueueEndpointScheme)
{
throw new ArgumentException($"Only 'queue://' endpoints are supported. Provided: {queueOnlyEndpoint.Scheme}://", nameof(queueOnlyEndpoint));
}

var queue = GetQueue(queueOnlyEndpoint);
var log = logs.ForEndpoint(queueOnlyEndpoint);

log.Write(EventType.MessageExchange, $"Starting queue polling for endpoint: {queueOnlyEndpoint}");

while (!cancellationToken.IsCancellationRequested)
{
try
{
var request = await queue.DequeueAsync(cancellationToken);

if (request != null)
{
ResponseMessage response;
try
{
response = await invoker.InvokeAsync(request.RequestMessage);
}
catch (Exception ex)
{
log.WriteException(EventType.Error, $"Error executing queue request for {request.RequestMessage.ServiceName}.{request.RequestMessage.MethodName}", ex);
response = ResponseMessage.FromException(request.RequestMessage, ex);
}

await queue.ApplyResponse(response, request.RequestMessage.ActivityId);
}
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
log.Write(EventType.MessageExchange, $"Queue polling cancelled for endpoint: {queueOnlyEndpoint}");
break;
}
catch (Exception ex)
{
log.WriteException(EventType.Error, $"Error in queue polling loop for endpoint: {queueOnlyEndpoint}", ex);
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
}
}

log.Write(EventType.MessageExchange, $"Queue polling stopped for endpoint: {queueOnlyEndpoint}");
}

public async Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken)
{
return await DiscoverAsync(new ServiceEndPoint(uri, null, TimeoutsAndLimits), cancellationToken);
Expand Down Expand Up @@ -242,6 +292,7 @@ async Task<ResponseMessage> SendOutgoingRequestAsync(RequestMessage request, Met
response = await SendOutgoingHttpsRequestAsync(request, cancellationToken).ConfigureAwait(false);
break;
case "poll":
case QueueEndpointScheme:
response = await SendOutgoingPollingRequestAsync(request, cancellationToken).ConfigureAwait(false);
break;
default: throw new ArgumentException("Unknown endpoint type: " + endPoint.BaseUri.Scheme);
Expand Down
1 change: 1 addition & 0 deletions source/Halibut/IHalibutRuntime.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public interface IHalibutRuntime : IAsyncDisposable, IDisposable
int Listen(IPEndPoint endpoint);
void ListenWebSocket(string endpoint);
void Poll(Uri subscription, ServiceEndPoint endPoint, CancellationToken cancellationToken);
Task PollForRPCOverQueueAsync(Uri queueOnlyEndpoint, CancellationToken cancellationToken);

Task<ServiceEndPoint> DiscoverAsync(Uri uri, CancellationToken cancellationToken);
Task<ServiceEndPoint> DiscoverAsync(ServiceEndPoint endpoint, CancellationToken cancellationToken);
Expand Down
6 changes: 3 additions & 3 deletions source/Halibut/Queue/Redis/RedisPendingRequestQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ async Task<bool> TryClearRequestFromQueue(RedisPendingRequest redisPending)
// It will kill the TCP connection, which will force re-connect (in perhaps a backoff function)
// This could result in connecting to a node that is actually connected to redis. It could also
// cause a cascade of failure from high load.
var pending = await DequeueNextAsync();
var pending = await DequeueNextAsync(cancellationToken);
if (pending == null) return null;

var pendingRequest = pending.Value.Item1;
Expand Down Expand Up @@ -491,9 +491,9 @@ public async Task ApplyResponse(ResponseMessage response, Guid requestActivityId
}
}

async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync()
async Task<(RequestMessage, RequestDataStreamsTransferProgress)?> DequeueNextAsync(CancellationToken cancellationToken)
{
await using var cts = new CancelOnDisposeCancellationToken(queueToken);
await using var cts = new CancelOnDisposeCancellationToken(queueToken, cancellationToken);
try
{
hasItemsForEndpoint.Reset();
Expand Down