From dd81bc79eb6a7d5082c2382c73c10d511fd1c4a5 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Wed, 17 Jun 2026 09:52:24 +0200 Subject: [PATCH 1/2] fix(core): self-heal etcd meta watch on transport reconnect (#3036) EtcdMetaDriver.listen/listenPrefix handed jetcd a bare Consumer, so a terminal watch error (e.g. after a transport reconnect) was swallowed and the JVM-global schema-cache-clear listener died silently: a node stopped receiving cross-node cache-clear events with no error or warning. Switch to the Watch.Listener overload and re-subscribe on onError/onCompleted via a daemon-backed backoff, mirroring the self-heal PdMetaDriver already gets from KvClient. The driver watch now stays live across reconnects, so CachedSchemaTransactionV2's register-once flag staying true is correct; the unused resetMetaListenerForReconnect stopgap and its TODO are removed. Add EtcdMetaDriverTest covering re-subscribe on error/completion and event delivery, and register it in UnitTestSuite. --- .../cache/CachedSchemaTransactionV2.java | 29 +---- .../apache/hugegraph/meta/EtcdMetaDriver.java | 63 +++++++++- .../hugegraph/meta/EtcdMetaDriverTest.java | 115 ++++++++++++++++++ .../apache/hugegraph/unit/UnitTestSuite.java | 2 + 4 files changed, 179 insertions(+), 30 deletions(-) create mode 100644 hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java index d6fbe97964..b6c47e7050 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/cache/CachedSchemaTransactionV2.java @@ -52,10 +52,10 @@ public class CachedSchemaTransactionV2 extends SchemaTransactionV2 { // MetaDriver doesn't expose unlisten, register the meta listener once. // Lifecycle: this JVM-global flag is intentionally never reset by - // unlistenChanges() (the underlying gRPC watch is process-wide). If that - // watch is silently dropped after a transport reconnect, recovery is not - // automatic; resetMetaListenerForReconnect() is only a manual hook to let - // the next schema operation install a fresh watch. + // unlistenChanges() (the underlying gRPC watch is process-wide). The driver + // watch self-heals across transport reconnects (PdMetaDriver via KvClient, + // EtcdMetaDriver via Watch.Listener re-subscribe), so the subscription stays + // live and the flag staying true is correct. private static final AtomicBoolean metaEventListenerRegistered = new AtomicBoolean(false); @@ -251,27 +251,6 @@ static void handleSchemaCacheClearEvent(T response) { } } - /** - * Manually reset the JVM-global meta listener flag after detecting that - * the MetaManager transport reconnected and dropped the underlying gRPC - * watch. This method is not wired to a MetaManager/MetaDriver reconnect - * callback today; callers must invoke it explicitly after detecting that - * condition. Without such a manual reset {@link #metaEventListenerRegistered} - * would stay {@code true} forever and this JVM would stop receiving - * cross-node schema cache clear events with no error or warning. - * - *

TODO: wire this into MetaManager once it exposes a transport - * reconnect callback (e.g. {@code listenReconnect} / - * {@code onTransportReconnect}). Until then it must be invoked - * explicitly by code that detects the reconnect. - */ - public static void resetMetaListenerForReconnect() { - if (metaEventListenerRegistered.compareAndSet(true, false)) { - LOG.warn("Schema cache clear meta listener lost on reconnect - " + - "will re-register on next schema operation."); - } - } - public void clearCache(boolean notify) { // Same TOCTOU ordering as clearSchemaCache(String): clear nameCache // first, then the array attachment, then idCache last. diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java index 8c9600e6b9..5a4b920d71 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java @@ -25,6 +25,9 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.commons.io.FileUtils; @@ -33,7 +36,9 @@ import org.apache.hugegraph.meta.lock.LockResult; import org.apache.hugegraph.type.define.CollectionType; import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.Log; import org.apache.hugegraph.util.collection.CollectionFactory; +import org.slf4j.Logger; import com.google.common.base.Strings; @@ -42,6 +47,7 @@ import io.etcd.jetcd.ClientBuilder; import io.etcd.jetcd.KV; import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Watch; import io.etcd.jetcd.kv.GetResponse; import io.etcd.jetcd.lease.LeaseKeepAliveResponse; import io.etcd.jetcd.options.DeleteOption; @@ -57,9 +63,24 @@ public class EtcdMetaDriver implements MetaDriver { + private static final Logger LOG = Log.logger(EtcdMetaDriver.class); + private final Client client; private final EtcdDistributedLock lock; + // Re-subscribes a dropped watch off the jetcd callback thread; single + // daemon thread, process-lifetime (no close()), so JVM shutdown reclaims it. + private final ScheduledExecutorService reWatchExecutor = + Executors.newSingleThreadScheduledExecutor(r -> { + Thread thread = new Thread(r, "etcd-meta-rewatch"); + thread.setDaemon(true); + return thread; + }); + + // Backoff before re-subscribing a dropped watch. Package-private and + // mutable only so tests can set it to 0; never reassigned in production. + long reWatchDelayMs = 1000L; + public EtcdMetaDriver(String trustFile, String clientCertFile, String clientKeyFile, Object... endpoints) { ClientBuilder builder = this.etcdMetaDriverBuilder(endpoints); @@ -76,6 +97,13 @@ public EtcdMetaDriver(Object... endpoints) { this.lock = EtcdDistributedLock.getInstance(this.client); } + // Package-private constructor for tests: inject a mock Client and skip lock + // setup (watch tests never touch the distributed lock). + EtcdMetaDriver(Client client) { + this.client = client; + this.lock = null; + } + private static ByteSequence toByteSequence(String content) { return ByteSequence.from(content.getBytes()); } @@ -303,9 +331,8 @@ public void unlock(String key, LockResult lockResult) { @SuppressWarnings("unchecked") @Override public void listen(String key, Consumer consumer) { - - this.client.getWatchClient().watch(toByteSequence(key), - (Consumer) consumer); + this.watchKey(toByteSequence(key), WatchOption.DEFAULT, + (Consumer) consumer); } /** @@ -314,9 +341,35 @@ public void listen(String key, Consumer consumer) { @SuppressWarnings("unchecked") @Override public void listenPrefix(String prefix, Consumer consumer) { - ByteSequence sequence = toByteSequence(prefix); WatchOption option = WatchOption.newBuilder().isPrefix(true).build(); - this.client.getWatchClient().watch(sequence, option, (Consumer) consumer); + this.watchKey(toByteSequence(prefix), option, + (Consumer) consumer); + } + + /** + * Subscribe a self-healing watch. Unlike the bare {@code Consumer} overload, + * this surfaces {@code onError}/{@code onCompleted}: when the underlying + * watch terminates (e.g. a transport reconnect drops the gRPC stream) it + * re-subscribes after a short backoff, so the listener is not silently lost. + * Mirrors the re-subscribe behaviour PdMetaDriver already gets from KvClient. + */ + private void watchKey(ByteSequence key, WatchOption option, + Consumer consumer) { + Watch.Listener listener = Watch.listener( + consumer, + throwable -> this.scheduleReWatch(key, option, consumer, throwable), + () -> this.scheduleReWatch(key, option, consumer, null)); + // Watcher intentionally not closed: process-lifetime watch, recreated + // on self-heal; the prior watcher's stream has already terminated. + this.client.getWatchClient().watch(key, option, listener); + } + private void scheduleReWatch(ByteSequence key, WatchOption option, + Consumer consumer, + Throwable cause) { + LOG.warn("etcd meta watch dropped, re-subscribing", cause); + this.reWatchExecutor.schedule( + () -> this.watchKey(key, option, consumer), + this.reWatchDelayMs, TimeUnit.MILLISECONDS); } } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java new file mode 100644 index 0000000000..6db9eadfbc --- /dev/null +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hugegraph.meta; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hugegraph.testutil.Assert; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.Watch; +import io.etcd.jetcd.options.WatchOption; +import io.etcd.jetcd.watch.WatchResponse; + +/** + * Unit tests for {@link EtcdMetaDriver}'s self-healing watch: a dropped watch + * (onError / onCompleted) must be re-subscribed so the listener is not silently + * lost after a transport reconnect (issue #3036). Uses a mock jetcd + * {@link Client} via the package-private test constructor; no live etcd needed. + */ +public class EtcdMetaDriverTest { + + @Test + public void testListenReSubscribesOnError() { + Watch watch = Mockito.mock(Watch.class); + EtcdMetaDriver driver = newDriver(watch); + + driver.listen("k", response -> { }); + captureListener(watch).onError(new RuntimeException("watch dropped")); + + // The watch terminated, so the driver must re-subscribe: a second + // watch() call lands once the (0ms) backoff task runs. + Mockito.verify(watch, Mockito.timeout(2000).times(2)) + .watch(Mockito.any(ByteSequence.class), + Mockito.any(WatchOption.class), + Mockito.any(Watch.Listener.class)); + } + + @Test + public void testListenReSubscribesOnCompleted() { + Watch watch = Mockito.mock(Watch.class); + EtcdMetaDriver driver = newDriver(watch); + + driver.listen("k", response -> { }); + captureListener(watch).onCompleted(); + + Mockito.verify(watch, Mockito.timeout(2000).times(2)) + .watch(Mockito.any(ByteSequence.class), + Mockito.any(WatchOption.class), + Mockito.any(Watch.Listener.class)); + } + + @Test + public void testListenPrefixReSubscribesOnError() { + Watch watch = Mockito.mock(Watch.class); + EtcdMetaDriver driver = newDriver(watch); + + driver.listenPrefix("prefix", response -> { }); + captureListener(watch).onError(new RuntimeException("watch dropped")); + + Mockito.verify(watch, Mockito.timeout(2000).times(2)) + .watch(Mockito.any(ByteSequence.class), + Mockito.any(WatchOption.class), + Mockito.any(Watch.Listener.class)); + } + + @Test + public void testListenDeliversEventsToConsumer() { + Watch watch = Mockito.mock(Watch.class); + AtomicReference received = new AtomicReference<>(); + EtcdMetaDriver driver = newDriver(watch); + + driver.listen("k", received::set); + WatchResponse response = Mockito.mock(WatchResponse.class); + captureListener(watch).onNext(response); + + Assert.assertSame(response, received.get()); + } + + private static EtcdMetaDriver newDriver(Watch watch) { + Client client = Mockito.mock(Client.class); + Mockito.when(client.getWatchClient()).thenReturn(watch); + EtcdMetaDriver driver = new EtcdMetaDriver(client); + // No backoff in tests so the re-subscribe runs promptly. + driver.reWatchDelayMs = 0L; + return driver; + } + + private static Watch.Listener captureListener(Watch watch) { + ArgumentCaptor captor = + ArgumentCaptor.forClass(Watch.Listener.class); + Mockito.verify(watch).watch(Mockito.any(ByteSequence.class), + Mockito.any(WatchOption.class), + captor.capture()); + return captor.getValue(); + } +} diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index fb7f0e744b..2bb10efaf4 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.unit; import org.apache.hugegraph.core.RoleElectionStateMachineTest; +import org.apache.hugegraph.meta.EtcdMetaDriverTest; import org.apache.hugegraph.meta.MetaManagerSchemaCacheClearEventTest; import org.apache.hugegraph.traversal.optimize.TraversalUtilOptimizeTest; import org.apache.hugegraph.unit.api.filter.LoadDetectFilterTest; @@ -99,6 +100,7 @@ CacheTest.LevelCacheTest.class, CachedSchemaTransactionTest.class, MetaManagerSchemaCacheClearEventTest.class, + EtcdMetaDriverTest.class, CachedGraphTransactionTest.class, CacheManagerTest.class, RamTableTest.class, From d2bd2a275d6fe651e7cd1479befe9afb3c746a2d Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Tue, 23 Jun 2026 09:47:57 +0200 Subject: [PATCH 2/2] fix(core): re-watch etcd meta only on terminal close, not on every error Address review on #3062: - jetcd 0.5.9 WatcherImpl.handleError already retries recoverable errors by notifying onError and rescheduling resume() on the same watcher. Re-subscribing from onError therefore opened a duplicate watch on every transient reconnect. Re-subscribe now happens only from onCompleted, jetcd's terminal-close signal, where the old watcher is already removed; onError only logs. - Guard the scheduled re-watch: if the re-subscribe itself throws (endpoint still unreachable), it is retried with the same backoff instead of abandoning recovery after a single failure. - Strengthen tests: assert onError does not re-watch, assert the re-created prefix watch preserves the key and prefix WatchOption, and cover a re-watch that throws once then succeeds. --- .../apache/hugegraph/meta/EtcdMetaDriver.java | 58 ++++++++++++----- .../hugegraph/meta/EtcdMetaDriverTest.java | 63 +++++++++++++++---- 2 files changed, 93 insertions(+), 28 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java index 5a4b920d71..75042131c8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriver.java @@ -347,29 +347,57 @@ public void listenPrefix(String prefix, Consumer consumer) { } /** - * Subscribe a self-healing watch. Unlike the bare {@code Consumer} overload, - * this surfaces {@code onError}/{@code onCompleted}: when the underlying - * watch terminates (e.g. a transport reconnect drops the gRPC stream) it - * re-subscribes after a short backoff, so the listener is not silently lost. - * Mirrors the re-subscribe behaviour PdMetaDriver already gets from KvClient. + * Subscribe a watch that survives the terminal close jetcd cannot recover + * from. The bare {@code Consumer} overload discards {@code onError} and + * {@code onCompleted}, so a non-retryable failure silently drops the + * listener (issue #3036). + *

+ * jetcd 0.5.9 ({@code WatchImpl.WatcherImpl.handleError}) already retries + * retryable errors itself: it notifies {@code onError} and then + * reschedules {@code resume()} on the same watcher. Re-subscribing from + * {@code onError} would therefore open a duplicate watch on every transient + * reconnect, so {@code onError} only logs here. A non-retryable error (or an + * explicit cancel) ends in {@code close()}, which removes the watcher and + * invokes {@code onCompleted}; that is the only point where the watch is + * truly gone, so re-subscribe happens there. The old watcher is already + * closed and removed, so the replacement is not a duplicate. */ private void watchKey(ByteSequence key, WatchOption option, Consumer consumer) { Watch.Listener listener = Watch.listener( consumer, - throwable -> this.scheduleReWatch(key, option, consumer, throwable), - () -> this.scheduleReWatch(key, option, consumer, null)); - // Watcher intentionally not closed: process-lifetime watch, recreated - // on self-heal; the prior watcher's stream has already terminated. + throwable -> LOG.warn("etcd meta watch error for key '{}', " + + "jetcd will retry if recoverable", + key.toString(Charset.defaultCharset()), + throwable), + () -> this.scheduleReWatch(key, option, consumer)); this.client.getWatchClient().watch(key, option, listener); } private void scheduleReWatch(ByteSequence key, WatchOption option, - Consumer consumer, - Throwable cause) { - LOG.warn("etcd meta watch dropped, re-subscribing", cause); - this.reWatchExecutor.schedule( - () -> this.watchKey(key, option, consumer), - this.reWatchDelayMs, TimeUnit.MILLISECONDS); + Consumer consumer) { + this.reWatchExecutor.schedule(() -> this.reWatch(key, option, consumer), + this.reWatchDelayMs, TimeUnit.MILLISECONDS); + } + + /** + * Re-establish a watch dropped by a terminal close. If the re-subscribe + * itself fails (e.g. the endpoint is still unreachable), it is retried with + * the same backoff instead of giving up, otherwise a single failed attempt + * would lose the listener permanently. + */ + private void reWatch(ByteSequence key, WatchOption option, + Consumer consumer) { + try { + LOG.info("Re-establishing etcd meta watch for key '{}'", + key.toString(Charset.defaultCharset())); + this.watchKey(key, option, consumer); + } catch (Exception e) { + LOG.warn("Failed to re-establish etcd meta watch for key '{}', " + + "retrying in {} ms", + key.toString(Charset.defaultCharset()), + this.reWatchDelayMs, e); + this.scheduleReWatch(key, option, consumer); + } } } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java index 6db9eadfbc..cf0429b550 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/meta/EtcdMetaDriverTest.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.meta; +import java.nio.charset.Charset; import java.util.concurrent.atomic.AtomicReference; import org.apache.hugegraph.testutil.Assert; @@ -31,23 +32,24 @@ import io.etcd.jetcd.watch.WatchResponse; /** - * Unit tests for {@link EtcdMetaDriver}'s self-healing watch: a dropped watch - * (onError / onCompleted) must be re-subscribed so the listener is not silently - * lost after a transport reconnect (issue #3036). Uses a mock jetcd - * {@link Client} via the package-private test constructor; no live etcd needed. + * Unit tests for {@link EtcdMetaDriver}'s watch recovery (issue #3036). jetcd + * retries recoverable errors itself, so the driver must re-subscribe only on the + * terminal {@code onCompleted} close, not on every {@code onError}. A mock jetcd + * {@link Client} drives these paths through the package-private test + * constructor; no live etcd is needed. */ public class EtcdMetaDriverTest { @Test - public void testListenReSubscribesOnError() { + public void testListenReWatchesOnCompleted() { Watch watch = Mockito.mock(Watch.class); EtcdMetaDriver driver = newDriver(watch); driver.listen("k", response -> { }); - captureListener(watch).onError(new RuntimeException("watch dropped")); + // onCompleted is jetcd's terminal-close signal: the watcher is gone, so + // the driver must re-subscribe (a second watch() call). + captureListener(watch).onCompleted(); - // The watch terminated, so the driver must re-subscribe: a second - // watch() call lands once the (0ms) backoff task runs. Mockito.verify(watch, Mockito.timeout(2000).times(2)) .watch(Mockito.any(ByteSequence.class), Mockito.any(WatchOption.class), @@ -55,28 +57,63 @@ public void testListenReSubscribesOnError() { } @Test - public void testListenReSubscribesOnCompleted() { + public void testListenDoesNotReWatchOnError() { Watch watch = Mockito.mock(Watch.class); EtcdMetaDriver driver = newDriver(watch); driver.listen("k", response -> { }); - captureListener(watch).onCompleted(); + // jetcd already reschedules the same watcher for recoverable errors; + // re-subscribing here would create a duplicate watch. Assert it does not. + captureListener(watch).onError(new RuntimeException("recoverable")); - Mockito.verify(watch, Mockito.timeout(2000).times(2)) + Mockito.verify(watch, Mockito.after(500).times(1)) .watch(Mockito.any(ByteSequence.class), Mockito.any(WatchOption.class), Mockito.any(Watch.Listener.class)); } @Test - public void testListenPrefixReSubscribesOnError() { + public void testListenPrefixReWatchPreservesKeyAndPrefix() { Watch watch = Mockito.mock(Watch.class); EtcdMetaDriver driver = newDriver(watch); driver.listenPrefix("prefix", response -> { }); - captureListener(watch).onError(new RuntimeException("watch dropped")); + captureListener(watch).onCompleted(); + ArgumentCaptor keyCaptor = + ArgumentCaptor.forClass(ByteSequence.class); + ArgumentCaptor optionCaptor = + ArgumentCaptor.forClass(WatchOption.class); Mockito.verify(watch, Mockito.timeout(2000).times(2)) + .watch(keyCaptor.capture(), optionCaptor.capture(), + Mockito.any(Watch.Listener.class)); + + // The re-created watch must still target "prefix" with prefix semantics, + // not silently downgrade to an exact-key watch. + ByteSequence reWatchKey = keyCaptor.getAllValues().get(1); + WatchOption reWatchOption = optionCaptor.getAllValues().get(1); + Assert.assertEquals("prefix", + reWatchKey.toString(Charset.defaultCharset())); + Assert.assertTrue(reWatchOption.isPrefix()); + } + + @Test + public void testReWatchRetriesWhenFirstAttemptThrows() { + Watch watch = Mockito.mock(Watch.class); + // Initial watch() succeeds, the first re-watch throws, the retry succeeds. + // A single failed re-watch must not abandon recovery permanently. + Mockito.when(watch.watch(Mockito.any(ByteSequence.class), + Mockito.any(WatchOption.class), + Mockito.any(Watch.Listener.class))) + .thenReturn(null) + .thenThrow(new RuntimeException("etcd still unreachable")) + .thenReturn(null); + EtcdMetaDriver driver = newDriver(watch); + + driver.listen("k", response -> { }); + captureListener(watch).onCompleted(); + + Mockito.verify(watch, Mockito.timeout(2000).times(3)) .watch(Mockito.any(ByteSequence.class), Mockito.any(WatchOption.class), Mockito.any(Watch.Listener.class));