From 87468f93796c0ef2246fb1e16ecc5441625613b5 Mon Sep 17 00:00:00 2001 From: Artyom Dubinin Date: Wed, 26 Apr 2023 19:15:22 +0300 Subject: [PATCH 1/3] Add balancing test with discovery and connections --- .../integration/ClusterConnectionIT.java | 189 ++++++++++++++++-- .../cartridge/app/roles/api_router.lua | 67 +------ .../resources/cartridge/app/roles/custom.lua | 8 + src/test/resources/cartridge/instances.yml | 5 + .../resources/cartridge/modules/counter.lua | 78 ++++++++ .../resources/cartridge/utils/metadata.lua | 7 + 6 files changed, 277 insertions(+), 77 deletions(-) create mode 100644 src/test/resources/cartridge/modules/counter.lua create mode 100644 src/test/resources/cartridge/utils/metadata.lua diff --git a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java index 2fb5e9dde..32414d352 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java @@ -1,34 +1,46 @@ package io.tarantool.driver.integration; -import io.tarantool.driver.api.TarantoolClientConfig; -import io.tarantool.driver.api.TarantoolClusterAddressProvider; -import io.tarantool.driver.api.TarantoolServerAddress; -import io.tarantool.driver.api.connection.TarantoolConnection; -import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies; -import io.tarantool.driver.auth.SimpleTarantoolCredentials; -import io.tarantool.driver.core.ClusterTarantoolTupleClient; -import io.tarantool.driver.core.ProxyTarantoolTupleClient; -import io.tarantool.driver.core.RetryingTarantoolTupleClient; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.rnorth.ducttape.unreliables.Unreliables; -import org.testcontainers.containers.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.rnorth.ducttape.unreliables.Unreliables; +import org.testcontainers.containers.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.tarantool.driver.api.TarantoolClient; +import io.tarantool.driver.api.TarantoolClientConfig; +import io.tarantool.driver.api.TarantoolClientFactory; +import io.tarantool.driver.api.TarantoolClusterAddressProvider; +import io.tarantool.driver.api.TarantoolResult; +import io.tarantool.driver.api.TarantoolServerAddress; +import io.tarantool.driver.api.connection.TarantoolConnection; +import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies; +import io.tarantool.driver.api.tuple.TarantoolTuple; +import io.tarantool.driver.auth.SimpleTarantoolCredentials; +import io.tarantool.driver.auth.TarantoolCredentials; +import io.tarantool.driver.cluster.BinaryClusterDiscoveryEndpoint; +import io.tarantool.driver.cluster.BinaryDiscoveryClusterAddressProvider; +import io.tarantool.driver.cluster.TarantoolClusterDiscoveryConfig; +import io.tarantool.driver.cluster.TestWrappedClusterAddressProvider; +import io.tarantool.driver.core.ClusterTarantoolTupleClient; +import io.tarantool.driver.core.ProxyTarantoolTupleClient; +import io.tarantool.driver.core.RetryingTarantoolTupleClient; + /** * @author Alexey Kuzin * @author Artyom Dubinin @@ -49,9 +61,9 @@ public static void setUp() throws TimeoutException { private TarantoolClientConfig.Builder prepareConfig() { return TarantoolClientConfig.builder() - .withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD)) - .withConnectTimeout(1000) - .withReadTimeout(1000); + .withCredentials(new SimpleTarantoolCredentials(USER_NAME, PASSWORD)) + .withConnectTimeout(1000) + .withReadTimeout(1000); } private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, long delay) { @@ -59,7 +71,8 @@ private RetryingTarantoolTupleClient setupRouterClient(int port, int retries, lo prepareConfig().build(), container.getRouterHost(), container.getMappedPort(port)); return new RetryingTarantoolTupleClient(new ProxyTarantoolTupleClient(clusterClient), - TarantoolRequestRetryPolicies.byNumberOfAttempts(retries).withDelay(delay).build()); + TarantoolRequestRetryPolicies.byNumberOfAttempts(retries) + .withDelay(delay).build()); } private RetryingTarantoolTupleClient setupClusterClient( @@ -70,7 +83,143 @@ private RetryingTarantoolTupleClient setupClusterClient( ProxyTarantoolTupleClient client = new ProxyTarantoolTupleClient(clusterClient); return new RetryingTarantoolTupleClient(client, - TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true).withDelay(delay).build()); + TarantoolRequestRetryPolicies.byNumberOfAttempts(retries, e -> true) + .withDelay(delay).build()); + } + + @Test + void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections() + throws ExecutionException, InterruptedException, IOException { + + TarantoolClient> clusterClient = + getTarantoolClusterClientWithDiscovery(2, 5_000); + + TarantoolClient> routerClient1 = getSimpleClient(3301); + TarantoolClient> routerClient2 = getSimpleClient(3302); + TarantoolClient> routerClient3 = getSimpleClient(3303); + // 3306 isn't in cluster topology yet + TarantoolClient> routerClient4 = getSimpleClient(3306); + + int callCounter = 15; + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + + String getAllConnectionCalls = + "return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})"; + // 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect + for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) { + assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + + // add new router + // put 3306 in topology as router + routerClient1.eval("cartridge = require('cartridge') " + + "replicasets = { " + + " { " + + " alias = 'app-router-fourth', " + + " roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " + + " join_servers = { { uri = 'localhost:3306' } } " + + " }} " + + "cartridge.admin_edit_topology({ replicasets = replicasets }) ").join(); + + // wait until discovery get topology + Thread.sleep(5_000); + + callCounter = 16; + // 16 / 4 / 2 = 2 requests per connect + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + + for (TarantoolClient router : + Arrays.asList(routerClient1, routerClient2, routerClient3)) { + assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + + Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); + assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection); + + String pid = container.execInContainer("pgrep", "-f", "testapp@fourth-router") + .getStdout().replace("\n", ""); + container.execInContainer("kill", "-9", pid); + // wait until discovery get topology + Thread.sleep(5_000); + + callCounter = 12; + // 12 / 3 / 2 = 2 requests per connect + for (int i = 0; i < callCounter; i++) { + clusterClient.callForSingleResult( + "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); + } + Thread.sleep(5_000); + for (TarantoolClient router : + Arrays.asList(routerClient1, routerClient3)) { + assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router)); + } + routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); + assertEquals(Arrays.asList(4, 4), routerCallCounterPerConnection); + } + + private static TarantoolClient> getSimpleClient(Integer port) { + return TarantoolClientFactory.createClient() + .withAddress(container.getRouterHost(), container.getMappedPort(port)) + .withCredentials(USER_NAME, PASSWORD) + .build(); + } + + private static TarantoolClient> + getTarantoolClusterClientWithDiscovery( + int connections, int delay) { + String host = container.getRouterHost(); + int port = container.getRouterPort(); + + TarantoolCredentials credentials = new SimpleTarantoolCredentials( + USER_NAME, + PASSWORD + ); + TarantoolClientConfig config = TarantoolClientConfig.builder() + .withCredentials(credentials) + .build(); + + BinaryClusterDiscoveryEndpoint endpoint = new BinaryClusterDiscoveryEndpoint.Builder() + .withClientConfig(config) + .withEntryFunction("get_routers") + .withEndpointProvider(() -> Collections.singletonList( + new TarantoolServerAddress( + host, port + ))) + .build(); + + TarantoolClusterDiscoveryConfig clusterDiscoveryConfig = new TarantoolClusterDiscoveryConfig.Builder() + .withDelay(delay) + .withEndpoint(endpoint) + .build(); + + BinaryDiscoveryClusterAddressProvider discoveryProvider = new BinaryDiscoveryClusterAddressProvider( + clusterDiscoveryConfig); + + TarantoolClusterAddressProvider wrapperDiscoveryProvider + = new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports + + return TarantoolClientFactory.createClient() + .withAddressProvider(wrapperDiscoveryProvider) + .withCredentials(USER_NAME, PASSWORD) + .withConnections(connections) + .build(); + } + + @NotNull + private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) { + List luaResponse = router.eval(getAllConnectionCalls).join(); + ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response + + Object routerCallCounterPerConnection = tuples.stream() + .map(item -> ((ArrayList) item).get(1)) + .collect(Collectors.toList()); + return routerCallCounterPerConnection; } @Test diff --git a/src/test/resources/cartridge/app/roles/api_router.lua b/src/test/resources/cartridge/app/roles/api_router.lua index 25f785214..7963a371a 100644 --- a/src/test/resources/cartridge/app/roles/api_router.lua +++ b/src/test/resources/cartridge/app/roles/api_router.lua @@ -1,11 +1,12 @@ local vshard = require('vshard') local cartridge_rpc = require('cartridge.rpc') -local fiber = require('fiber') local crud = require('crud') local uuid = require('uuid') local log = require('log') +local metadata_utils = require('utils.metadata') local crud_utils = require('utils.crud') +local counter = require('modules.counter') local function get_schema() for _, instance_uri in pairs(cartridge_rpc.get_candidates('app.roles.api_storage', { leader_only = true })) do @@ -100,51 +101,6 @@ local function raising_error() error("Test error: raising_error() called") end -local function reset_request_counters() - box.space.request_counters:replace({ 1, 0 }) -end - -local function get_router_name() - return string.sub(box.cfg.custom_proc_title, 9) -end - -local function simple_long_running_function(seconds_to_sleep) - fiber.sleep(seconds_to_sleep) - return true -end - -local function long_running_function(values) - local seconds_to_sleep = 0 - local disabled_router_name = "" - if values ~= nil then - if type(values) == "table" then - values = values or {} - seconds_to_sleep = values[1] - disabled_router_name = values[2] - else - seconds_to_sleep = values - end - end - - -- need using number instead field name as string in update function for compatibility with tarantool 1.10 - box.space.request_counters:update(1, { { '+', 2, 1 } }) - log.info('Executing long-running function ' .. - tostring(box.space.request_counters:get(1)[2]) .. - "(name: " .. disabled_router_name .. - "; sleep: " .. seconds_to_sleep .. ")") - if get_router_name() == disabled_router_name then - return nil, "Disabled by client; router_name = " .. disabled_router_name - end - if seconds_to_sleep then - fiber.sleep(seconds_to_sleep) - end - return true -end - -local function get_request_count() - return box.space.request_counters:get(1)[2] -end - -- it's like vshard error throwing local function box_error_unpack_no_connection() return nil, box.error.new(box.error.NO_CONNECTION):unpack() @@ -202,12 +158,6 @@ local function select_router_space() end local function init_router_spaces() - local request_counters = box.schema.space.create('request_counters', { - format = { { 'id', 'unsigned' }, { 'count', 'unsigned' } }, - if_not_exists = true - }) - request_counters:create_index('id', { parts = { 'id' }, if_not_exists = true }) - local router_space = box.schema.space.create('router_space', { format = { { 'id', 'unsigned' } }, if_not_exists = true @@ -220,6 +170,7 @@ end local function init(opts) if opts.is_master then init_router_spaces() + counter.init_counter_space() end patch_crud_methods_for_tests() @@ -235,11 +186,13 @@ local function init(opts) rawset(_G, 'retrying_function', retrying_function) rawset(_G, 'raising_error', raising_error) - rawset(_G, 'reset_request_counters', reset_request_counters) - rawset(_G, 'get_router_name', get_router_name) - rawset(_G, 'long_running_function', long_running_function) - rawset(_G, 'simple_long_running_function', simple_long_running_function) - rawset(_G, 'get_request_count', get_request_count) + rawset(_G, 'get_router_name', metadata_utils.get_router_name) + + rawset(_G, 'reset_request_counters', counter.reset_request_counters) + rawset(_G, 'simple_long_running_function', counter.simple_long_running_function) + rawset(_G, 'long_running_function', counter.long_running_function) + rawset(_G, 'get_request_count', counter.get_request_count) + rawset(_G, 'box_error_unpack_no_connection', box_error_unpack_no_connection) rawset(_G, 'box_error_unpack_timeout', box_error_unpack_timeout) rawset(_G, 'box_error_timeout', box_error_timeout) diff --git a/src/test/resources/cartridge/app/roles/custom.lua b/src/test/resources/cartridge/app/roles/custom.lua index 680692747..cfd04e6cd 100644 --- a/src/test/resources/cartridge/app/roles/custom.lua +++ b/src/test/resources/cartridge/app/roles/custom.lua @@ -1,5 +1,7 @@ local cartridge = require('cartridge') +local counter = require('modules.counter') + function get_routers() local function table_contains(table, element) for _, value in pairs(table) do @@ -44,10 +46,16 @@ local function init(opts) -- luacheck: no unused args if opts.is_master then box.schema.user.grant('guest', 'read,write', 'universe', nil, { if_not_exists = true }) + counter.init_counter_space() end init_httpd() + rawset(_G, 'reset_request_counters', counter.reset_request_counters) + rawset(_G, 'simple_long_running_function', counter.simple_long_running_function) + rawset(_G, 'long_running_function', counter.long_running_function) + rawset(_G, 'get_request_count', counter.get_request_count) + return true end diff --git a/src/test/resources/cartridge/instances.yml b/src/test/resources/cartridge/instances.yml index 2a802a6dd..42d94ecaf 100644 --- a/src/test/resources/cartridge/instances.yml +++ b/src/test/resources/cartridge/instances.yml @@ -13,6 +13,11 @@ testapp.third-router: advertise_uri: localhost:3303 http_port: 8083 +testapp.fourth-router: + workdir: ./tmp/db_dev/3306 + advertise_uri: localhost:3306 + http_port: 8086 + testapp.s1-storage: workdir: ./tmp/db_dev/3304 advertise_uri: localhost:3304 diff --git a/src/test/resources/cartridge/modules/counter.lua b/src/test/resources/cartridge/modules/counter.lua new file mode 100644 index 000000000..5bcefeccd --- /dev/null +++ b/src/test/resources/cartridge/modules/counter.lua @@ -0,0 +1,78 @@ +local fiber = require('fiber') +local log = require('log') +local metadata_utils = require('utils.metadata') + +local function reset_request_counters() + box.space.request_counters:replace({ 1, 0 }) +end + +local function update_request_counters(with_session_id) + with_session_id = with_session_id or false + -- need using number instead field name as string in update function for compatibility with tarantool 1.10 + if with_session_id then + box.space.request_counters:update(box.session.id(), { { '+', 2, 1 } }) + else + box.space.request_counters:update(1, { { '+', 2, 1 } }) + end +end + +local function get_request_count() + return box.space.request_counters:get(1)[2] +end + +local function simple_long_running_function(seconds_to_sleep, with_session_id) + update_request_counters(with_session_id) + fiber.sleep(seconds_to_sleep) + return true +end + +local function long_running_function(values, with_session_id) + local seconds_to_sleep = 0 + local disabled_router_name = "" + if values ~= nil then + if type(values) == "table" then + values = values or {} + seconds_to_sleep = values[1] + disabled_router_name = values[2] + else + seconds_to_sleep = values + end + end + + update_request_counters(with_session_id) + log.info('Executing long-running function ' .. + tostring(box.space.request_counters:get(1)[2]) .. + "(name: " .. disabled_router_name .. + "; sleep: " .. seconds_to_sleep .. ")") + if metadata_utils.get_router_name() == disabled_router_name then + return nil, "Disabled by client; router_name = " .. disabled_router_name + end + if seconds_to_sleep then + fiber.sleep(seconds_to_sleep) + end + return true +end + +local function reset_request_counters_on_connect() + box.space.request_counters:replace({ box.session.id(), 0 }) +end + +local function init_counter_space() + local request_counters = box.schema.space.create('request_counters', { + format = { { 'id', 'unsigned' }, { 'count', 'unsigned' } }, + if_not_exists = true + }) + request_counters:create_index('id', { parts = { 'id' }, if_not_exists = true }) + request_counters:create_index('count', { parts = { 'count' }, if_not_exists = true, unique = false }) + + + box.session.on_connect(reset_request_counters_on_connect) +end + +return { + reset_request_counters = reset_request_counters, + simple_long_running_function = simple_long_running_function, + long_running_function = long_running_function, + get_request_count = get_request_count, + init_counter_space = init_counter_space, +} diff --git a/src/test/resources/cartridge/utils/metadata.lua b/src/test/resources/cartridge/utils/metadata.lua new file mode 100644 index 000000000..a159bab29 --- /dev/null +++ b/src/test/resources/cartridge/utils/metadata.lua @@ -0,0 +1,7 @@ +local function get_router_name() + return string.sub(box.cfg.custom_proc_title, 9) +end + +return { + get_router_name = get_router_name +} From 4ea0205a0a07da8b387cac3a6e3e6b40f9059f22 Mon Sep 17 00:00:00 2001 From: Artyom Dubinin Date: Thu, 27 Apr 2023 13:57:13 +0300 Subject: [PATCH 2/3] Fix dependencies between tests --- .../CartridgeMixedInstancesContainer.java | 24 +++++++++---------- .../integration/ClusterConnectionIT.java | 10 ++++---- .../integration/ClusterDiscoveryIT.java | 4 ++-- .../driver/integration/ReconnectIT.java | 5 +++- .../integration/SharedCartridgeContainer.java | 2 +- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/src/test/java/io/tarantool/driver/integration/CartridgeMixedInstancesContainer.java b/src/test/java/io/tarantool/driver/integration/CartridgeMixedInstancesContainer.java index 9be006d17..108d1d469 100644 --- a/src/test/java/io/tarantool/driver/integration/CartridgeMixedInstancesContainer.java +++ b/src/test/java/io/tarantool/driver/integration/CartridgeMixedInstancesContainer.java @@ -1,14 +1,14 @@ package io.tarantool.driver.integration; +import java.time.Duration; +import java.util.HashMap; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.TarantoolCartridgeContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.containers.wait.strategy.Wait; -import java.time.Duration; -import java.util.HashMap; - abstract class CartridgeMixedInstancesContainer { private static final Logger logger = LoggerFactory.getLogger(CartridgeMixedInstancesContainer.class); @@ -16,15 +16,15 @@ abstract class CartridgeMixedInstancesContainer { protected static final TarantoolCartridgeContainer container; static { - final HashMap buildArgs = new HashMap<>(); - buildArgs.put("TARANTOOL_INSTANCES_FILE", "./instances_mixed.yml"); - container = new TarantoolCartridgeContainer( - "cartridge/instances_mixed.yml", - "cartridge/topology_mixed.lua", buildArgs) - .withDirectoryBinding("cartridge") - .withLogConsumer(new Slf4jLogConsumer(logger)) - .waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 3)) - .withStartupTimeout(Duration.ofMinutes(2)); + final HashMap env = new HashMap<>(); + env.put("TARANTOOL_INSTANCES_FILE", "./instances_mixed.yml"); + container = new TarantoolCartridgeContainer("cartridge/instances_mixed.yml", + "cartridge/topology_mixed.lua") + .withDirectoryBinding("cartridge") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 3)) + .withStartupTimeout(Duration.ofMinutes(2)) + .withEnv(env); } protected static void startCluster() { diff --git a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java index 32414d352..439c4778f 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java @@ -142,9 +142,7 @@ void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections() Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection); - String pid = container.execInContainer("pgrep", "-f", "testapp@fourth-router") - .getStdout().replace("\n", ""); - container.execInContainer("kill", "-9", pid); + stopInstance("fourth-router"); // wait until discovery get topology Thread.sleep(5_000); @@ -156,11 +154,11 @@ void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections() } Thread.sleep(5_000); for (TarantoolClient router : - Arrays.asList(routerClient1, routerClient3)) { + Arrays.asList(routerClient1, routerClient2, routerClient3)) { assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router)); } - routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); - assertEquals(Arrays.asList(4, 4), routerCallCounterPerConnection); + + startCartridge(); } private static TarantoolClient> getSimpleClient(Integer port) { diff --git a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java index 17574eb03..208c2d933 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterDiscoveryIT.java @@ -44,7 +44,7 @@ public void httpClusterDiscovererTest() throws TarantoolClientException { HTTPDiscoveryClusterAddressProvider addressProvider = getHttpProvider(); Collection nodes = addressProvider.getAddresses(); - assertEquals(nodes.size(), 3); + assertEquals(nodes.size(), 4); Set nodeSet = new HashSet<>(nodes); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI))); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI))); @@ -70,7 +70,7 @@ public void binaryClusterDiscovererTest() { TarantoolClusterAddressProvider addressProvider = getBinaryProvider(); Collection nodes = addressProvider.getAddresses(); - assertEquals(nodes.size(), 3); + assertEquals(nodes.size(), 4); Set nodeSet = new HashSet<>(nodes); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER1_URI))); assertTrue(nodeSet.contains(new TarantoolServerAddress(TEST_ROUTER2_URI))); diff --git a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java index 6b33f3a1b..54d0f0fd0 100644 --- a/src/test/java/io/tarantool/driver/integration/ReconnectIT.java +++ b/src/test/java/io/tarantool/driver/integration/ReconnectIT.java @@ -25,6 +25,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; import java.util.Arrays; import java.util.Collection; @@ -245,6 +246,8 @@ void test_should_closeConnections_ifAddressProviderReturnsNewAddresses() throws // restart routers for resetting connections stopInstances(Arrays.asList("router", "second-router")); startCartridge(); + String status = container.execInContainer("cartridge", "status", "--run-dir=/tmp/run").getStderr(); + assertEquals(6, StringUtils.countMatches(status, "RUNNING")); final TarantoolServerAddress firstAddress = new TarantoolServerAddress(container.getRouterHost(), container.getMappedPort(3301)); @@ -298,7 +301,7 @@ public void setRefreshCallback(Runnable runnable) { numberOfSwitching.incrementAndGet(); runnable.run(); } - }, 500, 100, TimeUnit.MILLISECONDS); + }, 0, 100, TimeUnit.MILLISECONDS); } }).build(); diff --git a/src/test/java/io/tarantool/driver/integration/SharedCartridgeContainer.java b/src/test/java/io/tarantool/driver/integration/SharedCartridgeContainer.java index f3a96040e..a700bb5fd 100644 --- a/src/test/java/io/tarantool/driver/integration/SharedCartridgeContainer.java +++ b/src/test/java/io/tarantool/driver/integration/SharedCartridgeContainer.java @@ -55,6 +55,6 @@ protected static void startInstance(String instanceName) throws IOException, Int } protected static void stopInstance(String instanceName) throws IOException, InterruptedException { - container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", "--data-dir=/tmp/data", instanceName); + container.execInContainer("cartridge", "stop", "--run-dir=/tmp/run", instanceName); } } From c3a874c594e2dfafcb569b0cb63362346d774a5c Mon Sep 17 00:00:00 2001 From: Artyom Dubinin Date: Thu, 27 Apr 2023 17:12:33 +0300 Subject: [PATCH 3/3] Change tests logic split them and make independent --- .../integration/ClusterConnectionIT.java | 237 +++++++++++++----- 1 file changed, 172 insertions(+), 65 deletions(-) diff --git a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java index 439c4778f..f8240df90 100644 --- a/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java +++ b/src/test/java/io/tarantool/driver/integration/ClusterConnectionIT.java @@ -1,6 +1,7 @@ package io.tarantool.driver.integration; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -9,6 +10,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -16,18 +18,24 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.rnorth.ducttape.unreliables.Unreliables; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; +import org.testcontainers.containers.TarantoolCartridgeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Testcontainers; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import io.tarantool.driver.TarantoolUtils; import io.tarantool.driver.api.TarantoolClient; import io.tarantool.driver.api.TarantoolClientConfig; import io.tarantool.driver.api.TarantoolClientFactory; import io.tarantool.driver.api.TarantoolClusterAddressProvider; import io.tarantool.driver.api.TarantoolResult; import io.tarantool.driver.api.TarantoolServerAddress; +import io.tarantool.driver.api.conditions.Conditions; import io.tarantool.driver.api.connection.TarantoolConnection; import io.tarantool.driver.api.retry.TarantoolRequestRetryPolicies; import io.tarantool.driver.api.tuple.TarantoolTuple; @@ -87,96 +95,192 @@ private RetryingTarantoolTupleClient setupClusterClient( .withDelay(delay).build()); } + // TODO: Add parallel threads + // TODO: A lot of routers + // TODO: Parallel round robin and default as test parameter @Test void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections() - throws ExecutionException, InterruptedException, IOException { - - TarantoolClient> clusterClient = - getTarantoolClusterClientWithDiscovery(2, 5_000); + throws ExecutionException, InterruptedException { - TarantoolClient> routerClient1 = getSimpleClient(3301); - TarantoolClient> routerClient2 = getSimpleClient(3302); - TarantoolClient> routerClient3 = getSimpleClient(3303); - // 3306 isn't in cluster topology yet - TarantoolClient> routerClient4 = getSimpleClient(3306); + TarantoolCartridgeContainer testContainer = runIndependentContainer(); - int callCounter = 15; - for (int i = 0; i < callCounter; i++) { + TarantoolClient> clusterClient = + getTarantoolClusterClientWithDiscovery(testContainer, 2, 2_000); + + TarantoolClient> routerClient1 = getSimpleClient(testContainer, + 3301); + TarantoolClient> routerClient2 = getSimpleClient(testContainer, + 3302); + TarantoolClient> routerClient3 = getSimpleClient(testContainer, + 3303); + + int batch = 1500; + int batchPerConnect = + batch / 3 / 2; // 1_500 calls on 3 routers on 2 connection == 1_500 / 6 == 250 calls per connect + for (int i = 0; i < batch; i++) { clusterClient.callForSingleResult( "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); } - String getAllConnectionCalls = - "return box.space.request_counters.index.count:select(0, {iterator = box.index.GT})"; - // 15 calls on 3 routers on 2 connection == 15 / 3 == 5 / 2 == 2 or 3 calls per connect - for (TarantoolClient router : Arrays.asList(routerClient1, routerClient2, routerClient3)) { - assertEquals(Arrays.asList(2, 3), getCallCountersPerConnection(getAllConnectionCalls, router)); + for (TarantoolClient> router : Arrays.asList(routerClient1, + routerClient2, + routerClient3)) { + assertEquals(Arrays.asList(batchPerConnect, batchPerConnect), getCallCountersPerConnection(router)); } + } + + @Test + void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections_andRouterJoining() + throws ExecutionException, InterruptedException { + + TarantoolCartridgeContainer testContainer = runIndependentContainer(); + + TarantoolClient> clusterClient = + getTarantoolClusterClientWithDiscovery(testContainer, 2, 2_000); + + TarantoolClient> routerClient1 = getSimpleClient(testContainer, + 3301); + TarantoolClient> routerClient2 = getSimpleClient(testContainer, + 3302); + TarantoolClient> routerClient3 = getSimpleClient(testContainer, + 3303); + // 3306 is not in cluster topology yet + TarantoolClient> routerClient4 = getSimpleClient(testContainer, + 3306); // add new router // put 3306 in topology as router routerClient1.eval("cartridge = require('cartridge') " + - "replicasets = { " + - " { " + - " alias = 'app-router-fourth', " + - " roles = { 'vshard-router', 'app.roles.custom', 'app.roles.api_router' }, " + - " join_servers = { { uri = 'localhost:3306' } } " + - " }} " + - "cartridge.admin_edit_topology({ replicasets = replicasets }) ").join(); - - // wait until discovery get topology - Thread.sleep(5_000); - - callCounter = 16; - // 16 / 4 / 2 = 2 requests per connect - for (int i = 0; i < callCounter; i++) { + "replicasets = { " + + " { " + + " alias = 'app-router-fourth', " + + " roles = { 'vshard-router', 'app.roles.custom', 'app.roles" + + ".api_router' }, " + + " join_servers = { { uri = 'localhost:3306' } } " + + " }} " + + "cartridge.admin_edit_topology({ replicasets = replicasets }) ").join(); + + int batch = 2000; + int batchPerConnect = + batch / 4 / 2; // 2_000 calls on 4 routers on 2 connection == 2_000 / 8 == 250 calls per connect + for (int i = 0; i < batch; i++) { clusterClient.callForSingleResult( "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); } - for (TarantoolClient router : + AtomicInteger sum = new AtomicInteger(); + for (TarantoolClient> router : Arrays.asList(routerClient1, routerClient2, routerClient3)) { - assertEquals(Arrays.asList(4, 5), getCallCountersPerConnection(getAllConnectionCalls, router)); + assertEquals(2, getCallCountersPerConnection(router).stream().filter(cnt -> { + sum.addAndGet(cnt); + return cnt > batchPerConnect; // because forth router was in starting stage some time + }).count()); } + assertEquals(2, getCallCountersPerConnection(routerClient4).stream().filter(cnt -> { + sum.addAndGet(cnt); + return 0 < cnt && cnt < batchPerConnect; + }).count()); + assertEquals(batch, sum.get()); + } + + @Test + void test_roundRobin_shouldWorkCorrectly_withDiscoveryAndConnections_andRouterDying() + throws ExecutionException, InterruptedException, IOException { - Object routerCallCounterPerConnection = getCallCountersPerConnection(getAllConnectionCalls, routerClient4); - assertEquals(Arrays.asList(2, 2), routerCallCounterPerConnection); + TarantoolCartridgeContainer testContainer = runIndependentContainer(); + + TarantoolClient> clusterClient = + getTarantoolClusterClientWithDiscovery(testContainer, 2, 2_000); + + TarantoolClient> routerClient1 = getSimpleClient(testContainer, + 3301); + TarantoolClient> routerClient2 = getSimpleClient(testContainer, + 3302); + TarantoolClient> routerClient3 = getSimpleClient(testContainer, + 3303); + // 3306 is not in cluster topology yet + TarantoolClient> routerClient4 = getSimpleClient(testContainer, + 3306); + + // add new router + // put 3306 in topology as router + routerClient1.eval("cartridge = require('cartridge') " + + "replicasets = { " + + " { " + + " alias = 'app-router-fourth', " + + " roles = { 'vshard-router', 'app.roles.custom', 'app.roles" + + ".api_router' }, " + + " join_servers = { { uri = 'localhost:3306' } } " + + " }} " + + "cartridge.admin_edit_topology({ replicasets = replicasets }) ").join(); + + String healthyCmd = "return cartridge.is_healthy()"; + + TarantoolUtils.retry(() -> { + try { + assertEquals(true, testContainer.executeCommand(healthyCmd).get().get(0)); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); stopInstance("fourth-router"); - // wait until discovery get topology - Thread.sleep(5_000); - callCounter = 12; - // 12 / 3 / 2 = 2 requests per connect - for (int i = 0; i < callCounter; i++) { + int batch = 2000; + int batchPerConnect = + batch / 4 / 2; // 2_000 calls on 4 routers on 2 connection == 2_000 / 8 == 250 calls per connect + for (int i = 0; i < batch; i++) { clusterClient.callForSingleResult( "simple_long_running_function", Arrays.asList(0, true), Boolean.class).get(); } - Thread.sleep(5_000); - for (TarantoolClient router : + + AtomicInteger sum = new AtomicInteger(); + for (TarantoolClient> router : Arrays.asList(routerClient1, routerClient2, routerClient3)) { - assertEquals(Arrays.asList(6, 7), getCallCountersPerConnection(getAllConnectionCalls, router)); + assertEquals(2, getCallCountersPerConnection(router).stream().filter(cnt -> { + sum.addAndGet(cnt); + return cnt > batchPerConnect; + }).count()); } + assertEquals(2, getCallCountersPerConnection(routerClient4).stream().filter(cnt -> { + sum.addAndGet(cnt); + return 0 < cnt && cnt < batchPerConnect; + }).count()); + assertEquals(batch, sum.get()); + } - startCartridge(); + private static TarantoolCartridgeContainer runIndependentContainer() { + TarantoolCartridgeContainer container = + new TarantoolCartridgeContainer( + "cartridge/instances.yml", + "cartridge/topology.lua") + .withDirectoryBinding("cartridge") + .withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(ClusterConnectionIT.class))) + .waitingFor(Wait.forLogMessage(".*Listening HTTP on.*", 4)) + .withStartupTimeout(Duration.ofMinutes(2)); + container.start(); + return container; } - private static TarantoolClient> getSimpleClient(Integer port) { + private static TarantoolClient> getSimpleClient( + TarantoolCartridgeContainer testContainer, Integer port) { return TarantoolClientFactory.createClient() - .withAddress(container.getRouterHost(), container.getMappedPort(port)) - .withCredentials(USER_NAME, PASSWORD) - .build(); + .withAddress(testContainer.getRouterHost(), testContainer.getMappedPort(port)) + .withCredentials(testContainer.getUsername(), testContainer.getPassword()) + .build(); } private static TarantoolClient> - getTarantoolClusterClientWithDiscovery( + getTarantoolClusterClientWithDiscovery(TarantoolCartridgeContainer testContainer, int connections, int delay) { - String host = container.getRouterHost(); - int port = container.getRouterPort(); + String host = testContainer.getRouterHost(); + int port = testContainer.getRouterPort(); + String username = testContainer.getUsername(); + String password = testContainer.getPassword(); TarantoolCredentials credentials = new SimpleTarantoolCredentials( - USER_NAME, - PASSWORD + username, + password ); TarantoolClientConfig config = TarantoolClientConfig.builder() .withCredentials(credentials) @@ -200,24 +304,27 @@ private static TarantoolClient> clusterDiscoveryConfig); TarantoolClusterAddressProvider wrapperDiscoveryProvider - = new TestWrappedClusterAddressProvider(discoveryProvider, container); // because we use docker ports + = new TestWrappedClusterAddressProvider(discoveryProvider, testContainer); // because we use docker ports return TarantoolClientFactory.createClient() - .withAddressProvider(wrapperDiscoveryProvider) - .withCredentials(USER_NAME, PASSWORD) - .withConnections(connections) - .build(); + .withAddressProvider(wrapperDiscoveryProvider) + .withCredentials(username, password) + .withConnections(connections) + .build(); } @NotNull - private static Object getCallCountersPerConnection(String getAllConnectionCalls, TarantoolClient router) { - List luaResponse = router.eval(getAllConnectionCalls).join(); - ArrayList tuples = (ArrayList) luaResponse.get(0); // because lua has multivalue response - - Object routerCallCounterPerConnection = tuples.stream() - .map(item -> ((ArrayList) item).get(1)) - .collect(Collectors.toList()); - return routerCallCounterPerConnection; + private static List getCallCountersPerConnection( + TarantoolClient> router) + throws ExecutionException, InterruptedException { + TarantoolResult tuples = router.space("request_counters") + .select( + Conditions.indexGreaterThan( + "count", + Collections.singletonList(0))).get(); + return tuples.stream() + .map(tuple -> tuple.getInteger("count")) + .collect(Collectors.toList()); } @Test