一个dremio 查询简单调用链的说明
参考命令
- arthas watch
watch com.dremio.sabot.exec.fragment.FragmentExecutor$AsyncTaskImpl run '{params, target, returnObj, throwExp}' -x 2
- jprofiler
直接将 jprofiler 附加(attach)到正在运行的 dremio 进程即可
参考调用图
代码处理
- run 处理
/**
 * Runs one slice of this fragment's task.
 *
 * <p>Each invocation performs at most one unit of progress and then returns so the scheduler
 * can decide whether to run the task again: it either finishes cleanup for a terminal state,
 * fails on a deferred exception, handles a cancellation, performs the one-time setup,
 * runs a single queued work item, or pumps the pipeline once.
 *
 * <p>If the shared resource is not yet activated, the task is marked
 * {@code BLOCKED_ON_SHARED_RESOURCE} and the method returns before any stats are recorded.
 * Otherwise {@code stats.runStarted()} is called, and {@code finishRun(originalName)} followed
 * by {@code stats.runEnded()} run in nested {@code finally} blocks on every exit path,
 * including the early {@code return}s inside the {@code try}.
 *
 * <p>Direct-memory style out-of-memory conditions are converted into a failed fragment with a
 * {@code UserException}; any other {@code OutOfMemoryError} is handed to
 * {@code ProcessExit.exitHeap}. Every other {@code Throwable} fails the fragment.
 */
private void run(){
// Preconditions (only checked when assertions are enabled).
assert taskState != State.DONE : "Attempted to run a task with state of DONE.";
assert eventProvider != null : "Attempted to run without an eventProvider";
if (!activateResource.isActivated()) {
// All tasks are expected to begin in a runnable state. So, switch to the BLOCKED state on the
// first call.
// Note: this return happens before stats.runStarted() and outside the try, so neither
// finishRun() nor stats.runEnded() is invoked for this call.
taskState = State.BLOCKED_ON_SHARED_RESOURCE;
return;
}
stats.runStarted();
// update thread name: append this task's name to the current thread name for the duration of the run.
// The original name is handed to finishRun() in the finally block (presumably to restore it — confirm in finishRun).
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
currentThread.setName(originalName + " - " + name);
try {
// if the fragment is already in a terminal state (cancelled, failed or finished), we're only
// finishing clean up. No core method execution is necessary; simply exit this block so that
// finishRun() is called in the finally below.
if(state == FragmentState.CANCELLED || state == FragmentState.FAILED || state == FragmentState.FINISHED) {
return;
}
// if there are any deferred exceptions, exit.
// NOTE(review): null is passed here; presumably transitionToFailed picks up the deferred exception itself — confirm.
if(deferredException.hasException()) {
transitionToFailed(null);
return;
}
// if cancellation is requested, that is always the top priority.
if (cancelled.isDone()) {
Optional<Throwable> failedReason = eventProvider.getFailedReason();
if (failedReason.isPresent() || foremanDead) {
// check if it was failed due to an external reason (eg. by heap monitor).
// if foremanDead is true, foremanDeadException must be non null.
assert(!foremanDead || (foremanDeadException != null));
// An explicit failure reason takes precedence over the foreman-dead exception.
transitionToFailed(failedReason.isPresent() ? failedReason.get() : foremanDeadException);
return;
}
// Plain cancellation with no failure reason: mark the fragment cancelled and the task done.
transitionToCancelled();
taskState = State.DONE;
return;
}
// setup the execution if it isn't setup.
if(!isSetup){
stats.setupStarted();
try {
// Acquire a memory grant before setup when a memory arbiter is configured.
if (memoryArbiter != null) {
memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
}
setupExecution();
} finally {
// Setup timing is closed even when setup throws.
stats.setupEnded();
}
// exit since we just did setup which could be a non-trivial amount of work. Allow the scheduler to decide whether we should continue.
return;
}
// workQueue might contain OOBMessages, which should be held and processed after the setup.
// This piece should always execute after the setup is done.
final Runnable work = workQueue.poll();
if (work != null) {
// we don't know how long it will take to process one work unit, we rely on the scheduler to execute
// this fragment again if it didn't run long enough
work.run();
return;
}
// handle any previously sent fragment finished messages.
// Drains every pending notification and forwards each to the pipeline's terminal operator.
FragmentHandle finishedFragment;
while ((finishedFragment = eventProvider.pollFinishedReceiver()) != null) {
pipeline.getTerminalOperator().receivingFragmentFinished(finishedFragment);
}
// Acquire a memory grant before pumping when a memory arbiter is configured.
if (memoryArbiter != null) {
memoryArbiter.acquireMemoryGrant(this, getMemoryToAcquire());
}
// pump the pipeline; the pumper's result becomes the new task state.
taskState = pumper.run();
// if we've finished all work, let's wrap up.
if(taskState == State.DONE){
transitionToFinished();
}
// Injection point that may raise an OutOfMemoryError; an error whose message equals
// INJECTOR_DO_WORK is treated as a memory error (not a heap exit) by the catch below.
injector.injectChecked(executionControls, INJECTOR_DO_WORK, OutOfMemoryError.class);
} catch (OutOfMemoryError | OutOfMemoryException e) {
// handle out of memory errors differently from other error types.
// The first branch covers direct-memory errors, allocator OutOfMemoryException, the
// "Direct buffer memory" message, and the injected error; these fail only this fragment.
if (e instanceof OutOfDirectMemoryError || e instanceof OutOfMemoryException || "Direct buffer memory".equals(e.getMessage()) || INJECTOR_DO_WORK.equals(e.getMessage())) {
transitionToFailed(UserException.memoryError(e)
.addContext(MemoryDebugInfo.getDetailsOnAllocationFailure(new OutOfMemoryException(e), allocator))
.buildSilently());
} else {
// we have a heap out of memory error. The JVM is unstable, exit.
ProcessExit.exitHeap(e);
}
} catch (Throwable e) {
// Any other failure fails the fragment with the thrown cause.
transitionToFailed(e);
} finally {
// Nested finally guarantees stats.runEnded() is recorded even if finishRun() throws.
try {
finishRun(originalName);
} finally {
stats.runEnded();
}
}
}
- setupExecution 处理
参考代码
/**
 * Performs the one-time setup of this fragment.
 *
 * <p>In order: creates the output allocator and hands it to the context creator, reads the
 * root operator of the plan fragment, selects the function lookup context, builds the pipeline
 * and calls its {@code setup()}, registers the crash listener on the coordinator service set,
 * calls {@code transitionToRunning()} and finally sets {@code isSetup}.
 *
 * @throws Exception if any of the setup steps throws
 */
