Line data Source code
1 : /*
2 : * Unix SMB/CIFS implementation.
3 : * Samba internal messaging functions
4 : * Copyright (C) 2013 by Volker Lendecke
5 : *
6 : * This program is free software; you can redistribute it and/or modify
7 : * it under the terms of the GNU General Public License as published by
8 : * the Free Software Foundation; either version 3 of the License, or
9 : * (at your option) any later version.
10 : *
11 : * This program is distributed in the hope that it will be useful,
12 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 : * GNU General Public License for more details.
15 : *
16 : * You should have received a copy of the GNU General Public License
17 : * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 : */
19 :
20 : #include "replace.h"
21 : #include "util/util.h"
22 : #include "system/network.h"
23 : #include "system/filesys.h"
24 : #include "system/dir.h"
25 : #include "system/select.h"
26 : #include "lib/util/debug.h"
27 : #include "messages_dgm.h"
28 : #include "lib/util/genrand.h"
29 : #include "lib/util/dlinklist.h"
30 : #include "lib/pthreadpool/pthreadpool_tevent.h"
31 : #include "lib/util/msghdr.h"
32 : #include "lib/util/iov_buf.h"
33 : #include "lib/util/blocking.h"
34 : #include "lib/util/tevent_unix.h"
35 : #include "lib/util/smb_strtox.h"
36 :
37 : #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
38 :
/*
 * Fixed-size path buffer. It is dimensioned after struct sockaddr_un,
 * so anything that fits into a unix socket address also fits in here.
 */
struct sun_path_buf {
	char buf[sizeof(struct sockaddr_un)];
};
45 :
/*
 * Only one tevent_fd may exist per dgm_context and per tevent_context.
 * Each dgm_context therefore keeps a list of the tevent_contexts that
 * have been registered with it; this is one element of that list.
 */
struct messaging_dgm_fde_ev {
	/* List linkage */
	struct messaging_dgm_fde_ev *prev;
	struct messaging_dgm_fde_ev *next;

	/*
	 * Back pointer to the owning context so that our destructor
	 * can DLIST_REMOVE us. It is reset to NULL when the
	 * dgm_context is torn down before this entry.
	 */
	struct messaging_dgm_context *ctx;

	struct tevent_context *ev;
	struct tevent_fd *fde;
};
64 :
/*
 * One outgoing channel towards a destination process. Entries are
 * linked into messaging_dgm_context.outsocks.
 */
struct messaging_dgm_out {
	/* List linkage */
	struct messaging_dgm_out *prev;
	struct messaging_dgm_out *next;

	/* Owning messaging context */
	struct messaging_dgm_context *ctx;

	/* Destination process and the datagram socket connected to it */
	pid_t pid;
	int sock;

	/* Tracks whether "sock" is currently in blocking mode */
	bool is_blocking;

	/* Cookie used for the next fragmented message on this channel */
	uint64_t cookie;

	/* Pending fragments, and the timer that reaps an idle channel */
	struct tevent_queue *queue;
	struct tevent_timer *idle_timer;
};
77 :
/*
 * An incoming message, linked into messaging_dgm_context.in_msgs.
 * NOTE(review): the receive path that fills this in is not part of
 * this section; the field comments below follow the field names.
 */
struct messaging_dgm_in_msg {
	/* List linkage */
	struct messaging_dgm_in_msg *prev;
	struct messaging_dgm_in_msg *next;

	/* Owning messaging context */
	struct messaging_dgm_context *ctx;

	/* Total message length and number of bytes received so far */
	size_t msglen;
	size_t received;

	/* Sender identification; mirrors the fragment header plus cookie */
	pid_t sender_pid;
	int sender_sock;
	uint64_t cookie;

	/* Payload storage, allocated together with the struct */
	uint8_t buf[];
};
88 :
89 : struct messaging_dgm_context {
90 : struct tevent_context *ev;
91 : pid_t pid;
92 : struct sun_path_buf socket_dir;
93 : struct sun_path_buf lockfile_dir;
94 : int lockfile_fd;
95 :
96 : int sock;
97 : struct messaging_dgm_in_msg *in_msgs;
98 :
99 : struct messaging_dgm_fde_ev *fde_evs;
100 : void (*recv_cb)(struct tevent_context *ev,
101 : const uint8_t *msg,
102 : size_t msg_len,
103 : int *fds,
104 : size_t num_fds,
105 : void *private_data);
106 : void *recv_cb_private_data;
107 :
108 : bool *have_dgm_context;
109 :
110 : struct pthreadpool_tevent *pool;
111 : struct messaging_dgm_out *outsocks;
112 : };
113 :
/*
 * Mark a descriptor close-on-exec.
 *
 * Returns 0 on success, otherwise the errno value of the fcntl() call
 * that failed. Where FD_CLOEXEC is not defined this does nothing and
 * returns 0.
 */
static int prepare_socket_cloexec(int sock)
{
#ifdef FD_CLOEXEC
	int fd_flags = fcntl(sock, F_GETFD, 0);

	if (fd_flags == -1) {
		return errno;
	}

	fd_flags |= FD_CLOEXEC;

	if (fcntl(sock, F_SETFD, fd_flags) == -1) {
		return errno;
	}
#endif
	return 0;
}
131 :
/*
 * Close every valid descriptor in fds[0..num_fds-1] and overwrite the
 * slot with -1. Slots that already hold -1 are left alone.
 */
