This source file includes the following definitions.
- crm_ipc_init
- crm_ipc_default_buffer_size
- generateReference
- create_request_adv
- create_reply_adv
- crm_client_get
- crm_client_get_by_id
- crm_client_name
- crm_client_init
- crm_client_cleanup
- crm_client_disconnect_all
- crm_client_alloc
- crm_client_new
- crm_client_destroy
- crm_set_client_queue_max
- crm_ipcs_client_pid
- crm_ipcs_recv
- crm_ipcs_flush_events_cb
- delay_next_flush
- crm_ipcs_flush_events
- crm_ipc_prepare
- crm_ipcs_sendv
- crm_ipcs_send
- crm_ipcs_send_ack
- pick_ipc_buffer
- crm_ipc_new
- crm_ipc_connect
- crm_ipc_close
- crm_ipc_destroy
- crm_ipc_get_fd
- crm_ipc_connected
- crm_ipc_ready
- crm_ipc_decompress
- crm_ipc_read
- crm_ipc_buffer
- crm_ipc_buffer_flags
- crm_ipc_name
- internal_ipc_send_recv
- internal_ipc_send_request
- internal_ipc_get_reply
- crm_ipc_send
- create_hello_message
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 #include <crm_internal.h>
20
21 #include <sys/param.h>
22
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <grp.h>
28
29 #include <errno.h>
30 #include <fcntl.h>
31 #include <bzlib.h>
32
33 #include <crm/crm.h>
34 #include <crm/msg_xml.h>
35 #include <crm/common/ipc.h>
36 #include <crm/common/ipcs.h>
37
38 #define PCMK_IPC_VERSION 1
39
40
41 #define PCMK_IPC_DEFAULT_QUEUE_MAX 500
42
43 struct crm_ipc_response_header {
44 struct qb_ipc_response_header qb;
45 uint32_t size_uncompressed;
46 uint32_t size_compressed;
47 uint32_t flags;
48 uint8_t version;
49 };
50
51 static int hdr_offset = 0;
52 static unsigned int ipc_buffer_max = 0;
53 static unsigned int pick_ipc_buffer(unsigned int max);
54
55 static inline void
56 crm_ipc_init(void)
57 {
58 if (hdr_offset == 0) {
59 hdr_offset = sizeof(struct crm_ipc_response_header);
60 }
61 if (ipc_buffer_max == 0) {
62 ipc_buffer_max = pick_ipc_buffer(0);
63 }
64 }
65
/*!
 * \brief Report the IPC buffer size used when the caller requests no particular size
 *
 * \return Result of pick_ipc_buffer() for a requested size of 0
 */
unsigned int
crm_ipc_default_buffer_size(void)
{
    const unsigned int no_request = 0;

    return pick_ipc_buffer(no_request);
}
71
/*!
 * \brief Build a message reference string
 *
 * The result has the form "<custom1>-<custom2>-<seconds since epoch>-<counter>",
 * where the counter is a per-process value incremented on each successful call.
 * A NULL component is replaced by the literal "_empty_".
 *
 * \param[in] custom1  First component (may be NULL)
 * \param[in] custom2  Second component (may be NULL)
 *
 * \return Newly allocated string (caller must free()), or NULL if allocation failed
 */
