Dubbo中参数回调 Callback 实现深究

参数回调 Callback是Dubbo中一种机制,与调用本地callback相同,将基于长连接生成反向代理在服务端执行客户端的逻辑,本文将以以下内容展开。

例子

何为在服务器执行客户端逻辑?简单点说,就是客户端可以定义一个方法,而此方法调用方为服务端。 要使用参数回调这个功能,有两个要素:

具体例子地址在这里:Callback 参数回调

服务端配置

前几篇文章分析了 服务端配置及初始化过程,有兴趣同学可以回看,这里只拎初对Callback 的单独配置。 provider的初始化过程B:外部化配置初始化过程 Provider的初始化过程C:服务暴露详解

当服务暴露是,主要配置以及export 过程 在ServiceConfig 进行的,而在配置读取时 会首先对 dubbo:methoddubbo:argument 进行初始化,对 callback 原理进行标识则是在 doExportUrlsFor1Protocol 方法中进行。 这个方法代码比较长,逻辑上包括对 配置进行读取,对dubbo 中的 url进行拼凑,以及 输出 invoker等逻辑,可以看上面两篇文章有细分析。 而callbcak 原理在以下逻辑

        if (CollectionUtils.isNotEmpty(methods)) {
        // 判断 methods 是否为空
            for (MethodConfig method : methods) {
                appendParameters(map, method, method.getName());
                String retryKey = method.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(method.getName() + ".retries", "0");
                    }
                }
                List<ArgumentConfig> arguments = method.getArguments();
                if (CollectionUtils.isNotEmpty(arguments)) {
                // 判断arguments 是否为空
                    for (ArgumentConfig argument : arguments) {
                        // convert argument type
                        if (argument.getType() != null && argument.getType().length() > 0) {
                        // 以 <dubbo:argument type="com.demo.CallbackListener" callback="true" /> 方式配置
                            Method[] methods = interfaceClass.getMethods();
                            // visit all methods
                            if (methods != null && methods.length > 0) {
                                for (int i = 0; i < methods.length; i++) {
                                    String methodName = methods[i].getName();
                                    // target the method, and get its signature
                                    if (methodName.equals(method.getName())) {
                                        Class<?>[] argtypes = methods[i].getParameterTypes();
                                        // one callback in the method
                                        if (argument.getIndex() != -1) {
                                            if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
                                                appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                                            } else {
                                                throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                            }
                                        } else {
                                            // multiple callbacks in the method
                                            for (int j = 0; j < argtypes.length; j++) {
                                                Class<?> argclazz = argtypes[j];
                                                if (argclazz.getName().equals(argument.getType())) {
                                                    appendParameters(map, argument, method.getName() + "." + j);
                                                    if (argument.getIndex() != -1 && argument.getIndex() != j) {
                                                        throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
                                                    }
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        } else if (argument.getIndex() != -1) {
                        // 以  <dubbo:argument index="1" callback="true" /> 配置
                            appendParameters(map, argument, method.getName() + "." + argument.getIndex());
                        } else {
                            throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
                        }

                    }
                }
            } // end of methods for
        }

以上代码包括几步:

服务端对Dubbo 配置方面就是这些,最终,在注册中心留下的ProviderService 则会多个callback 的配置。 如下形式 addListener.1.callback=true 说明这个接口的 addListener 方法的 第1个参数是callback类型(从0开始)。

Consumer 对 Callback 适配

开篇有个问题,当 服务端对接口没有配置 callback时候,客户端 直接启动,则会报错,即对象没有实现 Serializable。当然呢,Dubbo 默认以 Hessian 作为序列化框架,而 Hessian 则要求实现 Serializable

Caused by: java.lang.IllegalStateException: Serialized class com.anla.rpc.callback.consumer.Consumer$CallBackDemo must implement java.io.Serializable
    at com.alibaba.com.caucho.hessian.io.SerializerFactory.getDefaultSerializer(SerializerFactory.java:401)
    at com.alibaba.com.caucho.hessian.io.SerializerFactory.getSerializer(SerializerFactory.java:375)
    at com.alibaba.com.caucho.hessian.io.Hessian2Output.writeObject(Hessian2Output.java:389)
    at org.apache.dubbo.common.serialize.hessian2.Hessian2ObjectOutput.writeObject(Hessian2ObjectOutput.java:89)
    at org.apache.dubbo.rpc.protocol.dubbo.DubboCodec.encodeRequestData(DubboCodec.java:185)
    at org.apache.dubbo.remoting.exchange.codec.ExchangeCodec.encodeRequest(ExchangeCodec.java:238)
    at org.apache.dubbo.remoting.exchange.codec.ExchangeCodec.encode(ExchangeCodec.java:69)
    at org.apache.dubbo.rpc.protocol.dubbo.DubboCountCodec.encode(DubboCountCodec.java:40)
    at org.apache.dubbo.remoting.transport.netty4.NettyCodecAdapter$InternalEncoder.encode(NettyCodecAdapter.java:70)

所以这里就引出了 Consumer 端初始化配置的操作,上述明显是执行dubbo调用才会爆出的错误。所以肯定在Consumer在配置配置初始化会在注册中心和 服务端交互。

Consumer 中对callback 支持主要 体现在以下两个方面:

另一方面,如果Consumer 端传入一个实现了Serializable,而客户端尝试调用其里面内部方法,则会报错,客户端会报空指针错误:

Exception in thread "main" java.lang.NullPointerException
    at java.util.concurrent.ConcurrentHashMap.putVal(ConcurrentHashMap.java:1011)
    at java.util.concurrent.ConcurrentHashMap.put(ConcurrentHashMap.java:1006)
    at com.anla.rpc.callback.provider.impl.CallbackServiceImpl.addListener(CallbackServiceImpl.java:44)
    at org.apache.dubbo.common.bytecode.Wrapper1.invokeMethod(Wrapper1.java)
    at org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory$1.doInvoke(JavassistProxyFactory.java:47)
    at org.apache.dubbo.rpc.proxy.AbstractProxyInvoker.invoke(AbstractProxyInvoker.java:84)
    at org.apache.dubbo.config.invoker.DelegateProviderMetaDataInvoker.invoke(DelegateProviderMetaDataInvoker.java:56)
    at org.apache.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:56)
    at org.apache.dubbo.rpc.filter.ExceptionFilter.invoke(ExceptionFilter.java:55)

而服务端也会给出给出红色警告信息:

十月 13, 2019 11:38:17 下午 com.alibaba.com.caucho.hessian.io.SerializerFactory getDeserializer
警告: Hessian/Burlap: 'com.anla.rpc.callback.consumer.Consumer$CallBackDemo' is an unknown class in sun.misc.Launcher$AppClassLoader@18b4aac2:
java.lang.ClassNotFoundException: com.anla.rpc.callback.consumer.Consumer$CallBackDemo

为什么会出现这样信息呢?读完这篇文章估计大家就会有结论了。

Consumer 增加callback配置

Consumer 端 从 注册中心获取 相关配置 在 RegistryProtocol 中进行,而具体管控则是由 RegistryDirectory 进行调用。整个Consumer配置可以看 : @Reference或ReferenceConfig.get代理对象如何产生(一):SPI模式中 Wrapper和 SPI 类组装逻辑Dubbo 消费者中 代理对象 初始化详解

本文同样只拎出 获取 callback 配置相关过程。

Protocl 会执行 refer 方法,去获取某Invoker 首先进入 RegistryProtocoldoRefer 方法:

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
    // 构造一个RegistryDirectory
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        // 订阅的url
        if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) {
            directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url));
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        // 开始订阅
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));
        // 返回Invoker
        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }

doRefer 方法中 主要通过 构造一个 RegistryDirectory 进行整个 对外服务相关管理,设置完相关参数,执行

        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
                PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY));

RegistryDirectory 方法:

    public void subscribe(URL url) {
        setConsumerUrl(url);
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        registry.subscribe(url, this);
    }

而后在 FailbackRegistrysubscribe 方法:

    @Override
    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

代码较深,就单以文字加部分代码分析。

当服务端url 有值是,会执行 RegistryDirectoryrefreshOverrideAndInvoker 。将其进行配置更新,最终将服务端配置addListener.1.callback=true 加入到自己请求参数中,当进行序列化时绕过Hessian。

Consumer 构造Request数据

当Consumer 端这边已经搞好配置,也拿到了 负载均衡后的 Invoker,一步一步穿过Protocol,Transporter,转由Netty发送,而Netty 发送,则会对其进行编码,从而使用 到Dubbo 自实现的编码方式,具体逻辑如下:

Consumer 端 callback 产生以及传递

如果说,callback 参数是直接发送null,那么callback如何传递给服务端呢?二者如何交互呢? 先看第一个 , 当判断为 CALLBACK_CREATE 事件是,将 exportOrUnexportCallbackService 返回的 String 放入到 需要传递的 RpcInvocation 中,并且以 sys_callback_arg + paraIndex 作为key。 而在encodeRequestData 方法最后,会执行 out.writeObject(inv.getAttachments());RpcInvocation 的所有attachments 都交由 Netty发送。 所以,最终 callback 在客户端是以 String 类型 通过Netty 发送给 Provider 端。