static void close_fd_array(int *fds, size_t num_fds)
{
	size_t idx = 0;

	while (idx < num_fds) {
		int *slot = &fds[idx];

		idx += 1;

		if (*slot != -1) {
			close(*slot);
			*slot = -1;
		}
	}
}
145 :
146 : /*
147 : * The idle handler can free the struct messaging_dgm_out *,
148 : * if it's unused (qlen of zero) which closes the socket.
149 : */
150 :
151 12843 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
152 : struct tevent_timer *te,
153 : struct timeval current_time,
154 : void *private_data)
155 : {
156 12843 : struct messaging_dgm_out *out = talloc_get_type_abort(
157 : private_data, struct messaging_dgm_out);
158 28 : size_t qlen;
159 :
160 12843 : out->idle_timer = NULL;
161 :
162 12843 : qlen = tevent_queue_length(out->queue);
163 12843 : if (qlen == 0) {
164 12842 : TALLOC_FREE(out);
165 : }
166 12843 : }
167 :
168 : /*
169 : * Setup the idle handler to fire after 1 second if the
170 : * queue is zero.
171 : */
172 :
173 537376 : static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
174 : {
175 93253 : size_t qlen;
176 :
177 537376 : qlen = tevent_queue_length(out->queue);
178 537376 : if (qlen != 0) {
179 92562 : TALLOC_FREE(out->idle_timer);
180 92562 : return;
181 : }
182 :
183 444814 : if (out->idle_timer != NULL) {
184 420388 : tevent_update_timer(out->idle_timer,
185 : tevent_timeval_current_ofs(1, 0));
186 420388 : return;
187 : }
188 :
189 24426 : out->idle_timer = tevent_add_timer(
190 : out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
191 : messaging_dgm_out_idle_handler, out);
192 : /*
193 : * No NULL check, we'll come back here. Worst case we're
194 : * leaking a bit.
195 : */
196 : }
197 :
198 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
199 : static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
200 : struct tevent_timer *te,
201 : struct timeval current_time,
202 : void *private_data);
203 :
204 : /*
205 : * Connect to an existing rendezvous point for another
206 : * pid - wrapped inside a struct messaging_dgm_out *.
207 : */
208 :
209 29964 : static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
210 : struct messaging_dgm_context *ctx,
211 : pid_t pid, struct messaging_dgm_out **pout)
212 : {
213 552 : struct messaging_dgm_out *out;
214 29964 : struct sockaddr_un addr = { .sun_family = AF_UNIX };
215 29964 : int ret = ENOMEM;
216 552 : int out_pathlen;
217 552 : char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
218 :
219 29964 : out = talloc(mem_ctx, struct messaging_dgm_out);
220 29964 : if (out == NULL) {
221 0 : goto fail;
222 : }
223 :
224 29964 : *out = (struct messaging_dgm_out) {
225 : .pid = pid,
226 : .ctx = ctx,
227 : .cookie = 1
228 : };
229 :
230 29964 : out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
231 29964 : "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
232 29964 : if (out_pathlen < 0) {
233 0 : goto errno_fail;
234 : }
235 29964 : if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
236 0 : ret = ENAMETOOLONG;
237 0 : goto fail;
238 : }
239 :
240 29964 : memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
241 :
242 29964 : out->queue = tevent_queue_create(out, addr.sun_path);
243 29964 : if (out->queue == NULL) {
244 0 : ret = ENOMEM;
245 0 : goto fail;
246 : }
247 :
248 29964 : out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
249 29964 : if (out->sock == -1) {
250 0 : goto errno_fail;
251 : }
252 :
253 29964 : DLIST_ADD(ctx->outsocks, out);
254 29964 : talloc_set_destructor(out, messaging_dgm_out_destructor);
255 :
256 552 : do {
257 29964 : ret = connect(out->sock,
258 : (const struct sockaddr *)(const void *)&addr,
259 : sizeof(addr));
260 29964 : } while ((ret == -1) && (errno == EINTR));
261 :
262 29964 : if (ret == -1) {
263 5710 : goto errno_fail;
264 : }
265 :
266 24254 : ret = set_blocking(out->sock, false);
267 24254 : if (ret == -1) {
268 0 : goto errno_fail;
269 : }
270 24254 : out->is_blocking = false;
271 :
272 24254 : *pout = out;
273 24254 : return 0;
274 5710 : errno_fail:
275 5710 : ret = errno;
276 5710 : fail:
277 5710 : TALLOC_FREE(out);
278 5710 : return ret;
279 : }
280 :
281 59911 : static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
282 : {
283 59911 : DLIST_REMOVE(out->ctx->outsocks, out);
284 :
285 59911 : if ((tevent_queue_length(out->queue) != 0) &&
286 2 : (tevent_cached_getpid() == out->ctx->pid)) {
287 : /*
288 : * We have pending jobs. We can't close the socket,
289 : * this has been handed over to messaging_dgm_out_queue_state.
290 : */
291 0 : return 0;
292 : }
293 :
294 59909 : if (out->sock != -1) {
295 59909 : close(out->sock);
296 59909 : out->sock = -1;
297 : }
298 58889 : return 0;
299 : }
300 :
301 : /*
302 : * Find the struct messaging_dgm_out * to talk to pid.
303 : * If we don't have one, create it. Set the timer to
304 : * delete after 1 sec.
305 : */
306 :
307 496030 : static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
308 : struct messaging_dgm_out **pout)
309 : {
310 46880 : struct messaging_dgm_out *out;
311 46880 : int ret;
312 :
313 900474 : for (out = ctx->outsocks; out != NULL; out = out->next) {
314 870510 : if (out->pid == pid) {
315 419738 : break;
316 : }
317 : }
318 :
319 496030 : if (out == NULL) {
320 29964 : ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
321 29964 : if (ret != 0) {
322 5710 : return ret;
323 : }
324 : }
325 :
326 : /*
327 : * shouldn't be possible, should be set if messaging_dgm_out_create
328 : * succeeded. This check is to satisfy static checker
329 : */
330 490320 : if (out == NULL) {
331 0 : return EINVAL;
332 : }
333 490320 : messaging_dgm_out_rearm_idle_timer(out);
334 :
335 490320 : *pout = out;
336 490320 : return 0;
337 : }
338 :
339 : /*
340 : * This function is called directly to send a message fragment
341 : * when the outgoing queue is zero, and from a pthreadpool
342 : * job thread when messages are being queued (qlen != 0).
343 : * Make sure *ONLY* thread-safe functions are called within.
344 : */
345 :
346 504239 : static ssize_t messaging_dgm_sendmsg(int sock,
347 : const struct iovec *iov, int iovlen,
348 : const int *fds, size_t num_fds,
349 : int *perrno)
350 : {
351 47898 : struct msghdr msg;
352 47898 : ssize_t fdlen, ret;
353 :
354 : /*
355 : * Do the actual sendmsg syscall. This will be called from a
356 : * pthreadpool helper thread, so be careful what you do here.
357 : */
358 :
359 504239 : msg = (struct msghdr) {
360 : .msg_iov = discard_const_p(struct iovec, iov),
361 : .msg_iovlen = iovlen
362 : };
363 :
364 504239 : fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
365 504239 : if (fdlen == -1) {
366 0 : *perrno = EINVAL;
367 0 : return -1;
368 : }
369 :
370 504239 : {
371 504239 : uint8_t buf[fdlen];
372 :
373 504239 : msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
374 :
375 47898 : do {
376 504239 : ret = sendmsg(sock, &msg, 0);
377 504236 : } while ((ret == -1) && (errno == EINTR));
378 : }
379 :
380 504236 : if (ret == -1) {
381 357 : *perrno = errno;
382 : }
383 456338 : return ret;
384 : }
385 :
/*
 * State of one queued fragment send, i.e. of one request created by
 * messaging_dgm_out_queue_send().
 */
struct messaging_dgm_out_queue_state {
	/* Event context and thread pool the send job runs on */
	struct tevent_context *ev;
	struct pthreadpool_tevent *pool;

	/* The request owning this state, and the in-flight pool job */
	struct tevent_req *req;
	struct tevent_req *subreq;

	/* Destination socket, copied from the messaging_dgm_out */
	int sock;

	/* Private copies of the descriptors and payload to send */
	int *fds;
	uint8_t *buf;

