Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpq_pipeline.c
4 : * Verify libpq pipeline execution functionality
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/test/modules/libpq_pipeline/libpq_pipeline.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 :
16 : #include "postgres_fe.h"
17 :
18 : #include <sys/select.h>
19 : #include <sys/time.h>
20 :
21 : #include "catalog/pg_type_d.h"
22 : #include "common/fe_memutils.h"
23 : #include "libpq-fe.h"
24 : #include "pg_getopt.h"
25 : #include "portability/instr_time.h"
26 :
27 :
28 : static void exit_nicely(PGconn *conn);
29 : static void pg_attribute_noreturn() pg_fatal_impl(int line, const char *fmt,...)
30 : pg_attribute_printf(2, 3);
31 : static bool process_result(PGconn *conn, PGresult *res, int results,
32 : int numsent);
33 :
34 : const char *const progname = "libpq_pipeline";
35 :
36 : /* Options and defaults */
37 : char *tracefile = NULL; /* path to PQtrace() file */
38 :
39 :
40 : #ifdef DEBUG_OUTPUT
41 : #define pg_debug(...) do { fprintf(stderr, __VA_ARGS__); } while (0)
42 : #else
43 : #define pg_debug(...)
44 : #endif
45 :
46 : static const char *const drop_table_sql =
47 : "DROP TABLE IF EXISTS pq_pipeline_demo";
48 : static const char *const create_table_sql =
49 : "CREATE UNLOGGED TABLE pq_pipeline_demo(id serial primary key, itemno integer,"
50 : "int8filler int8);";
51 : static const char *const insert_sql =
52 : "INSERT INTO pq_pipeline_demo(itemno) VALUES ($1)";
53 : static const char *const insert_sql2 =
54 : "INSERT INTO pq_pipeline_demo(itemno,int8filler) VALUES ($1, $2)";
55 :
/*
 * Buffer sizes needed to print an integer in decimal, counting the digits,
 * a possible minus sign and the terminating NUL:
 *
 *	int32: 10 digits + sign + NUL = 12	("-2147483648")
 *	int64: 19 digits + sign + NUL = 21	("-9223372036854775808")
 */
