Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

32 reroute and retry #37

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ BaseOptions:
WorkInfo: false
BucketInfo: true
LinkInfo: true
RerouteInfo: true
ExperimentId: "0"
# Reset output loggers after every output. With false, all outputs will be accumulated
Reset: false
Expand Down
1 change: 1 addition & 0 deletions config/config_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type outputOptions struct {
WorkInfo bool `yaml:"WorkInfo"`
BucketInfo bool `yaml:"BucketInfo"`
LinkInfo bool `yaml:"LinkInfo"`
RerouteInfo bool `yaml:"RerouteInfo"`
ExperimentId string `yaml:"ExperimentId"`
Reset bool `yaml:"Reset"`
EvaluateInterval int `yaml:"EvaluateInterval"`
Expand Down
1 change: 1 addition & 0 deletions config/default_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func getDefaultConfig() Config {
WorkInfo: false, // false
BucketInfo: false, // false
LinkInfo: false, // false
RerouteInfo: false, // false
ExperimentId: "default", // default
Reset: false, // false
EvaluateInterval: 0, // 0
Expand Down
7 changes: 6 additions & 1 deletion config/get_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ func JustPrintOutPut() bool {
!theconfig.BaseOptions.OutputOptions.DensenessIncome &&
!theconfig.BaseOptions.OutputOptions.WorkInfo &&
!theconfig.BaseOptions.OutputOptions.BucketInfo &&
!theconfig.BaseOptions.OutputOptions.LinkInfo {
!theconfig.BaseOptions.OutputOptions.LinkInfo &&
!theconfig.BaseOptions.OutputOptions.RerouteInfo {
return true
}
return false
Expand Down Expand Up @@ -253,6 +254,10 @@ func GetLinkInfo() bool {
return theconfig.BaseOptions.OutputOptions.LinkInfo
}

func GetRerouteInfo() bool {
return theconfig.BaseOptions.OutputOptions.RerouteInfo
}

func GetExpeimentId() string {
return theconfig.BaseOptions.OutputOptions.ExperimentId
}
Expand Down
2 changes: 0 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,4 @@ func run(iteration int, graphId string, maxPO int) {
func PrintState(state types.State) {
fmt.Println("TimeStep: ", state.TimeStep)
fmt.Println("OriginatorIndex: ", state.OriginatorIndex)
fmt.Println("UniqueRetryCounter: ", state.UniqueRetryCounter)
fmt.Println("UniqueWaitingCounter: ", state.UniqueWaitingCounter)
}
1 change: 1 addition & 0 deletions model/parts/output/output_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Route struct {
AccessFailed bool
ThresholdFailed bool
FoundByCaching bool
NumRetries int
}

func (o *Route) failed() bool {
Expand Down
53 changes: 53 additions & 0 deletions model/parts/output/reroute_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package output

import (
"bufio"
"fmt"
"os"
)

type RerouteInfo struct {
SuccessfullChunks int
Count int
File *os.File
Writer *bufio.Writer
}

func InitRerouteInfo() *RerouteInfo {
rri := RerouteInfo{}
rri.File = MakeFile("./results/reroute.txt")
rri.Writer = bufio.NewWriter(rri.File)
LogExpSting(rri.Writer)
return &rri
}

func (rri *RerouteInfo) Close() {
err := rri.Writer.Flush()
if err != nil {
fmt.Println("Couldn't flush the remaining buffer in the writer for reroute output")
}
err = rri.File.Close()
if err != nil {
fmt.Println("Couldn't close the file with filepath: ./results/reroute.txt")
}
}

func (rri *RerouteInfo) Reset() {
rri.SuccessfullChunks = 0
rri.Count = 0
}

func (rri *RerouteInfo) Update(output *Route) {
if output.Found {
rri.SuccessfullChunks++
rri.Count += output.NumRetries
}
}

func (rri *RerouteInfo) Log() {
avgRetryPerSuccess := float64(rri.Count) / float64(rri.SuccessfullChunks)
_, err := rri.Writer.WriteString(fmt.Sprintf("Average retry per successfull downloads: %.2f%% \n", avgRetryPerSuccess))
if err != nil {
panic(err)
}
}
6 changes: 6 additions & 0 deletions model/parts/output/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func CreateLoggers() []LogResetUpdater {
loggers = append(loggers, linkInfo)
}

if config.IsRetryWithAnotherPeer() && config.GetRerouteInfo() {
rerouteInfo := InitRerouteInfo()
defer rerouteInfo.Close()
loggers = append(loggers, rerouteInfo)
}

if config.JustPrintOutPut() {
outputWriter := InitOutputWriter()
defer outputWriter.Close()
Expand Down
19 changes: 10 additions & 9 deletions model/parts/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ type Request struct {
OriginatorIndex int
OriginatorId NodeId
ChunkId ChunkId
IsRetry bool
RetryIteration int
IsWaited bool
}

type RequestResult struct {
Expand Down Expand Up @@ -41,15 +44,13 @@ type Threshold [2]NodeId
//}

type State struct {
Graph *Graph
Originators []NodeId
NodesId []NodeId
RouteLists []RequestResult
UniqueWaitingCounter int64
UniqueRetryCounter int64
OriginatorIndex int64
TimeStep int64
Epoch int
Graph *Graph
Originators []NodeId
NodesId []NodeId
RouteLists []RequestResult
OriginatorIndex int64
TimeStep int64
Epoch int
}

func (s *State) GetOriginatorId(originatorIndex int) NodeId {
Expand Down
52 changes: 21 additions & 31 deletions model/parts/update/update_pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,33 @@ package update
import (
"go-incentive-simulation/config"
"go-incentive-simulation/model/parts/types"
"sync/atomic"
)

func Pending(state *types.State, requestResult types.RequestResult, curEpoch int) int64 {
var waitingCounter int64 = 0
if config.IsWaitingEnabled() {
route := requestResult.Route
chunkId := requestResult.ChunkId
originatorId := route[0]
originator := state.Graph.GetNode(originatorId)
isNewChunk := false
func Pending(state *types.State, requestResult types.RequestResult, curEpoch int) {
if !config.IsWaitingEnabled() {
return
}

if config.IsRetryWithAnotherPeer() {
if requestResult.ThresholdFailed || requestResult.AccessFailed {
isNewChunk = originator.PendingStruct.AddPendingChunkId(chunkId, curEpoch)
} else if requestResult.Found {
if len(originator.PendingStruct.PendingQueue) > 0 {
originator.PendingStruct.DeletePendingChunkId(chunkId)
}
}
route := requestResult.Route
chunkId := requestResult.ChunkId
originatorId := route[0]
originator := state.Graph.GetNode(originatorId)

} else {
if requestResult.ThresholdFailed {
isNewChunk = originator.PendingStruct.AddPendingChunkId(chunkId, curEpoch)
} else if requestResult.Found || requestResult.AccessFailed {
if len(originator.PendingStruct.PendingQueue) > 0 {
originator.PendingStruct.DeletePendingChunkId(chunkId)
}
if config.IsRetryWithAnotherPeer() {
if requestResult.ThresholdFailed || requestResult.AccessFailed {
originator.PendingStruct.AddPendingChunkId(chunkId, curEpoch)
} else if requestResult.Found {
if len(originator.PendingStruct.PendingQueue) > 0 {
originator.PendingStruct.DeletePendingChunkId(chunkId)
}
}

if isNewChunk {
waitingCounter = atomic.AddInt64(&state.UniqueWaitingCounter, 1)
} else {
waitingCounter = atomic.LoadInt64(&state.UniqueWaitingCounter)
} else {
if requestResult.ThresholdFailed {
originator.PendingStruct.AddPendingChunkId(chunkId, curEpoch)
} else if requestResult.Found || requestResult.AccessFailed {
if len(originator.PendingStruct.PendingQueue) > 0 {
originator.PendingStruct.DeletePendingChunkId(chunkId)
}
}
}

return waitingCounter
}
62 changes: 25 additions & 37 deletions model/parts/update/update_reroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,34 @@ import (
"go-incentive-simulation/config"
"go-incentive-simulation/model/general"
"go-incentive-simulation/model/parts/types"
"sync/atomic"
)

func Reroute(state *types.State, requestResult types.RequestResult, curEpoch int) int64 {
var retryCounter int64 = 0
if config.IsRetryWithAnotherPeer() {

route := requestResult.Route
chunkId := requestResult.ChunkId
originatorId := route[0]
originator := state.Graph.GetNode(originatorId)
reroute := originator.RerouteStruct.Reroute // reroute = rejected nodes + chunk

if requestResult.Found {
if reroute.RejectedNodes != nil {
if reroute.ChunkId == chunkId { // If chunkId == chunkId --> reset reroute
originator.RerouteStruct.ResetRerouteAndSaveToHistory(chunkId, curEpoch)
}
}

} else if len(route) > 1 { // Rejection in second hop --> route have at least an originator and a lastHopNode
lastHopNode := route[len(route)-1]
if reroute.RejectedNodes == nil {
reroute = originator.RerouteStruct.AddNewReroute(requestResult.AccessFailed, lastHopNode, chunkId, curEpoch)
retryCounter = atomic.AddInt64(&state.UniqueRetryCounter, 1)
} else {
if !general.Contains(reroute.RejectedNodes, lastHopNode) { // if the last hop in new route have not been searched before
originator.RerouteStruct.AddNodeToRejectedNodes(requestResult.AccessFailed, lastHopNode, curEpoch)
}
}
}

if retryCounter == 0 {
retryCounter = atomic.LoadInt64(&state.UniqueRetryCounter)
}
func Reroute(state *types.State, requestResult types.RequestResult, curEpoch int) {
if !config.IsRetryWithAnotherPeer() {
return
}

if len(reroute.RejectedNodes) > config.GetBinSize() {
originator.RerouteStruct.ResetRerouteAndSaveToHistory(chunkId, curEpoch)
route := requestResult.Route
chunkId := requestResult.ChunkId
originatorId := route[0]
originator := state.Graph.GetNode(originatorId)
reroute := originator.RerouteStruct.Reroute // reroute = rejected nodes + chunk

// If the request was successful and the chunk is in the current reroute, reset the reroute
if requestResult.Found && reroute.RejectedNodes != nil && reroute.ChunkId == chunkId {
originator.RerouteStruct.ResetRerouteAndSaveToHistory(chunkId, curEpoch)
} else if len(route) > 1 { // Rejection in second hop -> route has at least an originator and a lastHopNode
lastHopNode := route[len(route)-1]
if reroute.RejectedNodes == nil {
// Create a new reroute if it doesn't exist
reroute = originator.RerouteStruct.AddNewReroute(requestResult.AccessFailed, lastHopNode, chunkId, curEpoch)
} else if !general.Contains(reroute.RejectedNodes, lastHopNode) {
// Add the last hop node to rejected nodes if it hasn't been searched before
originator.RerouteStruct.AddNodeToRejectedNodes(requestResult.AccessFailed, lastHopNode, curEpoch)
}

}
return retryCounter
// If the reroute exceeds the bin size, reset the reroute
if len(reroute.RejectedNodes) > config.GetBinSize() {
originator.RerouteStruct.ResetRerouteAndSaveToHistory(chunkId, curEpoch)
}
}
18 changes: 13 additions & 5 deletions model/parts/workers/request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
)

func RequestWorker(pauseChan chan bool, continueChan chan bool, requestChan chan types.Request, globalState *types.State, wg *sync.WaitGroup) {

defer wg.Done()
var requestQueueSize = 10
var counter = 0
Expand Down Expand Up @@ -38,22 +37,26 @@ func RequestWorker(pauseChan chan bool, continueChan chan bool, requestChan chan

// Needed for checks waiting and retry
chunkId = -1

isRetriedChunk := false
rejectedNodesLength := 0
if config.IsRetryWithAnotherPeer() {
rerouteStruct := originator.RerouteStruct

if len(rerouteStruct.Reroute.RejectedNodes) > 0 {
rejectedNodesLength = len(rerouteStruct.Reroute.RejectedNodes)
if rejectedNodesLength > 0 {
chunkId = rerouteStruct.Reroute.ChunkId
isRetriedChunk = true
// fmt.Printf("Rejected Nodes: %v, originatorId: %v, chunkId: %v\n", rerouteStruct.Reroute.RejectedNodes, originatorId, chunkId)
}
}

isFromQueuedChunks := false
if config.IsWaitingEnabled() && chunkId == -1 { // No valid chunkId in reroute
pendingStruct := originator.PendingStruct

if pendingStruct.PendingQueue != nil {
queuedChunk, ok := pendingStruct.GetChunkFromQueue(curEpoch)
if ok {
chunkId = queuedChunk.ChunkId
isFromQueuedChunks = true
}
}
}
Expand Down Expand Up @@ -81,10 +84,15 @@ func RequestWorker(pauseChan chan bool, continueChan chan bool, requestChan chan
OriginatorIndex: originatorIndex,
OriginatorId: originatorId,
ChunkId: chunkId,
IsRetry: isRetriedChunk,
RetryIteration: rejectedNodesLength + 1,
IsWaited: isFromQueuedChunks,
}
requestChan <- request
}

//fmt.Printf("timestep: %v, epoch: %v, OriginatorIndex: %v, OriginatorId: %v, chunkId: %v, IsRetry: %v, RetryIteration: %v, IsWaited: %v\n", timeStep, curEpoch, originatorIndex, originatorId, chunkId, isRetriedChunk, rejectedNodesLength+1, isFromQueuedChunks)

if config.TimeForDebugPrints(timeStep) {
fmt.Println("TimeStep is currently:", timeStep)
}
Expand Down
1 change: 1 addition & 0 deletions model/routing/routing_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func RoutingWorker(pauseChan chan bool, continueChan chan bool, requestChan chan
output.ThresholdFailed = thresholdFailed
output.AccessFailed = accessFailed
output.FoundByCaching = foundByCaching
output.NumRetries = request.RetryIteration
outputChan <- output
}
}
Expand Down
16 changes: 7 additions & 9 deletions model/state/initial_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,13 @@ func MakeInitialState(path string) types.State {
//cacheStruct := types.CacheStruct{CacheHits: 0, CacheMap: make(types.CacheMap), CacheMutex: &sync.Mutex{}}

initialState := types.State{
Graph: graph,
Originators: utils.CreateDownloadersList(graph),
NodesId: utils.CreateNodesList(graph),
RouteLists: make([]types.RequestResult, 10000),
UniqueWaitingCounter: 0,
UniqueRetryCounter: 0,
OriginatorIndex: 0,
TimeStep: 0,
Epoch: 0,
Graph: graph,
Originators: utils.CreateDownloadersList(graph),
NodesId: utils.CreateNodesList(graph),
RouteLists: make([]types.RequestResult, 10000),
OriginatorIndex: 0,
TimeStep: 0,
Epoch: 0,
}
return initialState
}