Version: 2.7.3

Kinesis sink connector

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration

The configuration of the Kinesis sink connector has the following properties.

Property

• messageFormat (type: MessageFormat; required: true; default: ONLY_RAW_PAYLOAD)

  The message format in which the Kinesis sink converts Pulsar messages and publishes them to Kinesis streams. The available options are:

    • ONLY_RAW_PAYLOAD: the Kinesis sink publishes the Pulsar message payload directly as a message into the configured Kinesis stream.

    • FULL_MESSAGE_IN_JSON: the Kinesis sink creates a JSON payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes the JSON payload into the configured Kinesis stream.

    • FULL_MESSAGE_IN_FB: the Kinesis sink creates a FlatBuffers-serialized payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes the FlatBuffers payload into the configured Kinesis stream.

• retainOrdering (type: boolean; required: false; default: false)

  Whether the Kinesis sink retains message ordering when moving messages from Pulsar to Kinesis.

• awsEndpoint (type: String; required: false; default: "" (empty string))

  The Kinesis endpoint URL, which can be found in the AWS documentation.

• awsRegion (type: String; required: false; default: "" (empty string))

  The AWS region, for example, us-west-1 or us-west-2.

• awsKinesisStreamName (type: String; required: true; default: "" (empty string))

  The name of the Kinesis stream.

• awsCredentialPluginName (type: String; required: false; default: "" (empty string))

  The fully qualified class name of an implementation of AwsCredentialProviderPlugin. This is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink.

  If it is empty, the Kinesis sink creates a default AWSCredentialsProvider that accepts a JSON map of credentials in awsCredentialPluginParam.

• awsCredentialPluginParam (type: String; required: false; default: "" (empty string))

  The JSON parameter used to initialize the AwsCredentialProviderPlugin.
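The properties above combine required flags with defaults. The following minimal sketch shows how they fit together. It is a hypothetical helper, not part of Pulsar: the `PROPERTIES` table and `resolve_config` function exist only for illustration, and the real sink performs its own loading and validation in Java.

```python
# Hypothetical helper, not part of Pulsar: it mirrors the property list above
# to illustrate how required flags and defaults combine.
PROPERTIES = {
    # name: (required, default)
    "messageFormat": (True, "ONLY_RAW_PAYLOAD"),
    "retainOrdering": (False, False),
    "awsEndpoint": (False, ""),
    "awsRegion": (False, ""),
    "awsKinesisStreamName": (True, ""),
    "awsCredentialPluginName": (False, ""),
    "awsCredentialPluginParam": (False, ""),
}

MESSAGE_FORMATS = {"ONLY_RAW_PAYLOAD", "FULL_MESSAGE_IN_JSON", "FULL_MESSAGE_IN_FB"}


def resolve_config(user_config: dict) -> dict:
    """Return user_config with defaults filled in; raise ValueError on invalid values."""
    resolved = {name: default for name, (_, default) in PROPERTIES.items()}
    resolved.update(user_config)
    # A required property fails only if it is still empty after defaults are applied,
    # so messageFormat passes unless it is explicitly set to an empty value.
    for name, (required, _) in PROPERTIES.items():
        if required and resolved[name] in ("", None):
            raise ValueError(f"{name} is required")
    if resolved["messageFormat"] not in MESSAGE_FORMATS:
        raise ValueError(f"unsupported messageFormat: {resolved['messageFormat']}")
    return resolved


if __name__ == "__main__":
    cfg = resolve_config({"awsKinesisStreamName": "my-stream", "awsRegion": "us-east-1"})
    print(cfg["messageFormat"])  # ONLY_RAW_PAYLOAD
```

In this sketch, awsKinesisStreamName is the only property a user must always supply, because messageFormat is marked required but already has a non-empty default.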

Built-in plugins

The following are built-in AwsCredentialProviderPlugin plugins:

• org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

  This plugin takes no configuration; it uses the default AWS credential provider chain.

  For more information, see the AWS documentation.

• org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

  This plugin takes a configuration (via awsCredentialPluginParam) that describes a role to assume when the connector accesses AWS.

  This configuration takes the form of a small JSON document, for example:

    {"roleArn": "arn...", "roleSessionName": "name"}
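The value of awsCredentialPluginParam depends on which plugin is selected. The following is a rough sketch of the expected JSON shapes. It assumes that the default provider (empty awsCredentialPluginName) reads the accessKey and secretKey keys used in the example configuration. The function `parse_credential_param` and its key checks are hypothetical; the real plugins parse this parameter in Java.

```python
import json

# Class names of the built-in plugins, as listed above.
DEFAULT_CHAIN_PLUGIN = "org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin"
STS_PLUGIN = "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin"

# Keys each option is assumed to need (illustrative only). An empty plugin name
# selects the default provider, which accepts a JSON map of credentials.
REQUIRED_KEYS = {
    "": ("accessKey", "secretKey"),
    STS_PLUGIN: ("roleArn", "roleSessionName"),
}


def parse_credential_param(plugin_name: str, plugin_param: str) -> dict:
    """Decode awsCredentialPluginParam and check the keys the chosen plugin expects."""
    if plugin_name == DEFAULT_CHAIN_PLUGIN:
        return {}  # this plugin takes no configuration
    params = json.loads(plugin_param) if plugin_param else {}
    if not isinstance(params, dict):
        raise ValueError("awsCredentialPluginParam must be a JSON object")
    # Custom plugins define their own format, so no keys are checked for them.
    missing = [key for key in REQUIRED_KEYS.get(plugin_name, ()) if key not in params]
    if missing:
        raise ValueError(f"missing keys for {plugin_name or 'default provider'}: {missing}")
    return params
```

Note that awsCredentialPluginParam is a plain string property. The JSON document therefore has to be decoded before its keys can be inspected.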

Example

Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.

• JSON

    {
        "awsEndpoint": "some.endpoint.aws",
        "awsRegion": "us-east-1",
        "awsKinesisStreamName": "my-stream",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "messageFormat": "ONLY_RAW_PAYLOAD",
        "retainOrdering": "true"
    }

• YAML

    configs:
        awsEndpoint: "some.endpoint.aws"
        awsRegion: "us-east-1"
        awsKinesisStreamName: "my-stream"
        awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
        messageFormat: "ONLY_RAW_PAYLOAD"
        retainOrdering: "true"
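In the example configuration, awsCredentialPluginParam is a JSON document embedded as a string inside another JSON document, which is why its inner quotes are escaped. Hand-editing that escaping is error-prone, so the following sketch generates the JSON example programmatically. The credential values are the placeholders from the example, and `render_config` is a hypothetical helper name.

```python
import json

# Placeholder credentials copied from the example above; never commit real keys.
credentials = {"accessKey": "myKey", "secretKey": "my-Secret"}

sink_config = {
    "awsEndpoint": "some.endpoint.aws",
    "awsRegion": "us-east-1",
    "awsKinesisStreamName": "my-stream",
    # awsCredentialPluginParam is a String property, so the credentials map is
    # first serialized to compact JSON text and then embedded as a string value.
    "awsCredentialPluginParam": json.dumps(credentials, separators=(",", ":")),
    "messageFormat": "ONLY_RAW_PAYLOAD",
    "retainOrdering": "true",
}


def render_config() -> str:
    """Serialize the sink configuration as pretty-printed JSON."""
    return json.dumps(sink_config, indent=2)


if __name__ == "__main__":
    print(render_config())
```

You can write the result to a file, for example with `pathlib.Path("kinesis-sink.json").write_text(render_config())`; the file name is arbitrary. The generated awsCredentialPluginParam line matches the escaped string shown in the JSON example.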