admin管理员组文章数量:1612059
CapacityTaskScheduler
链接:
http://hadoop.apache/common/docs/r0.19.2/capacity_scheduler.html
http://hadoop.apache/common/docs/r0.20.2/capacity_scheduler.html
https://issues.apache/jira/browse/HADOOP-3445
特性:
支持多个队列,每个job 只会被提交到一个队列上。
每个队列被分配了集群容量的一部分容量。
空余的容量被分配给超过其容量的任何队列。
队列内的作业支持优先级。
每个队列强制地分配给每个用户受限制的容量。
在分配任务给 TT 时会考虑到 JOB 的内存要求以及在 TT 结点是 RAM 和 VM 的情况。而 TT 中都是可以通过 mapred.child.ulimit 参数来设置 child 进程的 VM 大小。
start()
// initialize our queues from the config settings
CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
try {
initializeQueues(queueManager.getRoot().getJobQueueInfo().getChildren(),
schedConf, false );
} catch (Throwable e) {
LOG .error( "Couldn't initialize queues because of the excecption : "
+ StringUtils.stringifyException (e));
throw new IOException(e);
}
// Queues are ready. Now register jobQueuesManager with the JobTracker so as
// to listen to job changes
taskTrackerManager .addJobInProgressListener( jobQueuesManager );
//Start thread for initialization
if ( initializationPoller == null ) {
this . initializationPoller = new JobInitializationPoller(
jobQueuesManager , taskTrackerManager );
}
initializationPoller .init( jobQueuesManager .getJobQueueNames(), schedConf);
initializationPoller .setDaemon( true );
initializationPoller .start();
assignTasks
首先根据TT 信息得到TT 和集群当前的MR 运行数目以及空余数。
ClusterStatus c = taskTrackerManager .getClusterStatus();
int mapClusterCapacity = c.getMaxMapTasks();
int reduceClusterCapacity = c.getMaxReduceTasks();
int maxMapSlots = taskTrackerStatus.getMaxMapSlots();
int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
int currentReduceSlots = taskTrackerStatus. countOccupiedReduceSlots ();
LOG .debug( "TT asking for task, max maps="
更新集群中的MR 的相关调度环境信息。
updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
当TT 中有空余的R 的Slots 时,调用R 的调度器分配tasks 。
if (maxReduceSlots > currentReduceSlots) {
//reduce slot available , try to get a
//reduce task
tlr = reduceScheduler .assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus. TASK_FOUND ==
tlr.getLookUpStatus()) {
result.add(tlr.getTask());
}
}
同样当TT 中的M 有空余slots 时,调用M 调度器分配M 的tasks 。
if (maxMapSlots > currentMapSlots) {
//map slot available , try to get a map task
tlr = mapScheduler .assignTasks(taskTracker);
if (TaskLookupResult.LookUpStatus. TASK_FOUND ==
tlr.getLookUpStatus()) {
result.add(tlr.getTask());
}
}
updateContextObjec ts
TaskSchedulingMgr
该类是调度处理算法,也是最为重要的类。它对于M ,R 都是适用的算法。
把调度算法抽象出来了,实现类有M ,R 二种子类。
assignTasks()
该方法才是真正为TT 分配tasks 的方法。
找到该TT 正在休闲的job ,如果有job, 则拿到job 需要的slots 数。
JobInProgress job = taskTracker.getJobForFallowSlot( type );
int availableSlots = taskTracker.getAvailableSlots( type );
如果当前的空闲slots 数大于一个task 需要的slots 数,则可以为该job 分配task 。
如果TT 没有被分配的JOB ,则需要从多个队列中选择一个job 。
首先,对队列进行排序,依次按顺序去遍历队列,排序算法为QueueComparator 类。
然后,得到队列的调度信息,如果队列的容量为0 ,则跳过。
判断当前的队列的当前占用的slots 数+ 每个task 分配的slots 数 > 最大容量。
如果是,则跳过。
如果上面的条件都没有成立,则从当前队列中分配task ,同时返回分配task 状态。有找到task, 没有找到task ,由于memory 不满足而失败。
for (AbstractQueue q : getOrderedJobQueues()) { QueueSchedulingContext qsc = q.getQueueSchedulingContext(); // we may have queues with capacity=0. We shouldn't look at jobs from // these queues if (0 == getTSC(qsc).getCapacity()) { continue ; }
//This call is important for optimization purposes , if we //have reached the limit already no need for traversing the queue. if ( this .areTasksInQueueOverMaxCapacity(qsc,1)) { continue ; }
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc); TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
if (lookUpStatus == TaskLookupResult.LookUpStatus. NO_TASK_FOUND ) { continue ; // Look in other queues. }
// if we find a task, return if (lookUpStatus == TaskLookupResult.LookUpStatus. TASK_FOUND ) { return tlr; } // if there was a memory mismatch, return else if (lookUpStatus == TaskLookupResult.LookUpStatus. TASK_FAILING_MEMORY_REQUIREMENT ) { return tlr; } } |
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
如何从队列中取到task 呢?
遍历队列正运行的job 。
for (JobInProgress j :
scheduler . jobQueuesManager .getJobQueue(qsi.getQueueName())
.getRunningJobs()) {
同样判断队列是否超过最大容量,如果是,跳过该job 。
判断job 的用户是不是超过限制,如果是,跳过该job 。
再判断当前的memory 是否满足该job 的M,R 。如果满足则从该job 中分配task 。
如果不满足,则返回失败信息。
经过上述条件,如果没有找到job 的话,则去掉用户限制,再次遍历队列,找到相应的job 。
而真正的从job 中分配task 的方法将分别由实现类MapSchedulingMgr 和ReduceSchedulingMgr 来完成。
MapSchedulingMgr
Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
throws IOException {
synchronized ( scheduler ) {
ClusterStatus clusterStatus = scheduler . taskTrackerManager
.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
return job.obtainNewMapTask(taskTracker, numTaskTrackers,
scheduler . taskTrackerManager .getNumberOfUniqueHosts());
}
}
就是直接调用job 的获取新的task 接口。
ReduceSchedulingMgr 跟MapSchedulingMgr 类似。
QueueSchedulingContext
为每个队列记录调度信息。
维持跟队列有关的调度信息,比如名字,容量,当前运行task 数等。
这些信息被用来决定怎么去分配task ,重装分布容量等。
private static abstract class QueueComparator
implements Comparator<AbstractQueue> {
abstract TaskSchedulingContext getTSC(
QueueSchedulingContext qsi);
public int compare(AbstractQueue q1, AbstractQueue q2) {
TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
// look at how much capacity they've filled. Treat a queue with
// capacity=0 equivalent to a queue running at capacity
double r1 = (0 == t1.getCapacity())? 1.0f:
( double ) t1.getNumSlotsOccupied() /( double ) t1.getCapacity();
double r2 = (0 == t2.getCapacity())? 1.0f:
( double ) t2.getNumSlotsOccupied() /( double ) t2.getCapacity();
if (r1<r2) return -1;
else if (r1>r2) return 1;
else return 0;
}
}
DynamicPriorityScheduler
允许用户不断地增加或更改他们的队列优先顺序来满足他们当前负载的需要。
根据当前的需求,自动的调节优先级。即在运行过程中去计算队列优先级。
即可以根据当前的集群以及队列的开销,来按照这个开销比例来动态的为各个用户队列分配等比例的task 。
定时更新开销,根据以往各个队列的开销来为不同的队列分配不同数量的task 。
总结:
CapacityTaskScheduler 会为每个队列,用户给定一个最大容量,当超过最大容量时,则不给该队列再分配任务,否则会分配给队列或用户任务。再把多余的资源分配给超过最大容量的队列或用户。同时该调度还会判断 TT 中进程的内存是否满足任务所需要的内存,来作为任务分配的一个条件。
DynamicPriorityScheduler 会保存好每个队列的开销和预算,每隔一段时间进行更新,分配任务时根据队列的开销和预算来分配,如果超过预算会杀死部分任务。
对于任务调试器主要类有 : TaskScheduler 以及各种队列,任务排序算法等。
方法有:start(),assignTasks(TaskTracker) 等。另外还有几个任务监听类JobInProgress 。
大致把调试器的源码过了一遍,接下来要看集群监控系统,以及hdfs的相关代码。
本文标签: Capacityschedulerdynamic
版权声明:本文标题:Capacity Scheduler and Dynamic Scheduler 内容由热心网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:https://m.elefans.com/xitong/1728621854a1166455.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论