ElasticSearch sink connector
The ElasticSearch sink connector pulls messages from Pulsar topics and persists the messages to indexes.
#
ConfigurationThe configuration of the ElasticSearch sink connector has the following properties.
#
PropertyName | Type | Required | Default | Description |
---|---|---|---|---|
elasticSearchUrl | String | true | " " (empty string) | The URL of elastic search cluster to which the connector connects. |
indexName | String | true | " " (empty string) | The index name to which the connector writes messages. |
typeName | String | false | "_doc" | The type name to which the connector writes messages to. The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch version before 6.2, and left to default otherwise. |
indexNumberOfShards | int | false | 1 | The number of shards of the index. |
indexNumberOfReplicas | int | false | 1 | The number of replicas of the index. |
username | String | false | " " (empty string) | The username used by the connector to connect to the elastic search cluster. If username is set, then password should also be provided. |
password | String | false | " " (empty string) | The password used by the connector to connect to the elastic search cluster. If username is set, then password should also be provided. |
#
ExampleBefore using the ElasticSearch sink connector, you need to create a configuration file through one of the following methods.
#
Configuration#
For Elasticsearch After 6.2JSON
{ "elasticSearchUrl": "http://localhost:9200", "indexName": "my_index", "username": "scooby", "password": "doobie"}
YAML
configs: elasticSearchUrl: "http://localhost:9200" indexName: "my_index" username: "scooby" password: "doobie"
#
For Elasticsearch Before 6.2JSON
{ "elasticSearchUrl": "http://localhost:9200", "indexName": "my_index", "typeName": "doc", "username": "scooby", "password": "doobie"}
YAML
configs: elasticSearchUrl: "http://localhost:9200" indexName: "my_index" typeName: "doc" username: "scooby" password: "doobie"
#
UsageStart a single node Elasticsearch cluster.
$ docker run -p 9200:9200 -p 9300:9300 \ -e "discovery.type=single-node" \ docker.elastic.co/elasticsearch/elasticsearch:7.5.1
Start a Pulsar service locally in standalone mode.
$ bin/pulsar standalone
Make sure the NAR file is available at
connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar
.Start the Pulsar Elasticsearch connector in local run mode using one of the following methods.
Use the JSON configuration as shown previously.
$ bin/pulsar-admin sinks localrun \ --archive connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar \ --tenant public \ --namespace default \ --name elasticsearch-test-sink \ --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "scooby","password": "doobie"}' \ --inputs elasticsearch_test
Use the YAML configuration file as shown previously.
$ bin/pulsar-admin sinks localrun \ --archive connectors/pulsar-io-elastic-search-{{pulsar:version}}.nar \ --tenant public \ --namespace default \ --name elasticsearch-test-sink \ --sink-config-file elasticsearch-sink.yml \ --inputs elasticsearch_test
Publish records to the topic.
$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
Check documents in Elasticsearch.
- refresh the index
$ curl -s http://localhost:9200/my_index/_refresh
- search documentsYou can see the record that published earlier has been successfully written into Elasticsearch.
$ curl -s http://localhost:9200/my_index/_search
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
- refresh the index