Sunday, September 28, 2008

Re: [HACKERS] parallel pg_restore - WIP patch

Index: pg_backup.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup.h,v
retrieving revision 1.47
diff -c -r1.47 pg_backup.h
*** pg_backup.h 13 Apr 2008 03:49:21 -0000 1.47
--- pg_backup.h 29 Sep 2008 02:43:51 -0000
***************
*** 123,128 ****
--- 123,130 ----
int suppressDumpWarnings; /* Suppress output of WARNING entries
* to stderr */
bool single_txn;
+ int number_of_threads;
+ bool truncate_before_load;

bool *idWanted; /* array showing which dump IDs to emit */
} RestoreOptions;
***************
*** 165,170 ****
--- 167,173 ----
extern void CloseArchive(Archive *AH);

extern void RestoreArchive(Archive *AH, RestoreOptions *ropt);
+ extern void RestoreArchiveParallel(Archive *AH, RestoreOptions *ropt);

/* Open an existing archive */
extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
Index: pg_backup_archiver.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.c,v
retrieving revision 1.158
diff -c -r1.158 pg_backup_archiver.c
*** pg_backup_archiver.c 5 Sep 2008 23:53:42 -0000 1.158
--- pg_backup_archiver.c 29 Sep 2008 02:43:52 -0000
***************
*** 27,38 ****
--- 27,50 ----

#include <unistd.h>

+ #include <sys/types.h>
+ #include <sys/wait.h>
+
+
#ifdef WIN32
#include <io.h>
#endif

#include "libpq/libpq-fs.h"

+ typedef struct _parallel_slot
+ {
+ pid_t pid;
+ TocEntry *te;
+ DumpId dumpId;
+ } ParallelSlot;
+
+ #define NO_SLOT (-1)

const char *progname;

***************
*** 70,76 ****
--- 82,99 ----
static void dumpTimestamp(ArchiveHandle *AH, const char *msg, time_t tim);
static OutputContext SetOutput(ArchiveHandle *AH, char *filename, int compression);
static void ResetOutput(ArchiveHandle *AH, OutputContext savedContext);
+ static bool work_is_being_done(ParallelSlot *slot, int n_slots);
+ static int get_next_slot(ParallelSlot *slots, int n_slots);
+ static TocEntry *get_next_work_item(ArchiveHandle *AH);
+ static void prestore(ArchiveHandle *AH, TocEntry *te);
+ static void mark_work_done(ArchiveHandle *AH, pid_t worker, ParallelSlot *slots, int n_slots);
+ static int _restore_one_te(ArchiveHandle *ah, TocEntry *te, RestoreOptions *ropt,bool is_parallel);
+ static void _reduce_dependencies(ArchiveHandle * AH, TocEntry *te);
+ static void _fix_dependency_counts(ArchiveHandle *AH);
+ static void _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te);
+

+ static ArchiveHandle *GAH;

/*
* Wrapper functions.
***************
*** 125,137 ****

/* Public */
void
RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ArchiveHandle *AH = (ArchiveHandle *) AHX;
TocEntry *te;
teReqs reqs;
OutputContext sav;
- bool defnDumped;

AH->ropt = ropt;
AH->stage = STAGE_INITIALIZING;
--- 148,579 ----

