This is an automated email from the ASF dual-hosted git repository.

chenBright pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 321c7893 Fix(selective_channel): isolate backup responses and add race regression test (#3294)
321c7893 is described below

commit 321c7893e3e74ac25da41bfd458ce28c6586e754
Author: altman08 <[email protected]>
AuthorDate: Wed May 13 16:15:58 2026 +0800

    Fix(selective_channel): isolate backup responses and add race regression test (#3294)
---
 src/brpc/selective_channel.cpp | 12 ++++++---
 test/brpc_channel_unittest.cpp | 55 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/src/brpc/selective_channel.cpp b/src/brpc/selective_channel.cpp
index a59580e3..8dee4225 100644
--- a/src/brpc/selective_channel.cpp
+++ b/src/brpc/selective_channel.cpp
@@ -419,9 +419,13 @@ void Sender::Clear() {
     if (_main_cntl == NULL) {
         return;
     }
-    delete _alloc_resources[1].response;
-    delete _alloc_resources[1].sub_done;
-    _alloc_resources[1] = Resource();
+    for (int i = 0; i < _nalloc; ++i) {
+        delete _alloc_resources[i].response;
+        if (_alloc_resources[i].sub_done != &_sub_done0) {
+            delete _alloc_resources[i].sub_done;
+        }
+        _alloc_resources[i] = Resource();
+    }
     const CallId cid = _main_cntl->call_id();
     _main_cntl = NULL;
     if (_user_done) {
@@ -434,7 +438,7 @@ inline Resource Sender::PopFree() {
     if (_nfree == 0) {
         if (_nalloc == 0) {
             Resource r;
-            r.response = _response;
+            r.response = _response->New();
             r.sub_done = &_sub_done0;
             _alloc_resources[_nalloc++] = r;
             return r;
diff --git a/test/brpc_channel_unittest.cpp b/test/brpc_channel_unittest.cpp
index b17fc738..db6e2ac7 100644
--- a/test/brpc_channel_unittest.cpp
+++ b/test/brpc_channel_unittest.cpp
@@ -1487,6 +1487,57 @@ protected:
         EXPECT_EQ(cntl.response_attachment().to_string(), "123");
         StopAndJoin();
     }
+
+    void TestBackupRequestSelectiveResponseRace() {
+        ASSERT_EQ(0, StartAccept(_ep));
+
+        const size_t NCHANS = 8;
+        brpc::SelectiveChannel channel;
+        ASSERT_EQ(0, channel.Init("rr", NULL));
+        for (size_t i = 0; i < NCHANS; ++i) {
+            brpc::Channel* subchan = new brpc::Channel;
+            SetUpChannel(subchan, false, false);
+            ASSERT_EQ(0, channel.AddChannel(subchan, NULL)) << "i=" << i;
+        }
+
+        const int kRounds = 150;
+        const int kCodeListSize = 20000;
+        std::atomic<int> call_cnt(0);
+        _svc.SetMockFunc([&call_cnt](google::protobuf::RpcController*,
+                                     const ::test::EchoRequest*,
+                                     ::test::EchoResponse* res,
+                                     google::protobuf::Closure*) {
+            const int seen = call_cnt.fetch_add(1, std::memory_order_relaxed);
+            const bool slow = ((seen & 1) == 0);
+            if (slow) {
+                bthread_usleep(1500);
+            }
+            res->clear_code_list();
+            const int base = slow ? 1000000 : 2000000;
+            for (int i = 0; i < kCodeListSize; ++i) {
+                res->add_code_list(base + i);
+            }
+            res->set_message(slow ? "slow" : "fast");
+        });
+
+        for (int round = 0; round < kRounds; ++round) {
+            brpc::Controller cntl;
+            test::EchoRequest req;
+            test::EchoResponse res;
+            req.set_message(__FUNCTION__);
+            cntl.set_backup_request_ms(1);
+            cntl.set_timeout_ms(3000);
+            CallMethod(&channel, &cntl, &req, &res, true);
+            ASSERT_FALSE(cntl.Failed()) << "round=" << round
+                                        << " err=" << cntl.ErrorText();
+            ASSERT_EQ(kCodeListSize, res.code_list_size()) << "round=" << round;
+            ASSERT_TRUE(res.message() == "slow" || res.message() == "fast")
+                << "round=" << round;
+        }
+
+        EXPECT_EQ(kRounds * 2, call_cnt.load(std::memory_order_relaxed));
+        StopAndJoin();
+    }
     
     void TestCloseFD(bool single_server, bool async, bool short_connection) {
         std::cout << " *** single=" << single_server
@@ -2783,6 +2834,10 @@ TEST_F(ChannelTest, backuprequest_selective) {
     }
 }
 
+TEST_F(ChannelTest, backuprequest_selective_response_race) {
+    TestBackupRequestSelectiveResponseRace();
+}
+
 TEST_F(ChannelTest, close_fd) {
     for (int i = 0; i <= 1; ++i) { // Flag SingleServer 
         for (int j = 0; j <= 1; ++j) { // Flag Asynchronous


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to