Skip to content

Commit

Permalink
add TestGrpcClient
Browse files Browse the repository at this point in the history
  • Loading branch information
kostapetan committed Dec 7, 2024
1 parent e47ad00 commit 6d5171b
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ public async Task Test_OpenChannel()
var logger = Mock.Of<ILogger<GrpcGateway>>();
var gateway = new GrpcGateway(_fixture.Cluster.Client, logger);
var service = new GrpcGatewayService(gateway);
var callContext = TestServerCallContext.Create();

using var requestStream = new TestAsyncStreamReader<Message>(callContext);
using var responseStream = new TestServerStreamWriter<Message>(callContext);

using var client = new TestGrpcClient();

gateway.WorkersCount.Should().Be(0);
await service.OpenChannel(requestStream, responseStream, callContext);
await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext);
gateway.WorkersCount.Should().Be(1);
}

Expand All @@ -43,30 +40,53 @@ public async Task Test_Message_Exchange_Through_Gateway()
var logger = Mock.Of<ILogger<GrpcGateway>>();
var gateway = new GrpcGateway(_fixture.Cluster.Client, logger);
var service = new GrpcGatewayService(gateway);
var callContext = TestServerCallContext.Create();

using var requestStream = new TestAsyncStreamReader<Message>(callContext);
using var responseStream = new TestServerStreamWriter<Message>(callContext);
using var client = new TestGrpcClient();

var assembly = typeof(PBAgent).Assembly;
var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly);

await service.OpenChannel(requestStream, responseStream, callContext);
var responseMessage = await responseStream.ReadNextAsync();
await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext);
var responseMessage = await client.ReadNext();

await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), responseMessage!.Response.RequestId), callContext);
await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), responseMessage!.Response.RequestId), callContext);
var connectionId = responseMessage!.Response.RequestId;

var inputEvent = new NewMessageReceived { Message = "Hello" }.ToCloudEvent("gh-gh-gh");
await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), connectionId), client.CallContext);
await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), connectionId), client.CallContext);

requestStream.AddMessage(new Message { CloudEvent = inputEvent });
var newMessageReceived = await responseStream.ReadNextAsync();
var inputEvent = new NewMessageReceived { Message = $"Start-{connectionId}" }.ToCloudEvent("gh-gh-gh");

client.AddMessage(new Message { CloudEvent = inputEvent });
var newMessageReceived = await client.ReadNext();
newMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(NewMessageReceived)));
newMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh");

// Simulate an agent, by publishing a new message in the request stream
var helloEvent = new Hello { Message = $"Hello test-{connectionId}" }.ToCloudEvent("gh-gh-gh");
client.AddMessage(new Message { CloudEvent = helloEvent });

var helloMessageReceived = await client.ReadNext();
helloMessageReceived!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello)));
helloMessageReceived.CloudEvent.Source.Should().Be("gh-gh-gh");
}

[Fact]
public async Task Test_Message_Goes_To_Right_Worker()
{
var logger = Mock.Of<ILogger<GrpcGateway>>();
var gateway = new GrpcGateway(_fixture.Cluster.Client, logger);
var service = new GrpcGatewayService(gateway);
using var client = new TestGrpcClient();

var assembly = typeof(PBAgent).Assembly;
var eventTypes = ReflectionHelper.GetAgentsMetadata(assembly);

await service.OpenChannel(client.RequestStream, client.ResponseStream, client.CallContext);
var responseMessage = await client.ReadNext();

var connectionId = responseMessage!.Response.RequestId;

var outputEvent = await responseStream.ReadNextAsync();
outputEvent!.CloudEvent.Type.Should().Be(GetFullName(typeof(Hello)));
await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(PBAgent), connectionId), client.CallContext);
await service.RegisterAgent(CreateRegistrationRequest(eventTypes, typeof(GMAgent), connectionId), client.CallContext);

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// TestGrpcClient.cs

using Microsoft.AutoGen.Abstractions;

namespace Microsoft.AutoGen.Runtime.Grpc.Tests.Helpers.Grpc;
internal class TestGrpcClient: IDisposable
{
public TestAsyncStreamReader<Message> RequestStream { get; }
public TestServerStreamWriter<Message> ResponseStream { get; }
public TestServerCallContext CallContext { get; }

public TestGrpcClient()
{
CallContext = TestServerCallContext.Create();
RequestStream = new TestAsyncStreamReader<Message>(CallContext);
ResponseStream = new TestServerStreamWriter<Message>(CallContext);
}

public async Task<Message> ReadNext()
{
var response = await ResponseStream.ReadNextAsync();
return response!;
}

public void AddMessage(Message message)
{
RequestStream.AddMessage(message);
}

public void Dispose()
{
RequestStream.Dispose();
ResponseStream.Dispose();
}
}

0 comments on commit 6d5171b

Please sign in to comment.