diff --git a/plugins/wireshark/README.md b/plugins/wireshark/README.md
new file mode 100644
index 00000000..1dbca5f9
--- /dev/null
+++ b/plugins/wireshark/README.md
@@ -0,0 +1,49 @@
+# Bolt wireshark plugin
+
+How to use it:
+
+* For Mac or Linux:
+```
+$ sh install.sh
+```
+* For Windows:
+```
+$ .\install.bat
+```
+
+Then open wireshark and decode package as BOLT.
+
+Bolt package will be decoded like this:
+
+```
+Bolt Protocol Data
+ Header
+ rpc_trace_context.sofaRpcId: 0
+ rpc_trace_context.sofaTraceId: 0bxxxx335162832343267634611586
+ rpc_trace_context.sofaCallerIp: 1.2.3.4
+ service: com.sofastack.demo.Service:1.0
+ rpc_trace_context.sofaCallerApp: test-app
+ sofa_head_method_name: hello
+ Payload
+ payload
+ proto: 2 (BOLTv2)
+ ver1: 1
+ type: 1 (request)
+ cmdcode: 1 (request)
+ ver2: 1
+ req_id: 0
+ codec: 11 (protobuf)
+ switch: 1
+ timeout: 3000
+ class_len: 44
+ header_len: 691
+ content_len: 65
+ classname: com.alipay.sofa.rpc.core.request.SofaRequest
+ rpc_id: 0
+ trace_id: 0bxxxx335162832343267634611586
+```
+
+For advanced usage, you can search for any property under the bolt protocol, such as:
+```
+bolt.trace_id == 0bxxxx335162832343267634611586
+```
diff --git a/plugins/wireshark/bolt.lua b/plugins/wireshark/bolt.lua
new file mode 100644
index 00000000..a51fa736
--- /dev/null
+++ b/plugins/wireshark/bolt.lua
@@ -0,0 +1,441 @@
+--[[
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+]]
+-- ######################################################################################################
+--
+-- WARN(dunjut):
+--
+-- This is just an alpha version of wireshark bolt protocol dissector, potential bugs could mislead your
+-- troubleshooting to a wrong direction (for example fields may not be correctly parsed in corner cases).
+--
+-- Bug reports and optimizations are welcomed (not a lua expert here...)
+--
+-- ######################################################################################################
+-- Request command protocol for v1
+-- ** request definition **
+--
+-- 0 1 2 4 6 8 10 12 14 16
+-- +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+-- |proto| type| cmdcode |ver2 | requestId |codec| timeout | classLen |
+-- +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+-- |headerLen | contentLen | ... ... |
+-- +-----------+-----------+-----------+ +
+-- | className + header + content bytes |
+-- + +
+-- | ... ... |
+-- +-----------------------------------------------------------------------------------------------+
+--
+--
+-- Response command protocol for v1
+-- ** response definition **
+--
+-- 0 1 2 3 4 6 8 10 12 14 16
+-- +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
+-- |proto| type| cmdcode |ver2 | requestId |codec|respstatus | classLen |headerLen |
+-- +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
+-- | contentLen | ... ... |
+-- +-----------------------+ +
+-- | header + content bytes |
+-- + +
+-- | ... ... |
+-- +-----------------------------------------------------------------------------------------------+
+--
+--
+-- ######################################################################################################
+-- Request command protocol for v2
+-- ** request definition **
+--
+-- 0 1 2 4 6 8 10 11 12 14 16
+-- +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
+-- |proto| ver1|type | cmdcode |ver2 | requestId |codec|switch| timeout |
+-- +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
+-- |classLen |headerLen |contentLen | ... |
+-- +-----------+-----------+-----------+-----------+ +
+-- | className + header + content bytes |
+-- + +
+-- | ... ... | CRC32(optional) |
+-- +------------------------------------------------------------------------------------------------+
+--
+--
+-- Response command protocol for v2
+-- ** response definition **
+--
+-- 0 1 2 3 4 6 8 10 11 12 14 16
+-- +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+-----+-----+-----+
+-- |proto| ver1| type| cmdcode |ver2 | requestId |codec|switch|respstatus | classLen |
+-- +-----------+-----------+-----------+-----------+-----------+------------+-----------+-----------+
+-- |headerLen | contentLen | ... |
+-- +-----------------------------------+ +
+-- | className + header + content bytes |
+-- + +
+-- | ... ... | CRC32(optional) |
+-- +------------------------------------------------------------------------------------------------+
+-- respstatus: response status
+--
+-- Author: dunjut
+-- Modified by JervyShi
+
+
+bolt_protocol = Proto("Bolt", "SOFA Bolt Protocol")
+
+-- common fields
+proto = ProtoField.int8("bolt.proto", "proto", base.DEC)
+typ = ProtoField.int8("bolt.type", "type", base.DEC)
+cmdcode = ProtoField.int16("bolt.cmdcode", "cmdcode", base.DEC)
+ver2 = ProtoField.int8("bolt.ver2", "ver2", base.DEC) -- not sure what 'ver' actually means
+req_id = ProtoField.int32("bolt.req_id", "req_id", base.DEC)
+codec = ProtoField.int8("bolt.codec", "codec", base.DEC)
+
+classLen = ProtoField.int16("bolt.class_len", "class_len", base.DEC)
+headerLen = ProtoField.int16("bolt.header_len", "header_len", base.DEC)
+contentLen = ProtoField.int32("bolt.content_len", "content_len", base.DEC)
+bufferLen = ProtoField.int32("bolt.buffer_len", "buffer_len", base.DEC)
+
+-- reqeust only
+timeout = ProtoField.int32("bolt.timeout", "timeout", base.DEC)
+
+-- response only
+status = ProtoField.int16("bolt.status", "status", base.DEC)
+-- for both
+classname = ProtoField.string("bolt.classname", "classname", base.ASCII)
+payload = ProtoField.none("bolt.payload", "payload", base.HEX)
+
+-- not predefined fields but frequetly used data
+trace_id = ProtoField.string("bolt.trace_id", "trace_id", base.ASCII)
+rpc_id = ProtoField.string("bolt.rpc_id", "rpc_id", base.ASCII)
+
+-- boltv2 only fields
+ver1 = ProtoField.int8("bolt.ver1", "ver1", base.DEC) -- bolt v2 only
+switch = ProtoField.int8("bolt.switch", "switch", base.DEC) -- bolt v2 only, crc switch
+
+bolt_protocol.fields = {
+ proto,
+ ver1, -- boltv2 only
+ typ, cmdcode, ver2, req_id, codec,
+ switch, -- boltv2 only
+ timeout, -- request only
+ status, -- response only
+ classLen,
+ headerLen,
+ contentLen,
+ bufferLen,
+ classname,
+ payload,
+ crc, -- boltv2 optional
+ trace_id, rpc_id
+}
+
+function bolt_protocol.dissector(buffer, pinfo, tree)
+ -- Ignore zero-length packets.
+ length = buffer:len()
+ if length == 0 then
+ return
+ end
+
+ pinfo.cols.protocol = bolt_protocol.name
+
+ local subtree = tree:add(bolt_protocol, buffer(), "Bolt Protocol Data")
+ local headerSubtree = subtree:add(bolt_protocol, buffer(), "Header")
+ local payloadSubtree = subtree:add(bolt_protocol, buffer(), "Payload")
+
+ local reader_index = 0
+
+ -- Parse common fields
+ local proto_num = buffer(reader_index, 1):uint()
+ local proto_name = get_proto_name(proto_num)
+ subtree:add(proto, buffer(reader_index, 1)):append_text(" (" .. proto_name .. ")")
+
+ reader_index = reader_index + 1
+
+ if proto_name == "BOLTv1" then
+ local type_num = buffer(reader_index, 1):uint()
+ local type_name = get_type_name(type_num)
+ subtree:add(typ, buffer(reader_index, 1)):append_text(" (" .. type_name .. ")")
+ reader_index = reader_index + 1
+
+ local cmdcode_num = buffer(reader_index, 2):uint()
+ local cmdcode_name = get_cmdcode_name(cmdcode_num)
+ subtree:add(cmdcode, buffer(reader_index, 2)):append_text(" (" .. cmdcode_name .. ")")
+ reader_index = reader_index + 2
+
+ subtree:add(ver2, buffer(reader_index, 1))
+ reader_index = reader_index + 1
+ subtree:add(req_id, buffer(reader_index, 4))
+ reader_index = reader_index + 4
+
+ local codec_num = buffer(reader_index, 1):uint()
+ local codec_name = get_codec_name(codec_num)
+ subtree:add(codec, buffer(reader_index, 1)):append_text(" (" .. codec_name .. ")")
+ reader_index = reader_index + 1
+
+ -- for request packets --
+ if type_name == "request" or type_name == "oneway" then
+ parse_request(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ end
+
+ -- for response packets --
+ if cmdcode_name == "response" then
+ parse_response(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ end
+ end
+
+ if proto_name == "BOLTv2" then
+ subtree:add(ver1, buffer(reader_index, 1))
+ reader_index = reader_index + 1
+
+ local type_num = buffer(reader_index, 1):uint()
+ local type_name = get_type_name(type_num)
+ subtree:add(typ, buffer(reader_index, 1)):append_text(" (" .. type_name .. ")")
+ reader_index = reader_index + 1
+
+ local cmdcode_num = buffer(reader_index, 2):uint()
+ local cmdcode_name = get_cmdcode_name(cmdcode_num)
+ subtree:add(cmdcode, buffer(reader_index, 2)):append_text(" (" .. cmdcode_name .. ")")
+ reader_index = reader_index + 2
+
+ subtree:add(ver2, buffer(reader_index, 1))
+ reader_index = reader_index + 1
+ subtree:add(req_id, buffer(reader_index, 4))
+ reader_index = reader_index + 4
+
+ local codec_num = buffer(reader_index, 1):uint()
+ local codec_name = get_codec_name(codec_num)
+ subtree:add(codec, buffer(reader_index, 1)):append_text(" (" .. codec_name .. ")")
+ reader_index = reader_index + 1
+
+ subtree:add(switch, buffer(reader_index, 1))
+ reader_index = reader_index + 1
+
+ -- for request packets --
+ if type_name == "request" or type_name == "oneway" then
+ parse_request(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ end
+
+ -- for response packets --
+ if type_name == "response" then
+ parse_response(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ end
+ end
+end
+
+function parse_request(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ subtree:add(timeout, buffer(reader_index, 4))
+ reader_index = reader_index + 4
+
+ local class_len = buffer(reader_index, 2):uint()
+ subtree:add(classLen, buffer(reader_index, 2))
+ reader_index = reader_index + 2
+
+ -- headers
+ local header_len = buffer(reader_index, 2):uint()
+ subtree:add(headerLen, buffer(reader_index, 2))
+ reader_index = reader_index + 2
+
+ -- payload
+ local content_len = buffer(reader_index, 4):uint()
+ subtree:add(contentLen, buffer(reader_index, 4))
+ reader_index = reader_index + 4
+
+ subtree:add(classname, buffer(reader_index, class_len))
+ reader_index = reader_index + class_len
+
+ -- parse header
+ parse_headers(buffer, subtree, headerSubtree, reader_index, header_len)
+ reader_index = reader_index + header_len
+
+ if buffer:len() >= reader_index + content_len then
+ -- parse payload
+ payloadSubtree:add(payload, buffer(reader_index, content_len))
+ reader_index = reader_index + content_len
+ end
+end
+
+function parse_response(buffer, subtree, headerSubtree, payloadSubtree, reader_index)
+ local status_code = buffer(reader_index, 2):uint()
+ local status_name = get_status_name(status_code)
+ subtree:add(status, buffer(reader_index, 2)):append_text(" (" .. status_name .. ")")
+ reader_index = reader_index + 2
+
+ local class_len = buffer(reader_index, 2):uint()
+ subtree:add(classLen, buffer(reader_index, 2))
+ reader_index = reader_index + 2
+
+ -- headers
+ local header_len = buffer(reader_index, 2):uint()
+ subtree:add(headerLen, buffer(reader_index, 2))
+ reader_index = reader_index + 2
+
+ -- payload
+ local content_len = buffer(reader_index, 4):uint()
+ subtree:add(contentLen, buffer(reader_index, 4))
+ reader_index = reader_index + 4
+
+ -- parse className
+ subtree:add(classname, buffer(reader_index, class_len))
+ reader_index = reader_index + class_len
+
+ -- parse headers
+ parse_headers(buffer, subtree, headerSubtree, reader_index, header_len)
+ reader_index = reader_index + header_len
+
+ if buffer:len() >= reader_index + content_len then
+ -- parse payload
+ payloadSubtree:add(payload, buffer(reader_index, content_len))
+ reader_index = reader_index + content_len
+ end
+end
+
+-- parse headers from buffer(start, len) and add KV pairs into tree (packet details pane of wireshark)
+function parse_headers(buffer, subtree, headerTree, start, len)
+ local remain = len
+ local index = start
+ while remain > 0 do
+ local from = index
+ local kv_len = 0
+
+ -- header key
+ local key_len = buffer(index, 4):uint()
+ index = index + 4
+
+ local key_name = buffer(index, key_len):string(ENC_UTF_8)
+ index = index + key_len
+
+ -- header value
+ local val_len = buffer(index, 4):uint()
+ index = index + 4
+
+ kv_len = 4 + key_len + 4 + val_len
+
+ local value = buffer(index, val_len):string(ENC_UTF_8)
+ headerTree:add(buffer(from, kv_len), key_name .. ": " .. value)
+
+ -- special cases
+ if key_name == "rpc_trace_context.sofaTraceId" then
+ subtree:add(trace_id, buffer(index, val_len))
+ end
+
+ if key_name == "rpc_trace_context.sofaRpcId" then
+ subtree:add(rpc_id, buffer(index, val_len))
+ end
+
+ index = index + val_len
+ remain = remain - kv_len
+ end
+end
+
+-- map proto number to proto string.
+function get_proto_name(proto)
+ local proto_name = "Unknown"
+
+ if proto == 1 then
+ proto_name = "BOLTv1"
+ elseif proto == 2 then
+ proto_name = "BOLTv2"
+ elseif proto == 13 then
+ proto_name = "TR"
+ end
+
+ return proto_name
+end
+
+-- map type number to request type string.
+function get_type_name(typ)
+ local type_name = "Unknown"
+
+ if typ == 0 then
+ type_name = "response"
+ elseif typ == 1 then
+ type_name = "request"
+ elseif typ == 2 then
+ type_name = "oneway"
+ end
+
+ return type_name
+end
+
+-- map cmdcode to string representation of command type.
+function get_cmdcode_name(cmdcode)
+ local cmdcode_name = "Unknown"
+
+ if cmdcode == 0 then
+ cmdcode_name = "heartbeat"
+ elseif cmdcode == 1 then
+ cmdcode_name = "request"
+ elseif cmdcode == 2 then
+ cmdcode_name = "response"
+ end
+
+ return cmdcode_name
+end
+
+-- map codec number to codec name.
+function get_codec_name(codec)
+ local codec_name = "Unknown"
+
+ if codec == 0 then
+ codec_name = "hessian"
+ elseif codec == 1 then
+ codec_name = "hessian2"
+ elseif codec == 11 then
+ codec_name = "protobuf"
+ elseif codec == 12 then
+ codec_name = "json"
+ end
+
+ return codec_name
+end
+
+-- map status code to status string.
+function get_status_name(statuscode)
+ local status_name = "Unknown"
+
+ if statuscode == 0 then
+ status_name = "Success"
+ elseif statuscode == 1 then
+ status_name = "Error"
+ elseif statuscode == 2 then
+ status_name = "Server Exception"
+ elseif statuscode == 3 then
+ status_name = "Unknown"
+ elseif statuscode == 4 then
+ status_name = "Server Thread Pool Busy"
+ elseif statuscode == 5 then
+ status_name = "Error Comm" -- not sure what exactly this means...
+ elseif statuscode == 6 then
+ status_name = "No Processor"
+ elseif statuscode == 7 then
+ status_name = "Timeout"
+ elseif statuscode == 8 then
+ status_name = "Client Send Error"
+ elseif statuscode == 9 then
+ status_name = "Codec Exception"
+ elseif statuscode == 16 then
+ status_name = "Connection Closed" -- 16 is from 0x10 ... (previous codes are 0x00..0x09 -.-!)
+ elseif statuscode == 17 then
+ status_name = "Server Serial Exception"
+ elseif statuscode == 18 then
+ status_name = "Server Deserial Exception"
+ end
+
+ return status_name
+end
+
+-- register our dissector upon tcp port 12200 (default)
+bolt_protocol.prefs.port = Pref.uint("Bolt TCP port", 12200)
+local tcp_port = DissectorTable.get("tcp.port")
+tcp_port:add(bolt_protocol.prefs.port, bolt_protocol)
diff --git a/plugins/wireshark/install.bat b/plugins/wireshark/install.bat
new file mode 100644
index 00000000..2f1388dc
--- /dev/null
+++ b/plugins/wireshark/install.bat
@@ -0,0 +1,24 @@
+@REM
+@REM Licensed to the Apache Software Foundation (ASF) under one or more
+@REM contributor license agreements. See the NOTICE file distributed with
+@REM this work for additional information regarding copyright ownership.
+@REM The ASF licenses this file to You under the Apache License, Version 2.0
+@REM (the "License"); you may not use this file except in compliance with
+@REM the License. You may obtain a copy of the License at
+@REM
+@REM http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM Unless required by applicable law or agreed to in writing, software
+@REM distributed under the License is distributed on an "AS IS" BASIS,
+@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM See the License for the specific language governing permissions and
+@REM limitations under the License.
+@REM
+
+@echo off
+:: Get personal plugin folder from https://www.wireshark.org/docs/wsug_html_chunked/ChPluginFolders.html
+SET plugin_dir=%APPDATA%\Wireshark\plugins
+SET script_dir=%cd%
+if not exist %plugin_dir% mkdir %plugin_dir%
+copy %script_dir%\bolt.lua %plugin_dir%
+echo Wireshark bolt protocol extension has been installed successfully, please restart Wireshark before using it.
\ No newline at end of file
diff --git a/plugins/wireshark/install.sh b/plugins/wireshark/install.sh
new file mode 100755
index 00000000..2ec8422f
--- /dev/null
+++ b/plugins/wireshark/install.sh
@@ -0,0 +1,24 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+plugin_dir=~/.config/wireshark/plugins
+script_dir=`dirname $0`
+mkdir -p ${plugin_dir}
+cp $script_dir/bolt.lua ${plugin_dir}
+echo "Wireshark bolt protocol extension has been installed successfully, please restart Wireshark before using it."
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 46b30823..245011e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
4.0.0com.alipay.sofabolt
- 1.6.3-SNAPSHOT
+ 1.6.4jar${project.groupId}:${project.artifactId}
@@ -38,20 +38,20 @@
- tsui
- xiaomin.cxm@antfin.com
+ chengyi
+ mark.lx@antfin.comAnt Financialhttps://www.alipay.com/
- jiangping
- jiangping@antfin.com
+ tsui
+ xiaomin.cxm@antfin.comAnt Financialhttps://www.alipay.com/
- yueliang
- yueliang.yml@antfin.com
+ jiangping
+ jiangping@antfin.comAnt Financialhttps://www.alipay.com/
@@ -116,7 +116,7 @@
junitjunit
- 4.11
+ 4.13.1test
@@ -128,7 +128,7 @@
org.apache.logging.log4jlog4j-core
- 2.3
+ 2.13.2test
@@ -143,24 +143,6 @@
3.2.0test
-
- org.mockito
- mockito-all
- 1.8.5
- test
-
-
- org.openjdk.jmh
- jmh-core
- 1.20
- test
-
-
- org.openjdk.jmh
- jmh-generator-annprocess
- 1.20
- test
-
diff --git a/src/main/java/com/alipay/remoting/AbstractBoltClient.java b/src/main/java/com/alipay/remoting/AbstractBoltClient.java
index 9d3d3c1c..958cb914 100644
--- a/src/main/java/com/alipay/remoting/AbstractBoltClient.java
+++ b/src/main/java/com/alipay/remoting/AbstractBoltClient.java
@@ -16,14 +16,13 @@
*/
package com.alipay.remoting;
+import com.alipay.remoting.config.BoltClientOption;
import com.alipay.remoting.config.BoltOption;
import com.alipay.remoting.config.BoltOptions;
import com.alipay.remoting.config.ConfigManager;
-import com.alipay.remoting.config.Configurable;
+import com.alipay.remoting.config.Configuration;
import com.alipay.remoting.config.ConfigurableInstance;
import com.alipay.remoting.config.configs.ConfigContainer;
-import com.alipay.remoting.config.configs.ConfigItem;
-import com.alipay.remoting.config.configs.ConfigType;
import com.alipay.remoting.config.configs.DefaultConfigContainer;
import com.alipay.remoting.config.switches.GlobalSwitch;
@@ -34,14 +33,23 @@ public abstract class AbstractBoltClient extends AbstractLifeCycle implements Bo
ConfigurableInstance {
private final BoltOptions options;
- private final ConfigType configType;
private final GlobalSwitch globalSwitch;
private final ConfigContainer configContainer;
public AbstractBoltClient() {
this.options = new BoltOptions();
- this.configType = ConfigType.CLIENT_SIDE;
this.globalSwitch = new GlobalSwitch();
+ if (ConfigManager.conn_reconnect_switch()) {
+ option(BoltClientOption.CONN_RECONNECT_SWITCH, true);
+ } else {
+ option(BoltClientOption.CONN_RECONNECT_SWITCH, false);
+ }
+
+ if (ConfigManager.conn_monitor_switch()) {
+ option(BoltClientOption.CONN_MONITOR_SWITCH, true);
+ } else {
+ option(BoltClientOption.CONN_MONITOR_SWITCH, false);
+ }
this.configContainer = new DefaultConfigContainer();
}
@@ -51,44 +59,36 @@ public T option(BoltOption option) {
}
@Override
- public Configurable option(BoltOption option, T value) {
+ public Configuration option(BoltOption option, T value) {
options.option(option, value);
return this;
}
@Override
+ @Deprecated
public ConfigContainer conf() {
return this.configContainer;
}
@Override
+ @Deprecated
public GlobalSwitch switches() {
return this.globalSwitch;
}
@Override
public void initWriteBufferWaterMark(int low, int high) {
- this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK, low);
- this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK, high);
+ option(BoltClientOption.NETTY_BUFFER_LOW_WATER_MARK, low);
+ option(BoltClientOption.NETTY_BUFFER_HIGH_WATER_MARK, high);
}
@Override
public int netty_buffer_low_watermark() {
- Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK);
- if (config != null) {
- return (Integer) config;
- } else {
- return ConfigManager.netty_buffer_low_watermark();
- }
+ return option(BoltClientOption.NETTY_BUFFER_LOW_WATER_MARK);
}
@Override
public int netty_buffer_high_watermark() {
- Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK);
- if (config != null) {
- return (Integer) config;
- } else {
- return ConfigManager.netty_buffer_high_watermark();
- }
+ return option(BoltClientOption.NETTY_BUFFER_HIGH_WATER_MARK);
}
}
diff --git a/src/main/java/com/alipay/remoting/AbstractRemotingServer.java b/src/main/java/com/alipay/remoting/AbstractRemotingServer.java
index ebcd7cb2..f1351365 100644
--- a/src/main/java/com/alipay/remoting/AbstractRemotingServer.java
+++ b/src/main/java/com/alipay/remoting/AbstractRemotingServer.java
@@ -20,16 +20,14 @@
import com.alipay.remoting.config.BoltOption;
import com.alipay.remoting.config.BoltOptions;
-import com.alipay.remoting.config.ConfigManager;
-import com.alipay.remoting.config.Configurable;
+import com.alipay.remoting.config.BoltServerOption;
+import com.alipay.remoting.config.Configuration;
import com.alipay.remoting.config.ConfigurableInstance;
import com.alipay.remoting.config.configs.ConfigContainer;
-import com.alipay.remoting.config.configs.ConfigItem;
import com.alipay.remoting.config.configs.DefaultConfigContainer;
import com.alipay.remoting.config.switches.GlobalSwitch;
import org.slf4j.Logger;
-import com.alipay.remoting.config.configs.ConfigType;
import com.alipay.remoting.log.BoltLoggerFactory;
/**
@@ -47,7 +45,6 @@ public abstract class AbstractRemotingServer extends AbstractLifeCycle implement
private int port;
private final BoltOptions options;
- private final ConfigType configType;
private final GlobalSwitch globalSwitch;
private final ConfigContainer configContainer;
@@ -64,7 +61,6 @@ public AbstractRemotingServer(String ip, int port) {
this.port = port;
this.options = new BoltOptions();
- this.configType = ConfigType.SERVER_SIDE;
this.globalSwitch = new GlobalSwitch();
this.configContainer = new DefaultConfigContainer();
}
@@ -149,44 +145,36 @@ public T option(BoltOption option) {
}
@Override
- public Configurable option(BoltOption option, T value) {
+ public Configuration option(BoltOption option, T value) {
options.option(option, value);
return this;
}
@Override
+ @Deprecated
public ConfigContainer conf() {
return this.configContainer;
}
@Override
+ @Deprecated
public GlobalSwitch switches() {
return this.globalSwitch;
}
@Override
public void initWriteBufferWaterMark(int low, int high) {
- this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK, low);
- this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK, high);
+ option(BoltServerOption.NETTY_BUFFER_LOW_WATER_MARK, low);
+ option(BoltServerOption.NETTY_BUFFER_HIGH_WATER_MARK, high);
}
@Override
public int netty_buffer_low_watermark() {
- Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK);
- if (config != null) {
- return (Integer) config;
- } else {
- return ConfigManager.netty_buffer_low_watermark();
- }
+ return option(BoltServerOption.NETTY_BUFFER_LOW_WATER_MARK);
}
@Override
public int netty_buffer_high_watermark() {
- Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK);
- if (config != null) {
- return (Integer) config;
- } else {
- return ConfigManager.netty_buffer_high_watermark();
- }
+ return option(BoltServerOption.NETTY_BUFFER_HIGH_WATER_MARK);
}
}
diff --git a/src/main/java/com/alipay/remoting/BaseRemoting.java b/src/main/java/com/alipay/remoting/BaseRemoting.java
index 519f890e..f65f3b7e 100644
--- a/src/main/java/com/alipay/remoting/BaseRemoting.java
+++ b/src/main/java/com/alipay/remoting/BaseRemoting.java
@@ -36,8 +36,10 @@
* @version $Id: BaseRemoting.java, v 0.1 Mar 4, 2016 12:09:56 AM tao Exp $
*/
public abstract class BaseRemoting {
- /** logger */
- private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
+
+ private final static Logger LOGGER = BoltLoggerFactory
+ .getLogger("CommonDefault");
+ private final static long ABANDONING_REQUEST_THRESHOLD = 0L;
protected CommandFactory commandFactory;
@@ -58,9 +60,25 @@ public BaseRemoting(CommandFactory commandFactory) {
protected RemotingCommand invokeSync(final Connection conn, final RemotingCommand request,
final int timeoutMillis) throws RemotingException,
InterruptedException {
+ int remainingTime = remainingTime(request, timeoutMillis);
+ if (remainingTime <= ABANDONING_REQUEST_THRESHOLD) {
+ // already timeout
+ LOGGER
+ .warn(
+ "already timeout before writing to the network, requestId: {}, remoting address: {}",
+ request.getId(),
+ conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
+ .getChannel()));
+ return this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
+ }
+
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
conn.addInvokeFuture(future);
final int requestId = request.getId();
+ InvokeContext invokeContext = request.getInvokeContext();
+ if (null != invokeContext) {
+ invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_BEFORE_SEND, System.nanoTime());
+ }
try {
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -70,22 +88,29 @@ public void operationComplete(ChannelFuture f) throws Exception {
conn.removeInvokeFuture(requestId);
future.putResponse(commandFactory.createSendFailedResponse(
conn.getRemoteAddress(), f.cause()));
- logger.error("Invoke send failed, id={}", requestId, f.cause());
+ LOGGER.error("Invoke send failed, id={}", requestId, f.cause());
}
}
});
+ if (null != invokeContext) {
+ invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_AFTER_SEND, System.nanoTime());
+ }
} catch (Exception e) {
conn.removeInvokeFuture(requestId);
future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
- logger.error("Exception caught when sending invocation, id={}", requestId, e);
+ LOGGER.error("Exception caught when sending invocation, id={}", requestId, e);
+ }
+ RemotingCommand response = future.waitResponse(remainingTime);
+
+ if (null != invokeContext) {
+ invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_RECEIVED, System.nanoTime());
}
- RemotingCommand response = future.waitResponse(timeoutMillis);
if (response == null) {
conn.removeInvokeFuture(requestId);
response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
- logger.warn("Wait response, request id={} timeout!", requestId);
+ LOGGER.warn("Wait response, request id={} timeout!", requestId);
}
return response;
@@ -104,6 +129,18 @@ protected void invokeWithCallback(final Connection conn, final RemotingCommand r
final InvokeCallback invokeCallback, final int timeoutMillis) {
final InvokeFuture future = createInvokeFuture(conn, request, request.getInvokeContext(),
invokeCallback);
+ int remainingTime = remainingTime(request, timeoutMillis);
+ if (remainingTime <= ABANDONING_REQUEST_THRESHOLD) {
+ LOGGER
+ .warn(
+ "already timeout before writing to the network, requestId: {}, remoting address: {}",
+ request.getId(),
+ conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
+ .getChannel()));
+ future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
+ future.tryAsyncExecuteInvokeCallbackAbnormally();
+ return;
+ }
conn.addInvokeFuture(future);
final int requestId = request.getId();
try {
@@ -118,7 +155,7 @@ public void run(Timeout timeout) throws Exception {
}
}
- }, timeoutMillis, TimeUnit.MILLISECONDS);
+ }, remainingTime, TimeUnit.MILLISECONDS);
future.addTimeout(timeout);
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -132,7 +169,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
conn.getRemoteAddress(), cf.cause()));
f.tryAsyncExecuteInvokeCallbackAbnormally();
}
- logger.error("Invoke send failed. The address is {}",
+ LOGGER.error("Invoke send failed. The address is {}",
RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
}
}
@@ -145,7 +182,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
f.tryAsyncExecuteInvokeCallbackAbnormally();
}
- logger.error("Exception caught when sending invocation. The address is {}",
+ LOGGER.error("Exception caught when sending invocation. The address is {}",
RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
}
}
@@ -163,6 +200,18 @@ protected InvokeFuture invokeWithFuture(final Connection conn, final RemotingCom
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
conn.addInvokeFuture(future);
+ int remainingTime = remainingTime(request, timeoutMillis);
+ if (remainingTime <= ABANDONING_REQUEST_THRESHOLD) {
+ LOGGER
+ .warn(
+ "already timeout before writing to the network, requestId: {}, remoting address: {}",
+ request.getId(),
+ conn.getUrl() != null ? conn.getUrl() : RemotingUtil.parseRemoteAddress(conn
+ .getChannel()));
+ future.putResponse(commandFactory.createTimeoutResponse(conn.getRemoteAddress()));
+ return future;
+ }
+
final int requestId = request.getId();
try {
Timeout timeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
@@ -175,7 +224,7 @@ public void run(Timeout timeout) throws Exception {
}
}
- }, timeoutMillis, TimeUnit.MILLISECONDS);
+ }, remainingTime, TimeUnit.MILLISECONDS);
future.addTimeout(timeout);
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@@ -189,7 +238,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
f.putResponse(commandFactory.createSendFailedResponse(
conn.getRemoteAddress(), cf.cause()));
}
- logger.error("Invoke send failed. The address is {}",
+ LOGGER.error("Invoke send failed. The address is {}",
RemotingUtil.parseRemoteAddress(conn.getChannel()), cf.cause());
}
}
@@ -201,7 +250,7 @@ public void operationComplete(ChannelFuture cf) throws Exception {
f.cancelTimeout();
f.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
}
- logger.error("Exception caught when sending invocation. The address is {}",
+ LOGGER.error("Exception caught when sending invocation. The address is {}",
RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
}
return future;
@@ -215,25 +264,40 @@ public void operationComplete(ChannelFuture cf) throws Exception {
* @throws InterruptedException
*/
protected void oneway(final Connection conn, final RemotingCommand request) {
+ if (conn == null) {
+ LOGGER.error("conn is null");
+ return;
+ }
+
+ Url url = conn.getUrl();
+ if (url != null) {
+ int remainingTime = remainingTime(request, url.getConnectTimeout());
+ if (remainingTime <= ABANDONING_REQUEST_THRESHOLD) {
+ LOGGER
+ .warn(
+ "already timeout before writing to the network, requestId: {}, remoting address: {}",
+ request.getId(),
+ conn.getUrl() != null ? conn.getUrl() : RemotingUtil
+ .parseRemoteAddress(conn.getChannel()));
+ return;
+ }
+ }
+
try {
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
if (!f.isSuccess()) {
- logger.error("Invoke send failed. The address is {}",
+ LOGGER.error("Invoke send failed. The address is {}",
RemotingUtil.parseRemoteAddress(conn.getChannel()), f.cause());
}
}
});
} catch (Exception e) {
- if (null == conn) {
- logger.error("Conn is null");
- } else {
- logger.error("Exception caught when sending invocation. The address is {}",
- RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
- }
+ LOGGER.error("Exception caught when sending invocation. The address is {}",
+ RemotingUtil.parseRemoteAddress(conn.getChannel()), e);
}
}
@@ -262,4 +326,17 @@ protected abstract InvokeFuture createInvokeFuture(final Connection conn,
protected CommandFactory getCommandFactory() {
return commandFactory;
}
+
+ private int remainingTime(RemotingCommand request, int timeout) {
+ InvokeContext invokeContext = request.getInvokeContext();
+ if (invokeContext == null) {
+ return timeout;
+ }
+ Long cost = invokeContext.get(InvokeContext.CLIENT_CONN_CREATETIME);
+ if (cost == null) {
+ return timeout;
+ }
+
+ return (int) (timeout - cost);
+ }
}
diff --git a/src/main/java/com/alipay/remoting/BoltClient.java b/src/main/java/com/alipay/remoting/BoltClient.java
index f2e2eb2a..f767e94c 100644
--- a/src/main/java/com/alipay/remoting/BoltClient.java
+++ b/src/main/java/com/alipay/remoting/BoltClient.java
@@ -16,7 +16,7 @@
*/
package com.alipay.remoting;
-import com.alipay.remoting.config.Configurable;
+import com.alipay.remoting.config.Configuration;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.RpcConfigs;
@@ -32,7 +32,7 @@
*
* @author chengyi (mark.lx@antfin.com) 2018-11-07 11:56
*/
-public interface BoltClient extends Configurable, LifeCycle {
+public interface BoltClient extends Configuration, LifeCycle {
/**
* One way invocation using a string address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
@@ -653,7 +653,14 @@ Connection getConnection(Url url, int connectTimeout) throws RemotingException,
*
* @return property value of connectionManager
*/
- DefaultConnectionManager getConnectionManager();
+ ConnectionManager getConnectionManager();
+
+ /**
+ * Setter method for property connectionManager.
+ *
+ * @param connectionManager ConnectionManager
+ */
+ void setConnectionManager(ConnectionManager connectionManager);
/**
* Getter method for property addressParser.
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
index 154005de..a5dda17c 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
@@ -22,9 +22,10 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.alipay.remoting.config.BoltClientOption;
+import com.alipay.remoting.config.Configuration;
import org.slf4j.Logger;
-import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.util.RemotingUtil;
import com.alipay.remoting.util.StringUtils;
@@ -54,14 +55,14 @@ public class ConnectionEventHandler extends ChannelDuplexHandler {
private Reconnector reconnectManager;
- private GlobalSwitch globalSwitch;
+ private Configuration configuration;
public ConnectionEventHandler() {
}
- public ConnectionEventHandler(GlobalSwitch globalSwitch) {
- this.globalSwitch = globalSwitch;
+ public ConnectionEventHandler(Configuration configuration) {
+ this.configuration = configuration;
}
/**
@@ -207,7 +208,8 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}
private void submitReconnectTaskIfNecessary(Url url) {
- if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
+ if (configuration.option(BoltClientOption.CONN_RECONNECT_SWITCH)
+ && reconnectManager != null) {
reconnectManager.reconnect(url);
}
}
diff --git a/src/main/java/com/alipay/remoting/ConnectionManager.java b/src/main/java/com/alipay/remoting/ConnectionManager.java
index f7aa6420..39f2b4ad 100644
--- a/src/main/java/com/alipay/remoting/ConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/ConnectionManager.java
@@ -27,7 +27,7 @@
* @author xiaomin.cxm
* @version $Id: ConnectionManager.java, v 0.1 Mar 7, 2016 2:42:46 PM xiaomin.cxm Exp $
*/
-public interface ConnectionManager extends Scannable, LifeCycle {
+public interface ConnectionManager extends Scannable, ConnectionHeartbeatManager, LifeCycle {
/**
* Deprecated, use startup instead.
diff --git a/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java
index 6f067ba0..5e357628 100644
--- a/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java
@@ -16,7 +16,6 @@
*/
package com.alipay.remoting;
-import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.connection.ConnectionFactory;
/**
@@ -35,15 +34,6 @@ public DefaultClientConnectionManager(ConnectionSelectStrategy connectionSelectS
connectionEventListener);
}
- public DefaultClientConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
- ConnectionFactory connectionFactory,
- ConnectionEventHandler connectionEventHandler,
- ConnectionEventListener connectionEventListener,
- GlobalSwitch globalSwitch) {
- super(connectionSelectStrategy, connectionFactory, connectionEventHandler,
- connectionEventListener, globalSwitch);
- }
-
@Override
public void startup() throws LifeCycleException {
super.startup();
diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
index f1cc4e49..08d501e6 100644
--- a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
@@ -36,7 +36,6 @@
import org.slf4j.Logger;
import com.alipay.remoting.config.ConfigManager;
-import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.connection.ConnectionFactory;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.util.FutureTaskUtil;
@@ -50,7 +49,6 @@
* @version $Id: DefaultConnectionManager.java, v 0.1 Mar 8, 2016 10:43:51 AM xiaomin.cxm Exp $
*/
public class DefaultConnectionManager extends AbstractLifeCycle implements ConnectionManager,
- ConnectionHeartbeatManager,
Scannable, LifeCycle {
private static final Logger logger = BoltLoggerFactory
@@ -61,11 +59,6 @@ public class DefaultConnectionManager extends AbstractLifeCycle implements Conne
*/
private ThreadPoolExecutor asyncCreateConnectionExecutor;
- /**
- * switch status
- */
- private GlobalSwitch globalSwitch;
-
/**
* connection pool initialize tasks
*/
@@ -101,15 +94,6 @@ public class DefaultConnectionManager extends AbstractLifeCycle implements Conne
*/
protected ConnectionEventListener connectionEventListener;
- /**
- * Default constructor.
- */
- public DefaultConnectionManager() {
- this.connTasks = new ConcurrentHashMap>();
- this.healTasks = new ConcurrentHashMap>();
- this.connectionSelectStrategy = new RandomSelectStrategy(globalSwitch);
- }
-
/**
* Construct with parameters.
*
@@ -164,25 +148,6 @@ public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrateg
this.connectionEventListener = connectionEventListener;
}
- /**
- * Construct with parameters.
- *
- * @param connectionSelectStrategy connection selection strategy.
- * @param connectionFactory connection factory
- * @param connectionEventHandler connection event handler
- * @param connectionEventListener connection event listener
- * @param globalSwitch global switch
- */
- public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
- ConnectionFactory connectionFactory,
- ConnectionEventHandler connectionEventHandler,
- ConnectionEventListener connectionEventListener,
- GlobalSwitch globalSwitch) {
- this(connectionSelectStrategy, connectionFactory, connectionEventHandler,
- connectionEventListener);
- this.globalSwitch = globalSwitch;
- }
-
@Override
public void startup() throws LifeCycleException {
super.startup();
diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java b/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
index 05545a72..eaf668f7 100644
--- a/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
+++ b/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
@@ -37,13 +37,13 @@ public class DefaultConnectionMonitor extends AbstractLifeCycle {
private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
- private final DefaultConnectionManager connectionManager;
+ private final ConnectionManager connectionManager;
private final ConnectionMonitorStrategy strategy;
private ScheduledThreadPoolExecutor executor;
public DefaultConnectionMonitor(ConnectionMonitorStrategy strategy,
- DefaultConnectionManager connectionManager) {
+ ConnectionManager connectionManager) {
if (strategy == null) {
throw new IllegalArgumentException("null strategy");
}
@@ -72,8 +72,10 @@ public void startup() throws LifeCycleException {
@Override
public void run() {
try {
- Map> connPools = connectionManager
- .getConnPools();
+ Map> connPools = null;
+ if (connectionManager instanceof DefaultConnectionManager) {
+ connPools = ((DefaultConnectionManager) connectionManager).getConnPools();
+ }
strategy.monitor(connPools);
} catch (Exception e) {
logger.warn("MonitorTask error", e);
diff --git a/src/main/java/com/alipay/remoting/config/configs/NettyConfigure.java b/src/main/java/com/alipay/remoting/ExtendedNettyChannelHandler.java
similarity index 52%
rename from src/main/java/com/alipay/remoting/config/configs/NettyConfigure.java
rename to src/main/java/com/alipay/remoting/ExtendedNettyChannelHandler.java
index e32747f2..734ff8f9 100644
--- a/src/main/java/com/alipay/remoting/config/configs/NettyConfigure.java
+++ b/src/main/java/com/alipay/remoting/ExtendedNettyChannelHandler.java
@@ -14,34 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alipay.remoting.config.configs;
+package com.alipay.remoting;
+
+import io.netty.channel.ChannelHandler;
+
+import java.util.List;
/**
- * netty related configuration items
- *
- * @author tsui
- * @version $Id: NettyConfigure.java, v 0.1 2018-07-30 21:42 tsui Exp $$
+ * Leave it to external expansion and
+ * support the addition of extended handler in the channel pipeline.
*/
-public interface NettyConfigure {
- /**
- * Initialize netty write buffer water mark for remoting instance.
- *
- * Notice: This api should be called before init remoting instance.
- *
- * @param low [0, high]
- * @param high [high, Integer.MAX_VALUE)
- */
- void initWriteBufferWaterMark(int low, int high);
+public interface ExtendedNettyChannelHandler {
/**
- * get the low water mark for netty write buffer
- * @return low watermark
+ * Netty ChannelHandlers to be added before Bolt's built-in Handler.
+ * @return Netty ChannelHandler list
*/
- int netty_buffer_low_watermark();
+ List frontChannelHandlers();
/**
- * get the high water mark for netty write buffer
- * @return high watermark
+ * Netty ChannelHandlers to be added after Bolt's built-in Handler.
+ * @return Netty ChannelHandler list
*/
- int netty_buffer_high_watermark();
-}
\ No newline at end of file
+ List backChannelHandlers();
+}
diff --git a/src/main/java/com/alipay/remoting/InvokeContext.java b/src/main/java/com/alipay/remoting/InvokeContext.java
index 6aae5614..5cfaf402 100644
--- a/src/main/java/com/alipay/remoting/InvokeContext.java
+++ b/src/main/java/com/alipay/remoting/InvokeContext.java
@@ -18,7 +18,7 @@
import java.util.concurrent.ConcurrentHashMap;
-/**
+/**
* Invoke context
*
* @author tsui
@@ -26,28 +26,47 @@
*/
public class InvokeContext {
// ~~~ invoke context keys of client side
- public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
- public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
- public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
- public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
+ public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
+ public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
+ public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
+ public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
/** time consumed during connection creating, this is a timespan */
- public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
+ public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
+ public final static String CLIENT_CONN_CREATE_START_IN_NANO = "bolt.client.conn.create.start.nano";
+ public final static String CLIENT_CONN_CREATE_END_IN_NANO = "bolt.client.conn.create.end.nano";
// ~~~ invoke context keys of server side
- public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
- public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
- public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
- public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";
+ public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
+ public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
+ public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
+ public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";
// ~~~ invoke context keys of bolt client and server side
- public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
+ public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
/** time consumed start from the time when request arrive, to the time when request be processed, this is a timespan */
- public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
- public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
- public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";
+ public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
+ /** time request arrived in nano seconds , collected by System.nanoTime() */
+ public final static String BOLT_PROCESS_ARRIVE_HEADER_IN_NANO = "bolt.invoke.request.arrive.header.in.nano";
+ public final static String BOLT_PROCESS_ARRIVE_BODY_IN_NANO = "bolt.invoke.request.arrive.body.in.nano";
+
+ /** time before send request to user thread in nano seconds , collected by System.nanoTime() */
+ public final static String BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO = "bolt.invoke.before.dispatch.in.nano";
+
+ /** time before send request to user thread in nano seconds , collected by System.nanoTime() */
+ public final static String BOLT_PROCESS_START_PROCESS_IN_NANO = "bolt.invoke.start.process.in.nano";
+
+ public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
+ public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";
+
+ /** time before send request to net in nano seconds , collected by System.nanoTime() **/
+ public final static String BOLT_PROCESS_CLIENT_BEFORE_SEND = "bolt.invoke.client.before.send";
+ /** time after send request to net in nano seconds , collected by System.nanoTime() **/
+ public final static String BOLT_PROCESS_CLIENT_AFTER_SEND = "bolt.invoke.client.after.send";
+ /** time after receive response from server in nano seconds , collected by System.nanoTime() **/
+ public final static String BOLT_PROCESS_CLIENT_RECEIVED = "bolt.invoke.client.received";
// ~~~ constants
- public final static int INITIAL_SIZE = 8;
+ public final static int INITIAL_SIZE = 8;
/** context */
private ConcurrentHashMap context;
diff --git a/src/main/java/com/alipay/remoting/RandomSelectStrategy.java b/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
index bb317de1..9477da4b 100644
--- a/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
+++ b/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
@@ -20,10 +20,11 @@
import java.util.List;
import java.util.Random;
+import com.alipay.remoting.config.BoltClientOption;
+import com.alipay.remoting.config.Configuration;
import org.slf4j.Logger;
import com.alipay.remoting.config.Configs;
-import com.alipay.remoting.config.switches.GlobalSwitch;
import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.util.StringUtils;
@@ -39,14 +40,10 @@ public class RandomSelectStrategy implements ConnectionSelectStrategy {
private static final int MAX_TIMES = 5;
private final Random random = new Random();
- private final GlobalSwitch globalSwitch;
+ private final Configuration configuration;
- public RandomSelectStrategy() {
- this(null);
- }
-
- public RandomSelectStrategy(GlobalSwitch globalSwitch) {
- this.globalSwitch = globalSwitch;
+ public RandomSelectStrategy(Configuration configuration) {
+ this.configuration = configuration;
}
@Override
@@ -61,8 +58,7 @@ public Connection select(List connections) {
}
Connection result;
- if (null != this.globalSwitch
- && this.globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
+ if (configuration != null && configuration.option(BoltClientOption.CONN_MONITOR_SWITCH)) {
List serviceStatusOnConnections = new ArrayList();
for (Connection conn : connections) {
String serviceStatus = (String) conn.getAttribute(Configs.CONN_SERVICE_STATUS);
diff --git a/src/main/java/com/alipay/remoting/RemotingServer.java b/src/main/java/com/alipay/remoting/RemotingServer.java
index 59e74c6d..a947aaf1 100644
--- a/src/main/java/com/alipay/remoting/RemotingServer.java
+++ b/src/main/java/com/alipay/remoting/RemotingServer.java
@@ -18,13 +18,13 @@
import java.util.concurrent.ExecutorService;
-import com.alipay.remoting.config.Configurable;
+import com.alipay.remoting.config.Configuration;
import com.alipay.remoting.rpc.protocol.UserProcessor;
/**
* @author chengyi (mark.lx@antfin.com) 2018-06-16 06:55
*/
-public interface RemotingServer extends Configurable, LifeCycle {
+public interface RemotingServer extends Configuration, LifeCycle {
/**
* init the server
diff --git a/src/main/java/com/alipay/remoting/codec/ProtocolCodeBasedDecoder.java b/src/main/java/com/alipay/remoting/codec/ProtocolCodeBasedDecoder.java
index c603fb4c..9e585a22 100644
--- a/src/main/java/com/alipay/remoting/codec/ProtocolCodeBasedDecoder.java
+++ b/src/main/java/com/alipay/remoting/codec/ProtocolCodeBasedDecoder.java
@@ -79,8 +79,15 @@ protected byte decodeProtocolVersion(ByteBuf in) {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List