TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "common/fe_memutils.h"
23 : #include "libpq-fe.h"
24 : #include "pg_getopt.h"
25 : #include "portability/instr_time.h"
26 :
27 :
28 : static void exit_nicely(PGconn *conn);
29 : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
30 : pg_attribute_printf(2, 3);
31 : static bool process_result(PGconn *conn, PGresult *res, int results,
32 : int numsent);
33 :
/* Program name used as the prefix of fatal error messages. */
const char *const progname = "libpq_pipeline";

/* Options and defaults */
char	   *tracefile = NULL;	/* path to PQtrace() file */


/*
 * pg_debug() prints to stderr only when the file is built with DEBUG_OUTPUT
 * defined; otherwise it expands to nothing.
 */
#ifdef DEBUG_OUTPUT
#define	pg_debug(...)  do { fprintf(stderr, __VA_ARGS__); } while (0)
#else
#define pg_debug(...)
#endif

/* SQL used by the tests to (re)create and fill the scratch table. */
static const char *const drop_table_sql =
"DROP TABLE IF EXISTS pq_pipeline_demo";
static const char *const create_table_sql =
"CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
"int8filler int8);";
static const char *const insert_sql =
"INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
static const char *const insert_sql2 =
"INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";

/*
 * Buffer sizes for the decimal text form of an int32/int64: the maximum
 * number of digits, plus a sign and the null terminator.
 *
 *	int32: "-2147483648"		  is 11 characters + terminator = 12
 *	int64: "-9223372036854775808" is 20 characters + terminator = 21
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
59 EUB :
60 : static void
61 UBC 0 : exit_nicely(PGconn *conn)
62 EUB : {
63 UIC 0 : PQfinish(conn);
64 0 : exit(1);
65 : }
66 :
67 : /*
68 : * Print an error to stderr and terminate the program.
69 : */
70 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
71 EUB : static void
72 : pg_attribute_noreturn()
73 UIC 0 : pg_fatal_impl(int line, const char *fmt,...)
74 : {
75 : va_list args;
76 EUB :
77 :
78 UBC 0 : fflush(stdout);
79 EUB :
80 UBC 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
81 0 : va_start(args, fmt);
82 0 : vfprintf(stderr, fmt, args);
83 0 : va_end(args);
84 0 : Assert(fmt[strlen(fmt) - 1] != '\n');
85 UIC 0 : fprintf(stderr, "\n");
86 0 : exit(1);
87 : }
88 ECB :
89 : static void
90 CBC 1 : test_disallowed_in_pipeline(PGconn *conn)
91 : {
92 1 : PGresult *res = NULL;
93 :
94 1 : fprintf(stderr, "test error cases... ");
95 EUB :
96 GIC 1 : if (PQisnonblocking(conn))
97 LBC 0 : pg_fatal("Expected blocking connection mode");
98 EUB :
99 GIC 1 : if (PQenterPipelineMode(conn) != 1)
100 LBC 0 : pg_fatal("Unable to enter pipeline mode");
101 EUB :
102 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
103 UIC 0 : pg_fatal("Pipeline mode not activated properly");
104 ECB :
105 : /* PQexec should fail in pipeline mode */
106 GBC 1 : res = PQexec(conn, "SELECT 1");
107 CBC 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
108 UIC 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
109 GBC 1 : if (strcmp(PQerrorMessage(conn),
110 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
111 UIC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
112 : PQerrorMessage(conn));
113 ECB :
114 EUB : /* PQsendQuery should fail in pipeline mode */
115 CBC 1 : if (PQsendQuery(conn, "SELECT 1") != 0)
116 UIC 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
117 GBC 1 : if (strcmp(PQerrorMessage(conn),
118 : "PQsendQuery not allowed in pipeline mode\n") != 0)
119 UIC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
120 : PQerrorMessage(conn));
121 ECB :
122 EUB : /* Entering pipeline mode when already in pipeline mode is OK */
123 GIC 1 : if (PQenterPipelineMode(conn) != 1)
124 LBC 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
125 EUB :
126 GIC 1 : if (PQisBusy(conn) != 0)
127 UIC 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
128 ECB :
129 EUB : /* ok, back to normal command mode */
130 GIC 1 : if (PQexitPipelineMode(conn) != 1)
131 LBC 0 : pg_fatal("couldn't exit idle empty pipeline mode");
132 EUB :
133 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
134 UIC 0 : pg_fatal("Pipeline mode not terminated properly");
135 ECB :
136 EUB : /* exiting pipeline mode when not in pipeline mode should be a no-op */
137 GIC 1 : if (PQexitPipelineMode(conn) != 1)
138 UIC 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
139 ECB :
140 : /* can now PQexec again */
141 GBC 1 : res = PQexec(conn, "SELECT 1");
142 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
143 UIC 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
144 ECB : PQerrorMessage(conn));
145 :
146 GIC 1 : fprintf(stderr, "ok\n");
147 1 : }
148 ECB :
149 : static void
150 CBC 1 : test_multi_pipelines(PGconn *conn)
151 ECB : {
152 CBC 1 : PGresult *res = NULL;
153 GIC 1 : const char *dummy_params[1] = {"1"};
154 CBC 1 : Oid dummy_param_oids[1] = {INT4OID};
155 :
156 GIC 1 : fprintf(stderr, "multi pipeline... ");
157 :
158 : /*
159 : * Queue up a couple of small pipelines and process each without returning
160 ECB : * to command mode first.
161 EUB : */
162 GIC 1 : if (PQenterPipelineMode(conn) != 1)
163 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164 :
165 GBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
166 : dummy_params, NULL, NULL, 0) != 1)
167 LBC 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
168 EUB :
169 GIC 1 : if (PQpipelineSync(conn) != 1)
170 LBC 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
171 :
172 GBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
173 : dummy_params, NULL, NULL, 0) != 1)
174 LBC 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
175 EUB :
176 GIC 1 : if (PQpipelineSync(conn) != 1)
177 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
178 ECB :
179 : /* OK, start processing the results */
180 GBC 1 : res = PQgetResult(conn);
181 GIC 1 : if (res == NULL)
182 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
183 ECB : PQerrorMessage(conn));
184 EUB :
185 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
186 LBC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
187 ECB : PQresStatus(PQresultStatus(res)));
188 GIC 1 : PQclear(res);
189 CBC 1 : res = NULL;
190 EUB :
191 GIC 1 : if (PQgetResult(conn) != NULL)
192 LBC 0 : pg_fatal("PQgetResult returned something extra after first result");
193 EUB :
194 GIC 1 : if (PQexitPipelineMode(conn) != 0)
195 LBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
196 ECB :
197 GBC 1 : res = PQgetResult(conn);
198 GIC 1 : if (res == NULL)
199 UIC 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
200 ECB : PQerrorMessage(conn));
201 EUB :
202 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
203 LBC 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
204 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
205 GIC 1 : PQclear(res);
206 :
207 ECB : /* second pipeline */
208 :
209 GBC 1 : res = PQgetResult(conn);
210 GIC 1 : if (res == NULL)
211 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
212 ECB : PQerrorMessage(conn));
213 EUB :
214 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
215 UIC 0 : pg_fatal("Unexpected result code %s from second pipeline item",
216 ECB : PQresStatus(PQresultStatus(res)));
217 :
218 GBC 1 : res = PQgetResult(conn);
219 GIC 1 : if (res != NULL)
220 UIC 0 : pg_fatal("Expected null result, got %s",
221 ECB : PQresStatus(PQresultStatus(res)));
222 :
223 GBC 1 : res = PQgetResult(conn);
224 GIC 1 : if (res == NULL)
225 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
226 ECB : PQerrorMessage(conn));
227 EUB :
228 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
229 UIC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
230 : PQresStatus(PQresultStatus(res)));
231 ECB :
232 EUB : /* We're still in pipeline mode ... */
233 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
234 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
235 ECB :
236 EUB : /* until we end it, which we can safely do now */
237 GIC 1 : if (PQexitPipelineMode(conn) != 1)
238 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
239 ECB : PQerrorMessage(conn));
240 EUB :
241 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
242 LBC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
243 ECB :
244 GIC 1 : fprintf(stderr, "ok\n");
245 1 : }
246 :
247 : /*
248 : * Test behavior when a pipeline dispatches a number of commands that are
249 : * not flushed by a sync point.
250 ECB : */
251 : static void
252 CBC 1 : test_nosync(PGconn *conn)
253 ECB : {
254 CBC 1 : int numqueries = 10;
255 GIC 1 : int results = 0;
256 CBC 1 : int sock = PQsocket(conn);
257 :
258 1 : fprintf(stderr, "nosync... ");
259 EUB :
260 GIC 1 : if (sock < 0)
261 LBC 0 : pg_fatal("invalid socket");
262 EUB :
263 CBC 1 : if (PQenterPipelineMode(conn) != 1)
264 UIC 0 : pg_fatal("could not enter pipeline mode");
265 GIC 11 : for (int i = 0; i < numqueries; i++)
266 : {
267 : fd_set input_mask;
268 ECB : struct timeval tv;
269 :
270 GBC 10 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
271 ECB : 0, NULL, NULL, NULL, NULL, 0) != 1)
272 UIC 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
273 GIC 10 : PQflush(conn);
274 :
275 : /*
276 ECB : * If the server has written anything to us, read (some of) it now.
277 : */
278 CBC 170 : FD_ZERO(&input_mask);
279 10 : FD_SET(sock, &input_mask);
280 10 : tv.tv_sec = 0;
281 GIC 10 : tv.tv_usec = 0;
282 GBC 10 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
283 EUB : {
284 UIC 0 : fprintf(stderr, "select() failed: %s\n", strerror(errno));
285 LBC 0 : exit_nicely(conn);
286 EUB : }
287 GIC 10 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
288 UIC 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
289 : }
290 ECB :
291 EUB : /* tell server to flush its output buffer */
292 CBC 1 : if (PQsendFlushRequest(conn) != 1)
293 UIC 0 : pg_fatal("failed to send flush request");
294 GIC 1 : PQflush(conn);
295 :
296 ECB : /* Now read all results */
297 : for (;;)
298 GIC 9 : {
299 ECB : PGresult *res;
300 :
301 GIC 10 : res = PQgetResult(conn);
302 ECB :
303 EUB : /* NULL results are only expected after TUPLES_OK */
304 GIC 10 : if (res == NULL)
305 UIC 0 : pg_fatal("got unexpected NULL result after %d results", results);
306 ECB :
307 : /* We expect exactly one TUPLES_OK result for each query we sent */
308 GIC 10 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
309 9 : {
310 : PGresult *res2;
311 ECB :
312 : /* and one NULL result should follow each */
313 GBC 10 : res2 = PQgetResult(conn);
314 GIC 10 : if (res2 != NULL)
315 LBC 0 : pg_fatal("expected NULL, got %s",
316 ECB : PQresStatus(PQresultStatus(res2)));
317 GIC 10 : PQclear(res);
318 10 : results++;
319 ECB :
320 : /* if we're done, we're done */
321 GIC 10 : if (results == numqueries)
322 CBC 1 : break;
323 :
324 GIC 9 : continue;
325 : }
326 EUB :
327 : /* anything else is unexpected */
328 UIC 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
329 ECB : }
330 :
331 GIC 1 : fprintf(stderr, "ok\n");
332 1 : }
333 :
334 : /*
335 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
336 : * still have to get results for each pipeline item, but the item will just be
337 : * a PGRES_PIPELINE_ABORTED code.
338 : *
339 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
340 : * usually use an xact, but in this case we want to observe the effects of each
341 : * statement.
342 ECB : */
343 : static void
344 CBC 1 : test_pipeline_abort(PGconn *conn)
345 ECB : {
346 CBC 1 : PGresult *res = NULL;
347 GIC 1 : const char *dummy_params[1] = {"1"};
348 1 : Oid dummy_param_oids[1] = {INT4OID};
349 : int i;
350 : int gotrows;
351 ECB : bool goterror;
352 :
353 CBC 1 : fprintf(stderr, "aborted pipeline... ");
354 ECB :
355 GBC 1 : res = PQexec(conn, drop_table_sql);
356 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
357 LBC 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
358 ECB :
359 GBC 1 : res = PQexec(conn, create_table_sql);
360 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
361 UIC 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
362 :
363 : /*
364 : * Queue up a couple of small pipelines and process each without returning
365 : * to command mode first. Make sure the second operation in the first
366 ECB : * pipeline ERRORs.
367 EUB : */
368 GIC 1 : if (PQenterPipelineMode(conn) != 1)
369 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
370 ECB :
371 GIC 1 : dummy_params[0] = "1";
372 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
373 : dummy_params, NULL, NULL, 0) != 1)
374 LBC 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
375 :
376 GIC 1 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
377 EUB : 1, dummy_param_oids, dummy_params,
378 : NULL, NULL, 0) != 1)
379 LBC 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
380 ECB :
381 GIC 1 : dummy_params[0] = "2";
382 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
383 : dummy_params, NULL, NULL, 0) != 1)
384 LBC 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
385 EUB :
386 GIC 1 : if (PQpipelineSync(conn) != 1)
387 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
388 ECB :
389 GIC 1 : dummy_params[0] = "3";
390 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
391 : dummy_params, NULL, NULL, 0) != 1)
392 UIC 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
393 ECB : PQerrorMessage(conn));
394 EUB :
395 GIC 1 : if (PQpipelineSync(conn) != 1)
396 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
397 :
398 : /*
399 : * OK, start processing the pipeline results.
400 : *
401 : * We should get a command-ok for the first query, then a fatal error and
402 : * a pipeline aborted message for the second insert, a pipeline-end, then
403 ECB : * a command-ok and a pipeline-ok for the second pipeline operation.
404 : */
405 GBC 1 : res = PQgetResult(conn);
406 CBC 1 : if (res == NULL)
407 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
408 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
409 UIC 0 : pg_fatal("Unexpected result status %s: %s",
410 ECB : PQresStatus(PQresultStatus(res)),
411 : PQresultErrorMessage(res));
412 GIC 1 : PQclear(res);
413 ECB :
414 EUB : /* NULL result to signal end-of-results for this command */
415 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
416 UIC 0 : pg_fatal("Expected null result, got %s",
417 : PQresStatus(PQresultStatus(res)));
418 ECB :
419 : /* Second query caused error, so we expect an error next */
420 GBC 1 : res = PQgetResult(conn);
421 CBC 1 : if (res == NULL)
422 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
423 GIC 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
424 LBC 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
425 : PQresStatus(PQresultStatus(res)));
426 GIC 1 : PQclear(res);
427 ECB :
428 EUB : /* NULL result to signal end-of-results for this command */
429 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
430 UIC 0 : pg_fatal("Expected null result, got %s",
431 : PQresStatus(PQresultStatus(res)));
432 :
433 : /*
434 : * pipeline should now be aborted.
435 : *
436 : * Note that we could still queue more queries at this point if we wanted;
437 : * they'd get added to a new third pipeline since we've already sent a
438 ECB : * second. The aborted flag relates only to the pipeline being received.
439 EUB : */
440 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
441 UIC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
442 ECB :
443 : /* third query in pipeline, the second insert */
444 GBC 1 : res = PQgetResult(conn);
445 CBC 1 : if (res == NULL)
446 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
447 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
448 LBC 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
449 : PQresStatus(PQresultStatus(res)));
450 GIC 1 : PQclear(res);
451 ECB :
452 EUB : /* NULL result to signal end-of-results for this command */
453 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
454 LBC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
455 EUB :
456 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
457 UIC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
458 ECB :
459 EUB : /* Ensure we're still in pipeline */
460 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
461 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
462 :
463 : /*
464 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
465 : *
466 : * (This is so clients know to start processing results normally again and
467 ECB : * can tell the difference between skipped commands and the sync.)
468 : */
469 GBC 1 : res = PQgetResult(conn);
470 CBC 1 : if (res == NULL)
471 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
472 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
473 UIC 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
474 ECB : "Expected PGRES_PIPELINE_SYNC, got %s",
475 : PQresStatus(PQresultStatus(res)));
476 CBC 1 : PQclear(res);
477 EUB :
478 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
479 UIC 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
480 ECB :
481 EUB : /* We're still in pipeline mode... */
482 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
483 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
484 ECB :
485 : /* the insert from the second pipeline */
486 GBC 1 : res = PQgetResult(conn);
487 CBC 1 : if (res == NULL)
488 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
489 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
490 LBC 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
491 : PQresStatus(PQresultStatus(res)));
492 GIC 1 : PQclear(res);
493 ECB :
494 EUB : /* Read the NULL result at the end of the command */
495 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
496 UIC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
497 ECB :
498 EUB : /* the second pipeline sync */
499 CBC 1 : if ((res = PQgetResult(conn)) == NULL)
500 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
501 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
502 LBC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
503 : PQresStatus(PQresultStatus(res)));
504 CBC 1 : PQclear(res);
505 EUB :
506 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
507 UIC 0 : pg_fatal("Expected null result, got %s: %s",
508 : PQresStatus(PQresultStatus(res)),
509 : PQerrorMessage(conn));
510 ECB :
511 EUB : /* Try to send two queries in one command */
512 CBC 1 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
513 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
514 CBC 1 : if (PQpipelineSync(conn) != 1)
515 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
516 GIC 1 : goterror = false;
517 CBC 2 : while ((res = PQgetResult(conn)) != NULL)
518 : {
519 1 : switch (PQresultStatus(res))
520 ECB : {
521 GBC 1 : case PGRES_FATAL_ERROR:
522 GIC 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
523 LBC 0 : pg_fatal("expected error about multiple commands, got %s",
524 ECB : PQerrorMessage(conn));
525 CBC 1 : printf("got expected %s", PQerrorMessage(conn));
526 GBC 1 : goterror = true;
527 1 : break;
528 UIC 0 : default:
529 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
530 : break;
531 ECB : }
532 EUB : }
533 CBC 1 : if (!goterror)
534 LBC 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
535 GBC 1 : res = PQgetResult(conn);
536 CBC 1 : if (res == NULL)
537 UBC 0 : pg_fatal("got NULL result");
538 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
539 LBC 0 : pg_fatal("Unexpected result code %s from pipeline sync",
540 : PQresStatus(PQresultStatus(res)));
541 GIC 1 : fprintf(stderr, "ok\n");
542 ECB :
543 : /* Test single-row mode with an error partways */
544 GBC 1 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
545 ECB : 0, NULL, NULL, NULL, NULL, 0) != 1)
546 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
547 CBC 1 : if (PQpipelineSync(conn) != 1)
548 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
549 CBC 1 : PQsetSingleRowMode(conn);
550 1 : goterror = false;
551 GIC 1 : gotrows = 0;
552 CBC 5 : while ((res = PQgetResult(conn)) != NULL)
553 : {
554 4 : switch (PQresultStatus(res))
555 ECB : {
556 CBC 3 : case PGRES_SINGLE_TUPLE:
557 3 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
558 3 : gotrows++;
559 3 : break;
560 GBC 1 : case PGRES_FATAL_ERROR:
561 GIC 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
562 UIC 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
563 ECB : PQerrorMessage(conn),
564 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
565 CBC 1 : printf("got expected division-by-zero\n");
566 GBC 1 : goterror = true;
567 1 : break;
568 UIC 0 : default:
569 LBC 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
570 : }
571 CBC 4 : PQclear(res);
572 EUB : }
573 CBC 1 : if (!goterror)
574 UBC 0 : pg_fatal("did not get division-by-zero error");
575 GIC 1 : if (gotrows != 3)
576 LBC 0 : pg_fatal("did not get three rows");
577 EUB : /* the third pipeline sync */
578 CBC 1 : if ((res = PQgetResult(conn)) == NULL)
579 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
580 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
581 LBC 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
582 : PQresStatus(PQresultStatus(res)));
583 GIC 1 : PQclear(res);
584 ECB :
585 EUB : /* We're still in pipeline mode... */
586 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
587 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
588 ECB :
589 EUB : /* until we end it, which we can safely do now */
590 GIC 1 : if (PQexitPipelineMode(conn) != 1)
591 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
592 ECB : PQerrorMessage(conn));
593 EUB :
594 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
595 UIC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
596 :
597 : /*-
598 : * Since we fired the pipelines off without a surrounding xact, the results
599 : * should be:
600 : *
601 : * - Implicit xact started by server around 1st pipeline
602 : * - First insert applied
603 : * - Second statement aborted xact
604 : * - Third insert skipped
605 : * - Sync rolled back first implicit xact
606 : * - Implicit xact created by server around 2nd pipeline
607 : * - insert applied from 2nd pipeline
608 : * - Sync commits 2nd xact
609 : *
610 ECB : * So we should only have the value 3 that we inserted.
611 : */
612 CBC 1 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
613 EUB :
614 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
615 LBC 0 : pg_fatal("Expected tuples, got %s: %s",
616 EUB : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
617 CBC 1 : if (PQntuples(res) != 1)
618 UIC 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
619 CBC 2 : for (i = 0; i < PQntuples(res); i++)
620 : {
621 1 : const char *val = PQgetvalue(res, i, 0);
622 EUB :
623 GIC 1 : if (strcmp(val, "3") != 0)
624 UIC 0 : pg_fatal("expected only insert with value 3, got %s", val);
625 ECB : }
626 :
627 CBC 1 : PQclear(res);
628 ECB :
629 GIC 1 : fprintf(stderr, "ok\n");
630 1 : }
631 :
/*
 * Steps of the state machine driven by test_pipelined_insert.
 *
 * The enumerators are listed in the order the steps are performed; the test
 * moves to the following step by incrementing a variable of this type, so
 * the order must not be changed.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX,				/* BEGIN TRANSACTION */
	BI_DROP_TABLE,				/* drop the demo table if it exists */
	BI_CREATE_TABLE,			/* create the demo table */
	BI_PREPARE,					/* prepare the insert statement */
	BI_INSERT_ROWS,				/* execute the prepared insert, once per row */
	BI_COMMIT_TX,				/* COMMIT */
	BI_SYNC,					/* pipeline sync point */
	BI_DONE						/* nothing left to do */
};
644 ECB :
645 : static void
646 CBC 1 : test_pipelined_insert(PGconn *conn, int n_rows)
647 : {
648 GIC 1 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
649 : const char *insert_params[2];
650 ECB : char insert_param_0[MAXINTLEN];
651 : char insert_param_1[MAXINT8LEN];
652 GIC 1 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
653 1 : recv_step = BI_BEGIN_TX;
654 : int rows_to_send,
655 ECB : rows_to_receive;
656 :
657 GIC 1 : insert_params[0] = insert_param_0;
658 CBC 1 : insert_params[1] = insert_param_1;
659 :
660 GIC 1 : rows_to_send = rows_to_receive = n_rows;
661 :
662 : /*
663 ECB : * Do a pipelined insert into a table created at the start of the pipeline
664 EUB : */
665 GIC 1 : if (PQenterPipelineMode(conn) != 1)
666 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
667 :
668 GIC 4 : while (send_step != BI_PREPARE)
669 : {
670 ECB : const char *sql;
671 :
672 CBC 3 : switch (send_step)
673 ECB : {
674 CBC 1 : case BI_BEGIN_TX:
675 1 : sql = "BEGIN TRANSACTION";
676 GIC 1 : send_step = BI_DROP_TABLE;
677 CBC 1 : break;
678 ECB :
679 CBC 1 : case BI_DROP_TABLE:
680 1 : sql = drop_table_sql;
681 GIC 1 : send_step = BI_CREATE_TABLE;
682 CBC 1 : break;
683 ECB :
684 CBC 1 : case BI_CREATE_TABLE:
685 1 : sql = create_table_sql;
686 GIC 1 : send_step = BI_PREPARE;
687 GBC 1 : break;
688 EUB :
689 UIC 0 : default:
690 0 : pg_fatal("invalid state");
691 : sql = NULL; /* keep compiler quiet */
692 : }
693 ECB :
694 : pg_debug("sending: %s\n", sql);
695 GBC 3 : if (PQsendQueryParams(conn, sql,
696 : 0, NULL, NULL, NULL, NULL, 0) != 1)
697 UIC 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
698 ECB : }
699 :
700 CBC 1 : Assert(send_step == BI_PREPARE);
701 EUB : pg_debug("sending: %s\n", insert_sql2);
702 CBC 1 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
703 UIC 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
704 GIC 1 : send_step = BI_INSERT_ROWS;
705 :
706 : /*
707 : * Now we start inserting. We'll be sending enough data that we could fill
708 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
709 : * mode and consume input while we send more output. As results of each
710 : * query are processed we should pop them to allow processing of the next
711 : * query. There's no need to finish the pipeline before processing
712 ECB : * results.
713 EUB : */
714 GIC 1 : if (PQsetnonblocking(conn, 1) != 0)
715 LBC 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
716 :
717 GIC 33248 : while (recv_step != BI_DONE)
718 : {
719 : int sock;
720 : fd_set input_mask;
721 ECB : fd_set output_mask;
722 :
723 CBC 33247 : sock = PQsocket(conn);
724 EUB :
725 GIC 33247 : if (sock < 0)
726 LBC 0 : break; /* shouldn't happen */
727 ECB :
728 CBC 565199 : FD_ZERO(&input_mask);
729 33247 : FD_SET(sock, &input_mask);
730 GIC 565199 : FD_ZERO(&output_mask);
731 CBC 33247 : FD_SET(sock, &output_mask);
732 :
733 GBC 33247 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
734 EUB : {
735 UIC 0 : fprintf(stderr, "select() failed: %s\n", strerror(errno));
736 0 : exit_nicely(conn);
737 : }
738 :
739 : /*
740 : * Process any results, so we keep the server's output buffer free
741 ECB : * flowing and it can continue to process input
742 : */
743 CBC 33247 : if (FD_ISSET(sock, &input_mask))
744 : {
745 GIC 3 : PQconsumeInput(conn);
746 ECB :
747 : /* Read until we'd block if we tried to read */
748 GIC 1414 : while (!PQisBusy(conn) && recv_step < BI_DONE)
749 ECB : {
750 : PGresult *res;
751 GIC 1411 : const char *cmdtag = "";
752 1411 : const char *description = "";
753 : int status;
754 :
755 : /*
756 : * Read next result. If no more results from this query,
757 ECB : * advance to the next query
758 : */
759 CBC 1411 : res = PQgetResult(conn);
760 GIC 1411 : if (res == NULL)
761 CBC 705 : continue;
762 ECB :
763 GIC 706 : status = PGRES_COMMAND_OK;
764 CBC 706 : switch (recv_step)
765 ECB : {
766 CBC 1 : case BI_BEGIN_TX:
767 1 : cmdtag = "BEGIN";
768 1 : recv_step++;
769 1 : break;
770 1 : case BI_DROP_TABLE:
771 1 : cmdtag = "DROP TABLE";
772 1 : recv_step++;
773 1 : break;
774 1 : case BI_CREATE_TABLE:
775 1 : cmdtag = "CREATE TABLE";
776 1 : recv_step++;
777 1 : break;
778 1 : case BI_PREPARE:
779 1 : cmdtag = "";
780 1 : description = "PREPARE";
781 1 : recv_step++;
782 1 : break;
783 700 : case BI_INSERT_ROWS:
784 700 : cmdtag = "INSERT";
785 700 : rows_to_receive--;
786 700 : if (rows_to_receive == 0)
787 1 : recv_step++;
788 700 : break;
789 1 : case BI_COMMIT_TX:
790 1 : cmdtag = "COMMIT";
791 1 : recv_step++;
792 1 : break;
793 1 : case BI_SYNC:
794 1 : cmdtag = "";
795 1 : description = "SYNC";
796 1 : status = PGRES_PIPELINE_SYNC;
797 GBC 1 : recv_step++;
798 GIC 1 : break;
799 UBC 0 : case BI_DONE:
800 : /* unreachable */
801 UIC 0 : pg_fatal("unreachable state");
802 ECB : }
803 EUB :
804 GIC 706 : if (PQresultStatus(res) != status)
805 UIC 0 : pg_fatal("%s reported status %s, expected %s\n"
806 : "Error message: \"%s\"",
807 : description, PQresStatus(PQresultStatus(res)),
808 ECB : PQresStatus(status), PQerrorMessage(conn));
809 EUB :
810 GIC 706 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
811 UIC 0 : pg_fatal("%s expected command tag '%s', got '%s'",
812 : description, cmdtag, PQcmdStatus(res));
813 :
814 ECB : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
815 :
816 GIC 706 : PQclear(res);
817 : }
818 : }
819 ECB :
820 : /* Write more rows and/or the end pipeline message, if needed */
821 CBC 33247 : if (FD_ISSET(sock, &output_mask))
822 : {
823 33245 : PQflush(conn);
824 :
825 33245 : if (send_step == BI_INSERT_ROWS)
826 : {
827 700 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
828 : /* use up some buffer space with a wide value */
829 700 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
830 :
831 GIC 700 : if (PQsendQueryPrepared(conn, "my_insert",
832 : 2, insert_params, NULL, NULL, 0) == 1)
833 : {
834 ECB : pg_debug("sent row %d\n", rows_to_send);
835 :
836 CBC 700 : rows_to_send--;
837 GIC 700 : if (rows_to_send == 0)
838 1 : send_step++;
839 : }
840 : else
841 : {
842 : /*
843 : * in nonblocking mode, so it's OK for an insert to fail
844 EUB : * to send
845 : */
846 UIC 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
847 : rows_to_send, PQerrorMessage(conn));
848 ECB : }
849 : }
850 CBC 32545 : else if (send_step == BI_COMMIT_TX)
851 : {
852 GIC 1 : if (PQsendQueryParams(conn, "COMMIT",
853 : 0, NULL, NULL, NULL, NULL, 0) == 1)
854 ECB : {
855 : pg_debug("sent COMMIT\n");
856 GIC 1 : send_step++;
857 : }
858 EUB : else
859 : {
860 UIC 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
861 : PQerrorMessage(conn));
862 ECB : }
863 : }
864 CBC 32544 : else if (send_step == BI_SYNC)
865 : {
866 1 : if (PQpipelineSync(conn) == 1)
867 ECB : {
868 GIC 1 : fprintf(stdout, "pipeline sync sent\n");
869 1 : send_step++;
870 : }
871 EUB : else
872 : {
873 UIC 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
874 : PQerrorMessage(conn));
875 : }
876 : }
877 : }
878 : }
879 ECB :
880 EUB : /* We've got the sync message and the pipeline should be done */
881 GIC 1 : if (PQexitPipelineMode(conn) != 1)
882 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
883 ECB : PQerrorMessage(conn));
884 EUB :
885 GIC 1 : if (PQsetnonblocking(conn, 0) != 0)
886 LBC 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
887 ECB :
888 GIC 1 : fprintf(stderr, "ok\n");
889 1 : }
890 ECB :
891 : static void
892 CBC 1 : test_prepared(PGconn *conn)
893 ECB : {
894 GIC 1 : PGresult *res = NULL;
895 1 : Oid param_oids[1] = {INT4OID};
896 : Oid expected_oids[4];
897 ECB : Oid typ;
898 :
899 CBC 1 : fprintf(stderr, "prepared... ");
900 EUB :
901 CBC 1 : if (PQenterPipelineMode(conn) != 1)
902 UIC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
903 GIC 1 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
904 EUB : "interval '1 sec'",
905 ECB : 1, param_oids) != 1)
906 LBC 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
907 CBC 1 : expected_oids[0] = INT4OID;
908 1 : expected_oids[1] = TEXTOID;
909 1 : expected_oids[2] = NUMERICOID;
910 GBC 1 : expected_oids[3] = INTERVALOID;
911 CBC 1 : if (PQsendDescribePrepared(conn, "select_one") != 1)
912 UBC 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
913 GIC 1 : if (PQpipelineSync(conn) != 1)
914 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
915 ECB :
916 GBC 1 : res = PQgetResult(conn);
917 CBC 1 : if (res == NULL)
918 UBC 0 : pg_fatal("PQgetResult returned null");
919 CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
920 LBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
921 CBC 1 : PQclear(res);
922 GBC 1 : res = PQgetResult(conn);
923 GIC 1 : if (res != NULL)
924 LBC 0 : pg_fatal("expected NULL result");
925 ECB :
926 GBC 1 : res = PQgetResult(conn);
927 CBC 1 : if (res == NULL)
928 UBC 0 : pg_fatal("PQgetResult returned NULL");
929 CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
930 UBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
931 GIC 1 : if (PQnfields(res) != lengthof(expected_oids))
932 LBC 0 : pg_fatal("expected %zd columns, got %d",
933 : lengthof(expected_oids), PQnfields(res));
934 CBC 5 : for (int i = 0; i < PQnfields(res); i++)
935 ECB : {
936 GBC 4 : typ = PQftype(res, i);
937 GIC 4 : if (typ != expected_oids[i])
938 UIC 0 : pg_fatal("field %d: expected type %u, got %u",
939 ECB : i, expected_oids[i], typ);
940 : }
941 CBC 1 : PQclear(res);
942 GBC 1 : res = PQgetResult(conn);
943 GIC 1 : if (res != NULL)
944 LBC 0 : pg_fatal("expected NULL result");
945 ECB :
946 GBC 1 : res = PQgetResult(conn);
947 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
948 LBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
949 EUB :
950 GIC 1 : if (PQexitPipelineMode(conn) != 1)
951 LBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
952 ECB :
953 CBC 1 : PQexec(conn, "BEGIN");
954 1 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
955 GBC 1 : PQenterPipelineMode(conn);
956 CBC 1 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
957 UBC 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
958 CBC 1 : if (PQpipelineSync(conn) != 1)
959 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
960 GBC 1 : res = PQgetResult(conn);
961 CBC 1 : if (res == NULL)
962 UBC 0 : pg_fatal("expected NULL result");
963 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
964 LBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
965 ECB :
966 GBC 1 : typ = PQftype(res, 0);
967 GIC 1 : if (typ != INT4OID)
968 LBC 0 : pg_fatal("portal: expected type %u, got %u",
969 ECB : INT4OID, typ);
970 CBC 1 : PQclear(res);
971 GBC 1 : res = PQgetResult(conn);
972 CBC 1 : if (res != NULL)
973 LBC 0 : pg_fatal("expected NULL result");
974 GBC 1 : res = PQgetResult(conn);
975 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
976 LBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
977 EUB :
978 GIC 1 : if (PQexitPipelineMode(conn) != 1)
979 LBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
980 ECB :
981 GIC 1 : fprintf(stderr, "ok\n");
982 1 : }
983 :
/*
 * Notice processor callback.
 *
 * "arg" points to an int counter; it is bumped once per call and the
 * running total is printed to stderr together with the notice text.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;

	*counter += 1;
	fprintf(stderr, "NOTICE %d: %s", *counter, message);
}
993 :
994 ECB : /* Verify behavior in "idle" state */
995 : static void
996 GIC 1 : test_pipeline_idle(PGconn *conn)
997 ECB : {
998 : PGresult *res;
999 CBC 1 : int n_notices = 0;
1000 :
1001 1 : fprintf(stderr, "\npipeline idle...\n");
1002 :
1003 GIC 1 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1004 ECB :
1005 EUB : /* Try to exit pipeline mode in pipeline-idle state */
1006 CBC 1 : if (PQenterPipelineMode(conn) != 1)
1007 UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1008 CBC 1 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
1009 LBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1010 CBC 1 : PQsendFlushRequest(conn);
1011 GBC 1 : res = PQgetResult(conn);
1012 GIC 1 : if (res == NULL)
1013 LBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1014 EUB : PQerrorMessage(conn));
1015 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1016 LBC 0 : pg_fatal("unexpected result code %s from first pipeline item",
1017 ECB : PQresStatus(PQresultStatus(res)));
1018 CBC 1 : PQclear(res);
1019 GBC 1 : res = PQgetResult(conn);
1020 CBC 1 : if (res != NULL)
1021 UBC 0 : pg_fatal("did not receive terminating NULL");
1022 CBC 1 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1023 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1024 CBC 1 : if (PQexitPipelineMode(conn) == 1)
1025 UIC 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
1026 GBC 1 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1027 : strlen("cannot exit pipeline mode")) != 0)
1028 LBC 0 : pg_fatal("did not get expected error; got: %s",
1029 ECB : PQerrorMessage(conn));
1030 CBC 1 : PQsendFlushRequest(conn);
1031 GBC 1 : res = PQgetResult(conn);
1032 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1033 LBC 0 : pg_fatal("unexpected result code %s from second pipeline item",
1034 ECB : PQresStatus(PQresultStatus(res)));
1035 CBC 1 : PQclear(res);
1036 GBC 1 : res = PQgetResult(conn);
1037 CBC 1 : if (res != NULL)
1038 UBC 0 : pg_fatal("did not receive terminating NULL");
1039 GIC 1 : if (PQexitPipelineMode(conn) != 1)
1040 LBC 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1041 EUB :
1042 CBC 1 : if (n_notices > 0)
1043 UIC 0 : pg_fatal("got %d notice(s)", n_notices);
1044 GIC 1 : fprintf(stderr, "ok - 1\n");
1045 ECB :
1046 EUB : /* Have a WARNING in the middle of a resultset */
1047 CBC 1 : if (PQenterPipelineMode(conn) != 1)
1048 UBC 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
1049 CBC 1 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
1050 LBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1051 CBC 1 : PQsendFlushRequest(conn);
1052 GBC 1 : res = PQgetResult(conn);
1053 CBC 1 : if (res == NULL)
1054 UBC 0 : pg_fatal("unexpected NULL result received");
1055 CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1056 UBC 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
1057 CBC 1 : if (PQexitPipelineMode(conn) != 1)
1058 LBC 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
1059 GIC 1 : fprintf(stderr, "ok - 2\n");
1060 1 : }
1061 ECB :
1062 : static void
1063 CBC 1 : test_simple_pipeline(PGconn *conn)
1064 ECB : {
1065 CBC 1 : PGresult *res = NULL;
1066 GIC 1 : const char *dummy_params[1] = {"1"};
1067 CBC 1 : Oid dummy_param_oids[1] = {INT4OID};
1068 :
1069 GIC 1 : fprintf(stderr, "simple pipeline... ");
1070 :
1071 : /*
1072 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1073 : * process the results of as they come in.
1074 : *
1075 : * For a simple case we should be able to do this without interim
1076 : * processing of results since our output buffer will give us enough slush
1077 ECB : * to work with and we won't block on sending. So blocking mode is fine.
1078 EUB : */
1079 GIC 1 : if (PQisnonblocking(conn))
1080 LBC 0 : pg_fatal("Expected blocking connection mode");
1081 EUB :
1082 GIC 1 : if (PQenterPipelineMode(conn) != 1)
1083 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1084 :
1085 GIC 1 : if (PQsendQueryParams(conn, "SELECT $1",
1086 EUB : 1, dummy_param_oids, dummy_params,
1087 : NULL, NULL, 0) != 1)
1088 LBC 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1089 EUB :
1090 GIC 1 : if (PQexitPipelineMode(conn) != 0)
1091 LBC 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1092 EUB :
1093 GIC 1 : if (PQpipelineSync(conn) != 1)
1094 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1095 ECB :
1096 GBC 1 : res = PQgetResult(conn);
1097 GIC 1 : if (res == NULL)
1098 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1099 ECB : PQerrorMessage(conn));
1100 EUB :
1101 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1102 UIC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1103 ECB : PQresStatus(PQresultStatus(res)));
1104 :
1105 GIC 1 : PQclear(res);
1106 CBC 1 : res = NULL;
1107 EUB :
1108 GIC 1 : if (PQgetResult(conn) != NULL)
1109 UIC 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1110 :
1111 : /*
1112 : * Even though we've processed the result there's still a sync to come and
1113 ECB : * we can't exit pipeline mode yet
1114 EUB : */
1115 GIC 1 : if (PQexitPipelineMode(conn) != 0)
1116 LBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1117 ECB :
1118 GBC 1 : res = PQgetResult(conn);
1119 GIC 1 : if (res == NULL)
1120 UIC 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1121 ECB : PQerrorMessage(conn));
1122 EUB :
1123 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1124 UIC 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1125 ECB : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1126 :
1127 GIC 1 : PQclear(res);
1128 CBC 1 : res = NULL;
1129 EUB :
1130 GIC 1 : if (PQgetResult(conn) != NULL)
1131 UIC 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1132 : PQresStatus(PQresultStatus(res)));
1133 ECB :
1134 EUB : /* We're still in pipeline mode... */
1135 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1136 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
1137 ECB :
1138 EUB : /* ... until we end it, which we can safely do now */
1139 GIC 1 : if (PQexitPipelineMode(conn) != 1)
1140 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1141 ECB : PQerrorMessage(conn));
1142 EUB :
1143 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1144 LBC 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1145 ECB :
1146 GIC 1 : fprintf(stderr, "ok\n");
1147 1 : }
1148 ECB :
1149 : static void
1150 GIC 1 : test_singlerowmode(PGconn *conn)
1151 : {
1152 ECB : PGresult *res;
1153 : int i;
1154 CBC 1 : bool pipeline_ended = false;
1155 EUB :
1156 GIC 1 : if (PQenterPipelineMode(conn) != 1)
1157 UIC 0 : pg_fatal("failed to enter pipeline mode: %s",
1158 : PQerrorMessage(conn));
1159 ECB :
1160 : /* One series of three commands, using single-row mode for the first two. */
1161 GIC 4 : for (i = 0; i < 3; i++)
1162 : {
1163 ECB : char *param[1];
1164 :
1165 CBC 3 : param[0] = psprintf("%d", 44 + i);
1166 :
1167 GIC 3 : if (PQsendQueryParams(conn,
1168 : "SELECT generate_series(42, $1)",
1169 : 1,
1170 : NULL,
1171 : (const char **) param,
1172 : NULL,
1173 EUB : NULL,
1174 : 0) != 1)
1175 LBC 0 : pg_fatal("failed to send query: %s",
1176 : PQerrorMessage(conn));
1177 CBC 3 : pfree(param[0]);
1178 EUB : }
1179 GIC 1 : if (PQpipelineSync(conn) != 1)
1180 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1181 :
1182 CBC 5 : for (i = 0; !pipeline_ended; i++)
1183 : {
1184 4 : bool first = true;
1185 : bool saw_ending_tuplesok;
1186 GIC 4 : bool isSingleTuple = false;
1187 ECB :
1188 : /* Set single row mode for only first 2 SELECT queries */
1189 CBC 4 : if (i < 2)
1190 EUB : {
1191 GIC 2 : if (PQsetSingleRowMode(conn) != 1)
1192 UIC 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1193 : }
1194 ECB :
1195 : /* Consume rows for this query */
1196 GIC 4 : saw_ending_tuplesok = false;
1197 CBC 14 : while ((res = PQgetResult(conn)) != NULL)
1198 : {
1199 11 : ExecStatusType est = PQresultStatus(res);
1200 :
1201 11 : if (est == PGRES_PIPELINE_SYNC)
1202 ECB : {
1203 CBC 1 : fprintf(stderr, "end of pipeline reached\n");
1204 1 : pipeline_ended = true;
1205 GBC 1 : PQclear(res);
1206 CBC 1 : if (i != 3)
1207 UIC 0 : pg_fatal("Expected three results, got %d", i);
1208 GIC 1 : break;
1209 : }
1210 ECB :
1211 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1212 CBC 10 : if (first)
1213 EUB : {
1214 GIC 3 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1215 LBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1216 EUB : i, PQresStatus(est));
1217 GIC 3 : if (i >= 2 && est != PGRES_TUPLES_OK)
1218 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1219 : i, PQresStatus(est));
1220 GIC 3 : first = false;
1221 ECB : }
1222 :
1223 GIC 10 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1224 CBC 10 : switch (est)
1225 ECB : {
1226 CBC 3 : case PGRES_TUPLES_OK:
1227 3 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1228 GIC 3 : saw_ending_tuplesok = true;
1229 CBC 3 : if (isSingleTuple)
1230 ECB : {
1231 GIC 2 : if (PQntuples(res) == 0)
1232 GBC 2 : fprintf(stderr, "all tuples received in query %d\n", i);
1233 : else
1234 LBC 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1235 : }
1236 CBC 3 : break;
1237 ECB :
1238 CBC 7 : case PGRES_SINGLE_TUPLE:
1239 7 : isSingleTuple = true;
1240 GIC 7 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1241 GBC 7 : break;
1242 EUB :
1243 UIC 0 : default:
1244 LBC 0 : pg_fatal("unexpected");
1245 : }
1246 CBC 10 : PQclear(res);
1247 EUB : }
1248 GIC 4 : if (!pipeline_ended && !saw_ending_tuplesok)
1249 UIC 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1250 : }
1251 :
1252 : /*
1253 : * Now issue one command, get its results in with single-row mode, then
1254 : * issue another command, and get its results in normal mode; make sure
1255 ECB : * the single-row mode flag is reset as expected.
1256 : */
1257 GBC 1 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1258 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1259 LBC 0 : pg_fatal("failed to send query: %s",
1260 EUB : PQerrorMessage(conn));
1261 CBC 1 : if (PQsendFlushRequest(conn) != 1)
1262 UBC 0 : pg_fatal("failed to send flush request");
1263 CBC 1 : if (PQsetSingleRowMode(conn) != 1)
1264 LBC 0 : pg_fatal("PQsetSingleRowMode() failed");
1265 GBC 1 : res = PQgetResult(conn);
1266 CBC 1 : if (res == NULL)
1267 UBC 0 : pg_fatal("unexpected NULL");
1268 GIC 1 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
1269 LBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1270 ECB : PQresStatus(PQresultStatus(res)));
1271 GBC 1 : res = PQgetResult(conn);
1272 CBC 1 : if (res == NULL)
1273 UBC 0 : pg_fatal("unexpected NULL");
1274 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1275 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1276 EUB : PQresStatus(PQresultStatus(res)));
1277 GIC 1 : if (PQgetResult(conn) != NULL)
1278 LBC 0 : pg_fatal("expected NULL result");
1279 :
1280 GBC 1 : if (PQsendQueryParams(conn, "SELECT 1",
1281 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1282 LBC 0 : pg_fatal("failed to send query: %s",
1283 EUB : PQerrorMessage(conn));
1284 CBC 1 : if (PQsendFlushRequest(conn) != 1)
1285 LBC 0 : pg_fatal("failed to send flush request");
1286 GBC 1 : res = PQgetResult(conn);
1287 CBC 1 : if (res == NULL)
1288 UBC 0 : pg_fatal("unexpected NULL");
1289 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1290 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1291 EUB : PQresStatus(PQresultStatus(res)));
1292 GIC 1 : if (PQgetResult(conn) != NULL)
1293 LBC 0 : pg_fatal("expected NULL result");
1294 EUB :
1295 GIC 1 : if (PQexitPipelineMode(conn) != 1)
1296 LBC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1297 ECB :
1298 GIC 1 : fprintf(stderr, "ok\n");
1299 1 : }
1300 :
1301 : /*
1302 : * Simple test to verify that a pipeline is discarded as a whole when there's
1303 : * an error, ignoring transaction commands.
1304 ECB : */
1305 : static void
1306 GIC 1 : test_transaction(PGconn *conn)
1307 : {
1308 ECB : PGresult *res;
1309 : bool expect_null;
1310 CBC 1 : int num_syncs = 0;
1311 :
1312 1 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1313 EUB : "CREATE TABLE pq_pipeline_tst (id int)");
1314 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1315 LBC 0 : pg_fatal("failed to create test table: %s",
1316 : PQerrorMessage(conn));
1317 CBC 1 : PQclear(res);
1318 EUB :
1319 GIC 1 : if (PQenterPipelineMode(conn) != 1)
1320 LBC 0 : pg_fatal("failed to enter pipeline mode: %s",
1321 EUB : PQerrorMessage(conn));
1322 GIC 1 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1323 UIC 0 : pg_fatal("could not send prepare on pipeline: %s",
1324 ECB : PQerrorMessage(conn));
1325 :
1326 GIC 1 : if (PQsendQueryParams(conn,
1327 EUB : "BEGIN",
1328 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1329 LBC 0 : pg_fatal("failed to send query: %s",
1330 : PQerrorMessage(conn));
1331 GIC 1 : if (PQsendQueryParams(conn,
1332 EUB : "SELECT 0/0",
1333 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1334 UIC 0 : pg_fatal("failed to send query: %s",
1335 : PQerrorMessage(conn));
1336 :
1337 : /*
1338 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1339 ECB : * get out of the pipeline-aborted state first.
1340 EUB : */
1341 GIC 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1342 UIC 0 : pg_fatal("failed to execute prepared: %s",
1343 : PQerrorMessage(conn));
1344 ECB :
1345 : /* This insert fails because we're in pipeline-aborted state */
1346 GIC 1 : if (PQsendQueryParams(conn,
1347 EUB : "INSERT INTO pq_pipeline_tst VALUES (1)",
1348 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1349 LBC 0 : pg_fatal("failed to send query: %s",
1350 EUB : PQerrorMessage(conn));
1351 CBC 1 : if (PQpipelineSync(conn) != 1)
1352 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1353 GIC 1 : num_syncs++;
1354 :
1355 : /*
1356 : * This insert fails even though the pipeline got a SYNC, because we're in
1357 ECB : * an aborted transaction
1358 : */
1359 GIC 1 : if (PQsendQueryParams(conn,
1360 EUB : "INSERT INTO pq_pipeline_tst VALUES (2)",
1361 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1362 LBC 0 : pg_fatal("failed to send query: %s",
1363 EUB : PQerrorMessage(conn));
1364 CBC 1 : if (PQpipelineSync(conn) != 1)
1365 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1366 GIC 1 : num_syncs++;
1367 :
1368 : /*
1369 : * Send ROLLBACK using prepared stmt. This one works because we just did
1370 ECB : * PQpipelineSync above.
1371 EUB : */
1372 GIC 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1373 UIC 0 : pg_fatal("failed to execute prepared: %s",
1374 : PQerrorMessage(conn));
1375 :
1376 : /*
1377 : * Now that we're out of a transaction and in pipeline-good mode, this
1378 ECB : * insert works
1379 : */
1380 GIC 1 : if (PQsendQueryParams(conn,
1381 EUB : "INSERT INTO pq_pipeline_tst VALUES (3)",
1382 : 0, NULL, NULL, NULL, NULL, 0) != 1)
1383 UIC 0 : pg_fatal("failed to send query: %s",
1384 ECB : PQerrorMessage(conn));
1385 EUB : /* Send two syncs now -- match up to SYNC messages below */
1386 CBC 1 : if (PQpipelineSync(conn) != 1)
1387 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1388 GBC 1 : num_syncs++;
1389 CBC 1 : if (PQpipelineSync(conn) != 1)
1390 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1391 CBC 1 : num_syncs++;
1392 ECB :
1393 CBC 1 : expect_null = false;
1394 GIC 1 : for (int i = 0;; i++)
1395 19 : {
1396 ECB : ExecStatusType restype;
1397 :
1398 GIC 20 : res = PQgetResult(conn);
1399 CBC 20 : if (res == NULL)
1400 ECB : {
1401 GBC 8 : printf("%d: got NULL result\n", i);
1402 CBC 8 : if (!expect_null)
1403 LBC 0 : pg_fatal("did not expect NULL here");
1404 GIC 8 : expect_null = false;
1405 CBC 8 : continue;
1406 ECB : }
1407 CBC 12 : restype = PQresultStatus(res);
1408 GBC 12 : printf("%d: got status %s", i, PQresStatus(restype));
1409 CBC 12 : if (expect_null)
1410 LBC 0 : pg_fatal("expected NULL");
1411 CBC 12 : if (restype == PGRES_FATAL_ERROR)
1412 GIC 2 : printf("; error: %s", PQerrorMessage(conn));
1413 CBC 10 : else if (restype == PGRES_PIPELINE_ABORTED)
1414 : {
1415 GIC 2 : printf(": command didn't run because pipeline aborted\n");
1416 ECB : }
1417 : else
1418 GIC 8 : printf("\n");
1419 CBC 12 : PQclear(res);
1420 ECB :
1421 GIC 12 : if (restype == PGRES_PIPELINE_SYNC)
1422 CBC 4 : num_syncs--;
1423 ECB : else
1424 CBC 8 : expect_null = true;
1425 GIC 12 : if (num_syncs <= 0)
1426 CBC 1 : break;
1427 EUB : }
1428 GIC 1 : if (PQgetResult(conn) != NULL)
1429 UIC 0 : pg_fatal("returned something extra after all the syncs: %s",
1430 ECB : PQresStatus(PQresultStatus(res)));
1431 EUB :
1432 GIC 1 : if (PQexitPipelineMode(conn) != 1)
1433 UIC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1434 ECB :
1435 : /* We expect to find one tuple containing the value "3" */
1436 GBC 1 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1437 CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1438 UBC 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1439 CBC 1 : if (PQntuples(res) != 1)
1440 UBC 0 : pg_fatal("did not get 1 tuple");
1441 CBC 1 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1442 UIC 0 : pg_fatal("did not get expected tuple");
1443 CBC 1 : PQclear(res);
1444 ECB :
1445 GIC 1 : fprintf(stderr, "ok\n");
1446 1 : }
1447 :
1448 : /*
1449 : * In this test mode we send a stream of queries, with one in the middle
1450 : * causing an error. Verify that we can still send some more after the
1451 : * error and have libpq work properly.
1452 ECB : */
1453 : static void
1454 CBC 1 : test_uniqviol(PGconn *conn)
1455 : {
1456 1 : int sock = PQsocket(conn);
1457 : PGresult *res;
1458 GIC 1 : Oid paramTypes[2] = {INT8OID, INT8OID};
1459 : const char *paramValues[2];
1460 ECB : char paramValue0[MAXINT8LEN];
1461 : char paramValue1[MAXINT8LEN];
1462 CBC 1 : int ctr = 0;
1463 1 : int numsent = 0;
1464 1 : int results = 0;
1465 1 : bool read_done = false;
1466 1 : bool write_done = false;
1467 1 : bool error_sent = false;
1468 1 : bool got_error = false;
1469 GIC 1 : int switched = 0;
1470 1 : int socketful = 0;
1471 : fd_set in_fds;
1472 ECB : fd_set out_fds;
1473 :
1474 CBC 1 : fprintf(stderr, "uniqviol ...");
1475 :
1476 1 : PQsetnonblocking(conn, 1);
1477 ECB :
1478 CBC 1 : paramValues[0] = paramValue0;
1479 GIC 1 : paramValues[1] = paramValue1;
1480 CBC 1 : sprintf(paramValue1, "42");
1481 :
1482 1 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1483 EUB : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1484 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1485 LBC 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1486 ECB :
1487 GBC 1 : res = PQexec(conn, "begin");
1488 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1489 LBC 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1490 :
1491 GIC 1 : res = PQprepare(conn, "insertion",
1492 ECB : "insert into ppln_uniqviol values ($1, $2) returning id",
1493 EUB : 2, paramTypes);
1494 GIC 1 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1495 LBC 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1496 EUB :
1497 GIC 1 : if (PQenterPipelineMode(conn) != 1)
1498 LBC 0 : pg_fatal("failed to enter pipeline mode");
1499 :
1500 GIC 8 : while (!read_done)
1501 : {
1502 : /*
1503 : * Avoid deadlocks by reading everything the server has sent before
1504 : * sending anything. (Special precaution is needed here to process
1505 : * PQisBusy before testing the socket for read-readiness, because the
1506 : * socket does not turn read-ready after "sending" queries in aborted
1507 ECB : * pipeline mode.)
1508 : */
1509 GIC 606 : while (PQisBusy(conn) == 0)
1510 : {
1511 ECB : bool new_error;
1512 :
1513 CBC 601 : if (results >= numsent)
1514 EUB : {
1515 CBC 1 : if (write_done)
1516 UIC 0 : read_done = true;
1517 GIC 1 : break;
1518 ECB : }
1519 :
1520 CBC 600 : res = PQgetResult(conn);
1521 GBC 600 : new_error = process_result(conn, res, results, numsent);
1522 CBC 600 : if (new_error && got_error)
1523 LBC 0 : pg_fatal("got two errors");
1524 GIC 600 : got_error |= new_error;
1525 CBC 600 : if (results++ >= numsent - 1)
1526 ECB : {
1527 CBC 2 : if (write_done)
1528 GIC 1 : read_done = true;
1529 2 : break;
1530 : }
1531 ECB : }
1532 :
1533 GIC 8 : if (read_done)
1534 CBC 1 : break;
1535 ECB :
1536 GIC 119 : FD_ZERO(&out_fds);
1537 CBC 7 : FD_SET(sock, &out_fds);
1538 ECB :
1539 GIC 119 : FD_ZERO(&in_fds);
1540 CBC 7 : FD_SET(sock, &in_fds);
1541 :
1542 GBC 7 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
1543 EUB : {
1544 UBC 0 : if (errno == EINTR)
1545 UIC 0 : continue;
1546 0 : pg_fatal("select() failed: %m");
1547 ECB : }
1548 EUB :
1549 GIC 7 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1550 UIC 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1551 :
1552 : /*
1553 : * If the socket is writable and we haven't finished sending queries,
1554 ECB : * send some.
1555 : */
1556 GIC 7 : if (!write_done && FD_ISSET(sock, &out_fds))
1557 ECB : {
1558 : for (;;)
1559 GIC 597 : {
1560 : int flush;
1561 :
1562 : /*
1563 : * provoke uniqueness violation exactly once after having
1564 ECB : * switched to read mode.
1565 : */
1566 CBC 600 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
1567 ECB : {
1568 CBC 1 : sprintf(paramValue0, "%d", numsent / 2);
1569 GIC 1 : fprintf(stderr, "E");
1570 1 : error_sent = true;
1571 : }
1572 ECB : else
1573 : {
1574 GIC 599 : fprintf(stderr, ".");
1575 599 : sprintf(paramValue0, "%d", ctr++);
1576 ECB : }
1577 EUB :
1578 CBC 600 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1579 UIC 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1580 GIC 600 : numsent++;
1581 ECB :
1582 : /* Are we done writing? */
1583 CBC 600 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
1584 EUB : {
1585 CBC 1 : if (PQsendFlushRequest(conn) != 1)
1586 LBC 0 : pg_fatal("failed to send flush request");
1587 CBC 1 : write_done = true;
1588 1 : fprintf(stderr, "\ndone writing\n");
1589 GIC 1 : PQflush(conn);
1590 1 : break;
1591 : }
1592 ECB :
1593 : /* is the outgoing socket full? */
1594 GBC 599 : flush = PQflush(conn);
1595 CBC 599 : if (flush == -1)
1596 UIC 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1597 CBC 599 : if (flush == 1)
1598 ECB : {
1599 CBC 2 : if (socketful == 0)
1600 1 : socketful = numsent;
1601 2 : fprintf(stderr, "\nswitch to reading\n");
1602 GIC 2 : switched++;
1603 2 : break;
1604 : }
1605 : }
1606 : }
1607 ECB : }
1608 EUB :
1609 GIC 1 : if (!got_error)
1610 LBC 0 : pg_fatal("did not get expected error");
1611 ECB :
1612 GIC 1 : fprintf(stderr, "ok\n");
1613 1 : }
1614 :
1615 : /*
1616 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1617 : * the expected NULL that should follow it.
1618 : *
1619 : * Returns true if we read a fatal error message, otherwise false.
1620 ECB : */
1621 : static bool
1622 GIC 600 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
1623 ECB : {
1624 : PGresult *res2;
1625 CBC 600 : bool got_error = false;
1626 EUB :
1627 GIC 600 : if (res == NULL)
1628 LBC 0 : pg_fatal("got unexpected NULL");
1629 :
1630 CBC 600 : switch (PQresultStatus(res))
1631 ECB : {
1632 CBC 1 : case PGRES_FATAL_ERROR:
1633 1 : got_error = true;
1634 GIC 1 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
1635 CBC 1 : PQclear(res);
1636 ECB :
1637 GBC 1 : res2 = PQgetResult(conn);
1638 GIC 1 : if (res2 != NULL)
1639 LBC 0 : pg_fatal("expected NULL, got %s",
1640 : PQresStatus(PQresultStatus(res2)));
1641 CBC 1 : break;
1642 ECB :
1643 CBC 418 : case PGRES_TUPLES_OK:
1644 GIC 418 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
1645 CBC 418 : PQclear(res);
1646 ECB :
1647 GBC 418 : res2 = PQgetResult(conn);
1648 GIC 418 : if (res2 != NULL)
1649 LBC 0 : pg_fatal("expected NULL, got %s",
1650 : PQresStatus(PQresultStatus(res2)));
1651 CBC 418 : break;
1652 ECB :
1653 CBC 181 : case PGRES_PIPELINE_ABORTED:
1654 181 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
1655 GBC 181 : res2 = PQgetResult(conn);
1656 GIC 181 : if (res2 != NULL)
1657 LBC 0 : pg_fatal("expected NULL, got %s",
1658 : PQresStatus(PQresultStatus(res2)));
1659 GBC 181 : break;
1660 EUB :
1661 UIC 0 : default:
1662 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
1663 ECB : }
1664 :
1665 GIC 600 : return got_error;
1666 : }
1667 :
1668 EUB :
/*
 * Print a short help text to stderr: the two invocation forms and the
 * supported command-line options.
 */
static void
usage(const char *progname)
{
	/* synopsis: the program name is substituted three times */
	fprintf(stderr,
			"%s tests libpq's pipeline mode.\n\n"
			"Usage:\n"
			" %s [OPTION] tests\n"
			" %s [OPTION] TESTNAME [CONNINFO]\n",
			progname, progname, progname);

	/* option summary */
	fprintf(stderr,
			"\nOptions:\n"
			" -t TRACEFILE generate a libpq trace to TRACEFILE\n"
			" -r NUMROWS use NUMROWS as the test size\n");
}
1680 ECB :
/*
 * Print the name of every available test to stdout, one per line.
 */
static void
print_test_list(void)
{
	static const char *const test_names[] = {
		"disallowed_in_pipeline",
		"multi_pipelines",
		"nosync",
		"pipeline_abort",
		"pipeline_idle",
		"pipelined_insert",
		"prepared",
		"simple_pipeline",
		"singlerow",
		"transaction",
		"uniqviol",
	};
	const size_t ntests = sizeof(test_names) / sizeof(test_names[0]);

	for (size_t idx = 0; idx < ntests; idx++)
		printf("%s\n", test_names[idx]);
}
1696 ECB :
1697 : int
1698 CBC 12 : main(int argc, char **argv)
1699 : {
1700 GIC 12 : const char *conninfo = "";
1701 : PGconn *conn;
1702 ECB : FILE *trace;
1703 : char *testname;
1704 GIC 12 : int numrows = 10000;
1705 : PGresult *res;
1706 ECB : int c;
1707 :
1708 GNC 44 : while ((c = getopt(argc, argv, "r:t:")) != -1)
1709 : {
1710 CBC 20 : switch (c)
1711 ECB : {
1712 GBC 11 : case 'r': /* numrows */
1713 GIC 11 : errno = 0;
1714 GBC 11 : numrows = strtol(optarg, NULL, 10);
1715 GIC 11 : if (errno != 0 || numrows <= 0)
1716 ECB : {
1717 LBC 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
1718 ECB : optarg);
1719 LBC 0 : exit(1);
1720 : }
1721 GIC 11 : break;
1722 GNC 9 : case 't': /* trace file */
1723 9 : tracefile = pg_strdup(optarg);
1724 9 : break;
1725 : }
1726 ECB : }
1727 :
1728 CBC 12 : if (optind < argc)
1729 ECB : {
1730 GIC 12 : testname = pg_strdup(argv[optind]);
1731 12 : optind++;
1732 : }
1733 EUB : else
1734 : {
1735 UIC 0 : usage(argv[0]);
1736 0 : exit(1);
1737 ECB : }
1738 :
1739 CBC 12 : if (strcmp(testname, "tests") == 0)
1740 ECB : {
1741 GIC 1 : print_test_list();
1742 1 : exit(0);
1743 ECB : }
1744 :
1745 CBC 11 : if (optind < argc)
1746 ECB : {
1747 GIC 11 : conninfo = pg_strdup(argv[optind]);
1748 11 : optind++;
1749 : }
1750 ECB :
1751 : /* Make a connection to the database */
1752 GIC 11 : conn = PQconnectdb(conninfo);
1753 GBC 11 : if (PQstatus(conn) != CONNECTION_OK)
1754 : {
1755 UBC 0 : fprintf(stderr, "Connection to database failed: %s\n",
1756 : PQerrorMessage(conn));
1757 UIC 0 : exit_nicely(conn);
1758 ECB : }
1759 :
1760 GBC 11 : res = PQexec(conn, "SET lc_messages TO \"C\"");
1761 CBC 11 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1762 LBC 0 : pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
1763 GNC 11 : res = PQexec(conn, "SET debug_parallel_query = off");
1764 GIC 11 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1765 UNC 0 : pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
1766 ECB :
1767 : /* Set the trace file, if requested */
1768 CBC 11 : if (tracefile != NULL)
1769 EUB : {
1770 GIC 9 : if (strcmp(tracefile, "-") == 0)
1771 LBC 0 : trace = stdout;
1772 ECB : else
1773 GBC 9 : trace = fopen(tracefile, "w");
1774 GIC 9 : if (trace == NULL)
1775 UIC 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
1776 ECB :
1777 : /* Make it line-buffered */
1778 CBC 9 : setvbuf(trace, NULL, PG_IOLBF, 0);
1779 ECB :
1780 GIC 9 : PQtrace(conn, trace);
1781 9 : PQsetTraceFlags(conn,
1782 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
1783 ECB : }
1784 :
1785 CBC 11 : if (strcmp(testname, "disallowed_in_pipeline") == 0)
1786 1 : test_disallowed_in_pipeline(conn);
1787 10 : else if (strcmp(testname, "multi_pipelines") == 0)
1788 1 : test_multi_pipelines(conn);
1789 9 : else if (strcmp(testname, "nosync") == 0)
1790 1 : test_nosync(conn);
1791 8 : else if (strcmp(testname, "pipeline_abort") == 0)
1792 1 : test_pipeline_abort(conn);
1793 7 : else if (strcmp(testname, "pipeline_idle") == 0)
1794 1 : test_pipeline_idle(conn);
1795 6 : else if (strcmp(testname, "pipelined_insert") == 0)
1796 1 : test_pipelined_insert(conn, numrows);
1797 5 : else if (strcmp(testname, "prepared") == 0)
1798 1 : test_prepared(conn);
1799 4 : else if (strcmp(testname, "simple_pipeline") == 0)
1800 1 : test_simple_pipeline(conn);
1801 3 : else if (strcmp(testname, "singlerow") == 0)
1802 1 : test_singlerowmode(conn);
1803 2 : else if (strcmp(testname, "transaction") == 0)
1804 1 : test_transaction(conn);
1805 GIC 1 : else if (strcmp(testname, "uniqviol") == 0)
1806 1 : test_uniqviol(conn);
1807 EUB : else
1808 : {
1809 UIC 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1810 0 : exit(1);
1811 : }
1812 ECB :
1813 : /* close the connection to the database and cleanup */
1814 GIC 11 : PQfinish(conn);
1815 11 : return 0;
1816 : }
|