Version: 2.7.3

Pulsar adaptor for Apache Storm

Pulsar Storm is an adaptor for integrating with Apache Storm topologies. It provides core Storm implementations for sending and receiving data.

An application can inject data into a Storm topology via a generic Pulsar spout, as well as consume data from a Storm topology via a generic Pulsar bolt.

Using the Pulsar Storm Adaptor

Add the Pulsar Storm adaptor as a dependency in your Maven pom.xml:


<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-storm</artifactId>
  <version>${pulsar.version}</version>
</dependency>

Pulsar Spout

The Pulsar Spout allows for the data published on a topic to be consumed by a Storm topology. It emits a Storm tuple based on the message received and the MessageToValuesMapper provided by the client.

Tuples that the downstream bolts fail to process are re-injected by the spout with exponential backoff. Retries continue until a configurable timeout elapses (the default is 60 seconds) or a configurable number of retries is reached, whichever comes first; after that, the consumer acknowledges the message. Here's an example construction of a spout:


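import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.storm.MessageToValuesMapper;
import org.apache.pulsar.storm.PulsarSpout;
import org.apache.pulsar.storm.PulsarSpoutConfiguration;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Map each Pulsar message to a single-field Storm tuple
MessageToValuesMapper messageToValuesMapper = new MessageToValuesMapper() {

    @Override
    public Values toValues(Message msg) {
        // Decode the payload explicitly as UTF-8 rather than the platform default charset
        return new Values(new String(msg.getData(), StandardCharsets.UTF_8));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
        declarer.declare(new Fields("string"));
    }
};

// Configure a Pulsar Spout
PulsarSpoutConfiguration spoutConf = new PulsarSpoutConfiguration();
spoutConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
spoutConf.setTopic("persistent://my-property/usw/my-ns/my-topic1");
spoutConf.setSubscriptionName("my-subscriber-name1");
spoutConf.setMessageToValuesMapper(messageToValuesMapper);

// Create a Pulsar Spout
PulsarSpout spout = new PulsarSpout(spoutConf);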
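
The retry behaviour described above (exponential backoff, bounded by a timeout and a retry cap) can be pictured with a small standalone sketch. This is not the adaptor's implementation: the class name, the initial delay of 100 ms, and the cap of 5 retries are hypothetical values chosen for illustration, and only the 60-second default timeout comes from the text above.

```java
import java.time.Duration;

/** Illustrative sketch only: exponential backoff bounded by a timeout and a retry cap. */
final class RetryBackoffSketch {

    // Hypothetical settings; only the 60-second timeout mirrors the documented default.
    static final Duration INITIAL_DELAY = Duration.ofMillis(100);
    static final Duration RETRY_TIMEOUT = Duration.ofSeconds(60);
    static final int MAX_RETRIES = 5;

    /** Delay before retry number {@code retry} (1-based): the initial delay doubled for each earlier retry. */
    static Duration delayBeforeRetry(int retry) {
        if (retry < 1) {
            throw new IllegalArgumentException("retry must be >= 1");
        }
        int shift = Math.min(retry - 1, 20); // cap the exponent so the delay cannot overflow
        return INITIAL_DELAY.multipliedBy(1L << shift);
    }

    /** True if the tuple may be re-emitted; false means stop retrying and acknowledge the message. */
    static boolean shouldRetry(int retriesSoFar, Duration sinceFirstFailure) {
        return retriesSoFar < MAX_RETRIES && sinceFirstFailure.compareTo(RETRY_TIMEOUT) < 0;
    }

    public static void main(String[] args) {
        Duration elapsed = Duration.ZERO;
        int retries = 0;
        // Simulate a tuple that keeps failing until one of the two limits is hit
        while (shouldRetry(retries, elapsed)) {
            retries++;
            Duration delay = delayBeforeRetry(retries);
            elapsed = elapsed.plus(delay);
            System.out.println("retry " + retries + " after " + delay.toMillis() + " ms");
        }
        System.out.println("giving up after " + retries + " retries; the message is acknowledged");
    }
}
```

With these assumed values the retry cap is reached long before the timeout; with a larger cap, the 60-second timeout would become the limiting condition instead.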


Pulsar Bolt

The Pulsar bolt allows data in a Storm topology to be published on a topic. It publishes messages based on the Storm tuple received and the TupleToMessageMapper provided by the client.

A partitioned topic can also be used, in which case messages are spread across the topic's partitions (each partition is backed by its own internal topic). To control the routing, set a "key" on the message in your TupleToMessageMapper implementation, for example with msgBuilder.key(...); messages with the same key are sent to the same partition. Here's an example bolt:


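import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.storm.PulsarBolt;
import org.apache.pulsar.storm.PulsarBoltConfiguration;
import org.apache.pulsar.storm.TupleToMessageMapper;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

// Map each incoming Storm tuple to a Pulsar message
TupleToMessageMapper tupleToMessageMapper = new TupleToMessageMapper() {

    @Override
    public TypedMessageBuilder<byte[]> toMessage(TypedMessageBuilder<byte[]> msgBuilder, Tuple tuple) {
        String receivedMessage = tuple.getString(0);
        // message processing
        String processedMsg = receivedMessage + "-processed";
        // Encode explicitly as UTF-8 rather than the platform default charset
        return msgBuilder.value(processedMsg.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare the output fields
    }
};

// Configure a Pulsar Bolt
PulsarBoltConfiguration boltConf = new PulsarBoltConfiguration();
boltConf.setServiceUrl("pulsar://broker.messaging.usw.example.com:6650");
boltConf.setTopic("persistent://my-property/usw/my-ns/my-topic2");
boltConf.setTupleToMessageMapper(tupleToMessageMapper);

// Create a Pulsar Bolt
PulsarBolt bolt = new PulsarBolt(boltConf);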
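
To see why a message key keeps related messages together on a partitioned topic, the following standalone sketch hashes a key and takes the result modulo the partition count. It is an illustration of the general idea, not Pulsar's router: the class name, the method name, and the use of String.hashCode() as the hash function are assumptions made for the example, and the actual hash used by a producer depends on its configuration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only: hash-based routing of keyed messages to partitions. */
final class KeyRoutingSketch {

    /** Picks a partition index in [0, numPartitions) from the key's hash (assumed hash: String.hashCode). */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // hypothetical partition count
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (String key : new String[] {"user-1", "user-2", "user-1", "user-3", "user-2"}) {
            byPartition
                .computeIfAbsent(partitionFor(key, numPartitions), p -> new ArrayList<>())
                .add(key);
        }
        // Every occurrence of a given key is listed under the same partition
        byPartition.forEach((p, keys) -> System.out.println("partition " + p + " <- " + keys));
    }
}
```

Because the partition depends only on the key and the partition count, every message carrying the same key lands on the same partition, which is what preserves per-key ordering for downstream consumers.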