Skip to content

Commit 0eac878

Browse files
committed
[YUNIKORN-3276] Remove code: Clean up context in shim repo (#1025)
Closes: #1025 Signed-off-by: Manikandan R <manirajv06@gmail.com>
1 parent 1962e80 commit 0eac878

3 files changed

Lines changed: 0 additions & 214 deletions

File tree

pkg/cache/context.go

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,11 @@ type Context struct {
6969
schedulerCache *schedulercache.SchedulerCache // external cache
7070
apiProvider client.APIProvider // apis to interact with api-server, scheduler-core, etc
7171
predManager predicates.PredicateManager // K8s predicates
72-
pluginMode bool // true if we are configured as a scheduler plugin
7372
namespace string // yunikorn namespace
7473
configMaps []*v1.ConfigMap // cached yunikorn configmaps
7574
lock *locking.RWMutex // lock - used not only for context data but also to ensure that multiple event types are not executed concurrently
7675
txnID atomic.Uint64 // transaction ID counter
7776
klogger klog.Logger
78-
podActivator atomic.Value
7977
}
8078

8179
// NewContext create a new context for the scheduler using a default (empty) configuration
@@ -112,18 +110,6 @@ func NewContextWithBootstrapConfigMaps(apis client.APIProvider, bootstrapConfigM
112110
return ctx
113111
}
114112

115-
// SetPodActivator is used by the plugin mode to add a callback function to reschedule a pod
116-
func (ctx *Context) SetPodActivator(podActivator func(logger klog.Logger, pod *v1.Pod)) {
117-
ctx.podActivator.Store(podActivator)
118-
}
119-
120-
// ActivatePod is used to tell Kubernetes to re-schedule a pod when using plugin mode
121-
func (ctx *Context) ActivatePod(pod *v1.Pod) {
122-
if activator, ok := ctx.podActivator.Load().(func(logger klog.Logger, pod *v1.Pod)); ok && activator != nil {
123-
activator(ctx.klogger, pod)
124-
}
125-
}
126-
127113
func (ctx *Context) AddSchedulingEventHandlers() error {
128114
err := ctx.apiProvider.AddEventHandler(&client.ResourceEventHandlers{
129115
Type: client.ConfigMapInformerHandlers,
@@ -170,10 +156,6 @@ func (ctx *Context) AddSchedulingEventHandlers() error {
170156
return nil
171157
}
172158

173-
func (ctx *Context) IsPluginMode() bool {
174-
return ctx.pluginMode
175-
}
176-
177159
func (ctx *Context) addNode(obj interface{}) {
178160
ctx.updateNode(nil, obj)
179161
}
@@ -890,28 +872,6 @@ func (ctx *Context) IsTaskMaybeSchedulable(taskID string) bool {
890872
return ctx.schedulerCache.IsTaskMaybeSchedulable(taskID)
891873
}
892874

893-
func (ctx *Context) AddPendingPodAllocation(podKey string, nodeID string) {
894-
ctx.schedulerCache.AddPendingPodAllocation(podKey, nodeID)
895-
}
896-
897-
func (ctx *Context) RemovePodAllocation(podKey string) {
898-
ctx.schedulerCache.RemovePodAllocation(podKey)
899-
}
900-
901-
func (ctx *Context) GetPendingPodAllocation(podKey string) (nodeID string, ok bool) {
902-
nodeID, ok = ctx.schedulerCache.GetPendingPodAllocation(podKey)
903-
return nodeID, ok
904-
}
905-
906-
func (ctx *Context) GetInProgressPodAllocation(podKey string) (nodeID string, ok bool) {
907-
nodeID, ok = ctx.schedulerCache.GetInProgressPodAllocation(podKey)
908-
return nodeID, ok
909-
}
910-
911-
func (ctx *Context) StartPodAllocation(podKey string, nodeID string) bool {
912-
return ctx.schedulerCache.StartPodAllocation(podKey, nodeID)
913-
}
914-
915875
func (ctx *Context) notifyTaskComplete(app *Application, taskID string) {
916876
if app == nil {
917877
log.Log(log.ShimContext).Debug("In notifyTaskComplete but app is nil",

pkg/cache/context_test.go

Lines changed: 0 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,121 +1648,6 @@ func TestAddApplicationsWithTags(t *testing.T) {
16481648
}
16491649
}
16501650

1651-
func TestPendingPodAllocations(t *testing.T) {
1652-
utils.SetPluginMode(true)
1653-
defer utils.SetPluginMode(false)
1654-
1655-
context, apiProvider := initContextAndAPIProviderForTest()
1656-
dispatcher.Start()
1657-
defer dispatcher.UnregisterAllEventHandlers()
1658-
defer dispatcher.Stop()
1659-
1660-
apiProvider.MockSchedulerAPIUpdateNodeFn(func(request *si.NodeRequest) error {
1661-
for _, node := range request.Nodes {
1662-
dispatcher.Dispatch(CachedSchedulerNodeEvent{
1663-
NodeID: node.NodeID,
1664-
Event: NodeAccepted,
1665-
})
1666-
}
1667-
return nil
1668-
})
1669-
1670-
node1 := v1.Node{
1671-
ObjectMeta: apis.ObjectMeta{
1672-
Name: Host1,
1673-
Namespace: "default",
1674-
UID: uid1,
1675-
},
1676-
}
1677-
context.addNode(&node1)
1678-
1679-
node2 := v1.Node{
1680-
ObjectMeta: apis.ObjectMeta{
1681-
Name: Host2,
1682-
Namespace: "default",
1683-
UID: uid2,
1684-
},
1685-
}
1686-
context.addNode(&node2)
1687-
1688-
// add a new application
1689-
context.AddApplication(&AddApplicationRequest{
1690-
Metadata: ApplicationMetadata{
1691-
ApplicationID: appID1,
1692-
QueueName: queueNameA,
1693-
User: testUser,
1694-
Tags: nil,
1695-
},
1696-
})
1697-
1698-
pod := &v1.Pod{
1699-
TypeMeta: apis.TypeMeta{
1700-
Kind: "Pod",
1701-
APIVersion: "v1",
1702-
},
1703-
ObjectMeta: apis.ObjectMeta{
1704-
Name: taskUID1,
1705-
UID: uid1,
1706-
},
1707-
}
1708-
1709-
// add a tasks to the existing application
1710-
task := context.AddTask(&AddTaskRequest{
1711-
Metadata: TaskMetadata{
1712-
ApplicationID: appID1,
1713-
TaskID: taskUID1,
1714-
Pod: pod,
1715-
},
1716-
})
1717-
assert.Assert(t, task != nil, "task was nil")
1718-
1719-
// add the allocation
1720-
context.AddPendingPodAllocation(uid1, Host1)
1721-
1722-
// validate that the pending allocation matches
1723-
nodeID, ok := context.GetPendingPodAllocation(uid1)
1724-
if !ok {
1725-
t.Fatalf("no pending pod allocation found")
1726-
}
1727-
assert.Equal(t, nodeID, Host1, "wrong host")
1728-
1729-
// validate that there is not an in-progress allocation
1730-
if _, ok = context.GetInProgressPodAllocation(uid1); ok {
1731-
t.Fatalf("in-progress allocation exists when it should be pending")
1732-
}
1733-
1734-
if context.StartPodAllocation(uid1, Host2) {
1735-
t.Fatalf("attempt to start pod allocation on wrong node succeeded")
1736-
}
1737-
1738-
if !context.StartPodAllocation(uid1, Host1) {
1739-
t.Fatalf("attempt to start pod allocation on correct node failed")
1740-
}
1741-
1742-
if _, ok = context.GetPendingPodAllocation(uid1); ok {
1743-
t.Fatalf("pending pod allocation still exists after transition to in-progress")
1744-
}
1745-
1746-
nodeID, ok = context.GetInProgressPodAllocation(uid1)
1747-
if !ok {
1748-
t.Fatalf("in-progress allocation does not exist")
1749-
}
1750-
assert.Equal(t, nodeID, Host1, "wrong host")
1751-
1752-
context.RemovePodAllocation(uid1)
1753-
if _, ok = context.GetInProgressPodAllocation(uid1); ok {
1754-
t.Fatalf("in-progress pod allocation still exists after removal")
1755-
}
1756-
1757-
// re-add to validate pending pod removal
1758-
context.AddPendingPodAllocation(uid1, Host1)
1759-
context.RemovePodAllocation(uid1)
1760-
1761-
if _, ok = context.GetPendingPodAllocation(uid1); ok {
1762-
t.Fatalf("pending pod allocation still exists after removal")
1763-
}
1764-
}
1765-
17661651
func TestGetStateDump(t *testing.T) {
17671652
context := initContextForTest()
17681653

pkg/cache/external/scheduler_cache.go

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -309,65 +309,6 @@ func (cache *SchedulerCache) IsTaskMaybeSchedulable(taskID string) bool {
309309
return cache.taskBloomFilterRef.Load().isTaskMaybePresent(taskID)
310310
}
311311

312-
// AddPendingPodAllocation is used to add a new pod -> node mapping to the cache when running in scheduler plugin mode.
313-
// This function is called (in plugin mode) after a task is allocated by the YuniKorn scheduler.
314-
func (cache *SchedulerCache) AddPendingPodAllocation(podKey string, nodeID string) {
315-
cache.lock.Lock()
316-
defer cache.lock.Unlock()
317-
cache.dumpState("AddPendingPodAllocation.Pre")
318-
defer cache.dumpState("AddPendingPodAllocation.Post")
319-
delete(cache.inProgressAllocations, podKey)
320-
cache.pendingAllocations[podKey] = nodeID
321-
}
322-
323-
// RemovePodAllocation is used to remove a pod -> node mapping from the cache when running in scheduler plugin
324-
// mode. It removes both pending and in-progress allocations. This function is called (via cache) from the scheduler
325-
// plugin in PreFilter() if a previous allocation was found, and in PostBind() to cleanup the allocation since it is no
326-
// longer relevant.
327-
func (cache *SchedulerCache) RemovePodAllocation(podKey string) {
328-
cache.lock.Lock()
329-
defer cache.lock.Unlock()
330-
cache.dumpState("RemovePendingPodAllocation.Pre")
331-
defer cache.dumpState("RemovePendingPodAllocation.Post")
332-
delete(cache.pendingAllocations, podKey)
333-
delete(cache.inProgressAllocations, podKey)
334-
}
335-
336-
// GetPendingPodAllocation is used in scheduler plugin mode to retrieve a pending pod allocation. A pending
337-
// allocation is one which has been decided upon by YuniKorn but has not yet been communicated to the default scheduler.
338-
func (cache *SchedulerCache) GetPendingPodAllocation(podKey string) (nodeID string, ok bool) {
339-
cache.lock.RLock()
340-
defer cache.lock.RUnlock()
341-
res, ok := cache.pendingAllocations[podKey]
342-
return res, ok
343-
}
344-
345-
// GetInProgressPodAllocation is used in scheduler plugin mode to retrieve an in-progress pod allocation. An in-progress
346-
// allocation is one which has been communicated to the default scheduler, but has not yet been bound.
347-
func (cache *SchedulerCache) GetInProgressPodAllocation(podKey string) (nodeID string, ok bool) {
348-
cache.lock.RLock()
349-
defer cache.lock.RUnlock()
350-
res, ok := cache.inProgressAllocations[podKey]
351-
return res, ok
352-
}
353-
354-
// StartPodAllocation is used in scheduler plugin mode to transition a pod allocation from pending to in-progress. If
355-
// the given pod has a pending allocation on the given node, the allocation is marked as in-progress and this function
356-
// returns true. If the pod is not pending or is pending on another node, this function does nothing and returns false.
357-
func (cache *SchedulerCache) StartPodAllocation(podKey string, nodeID string) bool {
358-
cache.lock.Lock()
359-
defer cache.lock.Unlock()
360-
cache.dumpState("StartPendingPodAllocation.Pre")
361-
defer cache.dumpState("StartPendingPodAllocation.Post")
362-
expectedNodeID, ok := cache.pendingAllocations[podKey]
363-
if ok && expectedNodeID == nodeID {
364-
delete(cache.pendingAllocations, podKey)
365-
cache.inProgressAllocations[podKey] = nodeID
366-
return true
367-
}
368-
return false
369-
}
370-
371312
// IsAssumedPod returns if pod is assumed in cache, avoid nil
372313
func (cache *SchedulerCache) IsAssumedPod(podKey string) bool {
373314
cache.lock.RLock()

0 commit comments

Comments
 (0)