A CLI application that subscribes to Apache Kafka topics and triggers Ylem pipelines in real-time streaming mode, using Kafka messages as pipeline input.
Kafka-trigger listens to Kafka topics and calls the pipeline run endpoint for each message, passing the message body as pipeline input.
In the pipeline, this input is received by the "External trigger" task. When the pipeline runs, this task gets the input data and passes it on to the next task in the pipeline.
Kafka-trigger is configured with environment variables. Besides setting them directly in the environment, the configuration variables can also be specified in a .env or .env.local file.
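For example, both of the following approaches work; this is only a sketch, with variable names taken from the list below and placeholder values:

```sh
# Option 1: export the variables in the shell before starting Kafka-trigger
export YLEM_KT_API_URL=https://api.ylem.co
export YLEM_KT_KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9092"

# Option 2: put the same lines (without "export") into a .env or .env.local
# file next to the application, e.g.:
#   YLEM_KT_API_URL=https://api.ylem.co
#   YLEM_KT_KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9092"
```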
Main variables:
- YLEM_KT_API_URL — Ylem's API URL. If you use the cloud version of Ylem, it is https://api.ylem.co; otherwise, use the URL of your custom Ylem API instance here. If you run the open-source version of Ylem in a Docker container, it is http://host.docker.internal:7339.
- YLEM_KT_API_CLIENT_ID — OAuth client ID.
- YLEM_KT_API_CLIENT_SECRET — OAuth client secret.
- YLEM_KT_KAFKA_VERSION=3.1.0 — version of the Kafka server.
- YLEM_KT_KAFKA_BOOTSTRAP_SERVERS="127.0.0.1:9092" — a comma-separated list of Kafka bootstrap servers.
- YLEM_KT_KAFKA_TOPIC_MAPPING="topic_1:pipeline_uuid_1,topic_1:pipeline_uuid_2,topic_2:pipeline_uuid_2" — topic-to-pipeline mapping, a comma-separated list of <topic name>:<pipeline uuid> pairs. As the example value shows, the same topic can trigger several pipelines, and the same pipeline can be triggered by several topics.
Other supported env variables are listed in the .env.dist file.
More information about how to obtain your OAuth credentials can be found in our documentation.
Let's create a simple pipeline that contains only two tasks, External_trigger and Aggregator, and call it test_kafka_trigger.

The Aggregator will simply forward what it receives as input from Kafka to the output.
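For illustration, assume a message with a hypothetical payload like the one below is produced to the topic. The External_trigger task receives it as the pipeline input, and the Aggregator forwards it unchanged to the output:

```json
{"order_id": 123, "status": "paid", "amount": 49.90}
```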

Create a new .env file, copy the content of .env.dist into it, and then customize the values.
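For example, from the root of the repository:

```sh
# Create your local .env from the template, then edit the values
cp .env.dist .env
```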
Let's assume you run this application in a Docker container, with Kafka running on the host machine on port 9092, and you want to trigger pipelines in the cloud version of Ylem. In Kafka you have created a topic called test_kafka_trigger,
and you want to subscribe to new messages in it and trigger the pipeline with UUID 8feaae75-7234-4f2f-9e4e-0b491e4a4331.
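If the topic does not exist yet, it can be created with Kafka's own CLI tools on the host machine; this is only a sketch, assuming a single-broker setup and the Kafka scripts on your PATH:

```sh
# Create the topic that Kafka-trigger will subscribe to
kafka-topics.sh --create \
  --topic test_kafka_trigger \
  --bootstrap-server 127.0.0.1:9092 \
  --partitions 1 \
  --replication-factor 1
```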
The .env variables will look like this:
YLEM_KT_API_CLIENT_ID=%%REPLACE_IT_WITH_YOUR_CLIENT_ID%%
YLEM_KT_API_CLIENT_SECRET=%%REPLACE_IT_WITH_YOUR_CLIENT_SECRET%%
YLEM_KT_API_URL=https://api.ylem.co
YLEM_KT_KAFKA_BOOTSTRAP_SERVERS="host.docker.internal:9092"
YLEM_KT_KAFKA_TOPIC_MAPPING="test_kafka_trigger:8feaae75-7234-4f2f-9e4e-0b491e4a4331"
Now build and start the application:
docker compose up --build
If everything goes well, the service will subscribe to your newly created topic and print something like the following in the CLI output:

If you now produce a simple new message to the topic:
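For example, with Kafka's console producer on the host machine (a sketch; the JSON payload is the hypothetical one from above, and any message body will work):

```sh
kafka-console-producer.sh \
  --bootstrap-server 127.0.0.1:9092 \
  --topic test_kafka_trigger
>{"order_id": 123, "status": "paid", "amount": 49.90}
```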

You will see in the CLI output that Kafka-trigger successfully consumed it and triggered the pipeline:

And the pipeline logs will also contain a new entry showing that the pipeline was successfully executed and that each of the two tasks returned the correct output:

