> content >> migrate" * for analyzing data from various sources and importing them into Drupal tables. */

/**
 * Call a migrate hook. Like module_invoke_all, but gives modules a chance
 * to do one-time initialization before any of their hooks are called, and
 * adds "migrate" to the hook name.
 *
 * @param $hook
 *   Hook to invoke (e.g., 'types', 'fields_node', etc.)
 * @param ...
 *   Any further arguments are passed on to each hook implementation.
 * @return
 *   Merged array of results.
 */
function migrate_invoke_all($hook) {
  // Let modules do any one-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // Everything after $hook is handed to the implementations
  $args = func_get_args();
  array_shift($args);

  $hookfunc = 'migrate_' . $hook;
  $return = array();
  foreach (module_implements($hookfunc) as $module) {
    $function = $module . '_' . $hookfunc;
    $result = call_user_func_array($function, $args);
    if (isset($result) && is_array($result)) {
      $return = array_merge_recursive($return, $result);
    }
    elseif (isset($result)) {
      $return[] = $result;
    }
  }
  return $return;
}

/**
 * Call a destination hook (e.g., hook_migrate_prepare_node). Use this version
 * for hooks with the precise signature below, so that the object can be passed by
 * reference.
 *
 * @param $hook
 *   Hook to invoke (e.g., 'types', 'fields_node', etc.)
 * @param $object
 *   Destination object being built - passed by reference, so hooks can modify it
 * @param $tblinfo
 *   Metadata about the content set
 * @param $row
 *   The raw source row.
 * @return
 *   Merged array of results.
 */
function migrate_destination_invoke_all($hook, &$object, $tblinfo, $row) {
  // Let modules do any one-time initialization (e.g., including migration support files)
  global $_migrate_inited;
  if (!isset($_migrate_inited)) {
    module_invoke_all('migrate_init');
    $_migrate_inited = TRUE;
  }

  // We could have used module_invoke_all, but unfortunately
  // module_invoke_all passes all arguments by value.
  $errors = array();
  $hook = 'migrate_' . $hook;
  foreach (module_implements($hook) as $module_name) {
    $function = $module_name . '_' . $hook;
    if (function_exists($function)) {
      // Each implementation is timed under its own function name
      timer_start($function);
      $errors = array_merge($errors, (array)$function($object, $tblinfo, $row));
      timer_stop($function);
    }
  }
  return $errors;
}

/**
 * Save a new or updated content set
 *
 * @param $content_set
 *   An array or object representing the content set. This is passed by reference (so
 *   when adding a new content set the ID can be set)
 * @param $options
 *   Array of additional options for saving the content set. Currently:
 *     base_table: The base table of the view - if provided, we don't need
 *       to load the view.
 *     base_database: The database of the base table - if base_table is present
 *       and base_database omitted, it defaults to 'default'
 * @return
 *   The ID of the content set that was saved, or NULL if the view backing the
 *   content set could not be loaded (the content set record itself has already
 *   been written at that point)
 */
function migrate_save_content_set(&$content_set, $options = array()) {
  // Deal with objects internally (but remember if we need to put the parameter
  // back to an array)
  $was_array = is_array($content_set);
  if ($was_array) {
    $content_set = (object) $content_set;
  }

  $schema_change = FALSE;
  // Passed to the schema API calls below
  $ret = array();

  // Update or insert the content set record as appropriate
  if (isset($content_set->mcsid)) {
    // If machine_name changes, need to rename the map/message tables.
    // db_query() only hands back a result set; db_result() extracts the stored
    // name so that the comparison below is meaningful.
    $old_machine_name = db_result(db_query("SELECT machine_name FROM {migrate_content_sets} WHERE mcsid=%d", $content_set->mcsid));
    if ($old_machine_name != $content_set->machine_name) {
      $old_maptablename = migrate_map_table_name($content_set->mcsid);
      $old_msgtablename = migrate_message_table_name($content_set->mcsid);
    }
    drupal_write_record('migrate_content_sets', $content_set, 'mcsid');
    if (isset($old_maptablename) && db_table_exists($old_maptablename)) {
      $new_maptablename = migrate_map_table_name($content_set->mcsid);
      db_rename_table($ret, $old_maptablename, $new_maptablename);
      $schema_change = TRUE;
    }
    if (isset($old_msgtablename) && db_table_exists($old_msgtablename)) {
      $new_msgtablename = migrate_message_table_name($content_set->mcsid);
      db_rename_table($ret, $old_msgtablename, $new_msgtablename);
      $schema_change = TRUE;
    }
  }
  else {
    drupal_write_record('migrate_content_sets', $content_set);
  }

  // Create or modify map and message tables
  $maptablename = migrate_map_table_name($content_set->mcsid);
  $msgtablename = migrate_message_table_name($content_set->mcsid);

  // TODO: For now, PK must be in base_table

  // If the caller tells us the base table of the view, we don't need
  // to load the view (which would not work when called from hook_install())
  if (isset($options['base_table'])) {
    $tablename = $options['base_table'];
    $tabledb = isset($options['base_database']) ? $options['base_database'] : 'default';
  }
  else {
    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or remove the content set using it.', array('!view' => $content_set->view_name)));
      // Hand the parameter back in the form the caller passed it
      if ($was_array) {
        $content_set = (array) $content_set;
      }
      return NULL;
    }
    // Must do this to load the database
    if (views_api_version() >= '3') {
      $view->init_display('default');
    }
    $view->init_query();

    $tabledb = isset($view->base_database) ? $view->base_database : 'default';
    $tablename = $view->base_table;
  }

  $sourceschema = _migrate_inspect_schema($tablename, $tabledb);
  $sourcefields = isset($sourceschema['fields']) ? $sourceschema['fields'] : array();

  // If the PK of the content set is defined, make sure we have a mapping table
  if (isset($content_set->sourcekey) && $content_set->sourcekey) {
    $sourcefield = isset($sourcefields[$content_set->sourcekey]) ? $sourcefields[$content_set->sourcekey] : NULL;
    // The field name might be prefixed with the table name and an underscore -
    // strip that prefix and look again
    if (!$sourcefield) {
      $sourcekey = drupal_substr($content_set->sourcekey, drupal_strlen($tablename) + 1);
      $sourcefield = isset($sourcefields[$sourcekey]) ? $sourcefields[$sourcekey] : NULL;
    }
    // But - we don't want serial fields to behave serially, so change to int
    if ($sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }

    if (!db_table_exists($maptablename)) {
      $schema = _migrate_map_table_schema($sourcefield);
      db_create_table($ret, $maptablename, $schema);
      // Expose map table to views
      if (module_exists('tw')) {
        tw_add_tables(array($maptablename));
        tw_add_fk($maptablename, 'destid');
      }

      $schema = _migrate_message_table_schema($sourcefield);
      db_create_table($ret, $msgtablename, $schema);
      // Expose messages table to views
      if (module_exists('tw')) {
        tw_add_tables(array($msgtablename));
        tw_add_fk($msgtablename, 'sourceid');
      }
      $schema_change = TRUE;
    }
    else {
      // TODO: Deal with varchar->int case where there is existing non-int data
      $desired_schema = _migrate_map_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($maptablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        db_drop_primary_key($ret, $maptablename);
        db_change_field($ret, $maptablename, 'sourceid', 'sourceid', $sourcefield,
          array('primary key' => array('sourceid')));
        if (module_exists('tw')) {
          tw_perform_analysis($maptablename);
        }
        $schema_change = TRUE;
      }

      $desired_schema = _migrate_message_table_schema($sourcefield);
      $actual_schema = _migrate_inspect_schema($msgtablename);
      if ($desired_schema['fields']['sourceid'] != $actual_schema['fields']['sourceid']) {
        db_drop_index($ret, $msgtablename, 'sourceid');
        db_change_field($ret, $msgtablename, 'sourceid', 'sourceid', $sourcefield,
          array('indexes' => array('sourceid' => array('sourceid'))));
        if (module_exists('tw')) {
          // Re-analyze the table that was just altered (the message table)
          tw_perform_analysis($msgtablename);
        }
        $schema_change = TRUE;
      }
    }
  }

  // Make sure the schema gets updated to reflect changes - including table
  // renames, which can happen whether or not a source key is defined
  if ($schema_change) {
    cache_clear_all('schema', 'cache');
  }

  if ($was_array) {
    $content_set = (array) $content_set;
    return $content_set['mcsid'];
  }
  return $content_set->mcsid;
}

/**
 * Save a new or updated content mapping
 *
 * @param $mapping
 *   An object representing the mapping. This is passed by reference (so
 *   when adding a new mapping the ID can be set)
 * @return
 *   The ID of the mapping that was saved, or NULL if nothing was saved
 */
function migrate_save_content_mapping(&$mapping) {
  // A mapping without an ID yet is new; !empty() avoids a notice when the
  // property has not been set at all
  if (!empty($mapping->mcmid)) {
    drupal_write_record('migrate_content_mappings', $mapping, 'mcmid');
  }
  else {
    drupal_write_record('migrate_content_mappings', $mapping);
  }
  return $mapping->mcmid;
}

/**
 * Delete the specified content set, including map and message tables.
 *
 * @param $mcsid
 *   Unique identifier of the content set to delete.
 */
function migrate_delete_content_set($mcsid) {
  // First, remove the map and message tables from the Table Wizard, and drop them
  $ret = array();
  $maptable = migrate_map_table_name($mcsid);
  $msgtable = migrate_message_table_name($mcsid);
  if (db_table_exists($maptable)) {
    // Table Wizard calls are guarded the same way as when the tables are created
    if (module_exists('tw')) {
      tw_remove_tables(array($maptable, $msgtable));
    }
    db_drop_table($ret, $maptable);
    db_drop_table($ret, $msgtable);
  }

  // Then, delete the content set data
  $sql = "DELETE FROM {migrate_content_mappings} WHERE mcsid=%d";
  db_query($sql, $mcsid);
  $sql = "DELETE FROM {migrate_content_sets} WHERE mcsid=%d";
  db_query($sql, $mcsid);
}

/**
 * Delete the specified content mapping.
 *
 * @param $mcmid
 *   Unique identifier of the mapping to delete.
*/
function migrate_delete_content_mapping($mcmid) {
  $sql = "DELETE FROM {migrate_content_mappings} WHERE mcmid=%d";
  db_query($sql, $mcmid);
}

/**
 * Convenience function for generating a message array
 *
 * @param $message
 *   Text describing the error condition
 * @param $type
 *   One of the MIGRATE_MESSAGE constants, identifying the level of error
 * @return
 *   Structured array suitable for return from an import hook
 */
function migrate_message($message, $type = MIGRATE_MESSAGE_ERROR) {
  $error = array(
    'level' => $type,
    'message' => $message,
  );
  return $error;
}

/**
 * Add a mapping from source ID to destination ID for the specified content set
 *
 * If the source ID is already mapped and flagged needs_update, the flag is
 * cleared; if it is not mapped at all, a new row is inserted; an existing,
 * up-to-date row is left alone.
 *
 * @param $mcsid
 *   ID of the content set being processed
 * @param $sourceid
 *   Primary key value from the source
 * @param $destid
 *   Primary key value from the destination
 */
function migrate_add_mapping($mcsid, $sourceid, $destid) {
  // Map table names are looked up once per content set
  static $maptables = array();
  if (!isset($maptables[$mcsid])) {
    $maptables[$mcsid] = migrate_map_table_name($mcsid);
  }
  $maptable = $maptables[$mcsid];

  // db_result() yields FALSE when no row matches; otherwise it yields the
  // column value, which is not guaranteed to be an integer - so "no row" is
  // the only case that may be tested with a strict comparison.
  $needs_update = db_result(db_query('SELECT needs_update FROM {' . $maptable . "} WHERE sourceid='%s'", $sourceid));
  if ($needs_update === FALSE) {
    db_query('INSERT INTO {' . $maptable . "} (sourceid, destid, needs_update) VALUES('%s', %d, 0)", $sourceid, $destid);
  }
  elseif ($needs_update == 1) {
    db_query('UPDATE {' . $maptable . "} SET needs_update=0 WHERE sourceid='%s'", $sourceid);
  }
}

/**
 * Clear migrated objects from the specified content set
 *
 * @param $mcsid
 *   ID of the content set to clear
 * @param $options
 *   Keyed array of optional options:
 *     itemlimit - Maximum number of items to process
 *     timelimit - Unix timestamp after which to stop processing
 *     idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *       all unmigrated rows
 *     feedback - Keyed array controlling status feedback to the caller
 *       function - PHP function to call, passing a message to be displayed
 *       frequency - How often to call the function
 *       frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *   Status of the migration process:
 *     MIGRATE_RESULT_FAILED - the content set does not exist
 *     MIGRATE_RESULT_IN_PROGRESS - the content set is not idle
 *     MIGRATE_RESULT_STOPPED - the status was changed while clearing
 *     MIGRATE_RESULT_INCOMPLETE - the time or memory limit was reached
 *     MIGRATE_RESULT_COMPLETED - all selected rows were processed
 */
