Skip to content

Commit f072057

Browse files
authored
Merge pull request #1 from esttenorio/estenori/processes/expSerialization
serializing objects internally - dapr working
2 parents 6096bde + 43154d7 commit f072057

File tree

12 files changed

+195
-96
lines changed

12 files changed

+195
-96
lines changed

dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Grpc/Clients/DocumentGenerationGrpcClient.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public async Task EmitExternalEventAsync(string externalTopicEvent, KernelProces
4242
switch (externalTopicEvent)
4343
{
4444
case DocumentGenerationProcess.DocGenerationTopics.RequestUserReview:
45-
var requestDocument = JsonSerializer.Deserialize<DocumentInfo>(message.EventData.ToString()!);
45+
var requestDocument = message.EventData.ToObject() as DocumentInfo;
4646
if (requestDocument != null)
4747
{
4848
await this._grpcClient.RequestUserReviewDocumentationFromProcessAsync(new()
@@ -56,7 +56,7 @@ await this._grpcClient.RequestUserReviewDocumentationFromProcessAsync(new()
5656
return;
5757

5858
case DocumentGenerationProcess.DocGenerationTopics.PublishDocumentation:
59-
var publishedDocument = JsonSerializer.Deserialize<DocumentInfo>(message.EventData.ToString()!);
59+
var publishedDocument = message.EventData.ToObject() as DocumentInfo;
6060
if (publishedDocument != null)
6161
{
6262
await this._grpcClient.PublishDocumentationAsync(new()

dotnet/samples/Demos/ProcessWithCloudEvents/ProcessWithCloudEvents.Processes/Models/DocumentInfo.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,23 @@ namespace ProcessWithCloudEvents.Processes.Models;
1010
/// Since this object is used as parameter and state type by multiple steps,
1111
/// Its members must be public and serializable
1212
/// </summary>
13-
[DataContract]
13+
//[DataContract]
1414
public class DocumentInfo
1515
{
1616
/// <summary>
1717
/// Id of the document
1818
/// </summary>
19-
[DataMember]
19+
//[DataMember]
2020
public string Id { get; set; } = string.Empty;
2121
/// <summary>
2222
/// Title of the document
2323
/// </summary>
24-
[DataMember]
24+
//[DataMember]
2525
public string Title { get; set; } = string.Empty;
2626
/// <summary>
2727
/// Content of the document
2828
/// </summary>
29-
[DataMember]
29+
//[DataMember]
3030
public string Content { get; set; } = string.Empty;
3131

3232
/// <summary>
@@ -35,8 +35,8 @@ public class DocumentInfo
3535
/// Overriding the ToString method allows injecting custom serialization logic if needed.
3636
/// </summary>
3737
/// <returns></returns>
38-
public override string ToString()
39-
{
40-
return JsonSerializer.Serialize(this);
41-
}
38+
//public override string ToString()
39+
//{
40+
// return JsonSerializer.Serialize(this);
41+
//}
4242
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// Copyright (c) Microsoft. All rights reserved.
2+
using System;
3+
using System.Runtime.Serialization;
4+
using System.Text.Json;
5+
6+
namespace Microsoft.SemanticKernel;
7+
8+
/// <summary>
9+
/// A serializable representation of an internal message used in a process runtime received by proxy steps.
10+
/// </summary>
11+
/// <remarks>
12+
/// Initializes a new instance of the <see cref="KernelProcessEventData"/> class.
13+
/// </remarks>
14+
[DataContract]
15+
public sealed record KernelProcessEventData
16+
{
17+
/// <summary>
18+
/// The assembly qualified name of the object type
19+
/// </summary>
20+
[DataMember]
21+
public string ObjectType { get; set; } = string.Empty;
22+
/// <summary>
23+
/// The Json serialized object
24+
/// </summary>
25+
[DataMember]
26+
public string Content { get; set; } = string.Empty;
27+
28+
/// <summary>
29+
/// Converts serialized object to original object type
30+
/// </summary>
31+
/// <returns></returns>
32+
public object? ToObject()
33+
{
34+
Verify.NotNullOrWhiteSpace(this.ObjectType);
35+
Type? type = Type.GetType(this.ObjectType);
36+
if (type != null)
37+
{
38+
try
39+
{
40+
return JsonSerializer.Deserialize(this.Content, type);
41+
}
42+
catch (JsonException)
43+
{
44+
throw new KernelException($"Cannot deserialize object {this.Content}");
45+
}
46+
}
47+
48+
return null;
49+
}
50+
51+
/// <summary>
52+
/// Converts from original object to serialized version of the object
53+
/// </summary>
54+
/// <param name="obj">object to be serialized</param>
55+
/// <returns>instance of <see cref="KernelProcessEventData"/></returns>
56+
public static KernelProcessEventData? FromObject(object? obj)
57+
{
58+
if (obj == null)
59+
{
60+
return null;
61+
}
62+
63+
Verify.NotNull(obj.GetType());
64+
Verify.NotNull(obj.GetType().AssemblyQualifiedName);
65+
66+
try
67+
{
68+
return new KernelProcessEventData()
69+
{
70+
ObjectType = obj.GetType().AssemblyQualifiedName!,
71+
Content = JsonSerializer.Serialize(obj)
72+
};
73+
}
74+
catch (NotSupportedException)
75+
{
76+
throw new KernelException($"Cannot serialize object {obj.GetType().FullName}");
77+
}
78+
}
79+
}

dotnet/src/Experimental/Process.Abstractions/KernelProcessProxyMessage.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,5 @@ public sealed record KernelProcessProxyMessage
4444
/// </summary>
4545
[DataMember]
4646
[JsonPropertyName("eventData")]
47-
public object? EventData { get; init; }
47+
public KernelProcessEventData? EventData { get; init; }
4848
}

dotnet/src/Experimental/Process.Abstractions/KernelProcessStepExternalContext.cs

+1-4
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,8 @@ public async Task EmitExternalEventAsync(KernelProcessProxyMessage processEventD
3333
{
3434
throw new KernelException($"External message channel not configured for step with topic {processEventData.ExternalTopicName}");
3535
}
36-
// External message must be serialized to be Dapr compatible.
37-
// This assumes custom serialization was provided by overriding the .ToString method in case the data is a complex object
38-
var externalMessageData = processEventData with { EventData = processEventData.EventData?.ToString() };
3936

40-
await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, externalMessageData).ConfigureAwait(false);
37+
await this._externalMessageChannel.EmitExternalEventAsync(processEventData.ExternalTopicName, processEventData).ConfigureAwait(false);
4138
}
4239

4340
/// <summary>

dotnet/src/Experimental/Process.IntegrationTests.Shared/ProcessCloudEventsTests.cs

+16-22
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,10 @@ public async Task LinearProcessWithCloudEventSubscribersUsingEmitToTopicAsync()
7373
Assert.NotNull(mockClient);
7474
Assert.True(mockClient.InitializationCounter > 0);
7575
Assert.Equal(2, mockClient.CloudEvents.Count);
76-
Assert.Equal(runningProcessId, mockClient.CloudEvents[0].Data?.ProcessId);
77-
Assert.Equal(runningProcessId, mockClient.CloudEvents[1].Data?.ProcessId);
78-
this.AssertProxyMessage(mockClient.CloudEvents[0].Data, expectedPublishTopic: MockTopicNames.EchoExternalTopic, expectedTopicData: testInput);
79-
this.AssertProxyMessage(mockClient.CloudEvents[1].Data, expectedPublishTopic: MockTopicNames.RepeatExternalTopic, expectedTopicData: $"{testInput} {testInput}");
76+
Assert.Equal(runningProcessId, mockClient.CloudEvents[0].ProcessId);
77+
Assert.Equal(runningProcessId, mockClient.CloudEvents[1].ProcessId);
78+
this.AssertProxyMessage(mockClient.CloudEvents[0], expectedPublishTopic: MockTopicNames.EchoExternalTopic, expectedTopicData: testInput);
79+
this.AssertProxyMessage(mockClient.CloudEvents[1], expectedPublishTopic: MockTopicNames.RepeatExternalTopic, expectedTopicData: $"{testInput} {testInput}");
8080
}
8181

8282
/// <summary>
@@ -111,15 +111,15 @@ public async Task ProcessWithSubprocessWithProxyEmittingDifferentTopicsAsync()
111111
Assert.NotNull(mockClient);
112112
Assert.True(mockClient.InitializationCounter > 0);
113113
Assert.Equal(2, mockClient.CloudEvents.Count);
114-
if (mockClient.CloudEvents[0].TopicName == this._topic1)
114+
if (mockClient.CloudEvents[0].ExternalTopicName == this._topic1)
115115
{
116-
this.AssertProxyMessage(mockClient.CloudEvents[0].Data, expectedPublishTopic: this._topic1, expectedTopicData: testInput);
117-
this.AssertProxyMessage(mockClient.CloudEvents[1].Data, expectedPublishTopic: this._topic2, expectedTopicData: testInput);
116+
this.AssertProxyMessage(mockClient.CloudEvents[0], expectedPublishTopic: this._topic1, expectedTopicData: testInput);
117+
this.AssertProxyMessage(mockClient.CloudEvents[1], expectedPublishTopic: this._topic2, expectedTopicData: testInput);
118118
}
119119
else
120120
{
121-
this.AssertProxyMessage(mockClient.CloudEvents[0].Data, expectedPublishTopic: this._topic2, expectedTopicData: testInput);
122-
this.AssertProxyMessage(mockClient.CloudEvents[1].Data, expectedPublishTopic: this._topic1, expectedTopicData: testInput);
121+
this.AssertProxyMessage(mockClient.CloudEvents[0], expectedPublishTopic: this._topic2, expectedTopicData: testInput);
122+
this.AssertProxyMessage(mockClient.CloudEvents[1], expectedPublishTopic: this._topic1, expectedTopicData: testInput);
123123
}
124124
}
125125

@@ -155,8 +155,8 @@ public async Task ProcessWithSubprocessWithProxyEmittingSameTopicsAsync()
155155
Assert.NotNull(mockClient);
156156
Assert.True(mockClient.InitializationCounter > 0);
157157
Assert.Equal(2, mockClient.CloudEvents.Count);
158-
this.AssertProxyMessage(mockClient.CloudEvents[0].Data, expectedPublishTopic: this._topic1, expectedTopicData: testInput);
159-
this.AssertProxyMessage(mockClient.CloudEvents[1].Data, expectedPublishTopic: this._topic1, expectedTopicData: testInput);
158+
this.AssertProxyMessage(mockClient.CloudEvents[0], expectedPublishTopic: this._topic1, expectedTopicData: testInput);
159+
this.AssertProxyMessage(mockClient.CloudEvents[1], expectedPublishTopic: this._topic1, expectedTopicData: testInput);
160160
}
161161

162162
/// <summary>
@@ -225,17 +225,11 @@ private void AssertProxyMessage(KernelProcessProxyMessage? proxyMessage, string
225225
Assert.NotNull(proxyMessage);
226226
Assert.IsType<KernelProcessProxyMessage>(proxyMessage);
227227
Assert.Equal(expectedPublishTopic, proxyMessage.ExternalTopicName);
228-
if (proxyMessage.EventData is JsonElement jsonEventData)
229-
{
230-
// needed for Dapr Testing setup since it serializes everything with json
231-
Assert.Equal(JsonValueKind.String, jsonEventData.ValueKind);
232-
Assert.Equal(expectedTopicData, jsonEventData.ToString());
233-
}
234-
else
235-
{
236-
Assert.IsType<string>(proxyMessage.EventData);
237-
Assert.Equal(expectedTopicData, proxyMessage.EventData);
238-
}
228+
229+
Assert.IsType<KernelProcessEventData>(proxyMessage.EventData);
230+
var outputEventData = proxyMessage.EventData.ToObject();
231+
Assert.IsType<string>(outputEventData);
232+
Assert.Equal(expectedTopicData, outputEventData);
239233
}
240234
#endregion
241235
}

dotnet/src/Experimental/Process.LocalRuntime/LocalStep.cs

+19-14
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,14 @@ internal virtual void AssignStepFunctionParameterValues(ProcessMessage message)
154154
functionParameters = this._inputs[message.FunctionName];
155155
}
156156

157-
functionParameters![kvp.Key] = kvp.Value;
157+
if (kvp.Value is KernelProcessEventData proxyData)
158+
{
159+
functionParameters![kvp.Key] = proxyData.ToObject();
160+
}
161+
else
162+
{
163+
functionParameters![kvp.Key] = kvp.Value;
164+
}
158165
}
159166
}
160167

