[k2] rpc client boost#1635
Conversation
156e9ec to
222adae
Compare
| // prepare and send RPC request | ||
| // 'request_buf' will look like this: | ||
| // [ RpcExtraHeaders (optional) ] [ payload ] | ||
| if (const auto& [opt_new_extra_header, cur_extra_header_size]{kphp::rpc::regularize_extra_headers(tl_storer.view(), ignore_answer)}; opt_new_extra_header) { |
There was a problem hiding this comment.
В новом коде нет вызова regularize_extra_headers(...)
| const auto it_awaiter_task{rpc_client_instance_st.response_awaiter_tasks.find(request_id)}; | ||
| if (it_awaiter_task == rpc_client_instance_st.response_awaiter_tasks.end()) [[unlikely]] { | ||
| kphp::log::warning("could not find rpc query with id {} in pending queries", queue_id); | ||
| const auto it_rpc_request_info{rpc_client_instance_st.rpc_requests_infos.find(request_id)}; |
There was a problem hiding this comment.
кажется, это не будет работать для ignore_answer запросов (если для них, вообще, вызывается rpc_queue_push)
| } | ||
|
|
||
| /** @kphp-extern-func-info interruptible */ | ||
| /** @kphp-extern-func-info */ |
There was a problem hiding this comment.
We can completely remove this now
|
|
||
| /** | ||
| * Try to send rpc request. If case of success write descriptor of rpc request to `rpc_d`, otherwise return `errno` != 0. | ||
| * which should be later used to call `k2_rpc_fetch_response`. |
| * `EAI_MEMORY` => max descriptors count achieved | ||
| * `EINVAL` => invalid `actor_name` or request, or connection pool is empty for this actor. | ||
| */ | ||
| int32_t k2_rpc_send_request(const char* actor_name, size_t actor_name_len, const void* request_ptr, size_t request_size, uint64_t* rpc_d); |
There was a problem hiding this comment.
- Maybe rename to
k2_rpc_send? - Do we really need
actor_name? Can't we just use something likeint64_t actor_id? - Shoudn't we consider multiple RPC implementations here, e.g.
TL RPC,gRPCetc?
|
|
||
| /** | ||
| * Get response size for corresponding request of this `rpc_d`. Write 0 to `response_size` if response is not ready. | ||
| * Write response size value to `response_size` when response is ready. |
There was a problem hiding this comment.
when response is ready => if response is ready
| * | ||
| * @return return `0` on success. libc-like `errno` otherwise | ||
| * | ||
| * `EINVAL` => invalid `rpc_d` descriptor, for example, it is unknown descriptor, or not rpc descriptor. |
There was a problem hiding this comment.
Let's add something like * Possible errno values:
| int32_t k2_rpc_get_response_size(uint64_t rpc_d, size_t* response_size); | ||
|
|
||
| /** | ||
| * Write response for corresponding request of this `rpc_d` to `buf`. Does nothing if response is not ready. |
There was a problem hiding this comment.
IMO it's a bad design choice to have this function do nothing in case a response is not ready yet. We need to indicate this case explicitly
|
|
||
| namespace { | ||
|
|
||
| static std::expected<string, std::pair<int, string>> get_rpc_response(int64_t query_id, k2::descriptor rpc_d, bool collect_responses_extra_info) { |
There was a problem hiding this comment.
- We use sized integral types in
runtime-light - We definitely need an RAII-like type for the RPC descriptor to prevent leaks that cause the instance to hang.
- No need for
staticin anonymous namespace inside acppfile
| if (!first_response_size) { | ||
| return std::unexpected{std::make_pair(TL_ERROR_INTERNAL, string{"error fetching rpc response"})}; | ||
| } | ||
| string response{reinterpret_cast<char*>(k2::alloc(*first_response_size)), static_cast<string::size_type>(*first_response_size)}; |
There was a problem hiding this comment.
Looks like we can have a single buffer for RPC response to reuse
| std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; | ||
| std::chrono::nanoseconds timeout{rpc_request_info.deadline - now_ns}; | ||
| kphp::coro::io_scheduler& m_scheduler{kphp::coro::io_scheduler::get()}; | ||
| switch (co_await m_scheduler.poll(rpc_request_info.rpc_d, kphp::coro::poll_op::read, timeout)) { |
There was a problem hiding this comment.
It's quite hard to understand what's going on here. Can you please make it more human readable? For example, I think we don't need to directly use the scheduler's API here
| k2::TimePoint now_instant{}; | ||
| k2::instant(std::addressof(now_instant)); | ||
| std::chrono::nanoseconds now_ns{now_instant.time_point_ns}; | ||
| std::chrono::nanoseconds timeout{rpc_request_info.deadline - now_ns}; |
There was a problem hiding this comment.
What will happen if nobody will fetch response? I'm afraid we might hang in this case
No description provided.