Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WILDCARDs and TLS configuration added #61

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ The MQTT data source has the following requirements:
| Field | Description |
| ----- | -------------------------------------------------- |
| Name | A name for this particular MQTT data source |
| URI | The scehme, host, and port of the MQTT Broker. Supported schemes: TCP (tcp://), TLS (tls://), and WebSocket (ws://) |
| URI | The scheme, host, and port of the MQTT Broker. Supported schemes: TCP (tcp://), TLS (tls://), and WebSocket (ws://) |

#### Authentication fields

Expand All @@ -29,6 +29,21 @@ The MQTT data source has the following requirements:
| Username | (Optional) The username to use when connecting to the MQTT broker |
| Password | (Optional) The password to use when connecting to the MQTT broker |

#### TLS fields

TLS are used by IOT brokers such as AWS IOT Core. In order to connect, use the tls connection scheme.

| Field | Description |
| -------- | ----------------------------------------------------------------- |
| Root certificate | (Optional) The path to the root certificate on the server|
| Private Key Path | (Optional) The path to the private key file on the server|
| Certificate Path | (Optional) The path to the certificate file on the server|

## Using wildcards

It is possible to subscribe to topic such as TOPIC/# in order to subscribe to a subtree of messages by using the keyword \__WILDCARD__ in place of the # char.
The subscription becomes: DATA/\__WILDCARD__ (Double underscore before and after `WILDCARD`)

## Query the data source

The query editor allows you to specify which MQTT topics the panel will subscribe to. Refer to the [MQTT v3.1.1 specification](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718106)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"devDependencies": {
"@grafana/data": "9.1.0",
"@grafana/runtime": "9.1.0",
"@grafana/toolkit": "9.1.0",
"@grafana/toolkit": "^9.3.2",
"@grafana/ui": "9.1.0",
"@types/lodash": "latest",
"aedes": "^0.45.0"
Expand Down
63 changes: 55 additions & 8 deletions pkg/mqtt/client.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package mqtt

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"path"
"strings"
Expand All @@ -20,22 +24,56 @@ type Client interface {
}

type Options struct {
URI string `json:"uri"`
Username string `json:"username"`
Password string `json:"password"`
CertificatePath string `json:"certificatePath"`
RootCertPath string `json:"rootCertPath"`
PrivateKeyPath string `json:"privateKeyPath"`
URI string `json:"uri"`
Username string `json:"username"`
Password string `json:"password"`
}

type client struct {
client paho.Client
topics TopicMap
}

func NewTLSConfig(o Options) (config *tls.Config, err error) {
certpool := x509.NewCertPool()
pemCerts, err := ioutil.ReadFile(o.RootCertPath)
if err != nil {
return
}
certpool.AppendCertsFromPEM(pemCerts)

cert, err := tls.LoadX509KeyPair(o.CertificatePath, o.PrivateKeyPath)
if err != nil {
return
}

config = &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
Certificates: []tls.Certificate{cert},
}
return
}

func NewClient(o Options) (Client, error) {

opts := paho.NewClientOptions()

opts.AddBroker(o.URI)
opts.SetClientID(fmt.Sprintf("grafana_%d", rand.Int()))

if o.CertificatePath != "" {
tlsconfig, err := NewTLSConfig(o)
if err != nil {
log.DefaultLogger.Error("Failed to create TLS configuration: %v", err)
}
opts.SetTLSConfig(tlsconfig)
}

if o.Username != "" {
opts.SetUsername(o.Username)
}
Expand Down Expand Up @@ -72,9 +110,18 @@ func (c *client) IsConnected() bool {
}

func (c *client) HandleMessage(_ paho.Client, msg paho.Message) {
var payload = msg.Payload()
var data map[string]interface{}
err := json.Unmarshal([]byte(msg.Payload()), &data)
data["Topic"] = msg.Topic()
if err == nil {
log.DefaultLogger.Info("%s", data)
payload, _ = json.Marshal(data)
}

Comment on lines +113 to +121
Copy link
Contributor

@NiklasCi NiklasCi Jan 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var payload = msg.Payload()
var data map[string]interface{}
err := json.Unmarshal([]byte(msg.Payload()), &data)
data["Topic"] = msg.Topic()
if err == nil {
log.DefaultLogger.Info("%s", data)
payload, _ = json.Marshal(data)
}
var data map[string]interface{}
err := json.Unmarshal([]byte(msg.Payload()), &data)
if err != nil {
log.DefaultLogger.Error("Failed to unmarshal payload of", msg)
return
}
data["Topic"] = msg.Topic()
payload, err := json.Marshal(data)
if err != nil {
log.DefaultLogger.Error("Failed to marshal data to new payload", data)
return
}

message := Message{
Timestamp: time.Now(),
Value: msg.Payload(),
Value: payload,
}
c.topics.AddMessage(msg.Topic(), message)
}
Expand Down Expand Up @@ -103,10 +150,10 @@ func (c *client) Subscribe(reqPath string) *Topic {
if t, ok := c.topics.Load(topicPath); ok {
return t
}

log.DefaultLogger.Debug("Subscribing to MQTT topic", "topic", t.Path)
if token := c.client.Subscribe(t.Path, 0, c.HandleMessage); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", t.Path, "error", token.Error())
var topic = strings.Replace(t.Path, "__WILDCARD__", "#", -1)
log.DefaultLogger.Info("Subscribing to MQTT topic", "topic", topic)
if token := c.client.Subscribe(topic, 0, c.HandleMessage); token.Wait() && token.Error() != nil {
log.DefaultLogger.Error("Error subscribing to MQTT topic", "topic", topic, "error", token.Error())
}
c.topics.Store(t)
return t
Expand Down
16 changes: 15 additions & 1 deletion pkg/mqtt/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mqtt

import (
"path"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -51,7 +53,7 @@ func (tm *TopicMap) AddMessage(path string, message Message) {
if !ok {
return false
}
if topic.Path == path {
if TopicMatches(topic.Path, path) {
topic.Messages = append(topic.Messages, message)
tm.Store(topic)
}
Expand All @@ -66,3 +68,15 @@ func (tm *TopicMap) Store(t *Topic) {
func (tm *TopicMap) Delete(path string) {
tm.Map.Delete(path)
}

func TopicMatches(topic string, intopic string) bool {
var regex = strings.Replace(topic, "/__WILDCARD__", "/(.*)", -1)
m1 := regexp.MustCompile(regex)
var grp = m1.ReplaceAllString(intopic, "$1")
if grp == "" {
return intopic == topic
}
var intopic2 = strings.Replace(intopic, grp, "__WILDCARD__", -1)

return strings.Compare(topic, intopic2) == 0
}
58 changes: 56 additions & 2 deletions src/ConfigEditor.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const ConfigEditor = (props: Props) => {
options,
options: { jsonData, secureJsonData, secureJsonFields },
} = props;
const { uri, username } = jsonData;
const { uri, username,rootCertPath,privateKeyPath,certificatePath } = jsonData;
const handleChange = handlerFactory(options, onOptionsChange);

const onPasswordChange = (event: ChangeEvent<HTMLInputElement>) => {
Expand All @@ -23,6 +23,7 @@ export const ConfigEditor = (props: Props) => {
},
});
};


const onResetPassword = () => {
onOptionsChange({
Expand Down Expand Up @@ -59,6 +60,59 @@ export const ConfigEditor = (props: Props) => {
</InlineField>
</InlineFieldRow>
</FieldSet>
<FieldSet label="TLS">
<InlineFieldRow>
<InlineField
label="Root certificate"
labelWidth={30}
tooltip="Root certificate"
>
<Input
width={60}
name="uri"
required
value={rootCertPath}
autoComplete="off"
placeholder=""
onChange={handleChange('jsonData.rootCertPath')}
/>
</InlineField>
</InlineFieldRow>
<InlineFieldRow>
<InlineField
label="Private Key Path"
labelWidth={30}
tooltip="Private Key Path"
>
<Input
width={60}
name="uri"
required
value={privateKeyPath}
autoComplete="off"
placeholder=""
onChange={handleChange('jsonData.privateKeyPath')}
/>
</InlineField>
</InlineFieldRow>
<InlineFieldRow>
<InlineField
label="Certificate"
labelWidth={30}
tooltip="Certificate"
>
<Input
width={60}
name="uri"
required
value={certificatePath}
autoComplete="off"
placeholder=""
onChange={handleChange('jsonData.certificatePath')}
/>
</InlineField>
</InlineFieldRow>
</FieldSet>

<FieldSet label="Authentication">
<InlineFieldRow>
Expand All @@ -83,7 +137,7 @@ export const ConfigEditor = (props: Props) => {
onReset={onResetPassword}
/>
</InlineField>
</InlineFieldRow>
</InlineFieldRow>
</FieldSet>
</>
);
Expand Down
4 changes: 4 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,12 @@ export interface MqttQuery extends DataQuery {
export interface MqttDataSourceOptions extends DataSourceJsonData {
uri: string;
username?: string;
rootCertPath?: string;
privateKeyPath?: string;
certificatePath?: string;
}

export interface MqttSecureJsonData {
password?: string;
}

Loading