Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * libpq_pipeline.c
4 : : * Verify libpq pipeline execution functionality
5 : : *
6 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
7 : : * Portions Copyright (c) 1994, Regents of the University of California
8 : : *
9 : : *
10 : : * IDENTIFICATION
11 : : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : : *
13 : : *-------------------------------------------------------------------------
14 : : */
15 : :
16 : : #include "postgres_fe.h"
17 : :
18 : : #include <sys/select.h>
19 : : #include <sys/time.h>
20 : :
21 : : #include "catalog/pg_type_d.h"
22 : : #include "common/fe_memutils.h"
23 : : #include "libpq-fe.h"
24 : : #include "pg_getopt.h"
25 : : #include "portability/instr_time.h"
26 : :
27 : :
28 : : static void exit_nicely(PGconn *conn);
29 : : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
30 : : pg_attribute_printf(2, 3);
31 : : static bool process_result(PGconn *conn, PGresult *res, int results,
32 : : int numsent);
33 : :
34 : : const char *const progname = "libpq_pipeline";
35 : :
36 : : /* Options and defaults */
37 : : char *tracefile = NULL; /* path to PQtrace() file */
38 : :
39 : :
40 : : #ifdef DEBUG_OUTPUT
41 : : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 : : #else
43 : : #define pg_debug(...)
44 : : #endif
45 : :
46 : : static const char *const drop_table_sql =
47 : : "DROP TABLE IF EXISTS pq_pipeline_demo";
48 : : static const char *const create_table_sql =
49 : : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 : : "int8filler int8);";
51 : : static const char *const insert_sql =
52 : : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 : : static const char *const insert_sql2 =
54 : : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 : :
56 : : /* max char length of an int32/64, plus sign and null terminator */
57 : : #define MAXINTLEN 12
58 : : #define MAXINT8LEN 20
59 : :
60 : : static void
1126 alvherre@alvh.no-ip. 61 :UBC 0 : exit_nicely(PGconn *conn)
62 : : {
63 : 0 : PQfinish(conn);
64 : 0 : exit(1);
65 : : }
66 : :
67 : : /*
68 : : * The following few functions are wrapped in macros to make the reported line
69 : : * number in an error match the line number of the invocation.
70 : : */
71 : :
72 : : /*
73 : : * Print an error to stderr and terminate the program.
74 : : */
75 : : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
76 : : static void
77 : : pg_attribute_noreturn()
78 : 0 : pg_fatal_impl(int line, const char *fmt,...)
79 : : {
80 : : va_list args;
81 : :
82 : 0 : fflush(stdout);
83 : :
84 : 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
85 : 0 : va_start(args, fmt);
86 : 0 : vfprintf(stderr, fmt, args);
87 : 0 : va_end(args);
88 [ # # ]: 0 : Assert(fmt[strlen(fmt) - 1] != '\n');
89 : 0 : fprintf(stderr, "\n");
90 : 0 : exit(1);
91 : : }
92 : :
93 : : /*
94 : : * Check that the query on the given connection got canceled.
95 : : */
96 : : #define confirm_query_canceled(conn) confirm_query_canceled_impl(__LINE__, conn)
97 : : static void
34 alvherre@alvh.no-ip. 98 :GNC 6 : confirm_query_canceled_impl(int line, PGconn *conn)
99 : : {
100 : 6 : PGresult *res = NULL;
101 : :
102 : 6 : res = PQgetResult(conn);
103 [ - + ]: 6 : if (res == NULL)
34 alvherre@alvh.no-ip. 104 :UNC 0 : pg_fatal_impl(line, "PQgetResult returned null: %s",
105 : : PQerrorMessage(conn));
34 alvherre@alvh.no-ip. 106 [ - + ]:GNC 6 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
34 alvherre@alvh.no-ip. 107 :UNC 0 : pg_fatal_impl(line, "query did not fail when it was expected");
34 alvherre@alvh.no-ip. 108 [ - + ]:GNC 6 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "57014") != 0)
34 alvherre@alvh.no-ip. 109 :UNC 0 : pg_fatal_impl(line, "query failed with a different error than cancellation: %s",
110 : : PQerrorMessage(conn));
34 alvherre@alvh.no-ip. 111 :GNC 6 : PQclear(res);
112 : :
113 [ - + ]: 6 : while (PQisBusy(conn))
34 alvherre@alvh.no-ip. 114 :UNC 0 : PQconsumeInput(conn);
34 alvherre@alvh.no-ip. 115 :GNC 6 : }
116 : :
117 : : /*
118 : : * Using monitorConn, query pg_stat_activity to see that the connection with
119 : : * the given PID is either in the given state, or waiting on the given event
120 : : * (only one of them can be given).
121 : : */
122 : : static void
27 123 : 12 : wait_for_connection_state(int line, PGconn *monitorConn, int procpid,
124 : : char *state, char *event)
125 : : {
32 126 : 12 : const Oid paramTypes[] = {INT4OID, TEXTOID};
127 : : const char *paramValues[2];
128 : 12 : char *pidstr = psprintf("%d", procpid);
129 : :
27 130 [ - + ]: 12 : Assert((state == NULL) ^ (event == NULL));
131 : :
32 132 : 12 : paramValues[0] = pidstr;
27 133 [ + + ]: 12 : paramValues[1] = state ? state : event;
134 : :
135 : : while (true)
34 alvherre@alvh.no-ip. 136 :UNC 0 : {
137 : : PGresult *res;
138 : : char *value;
139 : :
27 alvherre@alvh.no-ip. 140 [ + + ]:GNC 12 : if (state != NULL)
141 : 6 : res = PQexecParams(monitorConn,
142 : : "SELECT count(*) FROM pg_stat_activity WHERE "
143 : : "pid = $1 AND state = $2",
144 : : 2, paramTypes, paramValues, NULL, NULL, 0);
145 : : else
146 : 6 : res = PQexecParams(monitorConn,
147 : : "SELECT count(*) FROM pg_stat_activity WHERE "
148 : : "pid = $1 AND wait_event = $2",
149 : : 2, paramTypes, paramValues, NULL, NULL, 0);
150 : :
34 151 [ - + ]: 12 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
32 alvherre@alvh.no-ip. 152 :UNC 0 : pg_fatal_impl(line, "could not query pg_stat_activity: %s", PQerrorMessage(monitorConn));
34 alvherre@alvh.no-ip. 153 [ - + ]:GNC 12 : if (PQntuples(res) != 1)
32 alvherre@alvh.no-ip. 154 :UNC 0 : pg_fatal_impl(line, "unexpected number of rows received: %d", PQntuples(res));
34 alvherre@alvh.no-ip. 155 [ - + ]:GNC 12 : if (PQnfields(res) != 1)
32 alvherre@alvh.no-ip. 156 :UNC 0 : pg_fatal_impl(line, "unexpected number of columns received: %d", PQnfields(res));
34 alvherre@alvh.no-ip. 157 :GNC 12 : value = PQgetvalue(res, 0, 0);
27 158 [ + - ]: 12 : if (strcmp(value, "0") != 0)
159 : : {
34 160 : 12 : PQclear(res);
161 : 12 : break;
162 : : }
34 alvherre@alvh.no-ip. 163 :UNC 0 : PQclear(res);
164 : :
165 : : /* wait 10ms before polling again */
166 : 0 : pg_usleep(10000);
167 : : }
168 : :
32 alvherre@alvh.no-ip. 169 :GNC 12 : pfree(pidstr);
170 : 12 : }
171 : :
172 : : #define send_cancellable_query(conn, monitorConn) \
173 : : send_cancellable_query_impl(__LINE__, conn, monitorConn)
174 : : static void
175 : 6 : send_cancellable_query_impl(int line, PGconn *conn, PGconn *monitorConn)
176 : : {
177 : : const char *env_wait;
178 : 6 : const Oid paramTypes[1] = {INT4OID};
179 : :
180 : : /*
181 : : * Wait for the connection to be idle, so that our check for an active
182 : : * connection below is reliable, instead of possibly seeing an outdated
183 : : * state.
184 : : */
27 185 : 6 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), "idle", NULL);
186 : :
32 187 : 6 : env_wait = getenv("PG_TEST_TIMEOUT_DEFAULT");
188 [ + - ]: 6 : if (env_wait == NULL)
189 : 6 : env_wait = "180";
190 : :
191 [ - + ]: 6 : if (PQsendQueryParams(conn, "SELECT pg_sleep($1)", 1, paramTypes,
192 : : &env_wait, NULL, NULL, 0) != 1)
32 alvherre@alvh.no-ip. 193 :UNC 0 : pg_fatal_impl(line, "failed to send query: %s", PQerrorMessage(conn));
194 : :
195 : : /*
196 : : * Wait for the sleep to be active, because if the query is not running
197 : : * yet, the cancel request that we send won't have any effect.
198 : : */
27 alvherre@alvh.no-ip. 199 :GNC 6 : wait_for_connection_state(line, monitorConn, PQbackendPID(conn), NULL, "PgSleep");
34 200 : 6 : }
201 : :
202 : : /*
203 : : * Create a new connection with the same conninfo as the given one.
204 : : */
205 : : static PGconn *
206 : 1 : copy_connection(PGconn *conn)
207 : : {
208 : : PGconn *copyConn;
209 : 1 : PQconninfoOption *opts = PQconninfo(conn);
210 : : const char **keywords;
211 : : const char **vals;
212 : 1 : int nopts = 1;
213 : 1 : int i = 0;
214 : :
215 [ + + ]: 42 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
216 : 41 : nopts++;
217 : :
218 : 1 : keywords = pg_malloc(sizeof(char *) * nopts);
219 : 1 : vals = pg_malloc(sizeof(char *) * nopts);
220 : :
221 [ + + ]: 42 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
222 : : {
223 [ + + ]: 41 : if (opt->val)
224 : : {
225 : 19 : keywords[i] = opt->keyword;
226 : 19 : vals[i] = opt->val;
227 : 19 : i++;
228 : : }
229 : : }
230 : 1 : keywords[i] = vals[i] = NULL;
231 : :
232 : 1 : copyConn = PQconnectdbParams(keywords, vals, false);
233 : :
234 [ - + ]: 1 : if (PQstatus(copyConn) != CONNECTION_OK)
34 alvherre@alvh.no-ip. 235 :UNC 0 : pg_fatal("Connection to database failed: %s",
236 : : PQerrorMessage(copyConn));
237 : :
34 alvherre@alvh.no-ip. 238 :GNC 1 : return copyConn;
239 : : }
240 : :
241 : : /*
242 : : * Test query cancellation routines
243 : : */
244 : : static void
245 : 1 : test_cancel(PGconn *conn)
246 : : {
247 : : PGcancel *cancel;
248 : : PGcancelConn *cancelConn;
249 : : PGconn *monitorConn;
250 : : char errorbuf[256];
251 : :
252 : 1 : fprintf(stderr, "test cancellations... ");
253 : :
254 [ - + ]: 1 : if (PQsetnonblocking(conn, 1) != 0)
34 alvherre@alvh.no-ip. 255 :UNC 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
256 : :
257 : : /*
258 : : * Make a separate connection to the database to monitor the query on the
259 : : * main connection.
260 : : */
34 alvherre@alvh.no-ip. 261 :GNC 1 : monitorConn = copy_connection(conn);
262 [ - + ]: 1 : Assert(PQstatus(monitorConn) == CONNECTION_OK);
263 : :
264 : : /* test PQcancel */
265 : 1 : send_cancellable_query(conn, monitorConn);
266 : 1 : cancel = PQgetCancel(conn);
267 [ - + ]: 1 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
34 alvherre@alvh.no-ip. 268 :UNC 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
34 alvherre@alvh.no-ip. 269 :GNC 1 : confirm_query_canceled(conn);
270 : :
271 : : /* PGcancel object can be reused for the next query */
272 : 1 : send_cancellable_query(conn, monitorConn);
273 [ - + ]: 1 : if (!PQcancel(cancel, errorbuf, sizeof(errorbuf)))
34 alvherre@alvh.no-ip. 274 :UNC 0 : pg_fatal("failed to run PQcancel: %s", errorbuf);
34 alvherre@alvh.no-ip. 275 :GNC 1 : confirm_query_canceled(conn);
276 : :
277 : 1 : PQfreeCancel(cancel);
278 : :
279 : : /* test PQrequestCancel */
280 : 1 : send_cancellable_query(conn, monitorConn);
281 [ - + ]: 1 : if (!PQrequestCancel(conn))
34 alvherre@alvh.no-ip. 282 :UNC 0 : pg_fatal("failed to run PQrequestCancel: %s", PQerrorMessage(conn));
34 alvherre@alvh.no-ip. 283 :GNC 1 : confirm_query_canceled(conn);
284 : :
285 : : /* test PQcancelBlocking */
33 286 : 1 : send_cancellable_query(conn, monitorConn);
287 : 1 : cancelConn = PQcancelCreate(conn);
288 [ - + ]: 1 : if (!PQcancelBlocking(cancelConn))
33 alvherre@alvh.no-ip. 289 :UNC 0 : pg_fatal("failed to run PQcancelBlocking: %s", PQcancelErrorMessage(cancelConn));
33 alvherre@alvh.no-ip. 290 :GNC 1 : confirm_query_canceled(conn);
291 : 1 : PQcancelFinish(cancelConn);
292 : :
293 : : /* test PQcancelCreate and then polling with PQcancelPoll */
294 : 1 : send_cancellable_query(conn, monitorConn);
295 : 1 : cancelConn = PQcancelCreate(conn);
296 [ - + ]: 1 : if (!PQcancelStart(cancelConn))
33 alvherre@alvh.no-ip. 297 :UNC 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
298 : : while (true)
33 alvherre@alvh.no-ip. 299 :GNC 1 : {
300 : : struct timeval tv;
301 : : fd_set input_mask;
302 : : fd_set output_mask;
303 : 2 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
304 : 2 : int sock = PQcancelSocket(cancelConn);
305 : :
306 [ + + ]: 2 : if (pollres == PGRES_POLLING_OK)
307 : 1 : break;
308 : :
309 [ + + ]: 17 : FD_ZERO(&input_mask);
310 [ + + ]: 17 : FD_ZERO(&output_mask);
311 [ + - - ]: 1 : switch (pollres)
312 : : {
313 : 1 : case PGRES_POLLING_READING:
314 : : pg_debug("polling for reads\n");
315 : 1 : FD_SET(sock, &input_mask);
316 : 1 : break;
33 alvherre@alvh.no-ip. 317 :UNC 0 : case PGRES_POLLING_WRITING:
318 : : pg_debug("polling for writes\n");
319 : 0 : FD_SET(sock, &output_mask);
320 : 0 : break;
321 : 0 : default:
322 : 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
323 : : }
324 : :
33 alvherre@alvh.no-ip. 325 [ - + ]:GNC 1 : if (sock < 0)
33 alvherre@alvh.no-ip. 326 :UNC 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
327 : :
33 alvherre@alvh.no-ip. 328 :GNC 1 : tv.tv_sec = 3;
329 : 1 : tv.tv_usec = 0;
330 : :
331 : : while (true)
332 : : {
333 [ - + ]: 1 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
334 : : {
33 alvherre@alvh.no-ip. 335 [ # # ]:UNC 0 : if (errno == EINTR)
336 : 0 : continue;
337 : 0 : pg_fatal("select() failed: %m");
338 : : }
33 alvherre@alvh.no-ip. 339 :GNC 1 : break;
340 : : }
341 : : }
342 [ - + ]: 1 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
33 alvherre@alvh.no-ip. 343 :UNC 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
33 alvherre@alvh.no-ip. 344 :GNC 1 : confirm_query_canceled(conn);
345 : :
346 : : /*
347 : : * test PQcancelReset works on the cancel connection and it can be reused
348 : : * afterwards
349 : : */
350 : 1 : PQcancelReset(cancelConn);
351 : :
352 : 1 : send_cancellable_query(conn, monitorConn);
353 [ - + ]: 1 : if (!PQcancelStart(cancelConn))
33 alvherre@alvh.no-ip. 354 :UNC 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
355 : : while (true)
33 alvherre@alvh.no-ip. 356 :GNC 1 : {
357 : : struct timeval tv;
358 : : fd_set input_mask;
359 : : fd_set output_mask;
360 : 2 : PostgresPollingStatusType pollres = PQcancelPoll(cancelConn);
361 : 2 : int sock = PQcancelSocket(cancelConn);
362 : :
363 [ + + ]: 2 : if (pollres == PGRES_POLLING_OK)
364 : 1 : break;
365 : :
366 [ + + ]: 17 : FD_ZERO(&input_mask);
367 [ + + ]: 17 : FD_ZERO(&output_mask);
368 [ + - - ]: 1 : switch (pollres)
369 : : {
370 : 1 : case PGRES_POLLING_READING:
371 : : pg_debug("polling for reads\n");
372 : 1 : FD_SET(sock, &input_mask);
373 : 1 : break;
33 alvherre@alvh.no-ip. 374 :UNC 0 : case PGRES_POLLING_WRITING:
375 : : pg_debug("polling for writes\n");
376 : 0 : FD_SET(sock, &output_mask);
377 : 0 : break;
378 : 0 : default:
379 : 0 : pg_fatal("bad cancel connection: %s", PQcancelErrorMessage(cancelConn));
380 : : }
381 : :
33 alvherre@alvh.no-ip. 382 [ - + ]:GNC 1 : if (sock < 0)
33 alvherre@alvh.no-ip. 383 :UNC 0 : pg_fatal("sock did not exist: %s", PQcancelErrorMessage(cancelConn));
384 : :
33 alvherre@alvh.no-ip. 385 :GNC 1 : tv.tv_sec = 3;
386 : 1 : tv.tv_usec = 0;
387 : :
388 : : while (true)
389 : : {
390 [ - + ]: 1 : if (select(sock + 1, &input_mask, &output_mask, NULL, &tv) < 0)
391 : : {
33 alvherre@alvh.no-ip. 392 [ # # ]:UNC 0 : if (errno == EINTR)
393 : 0 : continue;
394 : 0 : pg_fatal("select() failed: %m");
395 : : }
33 alvherre@alvh.no-ip. 396 :GNC 1 : break;
397 : : }
398 : : }
399 [ - + ]: 1 : if (PQcancelStatus(cancelConn) != CONNECTION_OK)
33 alvherre@alvh.no-ip. 400 :UNC 0 : pg_fatal("unexpected cancel connection status: %s", PQcancelErrorMessage(cancelConn));
33 alvherre@alvh.no-ip. 401 :GNC 1 : confirm_query_canceled(conn);
402 : :
403 : 1 : PQcancelFinish(cancelConn);
404 : :
34 405 : 1 : fprintf(stderr, "ok\n");
406 : 1 : }
407 : :
408 : : static void
1126 alvherre@alvh.no-ip. 409 :CBC 1 : test_disallowed_in_pipeline(PGconn *conn)
410 : : {
411 : 1 : PGresult *res = NULL;
412 : :
413 : 1 : fprintf(stderr, "test error cases... ");
414 : :
415 [ - + ]: 1 : if (PQisnonblocking(conn))
1126 alvherre@alvh.no-ip. 416 :UBC 0 : pg_fatal("Expected blocking connection mode");
417 : :
1126 alvherre@alvh.no-ip. 418 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 419 :UBC 0 : pg_fatal("Unable to enter pipeline mode");
420 : :
1126 alvherre@alvh.no-ip. 421 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 422 :UBC 0 : pg_fatal("Pipeline mode not activated properly");
423 : :
424 : : /* PQexec should fail in pipeline mode */
1126 alvherre@alvh.no-ip. 425 :CBC 1 : res = PQexec(conn, "SELECT 1");
426 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1126 alvherre@alvh.no-ip. 427 :UBC 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
569 alvherre@alvh.no-ip. 428 [ - + ]:CBC 1 : if (strcmp(PQerrorMessage(conn),
429 : : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
569 alvherre@alvh.no-ip. 430 :UBC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
431 : : PQerrorMessage(conn));
432 : :
433 : : /* PQsendQuery should fail in pipeline mode */
569 alvherre@alvh.no-ip. 434 [ - + ]:CBC 1 : if (PQsendQuery(conn, "SELECT 1") != 0)
569 alvherre@alvh.no-ip. 435 :UBC 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
569 alvherre@alvh.no-ip. 436 [ - + ]:CBC 1 : if (strcmp(PQerrorMessage(conn),
437 : : "PQsendQuery not allowed in pipeline mode\n") != 0)
569 alvherre@alvh.no-ip. 438 :UBC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
439 : : PQerrorMessage(conn));
440 : :
441 : : /* Entering pipeline mode when already in pipeline mode is OK */
1126 alvherre@alvh.no-ip. 442 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 443 :UBC 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
444 : :
1126 alvherre@alvh.no-ip. 445 [ - + ]:CBC 1 : if (PQisBusy(conn) != 0)
1126 alvherre@alvh.no-ip. 446 :UBC 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
447 : :
448 : : /* ok, back to normal command mode */
1126 alvherre@alvh.no-ip. 449 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 450 :UBC 0 : pg_fatal("couldn't exit idle empty pipeline mode");
451 : :
1126 alvherre@alvh.no-ip. 452 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 453 :UBC 0 : pg_fatal("Pipeline mode not terminated properly");
454 : :
455 : : /* exiting pipeline mode when not in pipeline mode should be a no-op */
1126 alvherre@alvh.no-ip. 456 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 457 :UBC 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
458 : :
459 : : /* can now PQexec again */
1126 alvherre@alvh.no-ip. 460 :CBC 1 : res = PQexec(conn, "SELECT 1");
461 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 462 :UBC 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
463 : : PQerrorMessage(conn));
464 : :
1126 alvherre@alvh.no-ip. 465 :CBC 1 : fprintf(stderr, "ok\n");
466 : 1 : }
467 : :
468 : : static void
469 : 1 : test_multi_pipelines(PGconn *conn)
470 : : {
471 : 1 : PGresult *res = NULL;
472 : 1 : const char *dummy_params[1] = {"1"};
473 : 1 : Oid dummy_param_oids[1] = {INT4OID};
474 : :
475 : 1 : fprintf(stderr, "multi pipeline... ");
476 : :
477 : : /*
478 : : * Queue up a couple of small pipelines and process each without returning
479 : : * to command mode first.
480 : : */
481 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 482 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
483 : :
484 : : /* first pipeline */
1126 alvherre@alvh.no-ip. 485 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
486 : : dummy_params, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 487 :UBC 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
488 : :
1126 alvherre@alvh.no-ip. 489 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 490 :UBC 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
491 : :
492 : : /* second pipeline */
1126 alvherre@alvh.no-ip. 493 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
494 : : dummy_params, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 495 :UBC 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
496 : :
497 : : /* Skip flushing once. */
89 michael@paquier.xyz 498 [ - + ]:GNC 1 : if (PQsendPipelineSync(conn) != 1)
89 michael@paquier.xyz 499 :UNC 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
500 : :
501 : : /* third pipeline */
89 michael@paquier.xyz 502 [ - + ]:GNC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
503 : : dummy_params, NULL, NULL, 0) != 1)
89 michael@paquier.xyz 504 :UNC 0 : pg_fatal("dispatching third SELECT failed: %s", PQerrorMessage(conn));
505 : :
1126 alvherre@alvh.no-ip. 506 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 507 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
508 : :
509 : : /* OK, start processing the results */
510 : :
511 : : /* first pipeline */
512 : :
1126 alvherre@alvh.no-ip. 513 :CBC 1 : res = PQgetResult(conn);
514 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 515 :UBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
516 : : PQerrorMessage(conn));
517 : :
1126 alvherre@alvh.no-ip. 518 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 519 :UBC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
520 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 521 :CBC 1 : PQclear(res);
522 : 1 : res = NULL;
523 : :
524 [ - + ]: 1 : if (PQgetResult(conn) != NULL)
1126 alvherre@alvh.no-ip. 525 :UBC 0 : pg_fatal("PQgetResult returned something extra after first result");
526 : :
1126 alvherre@alvh.no-ip. 527 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 0)
1126 alvherre@alvh.no-ip. 528 :UBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
529 : :
1126 alvherre@alvh.no-ip. 530 :CBC 1 : res = PQgetResult(conn);
531 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 532 :UBC 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
533 : : PQerrorMessage(conn));
534 : :
1126 alvherre@alvh.no-ip. 535 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 536 :UBC 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
537 : : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 538 :CBC 1 : PQclear(res);
539 : :
540 : : /* second pipeline */
541 : :
542 : 1 : res = PQgetResult(conn);
543 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 544 :UBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
545 : : PQerrorMessage(conn));
546 : :
1126 alvherre@alvh.no-ip. 547 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 548 :UBC 0 : pg_fatal("Unexpected result code %s from second pipeline item",
549 : : PQresStatus(PQresultStatus(res)));
89 michael@paquier.xyz 550 :GNC 1 : PQclear(res);
551 : 1 : res = NULL;
552 : :
553 [ - + ]: 1 : if (PQgetResult(conn) != NULL)
89 michael@paquier.xyz 554 :UNC 0 : pg_fatal("PQgetResult returned something extra after first result");
555 : :
89 michael@paquier.xyz 556 [ - + ]:GNC 1 : if (PQexitPipelineMode(conn) != 0)
89 michael@paquier.xyz 557 :UNC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
558 : :
89 michael@paquier.xyz 559 :GNC 1 : res = PQgetResult(conn);
560 [ - + ]: 1 : if (res == NULL)
89 michael@paquier.xyz 561 :UNC 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
562 : : PQerrorMessage(conn));
563 : :
89 michael@paquier.xyz 564 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
89 michael@paquier.xyz 565 :UNC 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
566 : : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
89 michael@paquier.xyz 567 :GNC 1 : PQclear(res);
568 : :
569 : : /* third pipeline */
570 : :
571 : 1 : res = PQgetResult(conn);
572 [ - + ]: 1 : if (res == NULL)
89 michael@paquier.xyz 573 :UNC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
574 : : PQerrorMessage(conn));
575 : :
89 michael@paquier.xyz 576 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
89 michael@paquier.xyz 577 :UNC 0 : pg_fatal("Unexpected result code %s from third pipeline item",
578 : : PQresStatus(PQresultStatus(res)));
579 : :
1126 alvherre@alvh.no-ip. 580 :CBC 1 : res = PQgetResult(conn);
581 [ - + ]: 1 : if (res != NULL)
1126 alvherre@alvh.no-ip. 582 :UBC 0 : pg_fatal("Expected null result, got %s",
583 : : PQresStatus(PQresultStatus(res)));
584 : :
1126 alvherre@alvh.no-ip. 585 :CBC 1 : res = PQgetResult(conn);
586 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 587 :UBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
588 : : PQerrorMessage(conn));
589 : :
1126 alvherre@alvh.no-ip. 590 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 591 :UBC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
592 : : PQresStatus(PQresultStatus(res)));
593 : :
594 : : /* We're still in pipeline mode ... */
1126 alvherre@alvh.no-ip. 595 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 596 :UBC 0 : pg_fatal("Fell out of pipeline mode somehow");
597 : :
598 : : /* until we end it, which we can safely do now */
1126 alvherre@alvh.no-ip. 599 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 600 :UBC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
601 : : PQerrorMessage(conn));
602 : :
1126 alvherre@alvh.no-ip. 603 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 604 :UBC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
605 : :
1126 alvherre@alvh.no-ip. 606 :CBC 1 : fprintf(stderr, "ok\n");
607 : 1 : }
608 : :
609 : : /*
610 : : * Test behavior when a pipeline dispatches a number of commands that are
611 : : * not flushed by a sync point.
612 : : */
613 : : static void
1020 614 : 1 : test_nosync(PGconn *conn)
615 : : {
616 : 1 : int numqueries = 10;
617 : 1 : int results = 0;
618 : 1 : int sock = PQsocket(conn);
619 : :
620 : 1 : fprintf(stderr, "nosync... ");
621 : :
622 [ - + ]: 1 : if (sock < 0)
1020 alvherre@alvh.no-ip. 623 :UBC 0 : pg_fatal("invalid socket");
624 : :
1020 alvherre@alvh.no-ip. 625 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1020 alvherre@alvh.no-ip. 626 :UBC 0 : pg_fatal("could not enter pipeline mode");
1020 alvherre@alvh.no-ip. 627 [ + + ]:CBC 11 : for (int i = 0; i < numqueries; i++)
628 : : {
629 : : fd_set input_mask;
630 : : struct timeval tv;
631 : :
632 [ - + ]: 10 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
633 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1020 alvherre@alvh.no-ip. 634 :UBC 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
1020 alvherre@alvh.no-ip. 635 :CBC 10 : PQflush(conn);
636 : :
637 : : /*
638 : : * If the server has written anything to us, read (some of) it now.
639 : : */
640 [ + + ]: 170 : FD_ZERO(&input_mask);
641 : 10 : FD_SET(sock, &input_mask);
642 : 10 : tv.tv_sec = 0;
643 : 10 : tv.tv_usec = 0;
644 [ - + ]: 10 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
645 : : {
33 michael@paquier.xyz 646 :UNC 0 : fprintf(stderr, "select() failed: %m\n");
1020 alvherre@alvh.no-ip. 647 :UBC 0 : exit_nicely(conn);
648 : : }
1020 alvherre@alvh.no-ip. 649 [ - + - - ]:CBC 10 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
1020 alvherre@alvh.no-ip. 650 :UBC 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
651 : : }
652 : :
653 : : /* tell server to flush its output buffer */
1020 alvherre@alvh.no-ip. 654 [ - + ]:CBC 1 : if (PQsendFlushRequest(conn) != 1)
1020 alvherre@alvh.no-ip. 655 :UBC 0 : pg_fatal("failed to send flush request");
1020 alvherre@alvh.no-ip. 656 :CBC 1 : PQflush(conn);
657 : :
658 : : /* Now read all results */
659 : : for (;;)
660 : 9 : {
661 : : PGresult *res;
662 : :
663 : 10 : res = PQgetResult(conn);
664 : :
665 : : /* NULL results are only expected after TUPLES_OK */
666 [ - + ]: 10 : if (res == NULL)
1020 alvherre@alvh.no-ip. 667 :UBC 0 : pg_fatal("got unexpected NULL result after %d results", results);
668 : :
669 : : /* We expect exactly one TUPLES_OK result for each query we sent */
1020 alvherre@alvh.no-ip. 670 [ + - ]:CBC 10 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
671 : 9 : {
672 : : PGresult *res2;
673 : :
674 : : /* and one NULL result should follow each */
675 : 10 : res2 = PQgetResult(conn);
676 [ - + ]: 10 : if (res2 != NULL)
1020 alvherre@alvh.no-ip. 677 :UBC 0 : pg_fatal("expected NULL, got %s",
678 : : PQresStatus(PQresultStatus(res2)));
1020 alvherre@alvh.no-ip. 679 :CBC 10 : PQclear(res);
680 : 10 : results++;
681 : :
682 : : /* if we're done, we're done */
683 [ + + ]: 10 : if (results == numqueries)
684 : 1 : break;
685 : :
686 : 9 : continue;
687 : : }
688 : :
689 : : /* anything else is unexpected */
1020 alvherre@alvh.no-ip. 690 :UBC 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
691 : : }
692 : :
1020 alvherre@alvh.no-ip. 693 :CBC 1 : fprintf(stderr, "ok\n");
694 : 1 : }
695 : :
696 : : /*
697 : : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
698 : : * still have to get results for each pipeline item, but the item will just be
699 : : * a PGRES_PIPELINE_ABORTED code.
700 : : *
701 : : * This intentionally doesn't use a transaction to wrap the pipeline. You should
702 : : * usually use an xact, but in this case we want to observe the effects of each
703 : : * statement.
704 : : */
705 : : static void
1126 706 : 1 : test_pipeline_abort(PGconn *conn)
707 : : {
708 : 1 : PGresult *res = NULL;
709 : 1 : const char *dummy_params[1] = {"1"};
710 : 1 : Oid dummy_param_oids[1] = {INT4OID};
711 : : int i;
712 : : int gotrows;
713 : : bool goterror;
714 : :
715 : 1 : fprintf(stderr, "aborted pipeline... ");
716 : :
717 : 1 : res = PQexec(conn, drop_table_sql);
718 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 719 :UBC 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
720 : :
1126 alvherre@alvh.no-ip. 721 :CBC 1 : res = PQexec(conn, create_table_sql);
722 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 723 :UBC 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
724 : :
725 : : /*
726 : : * Queue up a couple of small pipelines and process each without returning
727 : : * to command mode first. Make sure the second operation in the first
728 : : * pipeline ERRORs.
729 : : */
1126 alvherre@alvh.no-ip. 730 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 731 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
732 : :
1126 alvherre@alvh.no-ip. 733 :CBC 1 : dummy_params[0] = "1";
734 [ - + ]: 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
735 : : dummy_params, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 736 :UBC 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
737 : :
1126 alvherre@alvh.no-ip. 738 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
739 : : 1, dummy_param_oids, dummy_params,
740 : : NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 741 :UBC 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
742 : :
1126 alvherre@alvh.no-ip. 743 :CBC 1 : dummy_params[0] = "2";
744 [ - + ]: 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
745 : : dummy_params, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 746 :UBC 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
747 : :
1126 alvherre@alvh.no-ip. 748 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 749 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
750 : :
1126 alvherre@alvh.no-ip. 751 :CBC 1 : dummy_params[0] = "3";
752 [ - + ]: 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
753 : : dummy_params, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 754 :UBC 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
755 : : PQerrorMessage(conn));
756 : :
1126 alvherre@alvh.no-ip. 757 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 758 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
759 : :
760 : : /*
761 : : * OK, start processing the pipeline results.
762 : : *
763 : : * We should get a command-ok for the first query, then a fatal error and
764 : : * a pipeline aborted message for the second insert, a pipeline-end, then
765 : : * a command-ok and a pipeline-ok for the second pipeline operation.
766 : : */
1126 alvherre@alvh.no-ip. 767 :CBC 1 : res = PQgetResult(conn);
768 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 769 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 770 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 771 :UBC 0 : pg_fatal("Unexpected result status %s: %s",
772 : : PQresStatus(PQresultStatus(res)),
773 : : PQresultErrorMessage(res));
1126 alvherre@alvh.no-ip. 774 :CBC 1 : PQclear(res);
775 : :
776 : : /* NULL result to signal end-of-results for this command */
777 [ - + ]: 1 : if ((res = PQgetResult(conn)) != NULL)
1126 alvherre@alvh.no-ip. 778 :UBC 0 : pg_fatal("Expected null result, got %s",
779 : : PQresStatus(PQresultStatus(res)));
780 : :
781 : : /* Second query caused error, so we expect an error next */
1126 alvherre@alvh.no-ip. 782 :CBC 1 : res = PQgetResult(conn);
783 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 784 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 785 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
1126 alvherre@alvh.no-ip. 786 :UBC 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
787 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 788 :CBC 1 : PQclear(res);
789 : :
790 : : /* NULL result to signal end-of-results for this command */
791 [ - + ]: 1 : if ((res = PQgetResult(conn)) != NULL)
1126 alvherre@alvh.no-ip. 792 :UBC 0 : pg_fatal("Expected null result, got %s",
793 : : PQresStatus(PQresultStatus(res)));
794 : :
795 : : /*
796 : : * pipeline should now be aborted.
797 : : *
798 : : * Note that we could still queue more queries at this point if we wanted;
799 : : * they'd get added to a new third pipeline since we've already sent a
800 : : * second. The aborted flag relates only to the pipeline being received.
801 : : */
1126 alvherre@alvh.no-ip. 802 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
1126 alvherre@alvh.no-ip. 803 :UBC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
804 : :
805 : : /* third query in pipeline, the second insert */
1126 alvherre@alvh.no-ip. 806 :CBC 1 : res = PQgetResult(conn);
807 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 808 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 809 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
1126 alvherre@alvh.no-ip. 810 :UBC 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
811 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 812 :CBC 1 : PQclear(res);
813 : :
814 : : /* NULL result to signal end-of-results for this command */
815 [ - + ]: 1 : if ((res = PQgetResult(conn)) != NULL)
1126 alvherre@alvh.no-ip. 816 :UBC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
817 : :
1126 alvherre@alvh.no-ip. 818 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
1126 alvherre@alvh.no-ip. 819 :UBC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
820 : :
821 : : /* Ensure we're still in pipeline */
1126 alvherre@alvh.no-ip. 822 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 823 :UBC 0 : pg_fatal("Fell out of pipeline mode somehow");
824 : :
825 : : /*
826 : : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
827 : : *
828 : : * (This is so clients know to start processing results normally again and
829 : : * can tell the difference between skipped commands and the sync.)
830 : : */
1126 alvherre@alvh.no-ip. 831 :CBC 1 : res = PQgetResult(conn);
832 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 833 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 834 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 835 :UBC 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
836 : : "Expected PGRES_PIPELINE_SYNC, got %s",
837 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 838 :CBC 1 : PQclear(res);
839 : :
840 [ - + ]: 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
1126 alvherre@alvh.no-ip. 841 :UBC 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
842 : :
843 : : /* We're still in pipeline mode... */
1126 alvherre@alvh.no-ip. 844 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 845 :UBC 0 : pg_fatal("Fell out of pipeline mode somehow");
846 : :
847 : : /* the insert from the second pipeline */
1126 alvherre@alvh.no-ip. 848 :CBC 1 : res = PQgetResult(conn);
849 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 850 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 851 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 852 :UBC 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
853 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 854 :CBC 1 : PQclear(res);
855 : :
856 : : /* Read the NULL result at the end of the command */
857 [ - + ]: 1 : if ((res = PQgetResult(conn)) != NULL)
1126 alvherre@alvh.no-ip. 858 :UBC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
859 : :
860 : : /* the second pipeline sync */
1126 alvherre@alvh.no-ip. 861 [ - + ]:CBC 1 : if ((res = PQgetResult(conn)) == NULL)
1126 alvherre@alvh.no-ip. 862 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 863 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 864 :UBC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
865 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 866 :CBC 1 : PQclear(res);
867 : :
868 [ - + ]: 1 : if ((res = PQgetResult(conn)) != NULL)
1126 alvherre@alvh.no-ip. 869 :UBC 0 : pg_fatal("Expected null result, got %s: %s",
870 : : PQresStatus(PQresultStatus(res)),
871 : : PQerrorMessage(conn));
872 : :
873 : : /* Try to send two queries in one command */
569 alvherre@alvh.no-ip. 874 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 875 :UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 876 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 877 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 878 :CBC 1 : goterror = false;
879 [ + + ]: 2 : while ((res = PQgetResult(conn)) != NULL)
880 : : {
881 [ + - ]: 1 : switch (PQresultStatus(res))
882 : : {
883 : 1 : case PGRES_FATAL_ERROR:
884 [ - + ]: 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
1126 alvherre@alvh.no-ip. 885 :UBC 0 : pg_fatal("expected error about multiple commands, got %s",
886 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 887 :CBC 1 : printf("got expected %s", PQerrorMessage(conn));
888 : 1 : goterror = true;
889 : 1 : break;
1126 alvherre@alvh.no-ip. 890 :UBC 0 : default:
891 : 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
892 : : break;
893 : : }
894 : : }
1126 alvherre@alvh.no-ip. 895 [ - + ]:CBC 1 : if (!goterror)
1126 alvherre@alvh.no-ip. 896 :UBC 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
1126 alvherre@alvh.no-ip. 897 :CBC 1 : res = PQgetResult(conn);
898 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 899 :UBC 0 : pg_fatal("got NULL result");
1126 alvherre@alvh.no-ip. 900 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 901 :UBC 0 : pg_fatal("Unexpected result code %s from pipeline sync",
902 : : PQresStatus(PQresultStatus(res)));
1110 alvherre@alvh.no-ip. 903 :CBC 1 : fprintf(stderr, "ok\n");
904 : :
905 : : /* Test single-row mode with an error partways */
569 906 [ - + ]: 1 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
907 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 908 :UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 909 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 910 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 911 :CBC 1 : PQsetSingleRowMode(conn);
912 : 1 : goterror = false;
1110 913 : 1 : gotrows = 0;
1126 914 [ + + ]: 5 : while ((res = PQgetResult(conn)) != NULL)
915 : : {
916 [ + + - ]: 4 : switch (PQresultStatus(res))
917 : : {
918 : 3 : case PGRES_SINGLE_TUPLE:
919 : 3 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
1110 920 : 3 : gotrows++;
1126 921 : 3 : break;
922 : 1 : case PGRES_FATAL_ERROR:
923 [ - + ]: 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
1126 alvherre@alvh.no-ip. 924 :UBC 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
925 : : PQerrorMessage(conn),
926 : : PQresultErrorField(res, PG_DIAG_SQLSTATE));
1126 alvherre@alvh.no-ip. 927 :CBC 1 : printf("got expected division-by-zero\n");
928 : 1 : goterror = true;
929 : 1 : break;
1126 alvherre@alvh.no-ip. 930 :UBC 0 : default:
931 : 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
932 : : }
1126 alvherre@alvh.no-ip. 933 :CBC 4 : PQclear(res);
934 : : }
935 [ - + ]: 1 : if (!goterror)
1126 alvherre@alvh.no-ip. 936 :UBC 0 : pg_fatal("did not get division-by-zero error");
1110 alvherre@alvh.no-ip. 937 [ - + ]:CBC 1 : if (gotrows != 3)
1110 alvherre@alvh.no-ip. 938 :UBC 0 : pg_fatal("did not get three rows");
939 : : /* the third pipeline sync */
1126 alvherre@alvh.no-ip. 940 [ - + ]:CBC 1 : if ((res = PQgetResult(conn)) == NULL)
1126 alvherre@alvh.no-ip. 941 :UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 942 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 943 :UBC 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
944 : : PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 945 :CBC 1 : PQclear(res);
946 : :
947 : : /* We're still in pipeline mode... */
948 [ - + ]: 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 949 :UBC 0 : pg_fatal("Fell out of pipeline mode somehow");
950 : :
951 : : /* until we end it, which we can safely do now */
1126 alvherre@alvh.no-ip. 952 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 953 :UBC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
954 : : PQerrorMessage(conn));
955 : :
1126 alvherre@alvh.no-ip. 956 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 957 :UBC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
958 : :
959 : : /*-
960 : : * Since we fired the pipelines off without a surrounding xact, the results
961 : : * should be:
962 : : *
963 : : * - Implicit xact started by server around 1st pipeline
964 : : * - First insert applied
965 : : * - Second statement aborted xact
966 : : * - Third insert skipped
967 : : * - Sync rolled back first implicit xact
968 : : * - Implicit xact created by server around 2nd pipeline
969 : : * - insert applied from 2nd pipeline
970 : : * - Sync commits 2nd xact
971 : : *
972 : : * So we should only have the value 3 that we inserted.
973 : : */
1126 alvherre@alvh.no-ip. 974 :CBC 1 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
975 : :
976 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 977 :UBC 0 : pg_fatal("Expected tuples, got %s: %s",
978 : : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 979 [ - + ]:CBC 1 : if (PQntuples(res) != 1)
1126 alvherre@alvh.no-ip. 980 :UBC 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
1126 alvherre@alvh.no-ip. 981 [ + + ]:CBC 2 : for (i = 0; i < PQntuples(res); i++)
982 : : {
983 : 1 : const char *val = PQgetvalue(res, i, 0);
984 : :
985 [ - + ]: 1 : if (strcmp(val, "3") != 0)
1126 alvherre@alvh.no-ip. 986 :UBC 0 : pg_fatal("expected only insert with value 3, got %s", val);
987 : : }
988 : :
1126 alvherre@alvh.no-ip. 989 :CBC 1 : PQclear(res);
990 : :
649 991 : 1 : fprintf(stderr, "ok\n");
1126 992 : 1 : }
993 : :
994 : : /* State machine enum for test_pipelined_insert */
995 : : enum PipelineInsertStep
996 : : {
997 : : BI_BEGIN_TX,
998 : : BI_DROP_TABLE,
999 : : BI_CREATE_TABLE,
1000 : : BI_PREPARE,
1001 : : BI_INSERT_ROWS,
1002 : : BI_COMMIT_TX,
1003 : : BI_SYNC,
1004 : : BI_DONE,
1005 : : };
1006 : :
1007 : : static void
1008 : 1 : test_pipelined_insert(PGconn *conn, int n_rows)
1009 : : {
1110 1010 : 1 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
1011 : : const char *insert_params[2];
1012 : : char insert_param_0[MAXINTLEN];
1013 : : char insert_param_1[MAXINT8LEN];
1126 1014 : 1 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
1015 : 1 : recv_step = BI_BEGIN_TX;
1016 : : int rows_to_send,
1017 : : rows_to_receive;
1018 : :
1110 1019 : 1 : insert_params[0] = insert_param_0;
1020 : 1 : insert_params[1] = insert_param_1;
1021 : :
1126 1022 : 1 : rows_to_send = rows_to_receive = n_rows;
1023 : :
1024 : : /*
1025 : : * Do a pipelined insert into a table created at the start of the pipeline
1026 : : */
1027 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1028 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1029 : :
1126 alvherre@alvh.no-ip. 1030 [ + + ]:CBC 4 : while (send_step != BI_PREPARE)
1031 : : {
1032 : : const char *sql;
1033 : :
1034 [ + + + - ]: 3 : switch (send_step)
1035 : : {
1036 : 1 : case BI_BEGIN_TX:
1037 : 1 : sql = "BEGIN TRANSACTION";
1038 : 1 : send_step = BI_DROP_TABLE;
1039 : 1 : break;
1040 : :
1041 : 1 : case BI_DROP_TABLE:
1042 : 1 : sql = drop_table_sql;
1043 : 1 : send_step = BI_CREATE_TABLE;
1044 : 1 : break;
1045 : :
1046 : 1 : case BI_CREATE_TABLE:
1047 : 1 : sql = create_table_sql;
1048 : 1 : send_step = BI_PREPARE;
1049 : 1 : break;
1050 : :
1126 alvherre@alvh.no-ip. 1051 :UBC 0 : default:
1052 : 0 : pg_fatal("invalid state");
1053 : : sql = NULL; /* keep compiler quiet */
1054 : : }
1055 : :
1056 : : pg_debug("sending: %s\n", sql);
1126 alvherre@alvh.no-ip. 1057 [ - + ]:CBC 3 : if (PQsendQueryParams(conn, sql,
1058 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1059 :UBC 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
1060 : : }
1061 : :
1126 alvherre@alvh.no-ip. 1062 [ - + ]:CBC 1 : Assert(send_step == BI_PREPARE);
1063 : : pg_debug("sending: %s\n", insert_sql2);
1110 1064 [ - + ]: 1 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
1126 alvherre@alvh.no-ip. 1065 :UBC 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1066 :CBC 1 : send_step = BI_INSERT_ROWS;
1067 : :
1068 : : /*
1069 : : * Now we start inserting. We'll be sending enough data that we could fill
1070 : : * our output buffer, so to avoid deadlocking we need to enter nonblocking
1071 : : * mode and consume input while we send more output. As results of each
1072 : : * query are processed we should pop them to allow processing of the next
1073 : : * query. There's no need to finish the pipeline before processing
1074 : : * results.
1075 : : */
1076 [ - + ]: 1 : if (PQsetnonblocking(conn, 1) != 0)
1126 alvherre@alvh.no-ip. 1077 :UBC 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
1078 : :
1126 alvherre@alvh.no-ip. 1079 [ + + ]:CBC 27686 : while (recv_step != BI_DONE)
1080 : : {
1081 : : int sock;
1082 : : fd_set input_mask;
1083 : : fd_set output_mask;
1084 : :
1085 : 27685 : sock = PQsocket(conn);
1086 : :
1087 [ - + ]: 27685 : if (sock < 0)
1126 alvherre@alvh.no-ip. 1088 :UBC 0 : break; /* shouldn't happen */
1089 : :
1126 alvherre@alvh.no-ip. 1090 [ + + ]:CBC 470645 : FD_ZERO(&input_mask);
1091 : 27685 : FD_SET(sock, &input_mask);
1092 [ + + ]: 470645 : FD_ZERO(&output_mask);
1093 : 27685 : FD_SET(sock, &output_mask);
1094 : :
1095 [ - + ]: 27685 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
1096 : : {
33 michael@paquier.xyz 1097 :UNC 0 : fprintf(stderr, "select() failed: %m\n");
1126 alvherre@alvh.no-ip. 1098 :UBC 0 : exit_nicely(conn);
1099 : : }
1100 : :
1101 : : /*
1102 : : * Process any results, so we keep the server's output buffer free
1103 : : * flowing and it can continue to process input
1104 : : */
1126 alvherre@alvh.no-ip. 1105 [ + + ]:CBC 27685 : if (FD_ISSET(sock, &input_mask))
1106 : : {
1107 : 3 : PQconsumeInput(conn);
1108 : :
1109 : : /* Read until we'd block if we tried to read */
1110 [ + + + + ]: 1414 : while (!PQisBusy(conn) && recv_step < BI_DONE)
1111 : : {
1112 : : PGresult *res;
1110 tgl@sss.pgh.pa.us 1113 : 1411 : const char *cmdtag = "";
1126 alvherre@alvh.no-ip. 1114 : 1411 : const char *description = "";
1115 : : int status;
1116 : :
1117 : : /*
1118 : : * Read next result. If no more results from this query,
1119 : : * advance to the next query
1120 : : */
1121 : 1411 : res = PQgetResult(conn);
1122 [ + + ]: 1411 : if (res == NULL)
1123 : 705 : continue;
1124 : :
1125 : 706 : status = PGRES_COMMAND_OK;
1126 [ + + + + : 706 : switch (recv_step)
+ + + -
- ]
1127 : : {
1128 : 1 : case BI_BEGIN_TX:
1129 : 1 : cmdtag = "BEGIN";
1130 : 1 : recv_step++;
1131 : 1 : break;
1132 : 1 : case BI_DROP_TABLE:
1133 : 1 : cmdtag = "DROP TABLE";
1134 : 1 : recv_step++;
1135 : 1 : break;
1136 : 1 : case BI_CREATE_TABLE:
1137 : 1 : cmdtag = "CREATE TABLE";
1138 : 1 : recv_step++;
1139 : 1 : break;
1140 : 1 : case BI_PREPARE:
1141 : 1 : cmdtag = "";
1142 : 1 : description = "PREPARE";
1143 : 1 : recv_step++;
1144 : 1 : break;
1145 : 700 : case BI_INSERT_ROWS:
1146 : 700 : cmdtag = "INSERT";
1147 : 700 : rows_to_receive--;
1148 [ + + ]: 700 : if (rows_to_receive == 0)
1149 : 1 : recv_step++;
1150 : 700 : break;
1151 : 1 : case BI_COMMIT_TX:
1152 : 1 : cmdtag = "COMMIT";
1153 : 1 : recv_step++;
1154 : 1 : break;
1155 : 1 : case BI_SYNC:
1156 : 1 : cmdtag = "";
1157 : 1 : description = "SYNC";
1158 : 1 : status = PGRES_PIPELINE_SYNC;
1159 : 1 : recv_step++;
1160 : 1 : break;
1126 alvherre@alvh.no-ip. 1161 :UBC 0 : case BI_DONE:
1162 : : /* unreachable */
1120 tgl@sss.pgh.pa.us 1163 : 0 : pg_fatal("unreachable state");
1164 : : }
1165 : :
1126 alvherre@alvh.no-ip. 1166 [ - + ]:CBC 706 : if (PQresultStatus(res) != status)
1126 alvherre@alvh.no-ip. 1167 :UBC 0 : pg_fatal("%s reported status %s, expected %s\n"
1168 : : "Error message: \"%s\"",
1169 : : description, PQresStatus(PQresultStatus(res)),
1170 : : PQresStatus(status), PQerrorMessage(conn));
1171 : :
1126 alvherre@alvh.no-ip. 1172 [ - + ]:CBC 706 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
1126 alvherre@alvh.no-ip. 1173 :UBC 0 : pg_fatal("%s expected command tag '%s', got '%s'",
1174 : : description, cmdtag, PQcmdStatus(res));
1175 : :
1176 : : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
1177 : :
1126 alvherre@alvh.no-ip. 1178 :CBC 706 : PQclear(res);
1179 : : }
1180 : : }
1181 : :
1182 : : /* Write more rows and/or the end pipeline message, if needed */
1183 [ + + ]: 27685 : if (FD_ISSET(sock, &output_mask))
1184 : : {
1185 : 27683 : PQflush(conn);
1186 : :
1187 [ + + ]: 27683 : if (send_step == BI_INSERT_ROWS)
1188 : : {
1110 1189 : 700 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
1190 : : /* use up some buffer space with a wide value */
1109 1191 : 700 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
1192 : :
1126 1193 [ + - ]: 700 : if (PQsendQueryPrepared(conn, "my_insert",
1194 : : 2, insert_params, NULL, NULL, 0) == 1)
1195 : : {
1196 : : pg_debug("sent row %d\n", rows_to_send);
1197 : :
1198 : 700 : rows_to_send--;
1199 [ + + ]: 700 : if (rows_to_send == 0)
1200 : 1 : send_step++;
1201 : : }
1202 : : else
1203 : : {
1204 : : /*
1205 : : * in nonblocking mode, so it's OK for an insert to fail
1206 : : * to send
1207 : : */
1126 alvherre@alvh.no-ip. 1208 :UBC 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
1209 : : rows_to_send, PQerrorMessage(conn));
1210 : : }
1211 : : }
1126 alvherre@alvh.no-ip. 1212 [ + + ]:CBC 26983 : else if (send_step == BI_COMMIT_TX)
1213 : : {
1214 [ + - ]: 1 : if (PQsendQueryParams(conn, "COMMIT",
1215 : : 0, NULL, NULL, NULL, NULL, 0) == 1)
1216 : : {
1217 : : pg_debug("sent COMMIT\n");
1218 : 1 : send_step++;
1219 : : }
1220 : : else
1221 : : {
1126 alvherre@alvh.no-ip. 1222 :UBC 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
1223 : : PQerrorMessage(conn));
1224 : : }
1225 : : }
1126 alvherre@alvh.no-ip. 1226 [ + + ]:CBC 26982 : else if (send_step == BI_SYNC)
1227 : : {
1228 [ + - ]: 1 : if (PQpipelineSync(conn) == 1)
1229 : : {
1230 : 1 : fprintf(stdout, "pipeline sync sent\n");
1231 : 1 : send_step++;
1232 : : }
1233 : : else
1234 : : {
1126 alvherre@alvh.no-ip. 1235 :UBC 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
1236 : : PQerrorMessage(conn));
1237 : : }
1238 : : }
1239 : : }
1240 : : }
1241 : :
1242 : : /* We've got the sync message and the pipeline should be done */
1126 alvherre@alvh.no-ip. 1243 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1244 :UBC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1245 : : PQerrorMessage(conn));
1246 : :
1126 alvherre@alvh.no-ip. 1247 [ - + ]:CBC 1 : if (PQsetnonblocking(conn, 0) != 0)
1126 alvherre@alvh.no-ip. 1248 :UBC 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
1249 : :
1126 alvherre@alvh.no-ip. 1250 :CBC 1 : fprintf(stderr, "ok\n");
1251 : 1 : }
1252 : :
1253 : : static void
1254 : 1 : test_prepared(PGconn *conn)
1255 : : {
1256 : 1 : PGresult *res = NULL;
1257 : 1 : Oid param_oids[1] = {INT4OID};
1258 : : Oid expected_oids[4];
1259 : : Oid typ;
1260 : :
1261 : 1 : fprintf(stderr, "prepared... ");
1262 : :
1263 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1264 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1265 [ - + ]:CBC 1 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
1266 : : "interval '1 sec'",
1267 : : 1, param_oids) != 1)
1126 alvherre@alvh.no-ip. 1268 :UBC 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1269 :CBC 1 : expected_oids[0] = INT4OID;
1270 : 1 : expected_oids[1] = TEXTOID;
1271 : 1 : expected_oids[2] = NUMERICOID;
1272 : 1 : expected_oids[3] = INTERVALOID;
1273 [ - + ]: 1 : if (PQsendDescribePrepared(conn, "select_one") != 1)
1126 alvherre@alvh.no-ip. 1274 :UBC 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1275 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1276 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1277 : :
1126 alvherre@alvh.no-ip. 1278 :CBC 1 : res = PQgetResult(conn);
1279 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 1280 :UBC 0 : pg_fatal("PQgetResult returned null");
1126 alvherre@alvh.no-ip. 1281 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 1282 :UBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 1283 :CBC 1 : PQclear(res);
1284 : 1 : res = PQgetResult(conn);
1285 [ - + ]: 1 : if (res != NULL)
1126 alvherre@alvh.no-ip. 1286 :UBC 0 : pg_fatal("expected NULL result");
1287 : :
1126 alvherre@alvh.no-ip. 1288 :CBC 1 : res = PQgetResult(conn);
1289 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 1290 :UBC 0 : pg_fatal("PQgetResult returned NULL");
1126 alvherre@alvh.no-ip. 1291 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 1292 :UBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1126 alvherre@alvh.no-ip. 1293 [ - + ]:CBC 1 : if (PQnfields(res) != lengthof(expected_oids))
194 peter@eisentraut.org 1294 :UNC 0 : pg_fatal("expected %zu columns, got %d",
1295 : : lengthof(expected_oids), PQnfields(res));
1126 alvherre@alvh.no-ip. 1296 [ + + ]:CBC 5 : for (int i = 0; i < PQnfields(res); i++)
1297 : : {
1298 : 4 : typ = PQftype(res, i);
1299 [ - + ]: 4 : if (typ != expected_oids[i])
1126 alvherre@alvh.no-ip. 1300 :UBC 0 : pg_fatal("field %d: expected type %u, got %u",
1301 : : i, expected_oids[i], typ);
1302 : : }
1126 alvherre@alvh.no-ip. 1303 :CBC 1 : PQclear(res);
1304 : 1 : res = PQgetResult(conn);
1305 [ - + ]: 1 : if (res != NULL)
1126 alvherre@alvh.no-ip. 1306 :UBC 0 : pg_fatal("expected NULL result");
1307 : :
1126 alvherre@alvh.no-ip. 1308 :CBC 1 : res = PQgetResult(conn);
1309 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 1310 :UBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1311 : :
285 michael@paquier.xyz 1312 :GNC 1 : fprintf(stderr, "closing statement..");
1313 [ - + ]: 1 : if (PQsendClosePrepared(conn, "select_one") != 1)
285 michael@paquier.xyz 1314 :UNC 0 : pg_fatal("PQsendClosePrepared failed: %s", PQerrorMessage(conn));
285 michael@paquier.xyz 1315 [ - + ]:GNC 1 : if (PQpipelineSync(conn) != 1)
285 michael@paquier.xyz 1316 :UNC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1317 : :
285 michael@paquier.xyz 1318 :GNC 1 : res = PQgetResult(conn);
1319 [ - + ]: 1 : if (res == NULL)
285 michael@paquier.xyz 1320 :UNC 0 : pg_fatal("expected non-NULL result");
285 michael@paquier.xyz 1321 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
285 michael@paquier.xyz 1322 :UNC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
285 michael@paquier.xyz 1323 :GNC 1 : PQclear(res);
1324 : 1 : res = PQgetResult(conn);
1325 [ - + ]: 1 : if (res != NULL)
285 michael@paquier.xyz 1326 :UNC 0 : pg_fatal("expected NULL result");
285 michael@paquier.xyz 1327 :GNC 1 : res = PQgetResult(conn);
1328 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
285 michael@paquier.xyz 1329 :UNC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1330 : :
1126 alvherre@alvh.no-ip. 1331 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1332 :UBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1333 : :
1334 : : /* Now that it's closed we should get an error when describing */
285 michael@paquier.xyz 1335 :GNC 1 : res = PQdescribePrepared(conn, "select_one");
1336 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
285 michael@paquier.xyz 1337 :UNC 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1338 : :
1339 : : /*
1340 : : * Also test the blocking close, this should not fail since closing a
1341 : : * non-existent prepared statement is a no-op
1342 : : */
285 michael@paquier.xyz 1343 :GNC 1 : res = PQclosePrepared(conn, "select_one");
1344 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
285 michael@paquier.xyz 1345 :UNC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1346 : :
285 michael@paquier.xyz 1347 :GNC 1 : fprintf(stderr, "creating portal... ");
1126 alvherre@alvh.no-ip. 1348 :CBC 1 : PQexec(conn, "BEGIN");
1349 : 1 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
1350 : 1 : PQenterPipelineMode(conn);
1351 [ - + ]: 1 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
1126 alvherre@alvh.no-ip. 1352 :UBC 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1353 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1354 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1355 :CBC 1 : res = PQgetResult(conn);
1356 [ - + ]: 1 : if (res == NULL)
296 michael@paquier.xyz 1357 :UBC 0 : pg_fatal("PQgetResult returned null");
1126 alvherre@alvh.no-ip. 1358 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 1359 :UBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1360 : :
1126 alvherre@alvh.no-ip. 1361 :CBC 1 : typ = PQftype(res, 0);
1362 [ - + ]: 1 : if (typ != INT4OID)
1126 alvherre@alvh.no-ip. 1363 :UBC 0 : pg_fatal("portal: expected type %u, got %u",
1364 : : INT4OID, typ);
1126 alvherre@alvh.no-ip. 1365 :CBC 1 : PQclear(res);
1366 : 1 : res = PQgetResult(conn);
1367 [ - + ]: 1 : if (res != NULL)
1126 alvherre@alvh.no-ip. 1368 :UBC 0 : pg_fatal("expected NULL result");
1126 alvherre@alvh.no-ip. 1369 :CBC 1 : res = PQgetResult(conn);
1370 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 1371 :UBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1372 : :
285 michael@paquier.xyz 1373 :GNC 1 : fprintf(stderr, "closing portal... ");
1374 [ - + ]: 1 : if (PQsendClosePortal(conn, "cursor_one") != 1)
285 michael@paquier.xyz 1375 :UNC 0 : pg_fatal("PQsendClosePortal failed: %s", PQerrorMessage(conn));
285 michael@paquier.xyz 1376 [ - + ]:GNC 1 : if (PQpipelineSync(conn) != 1)
285 michael@paquier.xyz 1377 :UNC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1378 : :
285 michael@paquier.xyz 1379 :GNC 1 : res = PQgetResult(conn);
1380 [ - + ]: 1 : if (res == NULL)
285 michael@paquier.xyz 1381 :UNC 0 : pg_fatal("expected non-NULL result");
285 michael@paquier.xyz 1382 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
285 michael@paquier.xyz 1383 :UNC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
285 michael@paquier.xyz 1384 :GNC 1 : PQclear(res);
1385 : 1 : res = PQgetResult(conn);
1386 [ - + ]: 1 : if (res != NULL)
285 michael@paquier.xyz 1387 :UNC 0 : pg_fatal("expected NULL result");
285 michael@paquier.xyz 1388 :GNC 1 : res = PQgetResult(conn);
1389 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
285 michael@paquier.xyz 1390 :UNC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
1391 : :
1126 alvherre@alvh.no-ip. 1392 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1393 :UBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
1394 : :
1395 : : /* Now that it's closed we should get an error when describing */
285 michael@paquier.xyz 1396 :GNC 1 : res = PQdescribePortal(conn, "cursor_one");
1397 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
285 michael@paquier.xyz 1398 :UNC 0 : pg_fatal("expected FATAL_ERROR, got %s", PQresStatus(PQresultStatus(res)));
1399 : :
1400 : : /*
1401 : : * Also test the blocking close, this should not fail since closing a
1402 : : * non-existent portal is a no-op
1403 : : */
285 michael@paquier.xyz 1404 :GNC 1 : res = PQclosePortal(conn, "cursor_one");
1405 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
285 michael@paquier.xyz 1406 :UNC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
1407 : :
1126 alvherre@alvh.no-ip. 1408 :CBC 1 : fprintf(stderr, "ok\n");
1409 : 1 : }
1410 : :
1411 : : /* Notice processor: print notices, and count how many we got */
1412 : : static void
649 1413 : 1 : notice_processor(void *arg, const char *message)
1414 : : {
331 tgl@sss.pgh.pa.us 1415 : 1 : int *n_notices = (int *) arg;
1416 : :
649 alvherre@alvh.no-ip. 1417 : 1 : (*n_notices)++;
1418 : 1 : fprintf(stderr, "NOTICE %d: %s", *n_notices, message);
1419 : 1 : }
1420 : :
1421 : : /* Verify behavior in "idle" state */
1422 : : static void
1423 : 1 : test_pipeline_idle(PGconn *conn)
1424 : : {
1425 : : PGresult *res;
1426 : 1 : int n_notices = 0;
1427 : :
1428 : 1 : fprintf(stderr, "\npipeline idle...\n");
1429 : :
1430 : 1 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
1431 : :
1432 : : /* Try to exit pipeline mode in pipeline-idle state */
1433 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
649 alvherre@alvh.no-ip. 1434 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
569 alvherre@alvh.no-ip. 1435 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
649 alvherre@alvh.no-ip. 1436 :UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
649 alvherre@alvh.no-ip. 1437 :CBC 1 : PQsendFlushRequest(conn);
1438 : 1 : res = PQgetResult(conn);
1439 [ - + ]: 1 : if (res == NULL)
649 alvherre@alvh.no-ip. 1440 :UBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1441 : : PQerrorMessage(conn));
649 alvherre@alvh.no-ip. 1442 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
649 alvherre@alvh.no-ip. 1443 :UBC 0 : pg_fatal("unexpected result code %s from first pipeline item",
1444 : : PQresStatus(PQresultStatus(res)));
649 alvherre@alvh.no-ip. 1445 :CBC 1 : PQclear(res);
1446 : 1 : res = PQgetResult(conn);
1447 [ - + ]: 1 : if (res != NULL)
649 alvherre@alvh.no-ip. 1448 :UBC 0 : pg_fatal("did not receive terminating NULL");
569 alvherre@alvh.no-ip. 1449 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
649 alvherre@alvh.no-ip. 1450 :UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
649 alvherre@alvh.no-ip. 1451 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) == 1)
649 alvherre@alvh.no-ip. 1452 :UBC 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
649 alvherre@alvh.no-ip. 1453 [ - + ]:CBC 1 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1454 : : strlen("cannot exit pipeline mode")) != 0)
649 alvherre@alvh.no-ip. 1455 :UBC 0 : pg_fatal("did not get expected error; got: %s",
1456 : : PQerrorMessage(conn));
649 alvherre@alvh.no-ip. 1457 :CBC 1 : PQsendFlushRequest(conn);
1458 : 1 : res = PQgetResult(conn);
1459 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
649 alvherre@alvh.no-ip. 1460 :UBC 0 : pg_fatal("unexpected result code %s from second pipeline item",
1461 : : PQresStatus(PQresultStatus(res)));
649 alvherre@alvh.no-ip. 1462 :CBC 1 : PQclear(res);
1463 : 1 : res = PQgetResult(conn);
1464 [ - + ]: 1 : if (res != NULL)
649 alvherre@alvh.no-ip. 1465 :UBC 0 : pg_fatal("did not receive terminating NULL");
649 alvherre@alvh.no-ip. 1466 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
649 alvherre@alvh.no-ip. 1467 :UBC 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
1468 : :
649 alvherre@alvh.no-ip. 1469 [ - + ]:CBC 1 : if (n_notices > 0)
649 alvherre@alvh.no-ip. 1470 :UBC 0 : pg_fatal("got %d notice(s)", n_notices);
569 alvherre@alvh.no-ip. 1471 :CBC 1 : fprintf(stderr, "ok - 1\n");
1472 : :
1473 : : /* Have a WARNING in the middle of a resultset */
649 1474 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
649 alvherre@alvh.no-ip. 1475 :UBC 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
569 alvherre@alvh.no-ip. 1476 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
649 alvherre@alvh.no-ip. 1477 :UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
649 alvherre@alvh.no-ip. 1478 :CBC 1 : PQsendFlushRequest(conn);
1479 : 1 : res = PQgetResult(conn);
1480 [ - + ]: 1 : if (res == NULL)
649 alvherre@alvh.no-ip. 1481 :UBC 0 : pg_fatal("unexpected NULL result received");
649 alvherre@alvh.no-ip. 1482 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
649 alvherre@alvh.no-ip. 1483 :UBC 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
649 alvherre@alvh.no-ip. 1484 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
649 alvherre@alvh.no-ip. 1485 :UBC 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
569 alvherre@alvh.no-ip. 1486 :CBC 1 : fprintf(stderr, "ok - 2\n");
649 1487 : 1 : }
1488 : :
1489 : : static void
1126 1490 : 1 : test_simple_pipeline(PGconn *conn)
1491 : : {
1492 : 1 : PGresult *res = NULL;
1493 : 1 : const char *dummy_params[1] = {"1"};
1494 : 1 : Oid dummy_param_oids[1] = {INT4OID};
1495 : :
1496 : 1 : fprintf(stderr, "simple pipeline... ");
1497 : :
1498 : : /*
1499 : : * Enter pipeline mode and dispatch a set of operations, which we'll then
1500 : : * process the results of as they come in.
1501 : : *
1502 : : * For a simple case we should be able to do this without interim
1503 : : * processing of results since our output buffer will give us enough slush
1504 : : * to work with and we won't block on sending. So blocking mode is fine.
1505 : : */
1506 [ - + ]: 1 : if (PQisnonblocking(conn))
1126 alvherre@alvh.no-ip. 1507 :UBC 0 : pg_fatal("Expected blocking connection mode");
1508 : :
1126 alvherre@alvh.no-ip. 1509 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1510 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1511 : :
1126 alvherre@alvh.no-ip. 1512 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT $1",
1513 : : 1, dummy_param_oids, dummy_params,
1514 : : NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1515 :UBC 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
1516 : :
1126 alvherre@alvh.no-ip. 1517 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 0)
1126 alvherre@alvh.no-ip. 1518 :UBC 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
1519 : :
1126 alvherre@alvh.no-ip. 1520 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1521 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1522 : :
1126 alvherre@alvh.no-ip. 1523 :CBC 1 : res = PQgetResult(conn);
1524 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 1525 :UBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
1526 : : PQerrorMessage(conn));
1527 : :
1126 alvherre@alvh.no-ip. 1528 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 1529 :UBC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
1530 : : PQresStatus(PQresultStatus(res)));
1531 : :
1126 alvherre@alvh.no-ip. 1532 :CBC 1 : PQclear(res);
1533 : 1 : res = NULL;
1534 : :
1535 [ - + ]: 1 : if (PQgetResult(conn) != NULL)
1126 alvherre@alvh.no-ip. 1536 :UBC 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1537 : :
1538 : : /*
1539 : : * Even though we've processed the result there's still a sync to come and
1540 : : * we can't exit pipeline mode yet
1541 : : */
1126 alvherre@alvh.no-ip. 1542 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 0)
1126 alvherre@alvh.no-ip. 1543 :UBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
1544 : :
1126 alvherre@alvh.no-ip. 1545 :CBC 1 : res = PQgetResult(conn);
1546 [ - + ]: 1 : if (res == NULL)
1126 alvherre@alvh.no-ip. 1547 :UBC 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
1548 : : PQerrorMessage(conn));
1549 : :
1126 alvherre@alvh.no-ip. 1550 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
1126 alvherre@alvh.no-ip. 1551 :UBC 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
1552 : : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1553 : :
1126 alvherre@alvh.no-ip. 1554 :CBC 1 : PQclear(res);
1555 : 1 : res = NULL;
1556 : :
1557 [ - + ]: 1 : if (PQgetResult(conn) != NULL)
1126 alvherre@alvh.no-ip. 1558 :UBC 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1559 : : PQresStatus(PQresultStatus(res)));
1560 : :
1561 : : /* We're still in pipeline mode... */
1126 alvherre@alvh.no-ip. 1562 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 1563 :UBC 0 : pg_fatal("Fell out of pipeline mode somehow");
1564 : :
1565 : : /* ... until we end it, which we can safely do now */
1126 alvherre@alvh.no-ip. 1566 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1567 :UBC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
1568 : : PQerrorMessage(conn));
1569 : :
1126 alvherre@alvh.no-ip. 1570 [ - + ]:CBC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
1126 alvherre@alvh.no-ip. 1571 :UBC 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
1572 : :
1126 alvherre@alvh.no-ip. 1573 :CBC 1 : fprintf(stderr, "ok\n");
1574 : 1 : }
1575 : :
1576 : : static void
1577 : 1 : test_singlerowmode(PGconn *conn)
1578 : : {
1579 : : PGresult *res;
1580 : : int i;
1581 : 1 : bool pipeline_ended = false;
1582 : :
1583 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1584 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s",
1585 : : PQerrorMessage(conn));
1586 : :
1587 : : /* One series of three commands, using single-row mode for the first two. */
1126 alvherre@alvh.no-ip. 1588 [ + + ]:CBC 4 : for (i = 0; i < 3; i++)
1589 : : {
1590 : : char *param[1];
1591 : :
1592 : 3 : param[0] = psprintf("%d", 44 + i);
1593 : :
1594 [ - + ]: 3 : if (PQsendQueryParams(conn,
1595 : : "SELECT generate_series(42, $1)",
1596 : : 1,
1597 : : NULL,
1598 : : (const char **) param,
1599 : : NULL,
1600 : : NULL,
1601 : : 0) != 1)
1126 alvherre@alvh.no-ip. 1602 :UBC 0 : pg_fatal("failed to send query: %s",
1603 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1604 :CBC 3 : pfree(param[0]);
1605 : : }
1606 [ - + ]: 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1607 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1608 : :
1126 alvherre@alvh.no-ip. 1609 [ + + ]:CBC 5 : for (i = 0; !pipeline_ended; i++)
1610 : : {
1611 : 4 : bool first = true;
1612 : : bool saw_ending_tuplesok;
1613 : 4 : bool isSingleTuple = false;
1614 : :
1615 : : /* Set single row mode for only first 2 SELECT queries */
1616 [ + + ]: 4 : if (i < 2)
1617 : : {
1618 [ - + ]: 2 : if (PQsetSingleRowMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1619 :UBC 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1620 : : }
1621 : :
1622 : : /* Consume rows for this query */
1126 alvherre@alvh.no-ip. 1623 :CBC 4 : saw_ending_tuplesok = false;
1624 [ + + ]: 14 : while ((res = PQgetResult(conn)) != NULL)
1625 : : {
1626 : 11 : ExecStatusType est = PQresultStatus(res);
1627 : :
1628 [ + + ]: 11 : if (est == PGRES_PIPELINE_SYNC)
1629 : : {
1630 : 1 : fprintf(stderr, "end of pipeline reached\n");
1631 : 1 : pipeline_ended = true;
1632 : 1 : PQclear(res);
1633 [ - + ]: 1 : if (i != 3)
1126 alvherre@alvh.no-ip. 1634 :UBC 0 : pg_fatal("Expected three results, got %d", i);
1126 alvherre@alvh.no-ip. 1635 :CBC 1 : break;
1636 : : }
1637 : :
1638 : : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
1639 [ + + ]: 10 : if (first)
1640 : : {
1641 [ + + - + ]: 3 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
1126 alvherre@alvh.no-ip. 1642 :UBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
1643 : : i, PQresStatus(est));
1126 alvherre@alvh.no-ip. 1644 [ + + - + ]:CBC 3 : if (i >= 2 && est != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 1645 :UBC 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1646 : : i, PQresStatus(est));
1126 alvherre@alvh.no-ip. 1647 :CBC 3 : first = false;
1648 : : }
1649 : :
1650 : 10 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
1651 [ + + - ]: 10 : switch (est)
1652 : : {
1653 : 3 : case PGRES_TUPLES_OK:
1654 : 3 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
1655 : 3 : saw_ending_tuplesok = true;
1656 [ + + ]: 3 : if (isSingleTuple)
1657 : : {
1658 [ + - ]: 2 : if (PQntuples(res) == 0)
1659 : 2 : fprintf(stderr, "all tuples received in query %d\n", i);
1660 : : else
1126 alvherre@alvh.no-ip. 1661 :UBC 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1662 : : }
1126 alvherre@alvh.no-ip. 1663 :CBC 3 : break;
1664 : :
1665 : 7 : case PGRES_SINGLE_TUPLE:
1666 : 7 : isSingleTuple = true;
1667 : 7 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
1668 : 7 : break;
1669 : :
1126 alvherre@alvh.no-ip. 1670 :UBC 0 : default:
1671 : 0 : pg_fatal("unexpected");
1672 : : }
1126 alvherre@alvh.no-ip. 1673 :CBC 10 : PQclear(res);
1674 : : }
1675 [ + + - + ]: 4 : if (!pipeline_ended && !saw_ending_tuplesok)
1126 alvherre@alvh.no-ip. 1676 :UBC 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1677 : : }
1678 : :
1679 : : /*
1680 : : * Now issue one command, get its results in with single-row mode, then
1681 : : * issue another command, and get its results in normal mode; make sure
1682 : : * the single-row mode flag is reset as expected.
1683 : : */
548 alvherre@alvh.no-ip. 1684 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1685 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
548 alvherre@alvh.no-ip. 1686 :UBC 0 : pg_fatal("failed to send query: %s",
1687 : : PQerrorMessage(conn));
548 alvherre@alvh.no-ip. 1688 [ - + ]:CBC 1 : if (PQsendFlushRequest(conn) != 1)
548 alvherre@alvh.no-ip. 1689 :UBC 0 : pg_fatal("failed to send flush request");
548 alvherre@alvh.no-ip. 1690 [ - + ]:CBC 1 : if (PQsetSingleRowMode(conn) != 1)
548 alvherre@alvh.no-ip. 1691 :UBC 0 : pg_fatal("PQsetSingleRowMode() failed");
548 alvherre@alvh.no-ip. 1692 :CBC 1 : res = PQgetResult(conn);
1693 [ - + ]: 1 : if (res == NULL)
548 alvherre@alvh.no-ip. 1694 :UBC 0 : pg_fatal("unexpected NULL");
548 alvherre@alvh.no-ip. 1695 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
548 alvherre@alvh.no-ip. 1696 :UBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
1697 : : PQresStatus(PQresultStatus(res)));
548 alvherre@alvh.no-ip. 1698 :CBC 1 : res = PQgetResult(conn);
1699 [ - + ]: 1 : if (res == NULL)
548 alvherre@alvh.no-ip. 1700 :UBC 0 : pg_fatal("unexpected NULL");
548 alvherre@alvh.no-ip. 1701 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
548 alvherre@alvh.no-ip. 1702 :UBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1703 : : PQresStatus(PQresultStatus(res)));
548 alvherre@alvh.no-ip. 1704 [ - + ]:CBC 1 : if (PQgetResult(conn) != NULL)
548 alvherre@alvh.no-ip. 1705 :UBC 0 : pg_fatal("expected NULL result");
1706 : :
548 alvherre@alvh.no-ip. 1707 [ - + ]:CBC 1 : if (PQsendQueryParams(conn, "SELECT 1",
1708 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
548 alvherre@alvh.no-ip. 1709 :UBC 0 : pg_fatal("failed to send query: %s",
1710 : : PQerrorMessage(conn));
548 alvherre@alvh.no-ip. 1711 [ - + ]:CBC 1 : if (PQsendFlushRequest(conn) != 1)
548 alvherre@alvh.no-ip. 1712 :UBC 0 : pg_fatal("failed to send flush request");
548 alvherre@alvh.no-ip. 1713 :CBC 1 : res = PQgetResult(conn);
1714 [ - + ]: 1 : if (res == NULL)
548 alvherre@alvh.no-ip. 1715 :UBC 0 : pg_fatal("unexpected NULL");
548 alvherre@alvh.no-ip. 1716 [ - + ]:CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
548 alvherre@alvh.no-ip. 1717 :UBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1718 : : PQresStatus(PQresultStatus(res)));
548 alvherre@alvh.no-ip. 1719 [ - + ]:CBC 1 : if (PQgetResult(conn) != NULL)
548 alvherre@alvh.no-ip. 1720 :UBC 0 : pg_fatal("expected NULL result");
1721 : :
1722 : : /*
1723 : : * Try chunked mode as well; make sure that it correctly delivers a
1724 : : * partial final chunk.
1725 : : */
8 tgl@sss.pgh.pa.us 1726 [ - + ]:GNC 1 : if (PQsendQueryParams(conn, "SELECT generate_series(1, 5)",
1727 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
8 tgl@sss.pgh.pa.us 1728 :UNC 0 : pg_fatal("failed to send query: %s",
1729 : : PQerrorMessage(conn));
8 tgl@sss.pgh.pa.us 1730 [ - + ]:GNC 1 : if (PQsendFlushRequest(conn) != 1)
8 tgl@sss.pgh.pa.us 1731 :UNC 0 : pg_fatal("failed to send flush request");
8 tgl@sss.pgh.pa.us 1732 [ - + ]:GNC 1 : if (PQsetChunkedRowsMode(conn, 3) != 1)
8 tgl@sss.pgh.pa.us 1733 :UNC 0 : pg_fatal("PQsetChunkedRowsMode() failed");
8 tgl@sss.pgh.pa.us 1734 :GNC 1 : res = PQgetResult(conn);
1735 [ - + ]: 1 : if (res == NULL)
8 tgl@sss.pgh.pa.us 1736 :UNC 0 : pg_fatal("unexpected NULL");
8 tgl@sss.pgh.pa.us 1737 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
8 tgl@sss.pgh.pa.us 1738 :UNC 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s: %s",
1739 : : PQresStatus(PQresultStatus(res)),
1740 : : PQerrorMessage(conn));
8 tgl@sss.pgh.pa.us 1741 [ - + ]:GNC 1 : if (PQntuples(res) != 3)
8 tgl@sss.pgh.pa.us 1742 :UNC 0 : pg_fatal("Expected 3 rows, got %d", PQntuples(res));
8 tgl@sss.pgh.pa.us 1743 :GNC 1 : res = PQgetResult(conn);
1744 [ - + ]: 1 : if (res == NULL)
8 tgl@sss.pgh.pa.us 1745 :UNC 0 : pg_fatal("unexpected NULL");
8 tgl@sss.pgh.pa.us 1746 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_TUPLES_CHUNK)
8 tgl@sss.pgh.pa.us 1747 :UNC 0 : pg_fatal("Expected PGRES_TUPLES_CHUNK, got %s",
1748 : : PQresStatus(PQresultStatus(res)));
8 tgl@sss.pgh.pa.us 1749 [ - + ]:GNC 1 : if (PQntuples(res) != 2)
8 tgl@sss.pgh.pa.us 1750 :UNC 0 : pg_fatal("Expected 2 rows, got %d", PQntuples(res));
8 tgl@sss.pgh.pa.us 1751 :GNC 1 : res = PQgetResult(conn);
1752 [ - + ]: 1 : if (res == NULL)
8 tgl@sss.pgh.pa.us 1753 :UNC 0 : pg_fatal("unexpected NULL");
8 tgl@sss.pgh.pa.us 1754 [ - + ]:GNC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
8 tgl@sss.pgh.pa.us 1755 :UNC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
1756 : : PQresStatus(PQresultStatus(res)));
8 tgl@sss.pgh.pa.us 1757 [ - + ]:GNC 1 : if (PQntuples(res) != 0)
8 tgl@sss.pgh.pa.us 1758 :UNC 0 : pg_fatal("Expected 0 rows, got %d", PQntuples(res));
8 tgl@sss.pgh.pa.us 1759 [ - + ]:GNC 1 : if (PQgetResult(conn) != NULL)
8 tgl@sss.pgh.pa.us 1760 :UNC 0 : pg_fatal("expected NULL result");
1761 : :
1126 alvherre@alvh.no-ip. 1762 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1763 :UBC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1764 : :
649 alvherre@alvh.no-ip. 1765 :CBC 1 : fprintf(stderr, "ok\n");
1126 1766 : 1 : }
1767 : :
1768 : : /*
1769 : : * Simple test to verify that a pipeline is discarded as a whole when there's
1770 : : * an error, ignoring transaction commands.
1771 : : */
1772 : : static void
1773 : 1 : test_transaction(PGconn *conn)
1774 : : {
1775 : : PGresult *res;
1776 : : bool expect_null;
1777 : 1 : int num_syncs = 0;
1778 : :
1779 : 1 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
1780 : : "CREATE TABLE pq_pipeline_tst (id int)");
1781 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1126 alvherre@alvh.no-ip. 1782 :UBC 0 : pg_fatal("failed to create test table: %s",
1783 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1784 :CBC 1 : PQclear(res);
1785 : :
1786 [ - + ]: 1 : if (PQenterPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1787 :UBC 0 : pg_fatal("failed to enter pipeline mode: %s",
1788 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1789 [ - + ]:CBC 1 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
1126 alvherre@alvh.no-ip. 1790 :UBC 0 : pg_fatal("could not send prepare on pipeline: %s",
1791 : : PQerrorMessage(conn));
1792 : :
1126 alvherre@alvh.no-ip. 1793 [ - + ]:CBC 1 : if (PQsendQueryParams(conn,
1794 : : "BEGIN",
1795 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1796 :UBC 0 : pg_fatal("failed to send query: %s",
1797 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1798 [ - + ]:CBC 1 : if (PQsendQueryParams(conn,
1799 : : "SELECT 0/0",
1800 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1801 :UBC 0 : pg_fatal("failed to send query: %s",
1802 : : PQerrorMessage(conn));
1803 : :
1804 : : /*
1805 : : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
1806 : : * get out of the pipeline-aborted state first.
1807 : : */
1126 alvherre@alvh.no-ip. 1808 [ - + ]:CBC 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1126 alvherre@alvh.no-ip. 1809 :UBC 0 : pg_fatal("failed to execute prepared: %s",
1810 : : PQerrorMessage(conn));
1811 : :
1812 : : /* This insert fails because we're in pipeline-aborted state */
1126 alvherre@alvh.no-ip. 1813 [ - + ]:CBC 1 : if (PQsendQueryParams(conn,
1814 : : "INSERT INTO pq_pipeline_tst VALUES (1)",
1815 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1816 :UBC 0 : pg_fatal("failed to send query: %s",
1817 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1818 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1819 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1820 :CBC 1 : num_syncs++;
1821 : :
1822 : : /*
1823 : : * This insert fails even though the pipeline got a SYNC, because we're in
1824 : : * an aborted transaction
1825 : : */
1826 [ - + ]: 1 : if (PQsendQueryParams(conn,
1827 : : "INSERT INTO pq_pipeline_tst VALUES (2)",
1828 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1829 :UBC 0 : pg_fatal("failed to send query: %s",
1830 : : PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1831 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1832 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1833 :CBC 1 : num_syncs++;
1834 : :
1835 : : /*
1836 : : * Send ROLLBACK using prepared stmt. This one works because we just did
1837 : : * PQpipelineSync above.
1838 : : */
1839 [ - + ]: 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
1126 alvherre@alvh.no-ip. 1840 :UBC 0 : pg_fatal("failed to execute prepared: %s",
1841 : : PQerrorMessage(conn));
1842 : :
1843 : : /*
1844 : : * Now that we're out of a transaction and in pipeline-good mode, this
1845 : : * insert works
1846 : : */
1126 alvherre@alvh.no-ip. 1847 [ - + ]:CBC 1 : if (PQsendQueryParams(conn,
1848 : : "INSERT INTO pq_pipeline_tst VALUES (3)",
1849 : : 0, NULL, NULL, NULL, NULL, 0) != 1)
1126 alvherre@alvh.no-ip. 1850 :UBC 0 : pg_fatal("failed to send query: %s",
1851 : : PQerrorMessage(conn));
1852 : : /* Send two syncs now -- match up to SYNC messages below */
1126 alvherre@alvh.no-ip. 1853 [ - + ]:CBC 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1854 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1855 :CBC 1 : num_syncs++;
1856 [ - + ]: 1 : if (PQpipelineSync(conn) != 1)
1126 alvherre@alvh.no-ip. 1857 :UBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1858 :CBC 1 : num_syncs++;
1859 : :
1860 : 1 : expect_null = false;
1861 : 1 : for (int i = 0;; i++)
1862 : 19 : {
1863 : : ExecStatusType restype;
1864 : :
1865 : 20 : res = PQgetResult(conn);
1866 [ + + ]: 20 : if (res == NULL)
1867 : : {
1868 : 8 : printf("%d: got NULL result\n", i);
1869 [ - + ]: 8 : if (!expect_null)
1126 alvherre@alvh.no-ip. 1870 :UBC 0 : pg_fatal("did not expect NULL here");
1126 alvherre@alvh.no-ip. 1871 :CBC 8 : expect_null = false;
1872 : 8 : continue;
1873 : : }
1874 : 12 : restype = PQresultStatus(res);
1875 : 12 : printf("%d: got status %s", i, PQresStatus(restype));
1876 [ - + ]: 12 : if (expect_null)
1126 alvherre@alvh.no-ip. 1877 :UBC 0 : pg_fatal("expected NULL");
1126 alvherre@alvh.no-ip. 1878 [ + + ]:CBC 12 : if (restype == PGRES_FATAL_ERROR)
1879 : 2 : printf("; error: %s", PQerrorMessage(conn));
1880 [ + + ]: 10 : else if (restype == PGRES_PIPELINE_ABORTED)
1881 : : {
1882 : 2 : printf(": command didn't run because pipeline aborted\n");
1883 : : }
1884 : : else
1885 : 8 : printf("\n");
1886 : 12 : PQclear(res);
1887 : :
1888 [ + + ]: 12 : if (restype == PGRES_PIPELINE_SYNC)
1889 : 4 : num_syncs--;
1890 : : else
1891 : 8 : expect_null = true;
1892 [ + + ]: 12 : if (num_syncs <= 0)
1893 : 1 : break;
1894 : : }
1895 [ - + ]: 1 : if (PQgetResult(conn) != NULL)
1126 alvherre@alvh.no-ip. 1896 :UBC 0 : pg_fatal("returned something extra after all the syncs: %s",
1897 : : PQresStatus(PQresultStatus(res)));
1898 : :
1126 alvherre@alvh.no-ip. 1899 [ - + ]:CBC 1 : if (PQexitPipelineMode(conn) != 1)
1126 alvherre@alvh.no-ip. 1900 :UBC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
1901 : :
1902 : : /* We expect to find one tuple containing the value "3" */
1126 alvherre@alvh.no-ip. 1903 :CBC 1 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
1904 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1126 alvherre@alvh.no-ip. 1905 :UBC 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
1126 alvherre@alvh.no-ip. 1906 [ - + ]:CBC 1 : if (PQntuples(res) != 1)
1126 alvherre@alvh.no-ip. 1907 :UBC 0 : pg_fatal("did not get 1 tuple");
1126 alvherre@alvh.no-ip. 1908 [ - + ]:CBC 1 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
1126 alvherre@alvh.no-ip. 1909 :UBC 0 : pg_fatal("did not get expected tuple");
1126 alvherre@alvh.no-ip. 1910 :CBC 1 : PQclear(res);
1911 : :
1912 : 1 : fprintf(stderr, "ok\n");
1913 : 1 : }
1914 : :
1915 : : /*
1916 : : * In this test mode we send a stream of queries, with one in the middle
1917 : : * causing an error. Verify that we can still send some more after the
1918 : : * error and have libpq work properly.
1919 : : */
1920 : : static void
1010 1921 : 1 : test_uniqviol(PGconn *conn)
1922 : : {
1923 : 1 : int sock = PQsocket(conn);
1924 : : PGresult *res;
1925 : 1 : Oid paramTypes[2] = {INT8OID, INT8OID};
1926 : : const char *paramValues[2];
1927 : : char paramValue0[MAXINT8LEN];
1928 : : char paramValue1[MAXINT8LEN];
1929 : 1 : int ctr = 0;
1930 : 1 : int numsent = 0;
1931 : 1 : int results = 0;
1932 : 1 : bool read_done = false;
1933 : 1 : bool write_done = false;
1934 : 1 : bool error_sent = false;
1935 : 1 : bool got_error = false;
1936 : 1 : int switched = 0;
1937 : 1 : int socketful = 0;
1938 : : fd_set in_fds;
1939 : : fd_set out_fds;
1940 : :
1941 : 1 : fprintf(stderr, "uniqviol ...");
1942 : :
1943 : 1 : PQsetnonblocking(conn, 1);
1944 : :
1945 : 1 : paramValues[0] = paramValue0;
1946 : 1 : paramValues[1] = paramValue1;
1947 : 1 : sprintf(paramValue1, "42");
1948 : :
1949 : 1 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
1950 : : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
1951 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1010 alvherre@alvh.no-ip. 1952 :UBC 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
1953 : :
1010 alvherre@alvh.no-ip. 1954 :CBC 1 : res = PQexec(conn, "begin");
1955 [ - + ]: 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1010 alvherre@alvh.no-ip. 1956 :UBC 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1957 : :
1010 alvherre@alvh.no-ip. 1958 :CBC 1 : res = PQprepare(conn, "insertion",
1959 : : "insert into ppln_uniqviol values ($1, $2) returning id",
1960 : : 2, paramTypes);
1961 [ + - - + ]: 1 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
1010 alvherre@alvh.no-ip. 1962 :UBC 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
1963 : :
1010 alvherre@alvh.no-ip. 1964 [ - + ]:CBC 1 : if (PQenterPipelineMode(conn) != 1)
1010 alvherre@alvh.no-ip. 1965 :UBC 0 : pg_fatal("failed to enter pipeline mode");
1966 : :
1010 alvherre@alvh.no-ip. 1967 [ + - ]:CBC 8 : while (!read_done)
1968 : : {
1969 : : /*
1970 : : * Avoid deadlocks by reading everything the server has sent before
1971 : : * sending anything. (Special precaution is needed here to process
1972 : : * PQisBusy before testing the socket for read-readiness, because the
1973 : : * socket does not turn read-ready after "sending" queries in aborted
1974 : : * pipeline mode.)
1975 : : */
1976 [ + + ]: 606 : while (PQisBusy(conn) == 0)
1977 : : {
1978 : : bool new_error;
1979 : :
1980 [ + + ]: 601 : if (results >= numsent)
1981 : : {
1982 [ - + ]: 1 : if (write_done)
1010 alvherre@alvh.no-ip. 1983 :UBC 0 : read_done = true;
1010 alvherre@alvh.no-ip. 1984 :CBC 1 : break;
1985 : : }
1986 : :
1987 : 600 : res = PQgetResult(conn);
1988 : 600 : new_error = process_result(conn, res, results, numsent);
1989 [ + + - + ]: 600 : if (new_error && got_error)
1010 alvherre@alvh.no-ip. 1990 :UBC 0 : pg_fatal("got two errors");
1010 alvherre@alvh.no-ip. 1991 :CBC 600 : got_error |= new_error;
1992 [ + + ]: 600 : if (results++ >= numsent - 1)
1993 : : {
1994 [ + + ]: 2 : if (write_done)
1995 : 1 : read_done = true;
1996 : 2 : break;
1997 : : }
1998 : : }
1999 : :
2000 [ + + ]: 8 : if (read_done)
2001 : 1 : break;
2002 : :
2003 [ + + ]: 119 : FD_ZERO(&out_fds);
2004 : 7 : FD_SET(sock, &out_fds);
2005 : :
2006 [ + + ]: 119 : FD_ZERO(&in_fds);
2007 : 7 : FD_SET(sock, &in_fds);
2008 : :
2009 [ - + - + ]: 7 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
2010 : : {
1010 alvherre@alvh.no-ip. 2011 [ # # ]:UBC 0 : if (errno == EINTR)
2012 : 0 : continue;
2013 : 0 : pg_fatal("select() failed: %m");
2014 : : }
2015 : :
1010 alvherre@alvh.no-ip. 2016 [ + + - + ]:CBC 7 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
1010 alvherre@alvh.no-ip. 2017 :UBC 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
2018 : :
2019 : : /*
2020 : : * If the socket is writable and we haven't finished sending queries,
2021 : : * send some.
2022 : : */
1010 alvherre@alvh.no-ip. 2023 [ + - + + ]:CBC 7 : if (!write_done && FD_ISSET(sock, &out_fds))
2024 : : {
2025 : : for (;;)
2026 : 597 : {
2027 : : int flush;
2028 : :
2029 : : /*
2030 : : * provoke uniqueness violation exactly once after having
2031 : : * switched to read mode.
2032 : : */
2033 [ + + + + : 600 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
+ + ]
2034 : : {
2035 : 1 : sprintf(paramValue0, "%d", numsent / 2);
2036 : 1 : fprintf(stderr, "E");
2037 : 1 : error_sent = true;
2038 : : }
2039 : : else
2040 : : {
2041 : 599 : fprintf(stderr, ".");
2042 : 599 : sprintf(paramValue0, "%d", ctr++);
2043 : : }
2044 : :
2045 [ - + ]: 600 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
1010 alvherre@alvh.no-ip. 2046 :UBC 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
1010 alvherre@alvh.no-ip. 2047 :CBC 600 : numsent++;
2048 : :
2049 : : /* Are we done writing? */
2050 [ + + + + : 600 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
+ + ]
2051 : : {
2052 [ - + ]: 1 : if (PQsendFlushRequest(conn) != 1)
1010 alvherre@alvh.no-ip. 2053 :UBC 0 : pg_fatal("failed to send flush request");
1010 alvherre@alvh.no-ip. 2054 :CBC 1 : write_done = true;
2055 : 1 : fprintf(stderr, "\ndone writing\n");
2056 : 1 : PQflush(conn);
2057 : 1 : break;
2058 : : }
2059 : :
2060 : : /* is the outgoing socket full? */
2061 : 599 : flush = PQflush(conn);
2062 [ - + ]: 599 : if (flush == -1)
1010 alvherre@alvh.no-ip. 2063 :UBC 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
1010 alvherre@alvh.no-ip. 2064 [ + + ]:CBC 599 : if (flush == 1)
2065 : : {
2066 [ + + ]: 2 : if (socketful == 0)
2067 : 1 : socketful = numsent;
2068 : 2 : fprintf(stderr, "\nswitch to reading\n");
2069 : 2 : switched++;
2070 : 2 : break;
2071 : : }
2072 : : }
2073 : : }
2074 : : }
2075 : :
2076 [ - + ]: 1 : if (!got_error)
1010 alvherre@alvh.no-ip. 2077 :UBC 0 : pg_fatal("did not get expected error");
2078 : :
1010 alvherre@alvh.no-ip. 2079 :CBC 1 : fprintf(stderr, "ok\n");
2080 : 1 : }
2081 : :
2082 : : /*
2083 : : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
2084 : : * the expected NULL that should follow it.
2085 : : *
2086 : : * Returns true if we read a fatal error message, otherwise false.
2087 : : */
2088 : : static bool
2089 : 600 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
2090 : : {
2091 : : PGresult *res2;
2092 : 600 : bool got_error = false;
2093 : :
2094 [ - + ]: 600 : if (res == NULL)
1010 alvherre@alvh.no-ip. 2095 :UBC 0 : pg_fatal("got unexpected NULL");
2096 : :
1010 alvherre@alvh.no-ip. 2097 [ + + + - ]:CBC 600 : switch (PQresultStatus(res))
2098 : : {
2099 : 1 : case PGRES_FATAL_ERROR:
2100 : 1 : got_error = true;
2101 : 1 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
2102 : 1 : PQclear(res);
2103 : :
2104 : 1 : res2 = PQgetResult(conn);
2105 [ - + ]: 1 : if (res2 != NULL)
1010 alvherre@alvh.no-ip. 2106 :UBC 0 : pg_fatal("expected NULL, got %s",
2107 : : PQresStatus(PQresultStatus(res2)));
1010 alvherre@alvh.no-ip. 2108 :CBC 1 : break;
2109 : :
2110 : 418 : case PGRES_TUPLES_OK:
2111 : 418 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
2112 : 418 : PQclear(res);
2113 : :
2114 : 418 : res2 = PQgetResult(conn);
2115 [ - + ]: 418 : if (res2 != NULL)
1010 alvherre@alvh.no-ip. 2116 :UBC 0 : pg_fatal("expected NULL, got %s",
2117 : : PQresStatus(PQresultStatus(res2)));
1010 alvherre@alvh.no-ip. 2118 :CBC 418 : break;
2119 : :
2120 : 181 : case PGRES_PIPELINE_ABORTED:
2121 : 181 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
2122 : 181 : res2 = PQgetResult(conn);
2123 [ - + ]: 181 : if (res2 != NULL)
1010 alvherre@alvh.no-ip. 2124 :UBC 0 : pg_fatal("expected NULL, got %s",
2125 : : PQresStatus(PQresultStatus(res2)));
1010 alvherre@alvh.no-ip. 2126 :CBC 181 : break;
2127 : :
1010 alvherre@alvh.no-ip. 2128 :UBC 0 : default:
2129 : 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
2130 : : }
2131 : :
1010 alvherre@alvh.no-ip. 2132 :CBC 600 : return got_error;
2133 : : }
2134 : :
2135 : :
2136 : : static void
1126 alvherre@alvh.no-ip. 2137 :UBC 0 : usage(const char *progname)
2138 : : {
2139 : 0 : fprintf(stderr, "%s tests libpq's pipeline mode.\n\n", progname);
2140 : 0 : fprintf(stderr, "Usage:\n");
1111 2141 : 0 : fprintf(stderr, " %s [OPTION] tests\n", progname);
1109 2142 : 0 : fprintf(stderr, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);
1111 2143 : 0 : fprintf(stderr, "\nOptions:\n");
2144 : 0 : fprintf(stderr, " -t TRACEFILE generate a libpq trace to TRACEFILE\n");
1109 2145 : 0 : fprintf(stderr, " -r NUMROWS use NUMROWS as the test size\n");
1126 2146 : 0 : }
2147 : :
2148 : : static void
1126 alvherre@alvh.no-ip. 2149 :CBC 1 : print_test_list(void)
2150 : : {
34 alvherre@alvh.no-ip. 2151 :GNC 1 : printf("cancel\n");
1126 alvherre@alvh.no-ip. 2152 :CBC 1 : printf("disallowed_in_pipeline\n");
2153 : 1 : printf("multi_pipelines\n");
1020 2154 : 1 : printf("nosync\n");
1126 2155 : 1 : printf("pipeline_abort\n");
649 2156 : 1 : printf("pipeline_idle\n");
1126 2157 : 1 : printf("pipelined_insert\n");
2158 : 1 : printf("prepared\n");
2159 : 1 : printf("simple_pipeline\n");
2160 : 1 : printf("singlerow\n");
2161 : 1 : printf("transaction\n");
1010 2162 : 1 : printf("uniqviol\n");
1126 2163 : 1 : }
2164 : :
2165 : : int
2166 : 13 : main(int argc, char **argv)
2167 : : {
2168 : 13 : const char *conninfo = "";
2169 : : PGconn *conn;
2170 : : FILE *trace;
2171 : : char *testname;
2172 : 13 : int numrows = 10000;
2173 : : PGresult *res;
2174 : : int c;
2175 : :
489 peter@eisentraut.org 2176 [ + + ]: 47 : while ((c = getopt(argc, argv, "r:t:")) != -1)
2177 : : {
1111 alvherre@alvh.no-ip. 2178 [ + + - ]: 21 : switch (c)
2179 : : {
1109 2180 : 12 : case 'r': /* numrows */
2181 : 12 : errno = 0;
2182 : 12 : numrows = strtol(optarg, NULL, 10);
2183 [ + - - + ]: 12 : if (errno != 0 || numrows <= 0)
2184 : : {
1109 alvherre@alvh.no-ip. 2185 :UBC 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
2186 : : optarg);
2187 : 0 : exit(1);
2188 : : }
1109 alvherre@alvh.no-ip. 2189 :CBC 12 : break;
489 peter@eisentraut.org 2190 : 9 : case 't': /* trace file */
2191 : 9 : tracefile = pg_strdup(optarg);
2192 : 9 : break;
2193 : : }
2194 : : }
2195 : :
1111 alvherre@alvh.no-ip. 2196 [ + - ]: 13 : if (optind < argc)
2197 : : {
1109 2198 : 13 : testname = pg_strdup(argv[optind]);
1111 2199 : 13 : optind++;
2200 : : }
2201 : : else
2202 : : {
1126 alvherre@alvh.no-ip. 2203 :UBC 0 : usage(argv[0]);
2204 : 0 : exit(1);
2205 : : }
2206 : :
1111 alvherre@alvh.no-ip. 2207 [ + + ]:CBC 13 : if (strcmp(testname, "tests") == 0)
2208 : : {
2209 : 1 : print_test_list();
2210 : 1 : exit(0);
2211 : : }
2212 : :
2213 [ + - ]: 12 : if (optind < argc)
2214 : : {
1109 2215 : 12 : conninfo = pg_strdup(argv[optind]);
1111 2216 : 12 : optind++;
2217 : : }
2218 : :
2219 : : /* Make a connection to the database */
1126 2220 : 12 : conn = PQconnectdb(conninfo);
2221 [ - + ]: 12 : if (PQstatus(conn) != CONNECTION_OK)
2222 : : {
1126 alvherre@alvh.no-ip. 2223 :UBC 0 : fprintf(stderr, "Connection to database failed: %s\n",
2224 : : PQerrorMessage(conn));
2225 : 0 : exit_nicely(conn);
2226 : : }
2227 : :
1110 alvherre@alvh.no-ip. 2228 :CBC 12 : res = PQexec(conn, "SET lc_messages TO \"C\"");
2229 [ - + ]: 12 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1110 alvherre@alvh.no-ip. 2230 :UBC 0 : pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
424 drowley@postgresql.o 2231 :CBC 12 : res = PQexec(conn, "SET debug_parallel_query = off");
1110 alvherre@alvh.no-ip. 2232 [ - + ]: 12 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
424 drowley@postgresql.o 2233 :UBC 0 : pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
2234 : :
2235 : : /* Set the trace file, if requested */
1111 alvherre@alvh.no-ip. 2236 [ + + ]:CBC 12 : if (tracefile != NULL)
2237 : : {
649 2238 [ - + ]: 9 : if (strcmp(tracefile, "-") == 0)
649 alvherre@alvh.no-ip. 2239 :UBC 0 : trace = stdout;
2240 : : else
649 alvherre@alvh.no-ip. 2241 :CBC 9 : trace = fopen(tracefile, "w");
1111 2242 [ - + ]: 9 : if (trace == NULL)
1111 alvherre@alvh.no-ip. 2243 :UBC 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
2244 : :
2245 : : /* Make it line-buffered */
1109 alvherre@alvh.no-ip. 2246 :CBC 9 : setvbuf(trace, NULL, PG_IOLBF, 0);
2247 : :
1111 2248 : 9 : PQtrace(conn, trace);
1039 noah@leadboat.com 2249 : 9 : PQsetTraceFlags(conn,
2250 : : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
2251 : : }
2252 : :
34 alvherre@alvh.no-ip. 2253 [ + + ]:GNC 12 : if (strcmp(testname, "cancel") == 0)
2254 : 1 : test_cancel(conn);
2255 [ + + ]: 11 : else if (strcmp(testname, "disallowed_in_pipeline") == 0)
1126 alvherre@alvh.no-ip. 2256 :CBC 1 : test_disallowed_in_pipeline(conn);
1111 2257 [ + + ]: 10 : else if (strcmp(testname, "multi_pipelines") == 0)
1126 2258 : 1 : test_multi_pipelines(conn);
1020 2259 [ + + ]: 9 : else if (strcmp(testname, "nosync") == 0)
2260 : 1 : test_nosync(conn);
1111 2261 [ + + ]: 8 : else if (strcmp(testname, "pipeline_abort") == 0)
1126 2262 : 1 : test_pipeline_abort(conn);
649 2263 [ + + ]: 7 : else if (strcmp(testname, "pipeline_idle") == 0)
2264 : 1 : test_pipeline_idle(conn);
1111 2265 [ + + ]: 6 : else if (strcmp(testname, "pipelined_insert") == 0)
1126 2266 : 1 : test_pipelined_insert(conn, numrows);
1111 2267 [ + + ]: 5 : else if (strcmp(testname, "prepared") == 0)
1126 2268 : 1 : test_prepared(conn);
1111 2269 [ + + ]: 4 : else if (strcmp(testname, "simple_pipeline") == 0)
1126 2270 : 1 : test_simple_pipeline(conn);
1111 2271 [ + + ]: 3 : else if (strcmp(testname, "singlerow") == 0)
1126 2272 : 1 : test_singlerowmode(conn);
1111 2273 [ + + ]: 2 : else if (strcmp(testname, "transaction") == 0)
1126 2274 : 1 : test_transaction(conn);
1010 2275 [ + - ]: 1 : else if (strcmp(testname, "uniqviol") == 0)
2276 : 1 : test_uniqviol(conn);
2277 : : else
2278 : : {
1111 alvherre@alvh.no-ip. 2279 :UBC 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
1126 2280 : 0 : exit(1);
2281 : : }
2282 : :
2283 : : /* close the connection to the database and cleanup */
1126 alvherre@alvh.no-ip. 2284 :CBC 12 : PQfinish(conn);
2285 : 12 : return 0;
2286 : : }
|