function migrate_content_process_clear($mcsid, &$options = array()) {
  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }

  $result = db_query("SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
  $tblinfo = db_fetch_object($result);
  if (!$tblinfo) {
    // No such content set - there is nothing to clear
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_CLEARING, $mcsid);

  $description = $tblinfo->description;
  $contenttype = $tblinfo->contenttype;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  // $tblinfo is handed to the delete hooks below, so attach the map table
  // name only now that it is actually known
  $tblinfo->maptable = $maptable;
  $memory_limit = _migrate_memory_limit();

  // If this content set is set up to update existing content, we don't
  // want to delete the content on clear, just the map/message tables
  $sql = "SELECT srcfield FROM {migrate_content_mappings} WHERE mcsid=%d AND primary_key=1";
  $srcfield = db_result(db_query($sql, $mcsid));
  $full_clear = !$srcfield;

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;
  $deleted = 0;
  $args = array();
  if ($idlist) {
    $args = array_map('trim', explode(',', $idlist));
    if (is_numeric($args[0])) {
      $placeholders = db_placeholders($args, 'int');
    }
    else {
      $placeholders = db_placeholders($args, 'varchar');
    }
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "} WHERE sourceid IN ($placeholders)";
  }
  else {
    $sql = "SELECT sourceid,destid FROM {" . $maptable . "}";
  }

  timer_start('delete query');
  if ($itemlimit) {
    $deletelist = db_query_range($sql, $args, 0, $itemlimit);
  }
  else {
    $deletelist = db_query($sql, $args);
  }
  timer_stop('delete query');

  while ($row = db_fetch_object($deletelist)) {
    // Recheck status - permits dynamic interruption of jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_CLEARING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }
    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage/$memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
                    array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $deleted >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $deleted = 0;
      }
    }

    // @TODO: Should return success/failure. Problem: node_delete doesn't return anything...
    if ($full_clear) {
      timer_start('delete hooks');
      migrate_invoke_all("delete_$contenttype", $tblinfo, $row->destid);
      timer_stop('delete hooks');
    }
    timer_start('clear map/msg');
    db_query("DELETE FROM {" . $maptable . "} WHERE sourceid='%s'", $row->sourceid);
    db_query("DELETE FROM {" . $msgtablename . "} WHERE sourceid='%s' AND level=%d",
      $row->sourceid, MIGRATE_MESSAGE_INFORMATIONAL);
    timer_stop('clear map/msg');
    $deleted++;
  }

  // Mark that we're done
  $sql = "UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d";
  db_query($sql, MIGRATE_STATUS_IDLE, $mcsid);

  // If we've completed a total clear, make sure all messages are gone
  if ($return == MIGRATE_RESULT_COMPLETED && !$idlist && !$itemlimit) {
    db_query('TRUNCATE TABLE {' . $msgtablename . '}');
  }
  // In other cases (except when we're still in the middle of a process), keep
  // informationals, which should still be attached to uncleared items
  elseif ($return != MIGRATE_RESULT_INCOMPLETE) {
    // Remove old messages before beginning new import process
    db_query("DELETE FROM {" . $msgtablename . "} WHERE level <> %d", MIGRATE_MESSAGE_INFORMATIONAL);
  }

  $message = _migrate_progress_message($lastfeedback, $deleted, $description, FALSE, $return);
  if (isset($feedback)) {
    $feedback($message);
  }
  return $return;
}

/**
 * Import objects from the specified content set
 *
 * @param $mcsid
 *   ID of the content set to import
 * @param $options
 *   Keyed array of optional options:
 *     itemlimit - Maximum number of items to process
 *     timelimit - Unix timestamp after which to stop processing
 *     idlist - Comma-separated list of source IDs to process, instead of proceeding through
 *       all unmigrated rows
 *     feedback - Keyed array controlling status feedback to the caller
 *       function - PHP function to call, passing a message to be displayed
 *       frequency - How often to call the function
 *       frequency_unit - How to interpret frequency (items or seconds)
 *
 * @return
 *   Status of the migration process:
 *     MIGRATE_RESULT_FAILED - the content set, its view or its key is missing
 *     MIGRATE_RESULT_IN_PROGRESS - the content set is not idle
 *     MIGRATE_RESULT_STOPPED - the status was changed while importing
 *     MIGRATE_RESULT_INCOMPLETE - the time or memory limit was reached
 *     MIGRATE_RESULT_COMPLETED - all selected rows were processed
 */