static char *
generateReference(const char *custom1, const char *custom2)
{
    /* 'unsigned int' rather than the non-standard 'uint' typedef */
    static unsigned int ref_counter = 0;

    const char *local_cust1 = (custom1 != NULL) ? custom1 : "_empty_";
    const char *local_cust2 = (custom2 != NULL) ? custom2 : "_empty_";

    /* 4 for the three separators and the NUL, 20 + 40 of headroom for the
     * timestamp and counter digits, plus both custom components
     */
    size_t reference_len = 4 + 20 + 40 + strlen(local_cust1) + strlen(local_cust2);
    char *since_epoch = calloc(1, reference_len);

    if (since_epoch != NULL) {
        /* Bounded write: never run past the allocation */
        snprintf(since_epoch, reference_len, "%s-%s-%lu-%u",
                 local_cust1, local_cust2, (unsigned long)time(NULL), ref_counter++);
    }

    return since_epoch;
}
103
104 xmlNode *
105 create_request_adv(const char *task, xmlNode * msg_data,
106 const char *host_to, const char *sys_to,
107 const char *sys_from, const char *uuid_from, const char *origin)
108 {
109 char *true_from = NULL;
110 xmlNode *request = NULL;
111 char *reference = generateReference(task, sys_from);
112
113 if (uuid_from != NULL) {
114 true_from = generate_hash_key(sys_from, uuid_from);
115 } else if (sys_from != NULL) {
116 true_from = strdup(sys_from);
117 } else {
118 crm_err("No sys from specified");
119 }
120
121
122 request = create_xml_node(NULL, __FUNCTION__);
123 crm_xml_add(request, F_CRM_ORIGIN, origin);
124 crm_xml_add(request, F_TYPE, T_CRM);
125 crm_xml_add(request, F_CRM_VERSION, CRM_FEATURE_SET);
126 crm_xml_add(request, F_CRM_MSG_TYPE, XML_ATTR_REQUEST);
127 crm_xml_add(request, F_CRM_REFERENCE, reference);
128 crm_xml_add(request, F_CRM_TASK, task);
129 crm_xml_add(request, F_CRM_SYS_TO, sys_to);
130 crm_xml_add(request, F_CRM_SYS_FROM, true_from);
131
132
133 if (host_to != NULL && strlen(host_to) > 0) {
134 crm_xml_add(request, F_CRM_HOST_TO, host_to);
135 }
136
137 if (msg_data != NULL) {
138 add_message_xml(request, F_CRM_DATA, msg_data);
139 }
140 free(reference);
141 free(true_from);
142
143 return request;
144 }
145
146
147
148
149 xmlNode *
150 create_reply_adv(xmlNode * original_request, xmlNode * xml_response_data, const char *origin)
151 {
152 xmlNode *reply = NULL;
153
154 const char *host_from = crm_element_value(original_request, F_CRM_HOST_FROM);
155 const char *sys_from = crm_element_value(original_request, F_CRM_SYS_FROM);
156 const char *sys_to = crm_element_value(original_request, F_CRM_SYS_TO);
157 const char *type = crm_element_value(original_request, F_CRM_MSG_TYPE);
158 const char *operation = crm_element_value(original_request, F_CRM_TASK);
159 const char *crm_msg_reference = crm_element_value(original_request, F_CRM_REFERENCE);
160
161 if (type == NULL) {
162 crm_err("Cannot create new_message, no message type in original message");
163 CRM_ASSERT(type != NULL);
164 return NULL;
165 #if 0
166 } else if (strcasecmp(XML_ATTR_REQUEST, type) != 0) {
167 crm_err("Cannot create new_message, original message was not a request");
168 return NULL;
169 #endif
170 }
171 reply = create_xml_node(NULL, __FUNCTION__);
172 if (reply == NULL) {
173 crm_err("Cannot create new_message, malloc failed");
174 return NULL;
175 }
176
177 crm_xml_add(reply, F_CRM_ORIGIN, origin);
178 crm_xml_add(reply, F_TYPE, T_CRM);
179 crm_xml_add(reply, F_CRM_VERSION, CRM_FEATURE_SET);
180 crm_xml_add(reply, F_CRM_MSG_TYPE, XML_ATTR_RESPONSE);
181 crm_xml_add(reply, F_CRM_REFERENCE, crm_msg_reference);
182 crm_xml_add(reply, F_CRM_TASK, operation);
183
184
185 crm_xml_add(reply, F_CRM_SYS_TO, sys_from);
186 crm_xml_add(reply, F_CRM_SYS_FROM, sys_to);
187
188
189 if (host_from != NULL && strlen(host_from) > 0) {
190 crm_xml_add(reply, F_CRM_HOST_TO, host_from);
191 }
192
193 if (xml_response_data != NULL) {
194 add_message_xml(reply, F_CRM_DATA, xml_response_data);
195 }
196
197 return reply;
198 }
199
200
201
202
203
204 GHashTable *client_connections = NULL;
205
206 crm_client_t *
207 crm_client_get(qb_ipcs_connection_t * c)
208 {
209 if (client_connections) {
210 return g_hash_table_lookup(client_connections, c);
211 }
212
213 crm_trace("No client found for %p", c);
214 return NULL;
215 }
216
217 crm_client_t *
218 crm_client_get_by_id(const char *id)
219 {
220 gpointer key;
221 crm_client_t *client;
222 GHashTableIter iter;
223
224 if (client_connections && id) {
225 g_hash_table_iter_init(&iter, client_connections);
226 while (g_hash_table_iter_next(&iter, &key, (gpointer *) & client)) {
227 if (strcmp(client->id, id) == 0) {
228 return client;
229 }
230 }
231 }
232
233 crm_trace("No client found with id=%s", id);
234 return NULL;
235 }
236
237 const char *
238 crm_client_name(crm_client_t * c)
239 {
240 if (c == NULL) {
241 return "null";
242 } else if (c->name == NULL && c->id == NULL) {
243 return "unknown";
244 } else if (c->name == NULL) {
245 return c->id;
246 } else {
247 return c->name;
248 }
249 }
250
251 void
252 crm_client_init(void)
253 {
254 if (client_connections == NULL) {
255 crm_trace("Creating client hash table");
256 client_connections = g_hash_table_new(g_direct_hash, g_direct_equal);
257 }
258 }
259
260 void
261 crm_client_cleanup(void)
262 {
263 if (client_connections != NULL) {
264 int active = g_hash_table_size(client_connections);
265
266 if (active) {
267 crm_err("Exiting with %d active connections", active);
268 }
269 g_hash_table_destroy(client_connections); client_connections = NULL;
270 }
271 }
272
273 void
274 crm_client_disconnect_all(qb_ipcs_service_t *service)
275 {
276 qb_ipcs_connection_t *c = NULL;
277
278 if (service == NULL) {
279 return;
280 }
281
282 c = qb_ipcs_connection_first_get(service);
283
284 while (c != NULL) {
285 qb_ipcs_connection_t *last = c;
286
287 c = qb_ipcs_connection_next_get(service, last);
288
289
290 crm_notice("Disconnecting client %p, pid=%d...", last, crm_ipcs_client_pid(last));
291 qb_ipcs_disconnect(last);
292 qb_ipcs_connection_unref(last);
293 }
294 }
295
296
297
298
299
300
301
302
303 crm_client_t *
304 crm_client_alloc(void *key)
305 {
306 crm_client_t *client = calloc(1, sizeof(crm_client_t));
307
308 CRM_ASSERT(client != NULL);
309 client->id = crm_generate_uuid();
310 g_hash_table_insert(client_connections, (key? key : client->id), client);
311 return client;
312 }
313
314 crm_client_t *
315 crm_client_new(qb_ipcs_connection_t * c, uid_t uid_client, gid_t gid_client)
316 {
317 static gid_t uid_cluster = 0;
318 static gid_t gid_cluster = 0;
319
320 crm_client_t *client = NULL;
321
322 CRM_LOG_ASSERT(c);
323 if (c == NULL) {
324 return NULL;
325 }
326
327 if (uid_cluster == 0) {
328 if (crm_user_lookup(CRM_DAEMON_USER, &uid_cluster, &gid_cluster) < 0) {
329 static bool have_error = FALSE;
330 if(have_error == FALSE) {
331 crm_warn("Could not find user and group IDs for user %s",
332 CRM_DAEMON_USER);
333 have_error = TRUE;
334 }
335 }
336 }
337
338 if (uid_client != 0) {
339 crm_trace("Giving access to group %u", gid_cluster);
340
341 qb_ipcs_connection_auth_set(c, -1, gid_cluster, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP);
342 }
343
344 crm_client_init();
345
346
347 client = crm_client_alloc(c);
348 client->ipcs = c;
349 client->kind = CRM_CLIENT_IPC;
350 client->pid = crm_ipcs_client_pid(c);
351
352 if ((uid_client == 0) || (uid_client == uid_cluster)) {
353
354 set_bit(client->flags, crm_client_flag_ipc_privileged);
355 }
356
357 crm_debug("Connecting %p for uid=%d gid=%d pid=%u id=%s", c, uid_client, gid_client, client->pid, client->id);
358
359 #if ENABLE_ACL
360 client->user = uid2username(uid_client);
361 #endif
362 return client;
363 }
364
365 void
366 crm_client_destroy(crm_client_t * c)
367 {
368 if (c == NULL) {
369 return;
370 }
371
372 if (client_connections) {
373 if (c->ipcs) {
374 crm_trace("Destroying %p/%p (%d remaining)",
375 c, c->ipcs, crm_hash_table_size(client_connections) - 1);
376 g_hash_table_remove(client_connections, c->ipcs);
377
378 } else {
379 crm_trace("Destroying remote connection %p (%d remaining)",
380 c, crm_hash_table_size(client_connections) - 1);
381 g_hash_table_remove(client_connections, c->id);
382 }
383 }
384
385 if (c->event_timer) {
386 g_source_remove(c->event_timer);
387 }
388
389 crm_debug("Destroying %d events", g_list_length(c->event_queue));
390 while (c->event_queue) {
391 struct iovec *event = c->event_queue->data;
392
393 c->event_queue = g_list_remove(c->event_queue, event);
394 free(event[0].iov_base);
395 free(event[1].iov_base);
396 free(event);
397 }
398
399 free(c->id);
400 free(c->name);
401 free(c->user);
402 if (c->remote) {
403 if (c->remote->auth_timeout) {
404 g_source_remove(c->remote->auth_timeout);
405 }
406 free(c->remote->buffer);
407 free(c->remote);
408 }
409 free(c);
410 }
411
412
413
414
415
416
417
418
419
420 bool
421 crm_set_client_queue_max(crm_client_t *client, const char *qmax)
422 {
423 if (is_set(client->flags, crm_client_flag_ipc_privileged)) {
424 int qmax_int = crm_int_helper(qmax, NULL);
425
426 if ((errno == 0) && (qmax_int > 0)) {
427 client->queue_max = qmax_int;
428 return TRUE;
429 }
430 }
431 return FALSE;
432 }
433
434 int
435 crm_ipcs_client_pid(qb_ipcs_connection_t * c)
436 {
437 struct qb_ipcs_connection_stats stats;
438
439 stats.client_pid = 0;
440 qb_ipcs_connection_stats_get(c, &stats, 0);
441 return stats.client_pid;
442 }
443
444 xmlNode *
445 crm_ipcs_recv(crm_client_t * c, void *data, size_t size, uint32_t * id, uint32_t * flags)
446 {
447 xmlNode *xml = NULL;
448 char *uncompressed = NULL;
449 char *text = ((char *)data) + sizeof(struct crm_ipc_response_header);
450 struct crm_ipc_response_header *header = data;
451
452 if (id) {
453 *id = ((struct qb_ipc_response_header *)data)->id;
454 }
455 if (flags) {
456 *flags = header->flags;
457 }
458
459 if (is_set(header->flags, crm_ipc_proxied)) {
460
461
462
463 c->flags |= crm_client_flag_ipc_proxied;
464 }
465
466 if(header->version > PCMK_IPC_VERSION) {
467 crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d",
468 header->version, PCMK_IPC_VERSION);
469 return NULL;
470 }
471
472 if (header->size_compressed) {
473 int rc = 0;
474 unsigned int size_u = 1 + header->size_uncompressed;
475 uncompressed = calloc(1, size_u);
476
477 crm_trace("Decompressing message data %u bytes into %u bytes",
478 header->size_compressed, size_u);
479
480 rc = BZ2_bzBuffToBuffDecompress(uncompressed, &size_u, text, header->size_compressed, 1, 0);
481 text = uncompressed;
482
483 if (rc != BZ_OK) {
484 crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
485 free(uncompressed);
486 return NULL;
487 }
488 }
489
490 CRM_ASSERT(text[header->size_uncompressed - 1] == 0);
491
492 crm_trace("Received %.200s", text);
493 xml = string2xml(text);
494
495 free(uncompressed);
496 return xml;
497 }
498
499 ssize_t crm_ipcs_flush_events(crm_client_t * c);
500
501 static gboolean
502 crm_ipcs_flush_events_cb(gpointer data)
503 {
504 crm_client_t *c = data;
505
506 c->event_timer = 0;
507 crm_ipcs_flush_events(c);
508 return FALSE;
509 }
510
511
512
513
514
515
516
517
518 static inline void
519 delay_next_flush(crm_client_t *c, unsigned int queue_len)
520 {
521
522 guint delay = (queue_len < 40)? (1000 + 100 * queue_len) : 5000;
523
524 c->event_timer = g_timeout_add(delay, crm_ipcs_flush_events_cb, c);
525 }
526
527 ssize_t
528 crm_ipcs_flush_events(crm_client_t * c)
529 {
530 ssize_t rc = 0;
531 unsigned int sent = 0;
532 unsigned int queue_len = 0;
533
534 if (c == NULL) {
535 return pcmk_ok;
536
537 } else if (c->event_timer) {
538
539 crm_trace("Timer active for %p - %d", c->ipcs, c->event_timer);
540 return pcmk_ok;
541 }
542
543 queue_len = g_list_length(c->event_queue);
544 while (c->event_queue && sent < 100) {
545 struct crm_ipc_response_header *header = NULL;
546 struct iovec *event = c->event_queue->data;
547
548 rc = qb_ipcs_event_sendv(c->ipcs, event, 2);
549 if (rc < 0) {
550 break;
551 }
552
553 sent++;
554 header = event[0].iov_base;
555 if (header->size_compressed) {
556 crm_trace("Event %d to %p[%d] (%lld compressed bytes) sent",
557 header->qb.id, c->ipcs, c->pid, (long long) rc);
558 } else {
559 crm_trace("Event %d to %p[%d] (%lld bytes) sent: %.120s",
560 header->qb.id, c->ipcs, c->pid, (long long) rc,
561 (char *) (event[1].iov_base));
562 }
563
564 c->event_queue = g_list_remove(c->event_queue, event);
565 free(event[0].iov_base);
566 free(event[1].iov_base);
567 free(event);
568 }
569
570 queue_len -= sent;
571 if (sent > 0 || queue_len) {
572 crm_trace("Sent %d events (%d remaining) for %p[%d]: %s (%lld)",
573 sent, queue_len, c->ipcs, c->pid,
574 pcmk_strerror(rc < 0 ? rc : 0), (long long) rc);
575 }
576
577 if (queue_len) {
578
579
580
581
582
583 if (queue_len > QB_MAX(c->queue_max, PCMK_IPC_DEFAULT_QUEUE_MAX)) {
584 if ((c->queue_backlog <= 1) || (queue_len < c->queue_backlog)) {
585
586 crm_warn("Client with process ID %u has a backlog of %u messages "
587 CRM_XS " %p", c->pid, queue_len, c->ipcs);
588 } else {
589 crm_err("Evicting client with process ID %u due to backlog of %u messages "
590 CRM_XS " %p", c->pid, queue_len, c->ipcs);
591 c->queue_backlog = 0;
592 qb_ipcs_disconnect(c->ipcs);
593 return rc;
594 }
595 }
596
597 c->queue_backlog = queue_len;
598 delay_next_flush(c, queue_len);
599
600 } else {
601
602 c->queue_backlog = 0;
603 }
604
605 return rc;
606 }
607
608 ssize_t
609 crm_ipc_prepare(uint32_t request, xmlNode * message, struct iovec ** result, uint32_t max_send_size)
610 {
611 static unsigned int biggest = 0;
612 struct iovec *iov;
613 unsigned int total = 0;
614 char *compressed = NULL;
615 char *buffer = dump_xml_unformatted(message);
616 struct crm_ipc_response_header *header = calloc(1, sizeof(struct crm_ipc_response_header));
617
618 CRM_ASSERT(result != NULL);
619
620 crm_ipc_init();
621
622 if (max_send_size == 0) {
623 max_send_size = ipc_buffer_max;
624 }
625
626 CRM_LOG_ASSERT(max_send_size != 0);
627
628 *result = NULL;
629 iov = calloc(2, sizeof(struct iovec));
630
631
632 iov[0].iov_len = hdr_offset;
633 iov[0].iov_base = header;
634
635 header->version = PCMK_IPC_VERSION;
636 header->size_uncompressed = 1 + strlen(buffer);
637 total = iov[0].iov_len + header->size_uncompressed;
638
639 if (total < max_send_size) {
640 iov[1].iov_base = buffer;
641 iov[1].iov_len = header->size_uncompressed;
642
643 } else {
644 unsigned int new_size = 0;
645
646 if (crm_compress_string
647 (buffer, header->size_uncompressed, max_send_size, &compressed, &new_size)) {
648
649 header->flags |= crm_ipc_compressed;
650 header->size_compressed = new_size;
651
652 iov[1].iov_len = header->size_compressed;
653 iov[1].iov_base = compressed;
654
655 free(buffer);
656
657 biggest = QB_MAX(header->size_compressed, biggest);
658
659 } else {
660 ssize_t rc = -EMSGSIZE;
661
662 crm_log_xml_trace(message, "EMSGSIZE");
663 biggest = QB_MAX(header->size_uncompressed, biggest);
664
665 crm_err
666 ("Could not compress the message (%u bytes) into less than the configured ipc limit (%u bytes). "
667 "Set PCMK_ipc_buffer to a higher value (%u bytes suggested)",
668 header->size_uncompressed, max_send_size, 4 * biggest);
669
670 free(compressed);
671 free(buffer);
672 free(header);
673 free(iov);
674
675 return rc;
676 }
677 }
678
679 header->qb.size = iov[0].iov_len + iov[1].iov_len;
680 header->qb.id = (int32_t)request;
681
682 *result = iov;
683 CRM_ASSERT(header->qb.size > 0);
684 return header->qb.size;
685 }
686
687 ssize_t
688 crm_ipcs_sendv(crm_client_t * c, struct iovec * iov, enum crm_ipc_flags flags)
689 {
690 ssize_t rc;
691 static uint32_t id = 1;
692 struct crm_ipc_response_header *header = iov[0].iov_base;
693
694 if (c->flags & crm_client_flag_ipc_proxied) {
695
696 if (is_not_set(flags, crm_ipc_server_event)) {
697 flags |= crm_ipc_server_event;
698
699
700 flags |= crm_ipc_proxied_relay_response;
701 }
702 }
703
704 header->flags |= flags;
705 if (flags & crm_ipc_server_event) {
706 header->qb.id = id++;
707
708 if (flags & crm_ipc_server_free) {
709 crm_trace("Sending the original to %p[%d]", c->ipcs, c->pid);
710 c->event_queue = g_list_append(c->event_queue, iov);
711
712 } else {
713 struct iovec *iov_copy = calloc(2, sizeof(struct iovec));
714
715 crm_trace("Sending a copy to %p[%d]", c->ipcs, c->pid);
716 iov_copy[0].iov_len = iov[0].iov_len;
717 iov_copy[0].iov_base = malloc(iov[0].iov_len);
718 memcpy(iov_copy[0].iov_base, iov[0].iov_base, iov[0].iov_len);
719
720 iov_copy[1].iov_len = iov[1].iov_len;
721 iov_copy[1].iov_base = malloc(iov[1].iov_len);
722 memcpy(iov_copy[1].iov_base, iov[1].iov_base, iov[1].iov_len);
723
724 c->event_queue = g_list_append(c->event_queue, iov_copy);
725 }
726
727 } else {
728 CRM_LOG_ASSERT(header->qb.id != 0);
729
730 rc = qb_ipcs_response_sendv(c->ipcs, iov, 2);
731 if (rc < header->qb.size) {
732 crm_notice("Response %d to %p[%d] (%u bytes) failed: %s (%d)",
733 header->qb.id, c->ipcs, c->pid, header->qb.size, pcmk_strerror(rc), rc);
734
735 } else {
736 crm_trace("Response %d sent, %lld bytes to %p[%d]",
737 header->qb.id, (long long) rc, c->ipcs, c->pid);
738 }
739
740 if (flags & crm_ipc_server_free) {
741 free(iov[0].iov_base);
742 free(iov[1].iov_base);
743 free(iov);
744 }
745 }
746
747 if (flags & crm_ipc_server_event) {
748 rc = crm_ipcs_flush_events(c);
749 } else {
750 crm_ipcs_flush_events(c);
751 }
752
753 if (rc == -EPIPE || rc == -ENOTCONN) {
754 crm_trace("Client %p disconnected", c->ipcs);
755 }
756
757 return rc;
758 }
759
760 ssize_t
761 crm_ipcs_send(crm_client_t * c, uint32_t request, xmlNode * message,
762 enum crm_ipc_flags flags)
763 {
764 struct iovec *iov = NULL;
765 ssize_t rc = 0;
766
767 if(c == NULL) {
768 return -EDESTADDRREQ;
769 }
770 crm_ipc_init();
771
772 rc = crm_ipc_prepare(request, message, &iov, ipc_buffer_max);
773 if (rc > 0) {
774 rc = crm_ipcs_sendv(c, iov, flags | crm_ipc_server_free);
775
776 } else {
777 free(iov);
778 crm_notice("Message to %p[%d] failed: %s (%d)",
779 c->ipcs, c->pid, pcmk_strerror(rc), rc);
780 }
781
782 return rc;
783 }
784
785 void
786 crm_ipcs_send_ack(crm_client_t * c, uint32_t request, uint32_t flags, const char *tag, const char *function,
787 int line)
788 {
789 if (flags & crm_ipc_client_response) {
790 xmlNode *ack = create_xml_node(NULL, tag);
791
792 crm_trace("Ack'ing msg from %s (%p)", crm_client_name(c), c);
793 c->request_id = 0;
794 crm_xml_add(ack, "function", function);
795 crm_xml_add_int(ack, "line", line);
796 crm_ipcs_send(c, request, ack, flags);
797 free_xml(ack);
798 }
799 }
800
801
802
/* Smallest IPC buffer size accepted from the PCMK_ipc_buffer environment variable */
#define MIN_MSG_SIZE    12336
/* Default IPC buffer size: 128KB (parenthesized so it expands safely inside any expression) */
#define MAX_MSG_SIZE    (128 * 1024)
805
806 struct crm_ipc_s {
807 struct pollfd pfd;
808
809
810 unsigned int max_buf_size;
811
812 unsigned int buf_size;
813 int msg_size;
814 int need_reply;
815 char *buffer;
816 char *name;
817 uint32_t buffer_flags;
818
819 qb_ipcc_connection_t *ipc;
820
821 };
822
823 static unsigned int
824 pick_ipc_buffer(unsigned int max)
825 {
826 static unsigned int global_max = 0;
827
828 if (global_max == 0) {
829 const char *env = getenv("PCMK_ipc_buffer");
830
831 if (env) {
832 int env_max = crm_parse_int(env, "0");
833
834 global_max = (env_max > 0)? QB_MAX(MIN_MSG_SIZE, env_max) : MAX_MSG_SIZE;
835
836 } else {
837 global_max = MAX_MSG_SIZE;
838 }
839 }
840
841 return QB_MAX(max, global_max);
842 }
843
844 crm_ipc_t *
845 crm_ipc_new(const char *name, size_t max_size)
846 {
847 crm_ipc_t *client = NULL;
848
849 client = calloc(1, sizeof(crm_ipc_t));
850
851 client->name = strdup(name);
852 client->buf_size = pick_ipc_buffer(max_size);
853 client->buffer = malloc(client->buf_size);
854
855
856 client->max_buf_size = client->buf_size;
857
858 client->pfd.fd = -1;
859 client->pfd.events = POLLIN;
860 client->pfd.revents = 0;
861
862 return client;
863 }
864
865
866
867
868
869
870
871
872 bool
873 crm_ipc_connect(crm_ipc_t * client)
874 {
875 client->need_reply = FALSE;
876 client->ipc = qb_ipcc_connect(client->name, client->buf_size);
877
878 if (client->ipc == NULL) {
879 crm_debug("Could not establish %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno);
880 return FALSE;
881 }
882
883 client->pfd.fd = crm_ipc_get_fd(client);
884 if (client->pfd.fd < 0) {
885 crm_debug("Could not obtain file descriptor for %s connection: %s (%d)", client->name, pcmk_strerror(errno), errno);
886 return FALSE;
887 }
888
889 qb_ipcc_context_set(client->ipc, client);
890
891 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
892 client->max_buf_size = qb_ipcc_get_buffer_size(client->ipc);
893 if (client->max_buf_size > client->buf_size) {
894 free(client->buffer);
895 client->buffer = calloc(1, client->max_buf_size);
896 client->buf_size = client->max_buf_size;
897 }
898 #endif
899
900 return TRUE;
901 }
902
903 void
904 crm_ipc_close(crm_ipc_t * client)
905 {
906 if (client) {
907 crm_trace("Disconnecting %s IPC connection %p (%p)", client->name, client, client->ipc);
908
909 if (client->ipc) {
910 qb_ipcc_connection_t *ipc = client->ipc;
911
912 client->ipc = NULL;
913 qb_ipcc_disconnect(ipc);
914 }
915 }
916 }
917
918 void
919 crm_ipc_destroy(crm_ipc_t * client)
920 {
921 if (client) {
922 if (client->ipc && qb_ipcc_is_connected(client->ipc)) {
923 crm_notice("Destroying an active IPC connection to %s", client->name);
924
925
926
927
928
929
930
931
932
933 }
934 crm_trace("Destroying IPC connection to %s: %p", client->name, client);
935 free(client->buffer);
936 free(client->name);
937 free(client);
938 }
939 }
940
941 int
942 crm_ipc_get_fd(crm_ipc_t * client)
943 {
944 int fd = 0;
945
946 if (client && client->ipc && (qb_ipcc_fd_get(client->ipc, &fd) == 0)) {
947 return fd;
948 }
949 errno = EINVAL;
950 crm_perror(LOG_ERR, "Could not obtain file IPC descriptor for %s",
951 (client? client->name : "unspecified client"));
952 return -errno;
953 }
954
955 bool
956 crm_ipc_connected(crm_ipc_t * client)
957 {
958 bool rc = FALSE;
959
960 if (client == NULL) {
961 crm_trace("No client");
962 return FALSE;
963
964 } else if (client->ipc == NULL) {
965 crm_trace("No connection");
966 return FALSE;
967
968 } else if (client->pfd.fd < 0) {
969 crm_trace("Bad descriptor");
970 return FALSE;
971 }
972
973 rc = qb_ipcc_is_connected(client->ipc);
974 if (rc == FALSE) {
975 client->pfd.fd = -EINVAL;
976 }
977 return rc;
978 }
979
980
981
982
983
984
985
986
987 int
988 crm_ipc_ready(crm_ipc_t *client)
989 {
990 int rc;
991
992 CRM_ASSERT(client != NULL);
993
994 if (crm_ipc_connected(client) == FALSE) {
995 return -ENOTCONN;
996 }
997
998 client->pfd.revents = 0;
999 rc = poll(&(client->pfd), 1, 0);
1000 return (rc < 0)? -errno : rc;
1001 }
1002
1003 static int
1004 crm_ipc_decompress(crm_ipc_t * client)
1005 {
1006 struct crm_ipc_response_header *header = (struct crm_ipc_response_header *)(void*)client->buffer;
1007
1008 if (header->size_compressed) {
1009 int rc = 0;
1010 unsigned int size_u = 1 + header->size_uncompressed;
1011
1012 unsigned int new_buf_size = QB_MAX((hdr_offset + size_u), client->max_buf_size);
1013 char *uncompressed = calloc(1, new_buf_size);
1014
1015 crm_trace("Decompressing message data %u bytes into %u bytes",
1016 header->size_compressed, size_u);
1017
1018 rc = BZ2_bzBuffToBuffDecompress(uncompressed + hdr_offset, &size_u,
1019 client->buffer + hdr_offset, header->size_compressed, 1, 0);
1020
1021 if (rc != BZ_OK) {
1022 crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
1023 free(uncompressed);
1024 return -EILSEQ;
1025 }
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035 CRM_ASSERT(size_u == header->size_uncompressed);
1036
1037 memcpy(uncompressed, client->buffer, hdr_offset);
1038 header = (struct crm_ipc_response_header *)(void*)uncompressed;
1039
1040 free(client->buffer);
1041 client->buf_size = new_buf_size;
1042 client->buffer = uncompressed;
1043 }
1044
1045 CRM_ASSERT(client->buffer[hdr_offset + header->size_uncompressed - 1] == 0);
1046 return pcmk_ok;
1047 }
1048
1049 long
1050 crm_ipc_read(crm_ipc_t * client)
1051 {
1052 struct crm_ipc_response_header *header = NULL;
1053
1054 CRM_ASSERT(client != NULL);
1055 CRM_ASSERT(client->ipc != NULL);
1056 CRM_ASSERT(client->buffer != NULL);
1057
1058 crm_ipc_init();
1059
1060 client->buffer[0] = 0;
1061 client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, client->buf_size - 1, 0);
1062 if (client->msg_size >= 0) {
1063 int rc = crm_ipc_decompress(client);
1064
1065 if (rc != pcmk_ok) {
1066 return rc;
1067 }
1068
1069 header = (struct crm_ipc_response_header *)(void*)client->buffer;
1070 if(header->version > PCMK_IPC_VERSION) {
1071 crm_err("Filtering incompatible v%d IPC message, we only support versions <= %d",
1072 header->version, PCMK_IPC_VERSION);
1073 return -EBADMSG;
1074 }
1075
1076 crm_trace("Received %s event %d, size=%u, rc=%d, text: %.100s",
1077 client->name, header->qb.id, header->qb.size, client->msg_size,
1078 client->buffer + hdr_offset);
1079
1080 } else {
1081 crm_trace("No message from %s received: %s", client->name, pcmk_strerror(client->msg_size));
1082 }
1083
1084 if (crm_ipc_connected(client) == FALSE || client->msg_size == -ENOTCONN) {
1085 crm_err("Connection to %s failed", client->name);
1086 }
1087
1088 if (header) {
1089
1090 return header->size_uncompressed;
1091 }
1092 return -ENOMSG;
1093 }
1094
1095 const char *
1096 crm_ipc_buffer(crm_ipc_t * client)
1097 {
1098 CRM_ASSERT(client != NULL);
1099 return client->buffer + sizeof(struct crm_ipc_response_header);
1100 }
1101
1102 uint32_t
1103 crm_ipc_buffer_flags(crm_ipc_t * client)
1104 {
1105 struct crm_ipc_response_header *header = NULL;
1106
1107 CRM_ASSERT(client != NULL);
1108 if (client->buffer == NULL) {
1109 return 0;
1110 }
1111
1112 header = (struct crm_ipc_response_header *)(void*)client->buffer;
1113 return header->flags;
1114 }
1115
1116 const char *
1117 crm_ipc_name(crm_ipc_t * client)
1118 {
1119 CRM_ASSERT(client != NULL);
1120 return client->name;
1121 }
1122
1123 static int
1124 internal_ipc_send_recv(crm_ipc_t * client, const void *iov)
1125 {
1126 int rc = 0;
1127
1128 do {
1129 rc = qb_ipcc_sendv_recv(client->ipc, iov, 2, client->buffer, client->buf_size, -1);
1130 } while (rc == -EAGAIN && crm_ipc_connected(client));
1131
1132 return rc;
1133 }
1134
1135 static int
1136 internal_ipc_send_request(crm_ipc_t * client, const void *iov, int ms_timeout)
1137 {
1138 int rc = 0;
1139 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1140
1141 do {
1142 rc = qb_ipcc_sendv(client->ipc, iov, 2);
1143 } while (rc == -EAGAIN && time(NULL) < timeout && crm_ipc_connected(client));
1144
1145 return rc;
1146 }
1147
1148 static int
1149 internal_ipc_get_reply(crm_ipc_t * client, int request_id, int ms_timeout)
1150 {
1151 time_t timeout = time(NULL) + 1 + (ms_timeout / 1000);
1152 int rc = 0;
1153
1154 crm_ipc_init();
1155
1156
1157 crm_trace("client %s waiting on reply to msg id %d", client->name, request_id);
1158 do {
1159
1160 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, 1000);
1161 if (rc > 0) {
1162 struct crm_ipc_response_header *hdr = NULL;
1163
1164 int rc = crm_ipc_decompress(client);
1165
1166 if (rc != pcmk_ok) {
1167 return rc;
1168 }
1169
1170 hdr = (struct crm_ipc_response_header *)(void*)client->buffer;
1171 if (hdr->qb.id == request_id) {
1172
1173 break;
1174 } else if (hdr->qb.id < request_id) {
1175 xmlNode *bad = string2xml(crm_ipc_buffer(client));
1176
1177 crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id);
1178 crm_log_xml_notice(bad, "OldIpcReply");
1179
1180 } else {
1181 xmlNode *bad = string2xml(crm_ipc_buffer(client));
1182
1183 crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id);
1184 crm_log_xml_notice(bad, "ImpossibleReply");
1185 CRM_ASSERT(hdr->qb.id <= request_id);
1186 }
1187 } else if (crm_ipc_connected(client) == FALSE) {
1188 crm_err("Server disconnected client %s while waiting for msg id %d", client->name,
1189 request_id);
1190 break;
1191 }
1192
1193 } while (time(NULL) < timeout);
1194
1195 return rc;
1196 }
1197
1198 int
1199 crm_ipc_send(crm_ipc_t * client, xmlNode * message, enum crm_ipc_flags flags, int32_t ms_timeout,
1200 xmlNode ** reply)
1201 {
1202 long rc = 0;
1203 struct iovec *iov;
1204 static uint32_t id = 0;
1205 static int factor = 8;
1206 struct crm_ipc_response_header *header;
1207
1208 crm_ipc_init();
1209
1210 if (client == NULL) {
1211 crm_notice("Invalid connection");
1212 return -ENOTCONN;
1213
1214 } else if (crm_ipc_connected(client) == FALSE) {
1215
1216 crm_notice("Connection to %s closed", client->name);
1217 return -ENOTCONN;
1218 }
1219
1220 if (ms_timeout == 0) {
1221 ms_timeout = 5000;
1222 }
1223
1224 if (client->need_reply) {
1225 crm_trace("Trying again to obtain pending reply from %s", client->name);
1226 rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout);
1227 if (rc < 0) {
1228 crm_warn("Sending to %s (%p) is disabled until pending reply is received", client->name,
1229 client->ipc);
1230 return -EALREADY;
1231
1232 } else {
1233 crm_notice("Lost reply from %s (%p) finally arrived, sending re-enabled", client->name,
1234 client->ipc);
1235 client->need_reply = FALSE;
1236 }
1237 }
1238
1239 id++;
1240 CRM_LOG_ASSERT(id != 0);
1241 rc = crm_ipc_prepare(id, message, &iov, client->max_buf_size);
1242 if(rc < 0) {
1243 return rc;
1244 }
1245
1246 header = iov[0].iov_base;
1247 header->flags |= flags;
1248
1249 if(is_set(flags, crm_ipc_proxied)) {
1250
1251 clear_bit(flags, crm_ipc_client_response);
1252 }
1253
1254 if(header->size_compressed) {
1255 if(factor < 10 && (client->max_buf_size / 10) < (rc / factor)) {
1256 crm_notice("Compressed message exceeds %d0%% of the configured ipc limit (%u bytes), "
1257 "consider setting PCMK_ipc_buffer to %u or higher",
1258 factor, client->max_buf_size, 2 * client->max_buf_size);
1259 factor++;
1260 }
1261 }
1262
1263 crm_trace("Sending from client: %s request id: %d bytes: %u timeout:%d msg...",
1264 client->name, header->qb.id, header->qb.size, ms_timeout);
1265
1266 if (ms_timeout > 0 || is_not_set(flags, crm_ipc_client_response)) {
1267
1268 rc = internal_ipc_send_request(client, iov, ms_timeout);
1269
1270 if (rc <= 0) {
1271 crm_trace("Failed to send from client %s request %d with %u bytes...",
1272 client->name, header->qb.id, header->qb.size);
1273 goto send_cleanup;
1274
1275 } else if (is_not_set(flags, crm_ipc_client_response)) {
1276 crm_trace("Message sent, not waiting for reply to %d from %s to %u bytes...",
1277 header->qb.id, client->name, header->qb.size);
1278
1279 goto send_cleanup;
1280 }
1281
1282 rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout);
1283 if (rc < 0) {
1284
1285
1286
1287
1288
1289
1290
1291 client->need_reply = TRUE;
1292 }
1293
1294 } else {
1295 rc = internal_ipc_send_recv(client, iov);
1296 }
1297
1298 if (rc > 0) {
1299 struct crm_ipc_response_header *hdr = (struct crm_ipc_response_header *)(void*)client->buffer;
1300
1301 crm_trace("Received response %d, size=%u, rc=%ld, text: %.200s", hdr->qb.id, hdr->qb.size,
1302 rc, crm_ipc_buffer(client));
1303
1304 if (reply) {
1305 *reply = string2xml(crm_ipc_buffer(client));
1306 }
1307
1308 } else {
1309 crm_trace("Response not received: rc=%ld, errno=%d", rc, errno);
1310 }
1311
1312 send_cleanup:
1313 if (crm_ipc_connected(client) == FALSE) {
1314 crm_notice("Connection to %s closed: %s (%ld)", client->name, pcmk_strerror(rc), rc);
1315
1316 } else if (rc == -ETIMEDOUT) {
1317 crm_warn("Request %d to %s (%p) failed: %s (%ld) after %dms",
1318 header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc, ms_timeout);
1319 crm_write_blackbox(0, NULL);
1320
1321 } else if (rc <= 0) {
1322 crm_warn("Request %d to %s (%p) failed: %s (%ld)",
1323 header->qb.id, client->name, client->ipc, pcmk_strerror(rc), rc);
1324 }
1325
1326 free(header);
1327 free(iov[1].iov_base);
1328 free(iov);
1329 return rc;
1330 }
1331
1332
1333
1334 xmlNode *
1335 create_hello_message(const char *uuid,
1336 const char *client_name, const char *major_version, const char *minor_version)
1337 {
1338 xmlNode *hello_node = NULL;
1339 xmlNode *hello = NULL;
1340
1341 if (uuid == NULL || strlen(uuid) == 0
1342 || client_name == NULL || strlen(client_name) == 0
1343 || major_version == NULL || strlen(major_version) == 0
1344 || minor_version == NULL || strlen(minor_version) == 0) {
1345 crm_err("Missing fields, Hello message will not be valid.");
1346 return NULL;
1347 }
1348
1349 hello_node = create_xml_node(NULL, XML_TAG_OPTIONS);
1350 crm_xml_add(hello_node, "major_version", major_version);
1351 crm_xml_add(hello_node, "minor_version", minor_version);
1352 crm_xml_add(hello_node, "client_name", client_name);
1353 crm_xml_add(hello_node, "client_uuid", uuid);
1354
1355 crm_trace("creating hello message");
1356 hello = create_request(CRM_OP_HELLO, hello_node, NULL, NULL, client_name, uuid);
1357 free_xml(hello_node);
1358
1359 return hello;
1360 }