Spark Master Startup Source Code Analysis

Published: 2019-06-19


More resources

  • github:
  • csdn (video playlist online):

YouTube videos

  • Spark Master startup source code analysis:

BiliBili videos

  • Spark Master startup source code analysis:

Starting the Master

Startup script

Load the configuration files

. "${SPARK_HOME}/sbin/spark-config.sh". "${SPARK_HOME}/bin/load-spark-env.sh"

Default configuration

```shell
SPARK_MASTER_PORT=7077
SPARK_MASTER_IP=`hostname`
SPARK_MASTER_WEBUI_PORT=8080
CLASS="org.apache.spark.deploy.master.Master"
```

CLASS="org.apache.spark.deploy.master.Master"ORIGINAL_ARGS="$@""${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \  --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \  $ORIGINAL_ARGS

  • The command that is ultimately invoked: with $command substituted, spark-daemon.sh runs

```shell
nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.master.Master "$@" >> "$log" 2>&1 < /dev/null &
newpid="$!"
;;
```

The run_command function and its dispatch logic in sbin/spark-daemon.sh:

```shell
run_command() {
  mode="$1"
  shift

  mkdir -p "$SPARK_PID_DIR"

  if [ -f "$pid" ]; then
    TARGET_ID="$(cat "$pid")"
    if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
      echo "$command running as process $TARGET_ID. Stop it first."
      exit 1
    fi
  fi

  if [ "$SPARK_MASTER" != "" ]; then
    echo rsync from "$SPARK_MASTER"
    rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "${SPARK_HOME}"
  fi

  spark_rotate_log "$log"
  echo "starting $command, logging to $log"

  case "$mode" in
    (class)
      nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
      newpid="$!"
      ;;
    (submit)
      nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-submit --class $command "$@" >> "$log" 2>&1 < /dev/null &
      newpid="$!"
      ;;
    (*)
      echo "unknown mode: $mode"
      exit 1
      ;;
  esac

  echo "$newpid" > "$pid"
  sleep 2
  # Check if the process has died; in that case we'll tail the log so the user can see
  if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
    echo "failed to launch $command:"
    tail -2 "$log" | sed 's/^/ /'
    echo "full log in $log"
  fi
}

option=$1

case $option in
  (submit)
    run_command submit "$@"
    ;;
  (start)
    run_command class "$@"
    ;;
```

spark-class

Load the configuration file

. "${SPARK_HOME}"/bin/load-spark-env.sh

Invoke the Main entry class

```shell
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
exec "${CMD[@]}"
```

The Main class entry point

Building the launch command for the Master class

```java
/**
 * Usage: Main [class] [class args]
 *
 * This CLI works in two different modes:
 *
 *   - "spark-submit": if class is "org.apache.spark.deploy.SparkSubmit", the
 *     {@link SparkLauncher} class is used to launch a Spark application.
 *   - "spark-class": if another class is provided, an internal Spark class is run.
 *
 * This class works in tandem with the "bin/spark-class" script on Unix-like systems, and
 * "bin/spark-class2.cmd" batch script on Windows to execute the final command.
 *
 * On Unix-like systems, the output is a list of command arguments, separated by the NULL
 * character. On Windows, the output is a command line suitable for direct execution from the
 * script.
 */
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");

  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  String className = args.remove(0);

  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  AbstractCommandBuilder builder;
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    builder = new SparkClassCommandBuilder(className, args);
  }

  Map<String, String> env = new HashMap<>();
  List<String> cmd = builder.buildCommand(env);
  if (printLaunchCommand) {
    System.err.println("Spark Command: " + join(" ", cmd));
    System.err.println("========================================");
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separator since it cannot be used in an argument.
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c);
      System.out.print('\0');
    }
  }
}
```
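
The important detail is the final loop: the finished command is written to stdout with each argument followed by a NUL character, which can never occur inside a shell argument, so the `while IFS= read -d '' -r ARG` loop in spark-class (shown above) can read the arguments back safely and exec the result. A tiny standalone sketch of the same convention follows; the class name and argument values are made up for illustration and are not Spark code:

```scala
// Hypothetical demo of the NUL-separated output convention used by
// org.apache.spark.launcher.Main: print each argument followed by '\0'.
object NulSeparatedOutput {
  def main(args: Array[String]): Unit = {
    val cmd = Seq("java", "-cp", "conf:jars/*", "org.apache.spark.deploy.master.Master",
      "--host", "node1", "--port", "7077", "--webui-port", "8080")
    cmd.foreach { arg =>
      print(arg)
      print('\u0000') // NUL delimiter, consumed by `while IFS= read -d '' -r ARG` in spark-class
    }
  }
}
```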

The Master class

The main method

  • Starts the 'sparkMaster' RPC service
  • Sends a BoundPortsRequest message to its own endpoint to learn the bound ports (a minimal ask/reply sketch follows the code below)
```scala
def main(argStrings: Array[String]) {
  Utils.initDaemon(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
```
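
startRpcEnvAndEndpoint blocks on askWithRetry until the Master's receiveAndReply answers BoundPortsRequest with a BoundPortsResponse. The ask/reply mechanism itself is Spark's internal RPC layer (RpcEnv, RpcEndpoint, RpcEndpointRef are all private[spark]). Below is a minimal, hypothetical sketch of that pattern: the package, endpoint name, port, and message types are made up, and the code only compiles inside the org.apache.spark package tree because the API is internal.

```scala
package org.apache.spark.examples // hypothetical location; the RPC API is private[spark]

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Made-up messages mirroring BoundPortsRequest / BoundPortsResponse.
case object PingRequest
case class PingResponse(boundPort: Int)

class PingEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // Messages sent with ask* arrive here, just as BoundPortsRequest does in Master.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case PingRequest => context.reply(PingResponse(rpcEnv.address.port))
  }
}

object PingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val rpcEnv = RpcEnv.create("pingSystem", "localhost", 9999, conf, new SecurityManager(conf))
    val ref = rpcEnv.setupEndpoint("ping", new PingEndpoint(rpcEnv))
    // Blocking ask, same shape as masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest).
    val response = ref.askWithRetry[PingResponse](PingRequest)
    println(s"endpoint bound on port ${response.boundPort}")
    rpcEnv.shutdown()
  }
}
```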

The onStart method

  • Starts the MasterWebUI
  • Instantiates the persistence engine and the leader election agent for the configured recovery mode (a configuration sketch follows the code below)
  • Schedules a periodic worker heartbeat check (every 60 seconds by default); workers that have not checked in are removed
```scala
override def onStart(): Unit = {
  logInfo("Starting Spark master at " + masterUrl)
  logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
  webUi = new MasterWebUI(this, webUiPort)
  webUi.bind()
  masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
  checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      self.send(CheckForWorkerTimeOut)
    }
  }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

  if (restServerEnabled) {
    val port = conf.getInt("spark.master.rest.port", 6066)
    restServer = Some(new StandaloneRestServer(address.host, port, conf, self, masterUrl))
  }
  restServerBoundPort = restServer.map(_.start())

  masterMetricsSystem.registerSource(masterSource)
  masterMetricsSystem.start()
  applicationMetricsSystem.start()
  // Attach the master and app metrics servlet handler to the web ui after the metrics systems are
  // started.
  masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

  val serializer = new JavaSerializer(conf)
  val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
    case "ZOOKEEPER" =>
      logInfo("Persisting recovery state to ZooKeeper")
      val zkFactory =
        new ZooKeeperRecoveryModeFactory(conf, serializer)
      (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    case "FILESYSTEM" =>
      val fsFactory =
        new FileSystemRecoveryModeFactory(conf, serializer)
      (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
    case "CUSTOM" =>
      val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
      val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
        .newInstance(conf, serializer)
        .asInstanceOf[StandaloneRecoveryModeFactory]
      (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
    case _ =>
      (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
  }
  persistenceEngine = persistenceEngine_
  leaderElectionAgent = leaderElectionAgent_
}
```
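
RECOVERY_MODE above comes from spark.deploy.recoveryMode (default "NONE", which falls through to the BlackHolePersistenceEngine and MonarchyLeaderAgent branch). For reference, ZooKeeper-based recovery is normally enabled through the master's configuration, typically via SPARK_DAEMON_JAVA_OPTS or the properties file rather than application code; the sketch below only names the keys, and the ZooKeeper URL and directory are placeholder values:

```scala
import org.apache.spark.SparkConf

object RecoveryModeConfExample {
  // Placeholder values; spark.deploy.recoveryMode selects the branch of the match above
  // ("ZOOKEEPER", "FILESYSTEM", "CUSTOM", anything else => black-hole engine).
  val conf = new SparkConf()
    .set("spark.deploy.recoveryMode", "ZOOKEEPER")
    .set("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181") // placeholder quorum
    .set("spark.deploy.zookeeper.dir", "/spark")                     // placeholder znode path
}
```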
  • Accepting worker registrations (a simplified model of the decision flow follows the code below)
```scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  }
```
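
Stripped of the RPC and persistence plumbing, the registration above has three outcomes: a standby master refuses, a duplicate worker ID fails, and an otherwise new worker still fails if registerWorker finds a live worker at the same address. The following is a simplified, self-contained model of that flow; all types and names are made up for illustration, and the real registerWorker additionally handles workers in DEAD or UNKNOWN state:

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the RegisterWorker branches above.
sealed trait RegisterResult
case object StandbyMaster extends RegisterResult
case class RegistrationFailed(reason: String) extends RegisterResult
case object Registered extends RegisterResult

case class WorkerRecord(id: String, host: String, port: Int)

class RegistrationModel(var isStandby: Boolean = false) {
  private val idToWorker = mutable.Map[String, WorkerRecord]()
  private val addressToWorker = mutable.Map[(String, Int), WorkerRecord]()

  def register(w: WorkerRecord): RegisterResult =
    if (isStandby) {
      StandbyMaster // a standby master replies MasterInStandby and registers nothing
    } else if (idToWorker.contains(w.id)) {
      RegistrationFailed("Duplicate worker ID")
    } else if (addressToWorker.contains((w.host, w.port))) {
      // corresponds to registerWorker(...) returning false for a live worker at the same address
      RegistrationFailed(s"Attempted to re-register worker at same address: ${w.host}:${w.port}")
    } else {
      idToWorker(w.id) = w
      addressToWorker((w.host, w.port)) = w
      Registered // the real Master then persists the worker and calls schedule()
    }
}
```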

