- class NatsContainer(image: str = 'nats:latest', client_port: int = 4222, management_port: int = 8222, expected_ready_log: str = 'Server is ready', ready_timeout_secs: int = 120, **kwargs)¶
NATS container.
Example
>>> import asyncio
>>> from nats import connect as nats_connect
>>> from testcontainers.nats import NatsContainer
>>> async def test_doctest_usage():
...     with NatsContainer() as nats_container:
...         client = await nats_connect(nats_container.nats_uri())
...         sub_tc = await client.subscribe("tc")
...         await client.publish("tc", b"Test-Containers")
...         next_message = await sub_tc.next_msg(timeout=5.0)
...         await client.close()
...     return next_message.data
>>> asyncio.run(test_doctest_usage())
b'Test-Containers'