diff options
| author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 14:53:23 +0200 | 
|---|---|---|
| committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-11 14:53:23 +0200 | 
| commit | a4414f27a1076dbf53224c22765c4dc64c5b7bd8 (patch) | |
| tree | 609d14afce4d17eb57d2996696d94576af70192c /src/main/java/com/orbekk | |
| parent | 6f414b99c94f5e1562785f9f2b95940a50a70a88 (diff) | |
Remove old RpcChannel.
Fix tests with NewRpcChannel.
Diffstat (limited to 'src/main/java/com/orbekk')
| -rw-r--r-- | src/main/java/com/orbekk/example/ExampleClient.java | 6 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/NewRpcChannel.java | 20 | ||||
| -rw-r--r-- | src/main/java/com/orbekk/protobuf/RpcChannel.java | 216 | 
3 files changed, 21 insertions, 221 deletions
diff --git a/src/main/java/com/orbekk/example/ExampleClient.java b/src/main/java/com/orbekk/example/ExampleClient.java index 3590182..e047ad9 100644 --- a/src/main/java/com/orbekk/example/ExampleClient.java +++ b/src/main/java/com/orbekk/example/ExampleClient.java @@ -19,14 +19,14 @@ import java.util.concurrent.CountDownLatch;  import com.google.protobuf.RpcCallback;  import com.orbekk.example.Example.FortuneReply; +import com.orbekk.protobuf.NewRpcChannel;  import com.orbekk.protobuf.Rpc; -import com.orbekk.protobuf.RpcChannel;  public class ExampleClient {      public void runClient(int port) { -        RpcChannel channel = null; +        NewRpcChannel channel = null;          try { -            channel = RpcChannel.create("localhost", port); +            channel = NewRpcChannel.createOrNull("localhost", port);              Example.FortuneService service =                      Example.FortuneService.newStub(channel);              printFortune(service); diff --git a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java index 82e3dd2..12f1433 100644 --- a/src/main/java/com/orbekk/protobuf/NewRpcChannel.java +++ b/src/main/java/com/orbekk/protobuf/NewRpcChannel.java @@ -24,14 +24,14 @@ import com.google.protobuf.RpcController;  public class NewRpcChannel implements com.google.protobuf.RpcChannel {      public static int NUM_CONCURRENT_REQUESTS = 5;      private static final Logger logger = -            Logger.getLogger(RpcChannel.class.getName()); +            Logger.getLogger(NewRpcChannel.class.getName());      private final String host;      private final int port;      private final AtomicLong nextId = new AtomicLong(0);      private final ExecutorService responseHandlerPool =              Executors.newSingleThreadExecutor();      private final BlockingQueue<Data.Request> requestQueue = -            new ArrayBlockingQueue(NUM_CONCURRENT_REQUESTS); +            new ArrayBlockingQueue<Data.Request>(NUM_CONCURRENT_REQUESTS);      private volatile Socket socket = null;      private final ConcurrentHashMap<Long, RequestMetadata> ongoingRequests =              new ConcurrentHashMap<Long, RequestMetadata>(); @@ -124,6 +124,9 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {                  for (;;) {                      Data.Response response = Data.Response                              .parseDelimitedFrom(inputStream); +                    if (response == null) { +                        throw new IOException("Could not read response."); +                    }                      responseHandlerPool.execute(new ResponseHandler(response));                  }              } catch (IOException e) { @@ -139,6 +142,18 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {          }      } +    public static NewRpcChannel createOrNull(String host, int port) { +        try { +            return create(host, port); +        } catch (UnknownHostException e) { +            logger.log(Level.WARNING, "Unable to create RPC channel.", e); +            return null; +        } catch (IOException e) { +            logger.log(Level.WARNING, "Unable to create RPC channel.", e); +            return null; +        } +    } +      public static NewRpcChannel create(String host, int port)              throws UnknownHostException, IOException {          NewRpcChannel channel = new NewRpcChannel(host, port); @@ -173,6 +188,7 @@ public class NewRpcChannel implements com.google.protobuf.RpcChannel {      private void tryCloseSocket(Socket socket) {          try {              socket.close(); +            cancelAllRequests("channel closed");          } catch (IOException e1) {              logger.log(Level.WARNING,                      "Unable to close socket " + socket, diff --git a/src/main/java/com/orbekk/protobuf/RpcChannel.java b/src/main/java/com/orbekk/protobuf/RpcChannel.java deleted file mode 100644 index 8e24d0c..0000000 --- a/src/main/java/com/orbekk/protobuf/RpcChannel.java +++ /dev/null @@ -1,216 +0,0 @@ -/** - * Copyright 2012 Kjetil Ørbekk <kjetil.orbekk@gmail.com> - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - *     http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.orbekk.protobuf; - -import java.io.Closeable; -import java.io.IOException; -import java.net.Socket; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; -import java.util.logging.Level; -import java.util.logging.Logger; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -public class RpcChannel extends Thread implements -        com.google.protobuf.RpcChannel { -    static final Logger logger = -            Logger.getLogger(RpcChannel.class.getName()); -    private final String host; -    private final int port; -    private volatile Socket socket = null; -    private AtomicLong nextId = new AtomicLong(0); -    private Map<Long, RpcChannel.OngoingRequest> rpcs = -            Collections.synchronizedMap( -                    new HashMap<Long, RpcChannel.OngoingRequest>()); -    private BlockingQueue<Socket> sockets = new LinkedBlockingQueue<Socket>(); - -    private static class OngoingRequest implements Closeable { -        long id; -        Rpc rpc; -        RpcCallback<Message> done; -        Message responsePrototype; -        Map<Long, RpcChannel.OngoingRequest> rpcs; -         -        public OngoingRequest(long id, Rpc rpc, -                RpcCallback<Message> done, Message responsePrototype, -                Map<Long, RpcChannel.OngoingRequest> rpcs) { -            this.id = id; -            this.rpc = rpc; -            this.done = done; -            this.responsePrototype = responsePrototype; -            this.rpcs = rpcs; -        } - -        @Override -        public void close() throws IOException { -            throw new AssertionError("Not implemented"); -        } -    } -     -    public static RpcChannel create(String host, int port) { -        RpcChannel channel = new RpcChannel(host, port); -        channel.start(); -        return channel; -    } -     -    private RpcChannel(String host, int port) { -        this.host = host; -        this.port = port; -    } - -    private Socket getSocket() { -        if (socket == null || socket.isClosed()) { -            try { -                logger.info("Creating new socket to " + host + ":" + port); -                socket = new Socket(host, port); -                sockets.add(socket); -            } catch (UnknownHostException e) { -                return null; -            } catch (IOException e) { -                logger.log(Level.WARNING, -                        "Could not establish connection.", e); -                return null; -            } -        } -        return socket; -    } -     -    private Data.Request createRequest(Descriptors.MethodDescriptor method, -            RpcController controller, -            Message requestMessage, -            Message responsePrototype, -            RpcCallback<Message> done) { -        long id = nextId.incrementAndGet(); -        Rpc rpc = (Rpc)controller; -        OngoingRequest ongoingRequest = new OngoingRequest(id, rpc, -                done, responsePrototype, rpcs); -        rpcs.put(id, ongoingRequest); -         -        Data.Request request = Data.Request.newBuilder() -                .setRequestId(id) -                .setFullServiceName(method.getService().getFullName()) -                .setMethodName(method.getName()) -                .setRequestProto(requestMessage.toByteString()) -                .build(); -         -        return request; -    } -     -    private void finishRequest(Data.Response response) { -        OngoingRequest ongoingRequest = rpcs.remove(response.getRequestId()); -        if (ongoingRequest != null) { -            try { -                Message responsePb = ongoingRequest.responsePrototype.toBuilder() -                        .mergeFrom(response.getResponseProto()).build(); -                ongoingRequest.rpc.readFrom(response); -                ongoingRequest.done.run(responsePb); -            } catch (InvalidProtocolBufferException e) { -                throw new AssertionError("Should fail here."); -            } -        } -    } - -    @Override public void callMethod( -            Descriptors.MethodDescriptor method, -            RpcController controller, -            Message requestMessage, -            Message responsePrototype, -            RpcCallback<Message> done) { -        try { -            Data.Request request = createRequest(method, controller, -                    requestMessage, responsePrototype, done); -            Socket socket = getSocket(); -            if (socket == null) { -                cancelAllRpcs(); -            } else { -                request.writeDelimitedTo(socket.getOutputStream()); -            } -        } catch (IOException e) { -            throw new AssertionError("Should return error."); -        } -    } -     -    private void handleResponses(Socket socket) { -        try { -            logger.info("Handling responses to socket " + socket); -            while (!socket.isClosed()) { -                Data.Response response; -                response = Data.Response.parseDelimitedFrom( -                        socket.getInputStream()); -                if (response == null) { -                    throw new IOException("No response."); -                } -                finishRequest(response); -            } -        } catch (IOException e) { -            if (!rpcs.isEmpty()) { -                logger.log(Level.WARNING, "IO Error. Canceling " + -                        rpcs.size() + " requests.", e); -                cancelAllRpcs(); -            } -        } finally { -            if (socket != null && !socket.isClosed()) { -                try { -                    socket.close(); -                } catch (IOException e) { -                    // Socket is closed. -                } -            } -        } -    } -     -    private void cancelAllRpcs() { -        synchronized (rpcs) { -            for (OngoingRequest request : rpcs.values()) { -                request.rpc.setFailed("connection closed"); -                request.done.run(null); -            } -            rpcs.clear(); -        }        -    } -     -    public void run() { -        while (!Thread.interrupted()) { -            try { -                Socket socket = sockets.take(); -                handleResponses(socket); -            } catch (InterruptedException e) { -                Thread.currentThread().interrupt(); -            } -        } -    } - -    public void close() { -        if (socket != null) { -            try { -                this.interrupt(); -                socket.close(); -            } catch (IOException e) { -                logger.info("Error closing socket."); -            } -        } -    } -}
\ No newline at end of file  | 