	/* Result of the threaded sendmsg: byte count, or -1 plus err */
	ssize_t sent;
	int err;
};
401 :
402 : static int messaging_dgm_out_queue_state_destructor(
403 : struct messaging_dgm_out_queue_state *state);
404 : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
405 : void *private_data);
406 : static void messaging_dgm_out_threaded_job(void *private_data);
407 : static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
408 :
409 : /*
410 : * Push a message fragment onto a queue to be sent by a
411 : * threadpool job. Makes copies of data/fd's to be sent.
412 : * The running tevent_queue internally creates an immediate
413 : * event to schedule the write.
414 : */
415 :
416 48049 : static struct tevent_req *messaging_dgm_out_queue_send(
417 : TALLOC_CTX *mem_ctx, struct tevent_context *ev,
418 : struct messaging_dgm_out *out,
419 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
420 : {
421 47363 : struct tevent_req *req;
422 47363 : struct messaging_dgm_out_queue_state *state;
423 47363 : struct tevent_queue_entry *e;
424 47363 : size_t i;
425 47363 : ssize_t buflen;
426 :
427 48049 : req = tevent_req_create(out, &state,
428 : struct messaging_dgm_out_queue_state);
429 48049 : if (req == NULL) {
430 0 : return NULL;
431 : }
432 48049 : state->ev = ev;
433 48049 : state->pool = out->ctx->pool;
434 48049 : state->sock = out->sock;
435 48049 : state->req = req;
436 :
437 : /*
438 : * Go blocking in a thread
439 : */
440 48049 : if (!out->is_blocking) {
441 248 : int ret = set_blocking(out->sock, true);
442 248 : if (ret == -1) {
443 0 : tevent_req_error(req, errno);
444 0 : return tevent_req_post(req, ev);
445 : }
446 248 : out->is_blocking = true;
447 : }
448 :
449 48049 : buflen = iov_buflen(iov, iovlen);
450 48049 : if (buflen == -1) {
451 0 : tevent_req_error(req, EMSGSIZE);
452 0 : return tevent_req_post(req, ev);
453 : }
454 :
455 48049 : state->buf = talloc_array(state, uint8_t, buflen);
456 48049 : if (tevent_req_nomem(state->buf, req)) {
457 0 : return tevent_req_post(req, ev);
458 : }
459 48049 : iov_buf(iov, iovlen, state->buf, buflen);
460 :
461 48049 : state->fds = talloc_array(state, int, num_fds);
462 48049 : if (tevent_req_nomem(state->fds, req)) {
463 0 : return tevent_req_post(req, ev);
464 : }
465 :
466 48087 : for (i=0; i<num_fds; i++) {
467 38 : state->fds[i] = -1;
468 : }
469 :
470 48087 : for (i=0; i<num_fds; i++) {
471 :
472 38 : state->fds[i] = dup(fds[i]);
473 :
474 38 : if (state->fds[i] == -1) {
475 0 : int ret = errno;
476 :
477 0 : close_fd_array(state->fds, num_fds);
478 :
479 0 : tevent_req_error(req, ret);
480 0 : return tevent_req_post(req, ev);
481 : }
482 : }
483 :
484 48049 : talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
485 :
486 48049 : e = tevent_queue_add_entry(out->queue, ev, req,
487 47363 : messaging_dgm_out_queue_trigger, req);
488 48049 : if (tevent_req_nomem(e, req)) {
489 0 : return tevent_req_post(req, ev);
490 : }
491 686 : return req;
492 : }
493 :
494 48046 : static int messaging_dgm_out_queue_state_destructor(
495 : struct messaging_dgm_out_queue_state *state)
496 : {
497 47363 : int *fds;
498 47363 : size_t num_fds;
499 :
500 48046 : if (state->subreq != NULL) {
501 : /*
502 : * We're scheduled, but we're destroyed. This happens
503 : * if the messaging_dgm_context is destroyed while
504 : * we're stuck in a blocking send. There's nothing we
505 : * can do but to leak memory.
506 : */
507 2 : TALLOC_FREE(state->subreq);
508 2 : (void)talloc_reparent(state->req, NULL, state);
509 2 : return -1;
510 : }
511 :
512 48044 : fds = state->fds;
513 48044 : num_fds = talloc_array_length(fds);
514 48044 : close_fd_array(fds, num_fds);
515 48044 : return 0;
516 : }
517 :
518 : /*
519 : * tevent_queue callback that schedules the pthreadpool to actually
520 : * send the queued message fragment.
521 : */
522 :
523 47038 : static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
524 : void *private_data)
525 : {
526 47038 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
527 : req, struct messaging_dgm_out_queue_state);
528 :
529 47038 : tevent_req_reset_endtime(req);
530 :
531 47038 : state->subreq = pthreadpool_tevent_job_send(
532 : state, state->ev, state->pool,
533 : messaging_dgm_out_threaded_job, state);
534 47038 : if (tevent_req_nomem(state->subreq, req)) {
535 0 : return;
536 : }
537 47038 : tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
538 : req);
539 : }
540 :
541 : /*
542 : * Wrapper function run by the pthread that calls
543 : * messaging_dgm_sendmsg() to actually do the sendmsg().
544 : */
545 :
546 47038 : static void messaging_dgm_out_threaded_job(void *private_data)
547 : {
548 47038 : struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
549 : private_data, struct messaging_dgm_out_queue_state);
550 :
551 94076 : struct iovec iov = { .iov_base = state->buf,
552 47038 : .iov_len = talloc_get_size(state->buf) };
553 47038 : size_t num_fds = talloc_array_length(state->fds);
554 47038 : int msec = 1;
555 :
556 46375 : while (true) {
557 46375 : int ret;
558 :
559 94073 : state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
560 47038 : state->fds, num_fds, &state->err);
561 :
562 47035 : if (state->sent != -1) {
563 660 : return;
564 : }
565 1 : if (state->err != ENOBUFS) {
566 0 : return;
567 : }
568 :
569 : /*
570 : * ENOBUFS is the FreeBSD way of saying "Try
571 : * again". We have to do polling.
572 : */
573 0 : do {
574 0 : ret = poll(NULL, 0, msec);
575 0 : } while ((ret == -1) && (errno == EINTR));
576 :
577 : /*
578 : * Exponential backoff up to once a second
579 : */
580 0 : msec *= 2;
581 0 : msec = MIN(msec, 1000);
582 : }
583 : }
584 :
585 : /*
586 : * Pickup the results of the pthread sendmsg().
587 : */
588 :
589 47033 : static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
590 : {
591 47033 : struct tevent_req *req = tevent_req_callback_data(
592 : subreq, struct tevent_req);
593 47033 : struct messaging_dgm_out_queue_state *state = tevent_req_data(
594 : req, struct messaging_dgm_out_queue_state);
595 46373 : int ret;
596 :
597 47033 : if (subreq != state->subreq) {
598 0 : abort();
599 : }
600 :
601 47033 : ret = pthreadpool_tevent_job_recv(subreq);
602 :
603 47033 : TALLOC_FREE(subreq);
604 47033 : state->subreq = NULL;
605 :
606 47033 : if (tevent_req_error(req, ret)) {
607 0 : return;
608 : }
609 47033 : if (state->sent == -1) {
610 0 : tevent_req_error(req, state->err);
611 0 : return;
612 : }
613 47033 : tevent_req_done(req);
614 : }
615 :
/*
 * Fetch the result of a queued send: whatever
 * tevent_req_simple_recv_unix() reports for the request.
 */
static int messaging_dgm_out_queue_recv(struct tevent_req *req)
{
	int err = tevent_req_simple_recv_unix(req);

	return err;
}
620 :
621 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
622 :
623 : /*
624 : * Core function to send a message fragment given a
625 : * connected struct messaging_dgm_out * destination.
626 : * If no current queue tries to send nonblocking
627 : * directly. If not, queues the fragment (which makes
628 : * a copy of it) and adds a 60-second timeout on the send.
629 : */
630 :
631 505002 : static int messaging_dgm_out_send_fragment(
632 : struct tevent_context *ev, struct messaging_dgm_out *out,
633 : const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
634 : {
635 48880 : struct tevent_req *req;
636 48880 : size_t qlen;
637 48880 : bool ok;
638 :
639 505002 : qlen = tevent_queue_length(out->queue);
640 505002 : if (qlen == 0) {
641 1523 : ssize_t nsent;
642 457201 : int err = 0;
643 :
644 457201 : if (out->is_blocking) {
645 146 : int ret = set_blocking(out->sock, false);
646 146 : if (ret == -1) {
647 456953 : return errno;
648 : }
649 146 : out->is_blocking = false;
650 : }
651 :
652 457201 : nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
653 : num_fds, &err);
654 457201 : if (nsent >= 0) {
655 455427 : return 0;
656 : }
657 :
658 356 : if (err == ENOBUFS) {
659 : /*
660 : * FreeBSD's way of telling us the dst socket
661 : * is full. EWOULDBLOCK makes us spawn a
662 : * polling helper thread.
663 : */
664 0 : err = EWOULDBLOCK;
665 : }
666 :
667 356 : if (err != EWOULDBLOCK) {
668 9 : return err;
669 : }
670 : }
671 :
672 48049 : req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
673 : fds, num_fds);
674 48049 : if (req == NULL) {
675 0 : return ENOMEM;
676 : }
677 48049 : tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
678 :
679 48049 : ok = tevent_req_set_endtime(req, ev,
680 : tevent_timeval_current_ofs(60, 0));
681 48049 : if (!ok) {
682 0 : TALLOC_FREE(req);
683 0 : return ENOMEM;
684 : }
685 :
686 686 : return 0;
687 : }
688 :
689 : /*
690 : * Pickup the result of the fragment send. Reset idle timer
691 : * if queue empty.
692 : */
693 :
694 47056 : static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
695 : {
696 47056 : struct messaging_dgm_out *out = tevent_req_callback_data(
697 : req, struct messaging_dgm_out);
698 46373 : int ret;
699 :
700 47056 : ret = messaging_dgm_out_queue_recv(req);
701 47056 : TALLOC_FREE(req);
702 :
703 47056 : if (ret != 0) {
704 23 : DBG_WARNING("messaging_out_queue_recv returned %s\n",
705 : strerror(ret));
706 : }
707 :
708 47056 : messaging_dgm_out_rearm_idle_timer(out);
709 47056 : }
710 :
711 :
/*
 * Header placed behind the cookie in every fragment of a fragmented
 * message.
 */
