messaging-testing

Messaging Integration Testing — Multi-Broker Reference

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "messaging-testing" with this command: npx skills add claude-dev-suite/claude-dev-suite/claude-dev-suite-claude-dev-suite-messaging-testing

Messaging Integration Testing — Multi-Broker Reference

Dedicated skills: For Kafka testing see messaging-testing-kafka . For RabbitMQ testing see messaging-testing-rabbitmq .

Redis Pub/Sub

Java: Testcontainers GenericContainer

@SpringBootTest @Testcontainers class RedisPubSubTest {

@Container
@ServiceConnection(name = "redis")
static GenericContainer<?> redis =
    new GenericContainer<>("redis:7-alpine").withExposedPorts(6379);

@Autowired
private StringRedisTemplate redisTemplate;

@Test
void shouldPublishAndReceive() {
    List<String> received = new CopyOnWriteArrayList<>();
    CountDownLatch latch = new CountDownLatch(1);

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisTemplate.getConnectionFactory());
    container.addMessageListener((message, pattern) -> {
        received.add(new String(message.getBody()));
        latch.countDown();
    }, new ChannelTopic("orders"));
    container.afterPropertiesSet();
    container.start();

    redisTemplate.convertAndSend("orders", "{\"orderId\":\"123\"}");

    assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
    assertThat(received).hasSize(1);
    assertThat(received.get(0)).contains("123");

    container.stop();
}

}

Node.js: ioredis + Testcontainers

import { GenericContainer } from "testcontainers"; import Redis from "ioredis";

let container, publisher, subscriber;

