azure-servicebus-py

Azure Service Bus SDK for Python

Install skill "azure-servicebus-py" with this command: npx skills add claudedjale/skillset/claudedjale-skillset-azure-servicebus-py


Enterprise messaging for reliable cloud communication with queues and pub/sub topics.

Installation

pip install azure-servicebus azure-identity

Environment Variables

SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
SERVICEBUS_QUEUE_NAME=myqueue
SERVICEBUS_TOPIC_NAME=mytopic
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription
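One way to read these variables from Python is sketched below. The helper name `load_servicebus_settings` and the decision to treat only the namespace as required are assumptions made for illustration; neither comes from the SDK.

```python
import os
from typing import Mapping, Optional


def load_servicebus_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Collect the Service Bus settings listed above from the environment.

    Hypothetical helper: only the namespace is treated as required here.
    """
    env = os.environ if env is None else env
    namespace = env.get("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE", "").strip()
    if not namespace:
        raise KeyError("SERVICEBUS_FULLY_QUALIFIED_NAMESPACE is not set")
    return {
        "namespace": namespace,
        # Optional entries come back as None when the variable is unset
        "queue_name": env.get("SERVICEBUS_QUEUE_NAME"),
        "topic_name": env.get("SERVICEBUS_TOPIC_NAME"),
        "subscription_name": env.get("SERVICEBUS_SUBSCRIPTION_NAME"),
    }
```

Accepting an explicit mapping keeps the helper easy to exercise in tests without touching the real process environment.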

Authentication

from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient

credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"

client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential,
)

Client Types

| Client | Purpose | Get From |
| --- | --- | --- |
| ServiceBusClient | Connection management | Direct instantiation |
| ServiceBusSender | Send messages | client.get_queue_sender() / client.get_topic_sender() |
| ServiceBusReceiver | Receive messages | client.get_queue_receiver() / client.get_subscription_receiver() |

Send Messages (Async)

import asyncio

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient


async def send_messages():
    credential = DefaultAzureCredential()

    # Entering both context managers closes the credential and the client on exit
    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential,
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")

        async with sender:
            # Single message
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # List of messages, sent in one call
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Message batch (for size control)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:  # Batch full: send it and start a new one
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            await sender.send_messages(batch)  # Send the final partial batch


asyncio.run(send_messages())

Receive Messages (Async)

import asyncio

from azure.identity.aio import DefaultAzureCredential
from azure.servicebus.aio import ServiceBusClient


async def receive_messages():
    credential = DefaultAzureCredential()

    # Entering both context managers closes the credential and the client on exit
    async with credential, ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential,
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")

        async with receiver:
            # Receive batch
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5,  # seconds
            )

            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Remove from queue


asyncio.run(receive_messages())

Receive Modes

| Mode | Behavior | Use Case |
| --- | --- | --- |
| PEEK_LOCK (default) | Message locked, must complete/abandon | Reliable processing |
| RECEIVE_AND_DELETE | Removed immediately on receive | At-most-once delivery |

from azure.servicebus import ServiceBusReceiveMode

receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE,
)

Message Settlement

# ProcessingError and PermanentError are placeholders for your own exception types
async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)

    for msg in messages:
        try:
            # Process message...
            await receiver.complete_message(msg)  # Success - remove from queue
        except ProcessingError:
            await receiver.abandon_message(msg)  # Release lock so the message is redelivered
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process",
            )

| Action | Effect |
| --- | --- |
| complete_message() | Remove from queue (success) |
| abandon_message() | Release lock, retry immediately |
| dead_letter_message() | Move to dead-letter queue |
| defer_message() | Set aside, receive by sequence number |
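The table mentions deferral without showing it, so here is a minimal sketch of that flow. It assumes a receiver in PEEK_LOCK mode and relies on the receiver's `defer_message` and `receive_deferred_messages` methods together with the message's `sequence_number` attribute. The two function names are hypothetical, and the receiver is duck-typed so the sketch does not import the SDK.

```python
from typing import Any, List


async def defer_message_for_later(receiver: Any, msg: Any) -> int:
    """Defer a locked message and return the sequence number needed to fetch it."""
    sequence_number = msg.sequence_number  # Record this before deferring
    await receiver.defer_message(msg)
    return sequence_number


async def complete_deferred(receiver: Any, sequence_numbers: List[int]) -> int:
    """Fetch deferred messages by sequence number and complete each one."""
    deferred = await receiver.receive_deferred_messages(
        sequence_numbers=sequence_numbers
    )
    for msg in deferred:
        await receiver.complete_message(msg)
    return len(deferred)
```

A deferred message is no longer returned by ordinary receive calls, so the sequence number must be stored somewhere durable if the process may restart before the message is picked up again.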

Topics and Subscriptions

# Send to topic
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))

# Receive from subscription
receiver = client.get_subscription_receiver(
    topic_name="mytopic",
    subscription_name="mysubscription",
)
async with receiver:
    messages = await receiver.receive_messages(max_message_count=10)

Sessions (FIFO)

from azure.servicebus import NEXT_AVAILABLE_SESSION

# Send with session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)

# Receive from specific session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id="order-123",
)

# Receive from next available session
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id=NEXT_AVAILABLE_SESSION,
)

Scheduled Messages

from datetime import datetime, timedelta, timezone

message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

# Schedule message (returns a list of sequence numbers, one per message)
sequence_numbers = await sender.schedule_messages(message, scheduled_time)

# Cancel scheduled message
await sender.cancel_scheduled_messages(sequence_numbers)

Dead-Letter Queue

from azure.servicebus import ServiceBusSubQueue

# Receive from dead-letter queue
dlq_receiver = client.get_queue_receiver(
    queue_name="myqueue",
    sub_queue=ServiceBusSubQueue.DEAD_LETTER,
)

async with dlq_receiver:
    messages = await dlq_receiver.receive_messages(max_message_count=10)
    for msg in messages:
        print(f"Dead-lettered: {msg.dead_letter_reason}")
        await dlq_receiver.complete_message(msg)
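Completing a dead-lettered message discards it. When the message should be reprocessed instead, one possible approach is to send a copy back to the main queue before completing the dead-letter entry. The sketch below illustrates that idea under stated assumptions: `resubmit_dead_letters` and its `make_message` parameter are hypothetical names, the receiver and sender are duck-typed, and only the string form of the body is copied, so application properties and other metadata are not preserved.

```python
from typing import Any, Callable


async def resubmit_dead_letters(
    dlq_receiver: Any,
    sender: Any,
    make_message: Callable[[str], Any],
    max_message_count: int = 10,
) -> int:
    """Copy dead-lettered messages back to the main queue, then settle them.

    `make_message` is a hypothetical factory (for example ServiceBusMessage),
    passed in so this sketch stays independent of the SDK import.
    """
    messages = await dlq_receiver.receive_messages(
        max_message_count=max_message_count,
        max_wait_time=5,  # seconds; avoids blocking on an empty DLQ
    )
    for msg in messages:
        # Send the copy first; complete the DLQ entry only after the send succeeds
        await sender.send_messages(make_message(str(msg)))
        await dlq_receiver.complete_message(msg)
    return len(messages)
```

With the real SDK this might be called as `await resubmit_dead_letters(dlq_receiver, sender, ServiceBusMessage)`. Sending before completing means a crash between the two steps can produce a duplicate, not a lost message.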

Sync Client (for simple scripts)

from azure.identity import DefaultAzureCredential
from azure.servicebus import ServiceBusClient, ServiceBusMessage

with ServiceBusClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    credential=DefaultAzureCredential(),
) as client:
    with client.get_queue_sender("myqueue") as sender:
        sender.send_messages(ServiceBusMessage("Sync message"))

    # max_wait_time ends the iteration once no message arrives within 5 seconds
    with client.get_queue_receiver("myqueue", max_wait_time=5) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)

Best Practices

  • Use async client for production workloads

  • Use context managers (with / async with) for proper cleanup

  • Complete messages after successful processing

  • Use dead-letter queue for poison messages

  • Use sessions for ordered, FIFO processing

  • Use message batches for high-throughput scenarios

  • Set max_wait_time to avoid infinite blocking
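The settlement and dead-letter practices above can be combined into a single handler wrapper, sketched here. The function name `process_with_poison_guard` and the `max_attempts` threshold are assumptions for illustration. The sketch reads the received message's `delivery_count` attribute; whether the first delivery reports 0 or 1 should be checked against the SDK documentation before choosing a threshold. The queue's own maximum delivery count still applies independently.

```python
from typing import Any, Awaitable, Callable


async def process_with_poison_guard(
    receiver: Any,
    msg: Any,
    handler: Callable[[Any], Awaitable[None]],
    max_attempts: int = 3,
) -> str:
    """Run `handler`, settle `msg`, and return the settlement action taken."""
    try:
        await handler(msg)
    except Exception as exc:
        if msg.delivery_count >= max_attempts:
            # Treat as a poison message: stop retrying and park it in the DLQ
            await receiver.dead_letter_message(
                msg,
                reason="MaxAttemptsExceeded",
                error_description=str(exc),
            )
            return "dead_lettered"
        # Release the lock so the message can be delivered again
        await receiver.abandon_message(msg)
        return "abandoned"
    # Complete only after the handler finished without raising
    await receiver.complete_message(msg)
    return "completed"
```

Returning the action as a string makes the outcome easy to log or count. Catching `Exception` broadly is a simplification; real code would probably distinguish transient from permanent failures.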

Reference Files

| File | Contents |
| --- | --- |
| references/patterns.md | Competing consumers, sessions, retry patterns, request-response, transactions |
| references/dead-letter.md | DLQ handling, poison messages, reprocessing strategies |
| scripts/setup_servicebus.py | CLI for queue/topic/subscription management and DLQ monitoring |