/* Public */
void
+ RestoreArchiveParallel(Archive *AHX, RestoreOptions *ropt)
+ {
+
+ ArchiveHandle *AH = (ArchiveHandle *) AHX;
+ ParallelSlot *slots;
+ int next_slot;
+ TocEntry *next_work_item = NULL;
+ int work_status;
+ pid_t ret_child;
+ int n_slots = ropt->number_of_threads;
+ TocEntry *te;
+ teReqs reqs;
+
+
+ /* AH->debugLevel = 99; */
+ /* some routines that use ahlog() don't get passed AH */
+ GAH = AH;
+
+ ahlog(AH,1,"entering RestoreARchiveParallel\n");
+
+
+ slots = (ParallelSlot *) calloc(sizeof(ParallelSlot),n_slots);
+ AH->ropt = ropt;
+
+ /*
+ if (ropt->create)
+ die_horribly(AH,modulename,
+ "parallel restore is incompatible with --create\n");
+ */
+
+
+ if (ropt->dropSchema)
+ die_horribly(AH,modulename,
+ "parallel restore is incompatible with --clean\n");
+
+ if (!ropt->useDB)
+ die_horribly(AH,modulename,
+ "parallel restore requires direct database connection\n");
+
+
+ #ifndef HAVE_LIBZ
+
+ /* make sure we won't need (de)compression we haven't got */
+ if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ reqs = _tocEntryRequired(te, ropt, false);
+ if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ die_horribly(AH, modulename,
+ "cannot restore from compressed archive (compression not supported in this installation)\n");
+ }
+ }
+ #endif
+
+ ahlog(AH, 1, "connecting to database for restore\n");
+ if (AH->version < K_VERS_1_3)
+ die_horribly(AH, modulename,
+ "direct database connections are not supported in pre-1.3 archives\n");
+
+ /* XXX Should get this from the archive */
+ AHX->minRemoteVersion = 070100;
+ AHX->maxRemoteVersion = 999999;
+
+ /* correct dependency counts in case we're doing a partial restore */
+ if (ropt->idWanted == NULL)
+ InitDummyWantedList(AHX,ropt);
+ _fix_dependency_counts(AH);
+
+ /*
+ * Since we're talking to the DB directly, don't send comments since they
+ * obscure SQL when displaying errors
+ */
+ AH->noTocComments = 1;
+
+ /* Do all the early stuff in a single connection in the parent.
+ * There's no great point in running it in parallel and it will actually
+ * run faster in a single connection because we avoid all the connection
+ * and setup overhead, including the 0.5s sleep below.
+ */
+ ConnectDatabase(AHX, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+
+ /*
+ * Establish important parameter values right away.
+ */
+ _doSetFixedOutputState(AH);
+
+ while((next_work_item = get_next_work_item(AH)) != NULL)
+ {
+ /* XXX need to improve this test in case there is no table data */
+ /* need to test for indexes, FKs, PK, Unique, etc */
+ if(strcmp(next_work_item->desc,"TABLE DATA") == 0)
+ break;
+ (void) _restore_one_te(AH, next_work_item, ropt, false);
+
+ next_work_item->prestored = true;
+
+ _reduce_dependencies(AH,next_work_item);
+ }
+
+
+ /*
+ * now close parent connection in prep for parallel step.
+ */
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+
+ /* blow away any preserved state from the previous connection */
+
+ if (AH->currSchema)
+ free(AH->currSchema);
+ AH->currSchema = strdup("");
+ if (AH->currUser)
+ free(AH->currUser);
+ AH->currUser = strdup("");
+ if (AH->currTablespace)
+ free(AH->currTablespace);
+ AH->currTablespace = NULL;
+ AH->currWithOids = -1;
+
+ /* main parent loop */
+
+ ahlog(AH,1,"entering main loop\n");
+
+ while (((next_work_item = get_next_work_item(AH)) != NULL) ||
+ (work_is_being_done(slots,n_slots)))
+ {
+ if (next_work_item != NULL &&
+ ((next_slot = get_next_slot(slots,n_slots)) != NO_SLOT))
+ {
+ /* there is work still to do and a worker slot available */
+
+ pid_t child;
+
+ next_work_item->prestored = true;
+
+ child = fork();
+ if (child == 0)
+ {
+ prestore(AH,next_work_item);
+ /* should not happen ... we expect prestore to exit */
+ exit(1);
+ }
+ else if (child > 0)
+ {
+ slots[next_slot].pid = child;
+ slots[next_slot].te = next_work_item;
+ slots[next_slot].dumpId = next_work_item->dumpId;
+ }
+ else
+ {
+ /* XXX fork error - handle it! */
+ }
+ /* delay just long enough between forks to give the catalog some
+ * breathing space. Without this sleep I got
+ * "tuple concurrently updated" errors.
+ */
+ /* pg_usleep(500000); */
+ continue; /* in case the slots are not yet full */
+ }
+ /* if we get here there must be work being done */
+ ret_child = wait(&work_status);
+
+ if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 0)
+ {
+ mark_work_done(AH, ret_child, slots, n_slots);
+ }
+ else if (WIFEXITED(work_status) && WEXITSTATUS(work_status) == 1)
+ {
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ {
+ if (slots[i].pid == ret_child)
+ _inhibit_data_for_failed_table(AH, slots[i].te);
+ break;
+ }
+ mark_work_done(AH, ret_child, slots, n_slots);
+ }
+ else
+ {
+ /* XXX something went wrong - deal with it */
+ }
+ }
+
+ /*
+ * now process the ACLs - no need to do this in parallel
+ */
+
+ /* reconnect from parent */
+ ConnectDatabase(AHX, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+ /*
+ * Scan TOC to output ownership commands and ACLs
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ AH->currentTE = te;
+
+ /* Work out what, if anything, we want from this entry */
+ reqs = _tocEntryRequired(te, ropt, true);
+
+ if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ {
+ ahlog(AH, 1, "setting owner and privileges for %s %s\n",
+ te->desc, te->tag);
+ _printTocEntry(AH, te, ropt, false, true);
+ }
+ }
+
+ /* clean up */
+ PQfinish(AH->connection);
+ AH->connection = NULL;
+
+ }
+
+ static bool
+ work_is_being_done(ParallelSlot *slot, int n_slots)
+ {
+ ahlog(GAH,1,"is work being done?\n");
+ while(n_slots--)
+ {
+ if (slot->pid > 0)
+ return true;
+ slot++;
+ }
+ ahlog(GAH,1,"work is not being done\n");
+ return false;
+ }
+
+ static int
+ get_next_slot(ParallelSlot *slots, int n_slots)
+ {
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ {
+ if (slots[i].pid == 0)
+ {
+ ahlog(GAH,1,"available slots is %d\n",i);
+ return i;
+ }
+ }
+ ahlog(GAH,1,"No slot available\n");
+ return NO_SLOT;
+ }
+
+ static TocEntry*
+ get_next_work_item(ArchiveHandle *AH)
+ {
+ TocEntry *te;
+ teReqs reqs;
+
+ /* just search from the top of the queue until we find an available item.
+ * Note that the queue isn't reordered in the current implementation. If
+ * we ever do reorder it, then certain code that processes entries from the
+ * current item to the end of the queue will probably need to be
+ * re-examined.
+ */
+
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ if (!te->prestored && te->depCount < 1)
+ {
+ /* make sure it's not an ACL */
+ reqs = _tocEntryRequired (te, AH->ropt, false);
+ if ((reqs & (REQ_SCHEMA | REQ_DATA)) != 0)
+ {
+ ahlog(AH,1,"next item is %d\n",te->dumpId);
+ return te;
+ }
+ }
+ }
+ ahlog(AH,1,"No item ready\n");
+ return NULL;
+ }
+
+ static void
+ prestore(ArchiveHandle *AH, TocEntry *te)
+ {
+ RestoreOptions *ropt = AH->ropt;
+ int retval;
+
+ /* close and reopen the archive so we have a private copy that doesn't
+ * stomp on anyone else's file pointer
+ */
+
+ (AH->ReopenPtr)(AH);
+
+ ConnectDatabase((Archive *)AH, ropt->dbname,
+ ropt->pghost, ropt->pgport, ropt->username,
+ ropt->requirePassword);
+
+ /*
+ * Establish important parameter values right away.
+ */
+ _doSetFixedOutputState(AH);
+
+ retval = _restore_one_te(AH, te, ropt, true);
+
+ PQfinish(AH->connection);
+ exit(retval);
+
+ }
+
+ static void
+ mark_work_done(ArchiveHandle *AH, pid_t worker,
+ ParallelSlot *slots, int n_slots)
+ {
+
+ TocEntry *te = NULL;
+ int i;
+
+ for (i = 0; i < n_slots; i++)
+ {
+ if (slots[i].pid == worker)
+ {
+ te = slots[i].te;
+ slots[i].pid = 0;
+ slots[i].te = NULL;
+ slots[i].dumpId = 0;
+ break;
+ }
+ }
+
+ /* Assert (te != NULL); */
+
+ _reduce_dependencies(AH,te);
+
+
+ }
+
+
+ /*
+ * Make sure the head of each dependency chain is a live item
+ *
+ * Once this is established the property will be maintained by
+ * _reduce_dependencies called as items are done.
+ */
+ static void
+ _fix_dependency_counts(ArchiveHandle *AH)
+ {
+ TocEntry * te;
+ RestoreOptions * ropt = AH->ropt;
+ bool * RealDumpIds;
+ int i;
+
+
+ RealDumpIds = calloc(AH->maxDumpId, sizeof(bool));
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ RealDumpIds[te->dumpId-1] = true;
+ if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+ _reduce_dependencies(AH,te);
+ }
+
+ /*
+ * It is possible that the dependencies list items that are
+ * not in the archive at all. Reduce the depcounts so those get
+ * ignored.
+ */
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ for (i = 0; i < te->nDeps; i++)
+ if (!RealDumpIds[te->dependencies[i]-1])
+ te->depCount--;
+ }
+
+ static void
+ _reduce_dependencies(ArchiveHandle * AH, TocEntry *te)
+ {
+ DumpId item = te->dumpId;
+ RestoreOptions * ropt = AH->ropt;
+ int i;
+
+ for (te = te->next; te != AH->toc; te = te->next)
+ {
+ if (te->nDeps == 0)
+ continue;
+
+ for (i = 0; i < te->nDeps; i++)
+ if (te->dependencies[i] == item)
+ te->depCount = te->depCount - 1;
+
+ /* If this is a table data item we are making available,
+ * make the table's dependencies depend on this item instead of
+ * the table definition, so they
+ * don't get scheduled until the data is loaded.
+ * Have to do this now before the main loop gets to anything
+ * further down the list.
+ */
+ if (te->depCount == 0 && strcmp(te->desc,"TABLEDATA") == 0)
+ {
+ TocEntry *tes;
+ int j;
+ for (tes = te->next; tes != AH->toc; tes = tes->next)
+ for (j = 0; j < tes->nDeps; j++)
+ if (tes->dependencies[j] == item)
+ tes->dependencies[j] = te->dumpId;
+ }
+
+ /*
+ * If this item won't in fact be done, and is now at
+ * 0 dependency count, we pretend it's been done and
+ * reduce the dependency counts of all the things that
+ * depend on it, by a recursive call
+ */
+ if (te->depCount == 0 && ! ropt->idWanted[te->dumpId -1])
+ _reduce_dependencies(AH,te);
+ }
+
+ }
+
+
+ /* Public */
+ void
RestoreArchive(Archive *AHX, RestoreOptions *ropt)
{
ArchiveHandle *AH = (ArchiveHandle *) AHX;
TocEntry *te;
teReqs reqs;
OutputContext sav;

AH->ropt = ropt;
AH->stage = STAGE_INITIALIZING;
***************
*** 171,176 ****
--- 613,632 ----
AH->noTocComments = 1;
}