beforeAll(async () => { container = await new GenericContainer("redis:7-alpine") .withExposedPorts(6379).start(); const url = redis://${container.getHost()}:${container.getMappedPort(6379)}; publisher = new Redis(url); subscriber = new Redis(url); }, 30_000);

afterAll(async () => { await publisher.quit(); await subscriber.quit(); await container.stop(); });

it("should pub/sub", async () => { const messages: string[] = []; await subscriber.subscribe("orders"); subscriber.on("message", (ch, msg) => messages.push(msg));

await publisher.publish("orders", JSON.stringify({ orderId: "123" })); await new Promise((r) => setTimeout(r, 500));

expect(messages).toHaveLength(1); expect(JSON.parse(messages[0]).orderId).toBe("123"); });

NATS

Java: Testcontainers

@SpringBootTest @Testcontainers class NatsTest {

@Container
static GenericContainer<?> nats =
    new GenericContainer<>("nats:2.10-alpine").withExposedPorts(4222);

@DynamicPropertySource
static void natsProperties(DynamicPropertyRegistry registry) {
    registry.add("nats.url", () ->
        "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222));
}

@Test
void shouldPublishAndSubscribe() throws Exception {
    Connection nc = Nats.connect(
        "nats://" + nats.getHost() + ":" + nats.getMappedPort(4222));

    CompletableFuture<Message> future = new CompletableFuture<>();
    Dispatcher dispatcher = nc.createDispatcher(future::complete);
    dispatcher.subscribe("orders");

    nc.publish("orders", "{\"orderId\":\"123\"}".getBytes());

    Message msg = future.get(5, TimeUnit.SECONDS);
    assertThat(new String(msg.getData())).contains("123");
    nc.close();
}

}

Node.js: nats + Testcontainers

import { GenericContainer } from "testcontainers"; import { connect, StringCodec } from "nats";

it("should pub/sub via NATS", async () => { const container = await new GenericContainer("nats:2.10-alpine") .withExposedPorts(4222).start(); const nc = await connect({ servers: nats://${container.getHost()}:${container.getMappedPort(4222)}, }); const sc = StringCodec();

const messages: string[] = []; const sub = nc.subscribe("orders"); (async () => { for await (const msg of sub) messages.push(sc.decode(msg.data)); })();

nc.publish("orders", sc.encode(JSON.stringify({ orderId: "123" }))); await nc.flush(); await new Promise((r) => setTimeout(r, 500));

expect(messages).toHaveLength(1); await nc.close(); await container.stop(); });

Apache Pulsar

Java: PulsarContainer + @ServiceConnection (Spring Boot 3.2+)

@SpringBootTest @Testcontainers class PulsarTest {

@Container
@ServiceConnection
static PulsarContainer pulsar = new PulsarContainer("apachepulsar/pulsar:3.2.0");

@Autowired
private PulsarTemplate<String> pulsarTemplate;

@Test
void shouldProduceAndConsume() throws Exception {
    pulsarTemplate.send("orders", "order-123");

    // Consumer verifies via listener or direct consumer API
    await().atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> {
            // Assert side effect of listener processing
        });
}

}

Dependencies

<dependency> <groupId>org.testcontainers</groupId> <artifactId>pulsar</artifactId> <scope>test</scope> </dependency>

Amazon SQS (LocalStack)

Java: LocalStack + @DynamicPropertySource

@SpringBootTest @Testcontainers class SqsTest {

@Container
static LocalStackContainer localstack = new LocalStackContainer(
    DockerImageName.parse("localstack/localstack:3.4"))
    .withServices(Service.SQS);

@DynamicPropertySource
static void sqsProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.aws.sqs.endpoint",
        () -> localstack.getEndpointOverride(Service.SQS).toString());
    registry.add("spring.cloud.aws.region.static", () -> localstack.getRegion());
    registry.add("spring.cloud.aws.credentials.access-key", localstack::getAccessKey);
    registry.add("spring.cloud.aws.credentials.secret-key", localstack::getSecretKey);
}

@BeforeAll
static void createQueue() throws Exception {
    localstack.execInContainer("awslocal", "sqs", "create-queue",
        "--queue-name", "orders-queue");
}

@Autowired
private SqsTemplate sqsTemplate;

@Test
void shouldSendAndReceive() {
    sqsTemplate.send("orders-queue", new OrderEvent("123", "CREATED"));

    await().atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> {
            // Assert consumer processed the message
        });
}

}

Node.js: LocalStack + @aws-sdk

import { LocalstackContainer } from "@testcontainers/localstack"; import { SQSClient, CreateQueueCommand, SendMessageCommand, ReceiveMessageCommand } from "@aws-sdk/client-sqs";

it("should send and receive SQS message", async () => { const container = await new LocalstackContainer("localstack/localstack:3.4").start(); const client = new SQSClient({ endpoint: container.getConnectionUri(), region: "us-east-1", credentials: { accessKeyId: "test", secretAccessKey: "test" }, });

const { QueueUrl } = await client.send( new CreateQueueCommand({ QueueName: "test-queue" }));

await client.send(new SendMessageCommand({ QueueUrl, MessageBody: JSON.stringify({ orderId: "123" }), }));

const { Messages } = await client.send( new ReceiveMessageCommand({ QueueUrl, WaitTimeSeconds: 5 }));

expect(Messages).toHaveLength(1); expect(JSON.parse(Messages![0].Body!).orderId).toBe("123"); await container.stop(); });

Python: moto (Mock) or LocalStack

import boto3 from moto import mock_aws

@mock_aws def test_sqs_send_receive(): sqs = boto3.client("sqs", region_name="us-east-1") queue = sqs.create_queue(QueueName="test-queue") queue_url = queue["QueueUrl"]

sqs.send_message(QueueUrl=queue_url, MessageBody='{"orderId": "123"}')

response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=1)
assert len(response["Messages"]) == 1
assert "123" in response["Messages"][0]["Body"]

ActiveMQ (Artemis)

Java: EmbeddedActiveMQ (In-Process)

@SpringBootTest class ActiveMQTest {

private static EmbeddedActiveMQ embeddedActiveMQ;

@BeforeAll
static void startBroker() throws Exception {
    Configuration config = new ConfigurationImpl()
        .setPersistenceEnabled(false)
        .setSecurityEnabled(false)
        .addAcceptorConfiguration("invm", "vm://0");
    embeddedActiveMQ = new EmbeddedActiveMQ().setConfiguration(config);
    embeddedActiveMQ.start();
}

@AfterAll
static void stopBroker() throws Exception {
    embeddedActiveMQ.stop();
}

@Test
void shouldSendAndReceive() {
    // Use JMS or Spring JmsTemplate to send/receive
}

}

Java: Testcontainers

@Container static GenericContainer<?> artemis = new GenericContainer<>("apache/activemq-artemis:2.33.0") .withExposedPorts(61616, 8161) .waitingFor(Wait.forLogMessage(".AMQ241004.", 1));

@DynamicPropertySource static void jmsProperties(DynamicPropertyRegistry registry) { registry.add("spring.artemis.broker-url", () -> "tcp://" + artemis.getHost() + ":" + artemis.getMappedPort(61616)); registry.add("spring.artemis.user", () -> "artemis"); registry.add("spring.artemis.password", () -> "artemis"); }

