feat: tolerate failed checkins / ready
redref committed Aug 23, 2024
1 parent 6b3ece3 commit 155f8b6
Showing 8 changed files with 33 additions and 20 deletions.
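
In short: this commit adds a --check-in flag to kg (default true) and threads it through mesh.New into the Mesh struct. When check-ins are disabled, Run stops the check-in timer and the mesh passes its checkin setting to Node.Ready as checkLastSeen, so nodes whose check-ins have failed are still treated as ready; the kgctl commands always pass false, because they have no live view of check-in freshness. A hypothetical invocation opting out (kg takes many more flags, elided here):

kg --check-in=false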
2 changes: 1 addition & 1 deletion cmd/kg/handlers.go
@@ -51,7 +51,7 @@ func (h *graphHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

nodes := make(map[string]*mesh.Node)
for _, n := range ns {
- if n.Ready() {
+ if n.Ready(false) {
nodes[n.Name] = n
}
}
4 changes: 3 additions & 1 deletion cmd/kg/main.go
@@ -102,6 +102,7 @@ var cmd = &cobra.Command{

var (
backend string
+ checkIn bool
cleanUp bool
cleanUpIface bool
createIface bool
@@ -134,6 +135,7 @@ var (

func init() {
cmd.Flags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
+ cmd.Flags().BoolVar(&checkIn, "check-in", true, "Should Kilo regularly check in with the backend?")
cmd.Flags().BoolVar(&cleanUp, "clean-up", true, "Should kilo clean up network modifications on shutdown?")
cmd.Flags().BoolVar(&cleanUpIface, "clean-up-interface", false, "Should Kilo delete its interface when it shuts down?")
cmd.Flags().BoolVar(&createIface, "create-interface", true, "Should kilo create an interface on startup?")
@@ -266,7 +268,7 @@ func runRoot(_ *cobra.Command, _ []string) error {
serviceCIDRs = append(serviceCIDRs, s)
}

- m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry)
+ m, err := mesh.New(b, enc, gr, hostname, port, s, local, cni, cniPath, iface, checkIn, cleanUp, cleanUpIface, createIface, mtu, resyncPeriod, prioritisePrivateAddr, iptablesForwardRule, serviceCIDRs, log.With(logger, "component", "kilo"), registry)
if err != nil {
return fmt.Errorf("failed to create Kilo mesh: %v", err)
}
2 changes: 1 addition & 1 deletion cmd/kgctl/connect_linux.go
@@ -310,7 +310,7 @@ func sync(table *route.Table, peerName string, privateKey wgtypes.Key, iface int
nodes := make(map[string]*mesh.Node)
var nodeNames []string
for _, n := range ns {
- if n.Ready() {
+ if n.Ready(false) {
nodes[n.Name] = n
hostname = n.Name
nodeNames = append(nodeNames, n.Name)
2 changes: 1 addition & 1 deletion cmd/kgctl/graph.go
@@ -49,7 +49,7 @@ func runGraph(_ *cobra.Command, _ []string) error {
subnet := mesh.DefaultKiloSubnet
nodes := make(map[string]*mesh.Node)
for _, n := range ns {
- if n.Ready() {
+ if n.Ready(false) {
nodes[n.Name] = n
hostname = n.Name
}
8 changes: 4 additions & 4 deletions cmd/kgctl/showconf.go
@@ -130,7 +130,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error {
subnet := mesh.DefaultKiloSubnet
nodes := make(map[string]*mesh.Node)
for _, n := range ns {
- if n.Ready() {
+ if n.Ready(false) {
nodes[n.Name] = n
}
if n.WireGuardIP != nil {
@@ -147,7 +147,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error {

peers := make(map[string]*mesh.Peer)
for _, p := range ps {
- if p.Ready() {
+ if p.Ready(false) {

Check failure on line 150 in cmd/kgctl/showconf.go (GitHub Actions: container, build, darwin, e2e, linux, unit, windows): too many arguments in call to p.Ready
peers[p.Name] = p
}
}
@@ -227,7 +227,7 @@ func runShowConfPeer(_ *cobra.Command, args []string) error {
subnet := mesh.DefaultKiloSubnet
nodes := make(map[string]*mesh.Node)
for _, n := range ns {
- if n.Ready() {
+ if n.Ready(false) {
nodes[n.Name] = n
hostname = n.Name
}
@@ -243,7 +243,7 @@ func runShowConfPeer(_ *cobra.Command, args []string) error {
peer := args[0]
peers := make(map[string]*mesh.Peer)
for _, p := range ps {
- if p.Ready() {
+ if p.Ready(false) {

Check failure on line 246 in cmd/kgctl/showconf.go (GitHub Actions: container, build, darwin, e2e, linux, unit, windows): too many arguments in call to p.Ready
peers[p.Name] = p
}
}
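
The CI annotations above point at a real break in this commit: the kgctl call sites now pass a bool to p.Ready, but only Node.Ready gained the checkLastSeen parameter, so Peer.Ready is presumably still parameterless. Below is a standalone sketch of one way the signatures could be reconciled; the Peer type here is a simplified stand-in for mesh.Peer, and this fix is an assumption, not part of the commit.

package main

import "fmt"

// Peer is a simplified stand-in for mesh.Peer; the real type wraps a
// WireGuard peer configuration and has more fields.
type Peer struct {
	Name       string
	PublicKey  [32]byte
	AllowedIPs []string
}

// Ready sketches one reconciliation: accept the same checkLastSeen
// flag as Node.Ready and ignore it, since peers do not check in with
// the backend and carry no LastSeen timestamp. The alternative is to
// leave Peer.Ready parameterless and revert the p.Ready(false) calls.
func (p *Peer) Ready(checkLastSeen bool) bool {
	_ = checkLastSeen // peers have no check-in state to examine
	return p != nil &&
		len(p.AllowedIPs) != 0 &&
		p.PublicKey != [32]byte{}
}

func main() {
	p := &Peer{
		Name:       "example-peer",
		PublicKey:  [32]byte{1}, // placeholder key for the example
		AllowedIPs: []string{"10.5.0.1/32"},
	}
	fmt.Println(p.Ready(false)) // true: key and allowed IPs are set
}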
10 changes: 8 additions & 2 deletions pkg/mesh/backend.go
@@ -79,13 +79,19 @@ type Node struct {
}

// Ready indicates whether or not the node is ready.
- func (n *Node) Ready() bool {
+ func (n *Node) Ready(checkLastSeen bool) bool {
// Nodes that are not leaders will not have WireGuardIPs, so it is not required.
+ var checkedIn bool
+ if checkLastSeen {
+ 	checkedIn = time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
+ } else {
+ 	checkedIn = true
+ }
return n != nil &&
n.Endpoint.Ready() &&
n.Key != wgtypes.Key{} &&
n.Subnet != nil &&
- time.Now().Unix()-n.LastSeen < int64(checkInPeriod)*2/int64(time.Second)
+ checkedIn
}

// Peer represents a peer in the network.
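
The guarded branch preserves the pre-commit freshness rule: a node counts as checked in only if it was seen within two check-in periods, computed in whole seconds. Below is a standalone sketch of the arithmetic, assuming checkInPeriod is 30 seconds (the real constant is defined elsewhere in pkg/mesh and may differ).

package main

import (
	"fmt"
	"time"
)

// checkInPeriod is assumed to be 30s here; the real constant lives
// elsewhere in pkg/mesh.
const checkInPeriod = 30 * time.Second

// checkedIn reproduces the expression from Ready: the node must have
// been seen within two check-in periods, compared in whole seconds.
func checkedIn(lastSeen int64) bool {
	return time.Now().Unix()-lastSeen < int64(checkInPeriod)*2/int64(time.Second)
}

func main() {
	fresh := time.Now().Add(-45 * time.Second).Unix()
	stale := time.Now().Add(-2 * time.Minute).Unix()
	fmt.Println(checkedIn(fresh)) // true: 45s is inside the 60s window
	fmt.Println(checkedIn(stale)) // false: 120s is outside the window
}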
23 changes: 14 additions & 9 deletions pkg/mesh/mesh.go
@@ -50,6 +50,7 @@ const (
// Mesh is able to create Kilo network meshes.
type Mesh struct {
Backend
+ checkin bool
cleanup bool
cleanUpIface bool
cni bool
@@ -89,7 +90,7 @@ type Mesh struct {
}

// New returns a new Mesh instance.
- func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port int, subnet *net.IPNet, local, cni bool, cniPath, iface string, cleanup bool, cleanUpIface bool, createIface bool, mtu uint, resyncPeriod time.Duration, prioritisePrivateAddr, iptablesForwardRule bool, serviceCIDRs []*net.IPNet, logger log.Logger, registerer prometheus.Registerer) (*Mesh, error) {
+ func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularity, hostname string, port int, subnet *net.IPNet, local, cni bool, cniPath, iface string, checkin bool, cleanup bool, cleanUpIface bool, createIface bool, mtu uint, resyncPeriod time.Duration, prioritisePrivateAddr, iptablesForwardRule bool, serviceCIDRs []*net.IPNet, logger log.Logger, registerer prometheus.Registerer) (*Mesh, error) {
if err := os.MkdirAll(kiloPath, 0700); err != nil {
return nil, fmt.Errorf("failed to create directory to store configuration: %v", err)
}
@@ -168,6 +169,7 @@ func New(backend Backend, enc encapsulation.Encapsulator, granularity Granularit
}
mesh := Mesh{
Backend: backend,
+ checkin: checkin,
cleanup: cleanup,
cleanUpIface: cleanUpIface,
cni: cni,
@@ -269,6 +271,9 @@ func (m *Mesh) Run(ctx context.Context) error {
}
resync := time.NewTimer(m.resyncPeriod)
checkIn := time.NewTimer(checkInPeriod)
+ if !m.checkin {
+ 	checkIn.Stop()
+ }
nw := m.Nodes().Watch()
pw := m.Peers().Watch()
var ne *NodeEvent
@@ -304,7 +309,7 @@ func (m *Mesh) syncNodes(ctx context.Context, e *NodeEvent) {
}
var diff bool
m.mu.Lock()
- if !e.Node.Ready() {
+ if !e.Node.Ready(m.checkin) {
// Trace non ready nodes with their presence in the mesh.
_, ok := m.nodes[e.Node.Name]
level.Debug(logger).Log("msg", "received non ready node", "node", e.Node, "in-mesh", ok)
@@ -313,7 +318,7 @@ func (m *Mesh) syncNodes(ctx context.Context, e *NodeEvent) {
case AddEvent:
fallthrough
case UpdateEvent:
- if !nodesAreEqual(m.nodes[e.Node.Name], e.Node) {
+ if !nodesAreEqual(m.nodes[e.Node.Name], e.Node, m.checkin) {
diff = true
}
// Even if the nodes are the same,
@@ -416,7 +421,7 @@ func (m *Mesh) handleLocal(ctx context.Context, n *Node) {
AllowedLocationIPs: n.AllowedLocationIPs,
Granularity: m.granularity,
}
- if !nodesAreEqual(n, local) {
+ if !nodesAreEqual(n, local, m.checkin) {
level.Debug(m.logger).Log("msg", "local node differs from backend")
if err := m.Nodes().Set(ctx, m.hostname, local); err != nil {
level.Error(m.logger).Log("error", fmt.Sprintf("failed to set local node: %v", err), "node", local)
@@ -432,7 +437,7 @@ func (m *Mesh) handleLocal(ctx context.Context, n *Node) {
n = &Node{}
}
m.mu.Unlock()
- if !nodesAreEqual(n, local) {
+ if !nodesAreEqual(n, local, m.checkin) {
m.mu.Lock()
m.nodes[local.Name] = local
m.mu.Unlock()
@@ -455,7 +460,7 @@ func (m *Mesh) applyTopology() {
var readyNodes float64
for k := range m.nodes {
m.nodes[k].Granularity = m.granularity
- if !m.nodes[k].Ready() {
+ if !m.nodes[k].Ready(m.checkin) {
continue
}
// Make it point to the node without copy.
@@ -635,7 +640,7 @@ func (m *Mesh) resolveEndpoints() error {
for k := range m.nodes {
// Skip unready nodes, since they will not be used
// in the topology anyways.
- if !m.nodes[k].Ready() {
+ if !m.nodes[k].Ready(m.checkin) {
continue
}
// Resolve the Endpoint
@@ -664,7 +669,7 @@ func isSelf(hostname string, node *Node) bool {
return node != nil && node.Name == hostname
}

- func nodesAreEqual(a, b *Node) bool {
+ func nodesAreEqual(a, b *Node, checkLastSeen bool) bool {
if (a != nil) != (b != nil) {
return false
}
@@ -686,7 +691,7 @@ func nodesAreEqual(a, b *Node) bool {
a.Location == b.Location &&
a.Name == b.Name &&
subnetsEqual(a.Subnet, b.Subnet) &&
- a.Ready() == b.Ready() &&
+ a.Ready(checkLastSeen) == b.Ready(checkLastSeen) &&
a.PersistentKeepalive == b.PersistentKeepalive &&
discoveredEndpointsAreEqual(a.DiscoveredEndpoints, b.DiscoveredEndpoints) &&
ipNetSlicesEqual(a.AllowedLocationIPs, b.AllowedLocationIPs) &&
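
Run disables check-ins by stopping the timer before entering the event loop, so its channel simply never fires and the select needs no extra branch. Below is a standalone sketch of the same pattern with simplified names; it is not the real Run loop.

package main

import (
	"fmt"
	"time"
)

func main() {
	checkin := false // mirrors m.checkin with the flag disabled
	checkIn := time.NewTimer(time.Second)
	if !checkin {
		// A stopped timer never sends on its channel, so the
		// check-in case below is dead without restructuring the loop.
		checkIn.Stop()
	}
	done := time.After(3 * time.Second)
	for {
		select {
		case <-checkIn.C:
			fmt.Println("checking in") // never printed when disabled
			checkIn.Reset(time.Second)
		case <-done:
			fmt.Println("shutting down")
			return
		}
	}
}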
2 changes: 1 addition & 1 deletion pkg/mesh/mesh_test.go
@@ -120,7 +120,7 @@ func TestReady(t *testing.T) {
ready: true,
},
} {
- ready := tc.node.Ready()
+ ready := tc.node.Ready(false)
if ready != tc.ready {
t.Errorf("test case %q: expected %t, got %t", tc.name, tc.ready, ready)
}
