Manage Functions
Pulsar Functions are lightweight compute processes that
- consume messages from one or more Pulsar topics
- apply a user-supplied processing logic to each message
- publish the results of the computation to another topic
Functions can be managed via the following methods.
Method | Description |
---|---|
Admin CLI | The functions command of the pulsar-admin tool. |
REST API | The /admin/v3/functions endpoint of the admin REST API. |
Java Admin API | The functions method of the PulsarAdmin object in the Java API. |
#
Function resourcesYou can perform the following operations on functions.
#
Create a functionYou can create a Pulsar function in cluster mode (deploy it on a Pulsar cluster) using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the create
subcommand.
Example
$ pulsar-admin functions create \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --inputs test-input-topic \ --output persistent://public/default/test-output-topic \ --classname org.apache.pulsar.functions.api.examples.ExclamationFunction \ --jar /examples/api-examples.jar
FunctionConfig functionConfig = new FunctionConfig();functionConfig.setTenant(tenant);functionConfig.setNamespace(namespace);functionConfig.setName(functionName);functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);functionConfig.setParallelism(1);functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);functionConfig.setTopicsPattern(sourceTopicPattern);functionConfig.setSubName(subscriptionName);functionConfig.setAutoAck(true);functionConfig.setOutput(sinkTopic);admin.functions().createFunction(functionConfig, fileName);
#
Update a functionYou can update a Pulsar function that has been deployed to a Pulsar cluster using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST Admin API
- Java Admin API
Use the update
subcommand.
Example
$ pulsar-admin functions update \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --output persistent://public/default/update-output-topic \ # other options
FunctionConfig functionConfig = new FunctionConfig();functionConfig.setTenant(tenant);functionConfig.setNamespace(namespace);functionConfig.setName(functionName);functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);functionConfig.setParallelism(1);functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");UpdateOptions updateOptions = new UpdateOptions();updateOptions.setUpdateAuthData(updateAuthData);admin.functions().updateFunction(functionConfig, userCodeFile, updateOptions);
#
Start an instance of a functionYou can start a stopped function instance with instance-id
using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the start
subcommand.
$ pulsar-admin functions start \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --instance-id 1
admin.functions().startFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
#
Start all instances of a functionYou can start all stopped function instances using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java
Use the start
subcommand.
Example
$ pulsar-admin functions start \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \
admin.functions().startFunction(tenant, namespace, functionName);
#
Stop an instance of a functionYou can stop a function instance with instance-id
using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the stop
subcommand.
Example
$ pulsar-admin functions stop \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --instance-id 1
admin.functions().stopFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
#
Stop all instances of a functionYou can stop all function instances using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the stop
subcommand.
Example
$ pulsar-admin functions stop \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \
admin.functions().stopFunction(tenant, namespace, functionName);
#
Restart an instance of a functionRestart a function instance with instance-id
using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the restart
subcommand.
Example
$ pulsar-admin functions restart \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --instance-id 1
admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
#
Restart all instances of a functionYou can restart all function instances using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin API
Use the restart
subcommand.
Example
$ pulsar-admin functions restart \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \
admin.functions().restartFunction(tenant, namespace, functionName);
#
List all functionsYou can list all Pulsar functions running under a specific tenant and namespace using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the list
subcommand.
Example
$ pulsar-admin functions list \ --tenant public \ --namespace default
admin.functions().getFunctions(tenant, namespace);
#
Delete a functionYou can delete a Pulsar function that is running on a Pulsar cluster using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the delete
subcommand.
Example
$ pulsar-admin functions delete \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions)
admin.functions().deleteFunction(tenant, namespace, functionName);
#
Get info about a functionYou can get information about a Pulsar function currently running in cluster mode using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the get
subcommand.
Example
$ pulsar-admin functions get \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions)
admin.functions().getFunction(tenant, namespace, functionName);
#
Get status of an instance of a functionYou can get the current status of a Pulsar function instance with instance-id
using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the status
subcommand.
Example
$ pulsar-admin functions status \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --instance-id 1
admin.functions().getFunctionStatus(tenant, namespace, functionName, Integer.parseInt(instanceId));
#
Get status of all instances of a functionYou can get the current status of a Pulsar function instance using Admin CLI, REST API or Java Admin API.
- Admin CLI
- REST API
- Java Admin API
Use the status
subcommand.
Example
$ pulsar-admin functions status \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions)
admin.functions().getFunctionStatus(tenant, namespace, functionName);
#
Get stats of an instance of a functionYou can get the current stats of a Pulsar Function instance with instance-id
using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin API
Use the stats
subcommand.
Example
$ pulsar-admin functions stats \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --instance-id 1
admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId));
#
Get stats of all instances of a functionYou can get the current stats of a Pulsar function using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin API
Use the stats
subcommand.
Example
$ pulsar-admin functions stats \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions)
admin.functions().getFunctionStats(tenant, namespace, functionName);
#
Trigger a functionYou can trigger a specified Pulsar function with a supplied value using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin API
Use the trigger
subcommand.
Example
$ pulsar-admin functions trigger \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --topic (the name of input topic) \ --trigger-value \"hello pulsar\" # or --trigger-file (the path of trigger file)
admin.functions().triggerFunction(tenant, namespace, functionName, topic, triggerValue, triggerFile);
#
Put state associated with a functionYou can put the state associated with a Pulsar function using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin API
Use the putstate
subcommand.
Example
$ pulsar-admin functions putstate \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --state "{\"key\":\"pulsar\", \"stringValue\":\"hello pulsar\"}"
POST /admin/v3/functions/:tenant/:namespace/:functionName/state/:key?version=@pulsar:version_number@
TypeReference<FunctionState> typeRef = new TypeReference<FunctionState>() {};FunctionState stateRepr = ObjectMapperFactory.getThreadLocal().readValue(state, typeRef);admin.functions().putFunctionState(tenant, namespace, functionName, stateRepr);
#
Fetch state associated with a functionYou can fetch the current state associated with a Pulsar function using Admin CLI, REST API or Java admin API.
- Admin CLI
- REST API
- Java Admin CLI
Use the querystate
subcommand.
Example
$ pulsar-admin functions querystate \ --tenant public \ --namespace default \ --name (the name of Pulsar Functions) \ --key (the key of state)
admin.functions().getFunctionState(tenant, namespace, functionName, key);