Skip to content

At Least Once Export

Pascal S. de Kloe edited this page Mar 17, 2021 · 1 revision

Code Example

// Package export demonstrates remote delivery with an at-least-once guarantee.
package main

import (
	"bytes"
	"errors"
	"log"
	"time"

	"github.com/pascaldekloe/metrics"
	"github.com/pascaldekloe/mqtt"
)

// PublishAtLeastOnce has the signature of the mqtt.Client method with the same
// name. Export submits its batches through this function value when it is set.
// A nil err means that the message was accepted for submission. Export then
// receives from ack until the channel is closed, and it logs any error that
// arrives in the meantime.
var PublishAtLeastOnce func(message []byte, topic string) (ack <-chan error, err error)

// Package metrics keeps track of the publications.
var (
	// Note that the following histograms also count the number of messages
	// and batches, plus their average size and size totals.

	// MessageBytes gets the size of each message in a batch that Export
	// submitted.
	MessageBytes = metrics.MustHistogram("export_message_bytes", "Amount of payload submitted.", 32, 512, 1024, 32768)
	// BatchBytes gets the combined message size of each batch that Export
	// submitted.
	BatchBytes   = metrics.MustHistogram("export_batch_bytes", "Amount of payload submitted.", 32, 512, 1024, 32768)
	// MessageDrops is incremented by Export for each message it skips
	// because the message alone exceeds SizeMax.
	MessageDrops = metrics.MustCounter("export_message_drops", "Amount of messages omitted due fatal error.")
)

// Submission settings
var (
	// Topic names the destination of every publication.
	Topic = "demo/export"

	// BatchMax is the upper boundary for the number of messages which are
	// combined into one submission.
	BatchMax = 60

	// SizeMax is the preferred upper boundary for the number of bytes in
	// one submission.
	SizeMax = 1 << 16

	// NoDataBackoff is the pause before a Source is read again after it
	// had no messages to offer.
	NoDataBackoff = time.Second * 10

	// PublishBackoff is the pause before a publication is tried again
	// after an error.
	PublishBackoff = 1 * time.Second
)

// Source is an input queue.
type Source interface {
	// ReadMessages reads the last messages from the queue in descending
	// order into the slice given. The return contains the actual number of
	// messages read, which can be zero when subject to error or queue
	// exhaustion. Reading does not advance the queue position; only Commit
	// does. Export waits for NoDataBackoff before it reads again after a
	// zero return.
	ReadMessages(messages [][]byte) (n int)

	// Commit deletes the last n messages, i.e., the queue position advances
	// with n messages. The implementation should apply an extensive retry
	// arsenal with high expiry timeouts. Failure may lead to duplicate
	// submission.
	Commit(n int)
}

// Export sends messages from src to Topic until the MQTT client is closed.
//
// Messages are read in batches of at most BatchMax. The leading messages which
// fit within SizeMax bytes together are joined into a single publication. The
// batch is committed on src only after its acknowledgement channel got closed,
// such that an interruption in between causes redelivery rather than loss. A
// single message in excess of SizeMax can not be sent at all. Such a message is
// logged, committed and counted in MessageDrops.
//
// Publications go through PublishAtLeastOnce when that variable is set, and
// through the method of dest with the same name otherwise.
//
// The return is nil once the client is closed. A publication denial (see
// mqtt.IsDeny) is returned as is, because it indicates that Topic or SizeMax is
// off. An error is also returned when both PublishAtLeastOnce and dest are nil.
func Export(dest *mqtt.Client, src Source) error {
	// The original ignored dest and called the package variable blindly,
	// which panics when the variable was never assigned.
	publish := PublishAtLeastOnce
	if publish == nil {
		if dest == nil {
			return errors.New("export: neither PublishAtLeastOnce nor an mqtt client available")
		}
		publish = dest.PublishAtLeastOnce
	}

	messages := make([][]byte, BatchMax)
	for {
		n := src.ReadMessages(messages)
		if n == 0 {
			time.Sleep(NoDataBackoff)
			continue
		}

		// Limit the batch to the leading messages which fit in SizeMax.
		var size int
		for i, m := range messages[:n] {
			if size+len(m) > SizeMax {
				n = i
				break
			}
			size += len(m)
		}

		if n == 0 {
			// The first message alone is too large. Skip it to
			// prevent an eternal retry on the same message.
			log.Printf("message with excessive size of %d bytes ommited", len(messages[0]))
			src.Commit(1)
			MessageDrops.Add(1)
			continue
		}

		batch := bytes.Join(messages[:n], nil)
		exchange, err := publish(batch, Topic)
		switch {
		case err == nil:
			break // await acknowledgement below

		case mqtt.IsDeny(err):
			return err // Topic or SizeMax is off

		case errors.Is(err, mqtt.ErrClosed):
			return nil // done

		default:
			// Nothing was committed, so the next read offers the
			// same messages again.
			log.Printf("export retry in %s on: %s", PublishBackoff, err)
			time.Sleep(PublishBackoff)
			continue
		}

		BatchBytes.Add(float64(size))
		for _, m := range messages[:n] {
			MessageBytes.Add(float64(len(m)))
		}

		// The exchange channel is closed on acknowledgement. Errors in
		// the meantime are informational, except for a client close.
		for err := range exchange {
			log.Print(err)
			if errors.Is(err, mqtt.ErrClosed) {
				return nil // suspended; batch not committed
			}
		}

		src.Commit(n)
	}
}
Clone this wiki locally