+ #ifndef HAVE_LIBZ
+
+ /* make sure we won't need (de)compression we haven't got */
+ if (AH->compression != 0 && AH->PrintTocDataPtr != NULL)
+ {
+ for (te = AH->toc->next; te != AH->toc; te = te->next)
+ {
+ reqs = _tocEntryRequired(te, ropt, false);
+ if (te->hadDumper && (reqs & REQ_DATA) != 0)
+ die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
+ }
+ }
+ #endif
+
/*
* Work out if we have an implied data-only restore. This can happen if
* the dump was data only or if the user has used a toc list to exclude
***************
*** 270,409 ****
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! AH->currentTE = te;
!
! /* Work out what, if anything, we want from this entry */
! reqs = _tocEntryRequired(te, ropt, false);
!
! /* Dump any relevant dump warnings to stderr */
! if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
! {
! if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
! write_msg(modulename, "warning from original dump file: %s\n", te->defn);
! else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
! write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
! }
!
! defnDumped = false;
!
! if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
! {
! ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
!
! _printTocEntry(AH, te, ropt, false, false);
! defnDumped = true;
!
! /*
! * If we could not create a table and --no-data-for-failed-tables
! * was given, ignore the corresponding TABLE DATA
! */
! if (ropt->noDataForFailedTables &&
! AH->lastErrorTE == te &&
! strcmp(te->desc, "TABLE") == 0)
! {
! TocEntry *tes;
!
! ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
! te->tag);
!
! for (tes = te->next; tes != AH->toc; tes = tes->next)
! {
! if (strcmp(tes->desc, "TABLE DATA") == 0 &&
! strcmp(tes->tag, te->tag) == 0 &&
! strcmp(tes->namespace ? tes->namespace : "",
! te->namespace ? te->namespace : "") == 0)
! {
! /* mark it unwanted */
! ropt->idWanted[tes->dumpId - 1] = false;
! break;
! }
! }
! }
!
! /* If we created a DB, connect to it... */
! if (strcmp(te->desc, "DATABASE") == 0)
! {
! ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
! _reconnectToDB(AH, te->tag);
! }
! }
!
! /*
! * If we have a data component, then process it
! */
! if ((reqs & REQ_DATA) != 0)
! {
! /*
! * hadDumper will be set if there is genuine data component for
! * this node. Otherwise, we need to check the defn field for
! * statements that need to be executed in data-only restores.
! */
! if (te->hadDumper)
! {
! /*
! * If we can output the data, then restore it.
! */
! if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
! {
! #ifndef HAVE_LIBZ
! if (AH->compression != 0)
! die_horribly(AH, modulename, "cannot restore from compressed archive (compression not supported in this installation)\n");
! #endif
!
! _printTocEntry(AH, te, ropt, true, false);
!
! if (strcmp(te->desc, "BLOBS") == 0 ||
! strcmp(te->desc, "BLOB COMMENTS") == 0)
! {
! ahlog(AH, 1, "restoring %s\n", te->desc);
!
! _selectOutputSchema(AH, "pg_catalog");
!
! (*AH->PrintTocDataPtr) (AH, te, ropt);
! }
! else
! {
! _disableTriggersIfNecessary(AH, te, ropt);
!
! /* Select owner and schema as necessary */
! _becomeOwner(AH, te);
! _selectOutputSchema(AH, te->namespace);
!
! ahlog(AH, 1, "restoring data for table \"%s\"\n",
! te->tag);
!
! /*
! * If we have a copy statement, use it. As of V1.3,
! * these are separate to allow easy import from
! * withing a database connection. Pre 1.3 archives can
! * not use DB connections and are sent to output only.
! *
! * For V1.3+, the table data MUST have a copy
! * statement so that we can go into appropriate mode
! * with libpq.
! */
! if (te->copyStmt && strlen(te->copyStmt) > 0)
! {
! ahprintf(AH, "%s", te->copyStmt);
! AH->writingCopyData = true;
! }
!
! (*AH->PrintTocDataPtr) (AH, te, ropt);
!
! AH->writingCopyData = false;
!
! _enableTriggersIfNecessary(AH, te, ropt);
! }
! }
! }
! else if (!defnDumped)
! {
! /* If we haven't already dumped the defn part, do so now */
! ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
! _printTocEntry(AH, te, ropt, false, false);
! }
! }
! } /* end loop over TOC entries */

