diff --git a/zeronos-server/src/main/java/io/korandoru/zeronos/server/state/ZeroStateMachine.java b/zeronos-server/src/main/java/io/korandoru/zeronos/server/state/ZeroStateMachine.java index e0507f8..5a5e5e2 100644 --- a/zeronos-server/src/main/java/io/korandoru/zeronos/server/state/ZeroStateMachine.java +++ b/zeronos-server/src/main/java/io/korandoru/zeronos/server/state/ZeroStateMachine.java @@ -161,7 +161,8 @@ private DeleteRangeResponse deleteRange(WriteTxn writeTxn, DeleteRangeRequest re private boolean applyCompares(ReadTxn readTxn, List compareList, long revision) throws InvalidProtocolBufferException { for (Compare compare : compareList) { - final RangeRequest req = RangeRequest.newBuilder().setKey(compare.getKey()).build(); + final RangeRequest req = + RangeRequest.newBuilder().setKey(compare.getKey()).build(); final RangeResponse resp = range(readTxn, req, revision); if (!applyCompare(compare, resp.getKvsList())) { return false; @@ -179,21 +180,25 @@ private boolean applyCompare(Compare compare, List kvs) { final Comparator comparator = ByteString.unsignedLexicographicalComparator(); for (KeyValue kv : kvs) { - final int result = switch (compare.getTarget()) { - case VERSION -> Long.compare(kv.getVersion(), compare.getVersion()); - case VALUE -> comparator.compare(kv.getValue(), compare.getValue()); - default -> throw new UnsupportedOperationException(compare.getTarget().name()); - }; - - final boolean match = switch (compare.getResult()) { - case EQUAL -> result == 0; - case GREATER -> result > 0; - case LESS -> result < 0; - case NOT_EQUAL -> result != 0; - case GREATER_OR_EQUAL -> result >= 0; - case LESS_OR_EQUAL -> result <= 0; - default -> throw new UnsupportedOperationException(compare.getResult().name()); - }; + final int result = + switch (compare.getTarget()) { + case VERSION -> Long.compare(kv.getVersion(), compare.getVersion()); + case VALUE -> comparator.compare(kv.getValue(), compare.getValue()); + default -> throw new UnsupportedOperationException( + compare.getTarget().name()); + }; + + final boolean match = + switch (compare.getResult()) { + case EQUAL -> result == 0; + case GREATER -> result > 0; + case LESS -> result < 0; + case NOT_EQUAL -> result != 0; + case GREATER_OR_EQUAL -> result >= 0; + case LESS_OR_EQUAL -> result <= 0; + default -> throw new UnsupportedOperationException( + compare.getResult().name()); + }; if (!match) { return false; @@ -205,15 +210,25 @@ private boolean applyCompare(Compare compare, List kvs) { @Override public CompletableFuture query(Message request) { - final List requestList; + final TxnRequest txnRequest; try { - final TxnRequest.Builder req = TxnRequest.newBuilder(); - req.mergeFrom(request.getContent()); - requestList = req.getSuccessList(); + final TxnRequest.Builder builder = TxnRequest.newBuilder(); + builder.mergeFrom(request.getContent()); + txnRequest = builder.build(); } catch (InvalidProtocolBufferException e) { return CompletableFuture.failedFuture(e); } + final TermIndex termIndex = getLastAppliedTermIndex(); + + final boolean success; + try (final ReadTxn readTxn = backend.readTxn()) { + success = applyCompares(readTxn, txnRequest.getCompareList(), termIndex.getIndex()); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + final List requestList = success ? txnRequest.getSuccessList() : txnRequest.getFailureList(); + for (RequestOp requestOp : requestList) { if (!requestOp.hasRequestRange()) { final String message = "readonly message contains mutations: " + requestOp.getRequestCase(); @@ -222,7 +237,6 @@ public CompletableFuture query(Message request) { } final List responseOps = new ArrayList<>(); - final TermIndex termIndex = getLastAppliedTermIndex(); try (final ReadTxn readTxn = backend.readTxn()) { for (RequestOp requestOp : requestList) { @@ -235,8 +249,10 @@ public CompletableFuture query(Message request) { return CompletableFuture.failedFuture(e); } - final TxnResponse resp = - TxnResponse.newBuilder().addAllResponses(responseOps).build(); + final TxnResponse resp = TxnResponse.newBuilder() + .setSucceeded(success) + .addAllResponses(responseOps) + .build(); return CompletableFuture.completedFuture(Message.valueOf(resp)); } @@ -244,15 +260,23 @@ public CompletableFuture query(Message request) { public CompletableFuture applyTransaction(TransactionContext trx) { final RaftProtos.LogEntryProto entry = trx.getLogEntry(); - final List requestList; + final TxnRequest txnRequest; try { - final TxnRequest.Builder req = TxnRequest.newBuilder(); - req.mergeFrom(entry.getStateMachineLogEntry().getLogData()); - requestList = req.getSuccessList(); + final TxnRequest.Builder builder = TxnRequest.newBuilder(); + builder.mergeFrom(entry.getStateMachineLogEntry().getLogData()); + txnRequest = builder.build(); } catch (InvalidProtocolBufferException e) { return CompletableFuture.failedFuture(e); } + final boolean success; + try (final ReadTxn readTxn = backend.readTxn()) { + success = applyCompares(readTxn, txnRequest.getCompareList(), entry.getIndex()); + } catch (Exception e) { + return CompletableFuture.failedFuture(e); + } + final List requestList = success ? txnRequest.getSuccessList() : txnRequest.getFailureList(); + final List responseOps = new ArrayList<>(); final AtomicLong sub = new AtomicLong(); try (final WriteTxn writeTxn = backend.writeTxn()) { @@ -294,8 +318,10 @@ public CompletableFuture applyTransaction(TransactionContext trx) { } updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); - final TxnResponse resp = - TxnResponse.newBuilder().addAllResponses(responseOps).build(); + final TxnResponse resp = TxnResponse.newBuilder() + .setSucceeded(success) + .addAllResponses(responseOps) + .build(); return CompletableFuture.completedFuture(Message.valueOf(resp)); } }