pulsar

Apache Pulsar Core Knowledge

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "pulsar" with this command: npx skills add claude-dev-suite/claude-dev-suite/claude-dev-suite-claude-dev-suite-pulsar

Apache Pulsar Core Knowledge

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: pulsar for comprehensive documentation.

Quick Start (Docker)

docker-compose.yml

services: pulsar: image: apachepulsar/pulsar:latest ports: - "6650:6650" # Broker - "8080:8080" # Admin API command: bin/pulsar standalone volumes: - pulsar_data:/pulsar/data

volumes: pulsar_data:

docker-compose up -d

Create tenant and namespace

docker exec pulsar bin/pulsar-admin tenants create my-tenant docker exec pulsar bin/pulsar-admin namespaces create my-tenant/my-namespace

Test

docker exec pulsar bin/pulsar-client produce persistent://my-tenant/my-namespace/orders -m "test" docker exec pulsar bin/pulsar-client consume persistent://my-tenant/my-namespace/orders -s "test-sub"

Core Concepts

Concept Description

Tenant Top-level isolation unit

Namespace Administrative unit within tenant

Topic Message stream (persistent/non-persistent)

Subscription Named cursor with delivery semantics

Partition Topic partitioning for parallelism

Bookie Storage layer (Apache BookKeeper)

Architecture

┌─────────────────────────────────────────────────────────────┐ │ Pulsar Cluster │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ │ └───────────┘ └───────────┘ └───────────┘ │ │ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ BookKeeper (Bookies) │ │ │ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │ │ │Bookie 1│ │Bookie 2│ │Bookie 3│ │ │ │ │ └────────┘ └────────┘ └────────┘ │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────────────┘

Subscription Types

Type Description Use Case

Exclusive Single consumer Ordered processing

Shared Round-robin to consumers Load balancing

Failover Active-standby High availability

Key_Shared Key-based routing Ordered per key

Producer Patterns

Node.js (pulsar-client)

import Pulsar from 'pulsar-client';

const client = new Pulsar.Client({ serviceUrl: 'pulsar://localhost:6650', operationTimeoutSeconds: 30, });

const producer = await client.createProducer({ topic: 'persistent://my-tenant/my-namespace/orders', sendTimeoutMs: 30000, batchingEnabled: true, batchingMaxMessages: 1000, batchingMaxPublishDelayMs: 10, });

// Send message const messageId = await producer.send({ data: Buffer.from(JSON.stringify(order)), properties: { 'correlation-id': correlationId, 'order-type': order.type, }, eventTimestamp: Date.now(), });

console.log(Sent: ${messageId});

// Send with key (for Key_Shared subscription) await producer.send({ data: Buffer.from(JSON.stringify(order)), partitionKey: order.customerId, });

await producer.flush(); await producer.close(); await client.close();

Java (pulsar-client)

@Configuration public class PulsarConfig { @Bean public PulsarClient pulsarClient() throws PulsarClientException { return PulsarClient.builder() .serviceUrl("pulsar://localhost:6650") .operationTimeout(30, TimeUnit.SECONDS) .build(); }

@Bean
public Producer<byte[]> orderProducer(PulsarClient client) throws PulsarClientException {
    return client.newProducer()
        .topic("persistent://my-tenant/my-namespace/orders")
        .batchingMaxMessages(1000)
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .sendTimeout(30, TimeUnit.SECONDS)
        .create();
}

}

@Service public class OrderProducer { @Autowired private Producer<byte[]> producer;

public void sendOrder(Order order) throws PulsarClientException {
    MessageId messageId = producer.newMessage()
        .value(objectMapper.writeValueAsBytes(order))
        .property("correlation-id", UUID.randomUUID().toString())
        .property("order-type", order.getType())
        .eventTime(System.currentTimeMillis())
        .key(order.getCustomerId())  // For Key_Shared
        .send();

    log.info("Sent: {}", messageId);
}

public CompletableFuture&#x3C;MessageId> sendOrderAsync(Order order) {
    return producer.newMessage()
        .value(objectMapper.writeValueAsBytes(order))
        .sendAsync();
}

}

Python (pulsar-client)

import pulsar import json

client = pulsar.Client('pulsar://localhost:6650')

producer = client.create_producer( 'persistent://my-tenant/my-namespace/orders', batching_enabled=True, batching_max_messages=1000, batching_max_publish_delay_ms=10 )

Send message

