diff options
author | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-17 17:05:32 +0200 |
---|---|---|
committer | Kjetil Ørbekk <kjetil.orbekk@gmail.com> | 2012-04-17 17:05:32 +0200 |
commit | 67e3b2d5a0092d41df629e950a38a80743717d81 (patch) | |
tree | d545a33ff9cc9109ed79adcc6b3e5ef13c569095 /same/src/main/java/com/orbekk | |
parent | e2d00cd5fa29e1a5cada502248cd81358e4da491 (diff) |
Master→Client communication uses protobuf services.
– This change causes inconsistency in the .participants list and as a
result some Paxos related tests run infinitely.
Diffstat (limited to 'same/src/main/java/com/orbekk')
-rw-r--r-- | same/src/main/java/com/orbekk/same/Master.java | 195 | ||||
-rw-r--r-- | same/src/main/java/com/orbekk/same/SameController.java | 2 |
2 files changed, 66 insertions, 131 deletions
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 71a95dd..94a3e5a 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -25,6 +25,26 @@ public class Master { private Broadcaster broadcaster; private volatile int masterId = 1; + class RemoveParticipantIfFailsCallback<T> implements RpcCallback<T> { + private final String participantLocation; + private final Rpc rpc; + + public RemoveParticipantIfFailsCallback( + String participantLocation, Rpc rpc) { + this.participantLocation = participantLocation; + this.rpc = rpc; + } + + @Override + public void run(T unused) { + if (rpc.isOk()) { + if (rpc.failed()) { + removeParticipant(participantLocation); + } + } + } + } + public static Master create(ConnectionManager connections, Broadcaster broadcaster, String myUrl, String networkName, String myLocation) { @@ -68,24 +88,8 @@ public class Master { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback<Empty> done) { logger.info("joinNetworkRequest({})", request); - - /** Old participant code. */ - List<String> participants = state.getList(".participants"); - sendFullStateThread.add(request.getUrl()); - if (!participants.contains(request.getUrl())) { - participants.add(request.getUrl()); - synchronized (Master.this) { - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - } - updateStateRequestThread.add(".participants"); - } else { - logger.warn("Client {} joining: Already part of network"); - } - - /** New participant code. */ + sendFullStateThread.add(request.getLocation()); addParticipant(request.getLocation()); - done.run(Empty.getDefaultInstance()); } @@ -104,135 +108,72 @@ public class Master { } }; - private MasterService serviceImpl = new MasterService() { - @Override - public boolean updateStateRequest(String component, - String newData, long revision) { - Services.Component request = Services.Component.newBuilder() - .setId(component) - .setData(newData) - .setRevision(revision) - .build(); - final AtomicBoolean result = new AtomicBoolean(false); - RpcCallback<Services.UpdateComponentResponse> done = - new RpcCallback<Services.UpdateComponentResponse>() { - @Override public void run(UpdateComponentResponse response) { - result.set(response.getSuccess()); - } - }; - newMasterImpl.updateStateRequest(null, request, done); - return result.get(); - } - - @Override - public void joinNetworkRequest(String clientUrl) { - Services.ClientState request = Services.ClientState.newBuilder() - .setUrl(clientUrl) - .build(); - RpcCallback<Services.Empty> done = - new RpcCallback<Services.Empty>() { - @Override public void run(Empty response) { - } - }; - newMasterImpl.joinNetworkRequest(null, request, done); - } - }; - WorkQueue<String> updateStateRequestThread = new WorkQueue<String>() { - class UpdateStateCallback - implements RpcCallback<Empty> { - private final String participantLocation; - private final Rpc rpc; - - public UpdateStateCallback(String participantLocation, Rpc rpc) { - this.participantLocation = participantLocation; - this.rpc = rpc; - } - - @Override - public void run(Empty unused) { - if (rpc.isOk()) { - if (rpc.failed()) { - removeParticipant(participantLocation); - } - } - } - } - @Override protected void onChange() { List<String> pending = getAndClear(); + List<Component> updatedComponents = new ArrayList<Component>(); + for (String component : pending) { + updatedComponents.add(state.getComponent(component)); + } + logger.info("updateStateRequestThread: Updated state: {}", pending); -// for (String componentName : pending) { -// Component component = state.getComponent(componentName); -// List<String> participants = state.getList(".participants"); -// broadcastNewComponents(participants, -// Collections.singletonList(component)); -// } for (String clientLocation : state.getList( com.orbekk.same.State.PARTICIPANTS)) { - - Services.Client client = connections.getClient0(clientLocation); - if (client == null) { - removeParticipant(clientLocation); - continue; - } - - for (String componentName : pending) { - Services.Component updatedComponent = - Services.Component.newBuilder() - .setId(componentName) - .setData(state.getDataOf(componentName)) - .setRevision(state.getRevision(componentName)) - .build(); - - Rpc rpc = new Rpc(); - UpdateStateCallback done = new UpdateStateCallback( - clientLocation, rpc); - client.setState(rpc, updatedComponent, done); - } + sendComponents(clientLocation, updatedComponents); } } }; - - public List<Services.Client> getClients() { - List<Services.Client> clients = new ArrayList<Services.Client>(); - for (String location : state.getList(State.PARTICIPANTS)) { - Services.Client client = connections.getClient0(location); - if (client == null) { - removeParticipant(location); - } else { - clients.add(client); - } + + public void sendComponents(String clientLocation, + List<Component> components) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); } - return clients; - } + for (Component component : components) { + Services.Component componentProto = componentToProto(component); + Rpc rpc = new Rpc(); + RpcCallback<Empty> done = + new RemoveParticipantIfFailsCallback<Empty>(clientLocation, + rpc); + client.setState(rpc, componentProto, done); + } + } + WorkQueue<String> sendFullStateThread = new WorkQueue<String>() { @Override protected void onChange() { List<String> pending = getAndClear(); logger.info("Sending full state to {}", pending); final List<Component> components = state.getComponents(); - broadcaster.broadcast(pending, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover( - state.getDataOf(".masterUrl"), - state.getDataOf(".networkName"), - masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client failed to acknowledge master. Remove.", - e); - removeParticipant(url); - } + for (String clientLocation : pending) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); + continue; + } + + { // Send masterTakeover(). + Rpc rpc = new Rpc(); + RpcCallback<Empty> done = + new RemoveParticipantIfFailsCallback<Empty>( + clientLocation, rpc); + client.masterTakeover(rpc, getMasterInfo(), done); } - }); - broadcastNewComponents(pending, components); + sendComponents(clientLocation, components); + } } }; + private Services.Component componentToProto(State.Component component) { + return Services.Component.newBuilder() + .setId(component.getName()) + .setData(component.getData()) + .setRevision(component.getRevision()) + .build(); + } + void performWork() { sendFullStateThread.performWork(); updateStateRequestThread.performWork(); @@ -252,10 +193,6 @@ public class Master { return newMasterImpl; } - public MasterService getService() { - return serviceImpl; - } - private synchronized void addParticipant(String location) { List<String> participants = state.getList(State.PARTICIPANTS); if (!participants.contains(location)) { diff --git a/same/src/main/java/com/orbekk/same/SameController.java b/same/src/main/java/com/orbekk/same/SameController.java index b87e6f5..8aeebd6 100644 --- a/same/src/main/java/com/orbekk/same/SameController.java +++ b/same/src/main/java/com/orbekk/same/SameController.java @@ -42,12 +42,10 @@ public class SameController { master.resumeFrom(lastKnownState, masterId); pServer.registerService(master.getNewService()); master.start(); - masterService.setService(master.getService()); } @Override public void disableMaster() { - masterService.setService(null); if (master != null) { master.interrupt(); } |