RabbitMQ source connector
The RabbitMQ source connector receives messages from RabbitMQ clusters and writes messages to Pulsar topics.
Configuration#
The configuration of the RabbitMQ source connector has the following properties.
Property#
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| connectionName | String | true | " " (empty string) | The connection name. | 
| host | String | true | " " (empty string) | The RabbitMQ host. | 
| port | int | true | 5672 | The RabbitMQ port. | 
| virtualHost | String | true | / | The virtual host used to connect to RabbitMQ. | 
| username | String | false | guest | The username used to authenticate to RabbitMQ. | 
| password | String | false | guest | The password used to authenticate to RabbitMQ. | 
| queueName | String | true | " " (empty string) | The RabbitMQ queue name that messages should be read from or written to. | 
| requestedChannelMax | int | false | 0 | The initially requested maximum channel number. 0 means unlimited. | 
| requestedFrameMax | int | false | 0 | The initially requested maximum frame size in octets. 0 means unlimited. | 
| connectionTimeout | int | false | 60000 | The timeout of TCP connection establishment in milliseconds. 0 means infinite. | 
| handshakeTimeout | int | false | 10000 | The timeout of AMQP0-9-1 protocol handshake in milliseconds. | 
| requestedHeartbeat | int | false | 60 | The requested heartbeat timeout in seconds. | 
| prefetchCount | int | false | 0 | The maximum number of messages that the server delivers. 0 means unlimited. | 
| prefetchGlobal | boolean | false | false | Whether the setting should be applied to the entire channel rather than each consumer. | 
| passive | boolean | false | false | Whether the rabbitmq consumer should create its own queue or bind to an existing one. | 
Example#
Before using the RabbitMQ source connector, you need to create a configuration file through one of the following methods.
- JSON - { "host": "localhost", "port": "5672", "virtualHost": "/", "username": "guest", "password": "guest", "queueName": "test-queue", "connectionName": "test-connection", "requestedChannelMax": "0", "requestedFrameMax": "0", "connectionTimeout": "60000", "handshakeTimeout": "10000", "requestedHeartbeat": "60", "prefetchCount": "0", "prefetchGlobal": "false", "passive": "false"}
- YAML - configs: host: "localhost" port: 5672 virtualHost: "/" username: "guest" password: "guest" queueName: "test-queue" connectionName: "test-connection" requestedChannelMax: 0 requestedFrameMax: 0 connectionTimeout: 60000 handshakeTimeout: 10000 requestedHeartbeat: 60 prefetchCount: 0 prefetchGlobal: "false" passive: "false"