Skip to content

Hprose 中间件

小马哥 edited this page Oct 28, 2016 · 17 revisions

简介

Hprose 过滤器的功能虽然比较强大,可以将 Hprose 的功能进行扩展。但是有些功能使用它仍然难以实现,比如缓存。

为此,Hprose 2.0 引入了更加强大的中间件功能。Hprose 中间件不仅可以对输入输出的数据进行操作,它还可以对调用本身的参数和结果进行操作,甚至你可以跳过中间的执行步骤,或者完全由你来接管中间数据的处理。

Hprose 中间件跟普通的 HTTP 服务器中间件有些类似,但又有所不同。

Hprose 中间件分为两种:

  • 调用中间件
  • 输入输出中间件

另外,输入输出中间件又可以细分为 beforeFilterafterFilter 两种,但它们本质上没有什么区别,只是在执行顺序上有所区别。

执行顺序

Hprose 中间件的顺序执行是按照添加的前后顺序执行的,假设添加的中间件处理器分别为:handler1, handler2 ... handlerN,那么执行顺序就是 handler1, handler2 ... handlerN

不同类型的 Hprose 中间件和 Hprose 其它过程的执行流程如下图所示:

+------------------------------------------------------------------+
|                     +------+                                     |
|                     |invoke|                                     |
|                     +------+                                     |
|                         ^                                        |
|                         |                                        |
|                         v                                        |
|               +-------------------+                              |
|               | invoke middleware |                              |
|               +-------------------+                              |
|                         ^                                        |
|                         |                                        |
|                         v                                        |
|                 +---------------+                                |
|                 | encode/decode |                                |
|                 +---------------+                                |
|                         ^                                        |
|                         |                                        |
|                         v                                        |
|           +--------------------------+                           |
|           | before filter middleware |                           |
|           +--------------------------+                           |
|                         ^                                        |
|                         |       _  _ ___  ____ ____ ____ ____    |
|                         v       |__| |__] |__/ |  | [__  |___    |
|                    +--------+   |  | |    |  \ |__| ___] |___    |
|                    | filter |                                    |
|                    +--------+     ____ _    _ ____ _  _ ___      |
|                         ^         |    |    | |___ |\ |  |       |
|                         |         |___ |___ | |___ | \|  |       |
|                         v                                        |
|            +-------------------------+                           |
|            | after filter middleware |                           |
|            +-------------------------+                           |
+------------------------------------------------------------------+
                                  ^
                                  |
                                  |
                                  v
+------------------------------------------------------------------+
|           +--------------------------+                           |
|           | before filter middleware |                           |
|           +--------------------------+                           |
|                         ^                                        |
|                         |        _  _ ___  ____ ____ ____ ____   |
|                         v        |__| |__] |__/ |  | [__  |___   |
|                    +--------+    |  | |    |  \ |__| ___] |___   |
|                    | filter |                                    |
|                    +--------+    ____ ____ ____ _  _ ____ ____   |
|                         ^        [__  |___ |__/ |  | |___ |__/   |
|                         |        ___] |___ |  \  \/  |___ |  \   |
|                         v                                        |
|            +-------------------------+                           |
|            | after filter middleware |                           |
|            +-------------------------+                           |
|                         ^                                        |
|                         |                                        |
|                         v                                        |
|                 +---------------+                                |
|    +----------->| encode/decode |<---------------------+         |
|    |            +---------------+                      |         |
|    |                    |                              |         |
|    |                    |                              |         |
|    |                    v                              |         |
|    |            +---------------+                      |         |
|    |            | before invoke |-------------+        |         |
|    |            +---------------+             |        |         |
|    |                    |                     |        |         |
|    |                    |                     |        |         |
|    |                    v                     v        |         |
|    |          +-------------------+    +------------+  |         |
|    |          | invoke middleware |--->| send error |--+         |
|    |          +-------------------+    +------------+            |
|    |                    |                     ^                  |
|    |                    |                     |                  |
|    |                    v                     |                  |
|    |            +--------------+              |                  |
|    |            | after invoke |--------------+                  |
|    |            +--------------+                                 |
|    |                    |                                        |
|    |                    |                                        |
|    +--------------------+                                        |
+------------------------------------------------------------------+