message_id = producer.send( json.dumps(order).encode('utf-8'), properties={ 'correlation-id': correlation_id, 'order-type': order['type'] }, partition_key=order['customer_id'] )

print(f"Sent: {message_id}")

producer.flush() producer.close() client.close()

Go (pulsar-client-go)

package main

import ( "context" "encoding/json" "github.com/apache/pulsar-client-go/pulsar" )

func main() { client, _ := pulsar.NewClient(pulsar.ClientOptions{ URL: "pulsar://localhost:6650", OperationTimeout: 30 * time.Second, }) defer client.Close()

producer, _ := client.CreateProducer(pulsar.ProducerOptions{
    Topic:                   "persistent://my-tenant/my-namespace/orders",
    BatchingMaxMessages:     1000,
    BatchingMaxPublishDelay: 10 * time.Millisecond,
})
defer producer.Close()

body, _ := json.Marshal(order)

msgID, _ := producer.Send(context.Background(), &#x26;pulsar.ProducerMessage{
    Payload: body,
    Key:     order.CustomerID,
    Properties: map[string]string{
        "correlation-id": correlationID,
        "order-type":     order.Type,
    },
})

log.Printf("Sent: %v", msgID)

}

Consumer Patterns

Node.js

// Exclusive subscription const consumer = await client.subscribe({ topic: 'persistent://my-tenant/my-namespace/orders', subscription: 'order-processor', subscriptionType: 'Exclusive', ackTimeoutMs: 30000, });

while (true) { const msg = await consumer.receive(); try { const order = JSON.parse(msg.getData().toString()); await processOrder(order); consumer.acknowledge(msg); } catch (error) { consumer.negativeAcknowledge(msg); } }

// Shared subscription (load balanced) const sharedConsumer = await client.subscribe({ topic: 'persistent://my-tenant/my-namespace/orders', subscription: 'order-processors', subscriptionType: 'Shared', receiverQueueSize: 1000, });

// Key_Shared subscription (ordered per key) const keySharedConsumer = await client.subscribe({ topic: 'persistent://my-tenant/my-namespace/orders', subscription: 'order-processors', subscriptionType: 'KeyShared', });

// Dead letter topic const dlqConsumer = await client.subscribe({ topic: 'persistent://my-tenant/my-namespace/orders', subscription: 'order-processor', subscriptionType: 'Shared', deadLetterPolicy: { maxRedeliverCount: 3, deadLetterTopic: 'persistent://my-tenant/my-namespace/orders-dlq', }, });

Java

@Service public class OrderConsumer { @Autowired private PulsarClient client;

@PostConstruct
public void startConsumer() throws PulsarClientException {
    Consumer&#x3C;byte[]> consumer = client.newConsumer()
        .topic("persistent://my-tenant/my-namespace/orders")
        .subscriptionName("order-processor")
        .subscriptionType(SubscriptionType.Shared)
        .ackTimeout(30, TimeUnit.SECONDS)
        .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
        .deadLetterPolicy(DeadLetterPolicy.builder()
            .maxRedeliverCount(3)
            .deadLetterTopic("persistent://my-tenant/my-namespace/orders-dlq")
            .build())
        .subscribe();

    new Thread(() -> {
        while (true) {
            try {
                Message&#x3C;byte[]> msg = consumer.receive();
                Order order = objectMapper.readValue(msg.getData(), Order.class);
                processOrder(order);
                consumer.acknowledge(msg);
            } catch (Exception e) {
                consumer.negativeAcknowledge(msg);
            }
        }
    }).start();
}

}

// Batch consumer Consumer<byte[]> batchConsumer = client.newConsumer() .topic("persistent://my-tenant/my-namespace/orders") .subscriptionName("batch-processor") .subscriptionType(SubscriptionType.Shared) .batchReceivePolicy(BatchReceivePolicy.builder() .maxNumMessages(100) .maxNumBytes(1024 * 1024) .timeout(100, TimeUnit.MILLISECONDS) .build()) .subscribe();

Messages<byte[]> messages = batchConsumer.batchReceive(); for (Message<byte[]> msg : messages) { processMessage(msg); } batchConsumer.acknowledge(messages);

Python

consumer = client.subscribe( 'persistent://my-tenant/my-namespace/orders', subscription_name='order-processor', consumer_type=pulsar.ConsumerType.Shared, dead_letter_policy=pulsar.DeadLetterPolicy( max_redeliver_count=3, dead_letter_topic='persistent://my-tenant/my-namespace/orders-dlq' ) )

