A simple study of Dremio query processing with JProfiler


Notes on the call chain of a simple Dremio query.

Reference commands

  • arthas watch
watch com.dremio.sabot.exec.fragment.FragmentExecutor$AsyncTaskImpl run '{params, target, returnObj, throwExp}' -x 2
  • jprofiler
    just attach it directly to the running process

Reference call graphs

(JProfiler call-graph screenshots omitted)

Code walkthrough

  • the run() method

 

  private void run(){
    assert taskState != State.DONE : "Attempted to run a task with state of DONE.";
    assert eventProvider != null : "Attempted to run without an eventProvider";

    if (!activateResource.isActivated()) {
      // All tasks are expected to begin in a runnable state. So, switch to the BLOCKED state on the
      // first call.
      taskState = State.BLOCKED_ON_SHARED_RESOURCE;
      return;
    }
    stats.runStarted();

    // update thread name.
    final Thread currentThread = Thread.currentThread();
    final String originalName = currentThread.getName();
    currentThread.setName(originalName + " - " + name);

    try {
      // if we're already done, we're finishing clean up. No core method
      // execution is necessary, simply exit this block so we finishRun() below.
      // We do this because a failure state will put us in a situation.
      if (state == FragmentState.CANCELLED || state == FragmentState.FAILED || state == FragmentState.FINISHED) {
        return;
      }

      // if there are any deferred exceptions, exit.
      if (deferredException.hasException()) {
        transitionToFailed(null);
        return;
      }

      // if cancellation is requested, that is always the top priority.
      if (cancelled.isDone()) {
        Optional<Throwable> failedReason = eventProvider.getFailedReason();
        if (failedReason.isPresent() || foremanDead) {
          // check if it failed due to an external reason (eg. by heap monitor).
          // if foremanDead is true, foremanDeadException must be non-null.
          assert(!foremanDead || (foremanDeadException != null));
          transitionToFailed(failedReason.isPresent() ? failedReason.get() : foremanDeadException);
          return;
        }

        transitionToCancelled();
        taskState = State.DONE;
        return;
      }

      // setup the execution if it isn't setup.
      if (!isSetup) {
        stats.setupStarted();
        try {
          if (memoryArbiter != null) {
            memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
          }
          setupExecution();
        } finally {
          stats.setupEnded();
        }
        // exit since we just did setup which could be a non-trivial amount of work.
        // Allow the scheduler to decide whether we should continue.
        return;
      }

      // workQueue might contain OOBMessages, which should be held and processed after the setup.
      // This piece should always execute after the setup is done.
      final Runnable work = workQueue.poll();
      if (work != null) {
        // we don't know how long it will take to process one work unit; we rely on the
        // scheduler to execute this fragment again if it didn't run long enough
        work.run();
        return;
      }

      // handle any previously sent fragment finished messages.
      FragmentHandle finishedFragment;
      while ((finishedFragment = eventProvider.pollFinishedReceiver()) != null) {
        pipeline.getTerminalOperator().receivingFragmentFinished(finishedFragment);
      }

      if (memoryArbiter != null) {
        memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
      }
      // pump the pipeline
      taskState = pumper.run();

      // if we've finished all work, let's wrap up.
      if (taskState == State.DONE) {
        transitionToFinished();
      }

      injector.injectChecked(executionControls, INJECTOR_DO_WORK, OutOfMemoryError.class);
    } catch (OutOfMemoryError | OutOfMemoryException e) {
      // handle out of memory errors differently from other error types.
      if (e instanceof OutOfDirectMemoryError || e instanceof OutOfMemoryException
          || "Direct buffer memory".equals(e.getMessage()) || INJECTOR_DO_WORK.equals(e.getMessage())) {
        transitionToFailed(UserException.memoryError(e)
            .addContext(MemoryDebugInfo.getDetailsOnAllocationFailure(new OutOfMemoryException(e), allocator))
            .buildSilently());
      } else {
        // we have a heap out of memory error. The JVM is unstable, exit.
        ProcessExit.exitHeap(e);
      }
    } catch (Throwable e) {
      transitionToFailed(e);
    } finally {
      try {
        finishRun(originalName);
      } finally {
        stats.runEnded();
      }
    }
  }
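
Note that run() never blocks: each invocation does one bounded slice of work (setup, one queued work item, or one pipeline pump), then returns a task state so the scheduler can decide what runs next. The cooperative pattern can be sketched as below; this is a minimal illustration with made-up names (SliceTask, TaskState), not Dremio's actual scheduler API.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// A task's run() does one slice per call and reports a state;
// the scheduler keeps re-invoking tasks that report RUNNABLE.
enum TaskState { RUNNABLE, BLOCKED, DONE }

class SliceTask {
    private final Deque<Runnable> workQueue = new ArrayDeque<>();
    private boolean setupDone = false;
    private int pumpsRemaining;

    SliceTask(int pumps) { this.pumpsRemaining = pumps; }

    void offerWork(Runnable r) { workQueue.add(r); }

    // Mirrors the shape of FragmentExecutor.run(): setup first, then one
    // deferred work item, then one pipeline "pump", yielding after each step.
    TaskState run() {
        if (!setupDone) {            // one-time setup, then yield to the scheduler
            setupDone = true;
            return TaskState.RUNNABLE;
        }
        Runnable work = workQueue.poll();
        if (work != null) {          // at most one deferred message per invocation
            work.run();
            return TaskState.RUNNABLE;
        }
        if (pumpsRemaining > 0) {    // one pipeline pump per invocation
            pumpsRemaining--;
            return pumpsRemaining == 0 ? TaskState.DONE : TaskState.RUNNABLE;
        }
        return TaskState.DONE;
    }
}

public class SchedulerSketch {
    public static void main(String[] args) {
        SliceTask task = new SliceTask(2);
        task.offerWork(() -> System.out.println("deferred OOB message handled"));
        int invocations = 0;
        // a trivial "scheduler": keep running the task until it reports DONE
        while (task.run() != TaskState.DONE) {
            invocations++;
        }
        // setup + one work item + two pumps => DONE on the fourth call
        System.out.println("invocations before DONE: " + invocations);
    }
}
```

This is why several branches above simply `return` after a single step (setup, one work item): finishing the slice quickly keeps the shared task pool responsive.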
  • the setupExecution() method
    reference code

 

  void setupExecution() throws Exception {
    // FragmentMajor and FragmentMinor mirror Drill's model; Drill's design and its
    // official profiler are good references for understanding them.
    final PlanFragmentMajor major = fragment.getMajor();
    final PlanFragmentMinor minor = fragment.getMinor();

    logger.debug("Starting fragment {}:{} on {}:{}", major.getHandle().getMajorFragmentId(), getHandle().getMinorFragmentId(), minor.getAssignment().getAddress(), minor.getAssignment().getUserPort());
    outputAllocator = ticket.newChildAllocator("output-frag:" + QueryIdHelper.getFragmentId(getHandle()),
      fragmentOptions.getOption(ExecConstants.OUTPUT_ALLOCATOR_RESERVATION),
      Long.MAX_VALUE);
    contextCreator.setFragmentOutputAllocator(outputAllocator);

    final PhysicalOperator rootOperator = reader.readFragment(fragment);
    contextCreator.setMinorFragmentEndpointsFromRootSender(rootOperator);
    FunctionLookupContext functionLookupContextToUse = functionLookupContext;
    if (fragmentOptions.getOption(PlannerSettings.ENABLE_DECIMAL_V2)) {
      functionLookupContextToUse = decimalFunctionLookupContext;
    }
    pipeline = PipelineCreator.get(
        new FragmentExecutionContext(major.getForeman(), sources, cancelled, major.getContext()),
        buffers,
        opCreator,
        contextCreator,
        functionLookupContextToUse,
        rootOperator,
        tunnelProvider,
        new SharedResourcesContextImpl(sharedResources)
        );

    pipeline.setup();

    clusterCoordinator.getServiceSet(ClusterCoordinator.Role.COORDINATOR).addNodeStatusListener(crashListener);

    transitionToRunning();
    isSetup = true;
  }
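
One detail worth dwelling on is `ticket.newChildAllocator(...)`: the fragment's output allocator is a child of the query's allocator, so memory used by this fragment is accounted against both its own limit and every ancestor's. The toy class below sketches only that hierarchical-accounting idea; it is not Arrow's BufferAllocator API, and all names in it are illustrative.

```java
// Toy sketch of hierarchical memory accounting, the pattern behind
// ticket.newChildAllocator(name, reservation, limit). Illustration only;
// this is NOT Arrow's BufferAllocator API.
class ToyAllocator {
    private final String name;
    private final long limit;
    private final ToyAllocator parent;
    private long allocated = 0;

    ToyAllocator(String name, long limit) { this(null, name, limit); }

    private ToyAllocator(ToyAllocator parent, String name, long limit) {
        this.parent = parent;
        this.name = name;
        this.limit = limit;
    }

    // A child gets its own limit; its usage also counts against every ancestor.
    ToyAllocator newChildAllocator(String childName, long childLimit) {
        return new ToyAllocator(this, childName, childLimit);
    }

    boolean allocate(long bytes) {
        if (allocated + bytes > limit) {
            return false;                      // local limit exceeded
        }
        if (parent != null && !parent.allocate(bytes)) {
            return false;                      // an ancestor's limit exceeded
        }
        allocated += bytes;
        return true;
    }

    void release(long bytes) {
        allocated -= bytes;
        if (parent != null) {
            parent.release(bytes);
        }
    }

    long getAllocated() { return allocated; }
}
```

With this model, a per-fragment limit (the child's) and a per-query limit (the parent's) are enforced at the same time, which is how an operator running out of memory in one fragment can surface as a query-level memory error.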

Notes

When Dremio actually runs, it bundles a community-edition task-scheduling package, dremio-ce-sabot-scheduler; execution depends on it, and it is wired in through dynamic class loading and configuration management.
With this call chain in hand, studying and analyzing Dremio becomes much easier, since Dremio depends on quite a few components and its internals are fairly complex. More details will follow in later posts.
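
For readers unfamiliar with how an implementation can be swapped in at runtime like this: one common JVM pattern for pluggable implementations is `java.util.ServiceLoader`, sketched below. The `TaskSchedulerFactory` interface and the fallback are my own illustration; Dremio's actual plugin mechanism is its own and may differ.

```java
import java.util.ServiceLoader;

// Illustrative interface; a real plugin jar would ship an implementation plus
// a META-INF/services/TaskSchedulerFactory entry naming its class.
interface TaskSchedulerFactory {
    String name();
}

public class SchedulerLoader {
    // Returns the first provider found on the classpath, or a built-in default.
    static TaskSchedulerFactory load() {
        for (TaskSchedulerFactory f : ServiceLoader.load(TaskSchedulerFactory.class)) {
            return f;                 // e.g. a ce-sabot-scheduler-style plugin jar
        }
        return () -> "default";       // fallback when no plugin jar is present
    }

    public static void main(String[] args) {
        System.out.println("scheduler: " + load().name());
    }
}
```

The payoff of this style is that dropping a jar on the classpath changes the scheduler without touching the core code, which matches how the scheduler package ships separately from the kernel.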

References

sabot/kernel/src/main/java/com/dremio/sabot/task/TaskPools.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/TaskPoolInitializer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/fragment/FragmentExecutor.java
sabot/kernel/src/main/java/com/dremio/sabot/driver/PipelineCreator.java

Original article by 3628473679. If reposting, please credit the source: https://blog.ytso.com/273584.html
