Scalable IO in Java

Outline

  • Scalable network services
  • Event-driven processing
  • Reactor pattern
    • Basic version
    • Multithreaded versions
    • Other variants
  • Walkthrough of java.nio nonblocking IO APIs

Network Services

  1. Web services, distributed objects, etc.
  2. Most have the same basic structure:
    • Read request
    • Decode request
    • Process service
    • Encode reply
    • Send reply
  3. But they differ in the nature and cost of each step: XML parsing, file transfer, web page generation, computational services, …
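The five steps can be sketched as a chain of small methods. This is only an illustration: the class name `UppercaseService`, its text protocol, and the uppercasing "service" are hypothetical stand-ins, not part of the original slides. Reading and sending are left to whoever owns the socket.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical service: the request is UTF-8 text, the reply is the same text in upper case.
class UppercaseService {
    // Decode request: raw bytes -> domain value.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8).trim();
    }

    // Process service: the step whose cost differs most from one service to another.
    static String process(String request) {
        return request.toUpperCase(Locale.ROOT);
    }

    // Encode reply: domain value -> bytes that are ready to send.
    static byte[] encode(String reply) {
        return (reply + "\n").getBytes(StandardCharsets.UTF_8);
    }

    // Read and send are the caller's job; this covers the three middle steps.
    static byte[] handle(byte[] rawRequest) {
        return encode(process(decode(rawRequest)));
    }
}
```

Keeping each step a separate method makes it easier to see later which steps can run without blocking and which ones are worth moving to another thread.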

Classic ServerSocket Loop

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class Server implements Runnable {
    private static final int PORT = 7890;

    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
        // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        private static final int MAX_INPUT = 100;
        final Socket socket;

        Handler(Socket s) {
            socket = s;
        }

        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }

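        // Placeholder: the real decode/process/encode work goes here.
        private byte[] process(byte[] cmd) { /* ... */
            return cmd; // echo the input so that write() never receives null
        }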
    }
}

Scalability Goals

  • Graceful degradation under increasing load (more clients)
  • Continuous improvement with increasing resources (CPU, memory, disk, bandwidth)
  • Also meet availability and performance goals
    • Short latencies
    • Meeting peak demand
    • Tunable quality of service
  • Divide-and-conquer is usually the best approach for achieving any scalability goal

Divide and Conquer

  • Divide processing into small tasks
    • Each task performs an action without blocking
  • Execute each task when it is enabled
    • Here, an IO event usually serves as the trigger

  • Basic mechanisms supported in java.nio
    • Non-blocking reads and writes
    • Dispatch tasks associated with sensed IO events
  • Endless variation possible
    • A family of event-driven designs
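To make "non-blocking read" concrete, here is a minimal sketch that uses a `java.nio.channels.Pipe` in place of a socket so that it runs without a network. The class name `NonBlockingReadDemo` is hypothetical. A read on an empty non-blocking channel returns 0 immediately instead of waiting.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

class NonBlockingReadDemo {
    // Returns {bytes read before anything was written, bytes read afterwards}.
    static int[] demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            ByteBuffer buf = ByteBuffer.allocate(16);

            int before = source.read(buf);   // nothing written yet: returns 0 at once

            sink.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));
            int after = 0;
            while (after < 4) {              // polling only for the demo; a reactor would select()
                after += source.read(buf);
            }
            return new int[] { before, after };
        }
    }
}
```

The polling loop is exactly what an event-driven design avoids: instead of asking repeatedly, the task is run only when a Selector reports that the channel is readable.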

Event-driven Designs

  • Usually more efficient than alternatives
    • Fewer resources: don't usually need a thread per client
    • Less overhead: less context switching, often less locking
    • But dispatching can be slower: must manually bind actions to events
  • Usually harder to program
    • Must break up into simple non-blocking actions
      • Similar to GUI event-driven actions
      • Cannot eliminate all blocking: GC, page faults, etc.
    • Must keep track of logical state of service

Reactor Pattern

  • Reactor responds to IO events by dispatching the appropriate handler
    • Similar to AWT thread
  • Handlers perform non-blocking actions
    • Similar to AWT ActionListeners
  • Manage by binding handlers to events
    • Similar to AWT addActionListener
  • See Schmidt et al, Pattern-Oriented Software Architecture, Volume 2 (POSA2)

java.nio Support

  • Channels: connections to files, sockets, etc. that support non-blocking reads
  • Buffers: array-like objects that can be directly read or written by Channels
  • Selectors: tell which of a set of Channels have IO events
  • SelectionKeys: maintain IO event status and bindings
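The four pieces fit together roughly as in the sketch below. It again uses a `Pipe` rather than a socket so it can run anywhere; the class name `SelectorDemo` and the string attachment `"reader"` are hypothetical. In the Reactor code the attachment is a `Runnable` handler rather than a string.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectorDemo {
    // Returns the attachment of the key that the Selector reports as readable.
    static Object demo() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);                 // required before register()
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            key.attach("reader");                            // binding: IO event -> handler object

            sink.write(ByteBuffer.wrap(new byte[] { 42 }));  // makes the source readable

            Object found = null;
            selector.select(5000);                           // wait up to 5 s for a ready channel
            for (SelectionKey k : selector.selectedKeys()) {
                if (k.isReadable()) found = k.attachment();
            }
            selector.selectedKeys().clear();                 // the Selector does not empty this set itself
            return found;
        }
    }
}
```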

Reactor 1: Setup

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(
                new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk =
                serverSocket.register(selector,
                        SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor()); // Acceptor is the inner class shown in "Reactor 3: Acceptor"
    }
    /*
        Alternatively, use explicit SPI provider:
        SelectorProvider p = SelectorProvider.provider();
        selector = p.openSelector();
        serverSocket = p.openServerSocketChannel();
    */
    @Override
    public void run() {
        // The dispatch loop is shown in "Reactor 2: Dispatch Loop".
    }
}

Reactor 2: Dispatch Loop

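    // class Reactor continued (needs java.util.Iterator and java.util.Set)
    public void run() { // normally in a new Thread
        try {
            while (!Thread.interrupted()) {
                selector.select(); // blocks until a channel is ready, or until wakeup() or interrupt
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext())
                    dispatch(it.next());
                selected.clear(); // the selector does not empty this set itself
            }
        } catch (IOException ex) { /* ... */ }
    }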

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        if (r != null)
            r.run();
    }

Reactor 3: Acceptor

 // class Reactor continued
    class Acceptor implements Runnable { // inner
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null)
                    new Handler(selector, c);
            } catch (IOException ex) { /* ... */ }
        }
    }

Reactor 4: Handler setup

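import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class Handler implements Runnable {
    // MAXIN and MAXOUT are not defined in the source; these sizes are placeholders.
    static final int MAXIN = 1024, MAXOUT = 1024;
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    Handler(Selector sel, SocketChannel c)
            throws IOException {
        socket = c;
        c.configureBlocking(false);
        // Optionally try first read now
        sk = socket.register(sel, 0);          // register with no interest yet
        sk.attach(this);                       // the dispatch loop will call run() on this handler
        sk.interestOps(SelectionKey.OP_READ);  // now start listening for readable events
        sel.wakeup();                          // make a blocked select() notice the new key
    }

    boolean inputIsComplete() { /* ... */ }

    boolean outputIsComplete() { /* ... */ }

    void process() { /* ... */ }

    // read() and send() are shown in the next section
}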

Reactor 5: Request handling


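    // class Handler continued
    public void run() { // called by the Reactor's dispatch loop
        try {
            if (state == READING) read();
            else if (state == SENDING) send();
        } catch (IOException ex) { /* ... */ }
    }

    void read() throws IOException {
        socket.read(input);
        if (inputIsComplete()) {
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    }

    void send() throws IOException {
        socket.write(output);
        if (outputIsComplete()) sk.cancel(); // reply sent: stop selecting on this channel
    }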
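
The fragments from Reactor 1 through Reactor 5 can be assembled into one small runnable program. The following is a sketch under stated assumptions, not the original author's code: the class name `EchoReactor`, the echo behaviour, the newline-terminated request format, the 1024-byte buffer, and binding to 127.0.0.1 are all choices made here for illustration. A single buffer serves as both input and output because the reply is the request itself.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical single-threaded echo service built from the Reactor fragments above.
class EchoReactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;

    EchoReactor(int port) throws IOException {           // port 0 asks the OS for any free port
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.bind(new InetSocketAddress("127.0.0.1", port));
        serverSocket.configureBlocking(false);
        serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor());
    }

    int port() { return serverSocket.socket().getLocalPort(); }

    public void run() {                                  // dispatch loop
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    Runnable handler = (Runnable) key.attachment();
                    if (handler != null) handler.run();
                }
            }
        } catch (IOException ex) {
            // a failing selector ends the loop
        } finally {
            try { selector.close(); serverSocket.close(); } catch (IOException ignored) { }
        }
    }

    class Acceptor implements Runnable {
        public void run() {
            try {
                SocketChannel c = serverSocket.accept();
                if (c != null) new Handler(c);
            } catch (IOException ex) { /* drop this connection attempt */ }
        }
    }

    class Handler implements Runnable {
        final SocketChannel socket;
        final SelectionKey key;
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        boolean sending = false;

        Handler(SocketChannel c) throws IOException {
            socket = c;
            socket.configureBlocking(false);
            // Runs on the reactor thread, so no wakeup() is needed here.
            key = socket.register(selector, SelectionKey.OP_READ, this);
        }

        public void run() {
            try {
                if (sending) send(); else read();
            } catch (IOException ex) {
                close();
            }
        }

        void read() throws IOException {
            if (socket.read(buffer) < 0) { close(); return; }   // peer closed the connection
            boolean complete = buffer.position() > 0
                    && buffer.get(buffer.position() - 1) == '\n';
            if (complete || !buffer.hasRemaining()) {
                buffer.flip();                           // switch the buffer from filling to draining
                sending = true;
                key.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socket.write(buffer);
            if (!buffer.hasRemaining()) close();         // whole reply written
        }

        void close() {
            key.cancel();
            try { socket.close(); } catch (IOException ignored) { }
        }
    }
}
```

To try it, start `new Thread(new EchoReactor(0))`, connect a plain `java.net.Socket` to `port()`, and send a line of text. Interrupting the thread stops the loop.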

Per-State Handlers

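class Handler implements Runnable { // ... rebinds the attachment instead of switching on a state field
    public void run() { // initial state is reader
        try {
            socket.read(input);
            if (inputIsComplete()) {
                process();
                sk.attach(new Sender()); // the next event on this key runs the Sender
                sk.interestOps(SelectionKey.OP_WRITE);
                sk.selector().wakeup();
            }
        } catch (IOException ex) { /* ... */ }
    }

    class Sender implements Runnable {
        public void run() { // ...
            try {
                socket.write(output);
                if (outputIsComplete()) sk.cancel();
            } catch (IOException ex) { /* ... */ }
        }
    }
}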

Multithreaded Designs

  • Strategically add threads for scalability
    • Mainly applicable to multiprocessors
  • Worker Threads
    • Reactors should quickly trigger handlers
      • Handler processing slows down the Reactor
    • Offload non-IO processing to other threads
  • Multiple Reactor Threads
    • Reactor threads can saturate doing IO
    • Distribute load to other reactors
      • Load-balance to match CPU and IO rates

Worker Threads

  • Offload non-IO processing to speed up the Reactor thread
    • Similar to POSA2 Proactor designs
  • Simpler than reworking compute-bound processing into event-driven form
    • Should still be pure nonblocking computation
    • Enough processing to outweigh overhead
  • But harder to overlap processing with IO
    • Best when all input can first be read into a buffer
  • Use a thread pool so it can be tuned and controlled
    • Normally need many fewer threads than clients

Worker Thread Pools

[Figure: worker thread pools]

Handler with Thread Pool

class Handler implements Runnable {
    // uses the thread pool from Doug Lea's util.concurrent library,
    // the predecessor of java.util.concurrent
    static PooledExecutor pool = new PooledExecutor();
    static final int PROCESSING = 3;

    // ...
    synchronized void read() { // ...
        socket.read(input);
        if (inputIsComplete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }

    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        sk.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }
}

Coordinating Tasks

  • Handoffs
    • Each task enables, triggers, or calls the next one
    • Usually fastest but can be brittle
  • Callbacks to per-handler dispatcher
    • Sets state, attachment, etc.
    • A variant of the GoF Mediator pattern
  • Queues
    • For example, passing buffers across stages
  • Futures
    • When each task produces a result
    • Coordination layered on top of join or wait/notify
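
As an illustration of the Futures option, the sketch below runs a compute step on a worker thread and collects its result. It assumes the standard `java.util.concurrent` package rather than the util.concurrent library the slides were written against; the class name `FutureHandoff` and the summing task are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureHandoff {
    // Runs a compute step on a worker thread and collects its result through a Future.
    static int sumOnWorker(int[] values) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> result = pool.submit(() -> {
                int sum = 0;
                for (int v : values) sum += v;
                return sum;
            });
            return result.get();   // blocks the caller until the task has produced its value
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `get()` blocks, so in a reactor it belongs on a worker thread or after the task is known to be done, never inside the dispatch loop.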

Using PooledExecutor

  • A tunable worker thread pool
  • Main method: execute(Runnable r)
  • Controls for:
    • The kind of task queue (any Channel)
    • Maximum number of threads
    • Minimum number of threads
    • "Warm" versus on-demand threads
    • Keep-alive interval until idle threads die, to be replaced later by new ones if necessary
    • Saturation policy: block, drop, producer-runs, etc.
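
`PooledExecutor` belongs to the older util.concurrent library. A sketch of the same controls using its standard-library successor, `java.util.concurrent.ThreadPoolExecutor`, might look like this. The class name `TunablePool` and all the numbers are assumptions picked for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TunablePool {
    static ThreadPoolExecutor create() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                           // minimum (core) number of threads
                4,                                           // maximum number of threads
                30, TimeUnit.SECONDS,                        // keep-alive for idle threads above the core size
                new ArrayBlockingQueue<>(8),                 // kind of task queue: bounded FIFO
                new ThreadPoolExecutor.CallerRunsPolicy());  // saturation policy: producer runs the task
        pool.prestartAllCoreThreads();                       // "warm" threads instead of on-demand creation
        return pool;
    }

    // Submits n small tasks and returns how many of them ran.
    static int runTasks(int n) throws InterruptedException {
        ThreadPoolExecutor pool = create();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) pool.execute(done::incrementAndGet);
        pool.shutdown();                                     // queued tasks still run after shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return done.get();
    }
}
```

With the caller-runs policy no task is dropped when the queue is full; the submitting thread runs it instead. In a reactor this slows down the dispatch loop under overload, which may or may not be the back-pressure you want.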

Multiple Reactor Threads

  • Using Reactor Pools
    • Use to match CPU and IO rates
    • Static or dynamic construction
      • Each with own Selector, Thread, dispatch loop
    • Main acceptor distributes to other reactors

    Selector[] selectors; // also create threads
    int next = 0;         // index of the selector that gets the next connection

    class Acceptor { // ...
        public synchronized void run() { // ...
            try {
                SocketChannel connection = serverSocket.accept();
                if (connection != null)
                    new Handler(selectors[next], connection);
                if (++next == selectors.length) next = 0; // round-robin
            } catch (IOException ex) { /* ... */ }
        }
    }
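
The round-robin choice of selector can be isolated and checked on its own. In this sketch the class name `SelectorRing` and its methods are hypothetical; it opens the selectors but leaves out the per-selector threads and dispatch loops.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class SelectorRing implements AutoCloseable {
    private final Selector[] selectors;
    private int next = 0;

    SelectorRing(int size) throws IOException {
        selectors = new Selector[size];
        for (int i = 0; i < size; i++) selectors[i] = Selector.open();
    }

    // Index of the selector that the next accepted connection should be registered with.
    synchronized int nextIndex() {
        int chosen = next;
        if (++next == selectors.length) next = 0;   // wrap around after the last selector
        return chosen;
    }

    Selector get(int index) { return selectors[index]; }

    @Override
    public void close() throws IOException {
        for (Selector s : selectors) s.close();
    }
}
```

In a full design each selector runs its dispatch loop on its own thread. Because the acceptor registers channels from a different thread, it generally has to call `wakeup()` on the target selector (as the Handler constructor does) or queue the registration for the reactor thread to perform.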

Using other java.nio features

  • Multiple Selectors per Reactor
    • To bind different handlers to different IO events
    • May need careful synchronization to coordinate
  • File transfer
    • Automated file-to-net or net-to-file copying
  • Memory-mapped files
    • Access files via buffers
  • Direct buffers
    • Can sometimes achieve zero-copy transfer
    • But have setup and finalization overhead
    • Best for applications with long-lived connections
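
Three of these features can be tried with temporary files alone. The sketch below copies one file to another with `FileChannel.transferTo`, maps the copy into memory, and moves the mapped bytes into a direct buffer. The class name `FileFeaturesDemo` and the file name prefixes are hypothetical, and a file-to-file copy stands in for the file-to-net case.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class FileFeaturesDemo {
    // Copies a file with transferTo, then reads the copy back through a memory mapping.
    static String copyAndMap(String text) throws IOException {
        Path src = Files.createTempFile("nio-src", ".txt");
        Path dst = Files.createTempFile("nio-dst", ".txt");
        src.toFile().deleteOnExit();
        dst.toFile().deleteOnExit();
        Files.write(src, text.getBytes(StandardCharsets.UTF_8));

        // File transfer: the channel does the copying, with no user-level buffer.
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
            long pos = 0, size = in.size();
            while (pos < size) {                         // transferTo may move fewer bytes than requested
                pos += in.transferTo(pos, size - pos, out);
            }
        }

        try (FileChannel ch = FileChannel.open(dst, StandardOpenOption.READ)) {
            // Memory-mapped file: the file content is accessed as a buffer.
            MappedByteBuffer map = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            // Direct buffer: allocated outside the Java heap.
            ByteBuffer direct = ByteBuffer.allocateDirect((int) ch.size());
            direct.put(map);
            direct.flip();
            return StandardCharsets.UTF_8.decode(direct).toString();
        }
    }
}
```

The target of `transferTo` can equally be a `SocketChannel`, which is the file-to-net case named above.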

Connection-Based Extensions

  • Instead of a single service request,
    • Client connects
    • Client sends a series of messages/requests
    • Client disconnects
  • Examples
    • Databases and Transaction monitors
    • Multi-participant games, chat, etc
  • Can extend basic network service patterns
    • Handle many relatively long-lived clients
    • Track client and session state (including drops)
    • Distribute services across multiple hosts

API Walkthrough

  • Buffer
  • ByteBuffer (CharBuffer, LongBuffer, etc not shown.)
  • Channel
  • SelectableChannel
  • SocketChannel
  • ServerSocketChannel
  • FileChannel
  • Selector
  • SelectionKey
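
Of the classes listed, `ByteBuffer` is the one whose state is easiest to get wrong, so here is a short sketch of its fill, flip, drain, compact cycle. The class name `BufferCycleDemo` is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferCycleDemo {
    // Returns {position, limit} after filling, after flip(), after compact(), and the first byte read.
    static int[][] demo() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put("abc".getBytes(StandardCharsets.US_ASCII));  // filling: position advances to 3
        int[] filled = { buf.position(), buf.limit() };

        buf.flip();                                          // draining: limit = 3, position = 0
        int[] flipped = { buf.position(), buf.limit() };

        byte first = buf.get();                              // consumes 'a'
        buf.compact();                                       // keeps unread "bc", ready to be filled again
        int[] compacted = { buf.position(), buf.limit() };

        return new int[][] { filled, flipped, compacted, { first } };
    }
}
```

Forgetting `flip()` before writing a buffer to a channel is a common mistake: the channel then sees the empty space after the data instead of the data itself.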
