From 9fc7d7a372301715745d4ef85cc2f7eba8a555a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kjetil=20=C3=98rbekk?= Date: Mon, 7 May 2012 13:43:36 +0200 Subject: State-wide revision field maintained by master. --- same/src/main/java/com/orbekk/same/Client.java | 4 +- same/src/main/java/com/orbekk/same/Master.java | 15 ++- same/src/main/java/com/orbekk/same/Services.java | 115 ++++++++++++++++------ same/src/main/java/com/orbekk/same/State.java | 12 +++ same/src/main/java/com/orbekk/same/services.proto | 3 +- 5 files changed, 111 insertions(+), 38 deletions(-) diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java index 6b0ddc4..0ee4c10 100644 --- a/same/src/main/java/com/orbekk/same/Client.java +++ b/same/src/main/java/com/orbekk/same/Client.java @@ -57,11 +57,9 @@ public class Client { private ClientInterfaceImpl() { } - /** Get a copy of all the client state. - */ @Override public State getState() { - return new State(state); + return state; } @Override diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java index e3ada43..4327a7a 100644 --- a/same/src/main/java/com/orbekk/same/Master.java +++ b/same/src/main/java/com/orbekk/same/Master.java @@ -18,6 +18,7 @@ package com.orbekk.same; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +37,7 @@ public class Master { private final ConnectionManager connections; private String myUrl; private String myLocation; // Protobuf server location, i.e., myIp:port + private AtomicLong revision = new AtomicLong(1); State state; private volatile int masterId = 1; private final RpcFactory rpcf; @@ -96,6 +98,7 @@ public class Master { .setMasterLocation(getLocation()) .setNetworkName(getNetworkName()) .setMasterId(masterId) + .setRevision(revision.get()) .build(); } @@ -112,13 +115,15 @@ public class Master { Services.Component request, RpcCallback done) { logger.info("updateStateRequest({})", request); - boolean updated = state.update(request.getId(), request.getData(), - request.getRevision() + 1); - if (updated) { + boolean success = false; + if (state.checkRevision(request.getId(), request.getRevision())) { + success = true; + long newRevision = revision.incrementAndGet(); + state.forceUpdate(request.getId(), request.getData(), newRevision); updateStateRequestThread.add(request.getId()); } done.run(Services.UpdateComponentResponse.newBuilder() - .setSuccess(updated).build()); + .setSuccess(success).build()); } }; @@ -264,6 +269,6 @@ public class Master { client.masterTakeover(rpc, getMasterInfo(), done); } updateStateRequestThread.add(".masterUrl"); - updateStateRequestThread .add(".masterLocation"); + updateStateRequestThread.add(".masterLocation"); } } diff --git a/same/src/main/java/com/orbekk/same/Services.java b/same/src/main/java/com/orbekk/same/Services.java index 06489f7..8e08529 100644 --- a/same/src/main/java/com/orbekk/same/Services.java +++ b/same/src/main/java/com/orbekk/same/Services.java @@ -2811,6 +2811,10 @@ public final class Services { // optional string master_location = 4; boolean hasMasterLocation(); String getMasterLocation(); + + // optional int64 revision = 5; + boolean hasRevision(); + long getRevision(); } public static final class MasterState extends com.google.protobuf.GeneratedMessage @@ -2947,11 +2951,22 @@ public final class Services { } } + // optional int64 revision = 5; + public static final int REVISION_FIELD_NUMBER = 5; + private long revision_; + public boolean hasRevision() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getRevision() { + return revision_; + } + private void initFields() { masterUrl_ = ""; masterId_ = 0; networkName_ = ""; masterLocation_ = ""; + revision_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -2977,6 +2992,9 @@ public final class Services { if (((bitField0_ & 0x00000008) == 0x00000008)) { output.writeBytes(4, getMasterLocationBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + output.writeInt64(5, revision_); + } getUnknownFields().writeTo(output); } @@ -3002,6 +3020,10 @@ public final class Services { size += com.google.protobuf.CodedOutputStream .computeBytesSize(4, getMasterLocationBytes()); } + if (((bitField0_ & 0x00000010) == 0x00000010)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(5, revision_); + } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; @@ -3134,6 +3156,8 @@ public final class Services { bitField0_ = (bitField0_ & ~0x00000004); masterLocation_ = ""; bitField0_ = (bitField0_ & ~0x00000008); + revision_ = 0L; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -3188,6 +3212,10 @@ public final class Services { to_bitField0_ |= 0x00000008; } result.masterLocation_ = masterLocation_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000010; + } + result.revision_ = revision_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -3216,6 +3244,9 @@ public final class Services { if (other.hasMasterLocation()) { setMasterLocation(other.getMasterLocation()); } + if (other.hasRevision()) { + setRevision(other.getRevision()); + } this.mergeUnknownFields(other.getUnknownFields()); return this; } @@ -3267,6 +3298,11 @@ public final class Services { masterLocation_ = input.readBytes(); break; } + case 40: { + bitField0_ |= 0x00000010; + revision_ = input.readInt64(); + break; + } } } } @@ -3402,6 +3438,27 @@ public final class Services { onChanged(); } + // optional int64 revision = 5; + private long revision_ ; + public boolean hasRevision() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + public long getRevision() { + return revision_; + } + public Builder setRevision(long value) { + bitField0_ |= 0x00000010; + revision_ = value; + onChanged(); + return this; + } + public Builder clearRevision() { + bitField0_ = (bitField0_ & ~0x00000010); + revision_ = 0L; + onChanged(); + return this; + } + // @@protoc_insertion_point(builder_scope:com.orbekk.same.MasterState) } @@ -6749,36 +6806,36 @@ public final class Services { "rState\022:\n\026client_state_component\030\005 \003(\0132\032" + ".com.orbekk.same.Component\022\031\n\021extra_clie" + "nt_info\030\006 \003(\t\"7\n\tComponent\022\n\n\002id\030\001 \002(\t\022\014" + - "\n\004data\030\002 \002(\t\022\020\n\010revision\030\003 \002(\003\"c\n\013Master" + + "\n\004data\030\002 \002(\t\022\020\n\010revision\030\003 \002(\003\"u\n\013Master" + "State\022\022\n\nmaster_url\030\001 \001(\t\022\021\n\tmaster_id\030\002" + " \001(\005\022\024\n\014network_name\030\003 \001(\t\022\027\n\017master_loc" + - "ation\030\004 \001(\t\",\n\013ClientState\022\013\n\003url\030\001 \001(\t\022" + - "\020\n\010location\030\002 \001(\t\"A\n\020NetworkDirectory\022-\n" + - "\007network\030\001 \003(\0132\034.com.orbekk.same.MasterS" + - "tate\"T\n\014PaxosRequest\022,\n\006client\030\001 \001(\0132\034.c", - "om.orbekk.same.ClientState\022\026\n\016proposalNu" + - "mber\030\002 \001(\005\"\037\n\rPaxosResponse\022\016\n\006result\030\001 " + - "\001(\0052\324\001\n\006Client\022>\n\010SetState\022\032.com.orbekk." + - "same.Component\032\026.com.orbekk.same.Empty\022F" + - "\n\016MasterTakeover\022\034.com.orbekk.same.Maste" + - "rState\032\026.com.orbekk.same.Empty\022B\n\nMaster" + - "Down\022\034.com.orbekk.same.MasterState\032\026.com" + - ".orbekk.same.Empty2\260\001\n\006Master\022J\n\022JoinNet" + - "workRequest\022\034.com.orbekk.same.ClientStat" + - "e\032\026.com.orbekk.same.Empty\022Z\n\022UpdateState", - "Request\022\032.com.orbekk.same.Component\032(.co" + - "m.orbekk.same.UpdateComponentResponse2\236\001" + - "\n\tDirectory\022G\n\017RegisterNetwork\022\034.com.orb" + - "ekk.same.MasterState\032\026.com.orbekk.same.E" + - "mpty\022H\n\013GetNetworks\022\026.com.orbekk.same.Em" + - "pty\032!.com.orbekk.same.NetworkDirectory2\241" + - "\001\n\005Paxos\022H\n\007Propose\022\035.com.orbekk.same.Pa" + - "xosRequest\032\036.com.orbekk.same.PaxosRespon" + - "se\022N\n\rAcceptRequest\022\035.com.orbekk.same.Pa" + - "xosRequest\032\036.com.orbekk.same.PaxosRespon", - "se2Y\n\rSystemService\022H\n\017GetSystemStatus\022\026" + - ".com.orbekk.same.Empty\032\035.com.orbekk.same" + - ".SystemStatusB\003\210\001\001" + "ation\030\004 \001(\t\022\020\n\010revision\030\005 \001(\003\",\n\013ClientS" + + "tate\022\013\n\003url\030\001 \001(\t\022\020\n\010location\030\002 \001(\t\"A\n\020N" + + "etworkDirectory\022-\n\007network\030\001 \003(\0132\034.com.o" + + "rbekk.same.MasterState\"T\n\014PaxosRequest\022,", + "\n\006client\030\001 \001(\0132\034.com.orbekk.same.ClientS" + + "tate\022\026\n\016proposalNumber\030\002 \001(\005\"\037\n\rPaxosRes" + + "ponse\022\016\n\006result\030\001 \001(\0052\324\001\n\006Client\022>\n\010SetS" + + "tate\022\032.com.orbekk.same.Component\032\026.com.o" + + "rbekk.same.Empty\022F\n\016MasterTakeover\022\034.com" + + ".orbekk.same.MasterState\032\026.com.orbekk.sa" + + "me.Empty\022B\n\nMasterDown\022\034.com.orbekk.same" + + ".MasterState\032\026.com.orbekk.same.Empty2\260\001\n" + + "\006Master\022J\n\022JoinNetworkRequest\022\034.com.orbe" + + "kk.same.ClientState\032\026.com.orbekk.same.Em", + "pty\022Z\n\022UpdateStateRequest\022\032.com.orbekk.s" + + "ame.Component\032(.com.orbekk.same.UpdateCo" + + "mponentResponse2\236\001\n\tDirectory\022G\n\017Registe" + + "rNetwork\022\034.com.orbekk.same.MasterState\032\026" + + ".com.orbekk.same.Empty\022H\n\013GetNetworks\022\026." + + "com.orbekk.same.Empty\032!.com.orbekk.same." + + "NetworkDirectory2\241\001\n\005Paxos\022H\n\007Propose\022\035." + + "com.orbekk.same.PaxosRequest\032\036.com.orbek" + + "k.same.PaxosResponse\022N\n\rAcceptRequest\022\035." + + "com.orbekk.same.PaxosRequest\032\036.com.orbek", + "k.same.PaxosResponse2Y\n\rSystemService\022H\n" + + "\017GetSystemStatus\022\026.com.orbekk.same.Empty" + + "\032\035.com.orbekk.same.SystemStatusB\003\210\001\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -6822,7 +6879,7 @@ public final class Services { internal_static_com_orbekk_same_MasterState_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_com_orbekk_same_MasterState_descriptor, - new java.lang.String[] { "MasterUrl", "MasterId", "NetworkName", "MasterLocation", }, + new java.lang.String[] { "MasterUrl", "MasterId", "NetworkName", "MasterLocation", "Revision", }, com.orbekk.same.Services.MasterState.class, com.orbekk.same.Services.MasterState.Builder.class); internal_static_com_orbekk_same_ClientState_descriptor = diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java index ce4f18d..262fef0 100644 --- a/same/src/main/java/com/orbekk/same/State.java +++ b/same/src/main/java/com/orbekk/same/State.java @@ -57,6 +57,18 @@ public class State { state.clear(); } + public synchronized boolean checkRevision(String componentName, + long expectedRevision) { + Component component = state.get(componentName); + if (component == null) { + return true; + } else if (component.getRevision() == expectedRevision) { + return true; + } else { + return false; + } + } + public synchronized void forceUpdate(String componentName, String data, long revision) { Component oldComponent = state.get(componentName); diff --git a/same/src/main/java/com/orbekk/same/services.proto b/same/src/main/java/com/orbekk/same/services.proto index 9b135eb..99045ca 100644 --- a/same/src/main/java/com/orbekk/same/services.proto +++ b/same/src/main/java/com/orbekk/same/services.proto @@ -29,12 +29,13 @@ message Component { required int64 revision = 3; } -// Next tag: 5 +// Next tag: 6 message MasterState { optional string master_url = 1; optional int32 master_id = 2; optional string network_name = 3; optional string master_location = 4; + optional int64 revision = 5; } message ClientState { -- cgit v1.2.3