Announcing ActorSrcGen 1.1.2

The open-source project ActorSrcGen (Nuget) has recently undergone significant updates that help streamline the development of complex long-lived pipeline processes for C# developers. These changes enhance the existing model to provide an easy approach towards the incorporation of multiple message pumps to feed data into the high-performance pipeline code that was a feature of previous versions.

The open-source project ActorSrcGen (Nuget) has recently undergone significant updates to help streamline the development of complex long-lived pipeline processes for C# developers. These changes enhance the existing model providing an easy approach to creating multiple message pumps feeding data into your high-performance pipelines.

I use this system for the development of large scale IoT telemetry ingestion pipelines, where performance, and resilience are vital. The fact that you can structure your code to use a small set of simple, self-contained methods, with minimal interaction between them, makes the code simpler to read and easier to test.

The New Enhancements

The latest release of ActorSrcGen introduce a new programming model that simplifies the creation of dataflow compatible pipelines. This model leverages the Task Parallel Library (TPL) Dataflow to convert simple C# classes into robust, actor-based systems. The beauty of this approach lies in its simplicity: developers can now define a series of steps within a class, and ActorSrcGen will automatically generate the boilerplate code necessary to wire these methods together into a cohesive dataflow pipeline.

Introducing The Receiver Attribute

By using the new Receiver attribute on any of the InitialSteps of your actor, ActorSrcGen will automatically generate a small framework based on a partial method for a message pump. It uses the expected types of the input steps and can be made to run in the background indefinitely till it is cancelled.

The Receiver attribute helps separate the receipt of incoming messages from the complex code for handling them. This allows the incoming messages to be processed separately, making the structure more organized and easier to maintain. By using the Receiver attribute, the system can efficiently manage and direct incoming messages to the right handlers without mixing up the receipt and processing logic. This ultimately improves the modularity and readability of the code.

Here’s an example of a simple actor that can accept a bunch of integers (as supplied by the little ReceiveDoTask1 function that just waits for a while then returns a value from a counter).

[Actor]
public partial class MyActor
{
    public List<int> Results { get; set; } = [];
    public int Counter { get; set; }

    [FirstStep("blah")]
    [Receiver]
    [NextStep(nameof(DoTask2))]
    [NextStep(nameof(LogMessage))]
    public Task<string> DoTask1(int x)
    {
        Console.WriteLine("DoTask1");
        return Task.FromResult(x.ToString());
    }

    protected async partial Task<int> ReceiveDoTask1(CancellationToken ct)
    {
        await Task.Delay(1000, ct);
        return Counter++;
    }


    [Step]
    [NextStep(nameof(DoTask3))]
    public Task<string> DoTask2(string x)
    {
        Console.WriteLine("DoTask2");
        return Task.FromResult($"100{x}");
    }

    [LastStep]
    public async Task<int> DoTask3(string input)
    {
        await Console.Out.WriteLineAsync("DoTask3");
        var result = int.Parse(input);
        Results.Add(result);
        return result;
    }

    [LastStep]
    public void LogMessage(string x)
    {
        Console.WriteLine("Incoming Message: " + x);
    }
}

You would use the code like this:

var actor = new MyActor();

try
{
    if (actor.Call(10))
        Console.WriteLine("Called Synchronously");

    var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

    var t = Task.Run(async () => await actor.ListenForReceiveDoTask1(cts.Token), cts.Token);

    while (!cts.Token.IsCancellationRequested)
    {
        var result = await actor.AcceptAsync(cts.Token);
        Console.WriteLine($"Result: {result}");
    }

    await actor.SignalAndWaitForCompletionAsync();
}
catch (OperationCanceledException operationCanceledException)
{
    Console.WriteLine("All Done!");
}

What this is effectively doing is creating a producer-consumer model where the thread running actor.ListenForReceiveDoTask1 continuously receives integers until it is told to stop. It pumps those integers into the input block for the actor (in this case the block wrapping DoTask1) where the pipeline plumbing picks up the work. Further on, we have another loop that is consuming messages from the tail end of the pipeline as they become available.

In my work, this would actually be replaced by telemetry poll triggers, incoming messages on a message queue, files in BLOB storage or messages from an MQTT subscription.

Alignment with Erlang’s Coding Models

The actor model supported by ActorSrcGen shares similarities with the coding models found in technology stacks like Erlang. Erlang, known for its robust concurrent processing capabilities, also employs an actor model where processes communicate through message passing and can dynamically adjust their behavior in response to incoming messages. ActorSrcGen’s model aligns with these principles by enabling in-process message passing and promoting a style of programming that is both concurrent and isolated, much like Erlang’s processes.

It is, when all is said and done, a .NET framework mainly for multi-threading and asynchrony. As such it is still falling short of Erlang, but it is a step forward for C#. For those interested in contributing or learning more about ActorSrcGen, the project is actively maintained on GitHub, and contributions and suggestions from the community are welcome.