struct messaging_dgm_fragment_hdr {
	/* Length of the complete, unfragmented message */
	size_t msglen;

	/* Sending process */
	pid_t pid;

	/* The sender's socket descriptor number for this channel */
	int sock;
};
717 :
718 : /*
719 : * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
720 : * size chunks and send it.
721 : *
722 : * Message fragments are prefixed by a 64-bit cookie that
723 : * stays the same for all fragments. This allows the receiver
724 : * to recognise fragments of the same message and re-assemble
725 : * them on the other end.
726 : *
727 : * Note that this allows other message fragments from other
728 : * senders to be interleaved in the receive read processing,
729 : * the combination of the cookie and header info allows unique
730 : * identification of the message from a specific sender in
731 : * re-assembly.
732 : *
733 : * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
734 : * then send a single message with cookie set to zero.
735 : *
736 : * Otherwise the message is fragmented into chunks and added
737 : * to the sending queue. Any file descriptors are passed only
738 : * in the last fragment.
739 : *
740 : * Finally the cookie is incremented (wrap over zero) to
741 : * prepare for the next message sent to this channel.
742 : *
743 : */
744 :
745 490320 : static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
746 : struct messaging_dgm_out *out,
747 : const struct iovec *iov,
748 : int iovlen,
749 : const int *fds, size_t num_fds)
750 490320 : {
751 46880 : ssize_t msglen, sent;
752 490320 : int ret = 0;
753 490320 : struct iovec iov_copy[iovlen+2];
754 46880 : struct messaging_dgm_fragment_hdr hdr;
755 46880 : struct iovec src_iov;
756 :
757 490320 : if (iovlen < 0) {
758 0 : return EINVAL;
759 : }
760 :
761 490320 : msglen = iov_buflen(iov, iovlen);
762 490320 : if (msglen == -1) {
763 0 : return EMSGSIZE;
764 : }
765 490320 : if (num_fds > INT8_MAX) {
766 0 : return EINVAL;
767 : }
768 :
769 490320 : if ((size_t) msglen <=
770 : (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
771 477795 : uint64_t cookie = 0;
772 :
773 477795 : iov_copy[0].iov_base = &cookie;
774 477795 : iov_copy[0].iov_len = sizeof(cookie);
775 477795 : if (iovlen > 0) {
776 477795 : memcpy(&iov_copy[1], iov,
777 : sizeof(struct iovec) * iovlen);
778 : }
779 :
780 477795 : return messaging_dgm_out_send_fragment(
781 : ev, out, iov_copy, iovlen+1, fds, num_fds);
782 :
783 : }
784 :
785 25050 : hdr = (struct messaging_dgm_fragment_hdr) {
786 : .msglen = msglen,
787 12525 : .pid = tevent_cached_getpid(),
788 12525 : .sock = out->sock
789 : };
790 :
791 12525 : iov_copy[0].iov_base = &out->cookie;
792 12525 : iov_copy[0].iov_len = sizeof(out->cookie);
793 12525 : iov_copy[1].iov_base = &hdr;
794 12525 : iov_copy[1].iov_len = sizeof(hdr);
795 :
796 12525 : sent = 0;
797 12525 : src_iov = iov[0];
798 :
799 : /*
800 : * The following write loop sends the user message in pieces. We have
801 : * filled the first two iovecs above with "cookie" and "hdr". In the
802 : * following loops we pull message chunks from the user iov array and
803 : * fill iov_copy piece by piece, possibly truncating chunks from the
804 : * caller's iov array. Ugly, but hopefully efficient.
805 : */
806 :
807 39732 : while (sent < msglen) {
808 : size_t fragment_len;
809 25205 : size_t iov_index = 2;
810 :
811 25205 : fragment_len = sizeof(out->cookie) + sizeof(hdr);
812 :
813 54414 : while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
814 2004 : size_t space, chunk;
815 :
816 39732 : space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
817 39732 : chunk = MIN(space, src_iov.iov_len);
818 :
819 39732 : iov_copy[iov_index].iov_base = src_iov.iov_base;
820 39732 : iov_copy[iov_index].iov_len = chunk;
821 39732 : iov_index += 1;
822 :
823 39732 : src_iov.iov_base = (char *)src_iov.iov_base + chunk;
824 39732 : src_iov.iov_len -= chunk;
825 39732 : fragment_len += chunk;
826 :
827 39732 : if (src_iov.iov_len == 0) {
828 25050 : iov += 1;
829 25050 : iovlen -= 1;
830 25050 : if (iovlen == 0) {
831 12523 : break;
832 : }
833 12525 : src_iov = iov[0];
834 : }
835 : }
836 27207 : sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
837 :
838 : /*
839 : * only the last fragment should pass the fd array.
840 : * That simplifies the receiver a lot.
841 : */
842 27207 : if (sent < msglen) {
843 14682 : ret = messaging_dgm_out_send_fragment(
844 : ev, out, iov_copy, iov_index, NULL, 0);
845 : } else {
846 12525 : ret = messaging_dgm_out_send_fragment(
847 : ev, out, iov_copy, iov_index, fds, num_fds);
848 : }
849 27207 : if (ret != 0) {
850 0 : break;
851 : }
852 : }
853 :
854 12525 : out->cookie += 1;
855 12525 : if (out->cookie == 0) {
856 0 : out->cookie += 1;
857 : }
858 :
859 12523 : return ret;
860 : }
861 :
862 : static struct messaging_dgm_context *global_dgm_context;
863 :
864 : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
865 :
866 59056 : static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
867 : pid_t pid, int *plockfile_fd,
868 : uint64_t *punique)
869 : {
870 1133 : char buf[64];
871 1133 : int lockfile_fd;
872 1133 : struct sun_path_buf lockfile_name;
873 1133 : struct flock lck;
874 1133 : uint64_t unique;
875 1133 : int unique_len, ret;
876 1133 : ssize_t written;
877 :
878 59056 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
879 59056 : "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
880 59056 : if (ret < 0) {
881 0 : return errno;
882 : }
883 59056 : if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
884 0 : return ENAMETOOLONG;
885 : }
886 :
887 : /* no O_EXCL, existence check is via the fcntl lock */
888 :
889 59056 : lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
890 : 0644);
891 :
892 59056 : if ((lockfile_fd == -1) &&
893 0 : ((errno == ENXIO) /* Linux */ ||
894 0 : (errno == ENODEV) /* Linux kernel bug */ ||
895 0 : (errno == EOPNOTSUPP) /* FreeBSD */)) {
896 : /*
897 : * Huh -- a socket? This might be a stale socket from
898 : * an upgrade of Samba. Just unlink and retry, nobody
899 : * else is supposed to be here at this time.
900 : *
901 : * Yes, this is racy, but I don't see a way to deal
902 : * with this properly.
903 : */
904 0 : unlink(lockfile_name.buf);
905 :
906 0 : lockfile_fd = open(lockfile_name.buf,
907 : O_NONBLOCK|O_CREAT|O_WRONLY,
908 : 0644);
909 : }
910 :
911 59056 : if (lockfile_fd == -1) {
912 0 : ret = errno;
913 0 : DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
914 0 : return ret;
915 : }
916 :
917 59056 : lck = (struct flock) {
918 : .l_type = F_WRLCK,
919 : .l_whence = SEEK_SET
920 : };
921 :
922 59056 : ret = fcntl(lockfile_fd, F_SETLK, &lck);
923 59056 : if (ret == -1) {
924 0 : ret = errno;
925 0 : DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
926 0 : goto fail_close;
927 : }
928 :
929 : /*
930 : * Directly using the binary value for
931 : * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
932 : * violation. But including all of ndr here just for this
933 : * seems to be a bit overkill to me. Also, messages_dgm might
934 : * be replaced sooner or later by something streams-based,
935 : * where unique_id generation will be handled differently.
936 : */
937 :
938 1133 : do {
939 59056 : generate_random_buffer((uint8_t *)&unique, sizeof(unique));
940 59056 : } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
941 :
942 59056 : unique_len = snprintf(buf, sizeof(buf), "%"PRIu64"\n", unique);
943 :
944 : /* shorten a potentially preexisting file */
945 :
946 59056 : ret = ftruncate(lockfile_fd, unique_len);
947 59056 : if (ret == -1) {
948 0 : ret = errno;
949 0 : DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
950 : strerror(ret)));
951 0 : goto fail_unlink;
952 : }
953 :
954 59056 : written = write(lockfile_fd, buf, unique_len);
955 59056 : if (written != unique_len) {
956 0 : ret = errno;
957 0 : DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
958 0 : goto fail_unlink;
959 : }
960 :
961 59056 : *plockfile_fd = lockfile_fd;
962 59056 : *punique = unique;
963 59056 : return 0;
964 :
965 0 : fail_unlink:
966 0 : unlink(lockfile_name.buf);
967 0 : fail_close:
968 0 : close(lockfile_fd);
969 0 : return ret;
970 : }
971 :
972 : static void messaging_dgm_read_handler(struct tevent_context *ev,
973 : struct tevent_fd *fde,
974 : uint16_t flags,
975 : void *private_data);
976 :
977 : /*
978 : * Create the rendezvous point in the file system
979 : * that other processes can use to send messages to
980 : * this pid.
981 : */
982 :
983 59069 : int messaging_dgm_init(struct tevent_context *ev,
984 : uint64_t *punique,
985 : const char *socket_dir,
986 : const char *lockfile_dir,
987 : void (*recv_cb)(struct tevent_context *ev,
988 : const uint8_t *msg,
989 : size_t msg_len,
990 : int *fds,
991 : size_t num_fds,
992 : void *private_data),
993 : void *recv_cb_private_data)
994 : {
995 1140 : struct messaging_dgm_context *ctx;
996 1140 : int ret;
997 1140 : struct sockaddr_un socket_address;
998 1140 : size_t len;
999 1140 : static bool have_dgm_context = false;
1000 :
1001 59069 : if (have_dgm_context) {
1002 0 : return EEXIST;
1003 : }
1004 :
1005 59069 : if ((socket_dir == NULL) || (lockfile_dir == NULL)) {
1006 0 : return EINVAL;
1007 : }
1008 :
1009 59069 : ctx = talloc_zero(NULL, struct messaging_dgm_context);
1010 59069 : if (ctx == NULL) {
1011 0 : goto fail_nomem;
1012 : }
1013 59069 : ctx->ev = ev;
1014 59069 : ctx->pid = tevent_cached_getpid();
1015 59069 : ctx->recv_cb = recv_cb;
1016 59069 : ctx->recv_cb_private_data = recv_cb_private_data;
1017 :
1018 59069 : len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1019 : sizeof(ctx->lockfile_dir.buf));
1020 59069 : if (len >= sizeof(ctx->lockfile_dir.buf)) {
1021 5 : TALLOC_FREE(ctx);
1022 5 : return ENAMETOOLONG;
1023 : }
1024 :
1025 59064 : len = strlcpy(ctx->socket_dir.buf, socket_dir,
1026 : sizeof(ctx->socket_dir.buf));
1027 59064 : if (len >= sizeof(ctx->socket_dir.buf)) {
1028 8 : TALLOC_FREE(ctx);
1029 8 : return ENAMETOOLONG;
1030 : }
1031 :
1032 59056 : socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1033 59056 : len = snprintf(socket_address.sun_path,
1034 : sizeof(socket_address.sun_path),
1035 59056 : "%s/%u", socket_dir, (unsigned)ctx->pid);
1036 59056 : if (len >= sizeof(socket_address.sun_path)) {
1037 0 : TALLOC_FREE(ctx);
1038 0 : return ENAMETOOLONG;
1039 : }
1040 :
1041 59056 : ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1042 : punique);
1043 59056 : if (ret != 0) {
1044 0 : DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1045 : __func__, strerror(ret)));
1046 0 : TALLOC_FREE(ctx);
1047 0 : return ret;
1048 : }
1049 :
1050 59056 : unlink(socket_address.sun_path);
1051 :
1052 59056 : ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1053 59056 : if (ctx->sock == -1) {
1054 0 : ret = errno;
1055 0 : DBG_WARNING("socket failed: %s\n", strerror(ret));
1056 0 : TALLOC_FREE(ctx);
1057 0 : return ret;
1058 : }
1059 :
1060 59056 : ret = prepare_socket_cloexec(ctx->sock);
1061 59056 : if (ret == -1) {
1062 0 : ret = errno;
1063 0 : DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1064 : strerror(ret));
1065 0 : TALLOC_FREE(ctx);
1066 0 : return ret;
1067 : }
1068 :
1069 59056 : ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1070 : sizeof(socket_address));
1071 59056 : if (ret == -1) {
1072 0 : ret = errno;
1073 0 : DBG_WARNING("bind failed: %s\n", strerror(ret));
1074 0 : TALLOC_FREE(ctx);
1075 0 : return ret;
1076 : }
1077 :
1078 59056 : talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1079 :
1080 59056 : ctx->have_dgm_context = &have_dgm_context;
1081 :
1082 59056 : ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1083 59056 : if (ret != 0) {
1084 0 : DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1085 : strerror(ret));
1086 0 : TALLOC_FREE(ctx);
1087 0 : return ret;
1088 : }
1089 :
1090 59056 : global_dgm_context = ctx;
1091 59056 : return 0;
1092 :
1093 0 : fail_nomem:
1094 0 : TALLOC_FREE(ctx);
1095 0 : return ENOMEM;
1096 : }
1097 :
1098 : /*
1099 : * Remove the rendezvous point in the filesystem
1100 : * if we're the owner.
1101 : */
1102 :
1103 98658 : static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
1104 : {
1105 155522 : while (c->outsocks != NULL) {
1106 38162 : TALLOC_FREE(c->outsocks);
1107 : }
1108 119432 : while (c->in_msgs != NULL) {
1109 1181 : TALLOC_FREE(c->in_msgs);
1110 : }
1111 121719 : while (c->fde_evs != NULL) {
1112 23061 : tevent_fd_set_flags(c->fde_evs->fde, 0);
1113 23061 : c->fde_evs->ctx = NULL;
1114 24242 : DLIST_REMOVE(c->fde_evs, c->fde_evs);
1115 : }
1116 :
1117 98658 : close(c->sock);
1118 :
1119 98658 : if (tevent_cached_getpid() == c->pid) {
1120 267 : struct sun_path_buf name;
1121 267 : int ret;
1122 :
1123 50889 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1124 50889 : c->socket_dir.buf, (unsigned)c->pid);
1125 50889 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1126 : /*
1127 : * We've checked the length when creating, so this
1128 : * should never happen
1129 : */
1130 0 : abort();
1131 : }
1132 50889 : unlink(name.buf);
1133 :
1134 50889 : ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1135 50889 : c->lockfile_dir.buf, (unsigned)c->pid);
1136 50889 : if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1137 : /*
1138 : * We've checked the length when creating, so this
1139 : * should never happen
1140 : */
1141 0 : abort();
1142 : }
1143 50889 : unlink(name.buf);
1144 : }
1145 98658 : close(c->lockfile_fd);
1146 :
1147 98658 : if (c->have_dgm_context != NULL) {
1148 98658 : *c->have_dgm_context = false;
1149 : }
1150 :
1151 98658 : return 0;
1152 : }
1153 :
/*
 * Sanity check of a messaging context, only active in DEVELOPER
 * builds; otherwise this is a no-op.
 *
 * Verifies that the receive socket is bound to "<socket_dir>/<pid>"
 * of the current process and that "<lockfile_dir>/<pid>" is the very
 * file we hold open as lockfile_fd. Any mismatch or failing system
 * call is logged and ends in abort().
 */
