class PubSubContainer(image: str = 'google/cloud-sdk:emulators', project: str = 'test-project', port: int = 8432, **kwargs)

PubSub container for testing managed message queues.


The example below spins up a Google Cloud Pub/Sub emulator that you can use for integration tests. The container instance provides the convenience methods get_publisher_client and get_subscriber_client, which connect to the emulator without requiring you to set the PUBSUB_EMULATOR_HOST environment variable.

>>> from testcontainers.google import PubSubContainer

>>> config = PubSubContainer()
>>> with config as pubsub:
...    publisher = pubsub.get_publisher_client()
...    topic_path = publisher.topic_path(pubsub.project, "my-topic")
...    topic = publisher.create_topic(name=topic_path)
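
For comparison, the manual setup that the convenience methods make unnecessary could look roughly like the sketch below. It assumes the Google Cloud client libraries read the emulator address from PUBSUB_EMULATOR_HOST in `host:port` form. The helper name `pubsub_emulator_env` is hypothetical and not part of this class, and the host and port you pass should be whatever the running emulator actually exposes, which may differ from the default 8432 shown here.

```python
import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def pubsub_emulator_env(host: str = "localhost", port: int = 8432) -> Iterator[str]:
    """Temporarily set PUBSUB_EMULATOR_HOST (hypothetical helper, for illustration only)."""
    key = "PUBSUB_EMULATOR_HOST"
    previous = os.environ.get(key)  # remember any value that was already set
    address = f"{host}:{port}"
    os.environ[key] = address
    try:
        yield address
    finally:
        # Restore the environment exactly as it was before entering the block.
        if previous is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous


# Usage: clients created inside the block would target the emulator address.
with pubsub_emulator_env("localhost", 8432) as address:
    print(address)
```

Because the variable is restored on exit, tests that use this pattern do not leak emulator settings into one another. The container's own client helpers avoid this bookkeeping entirely.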