void setupExecution() throws Exception {
  // The major/minor fragment concepts can be learned from Drill's model and the official profiler.
  final PlanFragmentMajor majorFragment = fragment.getMajor();
  final PlanFragmentMinor minorFragment = fragment.getMinor();
  logger.debug(
      "Starting fragment {}:{} on {}:{}",
      majorFragment.getHandle().getMajorFragmentId(),
      getHandle().getMinorFragmentId(),
      minorFragment.getAssignment().getAddress(),
      minorFragment.getAssignment().getUserPort());

  // Output allocator: child of the ticket, reserved per the fragment option, with no upper limit.
  outputAllocator = ticket.newChildAllocator(
      "output-frag:" + QueryIdHelper.getFragmentId(getHandle()),
      fragmentOptions.getOption(ExecConstants.OUTPUT_ALLOCATOR_RESERVATION),
      Long.MAX_VALUE);
  contextCreator.setFragmentOutputAllocator(outputAllocator);

  // Deserialize the fragment's operator tree; its root is used to resolve the minor fragment endpoints.
  final PhysicalOperator root = reader.readFragment(fragment);
  contextCreator.setMinorFragmentEndpointsFromRootSender(root);

  // Use the decimal-aware lookup context only when decimal v2 is enabled.
  final FunctionLookupContext lookupContext =
      fragmentOptions.getOption(PlannerSettings.ENABLE_DECIMAL_V2)
          ? decimalFunctionLookupContext
          : functionLookupContext;

  final FragmentExecutionContext executionContext =
      new FragmentExecutionContext(
          majorFragment.getForeman(), sources, cancelled, majorFragment.getContext());
  pipeline = PipelineCreator.get(
      executionContext,
      buffers,
      opCreator,
      contextCreator,
      lookupContext,
      root,
      tunnelProvider,
      new SharedResourcesContextImpl(sharedResources));
  pipeline.setup();

  clusterCoordinator
      .getServiceSet(ClusterCoordinator.Role.COORDINATOR)
      .addNodeStatusListener(crashListener);

  transitionToRunning();
  isSetup = true;
}
说明
dremio 实际运行的时候包含了一个社区版的任务调度包 dremio-ce-sabot-scheduler,任务的实际执行依赖的就是这个包,它是基于动态类加载以及配置管理接入的
基于此链路大家再学习以及分析dremio就比较方便了,毕竟dremio 依赖的组件还是比较多的,同时内部还是比较复杂的,后续慢慢详细说明
参考资料
sabot/kernel/src/main/java/com/dremio/sabot/task/TaskPools.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/TaskPoolInitializer.java
dac/backend/src/main/java/com/dremio/dac/daemon/DACDaemonModule.java
sabot/kernel/src/main/java/com/dremio/sabot/exec/fragment/FragmentExecutor.java
sabot/kernel/src/main/java/com/dremio/sabot/driver/PipelineCreator.java
原创文章,作者:506227337,如若转载,请注明出处:https://blog.ytso.com/273576.html