
Interface IEventHubConsumerService

Contract for EventHubConsumerService, with the methods responsible for connecting to the Azure Event Hubs service and consuming event messages.

Namespace: StoneCo.Framework.EventHub.Services.Consumer
Assembly: cs.temp.dll.dll
Syntax
public interface IEventHubConsumerService
Examples

note

This is an initial configuration that should be used as a basis for the following examples.

First, let's create the class that represents the generic type carried by the event messages.

public class Foo 
{
    public Foo(long id, string content, bool done)
    {
        Id = id;
        Content = content;
        Done = done;
    }

    public long Id { get; private set; }
    public string Content { get; private set; }
    public bool Done { get; private set; }
}

Here we have the appsettings.json structure.

Only the "EventHubProducerConnectionString" field is required in appsettings.json in order to produce an event message.

{
   "EventHubs": {
        "Consumers": [
            {
            "Name": "Test1-Consumer",
            "ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleConsumer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
            "EventHubName": "test-1",
            "StorageContainerName": "test-1-offset",
            "StorageAccountName": "transrepframeworkstg",
            "StorageAccountKey": "XxXnxxXXnXXsxXxXnxxXnnXnXxXxXxxXXXsnXXnXXXnxXXxXxxnxXnnnxnxxxXnXsnxnXxxnxnxXXXsxXxnxxx==",
            "ConsumerGroupName": "$Default",
            "RenewIntervalInSeconds": 2,
            "LeaseDurationInSeconds": 15,
            "NumberOfEventsPerRequest": 100,
            "ProducerName" : "Teste1-Error",
            "Shunt" : {
                "ProducerName" : "Teste1-ToAnalyse",
                "MaxRetry" : 5
                }
            }
        ],
        "Producers": [
            {
            "Name": "Teste1-Error",
            "ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleProducer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
            "EventHubName": "test-1"
            },
            {
            "Name": "Teste1-ToAnalyse",
            "ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleProducer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
            "EventHubName": "test-1"
            }
        ]
    }
}
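
The "Name" of each entry under "Consumers" is the value later passed as the name parameter of the methods below. As a minimal sketch, assuming the standard Microsoft.Extensions.Configuration package, the configured consumer names could be listed like this. The EventHubSettingsInspector class and its GetConsumerNames method are hypothetical helpers for illustration and are not part of the framework.

```csharp
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Configuration;

// Hypothetical helper, not part of StoneCo.Framework.EventHub.
public static class EventHubSettingsInspector
{
    // Returns the "Name" of every entry under EventHubs:Consumers,
    // skipping entries that have no name.
    public static List<string> GetConsumerNames(IConfiguration configuration)
    {
        return configuration
            .GetSection("EventHubs:Consumers")
            .GetChildren()
            .Select(consumer => consumer["Name"])
            .Where(name => !string.IsNullOrEmpty(name))
            .ToList();
    }
}
```

Checking a name against this list before registering a consumer may help catch a typo in the configuration early.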

Register the configuration on the ServiceCollection through dependency injection.

using StoneCo.Framework.EventHub;
Services.AddEventHub(Configuration);

Methods

RegisterEventMessageConsumerAsync(String, IEventProcessorFactory)

Asynchronously registers an event processor that will consume event messages from a queue or topic.

warning

This method is for internal use by the framework only.

Declaration
Task<bool> RegisterEventMessageConsumerAsync(string name, IEventProcessorFactory eventProcessorFactory)
Parameters
Type Name Description
System.String name

Name of the configuration entry in the Event Hub configuration settings.

IEventProcessorFactory eventProcessorFactory

An instance of the event processor factory.

Returns
Type Description
Task<System.Boolean>

True if the consumer connects; false if it is already connected.

RegisterEventMessageConsumerAsync<TEventProcessor>(String)

Asynchronously registers an event processor that will consume event messages from a queue or topic.

Declaration
Task<bool> RegisterEventMessageConsumerAsync<TEventProcessor>(string name)
    where TEventProcessor : EventProcessorDefault
Parameters
Type Name Description
System.String name

Name of the configuration entry in the Event Hub configuration settings, for example "Test1-Consumer".

Returns
Type Description
Task<System.Boolean>

True if the consumer connects; false if it is already connected.

Type Parameters
Name Description
TEventProcessor

Either the EventProcessorDefault class itself (which purges the queue) or a type derived from it that overrides its methods.

Examples

Create a specific event processor. It takes a list of EventData and stores each message in a test collection, using the GetEventSendMessage<Foo> method to deserialize each event.

public class FooEventProcessor : EventProcessorDefault
{
    private CollectorServiceTest CollectorServiceTest { get; }

    public FooEventProcessor(int? consumeDelayInSeconds, IServiceProvider serviceProvider, IConfiguration configuration)
        : base(consumeDelayInSeconds, serviceProvider, configuration)
    {
        CollectorServiceTest = ServiceProvider.GetService<CollectorServiceTest>();
    }

    public override async Task<bool> ProcessEventDataList(PartitionContext context, List<EventData> eventDataList)
    {
        // Deserialize each event into a Foo message and store it in the test collection.
        foreach (var eventData in eventDataList)
        {
            CollectorServiceTest.Add(GetEventSendMessage<Foo>(eventData).EventMessage);
        }

        await Task.CompletedTask;

        return true;
    }
}

Get an instance.

IEventHubConsumerService eventHubConsumerService = provider.GetService<IEventHubConsumerService>();

Register a consumer.

await eventHubConsumerService.RegisterEventMessageConsumerAsync<FooEventProcessor>("Test1-Consumer");
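
The call returns a Task<bool>, so the caller can tell a new connection from one that already existed. The following is a minimal sketch of checking that result. The ConsumerRegistration class and its EnsureRegisteredAsync method are hypothetical names, and the sketch assumes the FooEventProcessor class shown above is in scope.

```csharp
using System;
using System.Threading.Tasks;
using StoneCo.Framework.EventHub.Services.Consumer;

// Hypothetical helper, not part of the framework.
public static class ConsumerRegistration
{
    public static async Task EnsureRegisteredAsync(IEventHubConsumerService service)
    {
        // Per the Returns section: true on a new connection, false if already connected.
        bool registered = await service
            .RegisterEventMessageConsumerAsync<FooEventProcessor>("Test1-Consumer");

        if (!registered)
        {
            Console.WriteLine("Consumer 'Test1-Consumer' was already connected.");
        }
    }
}
```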

UnregisterAllMessageConsumerAsync()

Asynchronously unregisters all event processors that have been consuming event messages from a queue or topic.

Declaration
Task UnregisterAllMessageConsumerAsync()
Returns
Type Description
Task
Examples
await eventHubConsumerService.UnregisterAllMessageConsumerAsync();
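
One place where unregistering everything can fit is application shutdown. The sketch below pairs registration and unregistration in a hosted service, using IHostedService from Microsoft.Extensions.Hosting. FooConsumerHostedService is a hypothetical class. The sketch assumes that IEventHubConsumerService can be injected into a singleton and that the FooEventProcessor class from the earlier example is in scope.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using StoneCo.Framework.EventHub.Services.Consumer;

// Hypothetical hosted service, not part of the framework.
public class FooConsumerHostedService : IHostedService
{
    private readonly IEventHubConsumerService _consumerService;

    public FooConsumerHostedService(IEventHubConsumerService consumerService)
    {
        _consumerService = consumerService;
    }

    // Start consuming when the host starts.
    public Task StartAsync(CancellationToken cancellationToken)
    {
        return _consumerService
            .RegisterEventMessageConsumerAsync<FooEventProcessor>("Test1-Consumer");
    }

    // Stop every registered event processor when the host shuts down.
    public Task StopAsync(CancellationToken cancellationToken)
    {
        return _consumerService.UnregisterAllMessageConsumerAsync();
    }
}
```

The service could then be added alongside the Event Hub registration with services.AddHostedService<FooConsumerHostedService>().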

UnregisterEventMessageConsumerAsync(String)

Asynchronously unregisters an event processor that has been consuming event messages from a queue or topic.

Declaration
Task<IEventProcessorFactory> UnregisterEventMessageConsumerAsync(string name)
Parameters
Type Name Description
System.String name

Name of configuration at Event Hub Configuration setting.

Returns
Type Description
Task<IEventProcessorFactory>
Examples
await eventHubConsumerService.UnregisterEventMessageConsumerAsync("Test1-Consumer");