Interface IEventHubConsumerService
Interface of EventHubConsumerService contract with the methods responsible for connect at Event Hub Azzure service and consumer event messages.
Namespace: StoneCo.Framework.EventHub.Services.Consumer
Assembly: cs.temp.dll.dll
Syntax
public interface IEventHubConsumerService
Examples
note
This is an initial configuration that should be used as a basis for the following examples.
public class Foo
{
public Foo(long id, string content, bool done)
{
Id = id;
Content = content;
Done = done;
}
public long Id { get; private set; }
public string Content { get; private set; }
public bool Done { get; private set; }
}
Here we have the appsettings.json structure.
Only field "EventHubProducerConnectionString" is required on file appsettings.json if to producer an event message.
{
"EventHubs": {
"Consumers": [
{
"Name": "Test1-Consumer",
"ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleConsumer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
"EventHubName": "test-1",
"StorageContainerName": "test-1-offset",
"StorageAccountName": "transrepframeworkstg",
"StorageAccountKey": "XxXnxxXXnXXsxXxXnxxXnnXnXxXxXxxXXXsnXXnXXXnxXXxXxxnxXnnnxnxxxXnXsnxnXxxnxnxXXXsxXxnxxx=="
"ConsumerGroupName": "$Default",
"RenewIntervalInSeconds": 2,
"LeaseDurationInSeconds": 15,
"NumberOfEventsPerRequest": 100,
"ProducerName" : "Teste1-Error",
"Shunt" : {
"ProducerName" : "Teste1-ToAnalyse",
"MaxRetry" : 5
}
}
],
"Producers": [
{
"Name": "Teste1-Error",
"ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleProducer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
"EventHubName": "test-1"
},
{
"Name": "Teste1-ToAnalyse",
"ConnectionString": "Endpoint=sb://sample-eventhub.servicebus.windows.net/;SharedAccessKeyName=SampleProducer;SharedAccessKey=XxxnXxxxXXsnnxxxxxXxxXXxxxnXXxXXnxxXxnxXxxn=;EntityPath=test-1",
"EventHubName": "test-1"
}
]
}
Dependency injection of configuration on ServiceCollection
using StoneCo.Framework.EventHub;
Services.AddEventHub(Configuration);
Methods
RegisterEventMessageConsumerAsync(String, IEventProcessorFactory)
Register an event processor that will be consumer event messages of a queue or topic asynchronously.
warning
This method is only for the internal use of the framework.
Declaration
Task<bool> RegisterEventMessageConsumerAsync(string name, IEventProcessorFactory eventProcessorFactory)
Parameters
Type | Name | Description |
---|---|---|
System.String | name | Name of configuration at Event Hub Configuration setting. |
IEventProcessorFactory | eventProcessorFactory | An intance of factory. |
Returns
Type | Description |
---|---|
Task<System.Boolean> | True if it's connect and false if it's connected already. |
RegisterEventMessageConsumerAsync<TEventProcessor>(String)
Register an event processor that will be consumer event messages of a queue or topic asynchronously.
Declaration
Task<bool> RegisterEventMessageConsumerAsync<TEventProcessor>(string name)
where TEventProcessor : EventProcessorDefault
Parameters
Type | Name | Description |
---|---|---|
System.String | name | Name of configuration at Event Hub Configuration setting. |
Returns
Type | Description |
---|---|
Task<System.Boolean> | True if it's connect and false if it's connected already. |
Type Parameters
Name | Description |
---|---|
TEventProcessor | Could be the class EventProcessorDefault (That will purge the queue) or a type of it overriding its methods. |
Examples
Creating an specific Event Processor. It will Take a list of EventData and will store into a test collection. Will use too method 'GetString' to desserialize.
public class FooEventProcessor : EventProcessorDefault
{
private CollectorServiceTest CollectorServiceTest { get; }
public FooEventProcessor(int? consumeDelayInSeconds, IServiceProvider serviceProvider, IConfiguration configuration)
: base(consumeDelayInSeconds, serviceProvider, configuration)
{
CollectorServiceTest = ServiceProvider.GetService<CollectorServiceTest>();
}
public override async Task<bool> ProcessEventDataList(PartitionContext context, List<EventData> eventDataList)
{
foreach (var eventData in eventDataList) CollectorServiceTest.Add(GetEventSendMessage<Foo>(eventData).EventMessage);
await Task.CompletedTask;
return true;
}
}
Get an instance.
IEventHubConsumerService eventHubConsumerService = provider.GetService<IEventHubConsumerService>();
Register a consumer.
await eventHubConsumerService.RegisterEventMessageConsumerAsync<FooEventProcessor>("Test1-Consumer");
UnregisterAllMessageConsumerAsync()
Unregister asynchronously all event processors that has been consuming event messages of a queue or topic.
Declaration
Task UnregisterAllMessageConsumerAsync()
Returns
Type | Description |
---|---|
Task |
Examples
await eventHubConsumerService.UnregisterAllMessageConsumerAsync();
UnregisterEventMessageConsumerAsync(String)
Unregister asynchronously an event processor that has been consuming event messages of a queue or topic.
Declaration
Task<IEventProcessorFactory> UnregisterEventMessageConsumerAsync(string name)
Parameters
Type | Name | Description |
---|---|---|
System.String | name | Name of configuration at Event Hub Configuration setting. |
Returns
Type | Description |
---|---|
Task<IEventProcessorFactory> |
Examples
await eventHubConsumerService.UnregisterEventMessageConsumerAsync("Test1-Consumer");