Demo video: kafka-stream-obdii-demo.mp4
- ELM327 OBD-II adapter
- AndrOBD with the MqttPublisher plugin
- Confluent Cloud account
- HiveMQ Cloud account
- Docker Compose
- Download and set up AndrOBD
- Download and set up the AndrOBD MqttPublisher plugin (use F-Droid to search for MqttPublisher)
Create a free account on HiveMQ Cloud; you get 10 GB for free without any constraints. Once you have created the account, create an MQTT broker and use its credentials to populate the MqttPublisher configuration.
Generate some data (turn on your vehicle :D) and confirm that data is being streamed to your MQTT broker.
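One quick way to confirm messages are arriving is to subscribe to the AndrOBD topics directly. Here is a minimal sketch using the Python paho-mqtt client; the broker host, credentials and the AndrOBD/# topic filter are placeholders for this setup, and paho-mqtt >= 2.0 is assumed:

# Sketch: subscribe to the HiveMQ Cloud broker and print incoming AndrOBD messages.
# Host, credentials and topic filter are placeholders; assumes paho-mqtt >= 2.0.
import paho.mqtt.client as mqtt

BROKER_HOST = "<HIVEMQ_CLUSTER_URL>"   # e.g. xxxxxxxx.s1.eu.hivemq.cloud
BROKER_PORT = 8883                     # HiveMQ Cloud uses TLS on port 8883
USERNAME = "<MQTT_USER>"
PASSWORD = "<MQTT_PASSWORD>"

def on_connect(client, userdata, flags, reason_code, properties):
    client.subscribe("AndrOBD/#")      # matches the topic prefix used by MqttPublisher here

def on_message(client, userdata, msg):
    print(f"{msg.topic}: {msg.payload.decode()}")

client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.tls_set()                       # use the system CA certificates for TLS
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER_HOST, BROKER_PORT)
client.loop_forever()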
Once you confirm data is being streamed to your MQTT broker, proceed with creating an account on Confluent Cloud. You get $400 of free credits without any constraints.
Add and configure your source connector for the HiveMQ MQTT broker.
In order to create a topic per metric, I used the ExtractTopic and TopicRegexRouter SMTs, as sketched below.
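A minimal sketch of what that transform chain could look like, written with the self-managed SMT class names (names may differ slightly for the fully-managed Confluent Cloud connector); the transform aliases, regex and replacement here are illustrative rather than the exact config used:

"transforms": "extractTopicFromKey,renameTopic",
"transforms.extractTopicFromKey.type": "io.confluent.connect.transforms.ExtractTopic$Key",
"transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTopic.regex": "AndrOBD/(.*)",
"transforms.renameTopic.replacement": "AndrOBD_$1"

ExtractTopic$Key sets the topic from the record key (e.g. AndrOBD/engine_speed), and the router then rewrites it to a valid topic name such as AndrOBD_engine_speed.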
Data coming from the HiveMQ MQTT broker follows this message schema:
key: "AndrOBD/engine_load_calculated",
header: [
{
"key": "mqtt.message.id",
"value": "0"
},
{
"key": "mqtt.qos",
"value": "0"
},
{
"key": "mqtt.retained",
"value": "false"
},
{
"key": "mqtt.duplicate",
"value": "false"
}
]
value: 34.117645
The JDBC Sink connector needs a schema in order to work, so we need to do some transformations on the data, and while we're at it we may as well add an ingest timestamp. I used ksqlDB streams to enrich the data, convert it to an Avro schema, and add the timestamp to our sink topic.
ksql for creating the source stream:
CREATE STREAM `AndrOBD_engine_speed` (`value` DOUBLE, `key` STRING KEY)
WITH (CLEANUP_POLICY='delete',
KAFKA_TOPIC='AndrOBD_engine_speed',
KEY_FORMAT='KAFKA',
VALUE_FORMAT='JSON',
WRAP_SINGLE_VALUE=false);
With the above we get a source stream with the following message:
{
"key": "AndrOBD/engine_speed",
"value": 1494
}
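To spot-check the source stream in the ksqlDB editor, a simple push query over the stream created above works:

SELECT `key`, `value`
FROM `AndrOBD_engine_speed`
EMIT CHANGES
LIMIT 5;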
ksql for the transformation and for creating the sink topic:
CREATE STREAM `AndrOBD_enriched_engine_speed` WITH (VALUE_FORMAT='AVRO') AS
SELECT `AndrOBD_engine_speed`.`key` `key`,
`AndrOBD_engine_speed`.`value` `value`,
FORMAT_TIMESTAMP(FROM_UNIXTIME(`AndrOBD_engine_speed`.ROWTIME), 'yyyy-MM-dd HH:mm:ss.SSS') `kafka_ingest_timestamp`
FROM `AndrOBD_engine_speed` `AndrOBD_engine_speed`
PARTITION BY `AndrOBD_engine_speed`.`key` EMIT CHANGES;
With the above ksql we get a sink stream/topic with the following message and schema:
{
"key": "AndrOBD/engine_speed",
"value": 1494,
"kafka_ingest_timestamp": "2023-10-27 19:00:53.375"
}
I used the Confluent Cloud all-in-one Docker Compose file to set up a Connect instance locally, and configured the JDBC Sink connector:
{
"name": "PostgresJdbcSinkConnectorConnector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"errors.tolerance": "none",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics.regex": "^pksqlc-wdzzwAndrOBD_enriched_.*",
"connection.url": "jdbc:postgresql://postgres/postgres",
"connection.user": "<POSTGRES_USER>",
"connection.password": "<POSTGRES_PASSWORD>",
"dialect.name": "PostgreSqlDatabaseDialect",
"pk.mode": "kafka",
"auto.create": "true",
"value.converter.schema.registry.basic.auth.user.info": "<SCHEMA_API_KEY>:<SCHEMA_SECRET_KEY>",
"value.converter.basic.auth.credentials.source": "USER_INFO",
"value.converter.schema.registry.url": "<SCHEMA_REGISTRY_URL>"
}
}
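To register the connector with the local Connect worker, the config above can be POSTed to Connect's REST API. A minimal sketch in Python; it assumes the JSON above is saved as jdbc-sink.json and that the cp-all-in-one Connect worker listens on localhost:8083:

# Sketch: register the JDBC Sink connector via the Kafka Connect REST API.
# Assumes the config above is saved as jdbc-sink.json and Connect runs on localhost:8083.
import json
import requests

with open("jdbc-sink.json") as f:
    connector = json.load(f)

resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    json=connector,
)
resp.raise_for_status()
print(resp.json())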
The API key for Confluent Cloud Schema Registry is distinct from the API key you created for Kafka clusters in the same environment, per the Quick Start for Confluent Cloud. Follow that guide to obtain the correct API keys for Schema Registry.
Follow the guide for configuring and connecting to Postgres. For creating your first dataset, chart and dashboard you can follow this guide.
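Before building charts it is worth confirming that rows are landing in Postgres. A sanity-check query could look like this (with auto.create the table is named after the sink topic, so the pksqlc-wdzzw prefix below is specific to this ksqlDB cluster):

SELECT *
FROM "pksqlc-wdzzwAndrOBD_enriched_engine_speed"
ORDER BY "kafka_ingest_timestamp" DESC
LIMIT 10;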
For a simple dashboard I created 4 charts:
- engine speed vs vehicle speed (mixed timeseries chart)
- mass airflow vs vehicle speed (mixed timeseries chart)
- latest data (radar chart)
- current engine speed (big number with trendline)
Use Druid or Spark (TBD):
- Anomaly detection
- Detecting bad data
- Trends