
update tests and add timeout config to webhook pool
ad-astra-video committed Sep 7, 2024
1 parent 006809d commit 767c9a6
Showing 3 changed files with 30 additions and 28 deletions.
cmd/livepeer/starter/starter.go (2 changes: 1 addition & 1 deletion)
@@ -1140,7 +1140,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
glog.Exit("Error setting orch webhook URL ", err)
}
glog.Info("Using orchestrator webhook URL ", whurl)
- n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl)
+ n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
} else if len(orchURLs) > 0 {
n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout)
}
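For context, a minimal sketch of how the discovery timeout is assumed to reach this call site. The diff only shows that cfg.DiscoveryTimeout (a *time.Duration) is dereferenced and passed through; the flag name, the 500ms default, and the trimmed-down LivepeerConfig below are illustrative assumptions, not part of this commit.

```go
package main

import (
	"flag"
	"fmt"
	"net/url"
	"time"
)

// Trimmed-down stand-in for the starter's LivepeerConfig; only the field
// touched by this commit is modeled.
type LivepeerConfig struct {
	DiscoveryTimeout *time.Duration
}

func main() {
	// Assumed flag wiring; the real flag name and default may differ.
	discoveryTimeout := flag.Duration("discoveryTimeout", 500*time.Millisecond,
		"time to wait for orchestrators to respond during discovery")
	flag.Parse()

	cfg := LivepeerConfig{DiscoveryTimeout: discoveryTimeout}

	whurl, err := url.ParseRequestURI("https://example.com/api/orchestrator")
	if err != nil {
		panic(err)
	}

	// In StartLivepeer the dereferenced value now flows into the webhook pool:
	//   n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout)
	fmt.Println("webhook pool for", whurl, "with discovery timeout", *cfg.DiscoveryTimeout)
}
```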
discovery/discovery_test.go (32 changes: 16 additions & 16 deletions)
@@ -43,7 +43,7 @@ func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
assert.Nil(pool)
assert.EqualError(err, "could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
@@ -163,7 +163,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) {
goleak.VerifyNone(t, common.IgnoreRoutines()...)
}()

- emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)
require.NotNil(emptyPool)
assert.Equal(0, emptyPool.Size())
@@ -174,7 +174,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) {
dbh.UpdateOrch(ethOrchToDBOrch(o))
}

- nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)
require.NotNil(nonEmptyPool)
assert.Equal(len(addresses), nonEmptyPool.Size())
@@ -218,7 +218,7 @@ func TestNewDBOrchestorPoolCache_NoEthAddress(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, time.Duration(500)*time.Millisecond)
require.Nil(err)

// Check that serverGetOrchInfo returns early and the orchestrator isn't updated
@@ -272,7 +272,7 @@ func TestNewDBOrchestratorPoolCache_InvalidPrices(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, time.Duration(500)*time.Millisecond)
require.Nil(err)

// priceInfo.PixelsPerUnit = 0
@@ -343,7 +343,7 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t

sender.On("ValidateTicketParams", mock.Anything).Return(nil).Times(3)

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)
assert.Equal(pool.Size(), 3)
orchs, err := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0))
@@ -413,7 +413,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)
// bad URLs are inserted in the database but are not included in the working set, as there is no returnable query for getting their priceInfo
// And if URL is updated it won't be picked up until next cache update
@@ -446,7 +446,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs_Empty(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)
assert.Equal(0, pool.Size())
infos := pool.GetInfos()
@@ -531,7 +531,7 @@ func TestNewDBOrchestorPoolCache_PollOrchestratorInfo(t *testing.T) {
origCacheRefreshInterval := cacheRefreshInterval
cacheRefreshInterval = 200 * time.Millisecond
defer func() { cacheRefreshInterval = origCacheRefreshInterval }()
- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// Ensure orchestrators exist in DB
@@ -685,7 +685,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsAllOrchestrators(t *test

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// ensuring orchs exist in DB
@@ -775,7 +775,7 @@ func TestCachedPool_GetOrchestrators_MaxBroadcastPriceNotSet(t *testing.T) {

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// ensuring orchs exist in DB
@@ -881,7 +881,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing.

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// ensuring orchs exist in DB
@@ -971,7 +971,7 @@ func TestCachedPool_GetOrchestrators_TicketParamsValidation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// Test 25 out of 50 orchs pass ticket params validation
@@ -1065,7 +1065,7 @@ func TestCachedPool_GetOrchestrators_OnlyActiveOrchestrators(t *testing.T) {

sender.On("ValidateTicketParams", mock.Anything).Return(nil)

- pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}, 0)
+ pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}, time.Duration(500)*time.Millisecond)
require.NoError(err)

// ensuring orchs exist in DB
@@ -1120,7 +1120,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) {

// assert created webhook pool is correct length
whURL, _ := url.ParseRequestURI("https://livepeer.live/api/orchestrator")
- whpool := NewWebhookPool(nil, whURL)
+ whpool := NewWebhookPool(nil, whURL, time.Duration(500)*time.Millisecond)
assert.Equal(3, whpool.Size())

// assert that list is not refreshed if lastRequest is less than 1 min ago and hash is the same
@@ -1643,7 +1643,7 @@ func TestSetGetOrchestratorTimeout(t *testing.T) {
}

//set timeout to 1000ms
- poolCache, err := NewDBOrchestratorPoolCache(context.TODO(), node, &stubRoundsManager{}, []string{}, 1000)
+ poolCache, err := NewDBOrchestratorPoolCache(context.TODO(), node, &stubRoundsManager{}, []string{}, time.Duration(1000)*time.Millisecond)
assert.Nil(err)
//confirm the timeout is now 1000ms
assert.Equal(poolCache.discoveryTimeout, time.Duration(1000)*time.Millisecond)
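A hypothetical companion test (not part of this commit) illustrating that the webhook pool now stores its configured timeout, in the same style as TestSetGetOrchestratorTimeout above. The test name is invented, and in practice the webhook fetch started by NewWebhookPool would be stubbed as in the existing webhook pool tests.

```go
func TestWebhookPoolStoresDiscoveryTimeout(t *testing.T) {
	assert := assert.New(t)

	whURL, err := url.ParseRequestURI("https://livepeer.live/api/orchestrator")
	assert.Nil(err)

	// NewWebhookPool kicks off a background refresh; a real test would stub
	// the webhook endpoint so no network request is made.
	whpool := NewWebhookPool(nil, whURL, 500*time.Millisecond)

	assert.Equal(500*time.Millisecond, whpool.discoveryTimeout)
}
```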
discovery/wh_discovery.go (24 changes: 13 additions & 11 deletions)
@@ -21,19 +21,21 @@ type webhookResponse struct {
}

type webhookPool struct {
- pool         *orchestratorPool
- callback     *url.URL
- responseHash ethcommon.Hash
- lastRequest  time.Time
- mu           *sync.RWMutex
- bcast        common.Broadcaster
+ pool             *orchestratorPool
+ callback         *url.URL
+ responseHash     ethcommon.Hash
+ lastRequest      time.Time
+ mu               *sync.RWMutex
+ bcast            common.Broadcaster
+ discoveryTimeout time.Duration
}

- func NewWebhookPool(bcast common.Broadcaster, callback *url.URL) *webhookPool {
+ func NewWebhookPool(bcast common.Broadcaster, callback *url.URL, discoveryTimeout time.Duration) *webhookPool {
p := &webhookPool{
- callback: callback,
- mu:       &sync.RWMutex{},
- bcast:    bcast,
+ callback:         callback,
+ mu:               &sync.RWMutex{},
+ bcast:            bcast,
+ discoveryTimeout: discoveryTimeout,
}
go p.getInfos()
return p
@@ -71,7 +73,7 @@ func (w *webhookPool) getInfos() ([]common.OrchestratorLocalInfo, error) {
}

// pool = NewOrchestratorPool(w.bcast, addrs)
- pool = &orchestratorPool{infos: infos, bcast: w.bcast}
+ pool = &orchestratorPool{infos: infos, bcast: w.bcast, discoveryTimeout: w.discoveryTimeout}

w.mu.Lock()
w.responseHash = hash
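A minimal, self-contained sketch of how the stored timeout is assumed to be used downstream. The diff only shows discoveryTimeout being copied into the inner orchestratorPool, so the context.WithTimeout-based bounding below is an assumption about that pool's behavior, not code from this commit.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Illustrative stand-in for the inner pool; only the field added by this
// commit is modeled.
type orchestratorPool struct {
	discoveryTimeout time.Duration
}

// getOrchestrators sketches how a discovery round could be bounded by the
// configured timeout (assumed behavior, not the actual implementation).
func (p *orchestratorPool) getOrchestrators(ctx context.Context) error {
	ctx, cancel := context.WithTimeout(ctx, p.discoveryTimeout)
	defer cancel()

	select {
	case <-time.After(2 * time.Second): // stand-in for slow orchestrator responses
		return nil
	case <-ctx.Done():
		return ctx.Err() // discovery gives up once discoveryTimeout elapses
	}
}

func main() {
	pool := &orchestratorPool{discoveryTimeout: 500 * time.Millisecond}
	// Prints "context deadline exceeded" because the simulated responses
	// take longer than the configured timeout.
	fmt.Println(pool.getOrchestrators(context.Background()))
}
```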
