DAPR pubsub - subscribe to an azure service bus queue

530 views Asked by At

I created an azure service bus queue, named: product with sessions enabled as I must use the session feature.

I created a C# console app to send messages to an Azure Service Bus using DAPR. I use connection string in pubsub yaml:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: PRODUCT
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
  - name: connectionString
    value: "Endpoint=sb://MyService.servicebus.windows.net/;SharedAccessKeyName=myAccessKeyName;SharedAccessKey=mySharedAccessKey;EntityPath=product"

I am able to send messages to the queue with sessionId like "CARS.12" where 12 is a tenant id.

I tried to create a consumer service using minimap webApi using .netCore. I have two problems now:

app.MapPost("/productEvents", 
    [Topic("PRODUCT", "product")] 
    async ([FromBody] CloudEvent job, [FromServices] ILogger<ProductManagementApi> logger) =>
{
   ...
}

The question is: the Topic(..) attribute's first argument is the name of the configuration settings PRODUCT (in the yaml file), thats ok. But the 2nd argument is a topic name which I has none, since it is not a topic (in azure service bus) but a queue. But I cannot let this argument empty or set as empty string. If I use product as name here, the DAPR drops exception with the following text:

App is subscribed to the following topics: [product] through pubsub=PRODUCT

error occurred while beginning pubsub for topic product on component PRODUCT: failed to subscribe to topic product: could not get subscription myAppId: GET https://MyService.servicebus.windows.net/product/Subscriptions/myAppId

Now I wonder how to subscribe to a queue and receive the messages... any idea?

Thanks in advance!

1

There are 1 answers

2
Suresh Chikkam On

It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

  • Then create a sessionful message receiver to consume messages from the session-enabled queue.
namespace MyDaprApp.Controllers
{
    [ApiController]
    [Route("[controller]")]
    public class ProductController : ControllerBase
    {
        private readonly ILogger<ProductController> _logger;
        private readonly ServiceBusClient _serviceBusClient;
        private readonly ServiceBusSessionClient _sessionClient;

        public ProductController(ILogger<ProductController> logger)
        {
            _logger = logger;
            _serviceBusClient = new ServiceBusClient("Your Azure Service Bus Connection String Here");
            _sessionClient = _serviceBusClient.CreateSessionClient("your-queue-name");
        }

        [HttpPost("productEvents")]
        [Topic("PRODUCT", "product", Entities.Session | Entities.Queue)]
        public async Task<IActionResult> HandleProductEvent([FromBody] object data)
        {
            try
            {
                // Create a session receiver
                ServiceBusReceiver receiver = _sessionClient.CreateReceiver();

                // Receive a message from the session-enabled queue
                ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();

                if (message != null)
                {
                    // Process the received message
                    string messageBody = Encoding.UTF8.GetString(message.Body);
                    _logger.LogInformation($"Received product event: {messageBody}");

                    // Complete the message to remove it from the queue
                    await receiver.CompleteMessageAsync(message);

                    return Ok();
                }
                else
                {
                    // No message received
                    return NotFound();
                }
            }
            catch (Exception ex)
            {
                _logger.LogError($"Error handling product event: {ex.Message}");
                return StatusCode(500, "Internal Server Error");
            }
        }
    }
}

enter image description here

  • In your Dapr component configuration (pubsub.yaml), check that you have set the isSessionEnabled property to "true" for your Azure Service Bus component.

enter image description here

When a request is received, it will attempt to consume a message from the session-enabled Azure Service Bus queue and process it according to the defined logic.