下面看 exportOrUnexportCallbackService 执行了什么操作:

    private static String exportOrUnexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException {
        // 获取一个instid
        int instid = System.identityHashCode(inst);
        // 由于会调用共用的export,所以这个 callback 的服务和 主 服务共享一个 service
        // 以下为构造参数过程
        Map<String, String> params = new HashMap<>(3);
        params.put(IS_SERVER_KEY, Boolean.FALSE.toString());
        params.put(IS_CALLBACK_SERVICE, Boolean.TRUE.toString());
        String group = (url == null ? null : url.getParameter(GROUP_KEY));
        if (group != null && group.length() > 0) {
            params.put(GROUP_KEY, group);
        }
        // add method, for verifying against method, automatic fallback (see dubbo protocol)
        params.put(METHODS_KEY, StringUtils.join(Wrapper.getWrapper(clazz).getDeclaredMethodNames(), ","));

        Map<String, String> tmpMap = new HashMap<>(url.getParameters());
        tmpMap.putAll(params);
        tmpMap.remove(VERSION_KEY);// doesn't need to distinguish version for callback
        tmpMap.put(INTERFACE_KEY, clazz.getName());
        URL exportUrl = new URL(DubboProtocol.NAME, channel.getLocalAddress().getAddress().getHostAddress(), channel.getLocalAddress().getPort(), clazz.getName() + "." + instid, tmpMap);

        // no need to generate multiple exporters for different channel in the same JVM, cache key cannot collide.
        String cacheKey = getClientSideCallbackServiceCacheKey(instid);
        String countKey = getClientSideCountKey(clazz.getName());
        if (export) {
            // one channel can have multiple callback instances, no need to re-export for different instance.
            if (!channel.hasAttribute(cacheKey)) {
                if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
                // 构造一个callback 的Invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(inst, clazz, exportUrl);
                    // 暴露该服务,获取一个 Exporter                    
                    Exporter<?> exporter = protocol.export(invoker);
                    // this is used for tracing if instid has published service or not.
                    // 将Exporter 放入channel 中
                    channel.setAttribute(cacheKey, exporter);
                    logger.info("Export a callback service :" + exportUrl + ", on " + channel + ", url is: " + url);
                    increaseInstanceCount(channel, countKey);
                }
            }
        } else {
        // 如果是销毁callback操作
            if (channel.hasAttribute(cacheKey)) {
                Exporter<?> exporter = (Exporter<?>) channel.getAttribute(cacheKey);
                exporter.unexport();
                channel.removeAttribute(cacheKey);
                decreaseInstanceCount(channel, countKey);
            }
        }
        return String.valueOf(instid);
    }

上面方法有以下几个过程:

下面看看 callback 的 Invoker 产生逻辑,即 PROXY_FACTORY.getInvoker(inst, clazz, exportUrl); 代码逻辑: 最终通过 JavaassistProxyFactory 产生一个代理Invoker,用于执行传入inst 实例的方法。

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

所以这个 getInvoker 最终获取一个 Invoker ,这个Invoker 只是包装了一层,最终执行 wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); 即执行 传入proxy 的,执行 methodName 的方法。

Provider 对 Callback 适配

服务端对callback 也有着特定的适配:

当 Provider 获取而来的 具体 Callback 对象时,参数有了封装,看截图: 在这里插入图片描述 此处相信认真读的同学有个疑问:

往下看。

当在客户端传递过来是,在Netty 的encode 逻辑处,对callback进行了适配,所以其实在 provider 的 decode 逻辑进行了封装,用 InvokerInvoocationHandler 以及 AsyncToSyncInvoker 的包装类型。 下面看看具体逻辑:

看看 referOrDestroyCallbackService 中是如何在服务端构造一个 代理对象的:

    private static Object referOrDestroyCallbackService(Channel channel, URL url, Class<?> clazz, Invocation inv, int instid, boolean isRefer) {
        Object proxy = null;
        // invoker 缓存对象 的key:callback.service.proxy.12503143.com.anla.rpc.callback.provider.service.CallbackListener.654342195.invoker 
        String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
        // 代理缓存对象key:callback.service.proxy.12503143.com.anla.rpc.callback.provider.service.CallbackListener.654342195
        String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
        // 判断当前 channel 是否已经产生了代理对象。
        proxy = channel.getAttribute(proxyCacheKey);
        // count 的key  callback.service.proxy.12503143.com.anla.rpc.callback.provider.service.CallbackListener.COUNT
        String countkey = getServerSideCountKey(channel, clazz.getName());
        if (isRefer) {
            if (proxy == null) {
                URL referurl = URL.valueOf("callback://" + url.getAddress() + "/" + clazz.getName() + "?" + INTERFACE_KEY + "=" + clazz.getName());
                referurl = referurl.addParametersIfAbsent(url.getParameters()).removeParameter(METHODS_KEY);
                // 以下判断是否超出 服务的 callback 参数
                if (!isInstancesOverLimit(channel, referurl, clazz.getName(), instid, true)) {
                    @SuppressWarnings("rawtypes")
                    // 构造一个Invoker
                    Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, referurl, String.valueOf(instid));
                    // 使用 JavassistProxyFactory 生成一个由 InvokerInvocationHandler+ AsyncToSyncInvoker 的包装的invoker
                    proxy = PROXY_FACTORY.getProxy(new AsyncToSyncInvoker<>(invoker));
                    // 设置channel属性
                    channel.setAttribute(proxyCacheKey, proxy);
                    channel.setAttribute(invokerCacheKey, invoker);
                    // 设置计数到channel中
                    increaseInstanceCount(channel, countkey);
                    // 忽略并发问题,快速失败
                // 将构造出的invoker 放入channel中
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers == null) {
                        callbackInvokers = new ConcurrentHashSet<Invoker<?>>(1);
                        callbackInvokers.add(invoker);
                        channel.setAttribute(CHANNEL_CALLBACK_KEY, callbackInvokers);
                    }
                    logger.info("method " + inv.getMethodName() + " include a callback service :" + invoker.getUrl() + ", a proxy :" + invoker + " has been created.");
                }
            }
        } else {
            if (proxy != null) {
            // 从Invoker 中拿出并销毁
                Invoker<?> invoker = (Invoker<?>) channel.getAttribute(invokerCacheKey);
                try {
                    Set<Invoker<?>> callbackInvokers = (Set<Invoker<?>>) channel.getAttribute(CHANNEL_CALLBACK_KEY);
                    if (callbackInvokers != null) {
                        callbackInvokers.remove(invoker);
                    }
                    invoker.destroy();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                // 直接从map中删除。
                channel.removeAttribute(proxyCacheKey);
                channel.removeAttribute(invokerCacheKey);
                decreaseInstanceCount(channel, countkey);
            }
        }
        return proxy;
    }

此时该回调的Invoker 中的url为 callback开头:

callback://192.168.1.107:20880/com.anla.rpc.callback.provider.service.CallbackListener?addListener.1.callback=true&anyhost=true&application=provider&bean.name=com.anla.rpc.callback.provider.service.CallbackService&bind.ip=192.168.1.107&bind.port=20880&callbacks=1000&connections=1&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.anla.rpc.callback.provider.service.CallbackListener&pid=1090&register=true&release=2.7.2&side=provider&timestamp=1571061967992

但是此 url 好像并没有体现出任何作用。 另一方面,这次从服务端发起的回调也是 twoWay类型,即服务端会等待客户端执行结果。 但是在服务端并没有设置超时控制的 task 回调,但是会记录下调用的异常。

Dubbo 客户端接受回调

当服务端调用之后,则是直接以 Netty 调用方式调用 dubbo接口,而不再去询问注册中心是否有对应服务。 客户端拿到 回调用,通过 对事件进行判断是 以下事件某一种:CONNECTEDDISCONNECTEDSENTRECEIVEDCAUGHT的某种,而此次回调属于 RECEIVED 事件,从而 使用一个 ChannelEventRunnable 用于执行其逻辑。

在Netty 传输时,Invoker 并没有拿回来,而只是 拿到 serviceKey 去解析,从而获取到 客户端 所缓存的 DubboExporter,最终由 DubboExporter 返回对应的Invoker 对象,这个Invoker 对象就是dubbo 为callback 方法创建的一个代理对象,而由该Invoker 对象则最终负责由服务端传递过来的 参数类型调用对应的方法。

流程

简单梳理下参数回调Callback 整体实现原理:

整个参数回调Callback 实现就研究完成。

相信读完全篇文章,大家就会对博主开篇几个问题有自己见解了,如果对问题仍然不太懂,可以结合源码进行多次调试。 当然各位同学也可以关注博主公众号,通过后台获取博主私人微信进行交流,一起探讨其中疑问

觉得博主写的有用,不妨关注博主公众号: 六点A君。 哈哈哈,Dubbo小吃街不迷路

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/60356.html

(0)
上一篇 2021年8月10日
下一篇 2021年8月10日

相关推荐

发表回复

登录后才能评论