static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
{
#ifdef DEVELOPER
	pid_t self = tevent_cached_getpid();
	struct sockaddr_storage bound;
	socklen_t bound_len = sizeof(bound);
	struct sockaddr_un *bound_un;
	struct sun_path_buf expected;
	struct stat by_name, by_fd;
	int rc;

	/*
	 * Protect against using the wrong messaging context after a
	 * fork without reinit_after_fork.
	 */

	rc = getsockname(ctx->sock, (struct sockaddr *)&bound, &bound_len);
	if (rc == -1) {
		DBG_ERR("getsockname failed: %s\n", strerror(errno));
		goto mismatch;
	}
	if (bound.ss_family != AF_UNIX) {
		DBG_ERR("getsockname returned family %d\n",
			(int)bound.ss_family);
		goto mismatch;
	}
	bound_un = (struct sockaddr_un *)&bound;

	/* The socket must live under our own pid */
	rc = snprintf(expected.buf, sizeof(expected.buf),
		      "%s/%u", ctx->socket_dir.buf, (unsigned)self);
	if (rc < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto mismatch;
	}
	if ((size_t)rc >= sizeof(expected.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)rc);
		goto mismatch;
	}

	if (strcmp(expected.buf, bound_un->sun_path) != 0) {
		DBG_ERR("sockname wrong: Expected %s, got %s\n",
			expected.buf, bound_un->sun_path);
		goto mismatch;
	}

	/* The lockfile on disk must be the one we hold open */
	rc = snprintf(expected.buf, sizeof(expected.buf),
		      "%s/%u", ctx->lockfile_dir.buf, (unsigned)self);
	if (rc < 0) {
		DBG_ERR("snprintf failed: %s\n", strerror(errno));
		goto mismatch;
	}
	if ((size_t)rc >= sizeof(expected.buf)) {
		DBG_ERR("snprintf returned %d chars\n", (int)rc);
		goto mismatch;
	}

	rc = stat(expected.buf, &by_name);
	if (rc == -1) {
		DBG_ERR("stat failed: %s\n", strerror(errno));
		goto mismatch;
	}
	rc = fstat(ctx->lockfile_fd, &by_fd);
	if (rc == -1) {
		DBG_ERR("fstat failed: %s\n", strerror(errno));
		goto mismatch;
	}

	if (!((by_name.st_dev == by_fd.st_dev) &&
	      (by_name.st_ino == by_fd.st_ino))) {
		DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
			(int)by_fd.st_dev, (int)by_fd.st_ino,
			(int)by_name.st_dev, (int)by_name.st_ino);
		goto mismatch;
	}

	return;
