besteffort_policy.go 3.42 KB
Newer Older
songlinfeng's avatar
songlinfeng committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package allocator

import (
	"fmt"
	"github.com/golang/glog"
	"math"
)

const (
	invalidSize         = "allocation size can not be negative"
	invalidAvailable    = "available devices count less than allocation size"
	invalidRequired     = "must_include devices size is more than allocation size"
	invalidReqAvailable = "must_include length should be less than or equal to avilable device size"
	invalidInit         = "Init method must be called before Allocate"
	noCandidateFound    = "No candidate subset found with matching criteria"
)

type BestEffortPolicy struct {
	devices          []*Device
	devicesMap       map[string]*Device
	devicePartitions map[string]*DevicePartitions
	p2pWeights       map[int]map[int]int
}

func NewBestEffortPolicy() *BestEffortPolicy {
	return &BestEffortPolicy{
		devices:          make([]*Device, 0),
		devicesMap:       make(map[string]*Device),
		devicePartitions: make(map[string]*DevicePartitions),
		p2pWeights:       make(map[int]map[int]int),
	}
}

func (b *BestEffortPolicy) getDevicesFromIds(ids []string) []*Device {
	var res []*Device
	for _, id := range ids {
		res = append(res, b.devicesMap[id])
	}
	return res
}

func (b *BestEffortPolicy) Init(devs []*Device, topoDir string) error {
	err := fetchAllPairWeights(devs, b.p2pWeights, topoDir)
	if len(b.p2pWeights) == 0 {
		return fmt.Errorf("Besteffort Policy init failed to initialize p2pWeights")
	}
	if err == nil {
		b.devices = devs
		for idx := range devs {
			b.devicesMap[devs[idx].Id] = devs[idx]
		}
		b.devicePartitions = groupPartitionsByDevId(devs)
		for _, par := range b.devicePartitions {
			glog.Infof("Device: %s Partitions: %v", par.ParentId, par.Devs)
		}
	}
	return err
}

func setContainsAll[K int | string](set, subset []K) bool {
	if len(subset) > len(set) {
		return false
	}
	for _, dev := range subset {
		devFound := false
		for i := range set {
			if set[i] == dev {
				devFound = true
				break
			}
		}
		if !devFound {
			return false
		}
	}
	return true
}

func (b *BestEffortPolicy) Allocate(availableIds, requiredIds []string, size int) ([]string, error) {
	outset := []string{}
	if size <= 0 {
		return outset, fmt.Errorf(invalidSize)
	}

	if len(availableIds) < size {
		return outset, fmt.Errorf(invalidAvailable)
	}

	if len(requiredIds) > size {
		return outset, fmt.Errorf(invalidRequired)
	}

	if len(requiredIds) > len(availableIds) {
		return outset, fmt.Errorf(invalidReqAvailable)
	}

	if len(b.devices) == 0 {
		return outset, fmt.Errorf(invalidInit)
	}

	if len(availableIds) == size {
		return availableIds, nil
	}

	if len(requiredIds) == size {
		return requiredIds, nil
	}

	if len(b.p2pWeights) == 0 {
		return outset, fmt.Errorf(invalidInit)
	}

	if !setContainsAll(availableIds, requiredIds) {
		return outset, fmt.Errorf(noCandidateFound)
	}

	available := b.getDevicesFromIds(availableIds)
	required := b.getDevicesFromIds(requiredIds)
	allSubsets, err := getCandidateDeviceSubsets(b.devicePartitions, b.devices, available, required, size, b.p2pWeights)
	if err != nil {
		return outset, err
	}

	bestScore := math.MaxInt32
	var candidate *DeviceSet
	for _, subset := range allSubsets {
		if subset.TotalWeight < bestScore {
			candidate = subset
			bestScore = subset.TotalWeight
		}
	}
	for _, id := range candidate.Ids {
		for _, d := range available {
			if d.NodeId == id {
				outset = append(outset, d.Id)
				break
			}
		}
	}
	glog.Infof("best device subset:%v best score:%v", outset, candidate.TotalWeight)
	return outset, nil
}