/*
* Scan TOC again to output ownership commands and ACLs
--- 726,733 ----
*/
for (te = AH->toc->next; te != AH->toc; te = te->next)
{
! (void) _restore_one_te(AH, te, ropt, false);
! }

/*
* Scan TOC again to output ownership commands and ACLs
***************
*** 451,456 ****
--- 775,955 ----
}
}

+ static int
+ _restore_one_te(ArchiveHandle *AH, TocEntry *te,
+ RestoreOptions *ropt, bool is_parallel)
+ {
+ teReqs reqs;
+ bool defnDumped;
+ int retval = 0;
+
+ AH->currentTE = te;
+
+ /* Work out what, if anything, we want from this entry */
+ reqs = _tocEntryRequired(te, ropt, false);
+
+ /* Dump any relevant dump warnings to stderr */
+ if (!ropt->suppressDumpWarnings && strcmp(te->desc, "WARNING") == 0)
+ {
+ if (!ropt->dataOnly && te->defn != NULL && strlen(te->defn) != 0)
+ write_msg(modulename, "warning from original dump file: %s\n", te->defn);
+ else if (te->copyStmt != NULL && strlen(te->copyStmt) != 0)
+ write_msg(modulename, "warning from original dump file: %s\n", te->copyStmt);
+ }
+
+ defnDumped = false;
+
+ if ((reqs & REQ_SCHEMA) != 0) /* We want the schema */
+ {
+ ahlog(AH, 1, "creating %s %s\n", te->desc, te->tag);
+
+ _printTocEntry(AH, te, ropt, false, false);
+ defnDumped = true;
+
+ /*
+ * If we could not create a table and --no-data-for-failed-tables
+ * was given, ignore the corresponding TABLE DATA
+ *
+ * For the parallel case this must be done in the parent, so we just
+ * set a return value.
+ */
+ if (ropt->noDataForFailedTables &&
+ AH->lastErrorTE == te &&
+ strcmp(te->desc, "TABLE") == 0)
+ {
+ if (is_parallel)
+ retval = 1;
+ else
+ _inhibit_data_for_failed_table(AH,te);
+ }
+
+ /* If we created a DB, connect to it... */
+ /* won't happen in parallel restore */
+ if (strcmp(te->desc, "DATABASE") == 0)
+ {
+ ahlog(AH, 1, "connecting to new database \"%s\"\n", te->tag);
+ _reconnectToDB(AH, te->tag);
+ }
+ }
+
+ /*
+ * If we have a data component, then process it
+ */
+ if ((reqs & REQ_DATA) != 0)
+ {
+ /*
+ * hadDumper will be set if there is genuine data component for
+ * this node. Otherwise, we need to check the defn field for
+ * statements that need to be executed in data-only restores.
+ */
+ if (te->hadDumper)
+ {
+ /*
+ * If we can output the data, then restore it.
+ */
+ if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
+ {
+ _printTocEntry(AH, te, ropt, true, false);
+
+ if (strcmp(te->desc, "BLOBS") == 0 ||
+ strcmp(te->desc, "BLOB COMMENTS") == 0)
+ {
+ ahlog(AH, 1, "restoring %s\n", te->desc);
+
+ _selectOutputSchema(AH, "pg_catalog");
+
+ (*AH->PrintTocDataPtr) (AH, te, ropt);
+ }
+ else
+ {
+ _disableTriggersIfNecessary(AH, te, ropt);
+
+ /* Select owner and schema as necessary */
+ _becomeOwner(AH, te);
+ _selectOutputSchema(AH, te->namespace);
+
+ ahlog(AH, 1, "restoring data for table \"%s\"\n",
+ te->tag);
+
+ if (ropt->truncate_before_load)
+ {
+ if (AH->connection)
+ StartTransaction(AH);
+ else
+ ahprintf(AH, "BEGIN;\n\n");
+
+ ahprintf(AH, "TRUNCATE TABLE %s;\n\n",
+ fmtId(te->tag)); }
+
+ /*
+ * If we have a copy statement, use it. As of V1.3,
+ * these are separate to allow easy import from
+ * within a database connection. Pre 1.3 archives can
+ * not use DB connections and are sent to output only.
+ *
+ * For V1.3+, the table data MUST have a copy
+ * statement so that we can go into appropriate mode
+ * with libpq.
+ */
+ if (te->copyStmt && strlen(te->copyStmt) > 0)
+ {
+ ahprintf(AH, "%s", te->copyStmt);
+ AH->writingCopyData = true;
+ }
+
+ (*AH->PrintTocDataPtr) (AH, te, ropt);
+
+ AH->writingCopyData = false;
+
+ if (ropt->truncate_before_load)
+ {
+ if (AH->connection)
+ CommitTransaction(AH);
+ else
+ ahprintf(AH, "COMMIT;\n\n");
+ }
+
+
+ _enableTriggersIfNecessary(AH, te, ropt);
+ }
+ }
+ }
+ else if (!defnDumped)
+ {
+ /* If we haven't already dumped the defn part, do so now */
+ ahlog(AH, 1, "executing %s %s\n", te->desc, te->tag);
+ _printTocEntry(AH, te, ropt, false, false);
+ }
+ }
+
+ return retval;
+ }
+
+ static void
+ _inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry * te)
+ {
+ TocEntry *tes;
+ RestoreOptions *ropt = AH->ropt;
+
+ ahlog(AH, 1, "table \"%s\" could not be created, will not restore its data\n",
+ te->tag);
+
+ for (tes = te->next; tes != AH->toc; tes = tes->next)
+ {
+ if (strcmp(tes->desc, "TABLE DATA") == 0 &&
+ strcmp(tes->tag, te->tag) == 0 &&
+ strcmp(tes->namespace ? tes->namespace : "",
+ te->namespace ? te->namespace : "") == 0)
+ {
+ /* mark it unwanted */
+ ropt->idWanted[tes->dumpId - 1] = false;
+
+ _reduce_dependencies(AH, tes);
+ break;
+ }
+ }
+ }
+
/*
* Allocate a new RestoreOptions block.
* This is mainly so we can initialize it, but also for future expansion,
***************
*** 653,662 ****
while (te != AH->toc)
{
if (_tocEntryRequired(te, ropt, true) != 0)
! ahprintf(AH, "%d; %u %u %s %s %s %s\n", te->dumpId,
te->catalogId.tableoid, te->catalogId.oid,
te->desc, te->namespace ? te->namespace : "-",
te->tag, te->owner);
te = te->next;
}

--- 1152,1167 ----
while (te != AH->toc)
{
if (_tocEntryRequired(te, ropt, true) != 0)
! {
! int i;
! ahprintf(AH, "%d;[%d: ",te->dumpId, te->nDeps);
! for (i=0 ;i<te->nDeps; i++)
! ahprintf(AH, "%d ",te->dependencies[i]);
! ahprintf(AH, "] %u %u %s %s %s %s\n",
te->catalogId.tableoid, te->catalogId.oid,
te->desc, te->namespace ? te->namespace : "-",
te->tag, te->owner);
+ }
te = te->next;
}

***************
*** 1948,1965 ****
--- 2453,2473 ----
deps = (DumpId *) realloc(deps, sizeof(DumpId) * depIdx);
te->dependencies = deps;
te->nDeps = depIdx;
+ te->depCount = depIdx;
}
else
{
free(deps);
te->dependencies = NULL;
te->nDeps = 0;
+ te->depCount = 0;
}
}
else
{
te->dependencies = NULL;
te->nDeps = 0;
+ te->depCount = 0;
}

if (AH->ReadExtraTocPtr)
Index: pg_backup_archiver.h
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_archiver.h,v
retrieving revision 1.76
diff -c -r1.76 pg_backup_archiver.h
*** pg_backup_archiver.h 7 Nov 2007 12:24:24 -0000 1.76
--- pg_backup_archiver.h 29 Sep 2008 02:43:52 -0000
***************
*** 99,104 ****
--- 99,105 ----
struct _restoreList;

typedef void (*ClosePtr) (struct _archiveHandle * AH);
+ typedef void (*ReopenPtr) (struct _archiveHandle * AH);
typedef void (*ArchiveEntryPtr) (struct _archiveHandle * AH, struct _tocEntry * te);

typedef void (*StartDataPtr) (struct _archiveHandle * AH, struct _tocEntry * te);
***************
*** 212,217 ****
--- 213,219 ----
WriteBufPtr WriteBufPtr; /* Write a buffer of output to the archive */
ReadBufPtr ReadBufPtr; /* Read a buffer of input from the archive */
ClosePtr ClosePtr; /* Close the archive */
+ ReopenPtr ReopenPtr; /* Reopen the archive */
WriteExtraTocPtr WriteExtraTocPtr; /* Write extra TOC entry data
* associated with the current archive
* format */
***************
*** 231,236 ****
--- 233,239 ----
char *archdbname; /* DB name *read* from archive */
bool requirePassword;
PGconn *connection;
+ char *cachepw;
int connectToDB; /* Flag to indicate if direct DB connection is
* required */
bool writingCopyData; /* True when we are sending COPY data */
***************
*** 284,289 ****
--- 287,293 ----
DumpId dumpId;
bool hadDumper; /* Archiver was passed a dumper routine (used
* in restore) */
+ bool prestored; /* keep track of parallel restore */
char *tag; /* index tag */
char *namespace; /* null or empty string if not in a schema */
char *tablespace; /* null if not in a tablespace; empty string
***************
*** 296,301 ****
--- 300,306 ----
char *copyStmt;
DumpId *dependencies; /* dumpIds of objects this one depends on */
int nDeps; /* number of dependencies */
+ int depCount; /* adjustable tally of dependencies */