@@ -212,24 +219,22 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
212219
{
213220
FunctionResult invokeResult = await this.InvokeFunction(function, this._kernel, arguments).ConfigureAwait(false);
214221
this.EmitEvent(
215-
new ProcessEvent
216-
{
217-
Namespace = this._eventNamespace,
218-
SourceId = $"{targetFunction}.OnResult",
219-
Data = invokeResult.GetValue<object>()
220-
});
222+
ProcessEvent.Create(
223+
invokeResult.GetValue<object>(),
224+
this._eventNamespace,
225+
sourceId: $"{targetFunction}.OnResult",
226+
eventVisibility: KernelProcessEventVisibility.Public));
221227
}
222228
catch (Exception ex)
223229
{
224230
this._logger.LogError(ex, "Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
225231
this.EmitEvent(
226-
new ProcessEvent
227-
{
228-
Namespace = this._eventNamespace,
229-
SourceId = $"{targetFunction}.OnError",
230-
Data = KernelProcessError.FromException(ex),
231-
IsError = true
232-
});
232+
ProcessEvent.Create(
233+
KernelProcessError.FromException(ex),
234+
this._eventNamespace,
235+
sourceId: $"{targetFunction}.OnError",
236+
eventVisibility: KernelProcessEventVisibility.Public,
237+
isError: true));
233238
}
234239
finally
235240
{

dotnet/src/Experimental/Process.Runtime.Dapr/Actors/StepActor.cs

+19-14
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,14 @@ internal virtual void AssignStepFunctionParameterValues(ProcessMessage message)
234234
functionParameters = this._inputs[message.FunctionName];
235235
}
236236

