How to get the route messaging details in IOT hub in python?

107 views Asked by At

I want to know the Python sdk that could be used to get the route messagine details in IOT hub. Basically whatever is shown in the tabular column under Message Routing Tab, is what I need. But I am unable to find the package to get this details.

enter image description here

1

There are 1 answers

0
LeelaRajesh_Sayana On

The API call would need a Bearer token for authentication. The following Python code generates a Bearer token and gets the Azure IoT Hub Custom end points and routes.

import subprocess
import json
import requests

url = "https://management.azure.com/subscriptions/<subscriptionID>/resourceGroups/<resourceGroupName>/providers/Microsoft.Devices/IotHubs/<IoTHubName>?api-version=2022-04-30-preview"
cmd = "az account get-access-token --resource https://management.azure.com/"
result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)


if result.returncode == 0:
    output = json.loads(result.stdout.decode())
    access_token = output["accessToken"]
    #print(f"Bearer {access_token}")
    # Set the Authorization header with the bearer token
    headers = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}    
    # Make the HTTP GET request with the headers
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        data = json.loads(response.text)
        properties = data["properties"]
        routing = properties["routing"]
        customendpoints = routing["endpoints"]
        print(customendpoints)
        routes = routing["routes"]
        print("\n\nPrinting routes")
        print(routes)
        # Use the parsed JSON data as needed
    else:
        print("Error:", response.status_code, response.text)
else:
    print("Error:", result.stderr.decode())

Please note to provide the correct Azure subscription ID, resource group name and IoT Hub name in the url varaible

If you plan to execute the code repeatedly, you can modify the code to put a check point to verify bearer tokens validity and generate a new token only when the created Token expired.

Hope this helps!