Skip to content

Commit

Permalink
Reworked Heartbeat Registry Tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
stricklandrbls committed Apr 16, 2024
1 parent 0b4128f commit d906986
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 97 deletions.
1 change: 1 addition & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export * from './server'
export * from './session'
export * from './version'
export * from './viewport'
export { IHeartbeatReceiver } from './registry'

// generated files from protoc
export * from './omega_edit_grpc_pb'
Expand Down
60 changes: 47 additions & 13 deletions packages/client/src/registry.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,60 @@
import { IServerHeartbeat } from "./server";
export type TimeInfo = {
readonly nextTimestampMs: number
}
import EventEmitter = require('events')
import { IServerHeartbeat } from './server'
export interface IHeartbeatReceiver {
readonly id: string
process(heartbeat: IServerHeartbeat): any
}
export interface IHeartbeatRequester {
request(): void
}
export interface IHeartbeatRegistry {
update(receiver: IHeartbeatReceiver, expectNextInMs: number): any
remove(id: IHeartbeatReceiver['id']): any
export interface IHeartbeatRegistry<T> {
update(receiver: IHeartbeatReceiver, measure: T): any
remove(receiver: IHeartbeatReceiver): void
OnAllReceiversRemoved(listener: (ctx: any) => void): void
}
class HeartbeatRegistryManager {
private registry_: Map<IHeartbeatReceiver, TimeInfo> = new Map()
private checkIntervalId: NodeJS.Timeout | undefined = undefined

readonly checkIntervalMs: number = 3000
export type TimeTolerance = { failAfterMs: number }

update(receiver: IHeartbeatReceiver, time: Required<TimeInfo>) {
class TimeRegistryTolerance {
protected timeout: NodeJS.Timeout | undefined
constructor(
tolerance: TimeTolerance,
public onFailure: () => void
) {
this.timeout = setTimeout(onFailure, tolerance.failAfterMs)
}

extend(tolerance: TimeTolerance) {
clearTimeout(this.timeout)
this.timeout = setTimeout(this.onFailure, tolerance.failAfterMs)
}
clear() {
clearTimeout(this.timeout)
}
}

const EmptyEvent = 'emptied'
class TimedHeartbeatRegistry implements IHeartbeatRegistry<TimeTolerance> {
private registry_: Map<IHeartbeatReceiver, TimeRegistryTolerance> = new Map()
private events = new EventEmitter()
update(receiver: IHeartbeatReceiver, tolerance: TimeTolerance) {
let item = this.registry_.get(receiver)
item
? item.extend(tolerance)
: this.registry_.set(
receiver,
new TimeRegistryTolerance(tolerance, () => {
this.remove(receiver)
})
)
}
remove(receiver: IHeartbeatReceiver): void {
this.registry_.delete(receiver)
if (this.registry_.size == 0) this.events.emit(EmptyEvent)
}
OnAllReceiversRemoved(listener: (ctx: any) => void): void {
this.events.on(EmptyEvent, listener)
}
}

}
export const HeartbeatRegistry = new TimedHeartbeatRegistry()
4 changes: 2 additions & 2 deletions packages/client/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import {
ServerControlResponse,
ServerInfoResponse,
} from './omega_edit_pb'
import { IHeartbeatReceiver } from './registry'
import { IHeartbeatReceiver, HeartbeatRegistry } from './registry'

const DEFAULT_PORT = 9000 // default port for the server
const DEFAULT_HOST = '127.0.0.1' // default host for the server
Expand Down Expand Up @@ -738,7 +738,7 @@ export async function getServerHeartbeatFor(
}

const latency: number = Date.now() - startTime
// HeartbeatRegistry.update(receiver, {timestampMs: startTime, nextTimestampMs: startTime + heartbeatInterval})
HeartbeatRegistry.update(receiver, {failAfterMs: heartbeatInterval + latency})

resolve({
latency: latency,
Expand Down
190 changes: 108 additions & 82 deletions packages/client/tests/specs/registry.spec.ts
Original file line number Diff line number Diff line change
@@ -1,97 +1,123 @@
import { describe, it, Done } from "mocha";
import { IHeartbeatReceiver, IHeartbeatRegistry, TimeInfo } from "../../src/registry"
import EventEmitter from "events";
import assert from "assert";
import { afterEach, describe, it } from 'mocha'
import { TimedRegistry } from './registryMock'
import { IHeartbeatReceiver } from '../../src/registry'
import assert from 'assert'
import { IServerHeartbeat } from '../../src/server'

class MockRegistry implements IHeartbeatRegistry {
private registry_: Map<IHeartbeatReceiver, TimeInfo> = new Map()
// private temporalRegistry_: Map<IHeartbeatReceiver, NodeJS.Timeout> = new Map()
private checkIntervalId: NodeJS.Timeout | undefined = undefined
private registryEventEmitter = new EventEmitter()
private eventName = 'empty'
private checkEvent = 'checked'
private checkIntervalMs: number = 3000
private toleranceMs: number = 3000

constructor(){
}
// Expected public event
onEmpty(listener: ()=>any) { this.registryEventEmitter.on(this.eventName, listener) }

// Mock-only events
_onCheck(listener: ()=>any) { this.registryEventEmitter.on(this.checkEvent, listener) }
_onRemoval(listener: (context: any)=>any){ this.registryEventEmitter.on('removal', listener) }
_onToleranceFailure(listener: (context: any)=>any){ this.registryEventEmitter.on('dead', listener) }
const nullProcessFn = () => {}
const receiverMocks: IHeartbeatReceiver[] = [
{ id: 'abc-123', process: nullProcessFn },
{ id: 'de1', process: nullProcessFn },
{ id: 'dfdl-debug-de', process: nullProcessFn },
]
const mockServerHeartbeat: IServerHeartbeat = {
latency: 15, // latency in ms
sessionCount: 0, // session count
serverTimestamp: Date.now(), // timestamp in ms
serverUptime: 100, // uptime in ms
serverCpuCount: 4, // cpu count
serverCpuLoadAverage: 0, // cpu load average
serverMaxMemory: 16, // max memory in bytes
serverCommittedMemory: 2, // committed memory in bytes
serverUsedMemory: 2, // used memory in bytes
}

setTolerance(tolerance: number){ this.toleranceMs = tolerance }
tolerance(): number { return this.toleranceMs }
setCheckInterval(checkIntervalMs: number){ this.checkIntervalMs = checkIntervalMs }
checkInterval(): number { return this.checkIntervalMs }
const registry = new TimedRegistry()

update(receiver: IHeartbeatReceiver, expectNextInMs: number) {
this.registry_.set(receiver, { nextTimestampMs: Date.now() + expectNextInMs })
if(!this.checkIntervalId)
this.checkIntervalId = setInterval(() => { this.check() }, this.checkIntervalMs)
const getServerHeartbeatMock = (
receiver: IHeartbeatReceiver,
heartbeatInterval: number = 1000
): Promise<IServerHeartbeat> => {
return new Promise((resolve) => {
registry.update(receiver, { failAfterMs: heartbeatInterval })
receiver.process(mockServerHeartbeat)
resolve(mockServerHeartbeat)
})
}
class HeartbeatRetention implements IHeartbeatReceiver {
id: string = 'retention'
private last: IServerHeartbeat | undefined = undefined
process(heartbeat: IServerHeartbeat) {
this.last = heartbeat
}
getLast(): IServerHeartbeat {
return this.last!
}
}
const LastHeartbeatKeeper = new HeartbeatRetention()

remove(id: string) {
this.registry_.forEach((time, receiver) => {
if( receiver.id == id ) {
this.registry_.delete(receiver)
this.registryEventEmitter.emit('removal', receiver.id)
}
describe('Heartbeat Receivers', () => {
let timeout: NodeJS.Timeout | undefined
afterEach(() => {
clearTimeout(timeout)
registry.M_reset()
})
describe('Registry Interactions', () => {
it('Should be able to interact with the registery through the `getServerHeartbeatFor` function', (done) => {
getServerHeartbeatMock(LastHeartbeatKeeper).then((hb) => {
assert.equal(LastHeartbeatKeeper.getLast(), hb)
assert(
registry.M_registry().size == 1 &&
registry.M_registry().get(LastHeartbeatKeeper)
)
done()
})
})
if(this.registry_.size == 0){
clearInterval(this.checkIntervalId)
this.checkIntervalId = undefined
this.registryEventEmitter.emit(this.eventName)
}
}
})
})

describe('Heartbeat Registry Implementations', () => {
let timeout: NodeJS.Timeout | undefined
afterEach(() => {
clearTimeout(timeout)
registry.M_reset()
})
describe('Timeout Based registry', () => {
it('Should automatically remove a receiver upon uncleared timeout', (done) => {
registry.update(receiverMocks[1], { failAfterMs: 250 })
registry.OnAllReceiversRemoved(() => {
assert(true)
done()
})

private check() {
this.registry_.forEach((time, receiver) => {

if(!this.inTolerance(time)) {
this.registryEventEmitter.emit('dead')
this.remove(receiver.id)
}
this.registryEventEmitter.emit(this.checkEvent)
timeout = setTimeout(() => {
assert.fail('Did not emit removal before 250ms')
}, 300)
})
}
private inTolerance(time: TimeInfo): boolean {
console.log(`Tolerance Calc: ${Date.now()} - ${time.nextTimestampMs + this.toleranceMs} = ${Date.now() - time.nextTimestampMs + this.toleranceMs}`)
return Date.now() <= time.nextTimestampMs + this.toleranceMs
}
}

describe("Heartbeat Registry Functionality", function(){
const mockReceiver: IHeartbeatReceiver = {id: "abc123", process: (_) => {}}
it("Should refresh a receiver's timeout upon updates", (done) => {
const ExpectedUpdateCount = 3
let updateCount = 0

it("Should drop the receiver at the next check when out of tolerance", function(done){
const registry = new MockRegistry()
registry.setTolerance(500)
registry.setCheckInterval(250)
registry.update(receiverMocks[1], { failAfterMs: 100 })
registry.OnAllReceiversRemoved(() => {
assert.equal(updateCount, 3)
done()
})
let updateInterval = setInterval(() => {
updateCount++
if (updateCount >= ExpectedUpdateCount) clearInterval(updateInterval)
registry.update(receiverMocks[1], { failAfterMs: 100 })
}, 50)

registry._onToleranceFailure((ctx) => {
assert(ctx.variance <= 550, `variance: ${ctx.variance}`)
done()
timeout = setTimeout(() => {
assert.fail('Did not ')
}, 500)
})
registry.update(mockReceiver, 500)
})

it("Should emit an event when receiver count drops to zero", function(done){
const registry = new MockRegistry()
registry.setTolerance(500)
registry.setCheckInterval(250)

let timeout: NodeJS.Timeout | undefined = undefined
registry.onEmpty(()=>{
clearTimeout(timeout)
done()
it('Should not emit "OnAllReceiversRemoved" with active receivers', (done) => {
registry.update(receiverMocks[0], { failAfterMs: 250 })
registry.update(receiverMocks[1], { failAfterMs: 50 })
timeout = setTimeout(() => {
assert(registry.M_registry().size == 1)
assert(registry.M_registry().has(receiverMocks[0]))
done()
}, 100)
})
registry.update(mockReceiver, 500)
timeout = setTimeout(()=>{
assert.fail("Empty Event was not emitted")
}, 5000)
})
})
})

function shortTime(time: number): string {
const timeStr = time.toString()
return '..' + timeStr.substring(timeStr.length - 4)
}
55 changes: 55 additions & 0 deletions packages/client/tests/specs/registryMock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import EventEmitter from "events";
import { IHeartbeatReceiver, IHeartbeatRegistry } from "../../src/registry";

const EmptyEvent = 'emptied'

export type TimeTolerance = { failAfterMs: number }

export class TimeRegistryTolerance {
protected timeout: NodeJS.Timeout | undefined
constructor(
tolerance: TimeTolerance,
public onFailure: () => void
){ this.timeout = setTimeout(onFailure, tolerance.failAfterMs) }

extend(tolerance: TimeTolerance){
clearTimeout(this.timeout)
this.timeout = setTimeout(this.onFailure, tolerance.failAfterMs)
}
clear(){ clearTimeout(this.timeout) }
}

export class TimedRegistry implements IHeartbeatRegistry<TimeTolerance> {
private registry_: Map<IHeartbeatReceiver, TimeRegistryTolerance> = new Map()
private events = new EventEmitter()

update(receiver: IHeartbeatReceiver, tolerance: TimeTolerance) {
let item = this.registry_.get(receiver)
item
? item.extend(tolerance)
: this.registry_.set(receiver, new TimeRegistryTolerance(tolerance, () => { this.remove(receiver) }))
}

OnAllReceiversRemoved(listener: (ctx: any) => void){
this.events.on(EmptyEvent, listener)
}

remove(receiver: IHeartbeatReceiver): void {
this.registry_.delete(receiver)
if(this.registry_.size == 0)
this.events.emit(EmptyEvent)
}

// Test Functions
M_reset(emit: boolean = false){
this.registry_.forEach((tolerance) => {
tolerance.clear()
})
this.registry_.clear()

this.events.removeAllListeners()
if(emit) this.events.emit(EmptyEvent)
}
M_registry(){ return this.registry_ }
M_remove(receiver: IHeartbeatReceiver){ this.remove(receiver) }
}

0 comments on commit d906986

Please sign in to comment.