Skip to content

Commit

Permalink
combine Handle and CallObject
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleLittleCloud committed Nov 26, 2024
1 parent 3a1625f commit 40c4a0d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 49 deletions.
2 changes: 0 additions & 2 deletions dotnet/src/Microsoft.AutoGen/Abstractions/IAgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ public interface IAgentBase
AgentId AgentId { get; }
IAgentRuntime Context { get; }

// Methods
Task CallHandler(CloudEvent item);
Task<RpcResponse> HandleRequest(RpcRequest request);
void ReceiveMessage(Message message);
Task StoreAsync(AgentState state, CancellationToken cancellationToken = default);
Expand Down
79 changes: 34 additions & 45 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

namespace Microsoft.AutoGen.Agents;

public abstract class AgentBase : IAgentBase, IHandle
public abstract class AgentBase : IAgentBase, IHandle, IHandle<CloudEvent>
{
public static readonly ActivitySource s_source = new("AutoGen.Agent");
public AgentId AgentId => _context.AgentId;
Expand Down Expand Up @@ -93,7 +93,7 @@ protected internal async Task HandleRpcMessage(Message msg, CancellationToken ca
{
var activity = this.ExtractActivity(msg.CloudEvent.Type, msg.CloudEvent.Metadata);
await this.InvokeWithActivityAsync(
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.CallHandler(state.Item),
static ((AgentBase Agent, CloudEvent Item) state, CancellationToken _) => state.Agent.HandleObject(state.Item),
(this, msg.CloudEvent),
activity,
msg.CloudEvent.Type, cancellationToken).ConfigureAwait(false);
Expand Down Expand Up @@ -242,33 +242,47 @@ static async ((AgentBase Agent, CloudEvent Event) state, CancellationToken ct) =
item.Type, cancellationToken).ConfigureAwait(false);
}

public Task CallHandler(CloudEvent item)
public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

public virtual Task HandleObject(object item)
{
if (item is CloudEvent ce)
{
return Handle(ce);
}

var genericInterfaceType = typeof(IHandle<>).MakeGenericType(item.GetType());

// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
var methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");

return methodInfo.Invoke(this, [item]) as Task ?? throw new InvalidOperationException("Method did not return a Task");
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}

public virtual Task Handle(CloudEvent item)
{
// Only send the event to the handler if the agent type is handling that type
// foreach of the keys in the EventTypes.EventsMap[] if it contains the item.type
foreach (var key in EventTypes.EventsMap.Keys)
{
if (EventTypes.EventsMap[key].Contains(item.Type))
{
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
var genericInterfaceType = typeof(IHandle<>).MakeGenericType(EventTypes.Types[item.Type]);

MethodInfo methodInfo;
try
{
// check that our target actually implements this interface, otherwise call the default static
if (genericInterfaceType.IsAssignableFrom(this.GetType()))
{
methodInfo = genericInterfaceType.GetMethod(nameof(IHandle<object>.Handle), BindingFlags.Public | BindingFlags.Instance)
?? throw new InvalidOperationException($"Method not found on type {genericInterfaceType.FullName}");
return methodInfo.Invoke(this, [payload]) as Task ?? Task.CompletedTask;
}
else
{
// The error here is we have registered for an event that we do not have code to listen to
throw new InvalidOperationException($"No handler found for event '{item.Type}'; expecting IHandle<{item.Type}> implementation.");
}
var payload = item.ProtoData.Unpack(EventTypes.TypeRegistry);
var convertedPayload = Convert.ChangeType(payload, EventTypes.Types[item.Type]);
return this.HandleObject(convertedPayload);
}
catch (Exception ex)
{
Expand All @@ -280,29 +294,4 @@ public Task CallHandler(CloudEvent item)

return Task.CompletedTask;
}

public Task<RpcResponse> HandleRequest(RpcRequest request) => Task.FromResult(new RpcResponse { Error = "Not implemented" });

//TODO: should this be async and cancellable?
public virtual Task HandleObject(object item)
{
// get all Handle<T> methods
var handleTMethods = this.GetType().GetMethods().Where(m => m.Name == "Handle" && m.GetParameters().Length == 1).ToList();

// get the one that matches the type of the item
var handleTMethod = handleTMethods.FirstOrDefault(m => m.GetParameters()[0].ParameterType == item.GetType());

// if we found one, invoke it
if (handleTMethod != null)
{
return (Task)handleTMethod.Invoke(this, [item])!;
}

// otherwise, complain
throw new InvalidOperationException($"No handler found for type {item.GetType().FullName}");
}
public async ValueTask PublishEventAsync(string topic, IMessage evt, CancellationToken cancellationToken = default)
{
await PublishEventAsync(evt.ToCloudEvent(topic), cancellationToken).ConfigureAwait(false);
}
}
3 changes: 1 addition & 2 deletions dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public async Task ItInvokeRightHandlerTestAsync()
{
var mockContext = new Mock<IAgentRuntime>();
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger<AgentBase>(new LoggerFactory()));

await agent.HandleObject("hello world");
await agent.HandleObject(42);

Expand Down Expand Up @@ -57,7 +56,7 @@ await client.PublishMessageAsync(new TextMessage()
/// <summary>
/// The test agent is a simple agent that is used for testing purposes.
/// </summary>
public class TestAgent : AgentBase, IHandle<string>, IHandle<int>, IHandle<TextMessage>
public class TestAgent : AgentBase, IHandle<string>, IHandle<int>, IHandle<TextMessage>, IHandleConsole
{
public TestAgent(
IAgentRuntime context,
Expand Down

0 comments on commit 40c4a0d

Please sign in to comment.