function migrate_content_process_import($mcsid, &$options = array()) {
  $tblinfo = db_fetch_object(db_query("SELECT * FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid));
  if (!$tblinfo) {
    // No such content set - there is nothing to import
    return MIGRATE_RESULT_FAILED;
  }
  if ($tblinfo->status != MIGRATE_STATUS_IDLE) {
    return MIGRATE_RESULT_IN_PROGRESS;
  }
  db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IMPORTING, $mcsid);

  $itemlimit = isset($options['itemlimit']) ? $options['itemlimit'] : NULL;
  $timelimit = isset($options['timelimit']) ? $options['timelimit'] : NULL;
  $idlist = isset($options['idlist']) ? $options['idlist'] : NULL;
  $lastfeedback = time();
  if (isset($options['feedback'])) {
    $feedback = $options['feedback']['function'];
    $frequency = isset($options['feedback']['frequency']) ? $options['feedback']['frequency'] : NULL;
    $frequency_unit = isset($options['feedback']['frequency_unit']) ? $options['feedback']['frequency_unit'] : NULL;
  }

  $description = $tblinfo->description;
  $view_name = $tblinfo->view_name;
  $view_args = $tblinfo->view_args;
  $contenttype = $tblinfo->contenttype;
  $sourcekey = $tblinfo->sourcekey;
  $maptable = migrate_map_table_name($mcsid);
  $msgtablename = migrate_message_table_name($mcsid);
  $memory_limit = _migrate_memory_limit();

  // Assume success until proven otherwise
  $return = MIGRATE_RESULT_COMPLETED;

  $collist = db_query("SELECT srcfield, destfield, default_value FROM {migrate_content_mappings} WHERE mcsid=%d AND (srcfield <> '' OR default_value <> '') ORDER BY mcmid", $mcsid);
  $fields = array();
  while ($row = db_fetch_object($collist)) {
    $fields[$row->destfield]['srcfield'] = $row->srcfield;
    $fields[$row->destfield]['default_value'] = $row->default_value;
  }
  $tblinfo->fields = $fields;
  $tblinfo->maptable = $maptable;

  // We pick up everything in the input view that is not already imported, and
  // not already errored out
  // Emulate views execute(), so we can scroll through the results ourselves
  $view = views_get_view($view_name);
  if (!$view) {
    if (isset($feedback)) {
      $feedback(t('View !view does not exist - either (re)create this view, or remove the content set using it.',
        array('!view' => $view_name)));
    }
    // Release the content set, otherwise it would stay flagged as importing
    db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
    return MIGRATE_RESULT_FAILED;
  }

  // Identify the content set being processed. Simplifies $view alterations.
  $view->migrate_content_set = $tblinfo;
  $view->is_cacheable = FALSE;
  if ($view_args) {
    $view->set_arguments(explode('/', $view_args));
  }
  $view->build();

  // Let modules modify the view just prior to executing it.
  foreach (module_implements('views_pre_execute') as $module) {
    $function = $module . '_views_pre_execute';
    $function($view);
  }

  if (isset($view->base_database)) {
    $viewdb = $view->base_database;
  }
  else {
    $viewdb = 'default';
  }

  // Add a left join to the map table, and only include rows not in the map
  $join = new views_join;

  // The source key may carry the base table name plus '_' as a prefix - we
  // need to strip that here for the join to work. But, it's common for tables
  // to have the tablename beginning field names (e.g., table cms with PK
  // cms_id). Deal with that as well...
  $baselen = drupal_strlen($view->base_table);
  if (!strncasecmp($sourcekey, $view->base_table . '_', $baselen + 1)) {
    // So, which case is it? Ask the schema module...
    db_set_active($viewdb);
    $inspect = schema_invoke('inspect', db_prefix_tables('{' . $view->base_table . '}'));
    db_set_active('default');
    $tableschema = isset($inspect[$view->base_table]) ? $inspect[$view->base_table] : array();
    $tablefields = isset($tableschema['fields']) ? $tableschema['fields'] : array();
    if (!empty($tablefields[$sourcekey])) {
      // The prefixed name is a real column
      $joinkey = $sourcekey;
    }
    else {
      $joinkey = drupal_substr($sourcekey, $baselen + 1);
      if (empty($tablefields[$joinkey])) {
        if (isset($feedback)) {
          $feedback(t("In view !view, can't find key !key for table !table",
            array('!view' => $view_name, '!key' => $sourcekey, '!table' => $view->base_table)));
        }
        // Release the content set, otherwise it would stay flagged as importing
        db_query("UPDATE {migrate_content_sets} SET status=%d WHERE mcsid=%d", MIGRATE_STATUS_IDLE, $mcsid);
        return MIGRATE_RESULT_FAILED;
      }
    }
  }
  else {
    $joinkey = $sourcekey;
  }
  $join->construct($maptable, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($maptable, $join, $view->base_table);
  // We want both unimported and unupdated content
  $where = "$maptable.sourceid IS NULL OR $maptable.needs_update = 1";
  // And as long as we have the map table, get the destination ID, the
  // import hook will need it to identify the existing destination object
  $view->query->add_field($maptable, 'destid', 'destid');
  $view->query->add_where(0, $where, $view->base_table);

  // Ditto for the errors table
  $join = new views_join;
  $join->construct($msgtablename, $view->base_table, $joinkey, 'sourceid');
  $view->query->add_relationship($msgtablename, $join, $view->base_table);
  $view->query->add_where(0, "$msgtablename.sourceid IS NULL", $view->base_table);

  // If running over a selected list of IDs, pass those in to the query
  if ($idlist) {
    $where_args = $idlist_array = array_map('trim', explode(',', $idlist));
    if (is_numeric($idlist_array[0])) {
      $placeholders = db_placeholders($idlist_array, 'int');
    }
    else {
      $placeholders = db_placeholders($idlist_array, 'varchar');
    }
    array_unshift($where_args, $view->base_table);
    $view->query->add_where($view->options['group'], $view->base_table . ".$joinkey IN ($placeholders)", $where_args);
  }

  // We can't seem to get $view->build() to rebuild build_info, so go straight into the query object
  $query = $view->query->query();

  $query = db_rewrite_sql($query, $view->base_table, $view->base_field,
                          array('view' => &$view));
  if ($idlist) {
    // Merge idlist into args since build_info hasn't been rebuilt.
    $args = array_merge($view->build_info['query_args'], $idlist_array);
  }
  else {
    $args = $view->build_info['query_args'];
  }
  $replacements = module_invoke_all('views_query_substitutions', $view);
  $query = str_replace(array_keys($replacements), $replacements, $query);
  if (is_array($args)) {
    foreach ($args as $id => $arg) {
      $args[$id] = str_replace(array_keys($replacements), $replacements, $arg);
    }
  }

  // Now, make the current db name explicit if content set is pulling tables from another DB
  if ($viewdb != 'default') {
    global $db_url;
    $url = parse_url(is_array($db_url) ? $db_url['default'] : $db_url);
    $currdb = drupal_substr($url['path'], 1);
    $query = str_replace('{' . $maptable . '}',
      $currdb . '.' . '{' . $maptable . '}', $query);
    $query = str_replace('{' . $msgtablename . '}',
      $currdb . '.' . '{' . $msgtablename . '}', $query);
    db_set_active($viewdb);
  }

  //drupal_set_message($query);
  timer_start('execute view query');
  if ($itemlimit) {
    $importlist = db_query_range($query, $args, 0, $itemlimit);
  }
  else {
    $importlist = db_query($query, $args);
  }
  timer_stop('execute view query');

  if ($viewdb != 'default') {
    db_set_active('default');
  }

  $imported = 0;
  timer_start('db_fetch_object');
  while ($row = db_fetch_object($importlist)) {
    timer_stop('db_fetch_object');
    // Recheck status - permits dynamic interruption of cron jobs
    $sql = "SELECT status FROM {migrate_content_sets} WHERE mcsid=%d";
    $status = db_result(db_query($sql, $mcsid));
    if ($status != MIGRATE_STATUS_IMPORTING) {
      $return = MIGRATE_RESULT_STOPPED;
      break;
    }
    // Check for time out if there is time info present
    if (isset($timelimit) && time() >= $timelimit) {
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }
    // Check for closeness to memory limit
    $usage = memory_get_usage();
    $pct_memory = $usage/$memory_limit;
    if ($pct_memory > MIGRATE_MEMORY_THRESHOLD) {
      if (isset($feedback)) {
        $feedback(t('Memory usage is !usage (!pct% of limit !limit), starting new batch',
                    array('!pct' => round($pct_memory*100), '!usage' => $usage, '!limit' => $memory_limit)));
      }
      $return = MIGRATE_RESULT_INCOMPLETE;
      break;
    }

    // Periodic progress report; the item counter restarts after each report
    if (isset($feedback)) {
      if (($frequency_unit == 'seconds' && time()-$lastfeedback >= $frequency) ||
          ($frequency_unit == 'items' && $imported >= $frequency)) {
        $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, MIGRATE_RESULT_INCOMPLETE);
        $feedback($message);
        $lastfeedback = time();
        $imported = 0;
      }
    }

    timer_start('import hooks');
    $errors = migrate_invoke_all("import_$contenttype", $tblinfo, $row);
    timer_stop('import hooks');

    // Record whatever the hooks reported. A row counts as imported unless at
    // least one message is more severe than informational.
    if (count($errors)) {
      $success = TRUE;
      foreach ($errors as $error) {
        if (!isset($error['level'])) {
          $error['level'] = MIGRATE_MESSAGE_ERROR;
        }
        if ($error['level'] != MIGRATE_MESSAGE_INFORMATIONAL) {
          $success = FALSE;
        }
        db_query("INSERT INTO {" . $msgtablename . "} (sourceid, level, message) VALUES('%s', %d, '%s')",
          $row->$sourcekey, $error['level'], $error['message']);
      }
      if ($success) {
        $imported++;
      }
    }
    else {
      $imported++;
    }
    timer_start('db_fetch_object');
  }
  timer_stop('db_fetch_object');

  $message = _migrate_progress_message($lastfeedback, $imported, $description, TRUE, $return);

  // Remember we're done
  $tblinfo->status = MIGRATE_STATUS_IDLE;
  if ($return == MIGRATE_RESULT_COMPLETED) {
    $tblinfo->lastimported = date('Y-m-d H:i:s');
  }
  if (isset($feedback)) {
    $feedback($message);
  }
  watchdog('migrate', $message);
  drupal_write_record('migrate_content_sets', $tblinfo, 'mcsid');
  return $return;
}

/* Revisit
function migrate_content_process_all_action(&$dummy, $action_context, $a1, $a2) {
  migrate_content_process_all(time());
}
*/

/**
 * Process all enabled migration processes in a browser, using the Batch API
 * to break it into manageable chunks.
*
 * Each call works on a single content set: pending clears are handled first
 * (highest weight first), then pending imports (lowest weight first).
 *
 * @param $clearing
 *   Array of content set ids (keyed by content set id) to clear
 * @param $importing
 *   Array of content set ids (keyed by content set id) to import
 * @param $limit
 *   Maximum number of items to process
 * @param $idlist
 *   Comma-separated list of source IDs to process, instead of proceeding through
 *   all unmigrated rows
 * @param $context
 *   Batch API context structure
 */
function migrate_content_process_batch($clearing, $importing, $limit, $idlist, &$context) {
  // A zero max_execution_time means no limit - but let's set a reasonable
  // limit anyway
  $starttime = time();
  $maxexectime = ini_get('max_execution_time');
  if (!$maxexectime) {
    $maxexectime = 240;
  }

  // Initialize the Batch API context
  $context['finished'] = 0;

  // The Batch API progress bar will reflect the number of operations being
  // done (clearing/importing)
  if (!isset($context['sandbox']['numops'])) {
    $context['sandbox']['numops'] = count($clearing) + count($importing);
    $context['sandbox']['numopsdone'] = 0;
    $context['sandbox']['clearing'] = $clearing;
    $context['sandbox']['importing'] = $importing;
    $context['sandbox']['message'] = '';
    $context['sandbox']['times'] = array();
  }

  // For the timelimit, subtract more than enough time to clean up
  $options = array(
    'itemlimit' => $limit,
    'timelimit' => $starttime + (($maxexectime < 5) ? $maxexectime : ($maxexectime - 5)),
    'idlist' => $idlist,
    'feedback' => array('function' => '_migrate_process_message'),
  );
  global $_migrate_messages;
  if (!isset($_migrate_messages)) {
    $_migrate_messages = array();
  }

  // Result of the operation run in this pass; stays unset when there was
  // nothing left to do
  $status = NULL;

  // Work on the last clearing op (if any)
  if (count($context['sandbox']['clearing'])) {
    $mcsids = array_values($context['sandbox']['clearing']);
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description
       FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($mcsids, 'int') . ")
       ORDER BY weight DESC
       LIMIT 1", $mcsids));
    $status = migrate_content_process_clear($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['clearing'][$row->mcsid]);
    }
  }
  // If not, work on the first importing op
  elseif (count($context['sandbox']['importing'])) {
    $mcsids = array_values($context['sandbox']['importing']);
    $row = db_fetch_object(db_query(
      "SELECT mcsid,description
       FROM {migrate_content_sets}
       WHERE mcsid IN (" . db_placeholders($mcsids, 'int') . ")
       ORDER BY weight ASC
       LIMIT 1", $mcsids));
    if (variable_get('migrate_update', 0)) {
      migrate_content_set_update($row->mcsid);
      variable_set('migrate_update', 0);
    }
    $status = migrate_content_process_import($row->mcsid, $options);
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      unset($context['sandbox']['importing'][$row->mcsid]);
    }
  }
  // If not, nothing to do
  else {
    $context['finished'] = 1;
  }

  // Make sure the entire process stops if requested
  if (isset($status) && $status == MIGRATE_RESULT_STOPPED) {
    $context['finished'] = 1;
  }

  if ($context['finished'] != 1) {
    if ($status != MIGRATE_RESULT_INCOMPLETE) {
      $context['sandbox']['numopsdone']++;
    }
    $context['finished'] = $context['sandbox']['numopsdone']/$context['sandbox']['numops'];
  }

  // Fold the messages captured by _migrate_process_message() into the batch
  // progress display and the batch results.
  // NOTE(review): the progress message is shown as HTML, so messages are
  // separated with a line-break tag; this copy of the file had a bare newline
  // here, which looks like a tag lost in extraction - confirm against upstream.
  foreach ($_migrate_messages as $message) {
    if (!isset($context['sandbox']['message'])) {
      $context['sandbox']['message'] = $message . '<br />';
    }
    else {
      $context['sandbox']['message'] .= $message . '<br />';
    }
    $context['results'][] = $message;
  }
  $context['message'] = $context['sandbox']['message'];

  // If requested save timers for eventual display
  if (variable_get('migrate_display_timers', 0)) {
    global $timers;
    foreach ($timers as $name => $timerec) {
      if (isset($timerec['time'])) {
        // Accumulated per timer name across batch passes, in seconds
        if (isset($context['sandbox']['times'][$name])) {
          $context['sandbox']['times'][$name] += $timerec['time']/1000;
        }
        else {
          $context['sandbox']['times'][$name] = $timerec['time']/1000;
        }
      }
    }
    // When all done, display the timers
    if ($context['finished'] == 1 && isset($context['sandbox']['times'])) {
      arsort($context['sandbox']['times']);
      foreach ($context['sandbox']['times'] as $name => $total) {
        drupal_set_message("$name: " . round($total, 2));
      }
    }
  }
}

/**
 * Capture messages generated during an import or clear process
 *
 * @param $message
 *   Message text, appended to the global $_migrate_messages list
 */
function _migrate_process_message($message) {
  global $_migrate_messages;
  $_migrate_messages[] = $message;
}

/**
 * Prepare a content set for updating of existing items
 *
 * Flags every row of the content set's map table as needing an update and
 * empties its message table.
 *
 * @param $mcsid
 *   ID of the content set to update
 */
function migrate_content_set_update($mcsid) {
  $maptable = migrate_map_table_name($mcsid);
  db_query('UPDATE {' . $maptable . '} SET needs_update=1');
  $msgtable = migrate_message_table_name($mcsid);
  db_query('TRUNCATE TABLE {' . $msgtable . '}');
}

/**
 * Generate a progress message
 *
 * @param $starttime
 *   Timestamp (in seconds) when this piece of work began
 * @param $numitems
 *   Number of items that were processed
 * @param $description
 *   Name (description) of the content set being processed
 * @param $import
 *   TRUE for an import process, FALSE for a clearing process
 * @param $result
 *   Status of the work (one of the MIGRATE_RESULT constants handled below)
 * @return
 *   Formatted message
 */
function _migrate_progress_message($starttime, $numitems, $description, $import = TRUE, $result = MIGRATE_RESULT_COMPLETED) {
  $time = (microtime(TRUE) - $starttime);
  if ($time > 0) {
    $perminute = round(60*$numitems/$time);
    $time = round($time, 1);
  }
  else {
    // No measurable elapsed time, so no meaningful rate
    $perminute = '?';
  }

  if ($import) {
    switch ($result) {
      case MIGRATE_RESULT_COMPLETED:
        $basetext = "Imported !numitems in !time sec (!perminute/min) - done with '!description'";
        break;
      case MIGRATE_RESULT_FAILED:
        $basetext = "Imported !numitems in !time sec (!perminute/min) - failure with '!description'";
        break;
      case MIGRATE_RESULT_INCOMPLETE:
        $basetext = "Imported !numitems in !time sec (!perminute/min) - continuing with '!description'";
        break;
      case MIGRATE_RESULT_STOPPED:
        $basetext = "Imported !numitems in !time sec (!perminute/min) - stopped '!description'";
        break;
    }
  }
  else {
    switch ($result) {
      case MIGRATE_RESULT_COMPLETED:
        $basetext = "Deleted !numitems in !time sec (!perminute/min) - done with '!description'";
        break;
      case MIGRATE_RESULT_FAILED:
        $basetext = "Deleted !numitems in !time sec (!perminute/min) - failure with '!description'";
        break;
      case MIGRATE_RESULT_INCOMPLETE:
        $basetext = "Deleted !numitems in !time sec (!perminute/min) - continuing with '!description'";
        break;
      case MIGRATE_RESULT_STOPPED:
        $basetext = "Deleted !numitems in !time sec (!perminute/min) - stopped '!description'";
        break;
    }
  }
  $message = t($basetext,
      array('!numitems' => $numitems, '!time' => $time, '!perminute' => $perminute,
            '!description' => $description));
  return $message;
}

/**
 * Implementation of hook_init().
*/
function migrate_init() {
  // Loads the hooks for the supported modules.
  // TODO: Be more lazy - only load when really needed
  migrate_module_include();

  // Add main CSS functionality.
  drupal_add_css(drupal_get_path('module', 'migrate') .'/migrate.css');
}

/**
 * Implementation of hook_action_info().
 */
/* Revisit
function migrate_action_info() {
  $info['migrate_content_process_clear'] = array(
    'type' => 'migrate',
    'description' => t('Clear a migration content set'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  $info['migrate_content_process_import'] = array(
    'type' => 'migrate',
    'description' => t('Import a migration content set'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  $info['migrate_content_process_all_action'] = array(
    'type' => 'migrate',
    'description' => t('Perform all active migration processes'),
    'configurable' => FALSE,
    'hooks' => array(
      'cron' => array('run'),
    ),
  );
  return $info;
}
*/

/**
 * Implementation of hook_perm().
 */
function migrate_perm() {
  return array(MIGRATE_ACCESS_BASIC, MIGRATE_ACCESS_ADVANCED);
}

/**
 * Implementation of hook_help().
 *
 * Only the 'admin/content/migrate' page gets help text; nothing is returned
 * for any other page.
 */
function migrate_help($page, $arg) {
  switch ($page) {
    case 'admin/content/migrate':
      return t('

Defined content sets are listed here. New content sets may be added below, and tasks may be executed directly in the browser. A process that is actively running will be highlighted.

');
  }
}

/**
 * Implementation of hook_menu().
 */
function migrate_menu() {
  $items = array();

  $items['admin/content/migrate'] = array(
    'title' => 'Migrate',
    'description' => 'Monitor and control the creation of Drupal content from source data',
    'page callback' => 'migrate_dashboard',
    'access arguments' => array(MIGRATE_ACCESS_BASIC),
    'file' => 'migrate_pages.inc',
  );
  $items['admin/content/migrate/dashboard'] = array(
    'title' => 'Dashboard',
    'type' => MENU_DEFAULT_LOCAL_TASK,
    'weight' => -10,
  );
  $items['admin/content/migrate/settings'] = array(
    'title' => 'Settings',
    'description' => 'Migrate module settings',
    'weight' => 5,
    'page callback' => 'migrate_settings',
    'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
    'file' => 'migrate_pages.inc',
    'type' => MENU_LOCAL_TASK,
  );
  $items['admin/content/migrate/content_set/%'] = array(
    'title' => 'Content set',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('migrate_content_set_mappings', 4),
    'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
    // 'type' => MENU_CALLBACK,
    'file' => 'migrate_pages.inc',
  );
  $items['admin/content/migrate/content_set/%/edit'] = array(
    'title' => 'Edit',
    'type' => MENU_DEFAULT_LOCAL_TASK,
  );
  $items['admin/content/migrate/content_set/%/export'] = array(
    'title' => 'Export',
    'page callback' => 'drupal_get_form',
    'page arguments' => array('migrate_export_content_set', 4),
    'access arguments' => array(MIGRATE_ACCESS_ADVANCED),
    'type' => MENU_LOCAL_TASK,
    'file' => 'migrate_pages.inc',
  );
  $items['migrate/xlat/%'] = array(
    'page callback' => 'migrate_xlat',
    'access arguments' => array('access content'),
    'page arguments' => array(2),
    'type' => MENU_CALLBACK,
  );
  return $items;
}

/**
 * Implementation of hook_schema_alter().
 *
 * Adds the per-content-set map and message tables to the schema. Content sets
 * whose view is missing, that have no source key, or whose source key cannot
 * be found in the inspected source table are skipped.
 */
function migrate_schema_alter(&$schema) {
  // Check for table existence - at install time, hook_schema_alter() may be called
  // before our install hook.
  if (!db_table_exists('migrate_content_sets')) {
    return;
  }

  $result = db_query("SELECT * FROM {migrate_content_sets}");
  while ($content_set = db_fetch_object($result)) {
    $maptablename = migrate_map_table_name($content_set->mcsid);
    $msgtablename = migrate_message_table_name($content_set->mcsid);

    // Get the proper field definition for the sourcekey
    $view = views_get_view($content_set->view_name);
    if (!$view) {
      drupal_set_message(t('View !view does not exist - either (re)create this view, or remove the migrate content set using it.', array('!view' => $content_set->view_name)));
      continue;
    }
    // Must do this to load the database
    if (views_api_version() >= '3') {
      $view->init_display('default');
    }
    $view->init_query();

    // TODO: For now, PK must be in base_table
    if (isset($view->base_database)) {
      $tabledb = $view->base_database;
    }
    else {
      $tabledb = 'default';
    }
    $tablename = $view->base_table;
    $sourceschema = _migrate_inspect_schema($tablename, $tabledb);

    // If the PK of the content set is defined, make sure we have a mapping table
    $sourcekey = $content_set->sourcekey;
    if (!$sourcekey) {
      continue;
    }

    // The inspected schema may lack this table or field, so never index it
    // blindly.
    $fields = (is_array($sourceschema) && isset($sourceschema['fields'])) ? $sourceschema['fields'] : array();
    if (empty($fields[$sourcekey])) {
      // strip base table name if views prepended it
      $baselen = drupal_strlen($tablename);
      if (!strncasecmp($sourcekey, $tablename . '_', $baselen + 1)) {
        $sourcekey = drupal_substr($sourcekey, $baselen + 1);
      }
    }
    if (empty($fields[$sourcekey])) {
      // Without a definition for the key we would register tables whose
      // sourceid column is NULL; skip this content set instead.
      continue;
    }
    $sourcefield = $fields[$sourcekey];

    // We don't want serial fields to behave serially, so change to int
    if ($sourcefield['type'] == 'serial') {
      $sourcefield['type'] = 'int';
    }
    $schema[$maptablename] = _migrate_map_table_schema($sourcefield);
    $schema[$maptablename]['name'] = $maptablename;
    $schema[$msgtablename] = _migrate_message_table_schema($sourcefield);
    $schema[$msgtablename]['name'] = $msgtablename;
  }
}

/*
 * Translate URIs from an old site to the new one
 * Requires adding RewriteRules to .htaccess.
For example, if the URLs
 * for news articles had the form
 * http://example.com/issues/news/[OldID].html, use this rule:
 *
 * RewriteRule ^issues/news/([0-9]+).html$ /migrate/xlat/node/$1 [L]
 *
 * @param $contenttype
 *  Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *  Primary key from input view
 * @return
 *  MENU_NOT_FOUND when the old ID cannot be translated to a URI. When a URI
 *  is found a 301 redirect is issued through drupal_goto(), which is expected
 *  to end the request.
 */
function migrate_xlat($contenttype, $oldid) {
  if ($contenttype && $oldid) {
    $newid = _migrate_xlat_get_new_id($contenttype, $oldid);
    if ($newid) {
      $uri = migrate_invoke_all("xlat_$contenttype", $newid);
      // Only redirect when a hook actually supplied a target; an empty result
      // used to trigger an undefined-offset notice and a redirect to the
      // front page.
      if (!empty($uri[0])) {
        drupal_goto($uri[0], NULL, NULL, 301);
      }
    }
  }
  // Nothing to redirect to: report "not found" rather than an empty page.
  return MENU_NOT_FOUND;
}

/*
 * Helper function to translate an ID from a source file to the corresponding
 * Drupal-side ID (nid, uid, etc.)
 * Note that the result may be ambiguous - for example, if you are importing
 * nodes from different content sets, they might have overlapping source IDs.
 * The first content set (in query order) with a matching map entry wins.
 *
 * @param $contenttype
 *  Content type to translate (e.g., 'node', 'user', etc.)
 * @param $oldid
 *  Primary key from input view
 * @return
 *  Drupal-side ID of the object, or NULL if no map table has a match
 */
function _migrate_xlat_get_new_id($contenttype, $oldid) {
  // Map table names are looked up once per content set and remembered for the
  // rest of the request.
  static $maptables = array();

  $result = db_query("SELECT mcsid FROM {migrate_content_sets} WHERE contenttype='%s'", $contenttype);
  while ($row = db_fetch_object($result)) {
    if (!isset($maptables[$row->mcsid])) {
      $maptables[$row->mcsid] = migrate_map_table_name($row->mcsid);
    }
    $sql = "SELECT destid FROM {" . $maptables[$row->mcsid] . "} WHERE sourceid='%s'";
    $id = db_result(db_query($sql, $oldid));
    if ($id) {
      return $id;
    }
  }
  return NULL;
}

/**
 * Implementation of hook_theme().
*/
function migrate_theme() {
  // Most of our themed forms take a single 'form' argument defaulting to NULL.
  $form_argument = array('form' => NULL);

  $hooks = array();
  $hooks['migrate_mapping_table'] = array('arguments' => array('form'));
  $hooks['_migrate_dashboard_form'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_dashboard',
  );
  $hooks['_migrate_settings_form'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_settings',
  );
  $hooks['migrate_content_set_mappings'] = array(
    'arguments' => $form_argument,
    'function' => 'theme_migrate_content_set_mappings',
  );
  return $hooks;
}

/**
 * Build the map table name for a content set.
 *
 * The name is "migrate_map_" followed by the content set's machine name,
 * lower-cased.
 *
 * @param $mcsid
 *  Unique identifier of the content set
 * @return
 *  The name of the map table
 */
function migrate_map_table_name($mcsid) {
  $query = db_query("SELECT machine_name FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
  $name = db_result($query);
  return drupal_strtolower('migrate_map_' . $name);
}

/**
 * Build the message table name for a content set.
 *
 * The name is "migrate_msgs_" followed by the content set's machine name,
 * lower-cased.
 *
 * @param $mcsid
 *  Unique identifier of the content set
 * @return
 *  The name of the message table
 */
function migrate_message_table_name($mcsid) {
  $query = db_query("SELECT machine_name FROM {migrate_content_sets} WHERE mcsid=%d", $mcsid);
  $name = db_result($query);
  return drupal_strtolower('migrate_msgs_' . $name);
}

/**
 * Build the schema definition of a map table.
 *
 * @param $sourcefield
 *  Schema definition for the content set source's PK; used unchanged as the
 *  definition of the sourceid column
 * @return
 *  Schema structure for a map table
 */
function _migrate_map_table_schema($sourcefield) {
  // @TODO: Assumes destination key is unsigned int
  $destid = array(
    'type' => 'int',
    'unsigned' => TRUE,
    'not null' => TRUE,
  );
  $needs_update = array(
    'type' => 'int',
    'size' => 'tiny',
    'unsigned' => TRUE,
    'not null' => TRUE,
    'default' => (int)FALSE,
  );
  $fields = array(
    'sourceid' => $sourcefield,
    'destid' => $destid,
    'needs_update' => $needs_update,
  );

  return array(
    'description' => t('Mappings from source key to destination key'),
    'fields' => $fields,
    'primary key' => array('sourceid'),
    'indexes' => array(
      'idkey' => array('destid'),
    ),
  );
}

/**
 * Generate a
message table schema record, given the source PK definition
 *
 * @param $sourcefield
 *  Schema definition for the content set source's PK
 * @return
 *  Schema structure for a message table
 */
function _migrate_message_table_schema($sourcefield) {
  $schema = array(
    'description' => t('Import errors'),
    'fields' => array(
      'mceid' => array(
        'type' => 'serial',
        'unsigned' => TRUE,
        'not null' => TRUE,
      ),
      'sourceid' => $sourcefield,
      'level' => array(
        'type' => 'int',
        'unsigned' => TRUE,
        'not null' => TRUE,
        'default' => 1,
      ),
      'message' => array(
        'type' => 'text',
        'size' => 'medium',
        'not null' => TRUE,
      ),
    ),
    'primary key' => array('mceid'),
    'indexes' => array(
      'sourceid' => array('sourceid'),
    ),
  );
  return $schema;
}

/**
 * Implementation of hook_views_api().
 */
function migrate_views_api() {
  return array('api' => '2.0');
}

/**
 * Check to see if the advanced help module is installed, and if not put up
 * a message.
 *
 * Only call this function if the user is already in a position for this to
 * be useful.
 *
 * Nothing is shown when the 'migrate_hide_help_message' variable is set or
 * when advanced_help is already enabled.
 */
function migrate_check_advanced_help() {
  if (variable_get('migrate_hide_help_message', FALSE)) {
    return;
  }

  if (!module_exists('advanced_help')) {
    $filename = db_result(db_query("SELECT filename FROM {system} WHERE type = 'module' AND name = 'advanced_help'"));
    // NOTE(review): the message texts below contain no @modules/@hide
    // placeholders although values for them are passed - the link markup
    // appears to be missing from the strings; confirm against upstream.
    if ($filename && file_exists($filename)) {
      // Module is present on disk but not enabled. The "hide" link must lead
      // to Migrate's own settings page (it used to point at the Views tools
      // page), matching the branch below.
      drupal_set_message(t('If you enable the advanced help module, Migrate will provide more and better help. Hide this message.', array('@modules' => url('admin/build/modules'),
        '@hide' => url('admin/content/migrate/settings'))));
    }
    else {
      // Module is not installed at all.
      drupal_set_message(t('If you install the advanced help module from !href, Migrate will provide more and better help. Hide this message.', array('!href' => l('http://drupal.org/project/advanced_help', 'http://drupal.org/project/advanced_help'), '@hide' => url('admin/content/migrate/settings'))));
    }
  }
}

/**
 * Check if a date is valid and return the correct
 * timestamp to use. Returns -1 if the date is not
 * considered valid.
*/
function _migrate_valid_date($date) {
  if (empty($date)) {
    return -1;
  }
  // Non-negative numeric input is passed through unchanged.
  if (is_numeric($date) && $date > -1) {
    return $date;
  }

  // strtotime() doesn't recognize dashes as separators, change to slashes
  $date = str_replace('-', '/', $date);

  $time = strtotime($date);
  if ($time < 0 || !$time) {
    // Handles form YYYY-MM-DD HH:MM:SS.garbage
    if (drupal_strlen($date) > 19) {
      $time = strtotime(drupal_substr($date, 0, 19));
      if ($time < 0 || !$time) {
        return -1;
      }
    }
  }
  // strtotime() yields FALSE for unparsable input; report that as the
  // documented -1 rather than leaking FALSE to the caller.
  if ($time === FALSE) {
    return -1;
  }
  return $time;
}

/**
 * Get the row count for a view, possibly filtered by arguments
 *
 * @param $view
 *  Name of the view to query, or an already-loaded view object
 * @param $args
 *  Optional arguments to the view, separated by '/'
 * @return
 *  Number of rows in the view. 0 if the view could not be loaded.
 */
function _migrate_get_view_count($view, $args = NULL) {
  if (is_string($view)) {
    $view = views_get_view($view);
  }
  if (!$view) {
    // Unknown view: there is nothing to count (and nothing to call methods on).
    return 0;
  }
  // Force execution of count query, with minimal unnecessary results returned
  // @TODO: Find way to execute ONLY count query
  $view->pager['items_per_page'] = 1;
  $view->get_total_rows = TRUE;
  if ($args) {
    $view->set_arguments(explode('/', $args));
  }
  $view->execute();
  $rows = $view->total_rows;

  // TODO: Now, that's the total rows in the current source. However, it may be
  // (particularly with a content set that updates existing objects) that
  // previously-migrated content is no longer in the source, so the stats
  // (particularly Unimported) may be misleading. So, we would want to add in
  // anything in the map table not in the source. This is tricky, and may not
  // really be worth the performance impact, so we'll leave it alone for now.
  return $rows;
}

/**
 * Get the PHP memory_limit value in bytes
 *
 * @return
 *  The memory_limit setting with any K/M/G suffix expanded to bytes. A value
 *  without a suffix (including -1) is returned as an integer; 0 is returned
 *  when the setting is empty or unavailable.
 */
function _migrate_memory_limit() {
  $value = trim((string) ini_get('memory_limit'));
  if ($value === '') {
    return 0;
  }
  $last = strtolower(substr($value, -1));
  // Take the numeric part explicitly so no arithmetic is done on a string
  // such as "128M".
  $bytes = (int) $value;
  // The cases deliberately fall through: 'g' multiplies three times, 'm'
  // twice, 'k' once.
  switch ($last) {
    case 'g':
      $bytes *= 1024;
    case 'm':
      $bytes *= 1024;
    case 'k':
      $bytes *= 1024;
  }
  return $bytes;
}

/*
 * Implementation of hook_migrate_api().
*/
function migrate_migrate_api() {
  $api = array(
    'api' => 1,
    'path' => 'modules',
    'integration modules' => array(
      'comment' => array(
        'description' => t('Core migration support for the comment module'),
      ),
      'node' => array(
        'description' => t('Core migration support for the node module'),
      ),
      'profile' => array(
        'description' => t('Core migration support for the profile module'),
      ),
      'taxonomy' => array(
        'description' => t('Core migration support for the taxonomy module'),
      ),
      'user' => array(
        'description' => t('Core migration support for the user module'),
      ),
    ),
  );
  return $api;
}

// ------------------------------------------------------------------
// Include file helpers - @merlinofchaos: borrowing heavily from views.module

/**
 * Load migrate support files on behalf of modules.
 *
 * For every integration module reported by migrate_get_module_apis(), includes
 * "<path>/<module>.migrate.inc" if that file exists and the integration is
 * enabled.
 */
function migrate_module_include() {
  foreach (migrate_get_module_apis() as $module => $info) {
    foreach ($info['integration modules'] as $intmod => $intmod_details) {
      $file = "$info[path]/$intmod.migrate.inc";
      if (file_exists($file) && $intmod_details['status'] == TRUE) {
        require_once $file;
      }
    }
  }
}

/**
 * Get a list of modules that support the current migrate API.
 *
 * @param $reset
 *  TRUE to discard the statically cached list and rebuild it
 * @return
 *  Array keyed by module name. Each entry is the module's hook_migrate_api()
 *  info with 'path' resolved relative to the module directory and every
 *  'integration modules' entry completed with 'description' and 'status'.
 */
function migrate_get_module_apis($reset = FALSE) {
  static $cache = NULL;
  if ($reset) {
    $cache = NULL;
  }
  if (!isset($cache)) {
    $cache = array();
    // Stored enable/disable overrides are the same for every module, so read
    // them once.
    $settings = variable_get('migrate_integration_settings', NULL);
    foreach (module_implements('migrate_api') as $module) {
      $function = $module . '_migrate_api';
      $info = $function();
      if (isset($info['api']) && $info['api'] == 1.000) {
        if (isset($info['path'])) {
          $info['path'] = drupal_get_path('module', $module) . '/' . $info['path'];
        }
        else {
          $info['path'] = drupal_get_path('module', $module);
        }

        if (!isset($info['integration modules'])) {
          $info['integration modules'] = array($module => array());
        }

        foreach ($info['integration modules'] as $intmod_name => $intmod_details) {
          // If the module was just entered as a string without details, we have to fix.
          if (!is_array($intmod_details)) {
            unset($info['integration modules'][$intmod_name]);
            $intmod_name = $intmod_details;
            $intmod_details = array();
          }

          $default_details = array(
            'description' => t('Support for the @intmod module.', array('@intmod' => $intmod_name)),
            'status' => TRUE,
          );

          // Allow override of defaults.
          $info['integration modules'][$intmod_name] = $intmod_details + $default_details;

          // Overwrite default status if set.
          if (isset($settings[$module][$intmod_name])) {
            $info['integration modules'][$intmod_name]['status'] = $settings[$module][$intmod_name];
          }
        }
        $cache[$module] = $info;
      }
      else {
        drupal_set_message(t('%function supports Migrate API version %modversion, Migrate module API version is %version - migration support not loaded.',
                           array('%function' => $function, '%modversion' => $info['api'], '%version' => MIGRATE_API_VERSION)));
      }
    }
  }

  return $cache;
}

/**
 * Wrapper around schema_invoke('inspect'), to handle views tablenames
 * hacked to include "dbname.".
 *
 * The inspected schema of each database is cached statically for the rest of
 * the request.
 *
 * @param $tablename
 *  Table to return the schema for, optionally prefixed with "dbname." when
 *  the tw module is enabled. An empty string returns the whole schema.
 * @param $tabledb
 *  Key of the database connection to inspect
 * @return
 *  The schema of the requested table (NULL if the table is not part of the
 *  inspected schema), or the whole inspected schema when no table is given.
 */
function _migrate_inspect_schema($tablename = '', $tabledb = 'default') {
  static $inspect = array();

  // TW can treat external MySQL tables as internal, but we can revert
  // if we find a period in the tablename. Todo: might be better to build
  // this check directly into schema_mysql.inc
  if (module_exists('tw') && strpos($tablename, '.')) {
    list($tabledb, $tablename) = explode('.', $tablename);
    // Translate the database name into the key of the matching connection.
    $dbinfo = tw_get_dbinfo();
    foreach ($dbinfo as $dbkey => $db_detail) {
      if ($db_detail['name'] == $tabledb) {
        $tabledb = $dbkey;
        break;
      }
    }
  }

  if (!isset($inspect[$tabledb])) {
    // Remember which connection was active so it can be restored afterwards,
    // instead of assuming it was 'default'.
    $previous = db_set_active($tabledb);
    // TODO: schema_mysql_inspect($name = NULL) takes $name [tablename] which
    // limits the query to only get that table, which is probably a good idea.
    $inspect[$tabledb] = schema_invoke('inspect');
    db_set_active($previous ? $previous : 'default');
  }

  // If a tablename was not given, return the whole schema
  if ($tablename == '') {
    return $inspect[$tabledb];
  }

  // Return the schema for just this table.
  return isset($inspect[$tabledb][$tablename]) ? $inspect[$tabledb][$tablename] : NULL;
}