mismatch:
	abort();
#else
	return;
#endif
}
1235 :
1236 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1237 : struct tevent_context *ev,
1238 : uint8_t *msg, size_t msg_len,
1239 : int *fds, size_t num_fds);
1240 :
1241 : /*
1242 : * Raw read callback handler - passes to messaging_dgm_recv()
1243 : * for fragment reassembly processing.
1244 : */
1245 :
1246 181285 : static void messaging_dgm_read_handler(struct tevent_context *ev,
1247 : struct tevent_fd *fde,
1248 : uint16_t flags,
1249 : void *private_data)
1250 181285 : {
1251 181285 : struct messaging_dgm_context *ctx = talloc_get_type_abort(
1252 : private_data, struct messaging_dgm_context);
1253 46802 : ssize_t received;
1254 46802 : struct msghdr msg;
1255 46802 : struct iovec iov;
1256 181285 : size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1257 181285 : uint8_t msgbuf[msgbufsize];
1258 46802 : uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1259 46802 : size_t num_fds;
1260 :
1261 181285 : messaging_dgm_validate(ctx);
1262 :
1263 181285 : if ((flags & TEVENT_FD_READ) == 0) {
1264 0 : return;
1265 : }
1266 :
1267 181285 : iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1268 181285 : msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1269 :
1270 181285 : msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
1271 :
1272 : #ifdef MSG_CMSG_CLOEXEC
1273 181285 : msg.msg_flags |= MSG_CMSG_CLOEXEC;
1274 : #endif
1275 :
1276 181285 : received = recvmsg(ctx->sock, &msg, 0);
1277 181285 : if (received == -1) {
1278 0 : if ((errno == EAGAIN) ||
1279 0 : (errno == EWOULDBLOCK) ||
1280 0 : (errno == EINTR) ||
1281 0 : (errno == ENOMEM)) {
1282 : /* Not really an error - just try again. */
1283 0 : return;
1284 : }
1285 : /* Problem with the socket. Set it unreadable. */
1286 0 : tevent_fd_set_flags(fde, 0);
1287 0 : return;
1288 : }
1289 :
1290 181285 : if ((size_t)received > sizeof(buf)) {
1291 : /* More than we expected, not for us */
1292 0 : return;
1293 : }
1294 :
1295 181285 : num_fds = msghdr_extract_fds(&msg, NULL, 0);
1296 181285 : if (num_fds == 0) {
1297 46747 : int fds[1];
1298 :
1299 138642 : messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1300 42643 : } else {
1301 55 : size_t i;
1302 42643 : int fds[num_fds];
1303 :
1304 42643 : msghdr_extract_fds(&msg, fds, num_fds);
1305 :
1306 85344 : for (i = 0; i < num_fds; i++) {
1307 58 : int err;
1308 :
1309 42646 : err = prepare_socket_cloexec(fds[i]);
1310 42646 : if (err != 0) {
1311 0 : close_fd_array(fds, num_fds);
1312 0 : num_fds = 0;
1313 : }
1314 : }
1315 :
1316 42643 : messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
1317 : }
1318 : }
1319 :
1320 0 : static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1321 : {
1322 0 : DLIST_REMOVE(m->ctx->in_msgs, m);
1323 0 : return 0;
1324 : }
1325 :
/*
 * Close every fd in fds[0..num_fds-1] that is not -1 and mark the
 * slot as -1 afterwards. Slots already holding -1 are left alone;
 * it is called after the receive callback ran.
 */
