
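Introduction

With the growth of big data, task scheduling systems have become a critical part of data processing and management. Apache DolphinScheduler is an open-source distributed workflow scheduling platform that is widely used in big data scenarios.

In this article, we analyze the source code of Apache DolphinScheduler 1.3.9 in depth, covering how the Master starts and how it schedules workflows.

With this analysis, developers can better understand how DolphinScheduler works and carry out secondary development or optimization more effectively.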

Master Server startup

[Figure: Master Server startup flowchart]

[Figure: flowchart of how the Master schedules a workflow]

MasterServer startup method

```java
public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    this.nettyRemotingServer.start();

    // self tolerant
    this.zkMasterClient.start();
    this.zkMasterClient.setStoppable(this);

    // scheduler start
    this.masterSchedulerService.start();

    // start QuartzExecutors
    // what system should do if exception
    try {
        logger.info("start Quartz server...");
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed", e);
    }

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}
```
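  • The Netty server registers processors for three Commands:
  1. TASK_EXECUTE_ACK: after receiving a task-execution request from the Master, the Worker sends an Ack Command to tell the Master that the Task has started executing.
  2. TASK_EXECUTE_RESPONSE: after the Task finishes, the Worker sends a Response Command to report the task's execution result to the Master.
  3. TASK_KILL_RESPONSE: when the Master receives a request to stop a Task, it sends a TASK_KILL_REQUEST Command to the Worker; the Worker then returns a TASK_KILL_RESPONSE Command to the Master.
  • Starts the ZooKeeper client, the scheduler (MasterSchedulerService) and the Quartz timer.
  • Adds a ShutdownHook that releases resources (via close("shutdownHook")) before the process exits.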
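Command registration above boils down to a lookup table from command type to handler. Below is a minimal sketch, assuming an in-memory router; `CommandRouter`, `Processor` and the string payloads are hypothetical stand-ins, not DolphinScheduler's `NettyRemotingServer` API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: routes an incoming command to the processor registered for its type,
// similar in spirit to NettyRemotingServer.registerProcessor(CommandType, processor).
class CommandRouter {
    enum CommandType { TASK_EXECUTE_ACK, TASK_EXECUTE_RESPONSE, TASK_KILL_RESPONSE, TASK_KILL_REQUEST }

    // A processor here is simply a callback that receives the command payload.
    interface Processor extends Consumer<String> {}

    private final Map<CommandType, Processor> processors = new EnumMap<>(CommandType.class);

    void registerProcessor(CommandType type, Processor processor) {
        processors.put(type, processor);
    }

    // Returns false when nothing is registered for the type
    // (e.g. TASK_KILL_REQUEST, which the master sends rather than receives).
    boolean handle(CommandType type, String payload) {
        Processor processor = processors.get(type);
        if (processor == null) {
            return false;
        }
        processor.accept(payload);
        return true;
    }
}
```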

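Master configuration file

```properties
master.listen.port=5678
# Max number of threads for concurrently scheduling Process Instances
master.exec.threads=100
# Max number of tasks each ProcessInstance can execute concurrently
master.exec.task.num=20
# Number of tasks dispatched per batch
master.dispatch.task.num=3
# The master needs to select a stable worker to execute tasks
# Algorithms: Random, RoundRobin, LowerWeight. Default is LowerWeight
master.host.selector=LowerWeight
# Interval at which the master sends heartbeats to ZooKeeper, in seconds
master.heartbeat.interval=10
# Number of retries when the master fails to commit a task
master.task.commit.retryTimes=5
# Interval between retries when the master fails to commit a task, in milliseconds
master.task.commit.interval=1000
# Maximum average CPU load of the master; the master schedules tasks only while the system's average CPU load has not reached this value
# Default -1, meaning number of CPU cores * 2
master.max.cpuload.avg=-1
# Memory the master reserves for other processes; the master schedules only while available system memory is greater than this value
# Default 0.3 (GB)
master.reserved.memory=0.3
```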
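In a `.properties` file, `#` starts a comment only at the beginning of a line; text after a value, as in `5678# ...`, becomes part of the value, so each comment belongs on its own line. The sketch below loads a few of these keys with `java.util.Properties` and resolves the `-1` default of `master.max.cpuload.avg` to CPU cores * 2; `MasterSettings` is a hypothetical helper, not DolphinScheduler's `MasterConfig`.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: reads some master.properties keys and resolves the "-1" default
// of master.max.cpuload.avg to (CPU cores * 2), as the config comments describe.
class MasterSettings {
    final int execThreads;
    final int execTaskNum;
    final int dispatchTaskNum;
    final double maxCpuLoadAvg;
    final double reservedMemoryGb;

    MasterSettings(Properties p, int cpuCores) {
        execThreads = Integer.parseInt(p.getProperty("master.exec.threads", "100").trim());
        execTaskNum = Integer.parseInt(p.getProperty("master.exec.task.num", "20").trim());
        dispatchTaskNum = Integer.parseInt(p.getProperty("master.dispatch.task.num", "3").trim());
        double load = Double.parseDouble(p.getProperty("master.max.cpuload.avg", "-1").trim());
        // -1 means "use the default": number of CPU cores * 2
        maxCpuLoadAvg = (load == -1) ? cpuCores * 2 : load;
        reservedMemoryGb = Double.parseDouble(p.getProperty("master.reserved.memory", "0.3").trim());
    }

    static MasterSettings parse(String text, int cpuCores) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory reader
        }
        return new MasterSettings(p, cpuCores);
    }
}
```

In a real deployment the core count would come from `Runtime.getRuntime().availableProcessors()`.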

Master Scheduler startup

MasterSchedulerService initialization method

```java
public void init() {
    // masterConfig.getMasterExecThreads() reads master.exec.threads=100 from master.properties;
    // this thread pool's core and maximum thread counts are both 100
    this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
```

MasterSchedulerService run method

```java
public void run() {
    logger.info("master scheduler started");
    while (Stopper.isRunning()) {
        try {
            // Checks the master's CPU load and memory to decide whether it still has resources to schedule;
            // if it cannot schedule, sleep for 1 second
            boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
            if (!runCheckFlag) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                // This is the method that actually performs scheduling
                scheduleProcess();
            }
        } catch (Exception e) {
            logger.error("master scheduler thread error", e);
        }
    }
}
```
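The resource check can be read as a simple gate. A minimal sketch, assuming the rule stated in the config comments (schedule only while the load average is below `master.max.cpuload.avg` and available memory is greater than `master.reserved.memory`); `ResourceGate` is hypothetical and is not `OSUtils.checkResource` itself.

```java
import java.lang.management.ManagementFactory;

// Hypothetical sketch of the gate applied before each scheduling round.
final class ResourceGate {
    private ResourceGate() {}

    static boolean canSchedule(double loadAverage, double availableMemoryGb,
                               double maxCpuLoadAvg, double reservedMemoryGb) {
        // CPU: the load average must not have reached the configured ceiling
        boolean cpuOk = loadAverage < maxCpuLoadAvg;
        // Memory: more than the reserved amount must still be free for other processes
        boolean memoryOk = availableMemoryGb > reservedMemoryGb;
        return cpuOk && memoryOk;
    }

    // The JVM's view of the 1-minute system load average; negative if the platform does not provide it.
    static double currentLoadAverage() {
        return ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage();
    }
}
```

When the gate returns false, the loop above simply sleeps for `SLEEP_TIME_MILLIS` and checks again instead of taking a command.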

MasterSchedulerService scheduling method

```java
private void scheduleProcess() throws Exception {
    InterProcessMutex mutex = null;
    try {
        // Acquire the distributed lock (blocking)
        mutex = zkMasterClient.blockAcquireMutex();

        // Number of active threads in the thread pool
        int activeCount = masterExecService.getActiveCount();
        // make sure to scan and delete command table in one transaction
        // Fetch one command; these operations must all happen within one transaction
        Command command = processService.findOneCommand();
        if (command != null) {
            logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());

            try {
                // Build the ProcessInstance; this method uses master.exec.threads and the
                // active thread count to decide whether the processInstance can be scheduled
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount, command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
                }
            } catch (Exception e) {
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
        } else {
            // indicate that no command, sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } finally {
        // Release the distributed lock
        zkMasterClient.releaseMutex(mutex);
    }
}
```
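The lock, fetch, handle, release structure of `scheduleProcess()` can be sketched with a local lock. This is an assumption-laden illustration: a `ReentrantLock` stands in for ZooKeeper's `InterProcessMutex` (so it serializes threads within one JVM only, not several masters), and an in-memory deque stands in for the command table; `SchedulingRound` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch of one scheduling round: lock, take at most one command, handle it,
// move it to an error list if handling throws, and always release the lock in finally.
class SchedulingRound {
    private final ReentrantLock lock = new ReentrantLock();          // stand-in for the distributed mutex
    private final Deque<String> commandTable = new ArrayDeque<>();   // stand-in for the command table
    private final List<String> errorCommands = new ArrayList<>();    // stand-in for the error-command table

    void addCommand(String command) {
        commandTable.addLast(command);
    }

    List<String> errorCommands() {
        return errorCommands;
    }

    /** Returns true if a command was consumed in this round. */
    boolean runOnce(Consumer<String> handler) {
        lock.lock(); // blocking acquire, like blockAcquireMutex()
        try {
            String command = commandTable.pollFirst(); // find one command and remove it
            if (command == null) {
                return false; // the real loop sleeps for SLEEP_TIME_MILLIS here
            }
            try {
                handler.accept(command);
            } catch (RuntimeException e) {
                errorCommands.add(command); // like moveToErrorCommand(command, ...)
            }
            return true;
        } finally {
            lock.unlock(); // like releaseMutex(mutex)
        }
    }
}
```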

ProcessService method for handling a Command

```java
public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
    // Construct the ProcessInstance
    ProcessInstance processInstance = constructProcessInstance(command, host);
    // cannot construct process instance, return null;
    if (processInstance == null) {
        logger.error("scan command, command parameter is error: {}", command);
        moveToErrorCommand(command, "process instance is null");
        return null;
    }
    // Check whether the number of free threads is at least the number needed by this
    // ProcessDefinition and all of its sub-processes.
    // If the check fails, the process instance's state becomes "waiting thread" and null is returned
    if (!checkThreadNum(command, validThreadNum)) {
        logger.info("there is not enough thread for this command: {}", command);
        return setWaitingThreadProcess(command, processInstance);
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    saveProcessInstance(processInstance);
    this.setSubProcessParam(processInstance);
    delCommandByid(command.getId());
    return processInstance;
}
```
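The thread check compares free threads with the number of workflows that will run. A minimal sketch, assuming one thread for the workflow itself plus one per nested sub-process, and no cycles between definitions; `ThreadBudget` and the map of sub-process references are hypothetical, and the real `checkThreadNum` may count differently depending on the command type.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a workflow is scheduled only if the free threads cover
// the workflow itself plus all of its (nested) sub-processes.
final class ThreadBudget {
    private ThreadBudget() {}

    // subProcesses maps a definition name to the sub-process definitions it references directly.
    static int threadsNeeded(String definition, Map<String, List<String>> subProcesses) {
        int needed = 1; // the workflow itself
        for (String child : subProcesses.getOrDefault(definition, Collections.emptyList())) {
            needed += threadsNeeded(child, subProcesses); // the child and its own descendants
        }
        return needed;
    }

    static boolean checkThreadNum(String definition, Map<String, List<String>> subProcesses,
                                  int masterExecThreads, int activeCount) {
        // Same arithmetic as scheduleProcess(): master.exec.threads minus active threads
        int validThreadNum = masterExecThreads - activeCount;
        return threadsNeeded(definition, subProcesses) <= validThreadNum;
    }
}
```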

MasterExecThread constructor

```java
public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient) {
    this.processService = processService;

    this.processInstance = processInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    // master.exec.task.num in master.properties
    int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum);
    this.nettyRemotingClient = nettyRemotingClient;
}
```

MasterExecThread run method

```java
public void run() {
    // omitted...
    try {
        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
            // Complement (backfill) logic... not covered here
            executeComplementProcess();
        } else {
            // Execute the tasks
            executeProcess();
        }
    } catch (Exception e) {
        logger.error("master exec thread exception", e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processService.updateProcessInstance(processInstance);
    } finally {
        taskExecService.shutdown();
    }
}

private void executeProcess() throws Exception {
    // Pre-processing
    prepareProcess();
    // Execution
    runProcess();
    // Post-processing
    endProcess();
}

private void runProcess() {
    // Submit starting from the root tasks
    submitPostNode(null);
    boolean sendTimeWarning = false;
    while (!processInstance.isProcessInstanceStop() && Stopper.isRunning()) {
        // part of the code omitted...
        // Decide whether tasks can be scheduled based on CPU load avg and memory
        if (canSubmitTaskToQueue()) {
            submitStandByTask();
        }
        try {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        updateProcessInstanceState();
    }
    logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}

// Get the tasks that can run in parallel
/**
 * task 1 -> task 2 -> task3
 * task 4 -> task 5
 * task 6
 * task 1, task4 and task6 can run in parallel
 */
private void submitPostNode(String parentNodeName) {
    Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
    List<TaskInstance> taskInstances = new ArrayList<>();
    for (String taskNode : submitTaskNodeList) {
        taskInstances.add(createTaskInstance(processInstance, taskNode, dag.getNode(taskNode)));
    }

    // if previous node success , post node submit
    for (TaskInstance task : taskInstances) {
        if (readyToSubmitTaskQueue.contains(task)) {
            continue;
        }
        if (completeTaskList.containsKey(task.getName())) {
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
        } else {
            // Add the task to the priority queue
            addTaskToStandByList(task);
        }
    }
}

/**
 * handling the list of tasks to be submitted
 */
private void submitStandByTask() {
    try {
        int length = readyToSubmitTaskQueue.size();
        for (int i = 0; i < length; i++) {
            // Take a task from the queue and submit it to a worker for execution
            TaskInstance task = readyToSubmitTaskQueue.peek();
            // First check whether all of the task's upstream dependencies have succeeded; if so, submit the task.
            // If they failed or did not run, the task is not submitted
            DependResult dependResult = getDependResultForTask(task);
            if (DependResult.SUCCESS == dependResult) {
                if (retryTaskIntervalOverTime(task)) {
                    submitTaskExec(task);
                    removeTaskFromStandbyList(task);
                }
            } else if (DependResult.FAILED == dependResult) {
                // if the dependency fails, the current node is not submitted and the state changes to failure.
                dependFailedTask.put(task.getName(), task);
                removeTaskFromStandbyList(task);
                logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
            } else if (DependResult.NON_EXEC == dependResult) {
                // for some reasons(depend task pause/stop) this task would not be submit
                removeTaskFromStandbyList(task);
                logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
            }
        }
    } catch (Exception e) {
        logger.error("submit standby task error", e);
    }
}

/**
 * Create the TaskExecThread
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if (taskInstance.isSubProcess()) {
        abstractExecThread = new SubProcessTaskExecThread(taskInstance);
    } else if (taskInstance.isDependTask()) {
        abstractExecThread = new DependentTaskExecThread(taskInstance);
    } else if (taskInstance.isConditionsTask()) {
        abstractExecThread = new ConditionsTaskExecThread(taskInstance);
    } else {
        abstractExecThread = new MasterTaskExecThread(taskInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}
```
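The two decisions in this section, which nodes start first and whether a standby task may be submitted, can be sketched on a plain adjacency map. This is a simplified illustration, not `DagHelper` or `getDependResultForTask`: `DagSketch` and its enums are hypothetical, and it ignores skipped, forbidden and conditions nodes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of two ideas from MasterExecThread:
// 1) submitPostNode(null) starts from nodes with no upstream task, so independent chains run in parallel;
// 2) a standby task is submitted only when every upstream task has finished successfully.
final class DagSketch {
    enum State { SUCCESS, FAILURE, PAUSE, STOP }
    enum DependResult { SUCCESS, WAITING, FAILED, NON_EXEC }

    private DagSketch() {}

    // edges maps each task to its downstream tasks; returns the tasks that have no upstream task.
    static Set<String> rootNodes(Map<String, List<String>> edges) {
        Set<String> all = new TreeSet<>(edges.keySet());
        Set<String> hasUpstream = new HashSet<>();
        for (List<String> downstream : edges.values()) {
            all.addAll(downstream);
            hasUpstream.addAll(downstream);
        }
        all.removeAll(hasUpstream);
        return all;
    }

    // upstream lists the task's direct dependencies; finished holds the final state of completed tasks.
    static DependResult dependResult(List<String> upstream, Map<String, State> finished) {
        for (String dep : upstream) {
            State s = finished.get(dep);
            if (s == null) {
                return DependResult.WAITING;   // dependency has not completed yet
            }
            if (s == State.PAUSE || s == State.STOP) {
                return DependResult.NON_EXEC;  // dependency paused/stopped: do not run this task
            }
            if (s != State.SUCCESS) {
                return DependResult.FAILED;    // dependency failed: this task fails too
            }
        }
        return DependResult.SUCCESS;
    }
}
```

For the three chains in the `submitPostNode` comment, `rootNodes` yields task1, task4 and task6.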

MasterBaseTaskExecThread

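MasterBaseTaskExecThread is the parent class of SubProcessTaskExecThread, DependentTaskExecThread, ConditionsTaskExecThread and MasterTaskExecThread, and implements the Callable interface.

  • SubProcessTaskExecThread

    The task instance is not dispatched to a worker node. In the submitTask(TaskInstance taskInstance) method, a sub-process instance command is added for the sub-process, and the waitTaskQuit method then loops until the sub-process finishes. While the parent waits, the sub-workflow runs and its status is updated; only after the sub-workflow has fully completed is the task's state synchronized to the sub-workflow's state.

  • DependentTaskExecThread

    A Dependent node is a dependency-check node. For example, if workflow A depends on workflow B having run successfully yesterday, the Dependent node checks whether B has a successful instance for yesterday.

  • ConditionsTaskExecThread

    A Conditions node is a conditional node that decides which downstream task to run based on the state of its upstream tasks. Currently, Conditions supports multiple upstream tasks but only two downstream tasks. When there is more than one upstream task, complex upstream dependencies can be expressed with the AND and OR operators.

  • MasterTaskExecThread

    Dispatches the task instance to a worker node for execution, loops in the waitTaskQuit method until the task instance finishes, and exits once the task is complete. Examples are task types such as SQL and Shell.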
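The split between master-side and worker-side task types can be summarized as a small table. A hypothetical sketch: the `TaskKind` enum is illustrative only; DolphinScheduler itself decides this with `isSubProcess()`, `isDependTask()` and `isConditionsTask()` on the task instance.

```java
// Hypothetical sketch: which exec thread handles a task type, and whether the task reaches a worker.
enum TaskKind {
    SUB_PROCESS("SubProcessTaskExecThread", false),
    DEPENDENT("DependentTaskExecThread", false),
    CONDITIONS("ConditionsTaskExecThread", false),
    SHELL("MasterTaskExecThread", true),
    SQL("MasterTaskExecThread", true);

    final String execThread;
    // false: evaluated on the master; dispatchTask() returns true without queueing the task
    final boolean runsOnWorker;

    TaskKind(String execThread, boolean runsOnWorker) {
        this.execThread = execThread;
        this.runsOnWorker = runsOnWorker;
    }
}
```

Only the worker-side kinds are put into the task priority queue; for the other three, the master-side thread itself drives the task to completion.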

MasterBaseTaskExecThread constructor

```java
public MasterBaseTaskExecThread(TaskInstance taskInstance) {
    this.processService = SpringApplicationContext.getBean(ProcessService.class);
    this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
    this.cancel = false;
    this.taskInstance = taskInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
    initTaskParams();
}
```

MasterBaseTaskExecThread execution method

```java
@Override
public Boolean call() throws Exception {
    this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
    return submitWaitComplete(); // implemented by each subclass
}
```

MasterBaseTaskExecThread common methods

submit()

```java
protected TaskInstance submit() {
    // Number of retries for committing a task: master.task.commit.retryTimes=5
    Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
    // Interval between retries after a failed commit: master.task.commit.interval=1000
    Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();

    int retryTimes = 1;
    boolean submitDB = false;
    boolean submitTask = false;
    TaskInstance task = null;
    while (retryTimes <= commitRetryTimes) {
        try {
            if (!submitDB) {
                // Persist the TaskInstance to the database
                task = processService.submitTask(taskInstance);
                if (task != null && task.getId() != 0) {
                    submitDB = true;
                }
            }
            if (submitDB && !submitTask) {
                // Dispatch the task (put it into the task priority queue for a Worker)
                submitTask = dispatchTask(task);
            }
            if (submitDB && submitTask) {
                return task;
            }
            if (!submitDB) {
                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
            } else if (!submitTask) {
                logger.error("task commit  failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
            }
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and dispatcht task failed", e);
        }
        retryTimes += 1;
    }
    return task;
}
```
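The key property of `submit()` is that the two phases are retried independently: once the task is persisted it is never re-inserted, only re-dispatched. A minimal sketch, assuming `BooleanSupplier` callbacks in place of `processService.submitTask()` and `dispatchTask()`; `TwoPhaseSubmit` is hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of submit()'s retry pattern: persist first, then dispatch,
// retrying up to retryTimes and never repeating a phase that already succeeded.
final class TwoPhaseSubmit {
    private TwoPhaseSubmit() {}

    /** Returns the attempt on which both phases had succeeded, or -1 if they did not within retryTimes. */
    static int submit(BooleanSupplier saveToDb, BooleanSupplier dispatch, int retryTimes, long intervalMillis) {
        boolean savedToDb = false;
        boolean dispatched = false;
        for (int attempt = 1; attempt <= retryTimes; attempt++) {
            if (!savedToDb) {
                savedToDb = saveToDb.getAsBoolean();   // phase 1: persist the TaskInstance
            }
            if (savedToDb && !dispatched) {
                dispatched = dispatch.getAsBoolean();  // phase 2: hand the task to the queue
            }
            if (savedToDb && dispatched) {
                return attempt;
            }
            try {
                Thread.sleep(intervalMillis);          // master.task.commit.interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }
}
```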

dispatchTask(TaskInstance task)

```java
public Boolean dispatchTask(TaskInstance taskInstance) {
    try {
        // Sub-process, conditions and dependent tasks return true directly and are not submitted to a worker
        if (taskInstance.isConditionsTask()
                || taskInstance.isDependTask()
                || taskInstance.isSubProcess()) {
            return true;
        }
        if (taskInstance.getState().typeIsFinished()) {
            logger.info(String.format("submit task , but task [%s] state [%s] is already  finished. ", taskInstance.getName(), taskInstance.getState().toString()));
            return true;
        }
        // task cannot submit when running
        if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION) {
            logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to submit: {}", taskInstance);

        /**
         *  taskPriority
         */
        TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
                processInstance.getId(),
                taskInstance.getProcessInstancePriority().getCode(),
                taskInstance.getId(),
                org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
        // Put into the TaskPriorityQueue (implemented by org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl);
        // TaskPriorityQueueConsumer takes entries out of this queue and dispatches them to Workers
        taskUpdateQueue.put(taskPriority);
        logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
        return true;
    } catch (Exception e) {
        logger.error("submit task  Exception: ", e);
        logger.error("task error : {}", JSONUtils.toJson(taskInstance));
        return false;
    }
}
```
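`buildTaskPriority()` packs the fields that decide dispatch order. A sketch of such an ordering with `PriorityBlockingQueue`, assuming that a lower priority code means a more urgent task (for example `HIGHEST` = 0) and that fields are compared in the order they are passed; `QueuedTask` is a hypothetical stand-in for `TaskPriority`, whose real comparison logic may differ.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical queue entry ordered by the same fields dispatchTask() passes to buildTaskPriority().
final class QueuedTask {
    final int processInstancePriority;
    final int processInstanceId;
    final int taskInstancePriority;
    final int taskId;

    QueuedTask(int processInstancePriority, int processInstanceId, int taskInstancePriority, int taskId) {
        this.processInstancePriority = processInstancePriority;
        this.processInstanceId = processInstanceId;
        this.taskInstancePriority = taskInstancePriority;
        this.taskId = taskId;
    }

    // Smaller values come out of the queue first.
    static final Comparator<QueuedTask> ORDER = Comparator
            .comparingInt((QueuedTask t) -> t.processInstancePriority)
            .thenComparingInt(t -> t.processInstanceId)
            .thenComparingInt(t -> t.taskInstancePriority)
            .thenComparingInt(t -> t.taskId);

    static PriorityBlockingQueue<QueuedTask> newQueue() {
        return new PriorityBlockingQueue<>(16, ORDER);
    }
}
```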

MasterTaskExecThread

submitWaitComplete()

```java
public Boolean submitWaitComplete() {
    Boolean result = false;
    // Submit the task
    this.taskInstance = submit();
    if (this.taskInstance == null) {
        logger.error("submit task instance to mysql and queue failed , please check and fix it");
        return result;
    }
    if (!this.taskInstance.getState().typeIsFinished()) {
        // Wait for the task's execution result
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processService.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
    return result;
}
```

waitTaskQuit()

```java
public Boolean waitTaskQuit() {
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
            this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());

    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // task instance add queue , waiting worker to kill
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance();
            }
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                pauseTask();
            }
            // task instance finished
            if (taskInstance.getState().typeIsFinished()) {
                // if task is final result , then remove taskInstance from cache
                // taskInstanceCacheManager is implemented by org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
                // a taskInstance is added to the taskInstanceCache when its ack and response Commands arrive
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // updateProcessInstance task instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}
```
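Stripped of cancel, pause and timeout handling, `waitTaskQuit()` is a polling loop. A minimal sketch, assuming a `Supplier` that re-reads the state (standing in for `processService.findTaskInstanceById`) and a `BooleanSupplier` standing in for `Stopper.isRunning()`; `StatePoller` and its `State` enum are hypothetical.

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: re-read the task state every interval until it is final,
// or until the loop is told to stop.
final class StatePoller {
    enum State {
        SUBMITTED, RUNNING, SUCCESS, FAILURE, KILL;

        boolean isFinished() {
            return this == SUCCESS || this == FAILURE || this == KILL;
        }
    }

    private StatePoller() {}

    /** Returns the final state, or null if stopped or interrupted first. */
    static State waitForFinish(Supplier<State> readState, BooleanSupplier running, long sleepMillis) {
        while (running.getAsBoolean()) {
            State s = readState.get();      // fresh read on every iteration
            if (s.isFinished()) {
                return s;
            }
            try {
                Thread.sleep(sleepMillis);  // plays the role of Constants.SLEEP_TIME_MILLIS
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
        return null;
    }
}
```

Polling keeps the master-side thread simple at the cost of one database read per interval per running task.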

SubProcessTaskExecThread

submitWaitComplete()

```java
public Boolean submitWaitComplete() {
    Boolean result = false;
    try {
        // submit task instance
        this.taskInstance = submit();

        if (taskInstance == null) {
            logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
            return result;
        }
        setTaskInstanceState();
        waitTaskQuit();
        subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());

        // at the end of the subflow , the task state is changed to the subflow state
        if (subProcessInstance != null) {
            if (subProcessInstance.getState() == ExecutionStatus.STOP) {
                this.taskInstance.setState(ExecutionStatus.KILL);
            } else {
                this.taskInstance.setState(subProcessInstance.getState());
            }
        }
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
        result = true;
    } catch (Exception e) {
        logger.error("exception: ", e);
        if (null != taskInstance) {
            logger.error("wait task quit failed, instance id:{}, task id:{}",
                    processInstance.getId(), taskInstance.getId());
        }
    }
    return result;
}
```

waitTaskQuit()

```java
private void waitTaskQuit() throws InterruptedException {
    logger.info("wait sub work flow: {} complete", this.taskInstance.getName());

    if (taskInstance.getState().typeIsFinished()) {
        logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState(),
                this.processInstance.getState());
        return;
    }
    while (Stopper.isRunning()) {
        // waiting for subflow process instance establishment
        if (subProcessInstance == null) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            if (!setTaskInstanceState()) {
                continue;
            }
        }
        subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
        if (checkTaskTimeout()) {
            this.checkTimeoutFlag = !alertTimeout();
            handleTimeoutFailed();
        }
        updateParentProcessState();
        if (subProcessInstance.getState().typeIsFinished()) {
            break;
        }
        if (this.processInstance.getState() == ExecutionStatus.READY_PAUSE) {
            // parent process "ready to pause" , child process "pause"
            pauseSubProcess();
        } else if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
            // parent Process "Ready to Cancel" , subflow "Cancel"
            stopSubProcess();
        }
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
}
```
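The state rules in the two sub-process methods can be isolated as pure functions. A hypothetical sketch; the `Status` enum mirrors the `ExecutionStatus` names used in the code but is a local stand-in, and `SubProcessStates` is illustrative.

```java
// Hypothetical sketch of how SubProcessTaskExecThread relates parent and child states:
// parent READY_PAUSE pauses the child, READY_STOP (or cancel) stops it, and when the child
// finishes, the task takes the child's state, except that a stopped child maps to KILL.
final class SubProcessStates {
    enum Status { RUNNING_EXECUTION, READY_PAUSE, READY_STOP, PAUSE, STOP, KILL, SUCCESS, FAILURE }
    enum Action { NONE, PAUSE_CHILD, STOP_CHILD }

    private SubProcessStates() {}

    // Mirrors the if / else-if order in waitTaskQuit(): pause is checked before stop.
    static Action actionForParent(Status parentState, boolean cancel) {
        if (parentState == Status.READY_PAUSE) {
            return Action.PAUSE_CHILD;
        }
        if (cancel || parentState == Status.READY_STOP) {
            return Action.STOP_CHILD;
        }
        return Action.NONE;
    }

    // Mirrors the end of submitWaitComplete(): STOP becomes KILL, anything else is copied.
    static Status taskStateFromChild(Status childState) {
        return childState == Status.STOP ? Status.KILL : childState;
    }
}
```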

ConditionsTaskExecThread

submitWaitComplete()

```java
public Boolean submitWaitComplete() {
    try {
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        logger.info("dependent task start");
        // Wait for and evaluate the conditions
        waitTaskQuit();
        // Update the final dependency result
        updateTaskState();
    } catch (Exception e) {
        logger.error("conditions task run exception", e);
    }
    return true;
}
```

waitTaskQuit()

```java
private void waitTaskQuit() {
    List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(
            taskInstance.getProcessInstanceId()
    );
    for (TaskInstance task : taskInstances) {
        completeTaskList.putIfAbsent(task.getName(), task.getState());
    }

    // Collect all dependency results
    List<DependResult> modelResultList = new ArrayList<>();
    for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
        List<DependResult> itemDependResult = new ArrayList<>();
        for (DependentItem item : dependentTaskModel.getDependItemList()) {
            itemDependResult.add(getDependResultForItem(item));
        }
        DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
        modelResultList.add(modelResult);
    }
    // Combine the dependency results according to the logical operator
    conditionResult = DependentUtils.getDependResultForRelation(
            dependentParameters.getRelation(), modelResultList
    );
    logger.info("the conditions task depend result : {}", conditionResult);
}
```
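Conditions results are combined at two levels: the items within each `DependentTaskModel`, then the models themselves. A minimal sketch of an AND/OR combination, written in the spirit of `DependentUtils.getDependResultForRelation` but not copied from it, so edge cases such as empty lists may behave differently; `DependRelations` is hypothetical.

```java
import java.util.List;

// Hypothetical sketch of combining dependency results with a logical relation.
final class DependRelations {
    enum DependResult { SUCCESS, WAITING, FAILED }
    enum Relation { AND, OR }

    private DependRelations() {}

    static DependResult combine(Relation relation, List<DependResult> results) {
        if (relation == Relation.AND) {
            // AND: waiting if anything is still waiting, failed if anything failed, else success
            if (results.contains(DependResult.WAITING)) {
                return DependResult.WAITING;
            }
            return results.contains(DependResult.FAILED) ? DependResult.FAILED : DependResult.SUCCESS;
        }
        // OR: success if anything succeeded, waiting if something may still succeed, else failed
        if (results.contains(DependResult.SUCCESS)) {
            return DependResult.SUCCESS;
        }
        return results.contains(DependResult.WAITING) ? DependResult.WAITING : DependResult.FAILED;
    }
}
```

Applied twice (per model, then across models), this reproduces the two-level structure of the loop above.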

DependentTaskExecThread

submitWaitComplete()

```java
public Boolean submitWaitComplete() {
    try {
        logger.info("dependent task start");
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        initDependParameters();
        waitTaskQuit();
        updateTaskState();
    } catch (Exception e) {
        logger.error("dependent task run exception", e);
    }
    return true;
}
```

waitTaskQuit()

```java
private Boolean waitTaskQuit() {
    logger.info("wait depend task : {} complete", this.taskInstance.getName());
    if (taskInstance.getState().typeIsFinished()) {
        logger.info("task {} already complete. task state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState());
        return true;
    }
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // part of the code omitted
            // allDependentTaskFinish() checks whether all dependent tasks have finished
            if (allDependentTaskFinish() || taskInstance.getState().typeIsFinished()) {
                break;
            }
            // update process task
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}
```
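For the earlier example (workflow A depends on workflow B having succeeded yesterday), the check that the Dependent thread keeps polling can be sketched as below. This is a hypothetical illustration: `DependentCheck`, the `Instance` record and the rule that a missing instance counts as failed are assumptions, not the real dependency-evaluation logic.

```java
import java.time.LocalDate;
import java.util.List;

// Hypothetical sketch: does a workflow have a successful instance for the day before the business date?
final class DependentCheck {
    enum Status { RUNNING, SUCCESS, FAILURE }
    enum Result { SUCCESS, WAITING, FAILED }

    static final class Instance {
        final String workflow;
        final LocalDate scheduleDate;
        final Status status;

        Instance(String workflow, LocalDate scheduleDate, Status status) {
            this.workflow = workflow;
            this.scheduleDate = scheduleDate;
            this.status = status;
        }
    }

    private DependentCheck() {}

    static Result checkYesterday(String workflow, LocalDate businessDate, List<Instance> instances) {
        LocalDate yesterday = businessDate.minusDays(1);
        boolean stillRunning = false;
        for (Instance i : instances) {
            if (!i.workflow.equals(workflow) || !i.scheduleDate.equals(yesterday)) {
                continue; // not the workflow/day we depend on
            }
            if (i.status == Status.SUCCESS) {
                return Result.SUCCESS; // any successful instance satisfies the dependency
            }
            if (i.status == Status.RUNNING) {
                stillRunning = true;
            }
        }
        // Assumption: a running instance means "keep polling"; no usable instance means "failed"
        return stillRunning ? Result.WAITING : Result.FAILED;
    }
}
```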

TaskPriorityQueueConsumer

```java
@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()) {
        try {
            // Number of tasks dispatched per batch: master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for (int i = 0; i < fetchTaskNum; i++) {
                if (taskPriorityQueue.size() <= 0) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // if not task , blocking here
                // Take a task from the queue
                TaskPriority taskPriority = taskPriorityQueue.take();
                // Dispatch it to a worker for execution
                boolean dispatchResult = dispatch(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // Tasks that failed to dispatch are put back into the queue to wait for redispatch
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            logger.error("dispatcher task error", e);
        }
    }
}

/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

        if (taskInstanceIsFinalState(taskInstanceId)) {
            // when task finish, ignore this task, there is no need to dispatch anymore
            return true;
        } else {
            // Dispatch the task
            // Supported host-selection algorithms: lowest load first (LowerWeight), Random, RoundRobin
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}", e.getMessage());
    }
    return result;
}
```
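One consumer round can be sketched with a `PriorityBlockingQueue` of task ids. This is a simplified, hypothetical version of the loop above: `BatchDispatcher` is illustrative, the batch size plays the role of `master.dispatch.task.num`, and it stops early on an empty queue instead of sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.function.Predicate;

// Hypothetical sketch of one TaskPriorityQueueConsumer round: take up to batchSize tasks,
// try to dispatch each, and put the failed ones back so a later round retries them.
final class BatchDispatcher {
    private BatchDispatcher() {}

    /** Returns the tasks that failed to dispatch in this round (already re-queued). */
    static List<Integer> dispatchRound(PriorityBlockingQueue<Integer> queue, int batchSize,
                                       Predicate<Integer> dispatch) {
        List<Integer> failed = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            Integer task = queue.poll(); // non-blocking; the real consumer sleeps when the queue is empty
            if (task == null) {
                break;
            }
            if (!dispatch.test(task)) {
                failed.add(task);
            }
        }
        // Re-queue failures only after the batch, so they are not retried within the same round
        queue.addAll(failed);
        return failed;
    }
}
```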

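Through this analysis of the Apache DolphinScheduler 1.3.9 source code, we have looked at how the core Master modules are designed and implemented. The Master architecture is built for highly available and scalable task scheduling, while cluster coordination through ZooKeeper (for example, the distributed lock taken in scheduleProcess()) gives the system fault tolerance.

If you are interested in the Apache DolphinScheduler source code, you can dig further into the details of its task scheduling strategies, or extend it for your own business scenarios to make full use of its scheduling capabilities.

The end.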

This article is published with the support of 白鲸开源科技 (WhaleOps).