DataDumperPtr dataDumper; /* Routine to dump data for object */
void *dataDumperArg; /* Arg for above routine */
Index: pg_backup_custom.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_custom.c,v
retrieving revision 1.40
diff -c -r1.40 pg_backup_custom.c
*** pg_backup_custom.c 28 Oct 2007 21:55:52 -0000 1.40
--- pg_backup_custom.c 29 Sep 2008 02:43:52 -0000
***************
*** 40,45 ****
--- 40,46 ----
static size_t _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len);
static size_t _ReadBuf(ArchiveHandle *AH, void *buf, size_t len);
static void _CloseArchive(ArchiveHandle *AH);
+ static void _ReopenArchive(ArchiveHandle *AH);
static void _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _WriteExtraToc(ArchiveHandle *AH, TocEntry *te);
static void _ReadExtraToc(ArchiveHandle *AH, TocEntry *te);
***************
*** 120,125 ****
--- 121,127 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = _ReopenArchive;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
***************
*** 835,840 ****
--- 837,879 ----
AH->FH = NULL;
}

+ static void
+ _ReopenArchive(ArchiveHandle *AH)
+ {
+ lclContext *ctx = (lclContext *) AH->formatData;
+ pgoff_t tpos;
+
+ if (AH->mode == archModeWrite)
+ {
+ die_horribly(AH,modulename,"Can only reopen input archives");
+ }
+ else if ((! AH->fSpec) || strcmp(AH->fSpec, "") == 0)
+ {
+ die_horribly(AH,modulename,"Cannot reopen stdin");
+ }
+
+ tpos = ftello(AH->FH);
+
+ if (fclose(AH->FH) != 0)
+ die_horribly(AH, modulename, "could not close archive file: %s\n",
+ strerror(errno));
+
+ AH->FH = fopen(AH->fSpec, PG_BINARY_R);
+ if (!AH->FH)
+ die_horribly(AH, modulename, "could not open input file \"%s\": %s\n",
+ AH->fSpec, strerror(errno));
+
+ if (ctx->hasSeek)
+ {
+ fseeko(AH->FH, tpos, SEEK_SET);
+ }
+ else
+ {
+ die_horribly(AH,modulename,"cannot reopen non-seekable file");
+ }
+
+ }
+
/*--------------------------------------------------
* END OF FORMAT CALLBACKS
*--------------------------------------------------
Index: pg_backup_db.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_db.c,v
retrieving revision 1.80
diff -c -r1.80 pg_backup_db.c
*** pg_backup_db.c 16 Aug 2008 02:25:06 -0000 1.80
--- pg_backup_db.c 29 Sep 2008 02:43:52 -0000
***************
*** 138,148 ****

ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

! if (AH->requirePassword)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
}

do
--- 138,153 ----

ahlog(AH, 1, "connecting to database \"%s\" as user \"%s\"\n", newdb, newuser);

! if (AH->requirePassword && AH->cachepw == NULL)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
+ AH->requirePassword = true;
+ }
+ else if (AH->requirePassword)
+ {
+ password = AH->cachepw;
}

do
***************
*** 174,180 ****
}
} while (new_pass);

! if (password)
free(password);

/* check for version mismatch */
--- 179,185 ----
}
} while (new_pass);

