File source connector
The File source connector pulls messages from files in directories and persists the messages to Pulsar topics.
Configuration
The configuration of the File source connector has the following properties.
Property
Name | Type | Required | Default | Description |
---|---|---|---|---|
inputDirectory | String | true | No default value | The input directory to pull files. |
recurse | Boolean | false | true | Whether to pull files from subdirectories. |
keepFile | Boolean | false | false | If set to true, the file is not deleted after it is processed, which means the file can be picked up repeatedly. |
fileFilter | String | false | [^\\.].* | Only files whose names match the given regular expression are picked up. |
pathFilter | String | false | NULL | If recurse is set to true, the subdirectory whose path matches the given regular expression is scanned. |
minimumFileAge | Integer | false | 0 | The minimum age a file must reach before it can be processed. Any file younger than minimumFileAge (according to the last modification date) is ignored. |
maximumFileAge | Long | false | Long.MAX_VALUE | The maximum age a file can reach and still be processed. Any file older than maximumFileAge (according to the last modification date) is ignored. |
minimumSize | Integer | false | 1 | The minimum size (in bytes) a file must have to be processed. |
maximumSize | Double | false | Double.MAX_VALUE | The maximum size (in bytes) a file can have and still be processed. |
ignoreHiddenFiles | Boolean | false | true | Whether hidden files are ignored. |
pollingInterval | Long | false | 10000L | Indicates how long to wait before performing a directory listing. |
numWorkers | Integer | false | 1 | The number of worker threads that process files. A higher value lets the connector process more files concurrently. However, a value greater than 1 causes data from multiple files to be interleaved in the target topic. |
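To get a feel for how a name filter such as [^\\.].* (the value used in the example configuration) selects files, here is a rough shell sketch. It uses grep -E -x as a stand-in for the connector's regular-expression matching, which is an assumption: the connector's own regex engine may differ in details. filter_names is a hypothetical helper and not part of Pulsar.

```shell
# Hypothetical helper: print the names that match a regular expression
# in full. grep -x anchors the match to the whole line, which
# approximates matching against the complete file name.
filter_names() {
  pattern="$1"
  shift
  printf '%s\n' "$@" | grep -E -x -- "$pattern"
}

# '[^.].*' accepts any name that does not start with a dot.
# (Inside a grep bracket expression the dot needs no backslash.)
matched=$(filter_names '[^.].*' test.txt .hidden data.csv)
echo "$matched"
```

With these sample names, only test.txt and data.csv pass the filter, while .hidden is rejected.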
Example
Before using the File source connector, you need to create a configuration file in one of the following formats.
JSON
{
    "inputDirectory": "/Users/david",
    "recurse": true,
    "keepFile": true,
    "fileFilter": "[^\\.].*",
    "pathFilter": ".*",
    "minimumFileAge": 0,
    "maximumFileAge": 9999999999,
    "minimumSize": 1,
    "maximumSize": 5000000,
    "ignoreHiddenFiles": true,
    "pollingInterval": 5000,
    "numWorkers": 1
}
YAML
configs:
    inputDirectory: "/Users/david"
    recurse: true
    keepFile: true
    fileFilter: "[^\\.].*"
    pathFilter: ".*"
    minimumFileAge: 0
    maximumFileAge: 9999999999
    minimumSize: 1
    maximumSize: 5000000
    ignoreHiddenFiles: true
    pollingInterval: 5000
    numWorkers: 1
Usage
Here is an example of using the File source connector.
Pull a Pulsar image.
$ docker pull apachepulsar/pulsar:{version}
Start Pulsar standalone.
$ docker run -d -it -p 6650:6650 -p 8080:8080 -v $PWD/data:/pulsar/data --name pulsar-standalone apachepulsar/pulsar:{version} bin/pulsar standalone
Create a configuration file file-connector.yaml.
configs:
    inputDirectory: "/opt"
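The copy step that follows reads the file from connectors/file-connector.yaml, so one possible way to create it from the shell is sketched below. The local connectors directory is an assumption taken from that path; adjust it if your layout differs.

```shell
# Assumption: the file lives in a local "connectors" directory,
# matching the path used by the docker cp step.
mkdir -p connectors

# The quoted heredoc delimiter keeps the content literal,
# so the YAML indentation and quotes are written unchanged.
cat > connectors/file-connector.yaml <<'EOF'
configs:
    inputDirectory: "/opt"
EOF

# Print the file to confirm the indentation survived.
cat connectors/file-connector.yaml
```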
Copy the configuration file file-connector.yaml to the container.
$ docker cp connectors/file-connector.yaml pulsar-standalone:/pulsar/
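Download the File source connector. The start command below reads the archive from /pulsar/ inside the container, so make sure the downloaded .nar file is available at that path.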
$ curl -O https://mirrors.tuna.tsinghua.edu.cn/apache/pulsar/pulsar-{version}/connectors/pulsar-io-file-{version}.nar
Start the File source connector.
$ docker exec -it pulsar-standalone /bin/bash
$ ./bin/pulsar-admin sources localrun \
--archive /pulsar/pulsar-io-file-{version}.nar \
--name file-test \
--destination-topic-name pulsar-file-test \
--source-config-file /pulsar/file-connector.yaml
Start a consumer.
$ ./bin/pulsar-client consume -s file-test -n 0 pulsar-file-test
Write a message to the file test.txt.
$ echo "hello world!" > /opt/test.txt
The following information appears on the consumer terminal window.
----- got message -----
hello world!