调用中间件

调用中间件需要实现 InvokeHandler 接口:

public interface InvokeHandler {
    Promise<Object> handle(String name, Object[] args, HproseContext context, NextInvokeHandler next);
}

调用中间件的形式为:

public class MyInvokeHandler implements InvokeHandler {
    public Promise<Object> handle(String name, Object[] args, HproseContext context, NextInvokeHandler next) {
        ...
        Promise<Object> result = next.handle(name, args, context);
        ...
        return result;
    }
}

name 是调用的远程函数/方法名。

args 是调用参数。

context 是调用上下文对象。

next 表示下一个中间件。通过调用 next 将各个中间件串联起来。

在调用 next 之前的操作在调用发生前执行,在调用 next 之后的操作在调用发生后执行,如果你不想修改返回结果,你应该将 next 的返回值作为该中间件的返回值返回。

跟踪测试

我们来看一个例子:

LogHandler.java

package hprose.example.invokehandler.log;

import hprose.common.HproseContext;
import hprose.common.InvokeHandler;
import hprose.common.NextInvokeHandler;
import hprose.util.concurrent.Promise;
import java.util.Arrays;

public class LogHandler implements InvokeHandler {
    @Override
    public Promise<Object> handle(String name, Object[] args, HproseContext context, NextInvokeHandler next) {
        System.out.println("before invoke: " + name + ", " + Arrays.deepToString(args));
        Promise<Object> result = next.handle(name, args, context);
        result.then((Object value) -> System.out.println("after invoke: " + name + ", " + Arrays.deepToString(args) + ", " + value));
        return result;
    }
}

Server.java

package hprose.example.invokehandler.log;

import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static String hello(String name) {
        return "Hello " + name + "!";
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8085");
        server.add("hello", Server.class);
        server.use(new LogHandler());
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.invokehandler.log;

import hprose.client.HproseClient;
import hprose.common.MethodName;
import hprose.util.concurrent.Promise;
import java.io.IOException;
import java.net.URISyntaxException;

interface IHello {
    String hello(String name);
    @MethodName("hello")
    Promise<String> asyncHello(String name);
    @MethodName("hello")
    Promise<String> asyncHello(Promise<String> name);
}

public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8085");
        client.use(new LogHandler());
        IHello h = client.useService(IHello.class);
        h.asyncHello("Async World")
         .then((String result) -> System.out.println(result));
        h.asyncHello(Promise.value("Async World"))
         .then((String result) -> System.out.println(result));
        System.out.println(h.hello("World"));
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
before invoke: hello, [Async World]
before invoke: hello, [Async World]
after invoke: hello, [Async World], Hello Async World!
after invoke: hello, [Async World], Hello Async World!
before invoke: hello, [World]
after invoke: hello, [World], Hello World!

客户端输出

before invoke: hello, [Async World]
before invoke: hello, [Async World]
before invoke: hello, [World]
after invoke: hello, [Async World], Hello Async World!
after invoke: hello, [Async World], Hello Async World!
Hello Async World!
Hello Async World!
after invoke: hello, [World], Hello World!
Hello World!

我们会注意到在 LogHandler 里面,result 是一个 Promise 类型对象,但参数值 argsLogHandlerhandle 并不包含 Promise 的值,原因是 Hprose 内部已经对参数值处理过了。这样对于中间件编写就方便了很多,只需要处理异步结果就可以了。

缓存调用

我们再来看一个实现缓存调用的例子,在这个例子中我们也使用了上面的日志中间件,用来观察我们的缓存是否真的有效。

CacheHandler.java

package hprose.example.invokehandler.cache;

import hprose.common.HproseContext;
import hprose.common.InvokeHandler;
import hprose.common.NextInvokeHandler;
import hprose.util.concurrent.Promise;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CacheHandler implements InvokeHandler {
    private final Map<String, Map<String, Promise<Object>>> cache = new ConcurrentHashMap<>();
    @Override
    public Promise<Object> handle(String name, Object[] args, HproseContext context, NextInvokeHandler next) {
        if (context.getBoolean("cache")) {
            String key = Arrays.deepToString(args);
            if (cache.containsKey(name)) {
                if (cache.get(name).containsKey(key)) {
                    return cache.get(name).get(key);
                }
            }
            else {
                cache.put(name, new ConcurrentHashMap<>());
            }
            Promise<Object> result = next.handle(name, args, context);
            cache.get(name).put(key, result);
            return result;
        }
        return next.handle(name, args, context);
    }
}

Client.java

package hprose.example.invokehandler.cache;

import hprose.client.HproseClient;
import hprose.common.InvokeSettings;
import hprose.common.MethodName;
import hprose.example.invokehandler.log.LogHandler;
import hprose.util.concurrent.Promise;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

interface IHello {
    String hello(String name);
    @MethodName("hello")
    Promise<String> asyncHello(String name, InvokeSettings settings);
    @MethodName("hello")
    Promise<String> asyncHello(Promise<String> name, InvokeSettings settings);
}

public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8085");
        client.use(new CacheHandler()).use(new LogHandler());
        IHello h = client.useService(IHello.class);
        Map<String, Object> userData = new HashMap<>();
        userData.put("cache", true);
        InvokeSettings settings = new InvokeSettings();
        settings.setUserData(userData);
        h.asyncHello("Cached Async World", settings)
         .then((String result) -> System.out.println(result));
        h.asyncHello(Promise.value("Cached Async World"), settings)
         .then((String result) -> System.out.println(result));
        System.out.println(h.hello("World"));
        System.out.println(h.hello("World"));
    }
}

