源码解析
上一篇Http Server实现中讲到,当channel被注册之后,这个类中的initChannel方法就会被调用,同时在管道后面加入handlers。那具体的,我们的Http Server是怎样处理消息的?
【源代码片段】
class HttpsServerInitializer extends ChannelInitializer<SocketChannel> {
...
// 在管道后面加入handlers
pipeline.addLast(new HttpRequestDecoder(),// 这个是对http解码
new HttpResponseEncoder(),// 这个是对http的响应编码
new HttpConnectionHandler(),// 这个是对http连接处理
new HttpObjectAggregator(Integer.MAX_VALUE),// 这个是http对象聚合
new HTTPHandler());// 这个是http的Handler的具体实现
}
}
}
处理逻辑
【源代码片段】
class HTTPHandler extends SimpleChannelInboundHandler<HttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
HttpPostRequestDecoder decoder = null;
// todo start server span, we should get channel here to put span in channel's context in async call.
try {
if (!httpRequest.decoderResult().isSuccess()) {
sendError(ctx, HttpResponseStatus.BAD_REQUEST);
return;
}
// 请求命令
final HttpCommand requestCommand = new HttpCommand();
// todo record command opaque in span.
httpRequest.headers().set(ProtocolKey.ClientInstanceKey.IP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
...
Map<String, Object> bodyMap = new HashMap<>();
// GET请求
if (httpRequest.method() == HttpMethod.GET) {
QueryStringDecoder getDecoder = new QueryStringDecoder(httpRequest.uri());
getDecoder.parameters().entrySet().forEach(entry -> {
bodyMap.put(entry.getKey(), entry.getValue().get(0));
});
// POST请求
} else if (httpRequest.method() == HttpMethod.POST) {
decoder = new HttpPostRequestDecoder(defaultHttpDataFactory, httpRequest);
// 获取参数列表
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
if (parm.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
Attribute data = (Attribute) parm;
bodyMap.put(data.getName(), data.getValue());
}
}
} else {
sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
return;
}
...
String requestCode =
(httpRequest.method() == HttpMethod.POST) ? StringUtils.deleteWhitespace(httpRequest.headers().get(ProtocolKey.REQUEST_CODE))
: MapUtils.getString(bodyMap, StringUtils.lowerCase(ProtocolKey.REQUEST_CODE), "");
requestCommand.setHttpMethod(httpRequest.method().name());
requestCommand.setHttpVersion(httpRequest.protocolVersion().protocolName());
requestCommand.setRequestCode(requestCode);
// todo record command method, version and requestCode in span.
// 响应命令
HttpCommand responseCommand = null;
...
try {
// requestCommand头部和主体
requestCommand.setHeader(Header.buildHeader(requestCode, parseHTTPHeader(httpRequest)));
requestCommand.setBody(Body.buildBody(requestCode, bodyMap));
} catch (Exception e) {
responseCommand = requestCommand.createHttpCommandResponse(EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_RUNTIME_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 3));
sendResponse(ctx, responseCommand.httpResponse());
return;
}
...
// 异步消息上下文
AsyncContext<HttpCommand> asyncContext = new AsyncContext<HttpCommand>(requestCommand, responseCommand, asyncContextCompleteHandler);
// 处理网络请求
processEventMeshRequest(ctx, asyncContext);
} catch (Exception ex) {
httpServerLogger.error("AbrstractHTTPServer.HTTPHandler.channelRead0 err", ex);
// todo span end with exception.
} finally {
try {
decoder.destroy();
} catch (Exception e) {
}
}
}
// 处理网络请求
public void processEventMeshRequest(final ChannelHandlerContext ctx,
final AsyncContext<HttpCommand> asyncContext) {
final Pair<HttpRequestProcessor, ThreadPoolExecutor> choosed = processorTable.get(Integer.valueOf(asyncContext.getRequest().getRequestCode()));
try {
// 为任务分配一个线程
choosed.getObject2().submit(() -> {
try {
// 拒绝请求
if (choosed.getObject1().rejectRequest()) {
HttpCommand responseCommand = asyncContext.getRequest().createHttpCommandResponse(EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getRetCode(), EventMeshRetCode.EVENTMESH_REJECT_BY_PROCESSOR_ERROR.getErrMsg());
asyncContext.onComplete(responseCommand);
if (asyncContext.isComplete()) {
if (httpLogger.isDebugEnabled()) {
httpLogger.debug("{}", asyncContext.getResponse());
}
sendResponse(ctx, responseCommand.httpResponse());
}
return;
}
// 处理请求,对接到http的协议部分的处理器
choosed.getObject1().processRequest(ctx, asyncContext);
if (asyncContext == null || !asyncContext.isComplete()) {
return;
}
...
// 发送响应
sendResponse(ctx, asyncContext.getResponse().httpResponse());
} catch (Exception e) {
httpServerLogger.error("process error", e);
}
});
} catch (RejectedExecutionException re) {
...
try {
sendResponse(ctx, asyncContext.getResponse().httpResponse());
} catch (Exception e) {
}
}
}
...
}
处理器
每一个处理器都会继承它的父类HttpRequestProcessor,并且重写处理请求和拒绝请求两个方法。具体业务中,会根据消息的请求码分配处理器来对消息进行处理。
处理器逻辑,这里以SendAsyncMessageProcessor消息处理为例进行讲解,其他处理器参照即可。
public class SendAsyncMessageProcessor implements HttpRequestProcessor {
@Override
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
...
// 获取发送消息的请求
SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) asyncContext.getRequest().getHeader();
SendMessageRequestBody sendMessageRequestBody = (SendMessageRequestBody) asyncContext.getRequest().getBody();
// 获取发送消息的响应
SendMessageResponseHeader sendMessageResponseHeader =
SendMessageResponseHeader.buildHeader(Integer.valueOf(asyncContext.getRequest().getRequestCode()), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshCluster,
IPUtil.getLocalAddress(), eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshEnv,
eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshIDC);
//证实请求头部
if (StringUtils.isBlank(sendMessageRequestHeader.getIdc())
|| StringUtils.isBlank(sendMessageRequestHeader.getPid())
|| !StringUtils.isNumeric(sendMessageRequestHeader.getPid())
|| StringUtils.isBlank(sendMessageRequestHeader.getSys())) {
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//证实请求主体
if (StringUtils.isBlank(sendMessageRequestBody.getBizSeqNo())
|| StringUtils.isBlank(sendMessageRequestBody.getUniqueId())
|| StringUtils.isBlank(sendMessageRequestBody.getProducerGroup())
|| StringUtils.isBlank(sendMessageRequestBody.getTopic())
|| StringUtils.isBlank(sendMessageRequestBody.getContent())
|| (StringUtils.isBlank(sendMessageRequestBody.getTtl()))) {
//sync message TTL can't be empty
responseEventMeshCommand = asyncContext.getRequest().createHttpCommandResponse(
sendMessageResponseHeader,
SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getRetCode(), EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg()));
asyncContext.onComplete(responseEventMeshCommand);
return;
}
//检查acl
if(eventMeshHTTPServer.getEventMeshHttpConfiguration().eventMeshServerSecurityEnable) {
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
String user = sendMessageRequestHeader.getUsername();
String pass = sendMessageRequestHeader.getPasswd();
String subsystem = sendMessageRequestHeader.getSys();
int requestCode = Integer.valueOf(sendMessageRequestHeader.getCode());
String topic = sendMessageRequestBody.getTopic();
try {
Acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
}catch (Exception e){
//String errorMsg = String.format("CLIENT HAS NO PERMISSION,send failed, topic:%s, subsys:%s, realIp:%s", topic, subsys, realIp);
...
}
}
// 分配生产者组
String producerGroup = sendMessageRequestBody.getProducerGroup();
EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
...
String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
// 消息
Message omsMsg = new Message();
try {
// body
omsMsg.setBody(sendMessageRequestBody.getContent().getBytes(EventMeshConstants.DEFAULT_CHARSET));
// topic
omsMsg.setTopic(sendMessageRequestBody.getTopic());
omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_DESTINATION, sendMessageRequestBody.getTopic());
...
}
// ttl
omsMsg.putUserProperties(Constants.PROPERTY_MESSAGE_TIMEOUT, ttl);
// bizNo
omsMsg.putSystemProperties(Constants.PROPERTY_MESSAGE_SEARCH_KEYS, sendMessageRequestBody.getBizSeqNo());
omsMsg.putUserProperties("msgType", "persistent");
omsMsg.putUserProperties(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
omsMsg.putUserProperties(Constants.RMB_UNIQ_ID, sendMessageRequestBody.getUniqueId());
omsMsg.putUserProperties(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
// new rocketmq client can't support put DeFiBusConstant.PROPERTY_MESSAGE_TTL
// rocketMQMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, ttl);
} catch (Exception e) {
...
asyncContext.onComplete(responseEventMeshCommand);
return;
}
// 发送消息的上下文
final SendMessageContext sendMessageContext = new SendMessageContext(sendMessageRequestBody.getBizSeqNo(), omsMsg, eventMeshProducer, eventMeshHTTPServer);
eventMeshHTTPServer.metrics.summaryMetrics.recordSendMsg();
final CompleteHandler<HttpCommand> handler = new CompleteHandler<HttpCommand>() {
@Override
// 响应
public void onResponse(HttpCommand httpCommand) {
try {
if (httpLogger.isDebugEnabled()) {
httpLogger.debug("{}", httpCommand);
}
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
eventMeshHTTPServer.metrics.summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
} catch (Exception ex) {
}
}
};
try {
sendMessageContext.getMsg().getUserProperties().put(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
// 生产组发送精简消息
eventMeshProducer.send(sendMessageContext, new SendCallback() {
// 成功
@Override
public void onSuccess(SendResult sendResult) {
...
}
// 错误
@Override
public void onException(OnExceptionContext context) {
...
}
});
} catch (Exception ex) {
...
}
return;
}
@Override
public boolean rejectRequest() {
return false;
}
}
最后,我推荐大家去看看官方的协议文档中关于协议这一部分的内容,可以方便我们理解:官方http协议文档
注:本文内容由社区小伙伴陈创慧提供,
{{o.name}}
{{m.name}}
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/199422.html