Azure Service Bus

Emulator Container

@Container static GenericContainer<?> servicebus = new GenericContainer<>("mcr.microsoft.com/azure-messaging/servicebus-emulator:latest") .withExposedPorts(5672) .withEnv("ACCEPT_EULA", "Y") .withEnv("MSSQL_SA_PASSWORD", "StrongPassword1!") .waitingFor(Wait.forListeningPort());

@DynamicPropertySource static void sbProperties(DynamicPropertyRegistry registry) { registry.add("spring.cloud.azure.servicebus.connection-string", () -> "Endpoint=sb://" + servicebus.getHost() + ":" + servicebus.getMappedPort(5672) + ";SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=test"); }

Node.js: Emulator + @azure/service-bus

import { ServiceBusClient } from "@azure/service-bus"; import { GenericContainer } from "testcontainers";

// Start emulator container, then: const client = new ServiceBusClient(connectionString); const sender = client.createSender("test-queue"); await sender.sendMessages({ body: { orderId: "123" } });

const receiver = client.createReceiver("test-queue"); const [message] = await receiver.receiveMessages(1, { maxWaitTimeInMs: 5000 }); expect(message.body.orderId).toBe("123"); await receiver.completeMessage(message);

Google Pub/Sub

Emulator (gcloud)

@Container static GenericContainer<?> pubsub = new GenericContainer<>("gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators") .withExposedPorts(8085) .withCommand("gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085", "--project=test-project") .waitingFor(Wait.forLogMessage(".Server started.", 1));

@DynamicPropertySource static void pubsubProperties(DynamicPropertyRegistry registry) { registry.add("spring.cloud.gcp.pubsub.emulator-host", () -> pubsub.getHost() + ":" + pubsub.getMappedPort(8085)); registry.add("spring.cloud.gcp.project-id", () -> "test-project"); }

Node.js: Emulator + @google-cloud/pubsub

import { PubSub } from "@google-cloud/pubsub"; import { GenericContainer } from "testcontainers";

const container = await new GenericContainer( "gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators") .withExposedPorts(8085) .withCommand("gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085", "--project=test-project") .start();

process.env.PUBSUB_EMULATOR_HOST = ${container.getHost()}:${container.getMappedPort(8085)};

const pubsub = new PubSub({ projectId: "test-project" }); const [topic] = await pubsub.createTopic("test-topic"); const [subscription] = await topic.createSubscription("test-sub");

await topic.publishMessage({ data: Buffer.from(JSON.stringify({ orderId: "123" })) });

const [messages] = await subscription.pull({ maxMessages: 1 }); expect(messages).toHaveLength(1); expect(JSON.parse(messages[0].message.data.toString()).orderId).toBe("123");

Python: Emulator + google-cloud-pubsub

import os from google.cloud import pubsub_v1

def test_pubsub(pubsub_container): os.environ["PUBSUB_EMULATOR_HOST"] = ( f"{pubsub_container.get_container_host_ip()}:" f"{pubsub_container.get_exposed_port(8085)}" ) publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path("test-project", "test-topic") publisher.create_topic(request={"name": topic_path})

future = publisher.publish(topic_path, b'{"orderId": "123"}')
future.result(timeout=5)

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("test-project", "test-sub")
subscriber.create_subscription(request={"name": sub_path, "topic": topic_path})

response = subscriber.pull(request={"subscription": sub_path, "max_messages": 1})
assert len(response.received_messages) == 1

Cross-Broker Best Practices

Do Don't

Use static containers shared across tests Create container per test method

Use @ServiceConnection when available Hardcode connection properties

Use await() or latches for async assertions Use Thread.sleep()

Clean up resources in @AfterAll

Leave connections/containers open

Use emulators for cloud services in CI Connect to real cloud services in tests

Pin specific image versions Use latest tag

Reference Documentation

  • Testcontainers Modules

  • LocalStack

  • Google Pub/Sub Emulator

  • Azure Service Bus Emulator

Cross-reference: For Kafka testing see messaging-testing-kafka . For RabbitMQ testing see messaging-testing-rabbitmq . For generic Testcontainers patterns see testcontainers skill.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

cron-scheduling

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

token-optimization

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

webrtc

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

react-19

No summary provided by upstream source.

Repository SourceNeeds Review