diff --git a/cmd/livepeer/starter/starter.go b/cmd/livepeer/starter/starter.go index 064536bf57..ec6afab4bb 100755 --- a/cmd/livepeer/starter/starter.go +++ b/cmd/livepeer/starter/starter.go @@ -1140,7 +1140,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) { glog.Exit("Error setting orch webhook URL ", err) } glog.Info("Using orchestrator webhook URL ", whurl) - n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl) + n.OrchestratorPool = discovery.NewWebhookPool(bcast, whurl, *cfg.DiscoveryTimeout) } else if len(orchURLs) > 0 { n.OrchestratorPool = discovery.NewOrchestratorPool(bcast, orchURLs, common.Score_Trusted, orchBlacklist, *cfg.DiscoveryTimeout) } diff --git a/discovery/discovery_test.go b/discovery/discovery_test.go index 12284e1b2b..972b6240a2 100644 --- a/discovery/discovery_test.go +++ b/discovery/discovery_test.go @@ -43,7 +43,7 @@ func TestNewDBOrchestratorPoolCache_NilEthClient_ReturnsError(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) assert.Nil(pool) assert.EqualError(err, "could not create DBOrchestratorPoolCache: LivepeerEthClient is nil") } @@ -163,7 +163,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) { goleak.VerifyNone(t, common.IgnoreRoutines()...) }() - emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + emptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) require.NotNil(emptyPool) assert.Equal(0, emptyPool.Size()) @@ -174,7 +174,7 @@ func TestDBOrchestratorPoolCacheSize(t *testing.T) { dbh.UpdateOrch(ethOrchToDBOrch(o)) } - nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + nonEmptyPool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) require.NotNil(nonEmptyPool) assert.Equal(len(addresses), nonEmptyPool.Size()) @@ -218,7 +218,7 @@ func TestNewDBOrchestorPoolCache_NoEthAddress(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, time.Duration(500)*time.Millisecond) require.Nil(err) // Check that serverGetOrchInfo returns early and the orchestrator isn't updated @@ -272,7 +272,7 @@ func TestNewDBOrchestratorPoolCache_InvalidPrices(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, rm, []string{}, time.Duration(500)*time.Millisecond) require.Nil(err) // priceInfo.PixelsPerUnit = 0 @@ -343,7 +343,7 @@ func TestNewDBOrchestratorPoolCache_GivenListOfOrchs_CreatesPoolCacheCorrectly(t sender.On("ValidateTicketParams", mock.Anything).Return(nil).Times(3) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) assert.Equal(pool.Size(), 3) orchs, err := pool.GetOrchestrators(context.TODO(), pool.Size(), newStubSuspender(), newStubCapabilities(), common.ScoreAtLeast(0)) @@ -413,7 +413,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // bad URLs are inserted in the database but are not included in the working set, as there is no returnable query for getting their priceInfo // And if URL is updated it won't be picked up until next cache update @@ -446,7 +446,7 @@ func TestNewDBOrchestratorPoolCache_TestURLs_Empty(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) assert.Equal(0, pool.Size()) infos := pool.GetInfos() @@ -531,7 +531,7 @@ func TestNewDBOrchestorPoolCache_PollOrchestratorInfo(t *testing.T) { origCacheRefreshInterval := cacheRefreshInterval cacheRefreshInterval = 200 * time.Millisecond defer func() { cacheRefreshInterval = origCacheRefreshInterval }() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // Ensure orchestrators exist in DB @@ -685,7 +685,7 @@ func TestCachedPool_AllOrchestratorsTooExpensive_ReturnsAllOrchestrators(t *test sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -775,7 +775,7 @@ func TestCachedPool_GetOrchestrators_MaxBroadcastPriceNotSet(t *testing.T) { sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -881,7 +881,7 @@ func TestCachedPool_N_OrchestratorsGoodPricing_ReturnsNOrchestrators(t *testing. sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -971,7 +971,7 @@ func TestCachedPool_GetOrchestrators_TicketParamsValidation(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // Test 25 out of 50 orchs pass ticket params validation @@ -1065,7 +1065,7 @@ func TestCachedPool_GetOrchestrators_OnlyActiveOrchestrators(t *testing.T) { sender.On("ValidateTicketParams", mock.Anything).Return(nil) - pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}, 0) + pool, err := NewDBOrchestratorPoolCache(ctx, node, &stubRoundsManager{round: big.NewInt(24)}, []string{}, time.Duration(500)*time.Millisecond) require.NoError(err) // ensuring orchs exist in DB @@ -1120,7 +1120,7 @@ func TestNewWHOrchestratorPoolCache(t *testing.T) { // assert created webhook pool is correct length whURL, _ := url.ParseRequestURI("https://livepeer.live/api/orchestrator") - whpool := NewWebhookPool(nil, whURL) + whpool := NewWebhookPool(nil, whURL, time.Duration(500)*time.Millisecond) assert.Equal(3, whpool.Size()) // assert that list is not refreshed if lastRequest is less than 1 min ago and hash is the same @@ -1643,7 +1643,7 @@ func TestSetGetOrchestratorTimeout(t *testing.T) { } //set timeout to 1000ms - poolCache, err := NewDBOrchestratorPoolCache(context.TODO(), node, &stubRoundsManager{}, []string{}, 1000) + poolCache, err := NewDBOrchestratorPoolCache(context.TODO(), node, &stubRoundsManager{}, []string{}, time.Duration(1000)*time.Millisecond) assert.Nil(err) //confirm the timeout is now 1000ms assert.Equal(poolCache.discoveryTimeout, time.Duration(1000)*time.Millisecond) diff --git a/discovery/wh_discovery.go b/discovery/wh_discovery.go index 1c4c8539f3..7e552dfcce 100644 --- a/discovery/wh_discovery.go +++ b/discovery/wh_discovery.go @@ -21,19 +21,21 @@ type webhookResponse struct { } type webhookPool struct { - pool *orchestratorPool - callback *url.URL - responseHash ethcommon.Hash - lastRequest time.Time - mu *sync.RWMutex - bcast common.Broadcaster + pool *orchestratorPool + callback *url.URL + responseHash ethcommon.Hash + lastRequest time.Time + mu *sync.RWMutex + bcast common.Broadcaster + discoveryTimeout time.Duration } -func NewWebhookPool(bcast common.Broadcaster, callback *url.URL) *webhookPool { +func NewWebhookPool(bcast common.Broadcaster, callback *url.URL, discoveryTimeout time.Duration) *webhookPool { p := &webhookPool{ - callback: callback, - mu: &sync.RWMutex{}, - bcast: bcast, + callback: callback, + mu: &sync.RWMutex{}, + bcast: bcast, + discoveryTimeout: discoveryTimeout, } go p.getInfos() return p @@ -71,7 +73,7 @@ func (w *webhookPool) getInfos() ([]common.OrchestratorLocalInfo, error) { } // pool = NewOrchestratorPool(w.bcast, addrs) - pool = &orchestratorPool{infos: infos, bcast: w.bcast} + pool = &orchestratorPool{infos: infos, bcast: w.bcast, discoveryTimeout: w.discoveryTimeout} w.mu.Lock() w.responseHash = hash