237-
functionParameters![kvp.Key] = kvp.Value;
237+
if (kvp.Value is KernelProcessEventData proxyData)
238+
{
239+
functionParameters![kvp.Key] = proxyData.ToObject();
240+
}
241+
else
242+
{
243+
functionParameters![kvp.Key] = kvp.Value;
244+
}
238245
}
239246
}
240247

@@ -301,24 +308,22 @@ internal virtual async Task HandleMessageAsync(ProcessMessage message)
301308
await this.StateManager.SaveStateAsync().ConfigureAwait(false);
302309

303310
await this.EmitEventAsync(
304-
new ProcessEvent
305-
{
306-
Namespace = this._eventNamespace!,
307-
SourceId = $"{targetFunction}.OnResult",
308-
Data = invokeResult.GetValue<object>()
309-
}).ConfigureAwait(false);
311+
ProcessEvent.Create(
312+
invokeResult.GetValue<object>(),
313+
this._eventNamespace!,
314+
sourceId: $"{targetFunction}.OnResult",
315+
eventVisibility: KernelProcessEventVisibility.Public)).ConfigureAwait(false);
310316
}
311317
catch (Exception ex)
312318
{
313319
this._logger?.LogError(ex, "Error in Step {StepName}: {ErrorMessage}", this.Name, ex.Message);
314320
await this.EmitEventAsync(
315-
new ProcessEvent
316-
{
317-
Namespace = this._eventNamespace!,
318-
SourceId = $"{targetFunction}.OnError",
319-
Data = KernelProcessError.FromException(ex),
320-
IsError = true
321-
}).ConfigureAwait(false);
321+
ProcessEvent.Create(
322+
KernelProcessError.FromException(ex),
323+
this._eventNamespace!,
324+
sourceId: $"{targetFunction}.OnError",
325+
eventVisibility: KernelProcessEventVisibility.Public,
326+
isError: true)).ConfigureAwait(false);
322327
}
323328
finally
324329
{

0 commit comments

Comments
 (0)