我们的服务器仍然使用上面例子中的服务器。在确保服务器已启动的情况下,我们运行客户端,可以看到它们分别输出以下结果:

服务器输出

START
before invoke: hello, [World]
before invoke: hello, [Cached Async World]
after invoke: hello, [Cached Async World], Hello Cached Async World!
after invoke: hello, [World], Hello World!
before invoke: hello, [World]
after invoke: hello, [World], Hello World!

客户端输出

before invoke: hello, [Cached Async World]
before invoke: hello, [World]
after invoke: hello, [Cached Async World], Hello Cached Async World!
Hello Cached Async World!
after invoke: hello, [World], Hello World!
Hello World!
before invoke: hello, [World]
Hello Cached Async World!
after invoke: hello, [World], Hello World!
Hello World!

我们看到输出结果中 "Cached Async World" 的日志只被打印了一次,而 "World" 的日志被打印了两次。这说明 "Cached Async World" 确实被缓存了。

在这个例子中,我们还用到了使用 InvokeSettingsuserData 设置 contextuserData 属性,通过 userData 配合 Hprose 中间件,我们就可以实现自定义选项功能了。

另外,我们在这个例子中可以看到,use 方法可以链式调用。

输入输出中间件

输入输出中间件可以完全代替 HproseFilter。使用输入输出中间件还是使用 Hprose 过滤器完全看开发者喜好。

输入输出中间件需要实现 FilterHandler 接口:

public interface FilterHandler {
    Promise<ByteBuffer> handle(ByteBuffer request, HproseContext context, NextFilterHandler next);
}

输入输出中间件的形式为:

public class MyFilterHandler implements FilterHandler {
    public Promise<ByteBuffer> handle(ByteBuffer request, HproseContext context, NextFilterHandler next) {
        ...
        Promise<ByteBuffer> response = next.handle(request, context);
        ...
        return response;
    }
}

request 是原始请求数据,对于客户端来说它是输出数据,对于服务器端来说,它是输入数据。

context 是调用上下文对象。

next 表示下一个中间件。通过调用 next 将各个中间件串联起来。

next 的返回值 response 是返回的响应数据。对于客户端来说,它是输入数据。对于服务器端来说,它是输出数据。

跟踪调试

下面我们来看一下 Hprose 过滤器中的跟踪调试的例子在这里如何实现。

LogHandler.java

package hprose.example.filterhandler.log;