#define MAXINTLEN 12
#define MAXINT8LEN 21
755 alvherre 59 EUB :
60 : static void
755 alvherre 61 UBC 0 : exit_nicely(PGconn *conn)
755 alvherre 62 EUB : {
755 alvherre 63 UIC 0 : PQfinish(conn);
64 0 : exit(1);
65 : }
66 :
67 : /*
68 : * Print an error to stderr and terminate the program.
69 : */
70 : #define pg_fatal(...) pg_fatal_impl(__LINE__, __VA_ARGS__)
755 alvherre 71 EUB : static void
72 : pg_attribute_noreturn()
755 alvherre 73 UIC 0 : pg_fatal_impl(int line, const char *fmt,...)
74 : {
75 : va_list args;
755 alvherre 76 EUB :
77 :
755 alvherre 78 UBC 0 : fflush(stdout);
755 alvherre 79 EUB :
755 alvherre 80 UBC 0 : fprintf(stderr, "\n%s:%d: ", progname, line);
81 0 : va_start(args, fmt);
82 0 : vfprintf(stderr, fmt, args);
83 0 : va_end(args);
84 0 : Assert(fmt[strlen(fmt) - 1] != '\n');
755 alvherre 85 UIC 0 : fprintf(stderr, "\n");
86 0 : exit(1);
87 : }
755 alvherre 88 ECB :
89 : static void
755 alvherre 90 CBC 1 : test_disallowed_in_pipeline(PGconn *conn)
91 : {
92 1 : PGresult *res = NULL;
93 :
94 1 : fprintf(stderr, "test error cases... ");
755 alvherre 95 EUB :
755 alvherre 96 GIC 1 : if (PQisnonblocking(conn))
755 alvherre 97 LBC 0 : pg_fatal("Expected blocking connection mode");
755 alvherre 98 EUB :
755 alvherre 99 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 100 LBC 0 : pg_fatal("Unable to enter pipeline mode");
755 alvherre 101 EUB :
755 alvherre 102 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 103 UIC 0 : pg_fatal("Pipeline mode not activated properly");
755 alvherre 104 ECB :
105 : /* PQexec should fail in pipeline mode */
755 alvherre 106 GBC 1 : res = PQexec(conn, "SELECT 1");
755 alvherre 107 CBC 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
755 alvherre 108 UIC 0 : pg_fatal("PQexec should fail in pipeline mode but succeeded");
198 alvherre 109 GBC 1 : if (strcmp(PQerrorMessage(conn),
110 : "synchronous command execution functions are not allowed in pipeline mode\n") != 0)
198 alvherre 111 UIC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
112 : PQerrorMessage(conn));
198 alvherre 113 ECB :
198 alvherre 114 EUB : /* PQsendQuery should fail in pipeline mode */
198 alvherre 115 CBC 1 : if (PQsendQuery(conn, "SELECT 1") != 0)
198 alvherre 116 UIC 0 : pg_fatal("PQsendQuery should fail in pipeline mode but succeeded");
198 alvherre 117 GBC 1 : if (strcmp(PQerrorMessage(conn),
118 : "PQsendQuery not allowed in pipeline mode\n") != 0)
198 alvherre 119 UIC 0 : pg_fatal("did not get expected error message; got: \"%s\"",
120 : PQerrorMessage(conn));
755 alvherre 121 ECB :
755 alvherre 122 EUB : /* Entering pipeline mode when already in pipeline mode is OK */
755 alvherre 123 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 124 LBC 0 : pg_fatal("re-entering pipeline mode should be a no-op but failed");
755 alvherre 125 EUB :
755 alvherre 126 GIC 1 : if (PQisBusy(conn) != 0)
755 alvherre 127 UIC 0 : pg_fatal("PQisBusy should return 0 when idle in pipeline mode, returned 1");
755 alvherre 128 ECB :
755 alvherre 129 EUB : /* ok, back to normal command mode */
755 alvherre 130 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 131 LBC 0 : pg_fatal("couldn't exit idle empty pipeline mode");
755 alvherre 132 EUB :
755 alvherre 133 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
755 alvherre 134 UIC 0 : pg_fatal("Pipeline mode not terminated properly");
755 alvherre 135 ECB :
755 alvherre 136 EUB : /* exiting pipeline mode when not in pipeline mode should be a no-op */
755 alvherre 137 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 138 UIC 0 : pg_fatal("pipeline mode exit when not in pipeline mode should succeed but failed");
755 alvherre 139 ECB :
140 : /* can now PQexec again */
755 alvherre 141 GBC 1 : res = PQexec(conn, "SELECT 1");
755 alvherre 142 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 143 UIC 0 : pg_fatal("PQexec should succeed after exiting pipeline mode but failed with: %s",
755 alvherre 144 ECB : PQerrorMessage(conn));
145 :
755 alvherre 146 GIC 1 : fprintf(stderr, "ok\n");
147 1 : }
755 alvherre 148 ECB :
149 : static void
755 alvherre 150 CBC 1 : test_multi_pipelines(PGconn *conn)
755 alvherre 151 ECB : {
755 alvherre 152 CBC 1 : PGresult *res = NULL;
755 alvherre 153 GIC 1 : const char *dummy_params[1] = {"1"};
755 alvherre 154 CBC 1 : Oid dummy_param_oids[1] = {INT4OID};
155 :
755 alvherre 156 GIC 1 : fprintf(stderr, "multi pipeline... ");
157 :
158 : /*
159 : * Queue up a couple of small pipelines and process each without returning
755 alvherre 160 ECB : * to command mode first.
755 alvherre 161 EUB : */
755 alvherre 162 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 163 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
164 :
755 alvherre 165 GBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
166 : dummy_params, NULL, NULL, 0) != 1)
755 alvherre 167 LBC 0 : pg_fatal("dispatching first SELECT failed: %s", PQerrorMessage(conn));
755 alvherre 168 EUB :
755 alvherre 169 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 170 LBC 0 : pg_fatal("Pipeline sync failed: %s", PQerrorMessage(conn));
171 :
755 alvherre 172 GBC 1 : if (PQsendQueryParams(conn, "SELECT $1", 1, dummy_param_oids,
173 : dummy_params, NULL, NULL, 0) != 1)
755 alvherre 174 LBC 0 : pg_fatal("dispatching second SELECT failed: %s", PQerrorMessage(conn));
755 alvherre 175 EUB :
755 alvherre 176 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 177 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 178 ECB :
179 : /* OK, start processing the results */
755 alvherre 180 GBC 1 : res = PQgetResult(conn);
755 alvherre 181 GIC 1 : if (res == NULL)
755 alvherre 182 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
755 alvherre 183 ECB : PQerrorMessage(conn));
755 alvherre 184 EUB :
755 alvherre 185 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 186 LBC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
755 alvherre 187 ECB : PQresStatus(PQresultStatus(res)));
755 alvherre 188 GIC 1 : PQclear(res);
755 alvherre 189 CBC 1 : res = NULL;
755 alvherre 190 EUB :
755 alvherre 191 GIC 1 : if (PQgetResult(conn) != NULL)
755 alvherre 192 LBC 0 : pg_fatal("PQgetResult returned something extra after first result");
755 alvherre 193 EUB :
755 alvherre 194 GIC 1 : if (PQexitPipelineMode(conn) != 0)
755 alvherre 195 LBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
755 alvherre 196 ECB :
755 alvherre 197 GBC 1 : res = PQgetResult(conn);
755 alvherre 198 GIC 1 : if (res == NULL)
755 alvherre 199 UIC 0 : pg_fatal("PQgetResult returned null when sync result expected: %s",
755 alvherre 200 ECB : PQerrorMessage(conn));
755 alvherre 201 EUB :
755 alvherre 202 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 203 LBC 0 : pg_fatal("Unexpected result code %s instead of sync result, error: %s",
204 : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
755 alvherre 205 GIC 1 : PQclear(res);
206 :
755 alvherre 207 ECB : /* second pipeline */
208 :
755 alvherre 209 GBC 1 : res = PQgetResult(conn);
755 alvherre 210 GIC 1 : if (res == NULL)
755 alvherre 211 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
755 alvherre 212 ECB : PQerrorMessage(conn));
755 alvherre 213 EUB :
755 alvherre 214 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 215 UIC 0 : pg_fatal("Unexpected result code %s from second pipeline item",
755 alvherre 216 ECB : PQresStatus(PQresultStatus(res)));
217 :
755 alvherre 218 GBC 1 : res = PQgetResult(conn);
755 alvherre 219 GIC 1 : if (res != NULL)
755 alvherre 220 UIC 0 : pg_fatal("Expected null result, got %s",
755 alvherre 221 ECB : PQresStatus(PQresultStatus(res)));
222 :
755 alvherre 223 GBC 1 : res = PQgetResult(conn);
755 alvherre 224 GIC 1 : if (res == NULL)
755 alvherre 225 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
755 alvherre 226 ECB : PQerrorMessage(conn));
755 alvherre 227 EUB :
755 alvherre 228 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 229 UIC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
230 : PQresStatus(PQresultStatus(res)));
755 alvherre 231 ECB :
755 alvherre 232 EUB : /* We're still in pipeline mode ... */
755 alvherre 233 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 234 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
755 alvherre 235 ECB :
755 alvherre 236 EUB : /* until we end it, which we can safely do now */
755 alvherre 237 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 238 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
755 alvherre 239 ECB : PQerrorMessage(conn));
755 alvherre 240 EUB :
755 alvherre 241 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
755 alvherre 242 LBC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
755 alvherre 243 ECB :
755 alvherre 244 GIC 1 : fprintf(stderr, "ok\n");
245 1 : }
246 :
247 : /*
248 : * Test behavior when a pipeline dispatches a number of commands that are
249 : * not flushed by a sync point.
649 alvherre 250 ECB : */
251 : static void
649 alvherre 252 CBC 1 : test_nosync(PGconn *conn)
649 alvherre 253 ECB : {
649 alvherre 254 CBC 1 : int numqueries = 10;
649 alvherre 255 GIC 1 : int results = 0;
649 alvherre 256 CBC 1 : int sock = PQsocket(conn);
257 :
258 1 : fprintf(stderr, "nosync... ");
649 alvherre 259 EUB :
649 alvherre 260 GIC 1 : if (sock < 0)
649 alvherre 261 LBC 0 : pg_fatal("invalid socket");
649 alvherre 262 EUB :
649 alvherre 263 CBC 1 : if (PQenterPipelineMode(conn) != 1)
649 alvherre 264 UIC 0 : pg_fatal("could not enter pipeline mode");
649 alvherre 265 GIC 11 : for (int i = 0; i < numqueries; i++)
266 : {
267 : fd_set input_mask;
649 alvherre 268 ECB : struct timeval tv;
269 :
649 alvherre 270 GBC 10 : if (PQsendQueryParams(conn, "SELECT repeat('xyzxz', 12)",
649 alvherre 271 ECB : 0, NULL, NULL, NULL, NULL, 0) != 1)
649 alvherre 272 UIC 0 : pg_fatal("error sending select: %s", PQerrorMessage(conn));
649 alvherre 273 GIC 10 : PQflush(conn);
274 :
275 : /*
649 alvherre 276 ECB : * If the server has written anything to us, read (some of) it now.
277 : */
649 alvherre 278 CBC 170 : FD_ZERO(&input_mask);
279 10 : FD_SET(sock, &input_mask);
280 10 : tv.tv_sec = 0;
649 alvherre 281 GIC 10 : tv.tv_usec = 0;
649 alvherre 282 GBC 10 : if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
649 alvherre 283 EUB : {
649 alvherre 284 UIC 0 : fprintf(stderr, "select() failed: %s\n", strerror(errno));
649 alvherre 285 LBC 0 : exit_nicely(conn);
649 alvherre 286 EUB : }
649 alvherre 287 GIC 10 : if (FD_ISSET(sock, &input_mask) && PQconsumeInput(conn) != 1)
649 alvherre 288 UIC 0 : pg_fatal("failed to read from server: %s", PQerrorMessage(conn));
289 : }
649 alvherre 290 ECB :
649 alvherre 291 EUB : /* tell server to flush its output buffer */
649 alvherre 292 CBC 1 : if (PQsendFlushRequest(conn) != 1)
649 alvherre 293 UIC 0 : pg_fatal("failed to send flush request");
649 alvherre 294 GIC 1 : PQflush(conn);
295 :
649 alvherre 296 ECB : /* Now read all results */
297 : for (;;)
649 alvherre 298 GIC 9 : {
649 alvherre 299 ECB : PGresult *res;
300 :
649 alvherre 301 GIC 10 : res = PQgetResult(conn);
649 alvherre 302 ECB :
649 alvherre 303 EUB : /* NULL results are only expected after TUPLES_OK */
649 alvherre 304 GIC 10 : if (res == NULL)
649 alvherre 305 UIC 0 : pg_fatal("got unexpected NULL result after %d results", results);
649 alvherre 306 ECB :
307 : /* We expect exactly one TUPLES_OK result for each query we sent */
649 alvherre 308 GIC 10 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
309 9 : {
310 : PGresult *res2;
649 alvherre 311 ECB :
312 : /* and one NULL result should follow each */
649 alvherre 313 GBC 10 : res2 = PQgetResult(conn);
649 alvherre 314 GIC 10 : if (res2 != NULL)
649 alvherre 315 LBC 0 : pg_fatal("expected NULL, got %s",
649 alvherre 316 ECB : PQresStatus(PQresultStatus(res2)));
649 alvherre 317 GIC 10 : PQclear(res);
318 10 : results++;
649 alvherre 319 ECB :
320 : /* if we're done, we're done */
649 alvherre 321 GIC 10 : if (results == numqueries)
649 alvherre 322 CBC 1 : break;
323 :
649 alvherre 324 GIC 9 : continue;
325 : }
649 alvherre 326 EUB :
327 : /* anything else is unexpected */
649 alvherre 328 UIC 0 : pg_fatal("got unexpected %s\n", PQresStatus(PQresultStatus(res)));
649 alvherre 329 ECB : }
330 :
649 alvherre 331 GIC 1 : fprintf(stderr, "ok\n");
332 1 : }
333 :
334 : /*
335 : * When an operation in a pipeline fails the rest of the pipeline is flushed. We
336 : * still have to get results for each pipeline item, but the item will just be
337 : * a PGRES_PIPELINE_ABORTED code.
338 : *
339 : * This intentionally doesn't use a transaction to wrap the pipeline. You should
340 : * usually use an xact, but in this case we want to observe the effects of each
341 : * statement.
755 alvherre 342 ECB : */
343 : static void
755 alvherre 344 CBC 1 : test_pipeline_abort(PGconn *conn)
755 alvherre 345 ECB : {
755 alvherre 346 CBC 1 : PGresult *res = NULL;
755 alvherre 347 GIC 1 : const char *dummy_params[1] = {"1"};
348 1 : Oid dummy_param_oids[1] = {INT4OID};
349 : int i;
350 : int gotrows;
755 alvherre 351 ECB : bool goterror;
352 :
755 alvherre 353 CBC 1 : fprintf(stderr, "aborted pipeline... ");
755 alvherre 354 ECB :
755 alvherre 355 GBC 1 : res = PQexec(conn, drop_table_sql);
755 alvherre 356 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 357 LBC 0 : pg_fatal("dispatching DROP TABLE failed: %s", PQerrorMessage(conn));
755 alvherre 358 ECB :
755 alvherre 359 GBC 1 : res = PQexec(conn, create_table_sql);
755 alvherre 360 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 361 UIC 0 : pg_fatal("dispatching CREATE TABLE failed: %s", PQerrorMessage(conn));
362 :
363 : /*
364 : * Queue up a couple of small pipelines and process each without returning
365 : * to command mode first. Make sure the second operation in the first
755 alvherre 366 ECB : * pipeline ERRORs.
755 alvherre 367 EUB : */
755 alvherre 368 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 369 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
755 alvherre 370 ECB :
755 alvherre 371 GIC 1 : dummy_params[0] = "1";
755 alvherre 372 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
373 : dummy_params, NULL, NULL, 0) != 1)
755 alvherre 374 LBC 0 : pg_fatal("dispatching first insert failed: %s", PQerrorMessage(conn));
375 :
755 alvherre 376 GIC 1 : if (PQsendQueryParams(conn, "SELECT no_such_function($1)",
755 alvherre 377 EUB : 1, dummy_param_oids, dummy_params,
378 : NULL, NULL, 0) != 1)
755 alvherre 379 LBC 0 : pg_fatal("dispatching error select failed: %s", PQerrorMessage(conn));
755 alvherre 380 ECB :
755 alvherre 381 GIC 1 : dummy_params[0] = "2";
755 alvherre 382 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
383 : dummy_params, NULL, NULL, 0) != 1)
755 alvherre 384 LBC 0 : pg_fatal("dispatching second insert failed: %s", PQerrorMessage(conn));
755 alvherre 385 EUB :
755 alvherre 386 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 387 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 388 ECB :
755 alvherre 389 GIC 1 : dummy_params[0] = "3";
755 alvherre 390 GBC 1 : if (PQsendQueryParams(conn, insert_sql, 1, dummy_param_oids,
391 : dummy_params, NULL, NULL, 0) != 1)
755 alvherre 392 UIC 0 : pg_fatal("dispatching second-pipeline insert failed: %s",
755 alvherre 393 ECB : PQerrorMessage(conn));
755 alvherre 394 EUB :
755 alvherre 395 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 396 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
397 :
398 : /*
399 : * OK, start processing the pipeline results.
400 : *
401 : * We should get a command-ok for the first query, then a fatal error and
402 : * a pipeline aborted message for the second insert, a pipeline-end, then
755 alvherre 403 ECB : * a command-ok and a pipeline-ok for the second pipeline operation.
404 : */
755 alvherre 405 GBC 1 : res = PQgetResult(conn);
755 alvherre 406 CBC 1 : if (res == NULL)
755 alvherre 407 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 408 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 409 UIC 0 : pg_fatal("Unexpected result status %s: %s",
755 alvherre 410 ECB : PQresStatus(PQresultStatus(res)),
411 : PQresultErrorMessage(res));
755 alvherre 412 GIC 1 : PQclear(res);
755 alvherre 413 ECB :
755 alvherre 414 EUB : /* NULL result to signal end-of-results for this command */
755 alvherre 415 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
755 alvherre 416 UIC 0 : pg_fatal("Expected null result, got %s",
417 : PQresStatus(PQresultStatus(res)));
755 alvherre 418 ECB :
419 : /* Second query caused error, so we expect an error next */
755 alvherre 420 GBC 1 : res = PQgetResult(conn);
755 alvherre 421 CBC 1 : if (res == NULL)
755 alvherre 422 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 423 GIC 1 : if (PQresultStatus(res) != PGRES_FATAL_ERROR)
755 alvherre 424 LBC 0 : pg_fatal("Unexpected result code -- expected PGRES_FATAL_ERROR, got %s",
425 : PQresStatus(PQresultStatus(res)));
755 alvherre 426 GIC 1 : PQclear(res);
755 alvherre 427 ECB :
755 alvherre 428 EUB : /* NULL result to signal end-of-results for this command */
755 alvherre 429 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
755 alvherre 430 UIC 0 : pg_fatal("Expected null result, got %s",
431 : PQresStatus(PQresultStatus(res)));
432 :
433 : /*
434 : * pipeline should now be aborted.
435 : *
436 : * Note that we could still queue more queries at this point if we wanted;
437 : * they'd get added to a new third pipeline since we've already sent a
755 alvherre 438 ECB : * second. The aborted flag relates only to the pipeline being received.
755 alvherre 439 EUB : */
755 alvherre 440 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
755 alvherre 441 UIC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
755 alvherre 442 ECB :
443 : /* third query in pipeline, the second insert */
755 alvherre 444 GBC 1 : res = PQgetResult(conn);
755 alvherre 445 CBC 1 : if (res == NULL)
755 alvherre 446 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 447 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_ABORTED)
755 alvherre 448 LBC 0 : pg_fatal("Unexpected result code -- expected PGRES_PIPELINE_ABORTED, got %s",
449 : PQresStatus(PQresultStatus(res)));
755 alvherre 450 GIC 1 : PQclear(res);
755 alvherre 451 ECB :
755 alvherre 452 EUB : /* NULL result to signal end-of-results for this command */
755 alvherre 453 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
755 alvherre 454 LBC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 455 EUB :
755 alvherre 456 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_ABORTED)
755 alvherre 457 UIC 0 : pg_fatal("pipeline should be flagged as aborted but isn't");
755 alvherre 458 ECB :
755 alvherre 459 EUB : /* Ensure we're still in pipeline */
755 alvherre 460 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 461 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
462 :
463 : /*
464 : * The end of a failed pipeline is a PGRES_PIPELINE_SYNC.
465 : *
466 : * (This is so clients know to start processing results normally again and
755 alvherre 467 ECB : * can tell the difference between skipped commands and the sync.)
468 : */
755 alvherre 469 GBC 1 : res = PQgetResult(conn);
755 alvherre 470 CBC 1 : if (res == NULL)
755 alvherre 471 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 472 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 473 UIC 0 : pg_fatal("Unexpected result code from first pipeline sync\n"
755 alvherre 474 ECB : "Expected PGRES_PIPELINE_SYNC, got %s",
475 : PQresStatus(PQresultStatus(res)));
755 alvherre 476 CBC 1 : PQclear(res);
755 alvherre 477 EUB :
755 alvherre 478 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_ABORTED)
755 alvherre 479 UIC 0 : pg_fatal("sync should've cleared the aborted flag but didn't");
755 alvherre 480 ECB :
755 alvherre 481 EUB : /* We're still in pipeline mode... */
755 alvherre 482 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 483 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
755 alvherre 484 ECB :
485 : /* the insert from the second pipeline */
755 alvherre 486 GBC 1 : res = PQgetResult(conn);
755 alvherre 487 CBC 1 : if (res == NULL)
755 alvherre 488 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 489 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 490 LBC 0 : pg_fatal("Unexpected result code %s from first item in second pipeline",
491 : PQresStatus(PQresultStatus(res)));
755 alvherre 492 GIC 1 : PQclear(res);
755 alvherre 493 ECB :
755 alvherre 494 EUB : /* Read the NULL result at the end of the command */
755 alvherre 495 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
755 alvherre 496 UIC 0 : pg_fatal("Expected null result, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 497 ECB :
755 alvherre 498 EUB : /* the second pipeline sync */
755 alvherre 499 CBC 1 : if ((res = PQgetResult(conn)) == NULL)
755 alvherre 500 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 501 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 502 LBC 0 : pg_fatal("Unexpected result code %s from second pipeline sync",
503 : PQresStatus(PQresultStatus(res)));
755 alvherre 504 CBC 1 : PQclear(res);
755 alvherre 505 EUB :
755 alvherre 506 GIC 1 : if ((res = PQgetResult(conn)) != NULL)
755 alvherre 507 UIC 0 : pg_fatal("Expected null result, got %s: %s",
508 : PQresStatus(PQresultStatus(res)),
509 : PQerrorMessage(conn));
755 alvherre 510 ECB :
755 alvherre 511 EUB : /* Try to send two queries in one command */
198 alvherre 512 CBC 1 : if (PQsendQueryParams(conn, "SELECT 1; SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 513 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
755 alvherre 514 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 515 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 516 GIC 1 : goterror = false;
755 alvherre 517 CBC 2 : while ((res = PQgetResult(conn)) != NULL)
518 : {
519 1 : switch (PQresultStatus(res))
755 alvherre 520 ECB : {
755 alvherre 521 GBC 1 : case PGRES_FATAL_ERROR:
755 alvherre 522 GIC 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "42601") != 0)
755 alvherre 523 LBC 0 : pg_fatal("expected error about multiple commands, got %s",
755 alvherre 524 ECB : PQerrorMessage(conn));
755 alvherre 525 CBC 1 : printf("got expected %s", PQerrorMessage(conn));
755 alvherre 526 GBC 1 : goterror = true;
527 1 : break;
755 alvherre 528 UIC 0 : default:
529 0 : pg_fatal("got unexpected status %s", PQresStatus(PQresultStatus(res)));
530 : break;
755 alvherre 531 ECB : }
755 alvherre 532 EUB : }
755 alvherre 533 CBC 1 : if (!goterror)
755 alvherre 534 LBC 0 : pg_fatal("did not get cannot-insert-multiple-commands error");
755 alvherre 535 GBC 1 : res = PQgetResult(conn);
755 alvherre 536 CBC 1 : if (res == NULL)
755 alvherre 537 UBC 0 : pg_fatal("got NULL result");
755 alvherre 538 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 539 LBC 0 : pg_fatal("Unexpected result code %s from pipeline sync",
540 : PQresStatus(PQresultStatus(res)));
739 alvherre 541 GIC 1 : fprintf(stderr, "ok\n");
755 alvherre 542 ECB :
543 : /* Test single-row mode with an error partways */
198 alvherre 544 GBC 1 : if (PQsendQueryParams(conn, "SELECT 1.0/g FROM generate_series(3, -1, -1) g",
198 alvherre 545 ECB : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 546 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
755 alvherre 547 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 548 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 549 CBC 1 : PQsetSingleRowMode(conn);
550 1 : goterror = false;
739 alvherre 551 GIC 1 : gotrows = 0;
755 alvherre 552 CBC 5 : while ((res = PQgetResult(conn)) != NULL)
553 : {
554 4 : switch (PQresultStatus(res))
755 alvherre 555 ECB : {
755 alvherre 556 CBC 3 : case PGRES_SINGLE_TUPLE:
557 3 : printf("got row: %s\n", PQgetvalue(res, 0, 0));
739 558 3 : gotrows++;
755 559 3 : break;
755 alvherre 560 GBC 1 : case PGRES_FATAL_ERROR:
755 alvherre 561 GIC 1 : if (strcmp(PQresultErrorField(res, PG_DIAG_SQLSTATE), "22012") != 0)
755 alvherre 562 UIC 0 : pg_fatal("expected division-by-zero, got: %s (%s)",
755 alvherre 563 ECB : PQerrorMessage(conn),
564 : PQresultErrorField(res, PG_DIAG_SQLSTATE));
755 alvherre 565 CBC 1 : printf("got expected division-by-zero\n");
755 alvherre 566 GBC 1 : goterror = true;
567 1 : break;
755 alvherre 568 UIC 0 : default:
755 alvherre 569 LBC 0 : pg_fatal("got unexpected result %s", PQresStatus(PQresultStatus(res)));
570 : }
755 alvherre 571 CBC 4 : PQclear(res);
755 alvherre 572 EUB : }
755 alvherre 573 CBC 1 : if (!goterror)
755 alvherre 574 UBC 0 : pg_fatal("did not get division-by-zero error");
739 alvherre 575 GIC 1 : if (gotrows != 3)
739 alvherre 576 LBC 0 : pg_fatal("did not get three rows");
755 alvherre 577 EUB : /* the third pipeline sync */
755 alvherre 578 CBC 1 : if ((res = PQgetResult(conn)) == NULL)
755 alvherre 579 UBC 0 : pg_fatal("Unexpected NULL result: %s", PQerrorMessage(conn));
755 alvherre 580 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 581 LBC 0 : pg_fatal("Unexpected result code %s from third pipeline sync",
582 : PQresStatus(PQresultStatus(res)));
755 alvherre 583 GIC 1 : PQclear(res);
755 alvherre 584 ECB :
755 alvherre 585 EUB : /* We're still in pipeline mode... */
755 alvherre 586 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 587 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
755 alvherre 588 ECB :
755 alvherre 589 EUB : /* until we end it, which we can safely do now */
755 alvherre 590 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 591 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
755 alvherre 592 ECB : PQerrorMessage(conn));
755 alvherre 593 EUB :
755 alvherre 594 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
755 alvherre 595 UIC 0 : pg_fatal("exiting pipeline mode didn't seem to work");
596 :
597 : /*-
598 : * Since we fired the pipelines off without a surrounding xact, the results
599 : * should be:
600 : *
601 : * - Implicit xact started by server around 1st pipeline
602 : * - First insert applied
603 : * - Second statement aborted xact
604 : * - Third insert skipped
605 : * - Sync rolled back first implicit xact
606 : * - Implicit xact created by server around 2nd pipeline
607 : * - insert applied from 2nd pipeline
608 : * - Sync commits 2nd xact
609 : *
755 alvherre 610 ECB : * So we should only have the value 3 that we inserted.
611 : */
755 alvherre 612 CBC 1 : res = PQexec(conn, "SELECT itemno FROM pq_pipeline_demo");
755 alvherre 613 EUB :
755 alvherre 614 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 615 LBC 0 : pg_fatal("Expected tuples, got %s: %s",
755 alvherre 616 EUB : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
755 alvherre 617 CBC 1 : if (PQntuples(res) != 1)
755 alvherre 618 UIC 0 : pg_fatal("expected 1 result, got %d", PQntuples(res));
755 alvherre 619 CBC 2 : for (i = 0; i < PQntuples(res); i++)
620 : {
621 1 : const char *val = PQgetvalue(res, i, 0);
755 alvherre 622 EUB :
755 alvherre 623 GIC 1 : if (strcmp(val, "3") != 0)
755 alvherre 624 UIC 0 : pg_fatal("expected only insert with value 3, got %s", val);
755 alvherre 625 ECB : }
626 :
755 alvherre 627 CBC 1 : PQclear(res);
278 alvherre 628 ECB :
278 alvherre 629 GIC 1 : fprintf(stderr, "ok\n");
755 630 1 : }
631 :
/*
 * Steps of the state machine driven by test_pipelined_insert.
 *
 * That function keeps two values of this type, one tracking what has been
 * sent and one tracking what has been received; the enumerators are listed
 * in the order the steps are performed.
 */
enum PipelineInsertStep
{
	BI_BEGIN_TX = 0,			/* "BEGIN TRANSACTION" */
	BI_DROP_TABLE,				/* drop_table_sql */
	BI_CREATE_TABLE,			/* create_table_sql */
	BI_PREPARE,					/* prepare the insert statement */
	BI_INSERT_ROWS,				/* the row inserts themselves */
	BI_COMMIT_TX,				/* presumably the COMMIT; see the function */
	BI_SYNC,					/* presumably the pipeline sync */
	BI_DONE						/* terminal state: receive loop stops here */
};
755 alvherre 644 ECB :
645 : static void
755 alvherre 646 CBC 1 : test_pipelined_insert(PGconn *conn, int n_rows)
647 : {
739 alvherre 648 GIC 1 : Oid insert_param_oids[2] = {INT4OID, INT8OID};
649 : const char *insert_params[2];
755 alvherre 650 ECB : char insert_param_0[MAXINTLEN];
739 651 : char insert_param_1[MAXINT8LEN];
755 alvherre 652 GIC 1 : enum PipelineInsertStep send_step = BI_BEGIN_TX,
653 1 : recv_step = BI_BEGIN_TX;
654 : int rows_to_send,
755 alvherre 655 ECB : rows_to_receive;
656 :
739 alvherre 657 GIC 1 : insert_params[0] = insert_param_0;
739 alvherre 658 CBC 1 : insert_params[1] = insert_param_1;
659 :
755 alvherre 660 GIC 1 : rows_to_send = rows_to_receive = n_rows;
661 :
662 : /*
755 alvherre 663 ECB : * Do a pipelined insert into a table created at the start of the pipeline
755 alvherre 664 EUB : */
755 alvherre 665 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 666 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
667 :
755 alvherre 668 GIC 4 : while (send_step != BI_PREPARE)
669 : {
755 alvherre 670 ECB : const char *sql;
671 :
755 alvherre 672 CBC 3 : switch (send_step)
755 alvherre 673 ECB : {
755 alvherre 674 CBC 1 : case BI_BEGIN_TX:
675 1 : sql = "BEGIN TRANSACTION";
755 alvherre 676 GIC 1 : send_step = BI_DROP_TABLE;
755 alvherre 677 CBC 1 : break;
755 alvherre 678 ECB :
755 alvherre 679 CBC 1 : case BI_DROP_TABLE:
680 1 : sql = drop_table_sql;
755 alvherre 681 GIC 1 : send_step = BI_CREATE_TABLE;
755 alvherre 682 CBC 1 : break;
755 alvherre 683 ECB :
755 alvherre 684 CBC 1 : case BI_CREATE_TABLE:
685 1 : sql = create_table_sql;
755 alvherre 686 GIC 1 : send_step = BI_PREPARE;
755 alvherre 687 GBC 1 : break;
755 alvherre 688 EUB :
755 alvherre 689 UIC 0 : default:
690 0 : pg_fatal("invalid state");
691 : sql = NULL; /* keep compiler quiet */
692 : }
755 alvherre 693 ECB :
694 : pg_debug("sending: %s\n", sql);
755 alvherre 695 GBC 3 : if (PQsendQueryParams(conn, sql,
696 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 697 UIC 0 : pg_fatal("dispatching %s failed: %s", sql, PQerrorMessage(conn));
755 alvherre 698 ECB : }
699 :
755 alvherre 700 CBC 1 : Assert(send_step == BI_PREPARE);
739 alvherre 701 EUB : pg_debug("sending: %s\n", insert_sql2);
739 alvherre 702 CBC 1 : if (PQsendPrepare(conn, "my_insert", insert_sql2, 2, insert_param_oids) != 1)
755 alvherre 703 UIC 0 : pg_fatal("dispatching PREPARE failed: %s", PQerrorMessage(conn));
755 alvherre 704 GIC 1 : send_step = BI_INSERT_ROWS;
705 :
706 : /*
707 : * Now we start inserting. We'll be sending enough data that we could fill
708 : * our output buffer, so to avoid deadlocking we need to enter nonblocking
709 : * mode and consume input while we send more output. As results of each
710 : * query are processed we should pop them to allow processing of the next
711 : * query. There's no need to finish the pipeline before processing
755 alvherre 712 ECB : * results.
755 alvherre 713 EUB : */
755 alvherre 714 GIC 1 : if (PQsetnonblocking(conn, 1) != 0)
755 alvherre 715 LBC 0 : pg_fatal("failed to set nonblocking mode: %s", PQerrorMessage(conn));
716 :
755 alvherre 717 GIC 33248 : while (recv_step != BI_DONE)
718 : {
719 : int sock;
720 : fd_set input_mask;
755 alvherre 721 ECB : fd_set output_mask;
722 :
755 alvherre 723 CBC 33247 : sock = PQsocket(conn);
755 alvherre 724 EUB :
755 alvherre 725 GIC 33247 : if (sock < 0)
755 alvherre 726 LBC 0 : break; /* shouldn't happen */
755 alvherre 727 ECB :
755 alvherre 728 CBC 565199 : FD_ZERO(&input_mask);
729 33247 : FD_SET(sock, &input_mask);
755 alvherre 730 GIC 565199 : FD_ZERO(&output_mask);
755 alvherre 731 CBC 33247 : FD_SET(sock, &output_mask);
732 :
755 alvherre 733 GBC 33247 : if (select(sock + 1, &input_mask, &output_mask, NULL, NULL) < 0)
755 alvherre 734 EUB : {
755 alvherre 735 UIC 0 : fprintf(stderr, "select() failed: %s\n", strerror(errno));
736 0 : exit_nicely(conn);
737 : }
738 :
739 : /*
740 : * Process any results, so we keep the server's output buffer free
755 alvherre 741 ECB : * flowing and it can continue to process input
742 : */
755 alvherre 743 CBC 33247 : if (FD_ISSET(sock, &input_mask))
744 : {
755 alvherre 745 GIC 3 : PQconsumeInput(conn);
755 alvherre 746 ECB :
747 : /* Read until we'd block if we tried to read */
755 alvherre 748 GIC 1414 : while (!PQisBusy(conn) && recv_step < BI_DONE)
755 alvherre 749 ECB : {
750 : PGresult *res;
739 tgl 751 GIC 1411 : const char *cmdtag = "";
755 alvherre 752 1411 : const char *description = "";
753 : int status;
754 :
755 : /*
756 : * Read next result. If no more results from this query,
755 alvherre 757 ECB : * advance to the next query
758 : */
755 alvherre 759 CBC 1411 : res = PQgetResult(conn);
755 alvherre 760 GIC 1411 : if (res == NULL)
755 alvherre 761 CBC 705 : continue;
755 alvherre 762 ECB :
755 alvherre 763 GIC 706 : status = PGRES_COMMAND_OK;
755 alvherre 764 CBC 706 : switch (recv_step)
755 alvherre 765 ECB : {
755 alvherre 766 CBC 1 : case BI_BEGIN_TX:
767 1 : cmdtag = "BEGIN";
768 1 : recv_step++;
769 1 : break;
770 1 : case BI_DROP_TABLE:
771 1 : cmdtag = "DROP TABLE";
772 1 : recv_step++;
773 1 : break;
774 1 : case BI_CREATE_TABLE:
775 1 : cmdtag = "CREATE TABLE";
776 1 : recv_step++;
777 1 : break;
778 1 : case BI_PREPARE:
779 1 : cmdtag = "";
780 1 : description = "PREPARE";
781 1 : recv_step++;
782 1 : break;
783 700 : case BI_INSERT_ROWS:
784 700 : cmdtag = "INSERT";
785 700 : rows_to_receive--;
786 700 : if (rows_to_receive == 0)
787 1 : recv_step++;
788 700 : break;
789 1 : case BI_COMMIT_TX:
790 1 : cmdtag = "COMMIT";
791 1 : recv_step++;
792 1 : break;
793 1 : case BI_SYNC:
794 1 : cmdtag = "";
795 1 : description = "SYNC";
796 1 : status = PGRES_PIPELINE_SYNC;
755 alvherre 797 GBC 1 : recv_step++;
755 alvherre 798 GIC 1 : break;
755 alvherre 799 UBC 0 : case BI_DONE:
800 : /* unreachable */
749 tgl 801 UIC 0 : pg_fatal("unreachable state");
755 alvherre 802 ECB : }
755 alvherre 803 EUB :
755 alvherre 804 GIC 706 : if (PQresultStatus(res) != status)
755 alvherre 805 UIC 0 : pg_fatal("%s reported status %s, expected %s\n"
806 : "Error message: \"%s\"",
807 : description, PQresStatus(PQresultStatus(res)),
755 alvherre 808 ECB : PQresStatus(status), PQerrorMessage(conn));
755 alvherre 809 EUB :
755 alvherre 810 GIC 706 : if (strncmp(PQcmdStatus(res), cmdtag, strlen(cmdtag)) != 0)
755 alvherre 811 UIC 0 : pg_fatal("%s expected command tag '%s', got '%s'",
812 : description, cmdtag, PQcmdStatus(res));
813 :
755 alvherre 814 ECB : pg_debug("Got %s OK\n", cmdtag[0] != '\0' ? cmdtag : description);
815 :
755 alvherre 816 GIC 706 : PQclear(res);
817 : }
818 : }
755 alvherre 819 ECB :
820 : /* Write more rows and/or the end pipeline message, if needed */
755 alvherre 821 CBC 33247 : if (FD_ISSET(sock, &output_mask))
822 : {
823 33245 : PQflush(conn);
824 :
825 33245 : if (send_step == BI_INSERT_ROWS)
826 : {
739 827 700 : snprintf(insert_param_0, MAXINTLEN, "%d", rows_to_send);
828 : /* use up some buffer space with a wide value */
738 829 700 : snprintf(insert_param_1, MAXINT8LEN, "%lld", 1LL << 62);
830 :
755 alvherre 831 GIC 700 : if (PQsendQueryPrepared(conn, "my_insert",
832 : 2, insert_params, NULL, NULL, 0) == 1)
833 : {
755 alvherre 834 ECB : pg_debug("sent row %d\n", rows_to_send);
835 :
755 alvherre 836 CBC 700 : rows_to_send--;
755 alvherre 837 GIC 700 : if (rows_to_send == 0)
838 1 : send_step++;
839 : }
840 : else
841 : {
842 : /*
843 : * in nonblocking mode, so it's OK for an insert to fail
755 alvherre 844 EUB : * to send
845 : */
755 alvherre 846 UIC 0 : fprintf(stderr, "WARNING: failed to send insert #%d: %s\n",
847 : rows_to_send, PQerrorMessage(conn));
755 alvherre 848 ECB : }
849 : }
755 alvherre 850 CBC 32545 : else if (send_step == BI_COMMIT_TX)
851 : {
755 alvherre 852 GIC 1 : if (PQsendQueryParams(conn, "COMMIT",
853 : 0, NULL, NULL, NULL, NULL, 0) == 1)
755 alvherre 854 ECB : {
855 : pg_debug("sent COMMIT\n");
755 alvherre 856 GIC 1 : send_step++;
857 : }
755 alvherre 858 EUB : else
859 : {
755 alvherre 860 UIC 0 : fprintf(stderr, "WARNING: failed to send commit: %s\n",
861 : PQerrorMessage(conn));
755 alvherre 862 ECB : }
863 : }
755 alvherre 864 CBC 32544 : else if (send_step == BI_SYNC)
865 : {
866 1 : if (PQpipelineSync(conn) == 1)
755 alvherre 867 ECB : {
755 alvherre 868 GIC 1 : fprintf(stdout, "pipeline sync sent\n");
869 1 : send_step++;
870 : }
755 alvherre 871 EUB : else
872 : {
755 alvherre 873 UIC 0 : fprintf(stderr, "WARNING: pipeline sync failed: %s\n",
874 : PQerrorMessage(conn));
875 : }
876 : }
877 : }
878 : }
755 alvherre 879 ECB :
755 alvherre 880 EUB : /* We've got the sync message and the pipeline should be done */
755 alvherre 881 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 882 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
755 alvherre 883 ECB : PQerrorMessage(conn));
755 alvherre 884 EUB :
755 alvherre 885 GIC 1 : if (PQsetnonblocking(conn, 0) != 0)
755 alvherre 886 LBC 0 : pg_fatal("failed to clear nonblocking mode: %s", PQerrorMessage(conn));
755 alvherre 887 ECB :
755 alvherre 888 GIC 1 : fprintf(stderr, "ok\n");
889 1 : }
755 alvherre 890 ECB :
891 : static void
755 alvherre 892 CBC 1 : test_prepared(PGconn *conn)
755 alvherre 893 ECB : {
755 alvherre 894 GIC 1 : PGresult *res = NULL;
895 1 : Oid param_oids[1] = {INT4OID};
896 : Oid expected_oids[4];
755 alvherre 897 ECB : Oid typ;
898 :
755 alvherre 899 CBC 1 : fprintf(stderr, "prepared... ");
755 alvherre 900 EUB :
755 alvherre 901 CBC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 902 UIC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
755 alvherre 903 GIC 1 : if (PQsendPrepare(conn, "select_one", "SELECT $1, '42', $1::numeric, "
755 alvherre 904 EUB : "interval '1 sec'",
755 alvherre 905 ECB : 1, param_oids) != 1)
755 alvherre 906 LBC 0 : pg_fatal("preparing query failed: %s", PQerrorMessage(conn));
755 alvherre 907 CBC 1 : expected_oids[0] = INT4OID;
908 1 : expected_oids[1] = TEXTOID;
909 1 : expected_oids[2] = NUMERICOID;
755 alvherre 910 GBC 1 : expected_oids[3] = INTERVALOID;
755 alvherre 911 CBC 1 : if (PQsendDescribePrepared(conn, "select_one") != 1)
755 alvherre 912 UBC 0 : pg_fatal("failed to send describePrepared: %s", PQerrorMessage(conn));
755 alvherre 913 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 914 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 915 ECB :
755 alvherre 916 GBC 1 : res = PQgetResult(conn);
755 alvherre 917 CBC 1 : if (res == NULL)
755 alvherre 918 UBC 0 : pg_fatal("PQgetResult returned null");
755 alvherre 919 CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 920 LBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 921 CBC 1 : PQclear(res);
755 alvherre 922 GBC 1 : res = PQgetResult(conn);
755 alvherre 923 GIC 1 : if (res != NULL)
755 alvherre 924 LBC 0 : pg_fatal("expected NULL result");
755 alvherre 925 ECB :
755 alvherre 926 GBC 1 : res = PQgetResult(conn);
755 alvherre 927 CBC 1 : if (res == NULL)
755 alvherre 928 UBC 0 : pg_fatal("PQgetResult returned NULL");
755 alvherre 929 CBC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 930 UBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 931 GIC 1 : if (PQnfields(res) != lengthof(expected_oids))
206 tgl 932 LBC 0 : pg_fatal("expected %zd columns, got %d",
933 : lengthof(expected_oids), PQnfields(res));
755 alvherre 934 CBC 5 : for (int i = 0; i < PQnfields(res); i++)
755 alvherre 935 ECB : {
755 alvherre 936 GBC 4 : typ = PQftype(res, i);
755 alvherre 937 GIC 4 : if (typ != expected_oids[i])
755 alvherre 938 UIC 0 : pg_fatal("field %d: expected type %u, got %u",
755 alvherre 939 ECB : i, expected_oids[i], typ);
940 : }
755 alvherre 941 CBC 1 : PQclear(res);
755 alvherre 942 GBC 1 : res = PQgetResult(conn);
755 alvherre 943 GIC 1 : if (res != NULL)
755 alvherre 944 LBC 0 : pg_fatal("expected NULL result");
755 alvherre 945 ECB :
755 alvherre 946 GBC 1 : res = PQgetResult(conn);
755 alvherre 947 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 948 LBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 949 EUB :
755 alvherre 950 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 951 LBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
755 alvherre 952 ECB :
755 alvherre 953 CBC 1 : PQexec(conn, "BEGIN");
954 1 : PQexec(conn, "DECLARE cursor_one CURSOR FOR SELECT 1");
755 alvherre 955 GBC 1 : PQenterPipelineMode(conn);
755 alvherre 956 CBC 1 : if (PQsendDescribePortal(conn, "cursor_one") != 1)
755 alvherre 957 UBC 0 : pg_fatal("PQsendDescribePortal failed: %s", PQerrorMessage(conn));
755 alvherre 958 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 959 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 960 GBC 1 : res = PQgetResult(conn);
755 alvherre 961 CBC 1 : if (res == NULL)
755 alvherre 962 UBC 0 : pg_fatal("expected NULL result");
755 alvherre 963 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 964 LBC 0 : pg_fatal("expected COMMAND_OK, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 965 ECB :
755 alvherre 966 GBC 1 : typ = PQftype(res, 0);
755 alvherre 967 GIC 1 : if (typ != INT4OID)
755 alvherre 968 LBC 0 : pg_fatal("portal: expected type %u, got %u",
755 alvherre 969 ECB : INT4OID, typ);
755 alvherre 970 CBC 1 : PQclear(res);
755 alvherre 971 GBC 1 : res = PQgetResult(conn);
755 alvherre 972 CBC 1 : if (res != NULL)
755 alvherre 973 LBC 0 : pg_fatal("expected NULL result");
755 alvherre 974 GBC 1 : res = PQgetResult(conn);
755 alvherre 975 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 976 LBC 0 : pg_fatal("expected PGRES_PIPELINE_SYNC, got %s", PQresStatus(PQresultStatus(res)));
755 alvherre 977 EUB :
755 alvherre 978 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 979 LBC 0 : pg_fatal("could not exit pipeline mode: %s", PQerrorMessage(conn));
755 alvherre 980 ECB :
755 alvherre 981 GIC 1 : fprintf(stderr, "ok\n");
982 1 : }
983 :
/*
 * Notice processor callback: bump the caller-supplied counter and echo the
 * notice text to stderr, prefixed with its running number.
 *
 * "arg" must point to the int that holds the count.
 */
static void
notice_processor(void *arg, const char *message)
{
	int		   *counter = arg;

	*counter += 1;
	fprintf(stderr, "NOTICE %d: %s", *counter, message);
}
993 :
278 alvherre 994 ECB : /* Verify behavior in "idle" state */
995 : static void
278 alvherre 996 GIC 1 : test_pipeline_idle(PGconn *conn)
278 alvherre 997 ECB : {
998 : PGresult *res;
278 alvherre 999 CBC 1 : int n_notices = 0;
1000 :
1001 1 : fprintf(stderr, "\npipeline idle...\n");
1002 :
278 alvherre 1003 GIC 1 : PQsetNoticeProcessor(conn, notice_processor, &n_notices);
278 alvherre 1004 ECB :
278 alvherre 1005 EUB : /* Try to exit pipeline mode in pipeline-idle state */
278 alvherre 1006 CBC 1 : if (PQenterPipelineMode(conn) != 1)
278 alvherre 1007 UBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
198 alvherre 1008 CBC 1 : if (PQsendQueryParams(conn, "SELECT 1", 0, NULL, NULL, NULL, NULL, 0) != 1)
278 alvherre 1009 LBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
278 alvherre 1010 CBC 1 : PQsendFlushRequest(conn);
278 alvherre 1011 GBC 1 : res = PQgetResult(conn);
278 alvherre 1012 GIC 1 : if (res == NULL)
278 alvherre 1013 LBC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
278 alvherre 1014 EUB : PQerrorMessage(conn));
278 alvherre 1015 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
278 alvherre 1016 LBC 0 : pg_fatal("unexpected result code %s from first pipeline item",
278 alvherre 1017 ECB : PQresStatus(PQresultStatus(res)));
278 alvherre 1018 CBC 1 : PQclear(res);
278 alvherre 1019 GBC 1 : res = PQgetResult(conn);
278 alvherre 1020 CBC 1 : if (res != NULL)
278 alvherre 1021 UBC 0 : pg_fatal("did not receive terminating NULL");
198 alvherre 1022 CBC 1 : if (PQsendQueryParams(conn, "SELECT 2", 0, NULL, NULL, NULL, NULL, 0) != 1)
278 alvherre 1023 UBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
278 alvherre 1024 CBC 1 : if (PQexitPipelineMode(conn) == 1)
278 alvherre 1025 UIC 0 : pg_fatal("exiting pipeline succeeded when it shouldn't");
278 alvherre 1026 GBC 1 : if (strncmp(PQerrorMessage(conn), "cannot exit pipeline mode",
1027 : strlen("cannot exit pipeline mode")) != 0)
278 alvherre 1028 LBC 0 : pg_fatal("did not get expected error; got: %s",
278 alvherre 1029 ECB : PQerrorMessage(conn));
278 alvherre 1030 CBC 1 : PQsendFlushRequest(conn);
278 alvherre 1031 GBC 1 : res = PQgetResult(conn);
278 alvherre 1032 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
278 alvherre 1033 LBC 0 : pg_fatal("unexpected result code %s from second pipeline item",
278 alvherre 1034 ECB : PQresStatus(PQresultStatus(res)));
278 alvherre 1035 CBC 1 : PQclear(res);
278 alvherre 1036 GBC 1 : res = PQgetResult(conn);
278 alvherre 1037 CBC 1 : if (res != NULL)
278 alvherre 1038 UBC 0 : pg_fatal("did not receive terminating NULL");
278 alvherre 1039 GIC 1 : if (PQexitPipelineMode(conn) != 1)
278 alvherre 1040 LBC 0 : pg_fatal("exiting pipeline failed: %s", PQerrorMessage(conn));
278 alvherre 1041 EUB :
278 alvherre 1042 CBC 1 : if (n_notices > 0)
278 alvherre 1043 UIC 0 : pg_fatal("got %d notice(s)", n_notices);
198 alvherre 1044 GIC 1 : fprintf(stderr, "ok - 1\n");
278 alvherre 1045 ECB :
278 alvherre 1046 EUB : /* Have a WARNING in the middle of a resultset */
278 alvherre 1047 CBC 1 : if (PQenterPipelineMode(conn) != 1)
278 alvherre 1048 UBC 0 : pg_fatal("entering pipeline mode failed: %s", PQerrorMessage(conn));
198 alvherre 1049 CBC 1 : if (PQsendQueryParams(conn, "SELECT pg_catalog.pg_advisory_unlock(1,1)", 0, NULL, NULL, NULL, NULL, 0) != 1)
278 alvherre 1050 LBC 0 : pg_fatal("failed to send query: %s", PQerrorMessage(conn));
278 alvherre 1051 CBC 1 : PQsendFlushRequest(conn);
278 alvherre 1052 GBC 1 : res = PQgetResult(conn);
278 alvherre 1053 CBC 1 : if (res == NULL)
278 alvherre 1054 UBC 0 : pg_fatal("unexpected NULL result received");
278 alvherre 1055 CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
278 alvherre 1056 UBC 0 : pg_fatal("unexpected result code %s", PQresStatus(PQresultStatus(res)));
278 alvherre 1057 CBC 1 : if (PQexitPipelineMode(conn) != 1)
278 alvherre 1058 LBC 0 : pg_fatal("failed to exit pipeline mode: %s", PQerrorMessage(conn));
198 alvherre 1059 GIC 1 : fprintf(stderr, "ok - 2\n");
278 1060 1 : }
278 alvherre 1061 ECB :
1062 : static void
755 alvherre 1063 CBC 1 : test_simple_pipeline(PGconn *conn)
755 alvherre 1064 ECB : {
755 alvherre 1065 CBC 1 : PGresult *res = NULL;
755 alvherre 1066 GIC 1 : const char *dummy_params[1] = {"1"};
755 alvherre 1067 CBC 1 : Oid dummy_param_oids[1] = {INT4OID};
1068 :
755 alvherre 1069 GIC 1 : fprintf(stderr, "simple pipeline... ");
1070 :
1071 : /*
1072 : * Enter pipeline mode and dispatch a set of operations, which we'll then
1073 : * process the results of as they come in.
1074 : *
1075 : * For a simple case we should be able to do this without interim
1076 : * processing of results since our output buffer will give us enough slush
755 alvherre 1077 ECB : * to work with and we won't block on sending. So blocking mode is fine.
755 alvherre 1078 EUB : */
755 alvherre 1079 GIC 1 : if (PQisnonblocking(conn))
755 alvherre 1080 LBC 0 : pg_fatal("Expected blocking connection mode");
755 alvherre 1081 EUB :
755 alvherre 1082 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 1083 LBC 0 : pg_fatal("failed to enter pipeline mode: %s", PQerrorMessage(conn));
1084 :
755 alvherre 1085 GIC 1 : if (PQsendQueryParams(conn, "SELECT $1",
755 alvherre 1086 EUB : 1, dummy_param_oids, dummy_params,
1087 : NULL, NULL, 0) != 1)
755 alvherre 1088 LBC 0 : pg_fatal("dispatching SELECT failed: %s", PQerrorMessage(conn));
755 alvherre 1089 EUB :
755 alvherre 1090 GIC 1 : if (PQexitPipelineMode(conn) != 0)
755 alvherre 1091 LBC 0 : pg_fatal("exiting pipeline mode with work in progress should fail, but succeeded");
755 alvherre 1092 EUB :
755 alvherre 1093 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1094 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 1095 ECB :
755 alvherre 1096 GBC 1 : res = PQgetResult(conn);
755 alvherre 1097 GIC 1 : if (res == NULL)
755 alvherre 1098 UIC 0 : pg_fatal("PQgetResult returned null when there's a pipeline item: %s",
755 alvherre 1099 ECB : PQerrorMessage(conn));
755 alvherre 1100 EUB :
755 alvherre 1101 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 1102 UIC 0 : pg_fatal("Unexpected result code %s from first pipeline item",
755 alvherre 1103 ECB : PQresStatus(PQresultStatus(res)));
1104 :
755 alvherre 1105 GIC 1 : PQclear(res);
755 alvherre 1106 CBC 1 : res = NULL;
755 alvherre 1107 EUB :
755 alvherre 1108 GIC 1 : if (PQgetResult(conn) != NULL)
755 alvherre 1109 UIC 0 : pg_fatal("PQgetResult returned something extra after first query result.");
1110 :
1111 : /*
1112 : * Even though we've processed the result there's still a sync to come and
755 alvherre 1113 ECB : * we can't exit pipeline mode yet
755 alvherre 1114 EUB : */
755 alvherre 1115 GIC 1 : if (PQexitPipelineMode(conn) != 0)
755 alvherre 1116 LBC 0 : pg_fatal("exiting pipeline mode after query but before sync succeeded incorrectly");
755 alvherre 1117 ECB :
755 alvherre 1118 GBC 1 : res = PQgetResult(conn);
755 alvherre 1119 GIC 1 : if (res == NULL)
755 alvherre 1120 UIC 0 : pg_fatal("PQgetResult returned null when sync result PGRES_PIPELINE_SYNC expected: %s",
755 alvherre 1121 ECB : PQerrorMessage(conn));
755 alvherre 1122 EUB :
755 alvherre 1123 GIC 1 : if (PQresultStatus(res) != PGRES_PIPELINE_SYNC)
755 alvherre 1124 UIC 0 : pg_fatal("Unexpected result code %s instead of PGRES_PIPELINE_SYNC, error: %s",
755 alvherre 1125 ECB : PQresStatus(PQresultStatus(res)), PQerrorMessage(conn));
1126 :
755 alvherre 1127 GIC 1 : PQclear(res);
755 alvherre 1128 CBC 1 : res = NULL;
755 alvherre 1129 EUB :
755 alvherre 1130 GIC 1 : if (PQgetResult(conn) != NULL)
755 alvherre 1131 UIC 0 : pg_fatal("PQgetResult returned something extra after pipeline end: %s",
1132 : PQresStatus(PQresultStatus(res)));
755 alvherre 1133 ECB :
755 alvherre 1134 EUB : /* We're still in pipeline mode... */
755 alvherre 1135 GIC 1 : if (PQpipelineStatus(conn) == PQ_PIPELINE_OFF)
755 alvherre 1136 UIC 0 : pg_fatal("Fell out of pipeline mode somehow");
755 alvherre 1137 ECB :
755 alvherre 1138 EUB : /* ... until we end it, which we can safely do now */
755 alvherre 1139 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 1140 UIC 0 : pg_fatal("attempt to exit pipeline mode failed when it should've succeeded: %s",
755 alvherre 1141 ECB : PQerrorMessage(conn));
755 alvherre 1142 EUB :
755 alvherre 1143 GIC 1 : if (PQpipelineStatus(conn) != PQ_PIPELINE_OFF)
755 alvherre 1144 LBC 0 : pg_fatal("Exiting pipeline mode didn't seem to work");
755 alvherre 1145 ECB :
755 alvherre 1146 GIC 1 : fprintf(stderr, "ok\n");
1147 1 : }
755 alvherre 1148 ECB :
1149 : static void
755 alvherre 1150 GIC 1 : test_singlerowmode(PGconn *conn)
1151 : {
755 alvherre 1152 ECB : PGresult *res;
1153 : int i;
755 alvherre 1154 CBC 1 : bool pipeline_ended = false;
755 alvherre 1155 EUB :
755 alvherre 1156 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 1157 UIC 0 : pg_fatal("failed to enter pipeline mode: %s",
1158 : PQerrorMessage(conn));
755 alvherre 1159 ECB :
1160 : /* One series of three commands, using single-row mode for the first two. */
755 alvherre 1161 GIC 4 : for (i = 0; i < 3; i++)
1162 : {
755 alvherre 1163 ECB : char *param[1];
1164 :
755 alvherre 1165 CBC 3 : param[0] = psprintf("%d", 44 + i);
1166 :
755 alvherre 1167 GIC 3 : if (PQsendQueryParams(conn,
1168 : "SELECT generate_series(42, $1)",
1169 : 1,
1170 : NULL,
1171 : (const char **) param,
1172 : NULL,
755 alvherre 1173 EUB : NULL,
1174 : 0) != 1)
755 alvherre 1175 LBC 0 : pg_fatal("failed to send query: %s",
1176 : PQerrorMessage(conn));
755 alvherre 1177 CBC 3 : pfree(param[0]);
755 alvherre 1178 EUB : }
755 alvherre 1179 GIC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1180 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
1181 :
755 alvherre 1182 CBC 5 : for (i = 0; !pipeline_ended; i++)
1183 : {
1184 4 : bool first = true;
1185 : bool saw_ending_tuplesok;
755 alvherre 1186 GIC 4 : bool isSingleTuple = false;
755 alvherre 1187 ECB :
1188 : /* Set single row mode for only first 2 SELECT queries */
755 alvherre 1189 CBC 4 : if (i < 2)
755 alvherre 1190 EUB : {
755 alvherre 1191 GIC 2 : if (PQsetSingleRowMode(conn) != 1)
755 alvherre 1192 UIC 0 : pg_fatal("PQsetSingleRowMode() failed for i=%d", i);
1193 : }
755 alvherre 1194 ECB :
1195 : /* Consume rows for this query */
755 alvherre 1196 GIC 4 : saw_ending_tuplesok = false;
755 alvherre 1197 CBC 14 : while ((res = PQgetResult(conn)) != NULL)
1198 : {
1199 11 : ExecStatusType est = PQresultStatus(res);
1200 :
1201 11 : if (est == PGRES_PIPELINE_SYNC)
755 alvherre 1202 ECB : {
755 alvherre 1203 CBC 1 : fprintf(stderr, "end of pipeline reached\n");
1204 1 : pipeline_ended = true;
755 alvherre 1205 GBC 1 : PQclear(res);
755 alvherre 1206 CBC 1 : if (i != 3)
755 alvherre 1207 UIC 0 : pg_fatal("Expected three results, got %d", i);
755 alvherre 1208 GIC 1 : break;
1209 : }
755 alvherre 1210 ECB :
1211 : /* Expect SINGLE_TUPLE for queries 0 and 1, TUPLES_OK for 2 */
755 alvherre 1212 CBC 10 : if (first)
755 alvherre 1213 EUB : {
755 alvherre 1214 GIC 3 : if (i <= 1 && est != PGRES_SINGLE_TUPLE)
755 alvherre 1215 LBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE for query %d, got %s",
755 alvherre 1216 EUB : i, PQresStatus(est));
755 alvherre 1217 GIC 3 : if (i >= 2 && est != PGRES_TUPLES_OK)
755 alvherre 1218 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK for query %d, got %s",
1219 : i, PQresStatus(est));
755 alvherre 1220 GIC 3 : first = false;
755 alvherre 1221 ECB : }
1222 :
755 alvherre 1223 GIC 10 : fprintf(stderr, "Result status %s for query %d", PQresStatus(est), i);
755 alvherre 1224 CBC 10 : switch (est)
755 alvherre 1225 ECB : {
755 alvherre 1226 CBC 3 : case PGRES_TUPLES_OK:
1227 3 : fprintf(stderr, ", tuples: %d\n", PQntuples(res));
755 alvherre 1228 GIC 3 : saw_ending_tuplesok = true;
755 alvherre 1229 CBC 3 : if (isSingleTuple)
755 alvherre 1230 ECB : {
755 alvherre 1231 GIC 2 : if (PQntuples(res) == 0)
755 alvherre 1232 GBC 2 : fprintf(stderr, "all tuples received in query %d\n", i);
1233 : else
755 alvherre 1234 LBC 0 : pg_fatal("Expected to follow PGRES_SINGLE_TUPLE, but received PGRES_TUPLES_OK directly instead");
1235 : }
755 alvherre 1236 CBC 3 : break;
755 alvherre 1237 ECB :
755 alvherre 1238 CBC 7 : case PGRES_SINGLE_TUPLE:
1239 7 : isSingleTuple = true;
755 alvherre 1240 GIC 7 : fprintf(stderr, ", %d tuple: %s\n", PQntuples(res), PQgetvalue(res, 0, 0));
755 alvherre 1241 GBC 7 : break;
755 alvherre 1242 EUB :
755 alvherre 1243 UIC 0 : default:
755 alvherre 1244 LBC 0 : pg_fatal("unexpected");
1245 : }
755 alvherre 1246 CBC 10 : PQclear(res);
755 alvherre 1247 EUB : }
755 alvherre 1248 GIC 4 : if (!pipeline_ended && !saw_ending_tuplesok)
755 alvherre 1249 UIC 0 : pg_fatal("didn't get expected terminating TUPLES_OK");
1250 : }
1251 :
1252 : /*
1253 : * Now issue one command, get its results in with single-row mode, then
1254 : * issue another command, and get its results in normal mode; make sure
177 alvherre 1255 ECB : * the single-row mode flag is reset as expected.
1256 : */
177 alvherre 1257 GBC 1 : if (PQsendQueryParams(conn, "SELECT generate_series(0, 0)",
1258 : 0, NULL, NULL, NULL, NULL, 0) != 1)
177 alvherre 1259 LBC 0 : pg_fatal("failed to send query: %s",
177 alvherre 1260 EUB : PQerrorMessage(conn));
177 alvherre 1261 CBC 1 : if (PQsendFlushRequest(conn) != 1)
177 alvherre 1262 UBC 0 : pg_fatal("failed to send flush request");
177 alvherre 1263 CBC 1 : if (PQsetSingleRowMode(conn) != 1)
177 alvherre 1264 LBC 0 : pg_fatal("PQsetSingleRowMode() failed");
177 alvherre 1265 GBC 1 : res = PQgetResult(conn);
177 alvherre 1266 CBC 1 : if (res == NULL)
177 alvherre 1267 UBC 0 : pg_fatal("unexpected NULL");
177 alvherre 1268 GIC 1 : if (PQresultStatus(res) != PGRES_SINGLE_TUPLE)
177 alvherre 1269 LBC 0 : pg_fatal("Expected PGRES_SINGLE_TUPLE, got %s",
177 alvherre 1270 ECB : PQresStatus(PQresultStatus(res)));
177 alvherre 1271 GBC 1 : res = PQgetResult(conn);
177 alvherre 1272 CBC 1 : if (res == NULL)
177 alvherre 1273 UBC 0 : pg_fatal("unexpected NULL");
177 alvherre 1274 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
177 alvherre 1275 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
177 alvherre 1276 EUB : PQresStatus(PQresultStatus(res)));
177 alvherre 1277 GIC 1 : if (PQgetResult(conn) != NULL)
177 alvherre 1278 LBC 0 : pg_fatal("expected NULL result");
1279 :
177 alvherre 1280 GBC 1 : if (PQsendQueryParams(conn, "SELECT 1",
1281 : 0, NULL, NULL, NULL, NULL, 0) != 1)
177 alvherre 1282 LBC 0 : pg_fatal("failed to send query: %s",
177 alvherre 1283 EUB : PQerrorMessage(conn));
177 alvherre 1284 CBC 1 : if (PQsendFlushRequest(conn) != 1)
177 alvherre 1285 LBC 0 : pg_fatal("failed to send flush request");
177 alvherre 1286 GBC 1 : res = PQgetResult(conn);
177 alvherre 1287 CBC 1 : if (res == NULL)
177 alvherre 1288 UBC 0 : pg_fatal("unexpected NULL");
177 alvherre 1289 GIC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
177 alvherre 1290 LBC 0 : pg_fatal("Expected PGRES_TUPLES_OK, got %s",
177 alvherre 1291 EUB : PQresStatus(PQresultStatus(res)));
177 alvherre 1292 GIC 1 : if (PQgetResult(conn) != NULL)
177 alvherre 1293 LBC 0 : pg_fatal("expected NULL result");
177 alvherre 1294 EUB :
755 alvherre 1295 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 1296 LBC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
278 alvherre 1297 ECB :
278 alvherre 1298 GIC 1 : fprintf(stderr, "ok\n");
755 1299 1 : }
1300 :
1301 : /*
1302 : * Simple test to verify that a pipeline is discarded as a whole when there's
1303 : * an error, ignoring transaction commands.
755 alvherre 1304 ECB : */
1305 : static void
755 alvherre 1306 GIC 1 : test_transaction(PGconn *conn)
1307 : {
755 alvherre 1308 ECB : PGresult *res;
1309 : bool expect_null;
755 alvherre 1310 CBC 1 : int num_syncs = 0;
1311 :
1312 1 : res = PQexec(conn, "DROP TABLE IF EXISTS pq_pipeline_tst;"
755 alvherre 1313 EUB : "CREATE TABLE pq_pipeline_tst (id int)");
755 alvherre 1314 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
755 alvherre 1315 LBC 0 : pg_fatal("failed to create test table: %s",
1316 : PQerrorMessage(conn));
755 alvherre 1317 CBC 1 : PQclear(res);
755 alvherre 1318 EUB :
755 alvherre 1319 GIC 1 : if (PQenterPipelineMode(conn) != 1)
755 alvherre 1320 LBC 0 : pg_fatal("failed to enter pipeline mode: %s",
755 alvherre 1321 EUB : PQerrorMessage(conn));
755 alvherre 1322 GIC 1 : if (PQsendPrepare(conn, "rollback", "ROLLBACK", 0, NULL) != 1)
755 alvherre 1323 UIC 0 : pg_fatal("could not send prepare on pipeline: %s",
755 alvherre 1324 ECB : PQerrorMessage(conn));
1325 :
755 alvherre 1326 GIC 1 : if (PQsendQueryParams(conn,
755 alvherre 1327 EUB : "BEGIN",
1328 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 1329 LBC 0 : pg_fatal("failed to send query: %s",
1330 : PQerrorMessage(conn));
755 alvherre 1331 GIC 1 : if (PQsendQueryParams(conn,
755 alvherre 1332 EUB : "SELECT 0/0",
1333 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 1334 UIC 0 : pg_fatal("failed to send query: %s",
1335 : PQerrorMessage(conn));
1336 :
1337 : /*
1338 : * send a ROLLBACK using a prepared stmt. Doesn't work because we need to
755 alvherre 1339 ECB : * get out of the pipeline-aborted state first.
755 alvherre 1340 EUB : */
755 alvherre 1341 GIC 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
755 alvherre 1342 UIC 0 : pg_fatal("failed to execute prepared: %s",
1343 : PQerrorMessage(conn));
755 alvherre 1344 ECB :
1345 : /* This insert fails because we're in pipeline-aborted state */
755 alvherre 1346 GIC 1 : if (PQsendQueryParams(conn,
755 alvherre 1347 EUB : "INSERT INTO pq_pipeline_tst VALUES (1)",
1348 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 1349 LBC 0 : pg_fatal("failed to send query: %s",
755 alvherre 1350 EUB : PQerrorMessage(conn));
755 alvherre 1351 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1352 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 1353 GIC 1 : num_syncs++;
1354 :
1355 : /*
1356 : * This insert fails even though the pipeline got a SYNC, because we're in
755 alvherre 1357 ECB : * an aborted transaction
1358 : */
755 alvherre 1359 GIC 1 : if (PQsendQueryParams(conn,
755 alvherre 1360 EUB : "INSERT INTO pq_pipeline_tst VALUES (2)",
1361 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 1362 LBC 0 : pg_fatal("failed to send query: %s",
755 alvherre 1363 EUB : PQerrorMessage(conn));
755 alvherre 1364 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1365 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 1366 GIC 1 : num_syncs++;
1367 :
1368 : /*
1369 : * Send ROLLBACK using prepared stmt. This one works because we just did
755 alvherre 1370 ECB : * PQpipelineSync above.
755 alvherre 1371 EUB : */
755 alvherre 1372 GIC 1 : if (PQsendQueryPrepared(conn, "rollback", 0, NULL, NULL, NULL, 1) != 1)
755 alvherre 1373 UIC 0 : pg_fatal("failed to execute prepared: %s",
1374 : PQerrorMessage(conn));
1375 :
1376 : /*
1377 : * Now that we're out of a transaction and in pipeline-good mode, this
755 alvherre 1378 ECB : * insert works
1379 : */
755 alvherre 1380 GIC 1 : if (PQsendQueryParams(conn,
755 alvherre 1381 EUB : "INSERT INTO pq_pipeline_tst VALUES (3)",
1382 : 0, NULL, NULL, NULL, NULL, 0) != 1)
755 alvherre 1383 UIC 0 : pg_fatal("failed to send query: %s",
755 alvherre 1384 ECB : PQerrorMessage(conn));
755 alvherre 1385 EUB : /* Send two syncs now -- match up to SYNC messages below */
755 alvherre 1386 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1387 LBC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 1388 GBC 1 : num_syncs++;
755 alvherre 1389 CBC 1 : if (PQpipelineSync(conn) != 1)
755 alvherre 1390 UIC 0 : pg_fatal("pipeline sync failed: %s", PQerrorMessage(conn));
755 alvherre 1391 CBC 1 : num_syncs++;
755 alvherre 1392 ECB :
755 alvherre 1393 CBC 1 : expect_null = false;
755 alvherre 1394 GIC 1 : for (int i = 0;; i++)
1395 19 : {
755 alvherre 1396 ECB : ExecStatusType restype;
1397 :
755 alvherre 1398 GIC 20 : res = PQgetResult(conn);
755 alvherre 1399 CBC 20 : if (res == NULL)
755 alvherre 1400 ECB : {
755 alvherre 1401 GBC 8 : printf("%d: got NULL result\n", i);
755 alvherre 1402 CBC 8 : if (!expect_null)
755 alvherre 1403 LBC 0 : pg_fatal("did not expect NULL here");
755 alvherre 1404 GIC 8 : expect_null = false;
755 alvherre 1405 CBC 8 : continue;
755 alvherre 1406 ECB : }
755 alvherre 1407 CBC 12 : restype = PQresultStatus(res);
755 alvherre 1408 GBC 12 : printf("%d: got status %s", i, PQresStatus(restype));
755 alvherre 1409 CBC 12 : if (expect_null)
755 alvherre 1410 LBC 0 : pg_fatal("expected NULL");
755 alvherre 1411 CBC 12 : if (restype == PGRES_FATAL_ERROR)
755 alvherre 1412 GIC 2 : printf("; error: %s", PQerrorMessage(conn));
755 alvherre 1413 CBC 10 : else if (restype == PGRES_PIPELINE_ABORTED)
1414 : {
755 alvherre 1415 GIC 2 : printf(": command didn't run because pipeline aborted\n");
755 alvherre 1416 ECB : }
1417 : else
755 alvherre 1418 GIC 8 : printf("\n");
755 alvherre 1419 CBC 12 : PQclear(res);
755 alvherre 1420 ECB :
755 alvherre 1421 GIC 12 : if (restype == PGRES_PIPELINE_SYNC)
755 alvherre 1422 CBC 4 : num_syncs--;
755 alvherre 1423 ECB : else
755 alvherre 1424 CBC 8 : expect_null = true;
755 alvherre 1425 GIC 12 : if (num_syncs <= 0)
755 alvherre 1426 CBC 1 : break;
755 alvherre 1427 EUB : }
755 alvherre 1428 GIC 1 : if (PQgetResult(conn) != NULL)
755 alvherre 1429 UIC 0 : pg_fatal("returned something extra after all the syncs: %s",
755 alvherre 1430 ECB : PQresStatus(PQresultStatus(res)));
755 alvherre 1431 EUB :
755 alvherre 1432 GIC 1 : if (PQexitPipelineMode(conn) != 1)
755 alvherre 1433 UIC 0 : pg_fatal("failed to end pipeline mode: %s", PQerrorMessage(conn));
755 alvherre 1434 ECB :
1435 : /* We expect to find one tuple containing the value "3" */
755 alvherre 1436 GBC 1 : res = PQexec(conn, "SELECT * FROM pq_pipeline_tst");
755 alvherre 1437 CBC 1 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
755 alvherre 1438 UBC 0 : pg_fatal("failed to obtain result: %s", PQerrorMessage(conn));
755 alvherre 1439 CBC 1 : if (PQntuples(res) != 1)
755 alvherre 1440 UBC 0 : pg_fatal("did not get 1 tuple");
755 alvherre 1441 CBC 1 : if (strcmp(PQgetvalue(res, 0, 0), "3") != 0)
755 alvherre 1442 UIC 0 : pg_fatal("did not get expected tuple");
755 alvherre 1443 CBC 1 : PQclear(res);
755 alvherre 1444 ECB :
755 alvherre 1445 GIC 1 : fprintf(stderr, "ok\n");
1446 1 : }
1447 :
1448 : /*
1449 : * In this test mode we send a stream of queries, with one in the middle
1450 : * causing an error. Verify that we can still send some more after the
1451 : * error and have libpq work properly.
639 alvherre 1452 ECB : */
1453 : static void
639 alvherre 1454 CBC 1 : test_uniqviol(PGconn *conn)
1455 : {
1456 1 : int sock = PQsocket(conn);
1457 : PGresult *res;
639 alvherre 1458 GIC 1 : Oid paramTypes[2] = {INT8OID, INT8OID};
1459 : const char *paramValues[2];
639 alvherre 1460 ECB : char paramValue0[MAXINT8LEN];
1461 : char paramValue1[MAXINT8LEN];
639 alvherre 1462 CBC 1 : int ctr = 0;
1463 1 : int numsent = 0;
1464 1 : int results = 0;
1465 1 : bool read_done = false;
1466 1 : bool write_done = false;
1467 1 : bool error_sent = false;
1468 1 : bool got_error = false;
639 alvherre 1469 GIC 1 : int switched = 0;
1470 1 : int socketful = 0;
1471 : fd_set in_fds;
639 alvherre 1472 ECB : fd_set out_fds;
1473 :
639 alvherre 1474 CBC 1 : fprintf(stderr, "uniqviol ...");
1475 :
1476 1 : PQsetnonblocking(conn, 1);
639 alvherre 1477 ECB :
639 alvherre 1478 CBC 1 : paramValues[0] = paramValue0;
639 alvherre 1479 GIC 1 : paramValues[1] = paramValue1;
639 alvherre 1480 CBC 1 : sprintf(paramValue1, "42");
1481 :
1482 1 : res = PQexec(conn, "drop table if exists ppln_uniqviol;"
639 alvherre 1483 EUB : "create table ppln_uniqviol(id bigint primary key, idata bigint)");
639 alvherre 1484 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
639 alvherre 1485 LBC 0 : pg_fatal("failed to create table: %s", PQerrorMessage(conn));
639 alvherre 1486 ECB :
639 alvherre 1487 GBC 1 : res = PQexec(conn, "begin");
639 alvherre 1488 GIC 1 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
639 alvherre 1489 LBC 0 : pg_fatal("failed to begin transaction: %s", PQerrorMessage(conn));
1490 :
639 alvherre 1491 GIC 1 : res = PQprepare(conn, "insertion",
639 alvherre 1492 ECB : "insert into ppln_uniqviol values ($1, $2) returning id",
639 alvherre 1493 EUB : 2, paramTypes);
639 alvherre 1494 GIC 1 : if (res == NULL || PQresultStatus(res) != PGRES_COMMAND_OK)
639 alvherre 1495 LBC 0 : pg_fatal("failed to prepare query: %s", PQerrorMessage(conn));
639 alvherre 1496 EUB :
639 alvherre 1497 GIC 1 : if (PQenterPipelineMode(conn) != 1)
639 alvherre 1498 LBC 0 : pg_fatal("failed to enter pipeline mode");
1499 :
639 alvherre 1500 GIC 8 : while (!read_done)
1501 : {
1502 : /*
1503 : * Avoid deadlocks by reading everything the server has sent before
1504 : * sending anything. (Special precaution is needed here to process
1505 : * PQisBusy before testing the socket for read-readiness, because the
1506 : * socket does not turn read-ready after "sending" queries in aborted
639 alvherre 1507 ECB : * pipeline mode.)
1508 : */
639 alvherre 1509 GIC 606 : while (PQisBusy(conn) == 0)
1510 : {
639 alvherre 1511 ECB : bool new_error;
1512 :
639 alvherre 1513 CBC 601 : if (results >= numsent)
639 alvherre 1514 EUB : {
639 alvherre 1515 CBC 1 : if (write_done)
639 alvherre 1516 UIC 0 : read_done = true;
639 alvherre 1517 GIC 1 : break;
639 alvherre 1518 ECB : }
1519 :
639 alvherre 1520 CBC 600 : res = PQgetResult(conn);
639 alvherre 1521 GBC 600 : new_error = process_result(conn, res, results, numsent);
639 alvherre 1522 CBC 600 : if (new_error && got_error)
639 alvherre 1523 LBC 0 : pg_fatal("got two errors");
639 alvherre 1524 GIC 600 : got_error |= new_error;
639 alvherre 1525 CBC 600 : if (results++ >= numsent - 1)
639 alvherre 1526 ECB : {
639 alvherre 1527 CBC 2 : if (write_done)
639 alvherre 1528 GIC 1 : read_done = true;
1529 2 : break;
1530 : }
639 alvherre 1531 ECB : }
1532 :
639 alvherre 1533 GIC 8 : if (read_done)
639 alvherre 1534 CBC 1 : break;
639 alvherre 1535 ECB :
639 alvherre 1536 GIC 119 : FD_ZERO(&out_fds);
639 alvherre 1537 CBC 7 : FD_SET(sock, &out_fds);
639 alvherre 1538 ECB :
639 alvherre 1539 GIC 119 : FD_ZERO(&in_fds);
639 alvherre 1540 CBC 7 : FD_SET(sock, &in_fds);
1541 :
639 alvherre 1542 GBC 7 : if (select(sock + 1, &in_fds, write_done ? NULL : &out_fds, NULL, NULL) == -1)
639 alvherre 1543 EUB : {
639 alvherre 1544 UBC 0 : if (errno == EINTR)
639 alvherre 1545 UIC 0 : continue;
1546 0 : pg_fatal("select() failed: %m");
639 alvherre 1547 ECB : }
639 alvherre 1548 EUB :
639 alvherre 1549 GIC 7 : if (FD_ISSET(sock, &in_fds) && PQconsumeInput(conn) == 0)
639 alvherre 1550 UIC 0 : pg_fatal("PQconsumeInput failed: %s", PQerrorMessage(conn));
1551 :
1552 : /*
1553 : * If the socket is writable and we haven't finished sending queries,
639 alvherre 1554 ECB : * send some.
1555 : */
639 alvherre 1556 GIC 7 : if (!write_done && FD_ISSET(sock, &out_fds))
639 alvherre 1557 ECB : {
1558 : for (;;)
639 alvherre 1559 GIC 597 : {
1560 : int flush;
1561 :
1562 : /*
1563 : * provoke uniqueness violation exactly once after having
639 alvherre 1564 ECB : * switched to read mode.
1565 : */
639 alvherre 1566 CBC 600 : if (switched >= 1 && !error_sent && ctr % socketful >= socketful / 2)
639 alvherre 1567 ECB : {
639 alvherre 1568 CBC 1 : sprintf(paramValue0, "%d", numsent / 2);
639 alvherre 1569 GIC 1 : fprintf(stderr, "E");
1570 1 : error_sent = true;
1571 : }
639 alvherre 1572 ECB : else
1573 : {
639 alvherre 1574 GIC 599 : fprintf(stderr, ".");
1575 599 : sprintf(paramValue0, "%d", ctr++);
639 alvherre 1576 ECB : }
639 alvherre 1577 EUB :
639 alvherre 1578 CBC 600 : if (PQsendQueryPrepared(conn, "insertion", 2, paramValues, NULL, NULL, 0) != 1)
639 alvherre 1579 UIC 0 : pg_fatal("failed to execute prepared query: %s", PQerrorMessage(conn));
639 alvherre 1580 GIC 600 : numsent++;
639 alvherre 1581 ECB :
1582 : /* Are we done writing? */
639 alvherre 1583 CBC 600 : if (socketful != 0 && numsent % socketful == 42 && error_sent)
639 alvherre 1584 EUB : {
639 alvherre 1585 CBC 1 : if (PQsendFlushRequest(conn) != 1)
639 alvherre 1586 LBC 0 : pg_fatal("failed to send flush request");
639 alvherre 1587 CBC 1 : write_done = true;
1588 1 : fprintf(stderr, "\ndone writing\n");
639 alvherre 1589 GIC 1 : PQflush(conn);
1590 1 : break;
1591 : }
639 alvherre 1592 ECB :
1593 : /* is the outgoing socket full? */
639 alvherre 1594 GBC 599 : flush = PQflush(conn);
639 alvherre 1595 CBC 599 : if (flush == -1)
639 alvherre 1596 UIC 0 : pg_fatal("failed to flush: %s", PQerrorMessage(conn));
639 alvherre 1597 CBC 599 : if (flush == 1)
639 alvherre 1598 ECB : {
639 alvherre 1599 CBC 2 : if (socketful == 0)
1600 1 : socketful = numsent;
1601 2 : fprintf(stderr, "\nswitch to reading\n");
639 alvherre 1602 GIC 2 : switched++;
1603 2 : break;
1604 : }
1605 : }
1606 : }
639 alvherre 1607 ECB : }
639 alvherre 1608 EUB :
639 alvherre 1609 GIC 1 : if (!got_error)
639 alvherre 1610 LBC 0 : pg_fatal("did not get expected error");
639 alvherre 1611 ECB :
639 alvherre 1612 GIC 1 : fprintf(stderr, "ok\n");
1613 1 : }
1614 :
1615 : /*
1616 : * Subroutine for test_uniqviol; given a PGresult, print it out and consume
1617 : * the expected NULL that should follow it.
1618 : *
1619 : * Returns true if we read a fatal error message, otherwise false.
639 alvherre 1620 ECB : */
1621 : static bool
639 alvherre 1622 GIC 600 : process_result(PGconn *conn, PGresult *res, int results, int numsent)
639 alvherre 1623 ECB : {
1624 : PGresult *res2;
639 alvherre 1625 CBC 600 : bool got_error = false;
639 alvherre 1626 EUB :
639 alvherre 1627 GIC 600 : if (res == NULL)
639 alvherre 1628 LBC 0 : pg_fatal("got unexpected NULL");
1629 :
639 alvherre 1630 CBC 600 : switch (PQresultStatus(res))
639 alvherre 1631 ECB : {
639 alvherre 1632 CBC 1 : case PGRES_FATAL_ERROR:
1633 1 : got_error = true;
639 alvherre 1634 GIC 1 : fprintf(stderr, "result %d/%d (error): %s\n", results, numsent, PQerrorMessage(conn));
639 alvherre 1635 CBC 1 : PQclear(res);
639 alvherre 1636 ECB :
639 alvherre 1637 GBC 1 : res2 = PQgetResult(conn);
639 alvherre 1638 GIC 1 : if (res2 != NULL)
639 alvherre 1639 LBC 0 : pg_fatal("expected NULL, got %s",
1640 : PQresStatus(PQresultStatus(res2)));
639 alvherre 1641 CBC 1 : break;
639 alvherre 1642 ECB :
639 alvherre 1643 CBC 418 : case PGRES_TUPLES_OK:
639 alvherre 1644 GIC 418 : fprintf(stderr, "result %d/%d: %s\n", results, numsent, PQgetvalue(res, 0, 0));
639 alvherre 1645 CBC 418 : PQclear(res);
639 alvherre 1646 ECB :
639 alvherre 1647 GBC 418 : res2 = PQgetResult(conn);
639 alvherre 1648 GIC 418 : if (res2 != NULL)
639 alvherre 1649 LBC 0 : pg_fatal("expected NULL, got %s",
1650 : PQresStatus(PQresultStatus(res2)));
639 alvherre 1651 CBC 418 : break;
639 alvherre 1652 ECB :
639 alvherre 1653 CBC 181 : case PGRES_PIPELINE_ABORTED:
1654 181 : fprintf(stderr, "result %d/%d: pipeline aborted\n", results, numsent);
639 alvherre 1655 GBC 181 : res2 = PQgetResult(conn);
639 alvherre 1656 GIC 181 : if (res2 != NULL)
639 alvherre 1657 LBC 0 : pg_fatal("expected NULL, got %s",
1658 : PQresStatus(PQresultStatus(res2)));
639 alvherre 1659 GBC 181 : break;
639 alvherre 1660 EUB :
639 alvherre 1661 UIC 0 : default:
1662 0 : pg_fatal("got unexpected %s", PQresStatus(PQresultStatus(res)));
639 alvherre 1663 ECB : }
1664 :
639 alvherre 1665 GIC 600 : return got_error;
1666 : }
1667 :
639 alvherre 1668 EUB :
/*
 * Print a short help text to stderr.
 *
 * progname: the program name to show in the synopsis lines.
 */
static void
usage(const char *progname)
{
	FILE	   *out = stderr;

	/* Headline and synopsis; only these lines interpolate the program name */
	fprintf(out, "%s tests libpq's pipeline mode.\n\n", progname);
	fputs("Usage:\n", out);
	fprintf(out, " %s [OPTION] tests\n", progname);
	fprintf(out, " %s [OPTION] TESTNAME [CONNINFO]\n", progname);

	/* Option summary: constant text, so no formatting is needed */
	fputs("\nOptions:\n", out);
	fputs(" -t TRACEFILE generate a libpq trace to TRACEFILE\n", out);
	fputs(" -r NUMROWS use NUMROWS as the test size\n", out);
}
755 alvherre 1680 ECB :
/*
 * Print the names of all available tests to stdout, one per line, in the
 * fixed order given by the table below.
 */
static void
print_test_list(void)
{
	/* Each entry is a complete output line, newline included */
	static const char *const test_lines[] = {
		"disallowed_in_pipeline\n",
		"multi_pipelines\n",
		"nosync\n",
		"pipeline_abort\n",
		"pipeline_idle\n",
		"pipelined_insert\n",
		"prepared\n",
		"simple_pipeline\n",
		"singlerow\n",
		"transaction\n",
		"uniqviol\n",
	};
	const size_t ntests = sizeof(test_lines) / sizeof(test_lines[0]);

	for (size_t idx = 0; idx < ntests; idx++)
		fputs(test_lines[idx], stdout);
}
755 alvherre 1696 ECB :
1697 : int
755 alvherre 1698 CBC 12 : main(int argc, char **argv)
1699 : {
755 alvherre 1700 GIC 12 : const char *conninfo = "";
1701 : PGconn *conn;
740 alvherre 1702 ECB : FILE *trace;
1703 : char *testname;
755 alvherre 1704 GIC 12 : int numrows = 10000;
1705 : PGresult *res;
740 alvherre 1706 ECB : int c;
1707 :
118 peter 1708 GNC 44 : while ((c = getopt(argc, argv, "r:t:")) != -1)
1709 : {
740 alvherre 1710 CBC 20 : switch (c)
740 alvherre 1711 ECB : {
738 alvherre 1712 GBC 11 : case 'r': /* numrows */
738 alvherre 1713 GIC 11 : errno = 0;
738 alvherre 1714 GBC 11 : numrows = strtol(optarg, NULL, 10);
738 alvherre 1715 GIC 11 : if (errno != 0 || numrows <= 0)
738 alvherre 1716 ECB : {
738 alvherre 1717 LBC 0 : fprintf(stderr, "couldn't parse \"%s\" as a positive integer\n",
738 alvherre 1718 ECB : optarg);
738 alvherre 1719 LBC 0 : exit(1);
1720 : }
738 alvherre 1721 GIC 11 : break;
118 peter 1722 GNC 9 : case 't': /* trace file */
1723 9 : tracefile = pg_strdup(optarg);
1724 9 : break;
1725 : }
755 alvherre 1726 ECB : }
1727 :
740 alvherre 1728 CBC 12 : if (optind < argc)
740 alvherre 1729 ECB : {
738 alvherre 1730 GIC 12 : testname = pg_strdup(argv[optind]);
740 1731 12 : optind++;
1732 : }
740 alvherre 1733 EUB : else
755 1734 : {
755 alvherre 1735 UIC 0 : usage(argv[0]);
1736 0 : exit(1);
755 alvherre 1737 ECB : }
1738 :
740 alvherre 1739 CBC 12 : if (strcmp(testname, "tests") == 0)
740 alvherre 1740 ECB : {
740 alvherre 1741 GIC 1 : print_test_list();
1742 1 : exit(0);
740 alvherre 1743 ECB : }
1744 :
740 alvherre 1745 CBC 11 : if (optind < argc)
740 alvherre 1746 ECB : {
738 alvherre 1747 GIC 11 : conninfo = pg_strdup(argv[optind]);
740 1748 11 : optind++;
1749 : }
755 alvherre 1750 ECB :
1751 : /* Make a connection to the database */
755 alvherre 1752 GIC 11 : conn = PQconnectdb(conninfo);
755 alvherre 1753 GBC 11 : if (PQstatus(conn) != CONNECTION_OK)
1754 : {
755 alvherre 1755 UBC 0 : fprintf(stderr, "Connection to database failed: %s\n",
1756 : PQerrorMessage(conn));
755 alvherre 1757 UIC 0 : exit_nicely(conn);
755 alvherre 1758 ECB : }
740 1759 :
739 alvherre 1760 GBC 11 : res = PQexec(conn, "SET lc_messages TO \"C\"");
739 alvherre 1761 CBC 11 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
739 alvherre 1762 LBC 0 : pg_fatal("failed to set lc_messages: %s", PQerrorMessage(conn));
53 drowley 1763 GNC 11 : res = PQexec(conn, "SET debug_parallel_query = off");
739 alvherre 1764 GIC 11 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
53 drowley 1765 UNC 0 : pg_fatal("failed to set debug_parallel_query: %s", PQerrorMessage(conn));
739 alvherre 1766 ECB :
1767 : /* Set the trace file, if requested */
740 alvherre 1768 CBC 11 : if (tracefile != NULL)
740 alvherre 1769 EUB : {
278 alvherre 1770 GIC 9 : if (strcmp(tracefile, "-") == 0)
278 alvherre 1771 LBC 0 : trace = stdout;
278 alvherre 1772 ECB : else
278 alvherre 1773 GBC 9 : trace = fopen(tracefile, "w");
740 alvherre 1774 GIC 9 : if (trace == NULL)
740 alvherre 1775 UIC 0 : pg_fatal("could not open file \"%s\": %m", tracefile);
739 alvherre 1776 ECB :
1777 : /* Make it line-buffered */
738 alvherre 1778 CBC 9 : setvbuf(trace, NULL, PG_IOLBF, 0);
739 alvherre 1779 ECB :
740 alvherre 1780 GIC 9 : PQtrace(conn, trace);
668 noah 1781 9 : PQsetTraceFlags(conn,
1782 : PQTRACE_SUPPRESS_TIMESTAMPS | PQTRACE_REGRESS_MODE);
740 alvherre 1783 ECB : }
1784 :
740 alvherre 1785 CBC 11 : if (strcmp(testname, "disallowed_in_pipeline") == 0)
755 1786 1 : test_disallowed_in_pipeline(conn);
740 1787 10 : else if (strcmp(testname, "multi_pipelines") == 0)
755 1788 1 : test_multi_pipelines(conn);
649 1789 9 : else if (strcmp(testname, "nosync") == 0)
1790 1 : test_nosync(conn);
740 1791 8 : else if (strcmp(testname, "pipeline_abort") == 0)
755 1792 1 : test_pipeline_abort(conn);
278 1793 7 : else if (strcmp(testname, "pipeline_idle") == 0)
1794 1 : test_pipeline_idle(conn);
740 1795 6 : else if (strcmp(testname, "pipelined_insert") == 0)
755 1796 1 : test_pipelined_insert(conn, numrows);
740 1797 5 : else if (strcmp(testname, "prepared") == 0)
755 1798 1 : test_prepared(conn);
740 1799 4 : else if (strcmp(testname, "simple_pipeline") == 0)
755 1800 1 : test_simple_pipeline(conn);
740 1801 3 : else if (strcmp(testname, "singlerow") == 0)
755 1802 1 : test_singlerowmode(conn);
740 1803 2 : else if (strcmp(testname, "transaction") == 0)
755 1804 1 : test_transaction(conn);
639 alvherre 1805 GIC 1 : else if (strcmp(testname, "uniqviol") == 0)
1806 1 : test_uniqviol(conn);
755 alvherre 1807 EUB : else
1808 : {
740 alvherre 1809 UIC 0 : fprintf(stderr, "\"%s\" is not a recognized test name\n", testname);
755 1810 0 : exit(1);
1811 : }
755 alvherre 1812 ECB :
1813 : /* close the connection to the database and cleanup */
755 alvherre 1814 GIC 11 : PQfinish(conn);
1815 11 : return 0;
1816 : }
|