static void messaging_dgm_close_unconsumed(int *fds, size_t num_fds)
{
	size_t remaining = num_fds;
	int *slot = fds;

	while (remaining > 0) {
		if (*slot != -1) {
			close(*slot);
			*slot = -1;
		}
		slot += 1;
		remaining -= 1;
	}
}
1337 :
1338 : /*
1339 : * Deal with identification of fragmented messages and
1340 : * re-assembly into full messages sent, then calls the
1341 : * callback.
1342 : */
1343 :
1344 181285 : static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1345 : struct tevent_context *ev,
1346 : uint8_t *buf, size_t buflen,
1347 : int *fds, size_t num_fds)
1348 : {
1349 46802 : struct messaging_dgm_fragment_hdr hdr;
1350 46802 : struct messaging_dgm_in_msg *msg;
1351 46802 : size_t space;
1352 46802 : uint64_t cookie;
1353 :
1354 181285 : if (buflen < sizeof(cookie)) {
1355 0 : goto close_fds;
1356 : }
1357 181285 : memcpy(&cookie, buf, sizeof(cookie));
1358 181285 : buf += sizeof(cookie);
1359 181285 : buflen -= sizeof(cookie);
1360 :
1361 181285 : if (cookie == 0) {
1362 156446 : ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1363 : ctx->recv_cb_private_data);
1364 156446 : messaging_dgm_close_unconsumed(fds, num_fds);
1365 212587 : return;
1366 : }
1367 :
1368 24839 : if (buflen < sizeof(hdr)) {
1369 0 : goto close_fds;
1370 : }
1371 24839 : memcpy(&hdr, buf, sizeof(hdr));
1372 24839 : buf += sizeof(hdr);
1373 24839 : buflen -= sizeof(hdr);
1374 :
1375 24839 : for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1376 13498 : if ((msg->sender_pid == hdr.pid) &&
1377 11498 : (msg->sender_sock == hdr.sock)) {
1378 11498 : break;
1379 : }
1380 : }
1381 :
1382 24839 : if ((msg != NULL) && (msg->cookie != cookie)) {
1383 2002 : TALLOC_FREE(msg);
1384 : }
1385 :
1386 24839 : if (msg == NULL) {
1387 2 : size_t msglen;
1388 11341 : msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1389 11339 : hdr.msglen;
1390 :
1391 11341 : msg = talloc_size(ctx, msglen);
1392 11341 : if (msg == NULL) {
1393 0 : goto close_fds;
1394 : }
1395 11341 : talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1396 :
1397 11341 : *msg = (struct messaging_dgm_in_msg) {
1398 11339 : .ctx = ctx, .msglen = hdr.msglen,
1399 11339 : .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1400 : .cookie = cookie
1401 : };
1402 11341 : DLIST_ADD(ctx->in_msgs, msg);
1403 11341 : talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
1404 : }
1405 :
1406 24839 : space = msg->msglen - msg->received;
1407 24839 : if (buflen > space) {
1408 0 : goto close_fds;
1409 : }
1410 :
1411 24839 : memcpy(msg->buf + msg->received, buf, buflen);
1412 24839 : msg->received += buflen;
1413 :
1414 24839 : if (msg->received < msg->msglen) {
1415 : /*
1416 : * Any valid sender will send the fds in the last
1417 : * block. Invalid senders might have sent fd's that we
1418 : * need to close here.
1419 : */
1420 13498 : goto close_fds;
1421 : }
1422 :
1423 11341 : DLIST_REMOVE(ctx->in_msgs, msg);
1424 11341 : talloc_set_destructor(msg, NULL);
1425 :
1426 11341 : ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1427 : ctx->recv_cb_private_data);
1428 11341 : messaging_dgm_close_unconsumed(fds, num_fds);
1429 :
1430 11341 : TALLOC_FREE(msg);
1431 11339 : return;
1432 :
1433 13498 : close_fds:
1434 13498 : close_fd_array(fds, num_fds);
1435 : }
1436 :
1437 98658 : void messaging_dgm_destroy(void)
1438 : {
1439 98658 : TALLOC_FREE(global_dgm_context);
1440 98658 : }
1441 :
1442 495922 : int messaging_dgm_send(pid_t pid,
1443 : const struct iovec *iov, int iovlen,
1444 : const int *fds, size_t num_fds)
1445 : {
1446 495922 : struct messaging_dgm_context *ctx = global_dgm_context;
1447 46781 : struct messaging_dgm_out *out;
1448 46781 : int ret;
1449 495922 : unsigned retries = 0;
1450 :
1451 495922 : if (ctx == NULL) {
1452 0 : return ENOTCONN;
1453 : }
1454 :
1455 495922 : messaging_dgm_validate(ctx);
1456 :
1457 496030 : again:
1458 496030 : ret = messaging_dgm_out_get(ctx, pid, &out);
1459 496030 : if (ret != 0) {
1460 5710 : return ret;
1461 : }
1462 :
1463 490320 : DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1464 :
1465 490320 : ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1466 : fds, num_fds);
1467 490320 : if (ret == ECONNREFUSED) {
1468 : /*
1469 : * We cache outgoing sockets. If the receiver has
1470 : * closed and re-opened the socket since our last
1471 : * message, we get connection refused. Retry.
1472 : */
1473 :
1474 108 : TALLOC_FREE(out);
1475 :
1476 108 : if (retries < 5) {
1477 108 : retries += 1;
1478 108 : goto again;
1479 : }
1480 : }
1481 443431 : return ret;
1482 : }
1483 :
1484 554351 : static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1485 : {
1486 17560 : char buf[25];
1487 17560 : ssize_t rw_ret;
1488 554351 : int error = 0;
1489 17560 : unsigned long long unique;
1490 17560 : char *endptr;
1491 :
1492 554351 : rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1493 554351 : if (rw_ret == -1) {
1494 0 : return errno;
1495 : }
1496 554351 : buf[rw_ret] = '\0';
1497 :
1498 554351 : unique = smb_strtoull(buf, &endptr, 10, &error, SMB_STR_STANDARD);
1499 554351 : if (error != 0) {
1500 0 : return error;
1501 : }
1502 :
1503 554351 : if (endptr[0] != '\n') {
1504 0 : return EINVAL;
1505 : }
1506 554351 : *punique = unique;
1507 554351 : return 0;
1508 : }
1509 :
1510 554355 : int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1511 : {
1512 554355 : struct messaging_dgm_context *ctx = global_dgm_context;
1513 17560 : struct sun_path_buf lockfile_name;
1514 17560 : int ret, fd;
1515 :
1516 554355 : if (ctx == NULL) {
1517 0 : return EBADF;
1518 : }
1519 :
1520 554355 : messaging_dgm_validate(ctx);
1521 :
1522 554355 : if (pid == tevent_cached_getpid()) {
1523 : /*
1524 : * Protect against losing our own lock
1525 : */
1526 464922 : return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1527 : }
1528 :
1529 89433 : ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1530 89433 : "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1531 89433 : if (ret < 0) {
1532 0 : return errno;
1533 : }
1534 89433 : if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1535 0 : return ENAMETOOLONG;
1536 : }
1537 :
1538 89433 : fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1539 89433 : if (fd == -1) {
1540 4 : return errno;
1541 : }
1542 :
1543 89429 : ret = messaging_dgm_read_unique(fd, unique);
1544 89429 : close(fd);
1545 89429 : return ret;
1546 : }
1547 :
1548 15345 : int messaging_dgm_cleanup(pid_t pid)
1549 : {
1550 15345 : struct messaging_dgm_context *ctx = global_dgm_context;
1551 0 : struct sun_path_buf lockfile_name, socket_name;
1552 0 : int fd, len, ret;
1553 15345 : struct flock lck = {
1554 : .l_pid = 0,
1555 : };
1556 :
1557 15345 : if (ctx == NULL) {
1558 0 : return ENOTCONN;
1559 : }
1560 :
1561 15345 : len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1562 15345 : ctx->socket_dir.buf, (unsigned)pid);
1563 15345 : if (len < 0) {
1564 0 : return errno;
1565 : }
1566 15345 : if ((size_t)len >= sizeof(socket_name.buf)) {
1567 0 : return ENAMETOOLONG;
1568 : }
1569 :
1570 15345 : len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1571 15345 : ctx->lockfile_dir.buf, (unsigned)pid);
1572 15345 : if (len < 0) {
1573 0 : return errno;
1574 : }
1575 15345 : if ((size_t)len >= sizeof(lockfile_name.buf)) {
1576 0 : return ENAMETOOLONG;
1577 : }
1578 :
1579 15345 : fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1580 15345 : if (fd == -1) {
1581 15303 : ret = errno;
1582 15303 : if (ret != ENOENT) {
1583 0 : DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1584 : lockfile_name.buf, strerror(ret)));
1585 : }
1586 15303 : return ret;
1587 : }
1588 :
1589 42 : lck.l_type = F_WRLCK;
1590 42 : lck.l_whence = SEEK_SET;
1591 42 : lck.l_start = 0;
1592 42 : lck.l_len = 0;
1593 :
1594 42 : ret = fcntl(fd, F_SETLK, &lck);
1595 42 : if (ret != 0) {
1596 0 : ret = errno;
1597 0 : if ((ret != EACCES) && (ret != EAGAIN)) {
1598 0 : DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1599 : strerror(ret)));
1600 : }
1601 0 : close(fd);
1602 0 : return ret;
1603 : }
1604 :
1605 42 : DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1606 :
1607 42 : (void)unlink(socket_name.buf);
1608 42 : (void)unlink(lockfile_name.buf);
1609 42 : (void)close(fd);
1610 42 : return 0;
1611 : }
1612 :
/*
 * messaging_dgm_forall() callback used by messaging_dgm_wipe():
 * try to clean up "pid" unless it is the pid private_data points
 * to. Always returns 0 to keep the iteration going.
 */
