summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-07 13:43:36 +0200
committerKjetil Ørbekk <kjetil.orbekk@gmail.com>2012-05-07 13:43:36 +0200
commit9fc7d7a372301715745d4ef85cc2f7eba8a555a9 (patch)
tree2412a0da0c76942ea2ac28312fe78a9c48e260c3
parentc7511602bc2715d2c9211e082157ac13a7fc01a1 (diff)
State-wide revision field maintained by master.
-rw-r--r--same/src/main/java/com/orbekk/same/Client.java4
-rw-r--r--same/src/main/java/com/orbekk/same/Master.java15
-rw-r--r--same/src/main/java/com/orbekk/same/Services.java115
-rw-r--r--same/src/main/java/com/orbekk/same/State.java12
-rw-r--r--same/src/main/java/com/orbekk/same/services.proto3
5 files changed, 111 insertions, 38 deletions
diff --git a/same/src/main/java/com/orbekk/same/Client.java b/same/src/main/java/com/orbekk/same/Client.java
index 6b0ddc4..0ee4c10 100644
--- a/same/src/main/java/com/orbekk/same/Client.java
+++ b/same/src/main/java/com/orbekk/same/Client.java
@@ -57,11 +57,9 @@ public class Client {
private ClientInterfaceImpl() {
}
- /** Get a copy of all the client state.
- */
@Override
public State getState() {
- return new State(state);
+ return state;
}
@Override
diff --git a/same/src/main/java/com/orbekk/same/Master.java b/same/src/main/java/com/orbekk/same/Master.java
index e3ada43..4327a7a 100644
--- a/same/src/main/java/com/orbekk/same/Master.java
+++ b/same/src/main/java/com/orbekk/same/Master.java
@@ -18,6 +18,7 @@ package com.orbekk.same;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@ public class Master {
private final ConnectionManager connections;
private String myUrl;
private String myLocation; // Protobuf server location, i.e., myIp:port
+ private AtomicLong revision = new AtomicLong(1);
State state;
private volatile int masterId = 1;
private final RpcFactory rpcf;
@@ -96,6 +98,7 @@ public class Master {
.setMasterLocation(getLocation())
.setNetworkName(getNetworkName())
.setMasterId(masterId)
+ .setRevision(revision.get())
.build();
}
@@ -112,13 +115,15 @@ public class Master {
Services.Component request,
RpcCallback<Services.UpdateComponentResponse> done) {
logger.info("updateStateRequest({})", request);
- boolean updated = state.update(request.getId(), request.getData(),
- request.getRevision() + 1);
- if (updated) {
+ boolean success = false;
+ if (state.checkRevision(request.getId(), request.getRevision())) {
+ success = true;
+ long newRevision = revision.incrementAndGet();
+ state.forceUpdate(request.getId(), request.getData(), newRevision);
updateStateRequestThread.add(request.getId());
}
done.run(Services.UpdateComponentResponse.newBuilder()
- .setSuccess(updated).build());
+ .setSuccess(success).build());
}
};
@@ -264,6 +269,6 @@ public class Master {
client.masterTakeover(rpc, getMasterInfo(), done);
}
updateStateRequestThread.add(".masterUrl");
- updateStateRequestThread .add(".masterLocation");
+ updateStateRequestThread.add(".masterLocation");
}
}
diff --git a/same/src/main/java/com/orbekk/same/Services.java b/same/src/main/java/com/orbekk/same/Services.java
index 06489f7..8e08529 100644
--- a/same/src/main/java/com/orbekk/same/Services.java
+++ b/same/src/main/java/com/orbekk/same/Services.java
@@ -2811,6 +2811,10 @@ public final class Services {
// optional string master_location = 4;
boolean hasMasterLocation();
String getMasterLocation();
+
+ // optional int64 revision = 5;
+ boolean hasRevision();
+ long getRevision();
}
public static final class MasterState extends
com.google.protobuf.GeneratedMessage
@@ -2947,11 +2951,22 @@ public final class Services {
}
}
+ // optional int64 revision = 5;
+ public static final int REVISION_FIELD_NUMBER = 5;
+ private long revision_;
+ public boolean hasRevision() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getRevision() {
+ return revision_;
+ }
+
private void initFields() {
masterUrl_ = "";
masterId_ = 0;
networkName_ = "";
masterLocation_ = "";
+ revision_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -2977,6 +2992,9 @@ public final class Services {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(4, getMasterLocationBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeInt64(5, revision_);
+ }
getUnknownFields().writeTo(output);
}
@@ -3002,6 +3020,10 @@ public final class Services {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(4, getMasterLocationBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeInt64Size(5, revision_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -3134,6 +3156,8 @@ public final class Services {
bitField0_ = (bitField0_ & ~0x00000004);
masterLocation_ = "";
bitField0_ = (bitField0_ & ~0x00000008);
+ revision_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
@@ -3188,6 +3212,10 @@ public final class Services {
to_bitField0_ |= 0x00000008;
}
result.masterLocation_ = masterLocation_;
+ if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.revision_ = revision_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -3216,6 +3244,9 @@ public final class Services {
if (other.hasMasterLocation()) {
setMasterLocation(other.getMasterLocation());
}
+ if (other.hasRevision()) {
+ setRevision(other.getRevision());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -3267,6 +3298,11 @@ public final class Services {
masterLocation_ = input.readBytes();
break;
}
+ case 40: {
+ bitField0_ |= 0x00000010;
+ revision_ = input.readInt64();
+ break;
+ }
}
}
}
@@ -3402,6 +3438,27 @@ public final class Services {
onChanged();
}
+ // optional int64 revision = 5;
+ private long revision_ ;
+ public boolean hasRevision() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ public long getRevision() {
+ return revision_;
+ }
+ public Builder setRevision(long value) {
+ bitField0_ |= 0x00000010;
+ revision_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearRevision() {
+ bitField0_ = (bitField0_ & ~0x00000010);
+ revision_ = 0L;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:com.orbekk.same.MasterState)
}
@@ -6749,36 +6806,36 @@ public final class Services {
"rState\022:\n\026client_state_component\030\005 \003(\0132\032" +
".com.orbekk.same.Component\022\031\n\021extra_clie" +
"nt_info\030\006 \003(\t\"7\n\tComponent\022\n\n\002id\030\001 \002(\t\022\014" +
- "\n\004data\030\002 \002(\t\022\020\n\010revision\030\003 \002(\003\"c\n\013Master" +
+ "\n\004data\030\002 \002(\t\022\020\n\010revision\030\003 \002(\003\"u\n\013Master" +
"State\022\022\n\nmaster_url\030\001 \001(\t\022\021\n\tmaster_id\030\002" +
" \001(\005\022\024\n\014network_name\030\003 \001(\t\022\027\n\017master_loc" +
- "ation\030\004 \001(\t\",\n\013ClientState\022\013\n\003url\030\001 \001(\t\022" +
- "\020\n\010location\030\002 \001(\t\"A\n\020NetworkDirectory\022-\n" +
- "\007network\030\001 \003(\0132\034.com.orbekk.same.MasterS" +
- "tate\"T\n\014PaxosRequest\022,\n\006client\030\001 \001(\0132\034.c",
- "om.orbekk.same.ClientState\022\026\n\016proposalNu" +
- "mber\030\002 \001(\005\"\037\n\rPaxosResponse\022\016\n\006result\030\001 " +
- "\001(\0052\324\001\n\006Client\022>\n\010SetState\022\032.com.orbekk." +
- "same.Component\032\026.com.orbekk.same.Empty\022F" +
- "\n\016MasterTakeover\022\034.com.orbekk.same.Maste" +
- "rState\032\026.com.orbekk.same.Empty\022B\n\nMaster" +
- "Down\022\034.com.orbekk.same.MasterState\032\026.com" +
- ".orbekk.same.Empty2\260\001\n\006Master\022J\n\022JoinNet" +
- "workRequest\022\034.com.orbekk.same.ClientStat" +
- "e\032\026.com.orbekk.same.Empty\022Z\n\022UpdateState",
- "Request\022\032.com.orbekk.same.Component\032(.co" +
- "m.orbekk.same.UpdateComponentResponse2\236\001" +
- "\n\tDirectory\022G\n\017RegisterNetwork\022\034.com.orb" +
- "ekk.same.MasterState\032\026.com.orbekk.same.E" +
- "mpty\022H\n\013GetNetworks\022\026.com.orbekk.same.Em" +
- "pty\032!.com.orbekk.same.NetworkDirectory2\241" +
- "\001\n\005Paxos\022H\n\007Propose\022\035.com.orbekk.same.Pa" +
- "xosRequest\032\036.com.orbekk.same.PaxosRespon" +
- "se\022N\n\rAcceptRequest\022\035.com.orbekk.same.Pa" +
- "xosRequest\032\036.com.orbekk.same.PaxosRespon",
- "se2Y\n\rSystemService\022H\n\017GetSystemStatus\022\026" +
- ".com.orbekk.same.Empty\032\035.com.orbekk.same" +
- ".SystemStatusB\003\210\001\001"
+ "ation\030\004 \001(\t\022\020\n\010revision\030\005 \001(\003\",\n\013ClientS" +
+ "tate\022\013\n\003url\030\001 \001(\t\022\020\n\010location\030\002 \001(\t\"A\n\020N" +
+ "etworkDirectory\022-\n\007network\030\001 \003(\0132\034.com.o" +
+ "rbekk.same.MasterState\"T\n\014PaxosRequest\022,",
+ "\n\006client\030\001 \001(\0132\034.com.orbekk.same.ClientS" +
+ "tate\022\026\n\016proposalNumber\030\002 \001(\005\"\037\n\rPaxosRes" +
+ "ponse\022\016\n\006result\030\001 \001(\0052\324\001\n\006Client\022>\n\010SetS" +
+ "tate\022\032.com.orbekk.same.Component\032\026.com.o" +
+ "rbekk.same.Empty\022F\n\016MasterTakeover\022\034.com" +
+ ".orbekk.same.MasterState\032\026.com.orbekk.sa" +
+ "me.Empty\022B\n\nMasterDown\022\034.com.orbekk.same" +
+ ".MasterState\032\026.com.orbekk.same.Empty2\260\001\n" +
+ "\006Master\022J\n\022JoinNetworkRequest\022\034.com.orbe" +
+ "kk.same.ClientState\032\026.com.orbekk.same.Em",
+ "pty\022Z\n\022UpdateStateRequest\022\032.com.orbekk.s" +
+ "ame.Component\032(.com.orbekk.same.UpdateCo" +
+ "mponentResponse2\236\001\n\tDirectory\022G\n\017Registe" +
+ "rNetwork\022\034.com.orbekk.same.MasterState\032\026" +
+ ".com.orbekk.same.Empty\022H\n\013GetNetworks\022\026." +
+ "com.orbekk.same.Empty\032!.com.orbekk.same." +
+ "NetworkDirectory2\241\001\n\005Paxos\022H\n\007Propose\022\035." +
+ "com.orbekk.same.PaxosRequest\032\036.com.orbek" +
+ "k.same.PaxosResponse\022N\n\rAcceptRequest\022\035." +
+ "com.orbekk.same.PaxosRequest\032\036.com.orbek",
+ "k.same.PaxosResponse2Y\n\rSystemService\022H\n" +
+ "\017GetSystemStatus\022\026.com.orbekk.same.Empty" +
+ "\032\035.com.orbekk.same.SystemStatusB\003\210\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -6822,7 +6879,7 @@ public final class Services {
internal_static_com_orbekk_same_MasterState_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_com_orbekk_same_MasterState_descriptor,
- new java.lang.String[] { "MasterUrl", "MasterId", "NetworkName", "MasterLocation", },
+ new java.lang.String[] { "MasterUrl", "MasterId", "NetworkName", "MasterLocation", "Revision", },
com.orbekk.same.Services.MasterState.class,
com.orbekk.same.Services.MasterState.Builder.class);
internal_static_com_orbekk_same_ClientState_descriptor =
diff --git a/same/src/main/java/com/orbekk/same/State.java b/same/src/main/java/com/orbekk/same/State.java
index ce4f18d..262fef0 100644
--- a/same/src/main/java/com/orbekk/same/State.java
+++ b/same/src/main/java/com/orbekk/same/State.java
@@ -57,6 +57,18 @@ public class State {
state.clear();
}
+ public synchronized boolean checkRevision(String componentName,
+ long expectedRevision) {
+ Component component = state.get(componentName);
+ if (component == null) {
+ return true;
+ } else if (component.getRevision() == expectedRevision) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
public synchronized void forceUpdate(String componentName,
String data, long revision) {
Component oldComponent = state.get(componentName);
diff --git a/same/src/main/java/com/orbekk/same/services.proto b/same/src/main/java/com/orbekk/same/services.proto
index 9b135eb..99045ca 100644
--- a/same/src/main/java/com/orbekk/same/services.proto
+++ b/same/src/main/java/com/orbekk/same/services.proto
@@ -29,12 +29,13 @@ message Component {
required int64 revision = 3;
}
-// Next tag: 5
+// Next tag: 6
message MasterState {
optional string master_url = 1;
optional int32 master_id = 2;
optional string network_name = 3;
optional string master_location = 4;
+ optional int64 revision = 5;
}
message ClientState {