
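Introduction

With the growth of big data, task scheduling systems have become a crucial part of data processing and management. Apache DolphinScheduler is an open-source distributed workflow scheduling platform that is widely used in big data scenarios.

In this article we analyze the source code of Apache DolphinScheduler 1.3.9 in depth, covering Master startup and the scheduling flow.

With this analysis, developers can better understand how DolphinScheduler works and carry out secondary development or optimization more efficiently in practice.

Master Server Startup

Startup flow chart

Flow chart of the Master scheduling a workflow

MasterServer startup method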

public void run() {
    // init remoting server
    NettyServerConfig serverConfig = new NettyServerConfig();
    serverConfig.setListenPort(masterConfig.getListenPort());
    this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
    this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
    this.nettyRemotingServer.start();

    // self tolerant
    this.zkMasterClient.start();
    this.zkMasterClient.setStoppable(this);

    // scheduler start
    this.masterSchedulerService.start();

    // start QuartzExecutors
    // what system should do if exception
    try {
        logger.info("start Quartz server...");
        QuartzExecutors.getInstance().start();
    } catch (Exception e) {
        try {
            QuartzExecutors.getInstance().shutdown();
        } catch (SchedulerException e1) {
            logger.error("QuartzExecutors shutdown failed : " + e1.getMessage(), e1);
        }
        logger.error("start Quartz failed", e);
    }

    /**
     * register hooks, which are called before the process exits
     */
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        if (Stopper.isRunning()) {
            close("shutdownHook");
        }
    }));
}

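  • The Netty server registers three kinds of Command:
  1. TASK_EXECUTE_ACK: after receiving the Master's request to execute a task, the Worker sends an Ack Command to the Master to report that it has started executing the Task.
  2. TASK_EXECUTE_RESPONSE: after finishing the Task, the Worker sends a Response Command to the Master to report the result of scheduling/executing the task.
  3. TASK_KILL_RESPONSE: after receiving a request to stop a Task, the Master sends a TASK_KILL_REQUEST Command to the Worker, and the Worker then returns a TASK_KILL_RESPONSE Command to the Master.
  • Start the scheduler service and the Quartz timer.
  • Add a ShutdownHook to release resources.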
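
The registration step can be pictured as a lookup table from command type to handler. The following is only a minimal sketch of that idea, not the actual NettyRemotingServer: the ProcessorRegistry class, its handle method and the string payloads are hypothetical, and only the three command type names come from the startup code.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

// Command names taken from the startup code; everything else here is illustrative.
enum CommandType { TASK_EXECUTE_ACK, TASK_EXECUTE_RESPONSE, TASK_KILL_RESPONSE }

// Hypothetical registry: maps each command type to the handler registered for it.
class ProcessorRegistry {
    private final Map<CommandType, Function<String, String>> processors =
            new EnumMap<>(CommandType.class);

    void registerProcessor(CommandType type, Function<String, String> processor) {
        processors.put(type, processor);
    }

    // Route an incoming message to its handler; unregistered types are reported, not thrown.
    String handle(CommandType type, String body) {
        Function<String, String> processor = processors.get(type);
        return processor == null ? "unsupported: " + type : processor.apply(body);
    }

    // Registers two of the three types so the unregistered case can be observed.
    static ProcessorRegistry partialDefaults() {
        ProcessorRegistry registry = new ProcessorRegistry();
        registry.registerProcessor(CommandType.TASK_EXECUTE_ACK, body -> "ack:" + body);
        registry.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, body -> "response:" + body);
        return registry;
    }
}
```

Keeping the routing in a map means a new command type only needs one more registerProcessor call, which matches the shape of the three calls in run().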

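Master configuration file

master.listen.port=5678

# Number of threads for concurrently scheduling process instances
master.exec.threads=100

# Number of tasks each process instance can execute
master.exec.task.num=20

# Number of tasks dispatched per batch
master.dispatch.task.num=3

# The master needs to select a stable worker to run each task.
# Available algorithms: Random, RoundRobin, LowerWeight. The default is LowerWeight.
master.host.selector=LowerWeight

# Interval at which the master sends heartbeats to ZooKeeper, in seconds
master.heartbeat.interval=10

# Number of retries when the master fails to commit a task
master.task.commit.retryTimes=5

# Interval between retries when the master fails to commit a task (passed to Thread.sleep, so milliseconds)
master.task.commit.interval=1000

# Maximum average CPU load of the master; the master schedules tasks only while the system's average CPU load has not reached this value.
# The default is -1, which means CPU cores * 2.
master.max.cpuload.avg=-1

# Memory the master reserves for other processes; the master schedules only while available system memory is greater than this value.
# The default is 0.3 (GB).
master.reserved.memory=0.3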
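
To make the -1 default concrete, here is a minimal sketch of reading a few of these keys with java.util.Properties. The MasterProps class and its field names are hypothetical and are not DolphinScheduler's MasterConfig. The core count is passed in as a parameter so the example stays deterministic.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper: reads a few master.* keys and resolves the -1 default.
class MasterProps {
    final int execThreads;
    final double maxCpuLoadAvg;
    final double reservedMemoryGb;

    MasterProps(Properties props, int cpuCores) {
        this.execThreads = Integer.parseInt(props.getProperty("master.exec.threads", "100"));
        double configuredLoad = Double.parseDouble(props.getProperty("master.max.cpuload.avg", "-1"));
        // -1 means "CPU cores * 2", as the comment in the configuration file states.
        this.maxCpuLoadAvg = configuredLoad == -1 ? cpuCores * 2 : configuredLoad;
        this.reservedMemoryGb = Double.parseDouble(props.getProperty("master.reserved.memory", "0.3"));
    }

    // Parse properties text; in a real deployment the text would come from master.properties.
    static MasterProps parse(String text, int cpuCores) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return new MasterProps(props, cpuCores);
    }
}
```

With 4 cores and master.max.cpuload.avg=-1, the effective threshold in this sketch is a load average of 8.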

Master Scheduler Startup

MasterSchedulerService initialization method

public void init() {
    // masterConfig.getMasterExecThreads() corresponds to master.exec.threads=100 in master.properties
    // both the core pool size and the maximum pool size of this thread pool are 100
    this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
    NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}

MasterSchedulerService startup method

public void run() {
    logger.info("master scheduler started");
    while (Stopper.isRunning()) {
        try {
            // check the master's CPU load and memory to decide whether it still has resources to schedule;
            // if it cannot schedule, sleep for 1 second
            boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMasterMaxCpuloadAvg(), masterConfig.getMasterReservedMemory());
            if (!runCheckFlag) {
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                continue;
            }
            if (zkMasterClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                // this is where scheduling actually happens
                scheduleProcess();
            }
        } catch (Exception e) {
            logger.error("master scheduler thread error", e);
        }
    }
}
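
The resource gate in this loop amounts to two threshold comparisons. The sketch below is a hypothetical stand-in for that check: it takes the measured values as parameters instead of reading them from the operating system, and its handling of values exactly equal to a threshold is an assumption rather than something the article states.

```java
// Hypothetical stand-in for the resource check; the measured values are passed in
// rather than read from the OS, so the logic can be exercised deterministically.
class ResourceCheck {
    // Returns true when the master may keep scheduling.
    static boolean checkResource(double loadAverage, double availableMemoryGb,
                                 double maxCpuLoadAvg, double reservedMemoryGb) {
        boolean overloaded = loadAverage > maxCpuLoadAvg;        // CPU load above the limit
        boolean lowMemory = availableMemoryGb < reservedMemoryGb; // less free memory than reserved
        return !(overloaded || lowMemory);
    }
}
```

When either condition trips, the scheduler loop sleeps for one interval and checks again, so a busy master simply stops picking up new commands until it recovers.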

MasterSchedulerService scheduling method

private void scheduleProcess() throws Exception {
    InterProcessMutex mutex = null;
    try {
        // acquire the distributed lock (blocking)
        mutex = zkMasterClient.blockAcquireMutex();

        // number of active threads in the thread pool
        int activeCount = masterExecService.getActiveCount();
        // make sure to scan and delete command table in one transaction
        // fetch one command; all operations must run in the same transaction
        Command command = processService.findOneCommand();
        if (command != null) {
            logger.info("find one command: id: {}, type: {}", command.getId(), command.getCommandType());

            try {
                // build the ProcessInstance;
                // this method uses master.exec.threads and the active thread count to decide whether the process instance can be scheduled
                ProcessInstance processInstance = processService.handleCommand(logger,
                        getLocalAddress(),
                        this.masterConfig.getMasterExecThreads() - activeCount, command);
                if (processInstance != null) {
                    logger.info("start master exec thread , split DAG ...");
                    masterExecService.execute(new MasterExecThread(processInstance, processService, nettyRemotingClient));
                }
            } catch (Exception e) {
                logger.error("scan command error ", e);
                processService.moveToErrorCommand(command, e.toString());
            }
        } else {
            // indicate that no command ,sleep for 1s
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        }
    } finally {
        // release the distributed lock
        zkMasterClient.releaseMutex(mutex);
    }
}

ProcessService method for handling a Command

public ProcessInstance handleCommand(Logger logger, String host, int validThreadNum, Command command) {
    // construct the ProcessInstance
    ProcessInstance processInstance = constructProcessInstance(command, host);
    // cannot construct process instance, return null;
    if (processInstance == null) {
        logger.error("scan command, command parameter is error: {}", command);
        moveToErrorCommand(command, "process instance is null");
        return null;
    }
    // check whether the number of remaining threads is at least the count of this ProcessDefinition plus all of its sub-processes;
    // if the check fails, the process instance state becomes "wait thread" and a null process instance is returned
    if (!checkThreadNum(command, validThreadNum)) {
        logger.info("there is not enough thread for this command: {}", command);
        return setWaitingThreadProcess(command, processInstance);
    }
    processInstance.setCommandType(command.getCommandType());
    processInstance.addHistoryCmd(command.getCommandType());
    saveProcessInstance(processInstance);
    this.setSubProcessParam(processInstance);
    delCommandByid(command.getId());
    return processInstance;
}
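
The thread check described in the comment can be sketched as counting a process definition together with every sub-process reachable from it. The following is a simplified illustration under stated assumptions: the ThreadNumCheck class and the map-based model of definitions are hypothetical, and this is not the actual checkThreadNum implementation, which works against the database.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: each process definition id maps to the ids of its direct sub-processes.
class ThreadNumCheck {
    // Count the definition itself plus every distinct sub-process reachable from it.
    static int requiredThreads(int definitionId, Map<Integer, List<Integer>> subProcesses) {
        Set<Integer> seen = new HashSet<>();
        collect(definitionId, subProcesses, seen);
        return seen.size();
    }

    private static void collect(int id, Map<Integer, List<Integer>> subProcesses, Set<Integer> seen) {
        if (!seen.add(id)) {
            return; // already counted; this also guards against accidental cycles
        }
        for (int child : subProcesses.getOrDefault(id, List.of())) {
            collect(child, subProcesses, seen);
        }
    }

    // The command may be scheduled only if enough free threads remain.
    static boolean checkThreadNum(int definitionId, Map<Integer, List<Integer>> subProcesses,
                                  int validThreadNum) {
        return validThreadNum >= requiredThreads(definitionId, subProcesses);
    }
}
```

For a definition 1 with sub-processes 2 and 3, where 2 in turn contains 4, this sketch requires four free threads, because each (sub-)process instance occupies one thread of the Master-Exec-Thread pool while it runs.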

MasterExecThread initialization method

public MasterExecThread(ProcessInstance processInstance, ProcessService processService, NettyRemotingClient nettyRemotingClient) {
    this.processService = processService;

    this.processInstance = processInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    // master.exec.task.num in master.properties
    int masterTaskExecNum = masterConfig.getMasterExecTaskNum();
    this.taskExecService = ThreadUtils.newDaemonFixedThreadExecutor("Master-Task-Exec-Thread", masterTaskExecNum);
    this.nettyRemotingClient = nettyRemotingClient;
}

MasterExecThread startup method

public void run() {
    // omitted...

    try {
        if (processInstance.isComplementData() && Flag.NO == processInstance.getIsSubProcess()) {
            // complement-data (backfill) logic, not covered here
            executeComplementProcess();
        } else {
            // run the tasks
            executeProcess();
        }
    } catch (Exception e) {
        logger.error("master exec thread exception", e);
        logger.error("process execute failed, process id:{}", processInstance.getId());
        processInstance.setState(ExecutionStatus.FAILURE);
        processInstance.setEndTime(new Date());
        processService.updateProcessInstance(processInstance);
    } finally {
        taskExecService.shutdown();
    }
}

private void executeProcess() throws Exception {
    // pre-processing
    prepareProcess();
    // execution
    runProcess();
    // post-processing
    endProcess();
}

private void runProcess() {
    // submit starting from the root tasks
    submitPostNode(null);
    boolean sendTimeWarning = false;
    while (!processInstance.isProcessInstanceStop() && Stopper.isRunning()) {

        // part of the code omitted...

        // decide whether tasks can be submitted based on CPU load average and memory
        if (canSubmitTaskToQueue()) {
            submitStandByTask();
        }
        try {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        updateProcessInstanceState();
    }

    logger.info("process:{} end, state :{}", processInstance.getId(), processInstance.getState());
}

// get the tasks that can run in parallel
/**
 * task 1 -> task 2 -> task3
 * task 4 -> task 5
 * task 6
 * task 1, task 4 and task 6 can run in parallel
 */
private void submitPostNode(String parentNodeName) {
    Set<String> submitTaskNodeList = DagHelper.parsePostNodes(parentNodeName, skipTaskNodeList, dag, completeTaskList);
    List<TaskInstance> taskInstances = new ArrayList<>();
    for (String taskNode : submitTaskNodeList) {
        taskInstances.add(createTaskInstance(processInstance, taskNode,
                dag.getNode(taskNode)));
    }
    // if previous node success , post node submit
    for (TaskInstance task : taskInstances) {
        if (readyToSubmitTaskQueue.contains(task)) {
            continue;
        }
        if (completeTaskList.containsKey(task.getName())) {
            logger.info("task {} has already run success", task.getName());
            continue;
        }
        if (task.getState().typeIsPause() || task.getState().typeIsCancel()) {
            logger.info("task {} stopped, the state is {}", task.getName(), task.getState());
        } else {
            // add the task to the priority queue
            addTaskToStandByList(task);
        }
    }
}

/**
 * handling the list of tasks to be submitted
 */
private void submitStandByTask() {
    try {
        int length = readyToSubmitTaskQueue.size();
        for (int i = 0; i < length; i++) {
            // take a task from the queue and submit it to a worker for execution
            TaskInstance task = readyToSubmitTaskQueue.peek();
            // first check whether all of the task's upstream dependencies ran successfully; if so, submit the task;
            // if they failed or were not executed, do not submit it
            DependResult dependResult = getDependResultForTask(task);
            if (DependResult.SUCCESS == dependResult) {
                if (retryTaskIntervalOverTime(task)) {
                    submitTaskExec(task);
                    removeTaskFromStandbyList(task);
                }
            } else if (DependResult.FAILED == dependResult) {
                // if the dependency fails, the current node is not submitted and the state changes to failure.
                dependFailedTask.put(task.getName(), task);
                removeTaskFromStandbyList(task);
                logger.info("task {},id:{} depend result : {}", task.getName(), task.getId(), dependResult);
            } else if (DependResult.NON_EXEC == dependResult) {
                // for some reasons(depend task pause/stop) this task would not be submit
                removeTaskFromStandbyList(task);
                logger.info("remove task {},id:{} , because depend result : {}", task.getName(), task.getId(), dependResult);
            }
        }
    } catch (Exception e) {
        logger.error("submit standby task error", e);
    }
}

/**
 * create the TaskExecThread
 */
private TaskInstance submitTaskExec(TaskInstance taskInstance) {
    MasterBaseTaskExecThread abstractExecThread = null;
    if (taskInstance.isSubProcess()) {
        abstractExecThread = new SubProcessTaskExecThread(taskInstance);
    } else if (taskInstance.isDependTask()) {
        abstractExecThread = new DependentTaskExecThread(taskInstance);
    } else if (taskInstance.isConditionsTask()) {
        abstractExecThread = new ConditionsTaskExecThread(taskInstance);
    } else {
        abstractExecThread = new MasterTaskExecThread(taskInstance);
    }
    Future<Boolean> future = taskExecService.submit(abstractExecThread);
    activeTaskNode.putIfAbsent(abstractExecThread, future);
    return abstractExecThread.getTaskInstance();
}
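
The example in the submitPostNode comment (task 1, task 4 and task 6 can start together) can be reproduced with a small graph. This is a minimal sketch, assuming a null parent means "return the nodes without predecessors". The MiniDag class is hypothetical and omits the skip list and completed-task handling that DagHelper.parsePostNodes receives as arguments.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical mini-DAG: each node maps to its direct successors.
class MiniDag {
    private final Map<String, List<String>> successors = new LinkedHashMap<>();

    void addNode(String name, String... next) {
        successors.put(name, List.of(next));
    }

    // With parent == null, return the nodes that have no predecessor;
    // otherwise return the parent's direct successors.
    Set<String> postNodes(String parent) {
        if (parent != null) {
            return new LinkedHashSet<>(successors.getOrDefault(parent, List.of()));
        }
        Set<String> roots = new LinkedHashSet<>(successors.keySet());
        for (List<String> next : successors.values()) {
            roots.removeAll(next); // anything that is a successor cannot be a root
        }
        return roots;
    }

    // The three chains from the comment: 1 -> 2 -> 3, 4 -> 5, and 6 alone.
    static MiniDag example() {
        MiniDag dag = new MiniDag();
        dag.addNode("task1", "task2");
        dag.addNode("task2", "task3");
        dag.addNode("task3");
        dag.addNode("task4", "task5");
        dag.addNode("task5");
        dag.addNode("task6");
        return dag;
    }
}
```

Calling postNodes(null) on the example graph yields task1, task4 and task6, and postNodes("task1") yields task2, which is the node submitted once task1 has finished.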

MasterBaseTaskExecThread

MasterBaseTaskExecThread is the parent class of SubProcessTaskExecThread, DependentTaskExecThread, ConditionsTaskExecThread and MasterTaskExecThread, and it implements the Callable interface.

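  • SubProcessTaskExecThread

    The task instance is not dispatched to a worker node. In the submitTask(TaskInstance taskInstance) method, a sub-process instance command is added for the sub-process, and the waitTaskQuit method then loops until the sub-process has finished. While waiting, the thread keeps the related states up to date, and the task's state is synchronized to the sub-workflow's state only once the sub-workflow has fully completed.

  • DependentTaskExecThread

    A Dependent node is a dependency-check node. For example, if process A depends on yesterday's run of process B having succeeded, the dependent node checks whether process B has an instance that ran successfully yesterday.

  • ConditionsTaskExecThread

    Conditions is a condition node that decides which downstream task to run based on the run state of the upstream tasks. So far, Conditions supports multiple upstream tasks but only two downstream tasks. When there is more than one upstream task, complex upstream dependencies can be expressed with the AND and OR operators.

  • MasterTaskExecThread

    Dispatches the task instance to a worker node for execution and loops in the waitTaskQuit method until the task instance has finished, then exits. This covers task types such as SQL and Shell.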

MasterBaseTaskExecThread initialization method

public MasterBaseTaskExecThread(TaskInstance taskInstance) {
    this.processService = SpringApplicationContext.getBean(ProcessService.class);
    this.alertDao = SpringApplicationContext.getBean(AlertDao.class);
    this.cancel = false;
    this.taskInstance = taskInstance;
    this.masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
    this.taskUpdateQueue = SpringApplicationContext.getBean(TaskPriorityQueueImpl.class);
    initTaskParams();
}

MasterBaseTaskExecThread execution method

@Override
public Boolean call() throws Exception {
    this.processInstance = processService.findProcessInstanceById(taskInstance.getProcessInstanceId());
    return submitWaitComplete(); // implemented by each subclass
}
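
The relationship between call() and submitWaitComplete() is the template-method pattern on top of Callable. Here is a minimal, self-contained sketch of that shape. BaseTaskCall, ShellLikeTaskCall and TaskCallDemo are hypothetical names, and the subclass body is a placeholder rather than real task logic.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical base class mirroring the shape of MasterBaseTaskExecThread.
abstract class BaseTaskCall implements Callable<Boolean> {
    protected final String taskName;

    BaseTaskCall(String taskName) {
        this.taskName = taskName;
    }

    @Override
    public Boolean call() throws Exception {
        // shared preparation would go here, followed by the subclass-specific work
        return submitWaitComplete();
    }

    protected abstract Boolean submitWaitComplete() throws Exception;
}

// Hypothetical subclass: a placeholder that reports success for any non-empty task name.
class ShellLikeTaskCall extends BaseTaskCall {
    ShellLikeTaskCall(String taskName) {
        super(taskName);
    }

    @Override
    protected Boolean submitWaitComplete() {
        return !taskName.isEmpty();
    }
}

class TaskCallDemo {
    // Submit the task to a pool and wait for its result, as taskExecService.submit does for real tasks.
    static boolean runOnce(BaseTaskCall task) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the pool only sees a Callable, the caller that submits the task does not need to know which of the four subclasses it is handling.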

MasterBaseTaskExecThread common methods

submit()

protected TaskInstance submit() {
    // number of retries for committing a task: master.task.commit.retryTimes=5
    Integer commitRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
    // interval between retries when committing a task fails: master.task.commit.interval=1000
    Integer commitRetryInterval = masterConfig.getMasterTaskCommitInterval();

    int retryTimes = 1;
    boolean submitDB = false;
    boolean submitTask = false;
    TaskInstance task = null;
    while (retryTimes <= commitRetryTimes) {
        try {
            if (!submitDB) {
                // persist the TaskInstance to the database
                task = processService.submitTask(taskInstance);
                if (task != null && task.getId() != 0) {
                    submitDB = true;
                }
            }
            if (submitDB && !submitTask) {
                // dispatch the task to a Worker for execution
                submitTask = dispatchTask(task);
            }
            if (submitDB && submitTask) {
                return task;
            }
            if (!submitDB) {
                logger.error("task commit to db failed , taskId {} has already retry {} times, please check the database", taskInstance.getId(), retryTimes);
            } else if (!submitTask) {
                logger.error("task commit  failed , taskId {} has already retry {} times, please check", taskInstance.getId(), retryTimes);
            }
            Thread.sleep(commitRetryInterval);
        } catch (Exception e) {
            logger.error("task commit to mysql and dispatcht task failed", e);
        }
        retryTimes += 1;
    }
    return task;
}
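
The key property of submit() is that each phase is retried only until it succeeds once: a task that has been persisted is not persisted again while dispatch is being retried. The sketch below isolates that control flow. TwoPhaseSubmit and its parameters are hypothetical, with BooleanSupplier standing in for the database and dispatch calls.

```java
import java.util.function.BooleanSupplier;

// Hypothetical two-phase submit-with-retry helper; not part of DolphinScheduler.
class TwoPhaseSubmit {
    // Returns the attempt number on which both phases had succeeded,
    // or -1 if that did not happen within maxRetries attempts.
    static int submit(BooleanSupplier saveToDb, BooleanSupplier dispatch,
                      int maxRetries, long intervalMillis) {
        boolean saved = false;
        boolean dispatched = false;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            if (!saved) {
                saved = saveToDb.getAsBoolean();
            }
            // dispatch only after persistence succeeded; never repeat a phase that already succeeded
            if (saved && !dispatched) {
                dispatched = dispatch.getAsBoolean();
            }
            if (saved && dispatched) {
                return attempt;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
        return -1;
    }
}
```

If persistence succeeds on the second attempt and dispatch on its own second call, the helper returns 3 and each supplier has been invoked exactly twice.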

dispatchTask(TaskInstance task)

public Boolean dispatchTask(TaskInstance taskInstance) {
    try {
        // sub-process, conditions and dependent tasks return true directly and are not submitted to a worker
        if (taskInstance.isConditionsTask()
                || taskInstance.isDependTask()
                || taskInstance.isSubProcess()) {
            return true;
        }
        if (taskInstance.getState().typeIsFinished()) {
            logger.info(String.format("submit task , but task [%s] state [%s] is already  finished. ", taskInstance.getName(), taskInstance.getState().toString()));
            return true;
        }
        // task cannot submit when running
        if (taskInstance.getState() == ExecutionStatus.RUNNING_EXECUTION) {
            logger.info(String.format("submit to task, but task [%s] state already be running. ", taskInstance.getName()));
            return true;
        }
        logger.info("task ready to submit: {}", taskInstance);

        /**
         *  taskPriority
         */
        TaskPriority taskPriority = buildTaskPriority(processInstance.getProcessInstancePriority().getCode(),
                processInstance.getId(),
                taskInstance.getProcessInstancePriority().getCode(),
                taskInstance.getId(),
                org.apache.dolphinscheduler.common.Constants.DEFAULT_WORKER_GROUP);
        // put into the task priority queue (org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl);
        // TaskPriorityQueueConsumer takes entries from this queue and submits them to a Worker for execution
        taskUpdateQueue.put(taskPriority);
        logger.info(String.format("master submit success, task : %s", taskInstance.getName()));
        return true;
    } catch (Exception e) {
        logger.error("submit task  Exception: ", e);
        logger.error("task error : %s", JSONUtils.toJson(taskInstance));
        return false;
    }
}
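
A priority entry built from (process instance priority, process instance id, task priority, task id) suggests a field-by-field ordering. The sketch below shows one way such an ordering could work with a PriorityBlockingQueue. The PriorityEntry class is hypothetical, and both the comparison order and the rule that a lower priority code is served first are assumptions, not taken from the TaskPriority source.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical priority entry; the field order used by the comparator is an assumption.
class PriorityEntry {
    final int processPriority;
    final int processInstanceId;
    final int taskPriority;
    final int taskId;

    PriorityEntry(int processPriority, int processInstanceId, int taskPriority, int taskId) {
        this.processPriority = processPriority;
        this.processInstanceId = processInstanceId;
        this.taskPriority = taskPriority;
        this.taskId = taskId;
    }

    // Lower codes are taken first; ties fall through to the next field.
    static final Comparator<PriorityEntry> ORDER = Comparator
            .comparingInt((PriorityEntry e) -> e.processPriority)
            .thenComparingInt(e -> e.processInstanceId)
            .thenComparingInt(e -> e.taskPriority)
            .thenComparingInt(e -> e.taskId);

    static PriorityBlockingQueue<PriorityEntry> newQueue() {
        return new PriorityBlockingQueue<>(16, ORDER);
    }
}
```

Under these assumptions, a task belonging to a higher-priority process instance is taken before tasks of lower-priority instances regardless of insertion order, and within one instance the task priority decides.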

MasterTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    Boolean result = false;
    // submit the task
    this.taskInstance = submit();
    if (this.taskInstance == null) {
        logger.error("submit task instance to mysql and queue failed , please check and fix it");
        return result;
    }
    if (!this.taskInstance.getState().typeIsFinished()) {
        // wait for the task execution result
        result = waitTaskQuit();
    }
    taskInstance.setEndTime(new Date());
    processService.updateTaskInstance(taskInstance);
    logger.info("task :{} id:{}, process id:{}, exec thread completed ",
            this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
    return result;
}

waitTaskQuit()

public Boolean waitTaskQuit() {
    // query new state
    taskInstance = processService.findTaskInstanceById(taskInstance.getId());
    logger.info("wait task: process id: {}, task id:{}, task name:{} complete",
            this.taskInstance.getProcessInstanceId(), this.taskInstance.getId(), this.taskInstance.getName());

    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // task instance add queue , waiting worker to kill
            if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
                cancelTaskInstance();
            }
            if (processInstance.getState() == ExecutionStatus.READY_PAUSE) {
                pauseTask();
            }
            // task instance finished
            if (taskInstance.getState().typeIsFinished()) {
                // if task is final result , then remove taskInstance from cache
                // taskInstanceCacheManager is implemented by org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl
                // the taskInstance is added to taskInstanceCache when the ack and response Commands are triggered
                taskInstanceCacheManager.removeByTaskInstanceId(taskInstance.getId());
                break;
            }
            if (checkTaskTimeout()) {
                this.checkTimeoutFlag = !alertTimeout();
            }
            // updateProcessInstance task instance
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

SubProcessTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    Boolean result = false;
    try {
        // submit task instance
        this.taskInstance = submit();

        if (taskInstance == null) {
            logger.error("sub work flow submit task instance to mysql and queue failed , please check and fix it");
            return result;
        }
        setTaskInstanceState();
        waitTaskQuit();
        subProcessInstance = processService.findSubProcessInstance(processInstance.getId(), taskInstance.getId());

        // at the end of the subflow , the task state is changed to the subflow state
        if (subProcessInstance != null) {
            if (subProcessInstance.getState() == ExecutionStatus.STOP) {
                this.taskInstance.setState(ExecutionStatus.KILL);
            } else {
                this.taskInstance.setState(subProcessInstance.getState());
            }
        }
        taskInstance.setEndTime(new Date());
        processService.updateTaskInstance(taskInstance);
        logger.info("subflow task :{} id:{}, process id:{}, exec thread completed ",
                this.taskInstance.getName(), taskInstance.getId(), processInstance.getId());
        result = true;
    } catch (Exception e) {
        logger.error("exception: ", e);
        if (null != taskInstance) {
            logger.error("wait task quit failed, instance id:{}, task id:{}",
                    processInstance.getId(), taskInstance.getId());
        }
    }
    return result;
}

waitTaskQuit()

private void waitTaskQuit() throws InterruptedException {
    logger.info("wait sub work flow: {} complete", this.taskInstance.getName());

    if (taskInstance.getState().typeIsFinished()) {
        logger.info("sub work flow task {} already complete. task state:{}, parent work flow instance state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState(),
                this.processInstance.getState());
        return;
    }
    while (Stopper.isRunning()) {
        // waiting for subflow process instance establishment
        if (subProcessInstance == null) {
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
            if (!setTaskInstanceState()) {
                continue;
            }
        }
        subProcessInstance = processService.findProcessInstanceById(subProcessInstance.getId());
        if (checkTaskTimeout()) {
            this.checkTimeoutFlag = !alertTimeout();
            handleTimeoutFailed();
        }
        updateParentProcessState();
        if (subProcessInstance.getState().typeIsFinished()) {
            break;
        }
        if (this.processInstance.getState() == ExecutionStatus.READY_PAUSE) {
            // parent process "ready to pause" , child process "pause"
            pauseSubProcess();
        } else if (this.cancel || this.processInstance.getState() == ExecutionStatus.READY_STOP) {
            // parent Process "Ready to Cancel" , subflow "Cancel"
            stopSubProcess();
        }
        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
    }
}

ConditionsTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try {
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        logger.info("dependent task start");
        // evaluate the conditions
        waitTaskQuit();
        // update the final dependency result
        updateTaskState();
    } catch (Exception e) {
        logger.error("conditions task run exception", e);
    }
    return true;
}

waitTaskQuit

private void waitTaskQuit() {
    List<TaskInstance> taskInstances = processService.findValidTaskListByProcessId(taskInstance.getProcessInstanceId());
    for (TaskInstance task : taskInstances) {
        completeTaskList.putIfAbsent(task.getName(), task.getState());
    }

    // collect all dependency results
    List<DependResult> modelResultList = new ArrayList<>();
    for (DependentTaskModel dependentTaskModel : dependentParameters.getDependTaskList()) {
        List<DependResult> itemDependResult = new ArrayList<>();
        for (DependentItem item : dependentTaskModel.getDependItemList()) {
            itemDependResult.add(getDependResultForItem(item));
        }
        DependResult modelResult = DependentUtils.getDependResultForRelation(dependentTaskModel.getRelation(), itemDependResult);
        modelResultList.add(modelResult);
    }
    // merge the dependency results according to the logical operator
    conditionResult = DependentUtils.getDependResultForRelation(dependentParameters.getRelation(), modelResultList);
    logger.info("the conditions task depend result : {}", conditionResult);
}
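
The two-level merge in waitTaskQuit (items within a group first, then the groups) can be illustrated with a reduced model. This is a simplified sketch, not DependentUtils.getDependResultForRelation. The enums and the RelationMerge class are hypothetical and model only SUCCESS and FAILED. One inner relation is shared by all groups, whereas in the source each group carries its own relation.

```java
import java.util.ArrayList;
import java.util.List;

enum DependRelation { AND, OR }

enum DependOutcome { SUCCESS, FAILED }

// Simplified stand-in for the merge step; only two outcome values are modelled.
class RelationMerge {
    static DependOutcome merge(DependRelation relation, List<DependOutcome> results) {
        switch (relation) {
            case AND:
                // AND succeeds only if nothing failed
                return results.contains(DependOutcome.FAILED) ? DependOutcome.FAILED : DependOutcome.SUCCESS;
            case OR:
                // OR succeeds if at least one result succeeded
                return results.contains(DependOutcome.SUCCESS) ? DependOutcome.SUCCESS : DependOutcome.FAILED;
            default:
                throw new IllegalArgumentException("unknown relation: " + relation);
        }
    }

    // Two levels: merge the items of each group with the inner relation,
    // then merge the group results with the outer relation.
    static DependOutcome mergeGroups(DependRelation outer, DependRelation inner,
                                     List<List<DependOutcome>> groups) {
        List<DependOutcome> groupResults = new ArrayList<>();
        for (List<DependOutcome> group : groups) {
            groupResults.add(merge(inner, group));
        }
        return merge(outer, groupResults);
    }
}
```

For example, with groups [SUCCESS, FAILED] and [SUCCESS] merged by AND inside each group, an outer OR gives SUCCESS while an outer AND gives FAILED.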

DependentTaskExecThread

submitWaitComplete()

public Boolean submitWaitComplete() {
    try {
        logger.info("dependent task start");
        this.taskInstance = submit();
        logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
                taskInstance.getProcessDefinitionId(),
                taskInstance.getProcessInstanceId(),
                taskInstance.getId()));
        String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
        Thread.currentThread().setName(threadLoggerInfoName);
        initTaskParameters();
        initDependParameters();
        waitTaskQuit();
        updateTaskState();
    } catch (Exception e) {
        logger.error("dependent task run exception", e);
    }
    return true;
}

waitTaskQuit()

private Boolean waitTaskQuit() {
    logger.info("wait depend task : {} complete", this.taskInstance.getName());
    if (taskInstance.getState().typeIsFinished()) {
        logger.info("task {} already complete. task state:{}",
                this.taskInstance.getName(),
                this.taskInstance.getState());
        return true;
    }
    while (Stopper.isRunning()) {
        try {
            if (this.processInstance == null) {
                logger.error("process instance not exists , master task exec thread exit");
                return true;
            }
            // part of the code omitted
            // allDependentTaskFinish() waits for all dependent tasks to finish
            if (allDependentTaskFinish() || taskInstance.getState().typeIsFinished()) {
                break;
            }
            // update process task
            taskInstance = processService.findTaskInstanceById(taskInstance.getId());
            processInstance = processService.findProcessInstanceById(processInstance.getId());
            Thread.sleep(Constants.SLEEP_TIME_MILLIS);
        } catch (Exception e) {
            logger.error("exception", e);
            if (processInstance != null) {
                logger.error("wait task quit failed, instance id:{}, task id:{}",
                        processInstance.getId(), taskInstance.getId());
            }
        }
    }
    return true;
}

TaskPriorityQueueConsumer

@Override
public void run() {
    List<TaskPriority> failedDispatchTasks = new ArrayList<>();
    while (Stopper.isRunning()) {
        try {
            // number of tasks dispatched per batch, master.dispatch.task.num = 3
            int fetchTaskNum = masterConfig.getMasterDispatchTaskNumber();
            failedDispatchTasks.clear();
            for (int i = 0; i < fetchTaskNum; i++) {
                if (taskPriorityQueue.size() <= 0) {
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    continue;
                }
                // if not task , blocking here
                // take a task from the queue
                TaskPriority taskPriority = taskPriorityQueue.take();
                // dispatch it to a worker for execution
                boolean dispatchResult = dispatch(taskPriority);
                if (!dispatchResult) {
                    failedDispatchTasks.add(taskPriority);
                }
            }
            if (!failedDispatchTasks.isEmpty()) {
                // tasks that failed to dispatch are put back into the queue to wait for another dispatch
                for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
                    taskPriorityQueue.put(dispatchFailedTask);
                }
                // If there are tasks in a cycle that cannot find the worker group,
                // sleep for 1 second
                if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
                    TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
                }
            }
        } catch (Exception e) {
            logger.error("dispatcher task error", e);
        }
    }
}

/**
 * dispatch task
 *
 * @param taskPriority taskPriority
 * @return result
 */
protected boolean dispatch(TaskPriority taskPriority) {
    boolean result = false;
    try {
        int taskInstanceId = taskPriority.getTaskId();
        TaskExecutionContext context = getTaskExecutionContext(taskInstanceId);
        ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());

        if (taskInstanceIsFinalState(taskInstanceId)) {
            // when task finish, ignore this task, there is no need to dispatch anymore
            return true;
        } else {
            // dispatch the task
            // supported selection algorithms: lower weight (lowest load first), random and round robin
            result = dispatcher.dispatch(executionContext);
        }
    } catch (ExecuteException e) {
        logger.error("dispatch error: {}", e.getMessage());
    }
    return result;
}
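
One round of the consumer loop (take up to a batch of tasks, dispatch each, put the failures back) can be sketched without threads or sleeping. The BatchDispatch class and its Predicate-based dispatcher are hypothetical. Unlike the loop above, this sketch simply stops when the queue is empty instead of sleeping and continuing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical single-round version of the consumer loop, without threads or sleeping.
class BatchDispatch {
    // Takes up to batchSize entries, dispatches each, and puts failures back at the end.
    // Returns the number of entries that failed to dispatch.
    static <T> int dispatchOnce(Queue<T> queue, int batchSize, Predicate<T> dispatcher) {
        List<T> failed = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            T task = queue.poll();
            if (task == null) {
                break; // queue drained before the batch was full
            }
            if (!dispatcher.test(task)) {
                failed.add(task);
            }
        }
        // re-enqueue only after the batch, so a failing task is not retried within the same round
        queue.addAll(failed);
        return failed.size();
    }
}
```

Collecting failures first and re-enqueueing them after the batch is what keeps one undispatchable task from consuming the whole batch in a single round.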

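Through this source analysis of Apache DolphinScheduler 1.3.9, we have gained a deeper understanding of the design and implementation of its core modules. The Master architecture is designed to keep task scheduling highly available and scalable, and cluster coordination through ZooKeeper gives the system a robust fault-tolerance mechanism.

If you are interested in the Apache DolphinScheduler source code, you can dig further into the details of its task scheduling strategies, or carry out secondary development for your own business scenarios to make full use of DolphinScheduler's scheduling capabilities.

End of article.

Publication of this article is supported by 白鲸开源科技.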