while True: msg = consumer.receive() try: order = json.loads(msg.data().decode('utf-8')) process_order(order) consumer.acknowledge(msg) except Exception as e: consumer.negative_acknowledge(msg)

Topic Management

Partitioned topic

pulsar-admin topics create-partitioned-topic
persistent://my-tenant/my-namespace/orders
--partitions 12

Set retention

pulsar-admin namespaces set-retention my-tenant/my-namespace
--size 10G --time 7d

Set TTL

pulsar-admin namespaces set-message-ttl my-tenant/my-namespace
--messageTTL 86400

Compaction (for key-based topics)

pulsar-admin topics compact persistent://my-tenant/my-namespace/orders

When NOT to Use This Skill

Use alternative messaging solutions when:

  • Simple request/reply patterns - RabbitMQ or NATS are simpler

  • AWS-native architecture - SQS has better AWS integration

  • Small-scale deployments - Operational complexity may not be justified

  • JMS compliance required - Use ActiveMQ

  • Extremely low latency (<1ms) - Use Redis or in-memory solutions

  • Single datacenter only - Kafka may be simpler

  • Limited operational expertise - Managed services may be better

Anti-Patterns

Anti-Pattern Why It's Bad Solution

Non-persistent topics in production Message loss on broker restart Use persistent topics

No backlog quotas Unbounded storage growth Set retention policies and quotas

Shared subscription for ordering Messages processed out of order Use Exclusive or Key_Shared

No dead letter topic Failed messages lost Configure DLT for all subscriptions

Large message batching Memory pressure, high latency Batch moderately (100-1000 messages)

Single bookie ensemble Data loss on bookie failure Use ensemble size >= 3

No namespace isolation Tenant resource contention Properly configure namespaces and quotas

Sync send everywhere Poor throughput Use async send with callbacks

No monitoring Invisible backlog and issues Monitor backlog, throughput, latency

Auto-ack without processing Message loss Acknowledge only after processing

Quick Troubleshooting

Issue Likely Cause Fix

Messages not delivered Subscription not created Create subscription before consuming

Backlog growing Slow consumers or consumer down Add consumers or optimize processing

High latency Batching delay or network Reduce batching delay, check network

Out of order messages Shared subscription Use Exclusive or Key_Shared

Connection timeout Broker overload or network Check broker health, network

Subscription not found Wrong topic or subscription name Verify names with pulsar-admin

Publish errors Topic not found or permissions Create topic, check ACLs

Geo-replication lag Network issues or high load Check replication backlog metrics

Bookie failures Disk full or hardware issues Monitor disk usage, replace bookie

Negative ack storm Processing failures Fix processing logic, add retry limits

Production Readiness

Security Configuration

broker.conf

authenticationEnabled=true authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken brokerClientAuthenticationParameters={"token":"xxx"}

TLS

tlsEnabled=true tlsCertificateFilePath=/path/to/broker.cert.pem tlsKeyFilePath=/path/to/broker.key-pk8.pem tlsTrustCertsFilePath=/path/to/ca.cert.pem

const client = new Pulsar.Client({ serviceUrl: 'pulsar+ssl://pulsar.example.com:6651', authentication: new Pulsar.AuthenticationToken({ token: 'xxx' }), tlsTrustCertsFilePath: '/path/to/ca.cert.pem', });

Geo-Replication

Enable replication

pulsar-admin namespaces set-clusters my-tenant/my-namespace
--clusters cluster-1,cluster-2,cluster-3

Check replication status

pulsar-admin topics stats persistent://my-tenant/my-namespace/orders

Monitoring Metrics

Metric Alert Threshold

Backlog size

100000 messages

Publish rate Anomaly detection

Subscription lag

10000

Message age

1 hour

Replication lag

1000

Checklist

  • TLS enabled

  • Authentication configured

  • Authorization (namespace policies)

  • Partitioned topics for scale

  • Retention policies set

  • Dead letter topics configured

  • Geo-replication (if needed)

  • Backlog quotas set

  • Monitoring dashboards

  • Backup procedures

Reference Documentation

Deep Knowledge: Use mcp__documentation__fetch_docs with technology: pulsar for comprehensive documentation.

Available topics: basics , producers , consumers , production

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

cron-scheduling

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

token-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

webrtc

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

react-19

No summary provided by upstream source.

Repository SourceNeeds Review