static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
{
	const pid_t *our_pid = private_data;

	/*
	 * Skip ourselves: fcntl(F_GETLK) will succeed for ourselves,
	 * we hold that lock ourselves.
	 */
	if (pid != *our_pid) {
		int ret = messaging_dgm_cleanup(pid);

		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
			   (unsigned long)pid, ret ? strerror(ret) : "ok"));
	}

	return 0;
}
1632 :
1633 0 : int messaging_dgm_wipe(void)
1634 : {
1635 0 : pid_t pid = tevent_cached_getpid();
1636 0 : messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
1637 0 : return 0;
1638 : }
1639 :
1640 225 : int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1641 : void *private_data)
1642 : {
1643 225 : struct messaging_dgm_context *ctx = global_dgm_context;
1644 1 : DIR *msgdir;
1645 1 : struct dirent *dp;
1646 225 : int error = 0;
1647 :
1648 225 : if (ctx == NULL) {
1649 0 : return ENOTCONN;
1650 : }
1651 :
1652 225 : messaging_dgm_validate(ctx);
1653 :
1654 : /*
1655 : * We scan the socket directory and not the lock directory. Otherwise
1656 : * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1657 : * and fcntl(SETLK).
1658 : */
1659 :
1660 225 : msgdir = opendir(ctx->socket_dir.buf);
1661 225 : if (msgdir == NULL) {
1662 0 : return errno;
1663 : }
1664 :
1665 14326 : while ((dp = readdir(msgdir)) != NULL) {
1666 8 : unsigned long pid;
1667 8 : int ret;
1668 :
1669 14101 : pid = smb_strtoul(dp->d_name, NULL, 10, &error, SMB_STR_STANDARD);
1670 14101 : if ((pid == 0) || (error != 0)) {
1671 : /*
1672 : * . and .. and other malformed entries
1673 : */
1674 450 : continue;
1675 : }
1676 :
1677 13651 : ret = fn(pid, private_data);
1678 13651 : if (ret != 0) {
1679 0 : break;
1680 : }
1681 : }
1682 225 : closedir(msgdir);
1683 :
1684 225 : return 0;
1685 : }
1686 :
/*
 * Handle returned by messaging_dgm_register_tevent_context().
 */
struct messaging_dgm_fde {
	/* The read event on the messaging socket this handle stands for */
	struct tevent_fd *fde;
};
1690 :
1691 157768 : static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1692 : {
1693 157768 : if (fde_ev->ctx != NULL) {
1694 141953 : DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1695 141953 : fde_ev->ctx = NULL;
1696 : }
1697 157768 : return 0;
1698 : }
1699 :
1700 : /*
1701 : * Reference counter for a struct tevent_fd messaging read event
1702 : * (with callback function) on a struct tevent_context registered
1703 : * on a messaging context.
1704 : *
1705 : * If we've already registered this struct tevent_context before
1706 : * (so already have a read event), just increase the reference count.
1707 : *
1708 : * Otherwise create a new struct tevent_fd messaging read event on the
1709 : * previously unseen struct tevent_context - this is what drives
1710 : * the message receive processing.
1711 : *
1712 : */
1713 :
1714 636523 : struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1715 : TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1716 : {
1717 636523 : struct messaging_dgm_context *ctx = global_dgm_context;
1718 21900 : struct messaging_dgm_fde_ev *fde_ev;
1719 21900 : struct messaging_dgm_fde *fde;
1720 :
1721 636523 : if (ctx == NULL) {
1722 0 : return NULL;
1723 : }
1724 :
1725 636523 : fde = talloc(mem_ctx, struct messaging_dgm_fde);
1726 636523 : if (fde == NULL) {
1727 0 : return NULL;
1728 : }
1729 :
1730 831851 : for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1731 713950 : if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1732 : /*
1733 : * If the event context got deleted,
1734 : * tevent_fd_get_flags() will return 0
1735 : * for the stale fde.
1736 : *
1737 : * In that case we should not
1738 : * use fde_ev->ev anymore.
1739 : */
1740 60126 : continue;
1741 : }
1742 653824 : if (fde_ev->ev == ev) {
1743 501638 : break;
1744 : }
1745 : }
1746 :
1747 636523 : if (fde_ev == NULL) {
1748 117901 : fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
1749 117901 : if (fde_ev == NULL) {
1750 0 : return NULL;
1751 : }
1752 117901 : fde_ev->fde = tevent_add_fd(
1753 : ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1754 : messaging_dgm_read_handler, ctx);
1755 117901 : if (fde_ev->fde == NULL) {
1756 0 : TALLOC_FREE(fde);
1757 0 : return NULL;
1758 : }
1759 117901 : fde_ev->ev = ev;
1760 117901 : fde_ev->ctx = ctx;
1761 117901 : DLIST_ADD(ctx->fde_evs, fde_ev);
1762 117901 : talloc_set_destructor(
1763 : fde_ev, messaging_dgm_fde_ev_destructor);
1764 : } else {
1765 : /*
1766 : * Same trick as with tdb_wrap: The caller will never
1767 : * see the talloc_referenced object, the
1768 : * messaging_dgm_fde_ev, so problems with
1769 : * talloc_unlink will not happen.
1770 : */
1771 518622 : if (talloc_reference(fde, fde_ev) == NULL) {
1772 0 : TALLOC_FREE(fde);
1773 0 : return NULL;
1774 : }
1775 : }
1776 :
1777 636523 : fde->fde = fde_ev->fde;
1778 636523 : return fde;
1779 : }
1780 :
1781 461792 : bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1782 : {
1783 89043 : uint16_t flags;
1784 :
1785 461792 : if (fde == NULL) {
1786 0 : return false;
1787 : }
1788 461792 : flags = tevent_fd_get_flags(fde->fde);
1789 461792 : return (flags != 0);
1790 : }
|