! if (password != AH->cachepw)
free(password);

/* check for version mismatch */
***************
*** 206,220 ****
if (AH->connection)
die_horribly(AH, modulename, "already connected to a database\n");

! if (reqPwd)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
AH->requirePassword = true;
}
else
AH->requirePassword = false;

/*
* Start the connection. Loop until we have a password if requested by
--- 211,231 ----
if (AH->connection)
die_horribly(AH, modulename, "already connected to a database\n");

! if (reqPwd && AH->cachepw == NULL)
{
password = simple_prompt("Password: ", 100, false);
if (password == NULL)
die_horribly(AH, modulename, "out of memory\n");
AH->requirePassword = true;
}
+ else if (reqPwd)
+ {
+ password = AH->cachepw;
+ }
else
+ {
AH->requirePassword = false;
+ }

/*
* Start the connection. Loop until we have a password if requested by
***************
*** 241,247 ****
} while (new_pass);

if (password)
! free(password);

/* check to see that the backend connection was successfully made */
if (PQstatus(AH->connection) == CONNECTION_BAD)
--- 252,258 ----
} while (new_pass);

if (password)
! AH->cachepw = password;

/* check to see that the backend connection was successfully made */
if (PQstatus(AH->connection) == CONNECTION_BAD)
Index: pg_backup_files.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_files.c,v
retrieving revision 1.34
diff -c -r1.34 pg_backup_files.c
*** pg_backup_files.c 28 Oct 2007 21:55:52 -0000 1.34
--- pg_backup_files.c 29 Sep 2008 02:43:52 -0000
***************
*** 87,92 ****
--- 87,93 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = NULL;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_backup_tar.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_backup_tar.c,v
retrieving revision 1.62
diff -c -r1.62 pg_backup_tar.c
*** pg_backup_tar.c 15 Nov 2007 21:14:41 -0000 1.62
--- pg_backup_tar.c 29 Sep 2008 02:43:52 -0000
***************
*** 143,148 ****
--- 143,149 ----
AH->WriteBufPtr = _WriteBuf;
AH->ReadBufPtr = _ReadBuf;
AH->ClosePtr = _CloseArchive;
+ AH->ReopenPtr = NULL;
AH->PrintTocDataPtr = _PrintTocData;
AH->ReadExtraTocPtr = _ReadExtraToc;
AH->WriteExtraTocPtr = _WriteExtraToc;
Index: pg_restore.c
===================================================================
RCS file: /cvsroot/pgsql/src/bin/pg_dump/pg_restore.c,v
retrieving revision 1.88
diff -c -r1.88 pg_restore.c
*** pg_restore.c 13 Apr 2008 03:49:22 -0000 1.88
--- pg_restore.c 29 Sep 2008 02:43:52 -0000
***************
*** 78,83 ****
--- 78,84 ----
static int no_data_for_failed_tables = 0;
static int outputNoTablespaces = 0;
static int use_setsessauth = 0;
+ static int truncate_before_load = 0;