import hprose.common.FilterHandler;
import hprose.common.HproseContext;
import hprose.common.NextFilterHandler;
import hprose.util.StrUtil;
import hprose.util.concurrent.Promise;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogHandler implements FilterHandler {
    private static final Logger logger = Logger.getLogger(LogHandler.class.getName());
    @Override
    public Promise<ByteBuffer> handle(ByteBuffer request, HproseContext context, NextFilterHandler next) {
        logger.log(Level.INFO, StrUtil.toString(request));
        Promise<ByteBuffer> response = next.handle(request, context);
        response.then((ByteBuffer data) -> logger.log(Level.INFO, StrUtil.toString(data)));
        return response;
    }
}

Server.java

package hprose.example.filterhandler.log;

import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static String hello(String name) {
        return "Hello " + name + "!";
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8086");
        server.add("hello", Server.class);
        server.beforeFilter.use(new LogHandler());
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filterhandler.log;

import hprose.client.HproseClient;
import java.io.IOException;
import java.net.URISyntaxException;

interface IHello {
    String hello(String name);
}
public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8086");
        client.beforeFilter.use(new LogHandler());
        IHello h = client.useService(IHello.class);
        System.out.println(h.hello("World"));
    }
}

然后分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
七月 02, 2016 11:53:15 下午 hprose.example.filterhandler.log.LogHandler handle
信息: Cs5"hello"a1{s5"World"}z
七月 02, 2016 11:53:15 下午 hprose.example.filterhandler.log.LogHandler lambda$handle$0
信息: Rs12"Hello World!"z

客户端输出

七月 02, 2016 11:53:15 下午 hprose.example.filterhandler.log.LogHandler handle
信息: Cs5"hello"a1{s5"World"}z
七月 02, 2016 11:53:15 下午 hprose.example.filterhandler.log.LogHandler handle
信息: Rs12"Hello World!"z
Hello World!

这个结果跟使用 Hprose 过滤器的例子的结果一样。

但是我们发现,这里使用 Hprose 中间件要写的代码比起 Hprose 过滤器来要多一些。主要原因是在 Hprose 中间件中,next 的返回值为 Promise 对象,需要异步处理,而 Hprose 过滤器只需要同步处理就可以了。

另外,因为这个例子中,我们没有使用过滤器功能,因此使用 beforeFilter.use 方法或者 afterFilter.use 方法添加中间件处理器效果都是一样的。

但如果我们使用了过滤器的话,那么 beforeFilter.use 添加的中间件处理器的 request 数据是未经过过滤器处理的。过滤器的处理操作在 next 的最后一环中执行。next 返回的响应 response 是经过过滤器处理的。

如果某个通过 beforeFilter.use 添加的中间件处理器跳过了 next 而直接返回了结果的话,则返回的 response 也是未经过过滤器处理的。而且如果某个 beforeFilter.use 添加的中间件处理器跳过了 next,不但过滤器不会执行,而且在它之后使用 beforeFilter.use 所添加的中间件处理器也不会执行,afterFilter.use 方法所添加的所有中间件处理器也都不会执行。

afterFilter.use 添加的处理器所收到的 request 都是经过过滤器处理以后的,但它当中使用 next 方法返回的 response 是未经过过滤器处理的。

下面,我们在来看一个结合了压缩过滤器和输入输出缓存中间件的例子。

压缩、缓存、计时

压缩我们使用过滤器一章的过滤器 CompressFilter.java,这里就不在列代码了。

SizeHandler.java

package hprose.example.filterhandler.size;

import hprose.common.FilterHandler;
import hprose.common.HproseContext;
import hprose.common.NextFilterHandler;
import hprose.util.concurrent.Promise;
import java.nio.ByteBuffer;

public class SizeHandler implements FilterHandler {
    private final String message;
    public SizeHandler(String message) {
        this.message = message;
    }
    @Override
    public Promise<ByteBuffer> handle(ByteBuffer request, HproseContext context, NextFilterHandler next) {
        System.out.println(message + " request size: " + request.remaining());
        Promise<ByteBuffer> response = next.handle(request, context);
        response.then((ByteBuffer data) -> {
            System.out.println(message + " response size: " + data.remaining());
        });
        return response;
    }
}

StatHandler.java

package hprose.example.filterhandler.stat;

