How to use Pulsar connectors
This guide describes how to use Pulsar connectors.
Install a connector#
Pulsar bundles several builtin connectors used to move data in and out of commonly used systems (such as database and messaging system). Optionally, you can create and use your desired non-builtin connectors.
note
When using a non-builtin connector, you need to specify the path of a archive file for the connector.
To set up a builtin connector, follow the instructions here.
After the setup, the builtin connector is automatically discovered by Pulsar brokers (or function-workers), so no additional installation steps are required.
Configure a connector#
You can configure the following information:
Configure a default storage location for a connector#
To configure a default folder for builtin connectors, set the connectorsDirectory parameter in the ./conf/functions_worker.yml configuration file.
Example
Set the ./connectors folder as the default storage location for builtin connectors.
######################### Connectors########################
connectorsDirectory: ./connectorsConfigure a connector with a YAML file#
To configure a connector, you need to provide a YAML configuration file when creating a connector.
The YAML configuration file tells Pulsar where to locate connectors and how to connect connectors with Pulsar topics.
Example 1
Below is a YAML configuration file of a Cassandra sink, which tells Pulsar:
- Which Cassandra cluster to connect
- What is the
keyspaceandcolumnFamilyto be used in Cassandra for collecting data - How to map Pulsar messages into Cassandra table key and columns
tenant: publicnamespace: defaultname: cassandra-test-sink...# cassandra specific configconfigs: roots: "localhost:9042" keyspace: "pulsar_test_keyspace" columnFamily: "pulsar_test_table" keyname: "key" columnName: "col"Example 2
Below is a YAML configuration file of a Kafka source.
configs: bootstrapServers: "pulsar-kafka:9092" groupId: "test-pulsar-io" topic: "my-topic" sessionTimeoutMs: "10000" autoCommitEnabled: "false"Example 3
Below is a YAML configuration file of a PostgreSQL JDBC sink.
configs: userName: "postgres" password: "password" jdbcUrl: "jdbc:postgresql://localhost:5432/test_jdbc" tableName: "test_jdbc"Get available connectors#
Before starting using connectors, you can perform the following operations:
reload#
If you add or delete a nar file in a connector folder, reload the available builtin connector before using it.
Source#
Use the reload subcommand.
$ pulsar-admin sources reloadFor more information, see here.
Sink#
Use the reload subcommand.
$ pulsar-admin sinks reloadFor more information, see here.
available#
After reloading connectors (optional), you can get a list of available connectors.
Source#
Use the available-sources subcommand.
$ pulsar-admin sources available-sourcesSink#
Use the available-sinks subcommand.
$ pulsar-admin sinks available-sinksRun a connector#
To run a connector, you can perform the following operations:
create#
You can create a connector using Admin CLI, REST API or JAVA admin API.f
Source#
Create a source connector.
- Admin CLI
- REST API
- Java Admin API
Send a POST request to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName
Create a source connector with a local file.
void createSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminExceptionParameter
Name Description sourceConfigThe source configuration object Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
createSource.Create a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminExceptionSupported URLs are
httpandfile.Example
File: file:///dir/fileName.jar
Parameter
Parameter Description sourceConfigThe source configuration object pkgUrlURL from which pkg can be downloaded Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
createSourceWithUrl.
Sink#
Create a sink connector.
- Admin CLI
- REST API
- Java Admin API
Send a POST request to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName
Create a sink connector with a local file.
void createSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminExceptionParameter
Name Description sinkConfigThe sink configuration object Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
createSink.Create a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void createSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminExceptionSupported URLs are
httpandfile.Example
File: file:///dir/fileName.jar
Parameter
Parameter Description sinkConfigThe sink configuration object pkgUrlURL from which pkg can be downloaded Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
createSinkWithUrl.
start#
You can start a connector using Admin CLI or REST API.
Source#
Start a source connector.
- Admin CLI
- REST API
Start all source connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/startStart a specified source connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/start
Sink#
Start a sink connector.
- Admin CLI
- REST API
Start all sink connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/startStart a specified sink connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/start
localrun#
You can run a connector locally rather than deploying it on a Pulsar cluster using Admin CLI.
Source#
Run a source connector locally.
- Admin CLI
Sink#
Run a sink connector locally.
- Admin CLI
Monitor a connector#
To monitor a connector, you can perform the following operations:
get#
You can get the information of a connector using Admin CLI, REST API or JAVA admin API.
Source#
Get the information of a source connector.
- Admin CLI
- REST API
- Java Admin API
Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName
SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminExceptionExample
This is a sourceConfig.
{ "tenant": "tenantName", "namespace": "namespaceName", "name": "sourceName", "className": "className", "topicName": "topicName", "configs": {}, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}This is a sourceConfig example.
{ "tenant": "public", "namespace": "default", "name": "debezium-mysql-source", "className": "org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource", "topicName": "debezium-mysql-topic", "configs": { "database.user": "debezium", "database.server.id": "184054", "database.server.name": "dbserver1", "database.port": "3306", "database.hostname": "localhost", "database.password": "dbz", "database.history.pulsar.service.url": "pulsar://127.0.0.1:6650", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "database.whitelist": "inventory", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "database.history": "org.apache.pulsar.io.debezium.PulsarDatabaseHistory", "pulsar.service.url": "pulsar://127.0.0.1:6650", "database.history.pulsar.topic": "history-topic2" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "resources": { "cpu": 1.0, "ram": 1073741824, "disk": 10737418240 }}Exception
| Exception name | Description |
|---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException | Unexpected error |
For more information, see getSource.
Sink#
Get the information of a sink connector.
- Admin CLI
- REST API
- Java Admin API
Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName
SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminExceptionExample
This is a sinkConfig.
{"tenant": "tenantName","namespace": "namespaceName","name": "sinkName","className": "className","inputSpecs": {"topicName": { "isRegexPattern": false}},"configs": {},"parallelism": 1,"processingGuarantees": "ATLEAST_ONCE","retainOrdering": false,"autoAck": true}This is a sinkConfig example.
{ "tenant": "public", "namespace": "default", "name": "pulsar-postgres-jdbc-sink", "className": "org.apache.pulsar.io.jdbc.PostgresJdbcAutoSchemaSink", "inputSpecs": { "pulsar-postgres-jdbc-sink-topic": { "isRegexPattern": false } }, "configs": { "password": "password", "jdbcUrl": "jdbc:postgresql://localhost:5432/pulsar_postgres_jdbc_sink", "userName": "postgres", "tableName": "pulsar_postgres_jdbc_sink" }, "parallelism": 1, "processingGuarantees": "ATLEAST_ONCE", "retainOrdering": false, "autoAck": true}Parameter description
| Name | Description |
|---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
For more information, see getSink.
list#
You can get the list of all running connectors using Admin CLI, REST API or JAVA admin API.
Source#
Get the list of all running source connectors.
- Admin CLI
- REST API
- Java Admin API
Send a GET request to this endpoint: GET /admin/v3/sources/:tenant/:namespace/
List<String> listSources(String tenant, String namespace) throws PulsarAdminExceptionResponse example
["f1", "f2", "f3"]Exception
| Exception name | Description |
|---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSource.
Sink#
Get the list of all running sink connectors.
- Admin CLI
- REST API
- Java Admin API
Send a GET request to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/
List<String> listSinks(String tenant, String namespace) throws PulsarAdminExceptionResponse example
["f1", "f2", "f3"]Exception
| Exception name | Description |
|---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException | Unexpected error |
For more information, see listSource.
status#
You can get the current status of a connector using Admin CLI, REST API or JAVA admin API.
Source#
Get the current status of a source connector.
- Admin CLI
- REST API
- Java Admin API
Get the current status of all source connectors.
Send a
GETrequest to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/statusGets the current status of a specified source connector.
Send a
GETrequest to this endpoint: GET /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/status
Get the current status of all source connectors.
SourceStatus getSourceStatus(String tenant, String namespace, String source) throws PulsarAdminExceptionParameter
Parameter Description tenantTenant name namespaceNamespace name sinkSource name Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
getSourceStatus.Gets the current status of a specified source connector.
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceStatus(String tenant, String namespace, String source, int id) throws PulsarAdminExceptionParameter
Parameter Description tenantTenant name namespaceNamespace name sinkSource name idSource instanceID Exception
Exception name Description PulsarAdminExceptionUnexpected error For more information, see
getSourceStatus.
Sink#
Get the current status of a Pulsar sink connector.
- Admin CLI
- REST API
- Java Admin API
Get the current status of all sink connectors.
Send a
GETrequest to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sinkName/statusGets the current status of a specified sink connector.
Send a
GETrequest to this endpoint: GET /admin/v3/sinks/:tenant/:namespace/:sourceName/:instanceId/status
Get the current status of all sink connectors.
SinkStatus getSinkStatus(String tenant, String namespace, String sink) throws PulsarAdminExceptionParameter
Parameter Description tenantTenant name namespaceNamespace name sinkSource name Exception
Exception name Description PulsarAdminExceptionUnexpected error For more information, see
getSinkStatus.Gets the current status of a specified source connector.
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkStatus(String tenant, String namespace, String sink, int id) throws PulsarAdminExceptionParameter
Parameter Description tenantTenant name namespaceNamespace name sinkSource name idSink instanceID Exception
Exception name Description PulsarAdminExceptionUnexpected error For more information, see
getSinkStatusWithInstanceID.
Update a connector#
update#
You can update a running connector using Admin CLI, REST API or JAVA admin API.
Source#
Update a running Pulsar source connector.
- Admin CLI
- REST API
- Java Admin API
Send a PUT request to this endpoint: PUT /admin/v3/sources/:tenant/:namespace/:sourceName
Update a running source connector with a local file.
void updateSource(SourceConfig sourceConfig, String fileName) throws PulsarAdminExceptionParameter
Name Description sourceConfigThe source configuration object Exception
Name Description PulsarAdminException.NotAuthorizedExceptionYou don't have the admin permission PulsarAdminException.NotFoundExceptionCluster doesn't exist PulsarAdminExceptionUnexpected error For more information, see
updateSource.Update a source connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSourceWithUrl(SourceConfig sourceConfig, String pkgUrl) throws PulsarAdminExceptionSupported URLs are
httpandfile.Example
File: file:///dir/fileName.jar
Parameter
Name Description sourceConfigThe source configuration object pkgUrlURL from which pkg can be downloaded Exception
Name Description PulsarAdminException.NotAuthorizedExceptionYou don't have the admin permission PulsarAdminException.NotFoundExceptionCluster doesn't exist PulsarAdminExceptionUnexpected error
For more information, see createSourceWithUrl.
Sink#
Update a running Pulsar sink connector.
- Admin CLI
- REST API
- Java Admin API
Send a PUT request to this endpoint: PUT /admin/v3/sinks/:tenant/:namespace/:sinkName
Update a running sink connector with a local file.
void updateSink(SinkConfig sinkConfig, String fileName) throws PulsarAdminExceptionParameter
Name Description sinkConfigThe sink configuration object Exception
Name Description PulsarAdminException.NotAuthorizedExceptionYou don't have the admin permission PulsarAdminException.NotFoundExceptionCluster doesn't exist PulsarAdminExceptionUnexpected error For more information, see
updateSink.Update a sink connector using a remote file with a URL from which fun-pkg can be downloaded.
void updateSinkWithUrl(SinkConfig sinkConfig, String pkgUrl) throws PulsarAdminExceptionSupported URLs are
httpandfile.Example
File: file:///dir/fileName.jar
Parameter
Name Description sinkConfigThe sink configuration object pkgUrlURL from which pkg can be downloaded Exception
Name Description PulsarAdminException.NotAuthorizedExceptionYou don't have the admin permission PulsarAdminException.NotFoundExceptionCluster doesn't exist PulsarAdminExceptionUnexpected error
For more information, see updateSinkWithUrl.
Stop a connector#
stop#
You can stop a connector using Admin CLI, REST API or JAVA admin API.
Source#
Stop a source connector.
- Admin CLI
- REST API
- Java Admin API
Stop all source connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceNameStop a specified source connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId
Stop all source connectors.
void stopSource(String tenant, String namespace, String source) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
stopSource.Stop a specified source connector.
void stopSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name instanceIdSource instanceID Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
stopSource.
Sink#
Stop a sink connector.
- Admin CLI
- REST API
- Java Admin API
Stop all sink connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sinks/:tenant/:namespace/:sinkName/stopStop a specified sink connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkeName/:instanceId/stop
Stop all sink connectors.
void stopSink(String tenant, String namespace, String sink) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
stopSink.Stop a specified sink connector.
void stopSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name instanceIdSource instanceID Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
stopSink.
Restart a connector#
restart#
You can restart a connector using Admin CLI, REST API or JAVA admin API.
Source#
Restart a source connector.
- Admin CLI
- REST API
- Java Admin API
Restart all source connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/restartRestart a specified source connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sourceName/:instanceId/restart
Restart all source connectors.
void restartSource(String tenant, String namespace, String source) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
restartSource.Restart a specified source connector.
void restartSource(String tenant, String namespace, String source, int instanceId) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name instanceIdSource instanceID Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
restartSource.
Sink#
Restart a sink connector.
- Admin CLI
- REST API
- Java Admin API
Restart all sink connectors.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/restartRestart a specified sink connector.
Send a
POSTrequest to this endpoint: POST /admin/v3/sources/:tenant/:namespace/:sinkName/:instanceId/restart
Restart all Pulsar sink connectors.
void restartSink(String tenant, String namespace, String sink) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sinkSink name Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
restartSink.Restart a specified sink connector.
void restartSink(String tenant, String namespace, String sink, int instanceId) throws PulsarAdminExceptionParameter
Name Description tenantTenant name namespaceNamespace name sourceSource name instanceIdSink instanceID Exception
Name Description PulsarAdminExceptionUnexpected error For more information, see
restartSink.
Delete a connector#
delete#
You can delete a connector using Admin CLI, REST API or JAVA admin API.
Source#
Delete a source connector.
- Admin CLI
- REST API
- Java Admin API
Delete al Pulsar source connector.
Send a DELETE request to this endpoint: DELETE /admin/v3/sources/:tenant/:namespace/:sourceName
Delete a source connector.
void deleteSource(String tenant, String namespace, String source) throws PulsarAdminExceptionParameter
| Name | Description |
|---|---|
tenant | Tenant name |
namespace | Namespace name |
source | Source name |
Exception
| Name | Description |
|---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSource.
Sink#
Delete a sink connector.
- Admin CLI
- REST API
- Java Admin API
Delete a sink connector.
Send a DELETE request to this endpoint: DELETE /admin/v3/sinks/:tenant/:namespace/:sinkName
Delete a Pulsar sink connector.
void deleteSink(String tenant, String namespace, String source) throws PulsarAdminExceptionParameter
| Name | Description |
|---|---|
tenant | Tenant name |
namespace | Namespace name |
sink | Sink name |
Exception
| Name | Description |
|---|---|
PulsarAdminException.NotAuthorizedException | You don't have the admin permission |
PulsarAdminException.NotFoundException | Cluster doesn't exist |
PulsarAdminException.PreconditionFailedException | Cluster is not empty |
PulsarAdminException | Unexpected error |
For more information, see deleteSource.