struct option cmdopts[] = {
{"clean", 0, NULL, 'c'},
***************
*** 92,97 ****
--- 93,99 ----
{"ignore-version", 0, NULL, 'i'},
{"index", 1, NULL, 'I'},
{"list", 0, NULL, 'l'},
+ {"multi-thread",1,NULL,'m'},
{"no-privileges", 0, NULL, 'x'},
{"no-acl", 0, NULL, 'x'},
{"no-owner", 0, NULL, 'O'},
***************
*** 114,119 ****
--- 116,122 ----
{"disable-triggers", no_argument, &disable_triggers, 1},
{"no-data-for-failed-tables", no_argument, &no_data_for_failed_tables, 1},
{"no-tablespaces", no_argument, &outputNoTablespaces, 1},
+ {"truncate-before-load", no_argument, &truncate_before_load, 1},
{"use-set-session-authorization", no_argument, &use_setsessauth, 1},

{NULL, 0, NULL, 0}
***************
*** 139,145 ****
}
}

! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:n:Op:P:RsS:t:T:U:vWxX:1",
cmdopts, NULL)) != -1)
{
switch (c)
--- 142,148 ----
}
}

! while ((c = getopt_long(argc, argv, "acCd:ef:F:h:iI:lL:m:n:Op:P:RsS:t:T:U:vWxX:1",
cmdopts, NULL)) != -1)
{
switch (c)
***************
*** 182,187 ****
--- 185,194 ----
opts->tocFile = strdup(optarg);
break;

+ case 'm':
+ opts->number_of_threads = atoi(optarg); /* XXX fix error checking */
+ break;
+
case 'n': /* Dump data for this schema only */
opts->schemaNames = strdup(optarg);
break;
***************
*** 262,268 ****
break;

case 0:
! /* This covers the long options equivalent to -X xxx. */
break;

case '1': /* Restore data in a single transaction */
--- 269,278 ----
break;

case 0:
! /*
! * This covers the long options without a short equivalent,
! * including those equivalent to -X xxx.
! */
break;

case '1': /* Restore data in a single transaction */
***************
*** 299,304 ****
--- 309,329 ----
opts->noDataForFailedTables = no_data_for_failed_tables;
opts->noTablespace = outputNoTablespaces;
opts->use_setsessauth = use_setsessauth;
+ opts->truncate_before_load = truncate_before_load;
+
+ if (opts->single_txn)
+ {
+ if (opts->number_of_threads > 1)
+ {
+ write_msg(NULL, "single transaction not compatible with multi-threading");
+ exit(1);
+ }
+ else if (opts->truncate_before_load)
+ {
+ write_msg(NULL, "single transaction not compatible with truncate-before-load");
+ exit(1);
+ }
+ }

if (opts->formatName)
{
***************
*** 330,335 ****
--- 355,362 ----

AH = OpenArchive(inputFileSpec, opts->format);

+ /* XXX looks like we'll have to do sanity checks in the parallel archiver */
+
/* Let the archiver know how noisy to be */
AH->verbose = opts->verbose;

***************
*** 351,356 ****
--- 378,385 ----

if (opts->tocSummary)
PrintTOCSummary(AH, opts);
+ else if (opts->number_of_threads > 1)
+ RestoreArchiveParallel(AH, opts);
else
RestoreArchive(AH, opts);

Andrew Dunstan wrote:
>
>
>>
>> this works better but there is something fishy still - using the same
>> dump file I get a proper restore using pg_restore normally. If I
>> however use -m for a parallel one I only get parts (in this case only
>> 243 of the 709 tables) of the database restored ...
>>
>>
>>
>
> Yes, there are several funny things going on, including some stuff
> with dependencies. I'll have a new patch tomorrow with luck. Thanks
> for testing.
>
>

OK, in this version a whole heap of bugs are fixed, mainly those to do
with dependencies and saved state. I get identical row counts in the
source and destination now, quite reliably.

cheers

andrew

No comments: