include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp

81.0% Lines (337/416) 93.3% Functions (28/30)
include/boost/corosio/native/detail/epoll/epoll_socket_service.hpp
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/epoll/epoll_socket.hpp>
22 #include <boost/corosio/native/detail/epoll/epoll_scheduler.hpp>
23
24 #include <boost/corosio/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/make_err.hpp>
26 #include <boost/corosio/detail/dispatch_coro.hpp>
27 #include <boost/corosio/detail/except.hpp>
28 #include <boost/capy/buffers.hpp>
29
30 #include <coroutine>
31 #include <mutex>
32 #include <unordered_map>
33 #include <utility>
34
35 #include <errno.h>
36 #include <netinet/in.h>
37 #include <netinet/tcp.h>
38 #include <sys/epoll.h>
39 #include <sys/socket.h>
40 #include <unistd.h>
41
42 /*
43 epoll Socket Implementation
44 ===========================
45
46 Each I/O operation follows the same pattern:
47 1. Try the syscall immediately (non-blocking socket)
48 2. If it succeeds or fails with a real error, post to completion queue
49 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
50
51 This "try first" approach avoids unnecessary epoll round-trips for
52 operations that can complete immediately (common for small reads/writes
53 on fast local connections).
54
55 One-Shot Registration
56 ---------------------
57 We use one-shot epoll registration: each operation registers, waits for
58 one event, then unregisters. This simplifies the state machine since we
59 don't need to track whether an fd is currently registered or handle
60 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
61 simplicity is worth it.
62
63 Cancellation
64 ------------
65 See op.hpp for the completion/cancellation race handling via the
66 `registered` atomic. cancel() must complete pending operations (post
67 them with cancelled flag) so coroutines waiting on them can resume.
68 close_socket() calls cancel() first to ensure this.
69
70 Impl Lifetime with shared_ptr
71 -----------------------------
72 Socket impls use enable_shared_from_this. The service owns impls via
73 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
74 removal. When a user calls close(), we call cancel() which posts pending
75 ops to the scheduler.
76
77 CRITICAL: The posted ops must keep the impl alive until they complete.
78 Otherwise the scheduler would process a freed op (use-after-free). The
79 cancel() method captures shared_from_this() into op.impl_ptr before
80 posting. When the op completes, impl_ptr is cleared, allowing the impl
81 to be destroyed if no other references exist.
82
83 Service Ownership
84 -----------------
85 epoll_socket_service owns all socket impls. destroy_impl() removes the
86 shared_ptr from the map, but the impl may survive if ops still hold
87 impl_ptr refs. shutdown() closes all sockets and clears the map; any
88 in-flight ops will complete and release their refs.
89 */
90
91 namespace boost::corosio::detail {
92
93 /** State for epoll socket service. */
94 class epoll_socket_state
95 {
96 public:
97 239 explicit epoll_socket_state(epoll_scheduler& sched) noexcept : sched_(sched)
98 {
99 239 }
100
101 epoll_scheduler& sched_;
102 std::mutex mutex_;
103 intrusive_list<epoll_socket> socket_list_;
104 std::unordered_map<epoll_socket*, std::shared_ptr<epoll_socket>>
105 socket_ptrs_;
106 };
107
/** epoll socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.

    Owns all socket impls via shared_ptr (see epoll_socket_state);
    impls may outlive destroy() while in-flight ops hold impl_ptr refs.
*/
class BOOST_COROSIO_DECL epoll_socket_service final : public socket_service
{
public:
    explicit epoll_socket_service(capy::execution_context& ctx);
    ~epoll_socket_service() override;

    epoll_socket_service(epoll_socket_service const&) = delete;
    epoll_socket_service& operator=(epoll_socket_service const&) = delete;

    /// Closes every live socket; owning refs are released later by
    /// the state destructor (see comment in the definition).
    void shutdown() override;

    /// Creates a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Closes the impl and removes the service's owning ref.
    void destroy(io_object::implementation*) override;
    /// Closes the underlying descriptor of the handle's impl.
    void close(io_object::handle&) override;
    /// Opens a non-blocking fd and registers it with the reactor.
    std::error_code
    open_socket(tcp_socket::implementation& impl,
        int family, int type, int protocol) override;

    /// Scheduler that drives the reactor for this service.
    epoll_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queues a completed op on the scheduler.
    void post(epoll_op* op);
    /// Outstanding-work accounting, forwarded to the scheduler.
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<epoll_socket_state> state_;
};
142
143 //--------------------------------------------------------------------------
144 //
145 // Implementation
146 //
147 //--------------------------------------------------------------------------
148
149 // Register an op with the reactor, handling cached edge events.
150 // Called under the EAGAIN/EINPROGRESS path when speculative I/O failed.
151 inline void
152 4813 epoll_socket::register_op(
153 epoll_op& op,
154 epoll_op*& desc_slot,
155 bool& ready_flag,
156 bool& cancel_flag) noexcept
157 {
158 4813 svc_.work_started();
159
160 4813 std::lock_guard lock(desc_state_.mutex);
161 4813 bool io_done = false;
162 4813 if (ready_flag)
163 {
164 142 ready_flag = false;
165 142 op.perform_io();
166 142 io_done = (op.errn != EAGAIN && op.errn != EWOULDBLOCK);
167 142 if (!io_done)
168 142 op.errn = 0;
169 }
170
171 4813 if (cancel_flag)
172 {
173 95 cancel_flag = false;
174 95 op.cancelled.store(true, std::memory_order_relaxed);
175 }
176
177 4813 if (io_done || op.cancelled.load(std::memory_order_acquire))
178 {
179 95 svc_.post(&op);
180 95 svc_.work_finished();
181 }
182 else
183 {
184 4718 desc_slot = &op;
185 }
186 4813 }
187
// Stop-token callback: forwards cancellation to the op's cancel().
inline void
epoll_op::canceller::operator()() const noexcept
{
    op->cancel();
}
193
194 inline void
195 epoll_connect_op::cancel() noexcept
196 {
197 if (socket_impl_)
198 socket_impl_->cancel_single_op(*this);
199 else
200 request_cancel();
201 }
202
203 inline void
204 99 epoll_read_op::cancel() noexcept
205 {
206 99 if (socket_impl_)
207 99 socket_impl_->cancel_single_op(*this);
208 else
209 request_cancel();
210 99 }
211
212 inline void
213 epoll_write_op::cancel() noexcept
214 {
215 if (socket_impl_)
216 socket_impl_->cancel_single_op(*this);
217 else
218 request_cancel();
219 }
220
221 inline void
222 52532 epoll_op::operator()()
223 {
224 52532 stop_cb.reset();
225
226 52532 socket_impl_->svc_.scheduler().reset_inline_budget();
227
228 52532 if (cancelled.load(std::memory_order_acquire))
229 206 *ec_out = capy::error::canceled;
230 52326 else if (errn != 0)
231 *ec_out = make_err(errn);
232 52326 else if (is_read_operation() && bytes_transferred == 0)
233 *ec_out = capy::error::eof;
234 else
235 52326 *ec_out = {};
236
237 52532 *bytes_out = bytes_transferred;
238
239 // Move to stack before resuming coroutine. The coroutine might close
240 // the socket, releasing the last wrapper ref. If impl_ptr were the
241 // last ref and we destroyed it while still in operator(), we'd have
242 // use-after-free. Moving to local ensures destruction happens at
243 // function exit, after all member accesses are complete.
244 52532 capy::executor_ref saved_ex(ex);
245 52532 std::coroutine_handle<> saved_h(h);
246 52532 auto prevent_premature_destruction = std::move(impl_ptr);
247 52532 dispatch_coro(saved_ex, saved_h).resume();
248 52532 }
249
// Completion handler for connect: caches endpoints on success, writes
// the caller's error code, then resumes the waiting coroutine.
inline void
epoll_connect_op::operator()()
{
    stop_cb.reset();

    socket_impl_->svc_.scheduler().reset_inline_budget();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    // NOTE(review): socket_impl_ is already dereferenced above, so the
    // `&& socket_impl_` here is presumably redundant — confirm.
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        // Best-effort: on getsockname failure the local endpoint stays
        // default-constructed.
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_ep = from_sockaddr(local_storage);
        static_cast<epoll_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation wins over any syscall error.
    if (cancelled.load(std::memory_order_acquire))
        *ec_out = capy::error::canceled;
    else if (errn != 0)
        *ec_out = make_err(errn);
    else
        *ec_out = {};

    // Move to stack before resuming. See epoll_op::operator()() for rationale.
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    auto prevent_premature_destruction = std::move(impl_ptr);
    dispatch_coro(saved_ex, saved_h).resume();
}
286
// Binds the impl to its owning service; no fd is opened here (see
// epoll_socket_service::open_socket).
inline epoll_socket::epoll_socket(epoll_socket_service& svc) noexcept
    : svc_(svc)
{
}

inline epoll_socket::~epoll_socket() = default;
293
/** Begin an asynchronous connect to `ep`.

    Calls ::connect() immediately on the non-blocking fd. Any result
    other than EINPROGRESS completes right away — inline if the
    scheduler's inline budget allows, otherwise through the completion
    queue. On EINPROGRESS the op is parked with the reactor and resumes
    on the write-ready edge.

    @param h     Coroutine to resume on completion.
    @param ex    Executor the coroutine resumes on.
    @param ep    Remote endpoint to connect to.
    @param token Stop token wired to the op's canceller.
    @param ec    Receives the completion error code.
    @return Handle to resume inline, or noop_coroutine() if suspended.
*/
inline std::coroutine_handle<>
epoll_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result =
        ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    // Immediate success (e.g. loopback): cache endpoints now.
    if (result == 0)
    {
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage),
                &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;
    }

    // Completed synchronously (success or hard error).
    if (result == 0 || errno != EINPROGRESS)
    {
        int err = (result < 0) ? errno : 0;
        // Inline fast path: report directly without touching the op.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: package the result and post it so the
        // coroutine resumes from the scheduler instead of recursing.
        op.reset();
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.fd = fd_;
        op.target_endpoint = ep;
        op.start(token, this);
        // Keep the impl alive until the posted op completes.
        op.impl_ptr = shared_from_this();
        op.complete(err, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EINPROGRESS — register with reactor
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.connect_op, desc_state_.write_ready,
        desc_state_.connect_cancel_pending);
    return std::noop_coroutine();
}
357
/** Begin an asynchronous scatter read into the caller's buffers.

    Speculatively calls ::readv() first; registers with the reactor
    only on EAGAIN/EWOULDBLOCK. Completion delivers the error code and
    byte count through `ec` / `bytes_out`. A zero-byte successful read
    is reported as capy::error::eof (except for an intentionally empty
    buffer, handled below).

    @param h         Coroutine to resume on completion.
    @param ex        Executor the coroutine resumes on.
    @param param     Caller's buffer sequence.
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes read.
    @return Handle to resume inline, or noop_coroutine() if suspended.
*/
inline std::coroutine_handle<>
epoll_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();

    // Copy the caller's buffer descriptors into the op's iovec array.
    capy::mutable_buffer bufs[epoll_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes and no error
    // (empty_buffer_read suppresses the usual 0-bytes => EOF mapping).
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative read
    ssize_t n;
    do
    {
        n = ::readv(fd_, op.iovecs, op.iovec_count);
    }
    while (n < 0 && errno == EINTR);

    // Completed synchronously (data, EOF, or hard error).
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        // Inline fast path: report directly without touching the op.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            if (err)
                *ec = make_err(err);
            else if (n == 0)
                *ec = capy::error::eof;
            else
                *ec = {};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: post the result through the scheduler.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        // Keep the impl alive until the posted op completes.
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.read_op, desc_state_.read_ready,
        desc_state_.read_cancel_pending);
    return std::noop_coroutine();
}
443
/** Begin an asynchronous gather write from the caller's buffers.

    Speculatively calls ::sendmsg() first (MSG_NOSIGNAL avoids SIGPIPE
    on a closed peer); registers with the reactor only on
    EAGAIN/EWOULDBLOCK. Completion delivers the error code and byte
    count through `ec` / `bytes_out`.

    @param h         Coroutine to resume on completion.
    @param ex        Executor the coroutine resumes on.
    @param param     Caller's buffer sequence.
    @param token     Stop token wired to the op's canceller.
    @param ec        Receives the completion error code.
    @param bytes_out Receives the number of bytes written.
    @return Handle to resume inline, or noop_coroutine() if suspended.
*/
inline std::coroutine_handle<>
epoll_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();

    // Copy the caller's buffer descriptors into the op's iovec array.
    // NOTE(review): mutable_buffer for a write path — presumably
    // io_buffer_param normalizes const buffers; confirm.
    capy::mutable_buffer bufs[epoll_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));

    // Empty buffer: complete immediately with 0 bytes and no error.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        op.impl_ptr = shared_from_this();
        op.complete(0, 0);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Speculative write
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n;
    do
    {
        n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
    }
    while (n < 0 && errno == EINTR);

    // Completed synchronously (bytes written or hard error).
    if (n >= 0 || (errno != EAGAIN && errno != EWOULDBLOCK))
    {
        int err = (n < 0) ? errno : 0;
        auto bytes = (n > 0) ? static_cast<std::size_t>(n) : std::size_t(0);

        // Inline fast path: report directly without touching the op.
        if (svc_.scheduler().try_consume_inline_budget())
        {
            *ec = err ? make_err(err) : std::error_code{};
            *bytes_out = bytes;
            return dispatch_coro(ex, h);
        }
        // Budget exhausted: post the result through the scheduler.
        op.h = h;
        op.ex = ex;
        op.ec_out = ec;
        op.bytes_out = bytes_out;
        op.start(token, this);
        // Keep the impl alive until the posted op completes.
        op.impl_ptr = shared_from_this();
        op.complete(err, bytes);
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // EAGAIN — register with reactor
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);
    op.impl_ptr = shared_from_this();

    register_op(
        op, desc_state_.write_op, desc_state_.write_ready,
        desc_state_.write_cancel_pending);
    return std::noop_coroutine();
}
527
528 inline std::error_code
529 3 epoll_socket::shutdown(tcp_socket::shutdown_type what) noexcept
530 {
531 int how;
532 3 switch (what)
533 {
534 1 case tcp_socket::shutdown_receive:
535 1 how = SHUT_RD;
536 1 break;
537 1 case tcp_socket::shutdown_send:
538 1 how = SHUT_WR;
539 1 break;
540 1 case tcp_socket::shutdown_both:
541 1 how = SHUT_RDWR;
542 1 break;
543 default:
544 return make_err(EINVAL);
545 }
546 3 if (::shutdown(fd_, how) != 0)
547 return make_err(errno);
548 3 return {};
549 }
550
551 inline std::error_code
552 32 epoll_socket::set_option(
553 int level, int optname,
554 void const* data, std::size_t size) noexcept
555 {
556 32 if (::setsockopt(fd_, level, optname, data,
557 32 static_cast<socklen_t>(size)) != 0)
558 return make_err(errno);
559 32 return {};
560 }
561
562 inline std::error_code
563 31 epoll_socket::get_option(
564 int level, int optname,
565 void* data, std::size_t* size) const noexcept
566 {
567 31 socklen_t len = static_cast<socklen_t>(*size);
568 31 if (::getsockopt(fd_, level, optname, data, &len) != 0)
569 return make_err(errno);
570 31 *size = static_cast<std::size_t>(len);
571 31 return {};
572 }
573
/** Cancel all pending operations on this socket.

    Marks every op cancel-requested, then claims each op still parked
    in its descriptor slot and posts it so the waiting coroutine
    resumes with a cancelled result. Ops not found in a slot get a
    cancel-pending flag instead, so register_op() (or the reactor) can
    observe the cancellation when it next handles that op.
*/
inline void
epoll_socket::cancel() noexcept
{
    // If no shared owner exists the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    conn_.request_cancel();
    rd_.request_cancel();
    wr_.request_cancel();

    epoll_op* conn_claimed = nullptr;
    epoll_op* rd_claimed = nullptr;
    epoll_op* wr_claimed = nullptr;
    {
        // Claim parked ops under the descriptor lock so the reactor
        // cannot post them concurrently.
        std::lock_guard lock(desc_state_.mutex);
        if (desc_state_.connect_op == &conn_)
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
        else
            desc_state_.connect_cancel_pending = true;
        if (desc_state_.read_op == &rd_)
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
        else
            desc_state_.read_cancel_pending = true;
        if (desc_state_.write_op == &wr_)
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
        else
            desc_state_.write_cancel_pending = true;
    }

    // Post each claimed op. impl_ptr keeps the impl alive until the op
    // completes; work_finished() balances the work_started() done in
    // register_op() when the op was parked.
    if (conn_claimed)
    {
        conn_.impl_ptr = self;
        svc_.post(&conn_);
        svc_.work_finished();
    }
    if (rd_claimed)
    {
        rd_.impl_ptr = self;
        svc_.post(&rd_);
        svc_.work_finished();
    }
    if (wr_claimed)
    {
        wr_.impl_ptr = self;
        svc_.post(&wr_);
        svc_.work_finished();
    }
}
623
/** Cancel one specific operation (connect, read, or write).

    Same claim-or-flag protocol as cancel(), restricted to the slot
    matching `op`. If the op is still parked in its descriptor slot it
    is claimed and posted (resuming the waiter with a cancelled
    result); otherwise the corresponding cancel-pending flag is set for
    register_op()/the reactor to observe.
*/
inline void
epoll_socket::cancel_single_op(epoll_op& op) noexcept
{
    // If no shared owner exists the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    op.request_cancel();

    // Map the op to its descriptor slot by identity.
    epoll_op** desc_op_ptr = nullptr;
    if (&op == &conn_)
        desc_op_ptr = &desc_state_.connect_op;
    else if (&op == &rd_)
        desc_op_ptr = &desc_state_.read_op;
    else if (&op == &wr_)
        desc_op_ptr = &desc_state_.write_op;

    if (desc_op_ptr)
    {
        epoll_op* claimed = nullptr;
        {
            // Claim under the descriptor lock to avoid racing the
            // reactor's own post of this op.
            std::lock_guard lock(desc_state_.mutex);
            if (*desc_op_ptr == &op)
                claimed = std::exchange(*desc_op_ptr, nullptr);
            else if (&op == &conn_)
                desc_state_.connect_cancel_pending = true;
            else if (&op == &rd_)
                desc_state_.read_cancel_pending = true;
            else if (&op == &wr_)
                desc_state_.write_cancel_pending = true;
        }
        if (claimed)
        {
            // impl_ptr keeps the impl alive until the op completes;
            // work_finished() balances register_op()'s work_started().
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    }
}
663
/** Close the socket: cancel pending ops, deregister, close the fd.

    All parked ops are claimed unconditionally (unlike cancel(), which
    claims only matching ops) and posted so their waiters resume; the
    descriptor state is reset for potential reuse via open_socket().
    Safe to call repeatedly — a second call finds fd_ < 0 and only
    re-clears state.
*/
inline void
epoll_socket::close_socket() noexcept
{
    auto self = weak_from_this().lock();
    if (self)
    {
        conn_.request_cancel();
        rd_.request_cancel();
        wr_.request_cancel();

        epoll_op* conn_claimed = nullptr;
        epoll_op* rd_claimed = nullptr;
        epoll_op* wr_claimed = nullptr;
        {
            // Take every parked op and clear all edge/cancel flags so
            // stale state cannot leak into a reopened descriptor.
            std::lock_guard lock(desc_state_.mutex);
            conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
            rd_claimed = std::exchange(desc_state_.read_op, nullptr);
            wr_claimed = std::exchange(desc_state_.write_op, nullptr);
            desc_state_.read_ready = false;
            desc_state_.write_ready = false;
            desc_state_.read_cancel_pending = false;
            desc_state_.write_cancel_pending = false;
            desc_state_.connect_cancel_pending = false;
        }

        // Post claimed ops; impl_ptr keeps the impl alive until each
        // completes, work_finished() balances register_op()'s
        // work_started().
        if (conn_claimed)
        {
            conn_.impl_ptr = self;
            svc_.post(&conn_);
            svc_.work_finished();
        }
        if (rd_claimed)
        {
            rd_.impl_ptr = self;
            svc_.post(&rd_);
            svc_.work_finished();
        }
        if (wr_claimed)
        {
            wr_.impl_ptr = self;
            svc_.post(&wr_);
            svc_.work_finished();
        }

        // If the descriptor state is still queued in the scheduler,
        // pin the impl so the queue entry stays valid until drained.
        if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
            desc_state_.impl_ref_ = self;
    }

    if (fd_ >= 0)
    {
        if (desc_state_.registered_events != 0)
            svc_.scheduler().deregister_descriptor(fd_);
        ::close(fd_);
        fd_ = -1;
    }

    desc_state_.fd = -1;
    desc_state_.registered_events = 0;

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
726
// Acquires (or creates) the context's epoll scheduler and allocates
// the service state around it.
inline epoll_socket_service::epoll_socket_service(capy::execution_context& ctx)
    : state_(
          std::make_unique<epoll_socket_state>(
              ctx.use_service<epoll_scheduler>()))
{
}
733
734 478 inline epoll_socket_service::~epoll_socket_service() {}
735
736 inline void
737 239 epoll_socket_service::shutdown()
738 {
739 239 std::lock_guard lock(state_->mutex_);
740
741 239 while (auto* impl = state_->socket_list_.pop_front())
742 impl->close_socket();
743
744 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
745 // drains completed_ops_, calling destroy() on each queued op. If we
746 // released our shared_ptrs now, an epoll_op::destroy() could free the
747 // last ref to an impl whose embedded descriptor_state is still linked
748 // in the queue — use-after-free on the next pop(). Letting ~state_
749 // release the ptrs (during service destruction, after scheduler
750 // shutdown) keeps every impl alive until all ops have been drained.
751 239 }
752
753 inline io_object::implementation*
754 13890 epoll_socket_service::construct()
755 {
756 13890 auto impl = std::make_shared<epoll_socket>(*this);
757 13890 auto* raw = impl.get();
758
759 {
760 13890 std::lock_guard lock(state_->mutex_);
761 13890 state_->socket_list_.push_back(raw);
762 13890 state_->socket_ptrs_.emplace(raw, std::move(impl));
763 13890 }
764
765 13890 return raw;
766 13890 }
767
768 inline void
769 13890 epoll_socket_service::destroy(io_object::implementation* impl)
770 {
771 13890 auto* epoll_impl = static_cast<epoll_socket*>(impl);
772 13890 epoll_impl->close_socket();
773 13890 std::lock_guard lock(state_->mutex_);
774 13890 state_->socket_list_.remove(epoll_impl);
775 13890 state_->socket_ptrs_.erase(epoll_impl);
776 13890 }
777
/** Open a new non-blocking socket for `impl` and register it.

    Any previously open descriptor on the impl is closed first. The fd
    is created with SOCK_NONBLOCK | SOCK_CLOEXEC; IPv6 sockets are
    restricted to v6-only. The descriptor state is cleared and handed
    to the reactor.

    @return Empty error_code on success, else the ::socket() errno.
*/
inline std::error_code
epoll_socket_service::open_socket(
    tcp_socket::implementation& impl,
    int family, int type, int protocol)
{
    auto* epoll_impl = static_cast<epoll_socket*>(&impl);
    // Reopening an impl: release any existing fd and pending state.
    epoll_impl->close_socket();

    int fd = ::socket(family, type | SOCK_NONBLOCK | SOCK_CLOEXEC, protocol);
    if (fd < 0)
        return make_err(errno);

    if (family == AF_INET6)
    {
        int one = 1;
        // Return value ignored — presumably best-effort; confirm.
        ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
    }

    epoll_impl->fd_ = fd;

    // Register fd with epoll (edge-triggered mode)
    epoll_impl->desc_state_.fd = fd;
    {
        // Clear any stale op slots before the reactor can see them.
        std::lock_guard lock(epoll_impl->desc_state_.mutex);
        epoll_impl->desc_state_.read_op = nullptr;
        epoll_impl->desc_state_.write_op = nullptr;
        epoll_impl->desc_state_.connect_op = nullptr;
    }
    scheduler().register_descriptor(fd, &epoll_impl->desc_state_);

    return {};
}
810
811 inline void
812 23125 epoll_socket_service::close(io_object::handle& h)
813 {
814 23125 static_cast<epoll_socket*>(h.get())->close_socket();
815 23125 }
816
// Forwards a completed op to the scheduler's completion queue.
inline void
epoll_socket_service::post(epoll_op* op)
{
    state_->sched_.post(op);
}
822
// Forwards outstanding-work accounting to the scheduler.
inline void
epoll_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}
828
// Forwards outstanding-work accounting to the scheduler; must balance
// a prior work_started().
inline void
epoll_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
834
835 } // namespace boost::corosio::detail
836
837 #endif // BOOST_COROSIO_HAS_EPOLL
838
839 #endif // BOOST_COROSIO_NATIVE_DETAIL_EPOLL_EPOLL_SOCKET_SERVICE_HPP
840