TLA Line data Source code
1 : /*
2 : * src/test/isolation/isolationtester.c
3 : *
4 : * isolationtester.c
5 : * Runs an isolation test specified by a spec file.
6 : */
7 :
8 : #include "postgres_fe.h"
9 :
10 : #include <sys/select.h>
11 : #include <sys/time.h>
12 :
13 : #include "datatype/timestamp.h"
14 : #include "isolationtester.h"
15 : #include "libpq-fe.h"
16 : #include "pg_getopt.h"
17 : #include "pqexpbuffer.h"
18 :
19 : #define PREP_WAITING "isolationtester_waiting"
20 :
21 : /*
22 : * conns[0] is the global setup, teardown, and watchdog connection. Additional
23 : * connections represent spec-defined sessions.
24 : */
25 : typedef struct IsoConnInfo
26 : {
27 : /* The libpq connection object for this connection. */
28 : PGconn *conn;
29 : /* The backend PID, in numeric and string formats. */
30 : int backend_pid;
31 : const char *backend_pid_str;
32 : /* Name of the associated session. */
33 : const char *sessionname;
34 : /* Active step on this connection, or NULL if idle. */
35 : PermutationStep *active_step;
36 : /* Number of NOTICE messages received from connection. */
37 : int total_notices;
38 : } IsoConnInfo;
39 :
40 : static IsoConnInfo *conns = NULL;
41 : static int nconns = 0;
42 :
43 : /* Flag indicating some new NOTICE has arrived */
44 : static bool any_new_notice = false;
45 :
46 : /* Maximum time to wait before giving up on a step (in usec) */
47 : static int64 max_step_wait = 360 * USECS_PER_SEC;
48 :
49 :
50 : static void check_testspec(TestSpec *testspec);
51 : static void run_testspec(TestSpec *testspec);
52 : static void run_all_permutations(TestSpec *testspec);
53 : static void run_all_permutations_recurse(TestSpec *testspec, int *piles,
54 : int nsteps, PermutationStep **steps);
55 : static void run_named_permutations(TestSpec *testspec);
56 : static void run_permutation(TestSpec *testspec, int nsteps,
57 : PermutationStep **steps);
58 :
59 : /* Flag bits for try_complete_step(s) */
60 : #define STEP_NONBLOCK 0x1 /* return as soon as cmd waits for a lock */
61 : #define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
62 :
63 : static int try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
64 : int nwaiting, int flags);
65 : static bool try_complete_step(TestSpec *testspec, PermutationStep *pstep,
66 : int flags);
67 :
68 : static int step_qsort_cmp(const void *a, const void *b);
69 : static int step_bsearch_cmp(const void *a, const void *b);
70 :
71 : static bool step_has_blocker(PermutationStep *pstep);
72 : static void printResultSet(PGresult *res);
73 : static void isotesterNoticeProcessor(void *arg, const char *message);
74 : static void blackholeNoticeProcessor(void *arg, const char *message);
75 ECB :
76 : static void
77 GIC 131 : disconnect_atexit(void)
78 : {
79 ECB : int i;
80 :
81 CBC 582 : for (i = 0; i < nconns; i++)
82 451 : if (conns[i].conn)
83 GIC 451 : PQfinish(conns[i].conn);
84 131 : }
85 ECB :
86 : int
87 GIC 138 : main(int argc, char **argv)
88 : {
89 : const char *conninfo;
90 : const char *env_wait;
91 : TestSpec *testspec;
92 : PGresult *res;
93 : PQExpBufferData wait_query;
94 : int opt;
95 ECB : int i;
96 :
97 CBC 138 : while ((opt = getopt(argc, argv, "V")) != -1)
98 : {
99 7 : switch (opt)
100 ECB : {
101 CBC 7 : case 'V':
102 GBC 7 : puts("isolationtester (PostgreSQL) " PG_VERSION);
103 7 : exit(0);
104 UBC 0 : default:
105 UIC 0 : fprintf(stderr, "Usage: isolationtester [CONNINFO]\n");
106 0 : return EXIT_FAILURE;
107 : }
108 : }
109 :
110 : /*
111 : * Make stdout unbuffered to match stderr; and ensure stderr is unbuffered
112 ECB : * too, which it should already be everywhere except sometimes in Windows.
113 : */
114 GIC 131 : setbuf(stdout, NULL);
115 131 : setbuf(stderr, NULL);
116 :
117 : /*
118 : * If the user supplies a non-option parameter on the command line, use it
119 : * as the conninfo string; otherwise default to setting dbname=postgres
120 : * and using environment variables or defaults for all other connection
121 ECB : * parameters.
122 : */
123 GIC 131 : if (argc > optind)
124 GBC 131 : conninfo = argv[optind];
125 : else
126 UIC 0 : conninfo = "dbname = postgres";
127 :
128 : /*
129 : * If PG_TEST_TIMEOUT_DEFAULT is set, adopt its value (given in seconds)
130 : * as half the max time to wait for any one step to complete.
131 ECB : */
132 GNC 131 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
133 GIC 131 : if (env_wait != NULL)
134 UNC 0 : max_step_wait = 2 * ((int64) atoi(env_wait)) * USECS_PER_SEC;
135 ECB :
136 : /* Read the test spec from stdin */
137 GIC 131 : spec_yyparse();
138 131 : testspec = &parseresult;
139 ECB :
140 : /* Perform post-parse checking, and fill in linking fields */
141 CBC 131 : check_testspec(testspec);
142 :
143 GIC 131 : printf("Parsed test spec with %d sessions\n", testspec->nsessions);
144 :
145 : /*
146 : * Establish connections to the database, one for each session and an
147 ECB : * extra for lock wait detection and global work.
148 : */
149 CBC 131 : nconns = 1 + testspec->nsessions;
150 GIC 131 : conns = (IsoConnInfo *) pg_malloc0(nconns * sizeof(IsoConnInfo));
151 CBC 131 : atexit(disconnect_atexit);
152 :
153 GIC 582 : for (i = 0; i < nconns; i++)
154 : {
155 ECB : const char *sessionname;
156 :
157 GIC 451 : if (i == 0)
158 CBC 131 : sessionname = "control connection";
159 : else
160 320 : sessionname = testspec->sessions[i - 1]->name;
161 :
162 451 : conns[i].sessionname = sessionname;
163 ECB :
164 GIC 451 : conns[i].conn = PQconnectdb(conninfo);
165 GBC 451 : if (PQstatus(conns[i].conn) != CONNECTION_OK)
166 EUB : {
167 UBC 0 : fprintf(stderr, "Connection %d failed: %s",
168 UIC 0 : i, PQerrorMessage(conns[i].conn));
169 0 : exit(1);
170 : }
171 :
172 : /*
173 : * Set up notice processors for the user-defined connections, so that
174 : * messages can get printed prefixed with the session names. The
175 : * control connection gets a "blackhole" processor instead (hides all
176 ECB : * messages).
177 : */
178 GIC 451 : if (i != 0)
179 CBC 320 : PQsetNoticeProcessor(conns[i].conn,
180 : isotesterNoticeProcessor,
181 320 : (void *) &conns[i]);
182 : else
183 GIC 131 : PQsetNoticeProcessor(conns[i].conn,
184 : blackholeNoticeProcessor,
185 : NULL);
186 :
187 : /*
188 : * Similarly, append the session name to application_name to make it
189 : * easier to map spec file sessions to log output and
190 : * pg_stat_activity. The reason to append instead of just setting the
191 ECB : * name is that we don't know the name of the test currently running.
192 : */
193 GIC 451 : res = PQexecParams(conns[i].conn,
194 : "SELECT set_config('application_name',\n"
195 : " current_setting('application_name') || '/' || $1,\n"
196 : " false)",
197 : 1, NULL,
198 ECB : &sessionname,
199 : NULL, NULL, 0);
200 GBC 451 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
201 EUB : {
202 UBC 0 : fprintf(stderr, "setting of application name failed: %s",
203 UIC 0 : PQerrorMessage(conns[i].conn));
204 0 : exit(1);
205 : }
206 ECB :
207 : /* Save each connection's backend PID for subsequent use. */
208 GIC 451 : conns[i].backend_pid = PQbackendPID(conns[i].conn);
209 451 : conns[i].backend_pid_str = psprintf("%d", conns[i].backend_pid);
210 : }
211 :
212 : /*
213 : * Build the query we'll use to detect lock contention among sessions in
214 : * the test specification. Most of the time, we could get away with
215 : * simply checking whether a session is waiting for *any* lock: we don't
216 : * exactly expect concurrent use of test tables. However, autovacuum will
217 : * occasionally take AccessExclusiveLock to truncate a table, and we must
218 ECB : * ignore that transient wait.
219 : */
220 GIC 131 : initPQExpBuffer(&wait_query);
221 131 : appendPQExpBufferStr(&wait_query,
222 ECB : "SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
223 : /* The spec syntax requires at least one session; assume that here. */
224 CBC 131 : appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
225 320 : for (i = 2; i < nconns; i++)
226 GIC 189 : appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
227 CBC 131 : appendPQExpBufferStr(&wait_query, "}')");
228 ECB :
229 GIC 131 : res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
230 GBC 131 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
231 EUB : {
232 UBC 0 : fprintf(stderr, "prepare of lock wait query failed: %s",
233 UIC 0 : PQerrorMessage(conns[0].conn));
234 LBC 0 : exit(1);
235 ECB : }
236 GIC 131 : PQclear(res);
237 131 : termPQExpBuffer(&wait_query);
238 :
239 : /*
240 : * Run the permutations specified in the spec, or all if none were
241 ECB : * explicitly specified.
242 : */
243 CBC 131 : run_testspec(testspec);
244 :
245 GIC 131 : return 0;
246 : }
247 :
248 : /*
249 : * Validity-check the test spec and fill in cross-links between nodes.
250 ECB : */
251 : static void
252 GIC 131 : check_testspec(TestSpec *testspec)
253 : {
254 : int nallsteps;
255 : Step **allsteps;
256 : int i,
257 : j,
258 : k;
259 ECB :
260 : /* Create a sorted lookup table of all steps. */
261 CBC 131 : nallsteps = 0;
262 GIC 451 : for (i = 0; i < testspec->nsessions; i++)
263 CBC 320 : nallsteps += testspec->sessions[i]->nsteps;
264 :
265 131 : allsteps = pg_malloc(nallsteps * sizeof(Step *));
266 ECB :
267 GIC 131 : k = 0;
268 CBC 451 : for (i = 0; i < testspec->nsessions; i++)
269 ECB : {
270 GIC 1640 : for (j = 0; j < testspec->sessions[i]->nsteps; j++)
271 1320 : allsteps[k++] = testspec->sessions[i]->steps[j];
272 ECB : }
273 :
274 GIC 131 : qsort(allsteps, nallsteps, sizeof(Step *), step_qsort_cmp);
275 ECB :
276 : /* Verify that all step names are unique. */
277 CBC 1320 : for (i = 1; i < nallsteps; i++)
278 ECB : {
279 GIC 1189 : if (strcmp(allsteps[i - 1]->name,
280 GBC 1189 : allsteps[i]->name) == 0)
281 EUB : {
282 UBC 0 : fprintf(stderr, "duplicate step name: %s\n",
283 UIC 0 : allsteps[i]->name);
284 0 : exit(1);
285 : }
286 : }
287 ECB :
288 : /* Set the session index fields in steps. */
289 CBC 451 : for (i = 0; i < testspec->nsessions; i++)
290 : {
291 320 : Session *session = testspec->sessions[i];
292 ECB :
293 GIC 1640 : for (j = 0; j < session->nsteps; j++)
294 1320 : session->steps[j]->session = i;
295 : }
296 :
297 : /*
298 : * If we have manually-specified permutations, link PermutationSteps to
299 ECB : * Steps, and fill in blocker links.
300 : */
301 CBC 1290 : for (i = 0; i < testspec->npermutations; i++)
302 : {
303 1159 : Permutation *p = testspec->permutations[i];
304 :
305 9793 : for (j = 0; j < p->nsteps; j++)
306 ECB : {
307 GIC 8634 : PermutationStep *pstep = p->steps[j];
308 8634 : Step **this = (Step **) bsearch(pstep->name,
309 : allsteps,
310 : nallsteps,
311 : sizeof(Step *),
312 ECB : step_bsearch_cmp);
313 :
314 GBC 8634 : if (this == NULL)
315 : {
316 UBC 0 : fprintf(stderr, "undefined step \"%s\" specified in permutation\n",
317 : pstep->name);
318 LBC 0 : exit(1);
319 : }
320 GIC 8634 : pstep->step = *this;
321 ECB :
322 : /* Mark the step used, for check below */
323 GIC 8634 : pstep->step->used = true;
324 : }
325 :
326 : /*
327 : * Identify any blocker steps. We search only the current
328 : * permutation, since steps not used there couldn't be concurrent.
329 : * Note that it's OK to reference later permutation steps, so this
330 ECB : * can't be combined with the previous loop.
331 : */
332 CBC 9793 : for (j = 0; j < p->nsteps; j++)
333 : {
334 8634 : PermutationStep *pstep = p->steps[j];
335 :
336 8682 : for (k = 0; k < pstep->nblockers; k++)
337 : {
338 GIC 48 : PermutationStepBlocker *blocker = pstep->blockers[k];
339 ECB : int n;
340 :
341 GIC 48 : if (blocker->blocktype == PSB_ONCE)
342 CBC 11 : continue; /* nothing to link to */
343 ECB :
344 GIC 37 : blocker->step = NULL;
345 CBC 162 : for (n = 0; n < p->nsteps; n++)
346 : {
347 162 : PermutationStep *otherp = p->steps[n];
348 :
349 162 : if (strcmp(otherp->name, blocker->stepname) == 0)
350 ECB : {
351 GIC 37 : blocker->step = otherp->step;
352 37 : break;
353 ECB : }
354 : }
355 GBC 37 : if (blocker->step == NULL)
356 : {
357 UBC 0 : fprintf(stderr, "undefined blocking step \"%s\" referenced in permutation step \"%s\"\n",
358 : blocker->stepname, pstep->name);
359 UIC 0 : exit(1);
360 ECB : }
361 : /* can't block on completion of step of own session */
362 GBC 37 : if (blocker->step->session == pstep->step->session)
363 : {
364 UBC 0 : fprintf(stderr, "permutation step \"%s\" cannot block on its own session\n",
365 : pstep->name);
366 UIC 0 : exit(1);
367 : }
368 : }
369 : }
370 : }
371 :
372 : /*
373 : * If we have manually-specified permutations, verify that all steps have
374 : * been used, warning about anything defined but not used. We can skip
375 ECB : * this when using automatically-generated permutations.
376 : */
377 CBC 131 : if (testspec->permutations)
378 : {
379 1361 : for (i = 0; i < nallsteps; i++)
380 ECB : {
381 GIC 1244 : if (!allsteps[i]->used)
382 6 : fprintf(stderr, "unused step name: %s\n", allsteps[i]->name);
383 : }
384 ECB : }
385 :
386 GIC 131 : free(allsteps);
387 131 : }
388 :
389 : /*
390 : * Run the permutations specified in the spec, or all if none were
391 : * explicitly specified.
392 ECB : */
393 : static void
394 CBC 131 : run_testspec(TestSpec *testspec)
395 ECB : {
396 GIC 131 : if (testspec->permutations)
397 CBC 117 : run_named_permutations(testspec);
398 ECB : else
399 GIC 14 : run_all_permutations(testspec);
400 131 : }
401 :
402 : /*
403 : * Run all permutations of the steps and sessions.
404 ECB : */
405 : static void
406 GIC 14 : run_all_permutations(TestSpec *testspec)
407 : {
408 : int nsteps;
409 : int i;
410 : PermutationStep *steps;
411 : PermutationStep **stepptrs;
412 : int *piles;
413 ECB :
414 : /* Count the total number of steps in all sessions */
415 CBC 14 : nsteps = 0;
416 GIC 44 : for (i = 0; i < testspec->nsessions; i++)
417 30 : nsteps += testspec->sessions[i]->nsteps;
418 ECB :
419 : /* Create PermutationStep workspace array */
420 CBC 14 : steps = (PermutationStep *) pg_malloc0(sizeof(PermutationStep) * nsteps);
421 14 : stepptrs = (PermutationStep **) pg_malloc(sizeof(PermutationStep *) * nsteps);
422 GIC 90 : for (i = 0; i < nsteps; i++)
423 76 : stepptrs[i] = steps + i;
424 :
425 : /*
426 : * To generate the permutations, we conceptually put the steps of each
427 : * session on a pile. To generate a permutation, we pick steps from the
428 : * piles until all piles are empty. By picking steps from piles in
429 : * different order, we get different permutations.
430 : *
431 : * A pile is actually just an integer which tells how many steps we've
432 ECB : * already picked from this pile.
433 : */
434 CBC 14 : piles = pg_malloc(sizeof(int) * testspec->nsessions);
435 GIC 44 : for (i = 0; i < testspec->nsessions; i++)
436 CBC 30 : piles[i] = 0;
437 :
438 14 : run_all_permutations_recurse(testspec, piles, 0, stepptrs);
439 ECB :
440 CBC 14 : free(steps);
441 14 : free(stepptrs);
442 GIC 14 : free(piles);
443 14 : }
444 ECB :
445 : static void
446 GIC 1560 : run_all_permutations_recurse(TestSpec *testspec, int *piles,
447 : int nsteps, PermutationStep **steps)
448 ECB : {
449 : int i;
450 CBC 1560 : bool found = false;
451 :
452 GIC 5601 : for (i = 0; i < testspec->nsessions; i++)
453 ECB : {
454 : /* If there's any more steps in this pile, pick it and recurse */
455 CBC 4041 : if (piles[i] < testspec->sessions[i]->nsteps)
456 : {
457 GIC 1546 : Step *newstep = testspec->sessions[i]->steps[piles[i]];
458 :
459 : /*
460 : * These automatically-generated PermutationSteps never have
461 : * blocker conditions. So we need only fill these fields, relying
462 ECB : * on run_all_permutations() to have zeroed the rest:
463 : */
464 GIC 1546 : steps[nsteps]->name = newstep->name;
465 CBC 1546 : steps[nsteps]->step = newstep;
466 :
467 1546 : piles[i]++;
468 :
469 1546 : run_all_permutations_recurse(testspec, piles, nsteps + 1, steps);
470 :
471 1546 : piles[i]--;
472 :
473 GIC 1546 : found = true;
474 : }
475 : }
476 ECB :
477 : /* If all the piles were empty, this permutation is completed. Run it */
478 CBC 1560 : if (!found)
479 GIC 486 : run_permutation(testspec, nsteps, steps);
480 1560 : }
481 :
482 : /*
483 : * Run permutations given in the test spec
484 ECB : */
485 : static void
486 GIC 117 : run_named_permutations(TestSpec *testspec)
487 : {
488 ECB : int i;
489 :
490 CBC 1276 : for (i = 0; i < testspec->npermutations; i++)
491 : {
492 1159 : Permutation *p = testspec->permutations[i];
493 :
494 1159 : run_permutation(testspec, p->nsteps, p->steps);
495 : }
496 GIC 117 : }
497 ECB :
498 : static int
499 CBC 4023 : step_qsort_cmp(const void *a, const void *b)
500 ECB : {
501 GIC 4023 : Step *stepa = *((Step **) a);
502 CBC 4023 : Step *stepb = *((Step **) b);
503 :
504 GIC 4023 : return strcmp(stepa->name, stepb->name);
505 : }
506 ECB :
507 : static int
508 CBC 28145 : step_bsearch_cmp(const void *a, const void *b)
509 ECB : {
510 GIC 28145 : char *stepname = (char *) a;
511 CBC 28145 : Step *step = *((Step **) b);
512 :
513 GIC 28145 : return strcmp(stepname, step->name);
514 : }
515 :
516 : /*
517 : * Run one permutation
518 ECB : */
519 : static void
520 GIC 1645 : run_permutation(TestSpec *testspec, int nsteps, PermutationStep **steps)
521 : {
522 ECB : PGresult *res;
523 : int i;
524 GIC 1645 : int nwaiting = 0;
525 ECB : PermutationStep **waiting;
526 :
527 CBC 1645 : waiting = pg_malloc(sizeof(PermutationStep *) * testspec->nsessions);
528 ECB :
529 CBC 1645 : printf("\nstarting permutation:");
530 13385 : for (i = 0; i < nsteps; i++)
531 GIC 11740 : printf(" %s", steps[i]->name);
532 1645 : printf("\n");
533 ECB :
534 : /* Perform setup */
535 CBC 3301 : for (i = 0; i < testspec->nsetupsqls; i++)
536 ECB : {
537 GIC 1656 : res = PQexec(conns[0].conn, testspec->setupsqls[i]);
538 CBC 1656 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
539 : {
540 56 : printResultSet(res);
541 : }
542 GBC 1600 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
543 EUB : {
544 UIC 0 : fprintf(stderr, "setup failed: %s", PQerrorMessage(conns[0].conn));
545 LBC 0 : exit(1);
546 : }
547 GIC 1656 : PQclear(res);
548 : }
549 ECB :
550 : /* Perform per-session setup */
551 CBC 5553 : for (i = 0; i < testspec->nsessions; i++)
552 : {
553 3908 : if (testspec->sessions[i]->setupsql)
554 ECB : {
555 GIC 2513 : res = PQexec(conns[i + 1].conn, testspec->sessions[i]->setupsql);
556 CBC 2513 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
557 : {
558 26 : printResultSet(res);
559 : }
560 GBC 2487 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
561 EUB : {
562 UBC 0 : fprintf(stderr, "setup of session %s failed: %s",
563 0 : conns[i + 1].sessionname,
564 UIC 0 : PQerrorMessage(conns[i + 1].conn));
565 LBC 0 : exit(1);
566 : }
567 GIC 2513 : PQclear(res);
568 : }
569 : }
570 ECB :
571 : /* Perform steps */
572 CBC 13385 : for (i = 0; i < nsteps; i++)
573 ECB : {
574 CBC 11740 : PermutationStep *pstep = steps[i];
575 11740 : Step *step = pstep->step;
576 GIC 11740 : IsoConnInfo *iconn = &conns[1 + step->session];
577 11740 : PGconn *conn = iconn->conn;
578 : bool mustwait;
579 : int j;
580 :
581 : /*
582 : * Check whether the session that needs to perform the next step is
583 ECB : * still blocked on an earlier step. If so, wait for it to finish.
584 : */
585 GIC 11740 : if (iconn->active_step != NULL)
586 : {
587 ECB : struct timeval start_time;
588 :
589 CBC 33 : gettimeofday(&start_time, NULL);
590 :
591 66 : while (iconn->active_step != NULL)
592 : {
593 GIC 33 : PermutationStep *oldstep = iconn->active_step;
594 :
595 : /*
596 : * Wait for oldstep. But even though we don't use
597 : * STEP_NONBLOCK, it might not complete because of blocker
598 ECB : * conditions.
599 : */
600 GIC 33 : if (!try_complete_step(testspec, oldstep, STEP_RETRY))
601 : {
602 : /* Done, so remove oldstep from the waiting[] array. */
603 ECB : int w;
604 :
605 CBC 46 : for (w = 0; w < nwaiting; w++)
606 ECB : {
607 GIC 46 : if (oldstep == waiting[w])
608 CBC 33 : break;
609 EUB : }
610 CBC 33 : if (w >= nwaiting)
611 UBC 0 : abort(); /* can't happen */
612 GBC 33 : if (w + 1 < nwaiting)
613 LBC 0 : memmove(&waiting[w], &waiting[w + 1],
614 UIC 0 : (nwaiting - (w + 1)) * sizeof(PermutationStep *));
615 GIC 33 : nwaiting--;
616 : }
617 :
618 : /*
619 : * Check for other steps that have finished. We should do
620 : * this if oldstep completed, as it might have unblocked
621 : * something. On the other hand, if oldstep hasn't completed,
622 : * we must poll all the active steps in hopes of unblocking
623 ECB : * oldstep. So either way, poll them.
624 : */
625 GIC 33 : nwaiting = try_complete_steps(testspec, waiting, nwaiting,
626 : STEP_NONBLOCK | STEP_RETRY);
627 :
628 : /*
629 : * If the target session is still busy, apply a timeout to
630 : * keep from hanging indefinitely, which could happen with
631 : * incorrect blocker annotations. Use the same 2 *
632 : * max_step_wait limit as try_complete_step does for deciding
633 : * to die. (We don't bother with trying to cancel anything,
634 ECB : * since it's unclear what to cancel in this case.)
635 : */
636 GIC 33 : if (iconn->active_step != NULL)
637 : {
638 : struct timeval current_time;
639 EUB : int64 td;
640 :
641 UBC 0 : gettimeofday(¤t_time, NULL);
642 0 : td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
643 0 : td *= USECS_PER_SEC;
644 UIC 0 : td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
645 UBC 0 : if (td > 2 * max_step_wait)
646 EUB : {
647 UBC 0 : fprintf(stderr, "step %s timed out after %d seconds\n",
648 0 : iconn->active_step->name,
649 0 : (int) (td / USECS_PER_SEC));
650 UIC 0 : fprintf(stderr, "active steps are:");
651 UBC 0 : for (j = 1; j < nconns; j++)
652 : {
653 0 : IsoConnInfo *oconn = &conns[j];
654 EUB :
655 UBC 0 : if (oconn->active_step != NULL)
656 UIC 0 : fprintf(stderr, " %s",
657 UBC 0 : oconn->active_step->name);
658 EUB : }
659 UIC 0 : fprintf(stderr, "\n");
660 0 : exit(1);
661 : }
662 : }
663 : }
664 : }
665 ECB :
666 : /* Send the query for this step. */
667 GBC 11740 : if (!PQsendQuery(conn, step->sql))
668 : {
669 UBC 0 : fprintf(stdout, "failed to send query for step %s: %s\n",
670 : step->name, PQerrorMessage(conn));
671 UIC 0 : exit(1);
672 : }
673 ECB :
674 : /* Remember we launched a step. */
675 GIC 11740 : iconn->active_step = pstep;
676 ECB :
677 : /* Remember target number of NOTICEs for any blocker conditions. */
678 CBC 11788 : for (j = 0; j < pstep->nblockers; j++)
679 : {
680 48 : PermutationStepBlocker *blocker = pstep->blockers[j];
681 ECB :
682 CBC 48 : if (blocker->blocktype == PSB_NUM_NOTICES)
683 GIC 1 : blocker->target_notices = blocker->num_notices +
684 1 : conns[blocker->step->session + 1].total_notices;
685 : }
686 ECB :
687 : /* Try to complete this step without blocking. */
688 GIC 11740 : mustwait = try_complete_step(testspec, pstep, STEP_NONBLOCK);
689 ECB :
690 : /* Check for completion of any steps that were previously waiting. */
691 GIC 11740 : nwaiting = try_complete_steps(testspec, waiting, nwaiting,
692 : STEP_NONBLOCK | STEP_RETRY);
693 ECB :
694 : /* If this step is waiting, add it to the array of waiters. */
695 GIC 11740 : if (mustwait)
696 652 : waiting[nwaiting++] = pstep;
697 : }
698 ECB :
699 : /* Wait for any remaining queries. */
700 GIC 1645 : nwaiting = try_complete_steps(testspec, waiting, nwaiting, STEP_RETRY);
701 GBC 1645 : if (nwaiting != 0)
702 EUB : {
703 UIC 0 : fprintf(stderr, "failed to complete permutation due to mutually-blocking steps\n");
704 0 : exit(1);
705 : }
706 ECB :
707 : /* Perform per-session teardown */
708 CBC 5553 : for (i = 0; i < testspec->nsessions; i++)
709 : {
710 3908 : if (testspec->sessions[i]->teardownsql)
711 ECB : {
712 GIC 202 : res = PQexec(conns[i + 1].conn, testspec->sessions[i]->teardownsql);
713 CBC 202 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
714 : {
715 85 : printResultSet(res);
716 : }
717 GBC 117 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
718 EUB : {
719 UBC 0 : fprintf(stderr, "teardown of session %s failed: %s",
720 UIC 0 : conns[i + 1].sessionname,
721 0 : PQerrorMessage(conns[i + 1].conn));
722 ECB : /* don't exit on teardown failure */
723 : }
724 GIC 202 : PQclear(res);
725 : }
726 : }
727 ECB :
728 : /* Perform teardown */
729 CBC 1645 : if (testspec->teardownsql)
730 ECB : {
731 GIC 1593 : res = PQexec(conns[0].conn, testspec->teardownsql);
732 CBC 1593 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
733 : {
734 39 : printResultSet(res);
735 : }
736 GBC 1554 : else if (PQresultStatus(res) != PGRES_COMMAND_OK)
737 EUB : {
738 UIC 0 : fprintf(stderr, "teardown failed: %s",
739 0 : PQerrorMessage(conns[0].conn));
740 ECB : /* don't exit on teardown failure */
741 : }
742 GIC 1593 : PQclear(res);
743 ECB : }
744 :
745 GIC 1645 : free(waiting);
746 1645 : }
747 :
748 : /*
749 : * Check for completion of any waiting step(s).
750 : * Remove completed ones from the waiting[] array,
751 : * and return the new value of nwaiting.
752 : * See try_complete_step for the meaning of the flags.
753 ECB : */
754 : static int
755 GIC 13418 : try_complete_steps(TestSpec *testspec, PermutationStep **waiting,
756 : int nwaiting, int flags)
757 : {
758 : int old_nwaiting;
759 : bool have_blocker;
760 :
761 ECB : do
762 : {
763 GIC 13422 : int w = 0;
764 ECB :
765 : /* Reset latch; we only care about notices received within loop. */
766 GIC 13422 : any_new_notice = false;
767 ECB :
768 : /* Likewise, these variables reset for each retry. */
769 GIC 13422 : old_nwaiting = nwaiting;
770 13422 : have_blocker = false;
771 ECB :
772 : /* Scan the array, try to complete steps. */
773 CBC 14523 : while (w < nwaiting)
774 : {
775 GIC 1101 : if (try_complete_step(testspec, waiting[w], flags))
776 ECB : {
777 : /* Still blocked, leave it alone. */
778 CBC 482 : if (waiting[w]->nblockers > 0)
779 GIC 20 : have_blocker = true;
780 482 : w++;
781 : }
782 : else
783 ECB : {
784 : /* Done, remove it from array. */
785 CBC 619 : if (w + 1 < nwaiting)
786 21 : memmove(&waiting[w], &waiting[w + 1],
787 GIC 21 : (nwaiting - (w + 1)) * sizeof(PermutationStep *));
788 619 : nwaiting--;
789 : }
790 : }
791 :
792 : /*
793 : * If any of the still-waiting steps have blocker conditions attached,
794 : * it's possible that one of the steps we examined afterwards has
795 : * released them (either by completing, or by sending a NOTICE). If
796 : * any step completions or NOTICEs happened, repeat the loop until
797 : * none occurs. Without this provision, completion timing could vary
798 ECB : * depending on the order in which the steps appear in the array.
799 : */
800 GIC 13422 : } while (have_blocker && (nwaiting < old_nwaiting || any_new_notice));
801 13418 : return nwaiting;
802 : }
803 :
804 : /*
805 : * Our caller already sent the query associated with this step. Wait for it
806 : * to either complete, or hit a blocking condition.
807 : *
808 : * When calling this function on behalf of a given step for a second or later
809 : * time, pass the STEP_RETRY flag. Do not pass it on the first call.
810 : *
811 : * Returns true if the step was *not* completed, false if it was completed.
812 : * Reasons for non-completion are (a) the STEP_NONBLOCK flag was specified
813 : * and the query is waiting to acquire a lock, or (b) the step has an
814 : * unsatisfied blocker condition. When STEP_NONBLOCK is given, we assume
815 : * that any lock wait will persist until we have executed additional steps.
816 ECB : */
817 : static bool
818 CBC 12874 : try_complete_step(TestSpec *testspec, PermutationStep *pstep, int flags)
819 ECB : {
820 CBC 12874 : Step *step = pstep->step;
821 GIC 12874 : IsoConnInfo *iconn = &conns[1 + step->session];
822 12874 : PGconn *conn = iconn->conn;
823 : fd_set read_set;
824 ECB : struct timeval start_time;
825 : struct timeval timeout;
826 GIC 12874 : int sock = PQsocket(conn);
827 : int ret;
828 ECB : PGresult *res;
829 : PGnotify *notify;
830 GIC 12874 : bool canceled = false;
831 :
832 : /*
833 : * If the step is annotated with (*), then on the first call, force it to
834 : * wait. This is useful for ensuring consistent output when the step
835 ECB : * might or might not complete so fast that we don't observe it waiting.
836 : */
837 GIC 12874 : if (!(flags & STEP_RETRY))
838 : {
839 ECB : int i;
840 :
841 CBC 11777 : for (i = 0; i < pstep->nblockers; i++)
842 : {
843 48 : PermutationStepBlocker *blocker = pstep->blockers[i];
844 :
845 48 : if (blocker->blocktype == PSB_ONCE)
846 : {
847 11 : printf("step %s: %s <waiting ...>\n",
848 : step->name, step->sql);
849 GIC 11 : return true;
850 : }
851 : }
852 ECB : }
853 :
854 GBC 12863 : if (sock < 0)
855 EUB : {
856 UIC 0 : fprintf(stderr, "invalid socket: %s", PQerrorMessage(conn));
857 0 : exit(1);
858 ECB : }
859 :
860 GIC 12863 : gettimeofday(&start_time, NULL);
861 CBC 218671 : FD_ZERO(&read_set);
862 :
863 27058 : while (PQisBusy(conn))
864 ECB : {
865 CBC 15286 : FD_SET(sock, &read_set);
866 GIC 15286 : timeout.tv_sec = 0;
867 CBC 15286 : timeout.tv_usec = 10000; /* Check for lock waits every 10ms. */
868 ECB :
869 GIC 15286 : ret = select(sock + 1, &read_set, NULL, NULL, &timeout);
870 GBC 15286 : if (ret < 0) /* error in select() */
871 EUB : {
872 UBC 0 : if (errno == EINTR)
873 0 : continue;
874 UIC 0 : fprintf(stderr, "select failed: %s\n", strerror(errno));
875 LBC 0 : exit(1);
876 : }
877 GIC 15286 : else if (ret == 0) /* select() timeout: check for lock wait */
878 : {
879 : struct timeval current_time;
880 : int64 td;
881 ECB :
882 : /* If it's OK for the step to block, check whether it has. */
883 GIC 3200 : if (flags & STEP_NONBLOCK)
884 : {
885 ECB : bool waiting;
886 :
887 GIC 3192 : res = PQexecPrepared(conns[0].conn, PREP_WAITING, 1,
888 CBC 3192 : &conns[step->session + 1].backend_pid_str,
889 ECB : NULL, NULL, 0);
890 GIC 6384 : if (PQresultStatus(res) != PGRES_TUPLES_OK ||
891 GBC 3192 : PQntuples(res) != 1)
892 EUB : {
893 UBC 0 : fprintf(stderr, "lock wait query failed: %s",
894 UIC 0 : PQerrorMessage(conns[0].conn));
895 LBC 0 : exit(1);
896 ECB : }
897 GIC 3192 : waiting = ((PQgetvalue(res, 0, 0))[0] == 't');
898 CBC 3192 : PQclear(res);
899 :
900 GIC 3192 : if (waiting) /* waiting to acquire a lock */
901 : {
902 : /*
903 : * Since it takes time to perform the lock-check query,
904 : * some data --- notably, NOTICE messages --- might have
905 : * arrived since we looked. We must call PQconsumeInput
906 : * and then PQisBusy to collect and process any such
907 : * messages. In the (unlikely) case that PQisBusy then
908 : * returns false, we might as well go examine the
909 ECB : * available result.
910 : */
911 GBC 1091 : if (!PQconsumeInput(conn))
912 : {
913 UBC 0 : fprintf(stderr, "PQconsumeInput failed: %s\n",
914 : PQerrorMessage(conn));
915 LBC 0 : exit(1);
916 EUB : }
917 GIC 1091 : if (!PQisBusy(conn))
918 UIC 0 : break;
919 :
920 : /*
921 : * conn is still busy, so conclude that the step really is
922 ECB : * waiting.
923 : */
924 GIC 1091 : if (!(flags & STEP_RETRY))
925 CBC 610 : printf("step %s: %s <waiting ...>\n",
926 : step->name, step->sql);
927 GIC 1091 : return true;
928 : }
929 : /* else, not waiting */
930 : }
931 ECB :
932 : /* Figure out how long we've been waiting for this step. */
933 CBC 2109 : gettimeofday(¤t_time, NULL);
934 2109 : td = (int64) current_time.tv_sec - (int64) start_time.tv_sec;
935 GIC 2109 : td *= USECS_PER_SEC;
936 2109 : td += (int64) current_time.tv_usec - (int64) start_time.tv_usec;
937 :
938 : /*
939 : * After max_step_wait microseconds, try to cancel the query.
940 : *
941 : * If the user tries to test an invalid permutation, we don't want
942 : * to hang forever, especially when this is running in the
943 : * buildfarm. This will presumably lead to this permutation
944 : * failing, but remaining permutations and tests should still be
945 ECB : * OK.
946 : */
947 GBC 2109 : if (td > max_step_wait && !canceled)
948 : {
949 UBC 0 : PGcancel *cancel = PQgetCancel(conn);
950 :
951 UIC 0 : if (cancel != NULL)
952 : {
953 EUB : char buf[256];
954 :
955 UIC 0 : if (PQcancel(cancel, buf, sizeof(buf)))
956 : {
957 : /*
958 : * print to stdout not stderr, as this should appear
959 EUB : * in the test case's results
960 : */
961 UBC 0 : printf("isolationtester: canceling step %s after %d seconds\n",
962 : step->name, (int) (td / USECS_PER_SEC));
963 UIC 0 : canceled = true;
964 EUB : }
965 : else
966 UIC 0 : fprintf(stderr, "PQcancel failed: %s\n", buf);
967 0 : PQfreeCancel(cancel);
968 : }
969 : }
970 :
971 : /*
972 : * After twice max_step_wait, just give up and die.
973 : *
974 : * Since cleanup steps won't be run in this case, this may cause
975 : * later tests to fail. That stinks, but it's better than waiting
976 ECB : * forever for the server to respond to the cancel.
977 : */
978 GBC 2109 : if (td > 2 * max_step_wait)
979 EUB : {
980 UBC 0 : fprintf(stderr, "step %s timed out after %d seconds\n",
981 UIC 0 : step->name, (int) (td / USECS_PER_SEC));
982 0 : exit(1);
983 ECB : }
984 : }
985 GBC 12086 : else if (!PQconsumeInput(conn)) /* select(): data available */
986 : {
987 UBC 0 : fprintf(stderr, "PQconsumeInput failed: %s\n",
988 : PQerrorMessage(conn));
989 UIC 0 : exit(1);
990 : }
991 : }
992 :
993 : /*
994 : * The step is done, but we won't report it as complete so long as there
995 ECB : * are blockers.
996 : */
997 CBC 11772 : if (step_has_blocker(pstep))
998 ECB : {
999 GIC 32 : if (!(flags & STEP_RETRY))
1000 CBC 31 : printf("step %s: %s <waiting ...>\n",
1001 : step->name, step->sql);
1002 GIC 32 : return true;
1003 : }
1004 ECB :
1005 : /* Otherwise, go ahead and complete it. */
1006 GIC 11740 : if (flags & STEP_RETRY)
1007 CBC 652 : printf("step %s: <... completed>\n", step->name);
1008 : else
1009 11088 : printf("step %s: %s\n", step->name, step->sql);
1010 :
1011 23746 : while ((res = PQgetResult(conn)))
1012 : {
1013 12006 : switch (PQresultStatus(res))
1014 : {
1015 8148 : case PGRES_COMMAND_OK:
1016 ECB : case PGRES_EMPTY_QUERY:
1017 CBC 8148 : break;
1018 3386 : case PGRES_TUPLES_OK:
1019 3386 : printResultSet(res);
1020 GIC 3386 : break;
1021 472 : case PGRES_FATAL_ERROR:
1022 :
1023 : /*
1024 : * Detail may contain XID values, so we want to just show
1025 : * primary. Beware however that libpq-generated error results
1026 : * may not contain subfields, only an old-style message.
1027 ECB : */
1028 : {
1029 CBC 472 : const char *sev = PQresultErrorField(res,
1030 : PG_DIAG_SEVERITY);
1031 GIC 472 : const char *msg = PQresultErrorField(res,
1032 ECB : PG_DIAG_MESSAGE_PRIMARY);
1033 :
1034 GIC 472 : if (sev && msg)
1035 CBC 470 : printf("%s: %s\n", sev, msg);
1036 : else
1037 2 : printf("%s\n", PQresultErrorMessage(res));
1038 EUB : }
1039 GBC 472 : break;
1040 UIC 0 : default:
1041 0 : printf("unexpected result status: %s\n",
1042 ECB : PQresStatus(PQresultStatus(res)));
1043 : }
1044 GIC 12006 : PQclear(res);
1045 : }
1046 ECB :
1047 : /* Report any available NOTIFY messages, too */
1048 GIC 11740 : PQconsumeInput(conn);
1049 11767 : while ((notify = PQnotifies(conn)) != NULL)
1050 ECB : {
1051 : /* Try to identify which session it came from */
1052 GIC 27 : const char *sendername = NULL;
1053 : char pidstring[32];
1054 ECB : int i;
1055 :
1056 CBC 27 : for (i = 0; i < testspec->nsessions; i++)
1057 : {
1058 27 : if (notify->be_pid == conns[i + 1].backend_pid)
1059 ECB : {
1060 GIC 27 : sendername = conns[i + 1].sessionname;
1061 27 : break;
1062 ECB : }
1063 : }
1064 GIC 27 : if (sendername == NULL)
1065 EUB : {
1066 : /* Doesn't seem to be any test session, so show the hard way */
1067 UIC 0 : snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
1068 LBC 0 : sendername = pidstring;
1069 : }
1070 GIC 27 : printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
1071 ECB : testspec->sessions[step->session]->name,
1072 : notify->relname, notify->extra, sendername);
1073 GIC 27 : PQfreemem(notify);
1074 27 : PQconsumeInput(conn);
1075 : }
1076 ECB :
1077 : /* Connection is now idle. */
1078 CBC 11740 : iconn->active_step = NULL;
1079 :
1080 GIC 11740 : return false;
1081 : }
1082 :
1083 ECB : /* Detect whether a step has any unsatisfied blocker conditions */
1084 : static bool
1085 GIC 11772 : step_has_blocker(PermutationStep *pstep)
1086 : {
1087 ECB : int i;
1088 :
1089 CBC 11820 : for (i = 0; i < pstep->nblockers; i++)
1090 : {
1091 GIC 80 : PermutationStepBlocker *blocker = pstep->blockers[i];
1092 ECB : IsoConnInfo *iconn;
1093 :
1094 CBC 80 : switch (blocker->blocktype)
1095 : {
1096 11 : case PSB_ONCE:
1097 ECB : /* Ignore; try_complete_step handles this specially */
1098 GIC 11 : break;
1099 CBC 68 : case PSB_OTHER_STEP:
1100 ECB : /* Block if referenced step is active */
1101 CBC 68 : iconn = &conns[1 + blocker->step->session];
1102 68 : if (iconn->active_step &&
1103 32 : iconn->active_step->step == blocker->step)
1104 32 : return true;
1105 GIC 36 : break;
1106 CBC 1 : case PSB_NUM_NOTICES:
1107 ECB : /* Block if not enough notices received yet */
1108 GBC 1 : iconn = &conns[1 + blocker->step->session];
1109 CBC 1 : if (iconn->total_notices < blocker->target_notices)
1110 UIC 0 : return true;
1111 GIC 1 : break;
1112 ECB : }
1113 : }
1114 GIC 11740 : return false;
1115 : }
1116 ECB :
1117 : static void
1118 GIC 3592 : printResultSet(PGresult *res)
1119 : {
1120 ECB : PQprintOpt popt;
1121 :
1122 CBC 3592 : memset(&popt, 0, sizeof(popt));
1123 3592 : popt.header = true;
1124 3592 : popt.align = true;
1125 3592 : popt.fieldSep = "|";
1126 GIC 3592 : PQprint(stdout, res, &popt);
1127 3592 : }
1128 :
1129 ECB : /* notice processor for regular user sessions */
1130 : static void
1131 CBC 544 : isotesterNoticeProcessor(void *arg, const char *message)
1132 : {
1133 GIC 544 : IsoConnInfo *myconn = (IsoConnInfo *) arg;
1134 ECB :
1135 : /* Prefix the backend's message with the session name. */
1136 CBC 544 : printf("%s: %s", myconn->sessionname, message);
1137 ECB : /* Record notices, since we may need this to decide to unblock a step. */
1138 CBC 544 : myconn->total_notices++;
1139 GIC 544 : any_new_notice = true;
1140 544 : }
1141 :
/*
 * blackholeNoticeProcessor
 *
 * Notice processor that silently discards every message it is handed.
 * Both arguments are ignored.
 */
static void
blackholeNoticeProcessor(void *arg, const char *message)
{
	/* Intentionally empty: the notice is swallowed. */
	(void) arg;
	(void) message;
}
|