Skip to content

Commit

Permalink
Use official cloud event proto schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jackgerrits committed Dec 3, 2024
1 parent c062c51 commit 368dccf
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 50 deletions.
2 changes: 1 addition & 1 deletion protos/agent_worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ message Message {
oneof message {
RpcRequest request = 1;
RpcResponse response = 2;
cloudevent.CloudEvent cloudEvent = 3;
io.cloudevents.v1.CloudEvent cloudEvent = 3;
RegisterAgentTypeRequest registerAgentTypeRequest = 4;
RegisterAgentTypeResponse registerAgentTypeResponse = 5;
AddSubscriptionRequest addSubscriptionRequest = 6;
Expand Down
44 changes: 33 additions & 11 deletions protos/cloudevent.proto
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
// https://github.com/cloudevents/spec/blob/main/cloudevents/formats/cloudevents.proto

/**
* CloudEvent Protobuf Format
*
* - Required context attributes are explicitly represented.
* - Optional and Extension context attributes are carried in a map structure.
* - Data may be represented as binary, text, or protobuf messages.
*/

syntax = "proto3";

package cloudevent;
package io.cloudevents.v1;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option csharp_namespace = "Microsoft.AutoGen.Abstractions";

option csharp_namespace = "CloudNative.CloudEvents.V1";
option go_package = "cloudevents.io/genproto/v1";
option java_package = "io.cloudevents.v1.proto";
option java_multiple_files = true;
option php_namespace = "Io\\CloudEvents\\V1\\Proto";
option ruby_package = "Io::CloudEvents::V1::Proto";

message CloudEvent {

Expand All @@ -20,19 +34,18 @@ message CloudEvent {

// Optional & Extension Attributes
map<string, CloudEventAttributeValue> attributes = 5;
map<string, string> metadata = 6;

// -- CloudEvent Data (Bytes, Text, or Proto)
oneof data {
bytes binary_data = 7;
string text_data = 8;
google.protobuf.Any proto_data = 9;
bytes binary_data = 6;
string text_data = 7;
google.protobuf.Any proto_data = 8;
}

/**
* The CloudEvent specification defines
* seven attribute value types...
*/
* The CloudEvent specification defines
* seven attribute value types...
*/

message CloudEventAttributeValue {

Expand All @@ -46,4 +59,13 @@ message CloudEvent {
google.protobuf.Timestamp ce_timestamp = 7;
}
}
}
}

/**
* CloudEvent Protobuf Batch Format
*
*/

message CloudEventBatch {
repeated CloudEvent events = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,38 @@ async def _process_event(self, event: cloudevent_pb2.CloudEvent) -> None:
agent = await self._get_agent(agent_id)
with MessageHandlerContext.populate_context(agent.id):

def stringify_attributes(
attributes: Mapping[str, cloudevent_pb2.CloudEvent.CloudEventAttributeValue],
) -> Mapping[str, str]:
result: Dict[str, str] = {}
for key, value in attributes.items():
item = None
match value.WhichOneof("attr"):
case "ce_boolean":
item = str(value.ce_boolean)
case "ce_integer":
item = str(value.ce_integer)
case "ce_string":
item = value.ce_string
case "ce_bytes":
item = str(value.ce_bytes)
case "ce_uri":
item = value.ce_uri
case "ce_uri_ref":
item = value.ce_uri_ref
case "ce_timestamp":
item = str(value.ce_timestamp)
case _:
raise ValueError("Unknown attribute kind")
result[key] = item

return result

async def send_message(agent: Agent, message_context: MessageContext) -> Any:
with self._trace_helper.trace_block(
"process",
agent.id,
parent=event.metadata,
parent=stringify_attributes(event.attributes),
extraAttributes={"message_type": message_type},
):
await agent.on_message(message, ctx=message_context)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 368dccf

Please sign in to comment.