From 67e3b2d5a0092d41df629e950a38a80743717d81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Tue, 17 Apr 2012 17:05:32 +0200 Subject: =?UTF-8?q?Master=E2=86=92Client=20communication=20uses=20protobuf?= =?UTF-8?q?=20services.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit – This change causes inconsistency in the .participants list and as a result some Paxos related tests run infinitely. --- same/src/main/java/com/orbekk/same/Master.java | 195 +++++++++---------------- 1 file changed, 66 insertions(+), 129 deletions(-) (limited to 'same/src/main/java/com/orbekk/same/Master.java') diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index 71a95dd..94a3e5a 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -25,6 +25,26 @@ public class Master { private Broadcaster broadcaster; private volatile int masterId = 1; + class RemoveParticipantIfFailsCallback implements RpcCallback { + private final String participantLocation; + private final Rpc rpc; + + public RemoveParticipantIfFailsCallback( + String participantLocation, Rpc rpc) { + this.participantLocation = participantLocation; + this.rpc = rpc; + } + + @Override + public void run(T unused) { + if (rpc.isOk()) { + if (rpc.failed()) { + removeParticipant(participantLocation); + } + } + } + } + public static Master create(ConnectionManager connections, Broadcaster broadcaster, String myUrl, String networkName, String myLocation) { @@ -68,24 +88,8 @@ public class Master { @Override public void joinNetworkRequest(RpcController controller, ClientState request, RpcCallback done) { logger.info("joinNetworkRequest({})", request); - - /** Old participant code. */ - List participants = state.getList(".participants"); - sendFullStateThread.add(request.getUrl()); - if (!participants.contains(request.getUrl())) { - participants.add(request.getUrl()); - synchronized (Master.this) { - state.updateFromObject(".participants", participants, - state.getRevision(".participants") + 1); - } - updateStateRequestThread.add(".participants"); - } else { - logger.warn("Client {} joining: Already part of network"); - } - - /** New participant code. */ + sendFullStateThread.add(request.getLocation()); addParticipant(request.getLocation()); - done.run(Empty.getDefaultInstance()); } @@ -104,135 +108,72 @@ public class Master { } }; - private MasterService serviceImpl = new MasterService() { - @Override - public boolean updateStateRequest(String component, - String newData, long revision) { - Services.Component request = Services.Component.newBuilder() - .setId(component) - .setData(newData) - .setRevision(revision) - .build(); - final AtomicBoolean result = new AtomicBoolean(false); - RpcCallback done = - new RpcCallback() { - @Override public void run(UpdateComponentResponse response) { - result.set(response.getSuccess()); - } - }; - newMasterImpl.updateStateRequest(null, request, done); - return result.get(); - } - - @Override - public void joinNetworkRequest(String clientUrl) { - Services.ClientState request = Services.ClientState.newBuilder() - .setUrl(clientUrl) - .build(); - RpcCallback done = - new RpcCallback() { - @Override public void run(Empty response) { - } - }; - newMasterImpl.joinNetworkRequest(null, request, done); - } - }; - WorkQueue updateStateRequestThread = new WorkQueue() { - class UpdateStateCallback - implements RpcCallback { - private final String participantLocation; - private final Rpc rpc; - - public UpdateStateCallback(String participantLocation, Rpc rpc) { - this.participantLocation = participantLocation; - this.rpc = rpc; - } - - @Override - public void run(Empty unused) { - if (rpc.isOk()) { - if (rpc.failed()) { - removeParticipant(participantLocation); - } - } - } - } - @Override protected void onChange() { List pending = getAndClear(); + List updatedComponents = new ArrayList(); + for (String component : pending) { + updatedComponents.add(state.getComponent(component)); + } + logger.info("updateStateRequestThread: Updated state: {}", pending); -// for (String componentName : pending) { -// Component component = state.getComponent(componentName); -// List participants = state.getList(".participants"); -// broadcastNewComponents(participants, -// Collections.singletonList(component)); -// } for (String clientLocation : state.getList( com.orbekk.same.State.PARTICIPANTS)) { - - Services.Client client = connections.getClient0(clientLocation); - if (client == null) { - removeParticipant(clientLocation); - continue; - } - - for (String componentName : pending) { - Services.Component updatedComponent = - Services.Component.newBuilder() - .setId(componentName) - .setData(state.getDataOf(componentName)) - .setRevision(state.getRevision(componentName)) - .build(); - - Rpc rpc = new Rpc(); - UpdateStateCallback done = new UpdateStateCallback( - clientLocation, rpc); - client.setState(rpc, updatedComponent, done); - } + sendComponents(clientLocation, updatedComponents); } } }; - - public List getClients() { - List clients = new ArrayList(); - for (String location : state.getList(State.PARTICIPANTS)) { - Services.Client client = connections.getClient0(location); - if (client == null) { - removeParticipant(location); - } else { - clients.add(client); - } + + public void sendComponents(String clientLocation, + List components) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); } - return clients; - } + for (Component component : components) { + Services.Component componentProto = componentToProto(component); + Rpc rpc = new Rpc(); + RpcCallback done = + new RemoveParticipantIfFailsCallback(clientLocation, + rpc); + client.setState(rpc, componentProto, done); + } + } + WorkQueue sendFullStateThread = new WorkQueue() { @Override protected void onChange() { List pending = getAndClear(); logger.info("Sending full state to {}", pending); final List components = state.getComponents(); - broadcaster.broadcast(pending, new ServiceOperation() { - @Override public void run(String url) { - ClientService client = connections.getClient(url); - try { - client.masterTakeover( - state.getDataOf(".masterUrl"), - state.getDataOf(".networkName"), - masterId, - state.getDataOf(".masterLocation")); - } catch (Exception e) { - logger.info("Client failed to acknowledge master. Remove.", - e); - removeParticipant(url); - } + for (String clientLocation : pending) { + Services.Client client = connections.getClient0(clientLocation); + if (client == null) { + removeParticipant(clientLocation); + continue; + } + + { // Send masterTakeover(). + Rpc rpc = new Rpc(); + RpcCallback done = + new RemoveParticipantIfFailsCallback( + clientLocation, rpc); + client.masterTakeover(rpc, getMasterInfo(), done); } - }); - broadcastNewComponents(pending, components); + sendComponents(clientLocation, components); + } } }; + private Services.Component componentToProto(State.Component component) { + return Services.Component.newBuilder() + .setId(component.getName()) + .setData(component.getData()) + .setRevision(component.getRevision()) + .build(); + } + void performWork() { sendFullStateThread.performWork(); updateStateRequestThread.performWork(); @@ -252,10 +193,6 @@ public class Master { return newMasterImpl; } - public MasterService getService() { - return serviceImpl; - } - private synchronized void addParticipant(String location) { List participants = state.getList(State.PARTICIPANTS); if (!participants.contains(location)) { -- cgit v1.2.3