import hprose.common.FilterHandler;
import hprose.common.HproseContext;
import hprose.common.NextFilterHandler;
import hprose.util.concurrent.Promise;
import java.nio.ByteBuffer;

public class StatHandler implements FilterHandler {
    private final String message;
    public StatHandler(String message) {
        this.message = message;
    }
    @Override
    public Promise<ByteBuffer> handle(ByteBuffer request, HproseContext context, NextFilterHandler next) {
        long start = System.currentTimeMillis();
        Promise<ByteBuffer> response = next.handle(request, context);
        response.whenComplete(() -> {
           long end = System.currentTimeMillis();
           System.out.println(message + ": It takes " + (end - start) + " ms.");
        });
        return response;
    }
}

Server.java

package hprose.example.filterhandler;

import hprose.example.filter.compress.CompressFilter;
import hprose.example.filterhandler.size.SizeHandler;
import hprose.example.filterhandler.stat.StatHandler;
import hprose.server.HproseTcpServer;
import java.io.IOException;
import java.net.URISyntaxException;

public class Server {
    public static Object echo(Object obj) {
        return obj;
    }
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseTcpServer server = new HproseTcpServer("tcp://0.0.0.0:8087");
        server.add("echo", Server.class);
        server.beforeFilter.use(new StatHandler("BeforeFilter"))
                           .use(new SizeHandler("Compresssed"));
        server.addFilter(new CompressFilter());
        server.afterFilter.use(new StatHandler("AfterFilter"))
                          .use(new SizeHandler("Non Compresssed"));
        server.start();
        System.out.println("START");
        System.in.read();
        server.stop();
        System.out.println("STOP");
    }
}

Client.java

package hprose.example.filterhandler;

import hprose.client.HproseClient;
import hprose.common.InvokeSettings;
import hprose.example.filter.compress.CompressFilter;
import hprose.example.filterhandler.size.SizeHandler;
import hprose.example.filterhandler.stat.StatHandler;
import hprose.example.invokehandler.cache.CacheHandler;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;

interface IEcho {
    int[] echo(int[] obj, InvokeSettings settings);
}

public class Client {
    public static void main(String[] args) throws URISyntaxException, IOException {
        HproseClient client = HproseClient.create("tcp://127.0.0.1:8087");
        client.use(new CacheHandler());
        client.beforeFilter.use(new StatHandler("BeforeFilter"))
                           .use(new SizeHandler("Non compresssed"));
        client.addFilter(new CompressFilter());
        client.afterFilter.use(new StatHandler("AfterFilter"))
                          .use(new SizeHandler("Compresssed"));

        IEcho h = client.useService(IEcho.class);
        int n = 100000;
        int[] value = new int[n];
        for (int i = 0; i < n; ++i) {
            value[i] = i;
        }
        Map<String, Object> userData = new HashMap<>();
        userData.put("cache", true);
        InvokeSettings settings = new InvokeSettings();
        settings.setUserData(userData);
        System.out.println(h.echo(value, settings).length);
        System.out.println(h.echo(value, settings).length);
    }
}

分别启动服务器和客户端,就会看到如下输出:

服务器输出

START
Compresssed request size: 213178
Non Compresssed request size: 688893
Non Compresssed response size: 688881
AfterFilter: It takes 107 ms.
Compresssed response size: 213154
BeforeFilter: It takes 142 ms.

客户端输出

Non compresssed request size: 688893
Compresssed request size: 213178
Compresssed response size: 213154
AfterFilter: It takes 211 ms.
Non compresssed response size: 688881
BeforeFilter: It takes 239 ms.
100000
100000

我们可以看到两次的执行结果都出来了,但是中间件的输出内容只有一次。原因就是第二次执行时,cachehandler 将缓存的结果直接返回了。因此后面所有的步骤就都略过了。

通过这个例子,我们可以看出,将 Hprose 中间件和 Hprose 过滤器结合,可以实现非常强大的扩展功能。如果你有什么特殊的需求,直接使用 Hprose 无法实现的话,就考虑一下是否可以添加几个 Hprose 中间件和 Hprose 过滤器吧。