'select', '#title' => t('Select the amount of CPUs you have available'), '#options' => $num_cpu_options, '#default_value' => variable_get('apachesolr_parallel_num_cpu', 2), ); $form['advanced']['apachesolr_cron_limit']['#options']['500'] = 500; $form['advanced']['apachesolr_cron_limit']['#options']['1000'] = 1000; $form['advanced']['apachesolr_cron_limit']['#options']['1500'] = 1500; $form['advanced']['apachesolr_cron_limit']['#options']['2000'] = 2000; } /** * Alter the batch process * * Implements hook_batch_alter(). * @param array $batch */ function apachesolr_parallel_batch_alter(&$batch) { if ($batch['sets']['0']['operations']['0']['0'] == 'apachesolr_index_batch_index_entities') { $batch['sets']['0']['operations']['0']['0'] = 'apachesolr_parallel_index_batch_index_entities'; } } /** * Batch Operation Callback */ function apachesolr_parallel_index_batch_index_entities($env_id, &$context) { module_load_include('inc', 'apachesolr', 'apachesolr.index'); if (empty($context['sandbox'])) { try { // Get the $solr object $solr = apachesolr_get_solr($env_id); // If there is no server available, don't continue. if (!$solr->ping()) { throw new Exception(t('No Solr instance available during indexing.')); } } catch (Exception $e) { watchdog('Apache Solr', $e->getMessage(), NULL, WATCHDOG_ERROR); return FALSE; } $status = apachesolr_index_status($env_id); $context['sandbox']['progress'] = 0; $context['sandbox']['submitted'] = 0; $context['sandbox']['max'] = $status['remaining']; } // We can safely process the apachesolr_cron_limit nodes at a time without a // timeout or out of memory error. $limit = variable_get('apachesolr_cron_limit', 50); if ($context['sandbox']['max'] >= $context['sandbox']['progress'] + $limit) { $context['sandbox']['progress'] += $limit; } else { $context['sandbox']['progress'] = $context['sandbox']['max']; } $context['sandbox']['submitted'] += apachesolr_parallel_index_entities($env_id, $limit); $arguments = array( '@current' => $context['sandbox']['progress'], '@total' => $context['sandbox']['max'], '@submitted' => $context['sandbox']['submitted'], ); $context['message'] = t('Inspected @current of @total entities. Submitted @submitted documents to Solr in parallel', $arguments); // Inform the batch engine that we are not finished, and provide an // estimation of the completion level we reached. $context['finished'] = empty($context['sandbox']['max']) ? 1 : $context['sandbox']['progress'] / $context['sandbox']['max']; // Put the total into the results section when we're finished so we can // show it to the admin. if ($context['finished']) { $context['results']['count'] = $context['sandbox']['progress']; $context['results']['submitted'] = $context['sandbox']['submitted']; } } /** * Wrapper for indexing the documents so our httprl callback is simple to * understand. * * @param type $rows * @param type $entity_type * @param type $env_id * @return type */ function apachesolr_parallel_index_entities_document_wrapper($rows = array(), $entity_type = 'node', $env_id = 'solr') { module_load_include('inc', 'apachesolr', 'apachesolr.index'); $documents = array(); // We explicitely do not work with excludes or other hook invocations // as this could dramatically decrease the speed foreach ($rows as $row) { $documents_row = apachesolr_index_entities_document($row, $entity_type, $env_id); $documents = array_merge($documents_row, $documents); } $indexed = apachesolr_index_send_to_solr($env_id, $documents); if ($indexed !== FALSE) { $documents_submitted = count($documents); } return $documents_submitted; } function apachesolr_parallel_index_entities($env_id, $limit) { // dont forget : // variable_set('httprl_server_addr', -1) module_load_include('inc', 'apachesolr', 'apachesolr.index'); // Disable all caching apachesolr_parallel_cache_disable(); $documents_submitted = 0; foreach (entity_get_info() as $entity_type => $info) { $documents = array(); $return_values = array(); // Get old position $index_position = apachesolr_get_last_index_position($env_id, $entity_type); $max_changed = $index_position['last_changed']; $max_entity_id = $index_position['last_entity_id']; // We divide it in customized threads of a variable set of items per thread $amount_of_cpu = variable_get('apachesolr_parallel_num_cpu', 2); $amount_per_thread = round($limit/$amount_of_cpu); $amount_of_threads = ceil($limit/$amount_per_thread); // With each pass through the callback, retrieve the next group of nids. //$rows = apachesolr_parallel_index_get_entities_to_index($env_id, $entity_type, $limit, 0); //$rows_all_threads = array_chunk($rows, $amount_per_thread); for ($i = 0; $i < $amount_of_threads; $i++) { $rows_in_thread = apachesolr_parallel_index_get_entities_to_index($env_id, $entity_type, $amount_per_thread, $amount_per_thread * $i); $return_values[$i] = ''; # the return value will be stored in here, so make sure there's something $callback_options = array( array( # this function will be called in parallel 'function' => 'apachesolr_parallel_index_entities_document_wrapper', # its return value will be placed here 'return' => &$return_values[$i], ), # pass this as an argument to the callback $rows_in_thread, $entity_type, $env_id, ); # queue the callback httprl_queue_background_callback($callback_options); // Set our new position foreach ($rows_in_thread as $row) { if (!empty($row->status)) { if ($row->changed > $max_changed) { $max_changed = $row->changed; } if ($row->entity_id > $max_entity_id) { $max_entity_id = $row->entity_id; } } } } // Set long time-outs since we can wait patiently // Make sure our calculation of timeouts is defined by the amount of cpus $timeout = 360; $global_timeout = 360 * $amount_of_threads; $options = array( 'global_timeout' => $global_timeout, 'timeout' => $timeout, ); httprl_set_default_options($options); $response = httprl_send_request($fp = NULL, $url = '', $request = '', $options); if ($response === FALSE) { return $documents_submitted; } foreach ($return_values as $documents_submitted_per_thread) { if (!empty($documents_submitted_per_thread)) { $documents_submitted += $documents_submitted_per_thread; } } apachesolr_set_last_index_position($env_id, $entity_type, $max_changed, $max_entity_id); apachesolr_set_last_index_updated($env_id, REQUEST_TIME); } // Enable all caching again apachesolr_parallel_cache_enable(); return $documents_submitted; } function apachesolr_parallel_form_apachesolr_index_action_form_alter(&$form_state, $env_id) { $form_state['action']['cron']['#submit'] = array('apachesolr_parallel_index_action_form_cron_submit'); } /** * Submit handler for the deletion form. */ function apachesolr_parallel_index_action_form_cron_submit($form, &$form_state) { if (!empty($form_state['build_info']['args'][0])) { $env_id = $form_state['build_info']['args'][0]; } else { $env_id = apachesolr_default_environment(); } // $form_state['storage'] must be unset for redirection to occur. Otherwise // $form_state['rebuild'] is automatically set and this form will be // rebuilt. unset($form_state['storage']); $form_state['redirect'] = 'admin/config/search/apachesolr'; apachesolr_parallel_cron($env_id); drupal_set_message(t('Apachesolr cron succesfully executed')); } /** * We run the indexing process only for the default environment * Implements hook_cron(). * @todo See if we can add info to the content type array for the cron_check */ function apachesolr_parallel_cron($env_id = NULL) { if (empty($env_id)) { $env_id = apachesolr_default_environment(); } // Indexes in read-only mode do not change the index, so will not update, delete, or optimize during cron. if (apachesolr_environment_variable_get($env_id, 'apachesolr_read_only', APACHESOLR_READ_WRITE) == APACHESOLR_READ_ONLY) { return; } module_load_include('inc', 'apachesolr', 'apachesolr.index'); // For every entity type that requires extra validation // Drupal 6 node only $bundles = apachesolr_get_index_bundles($env_id, 'node'); // If we're not checking any bundles of this entity type, just skip them all. if (empty($bundles)) { return; } // We can safely process the apachesolr_cron_limit nodes at a time without a // timeout or out of memory error. $limit = variable_get('apachesolr_cron_limit', 50); apachesolr_parallel_index_entities($env_id, $limit); } /** * Returns an array of rows from a query based on an indexing environment. * @todo Remove the read only because it is not environment specific */ function apachesolr_parallel_index_get_entities_to_index($env_id, $entity_type, $limit, $from = 0) { $rows = array(); if (variable_get('apachesolr_read_only', 0)) { return $rows; } $bundles = apachesolr_get_index_bundles($env_id, $entity_type); if (empty($bundles)) { return $rows; } // Drupal 6 specifically only supports nodes $type = 'node'; if (($type != $entity_type)) { return $rows; } $bundles = apachesolr_get_index_bundles($env_id, $type); $table = apachesolr_get_indexer_table($entity_type); // Get $last_entity_id and $last_changed. extract(apachesolr_get_last_index_position($env_id, $entity_type)); // We can't use PDO in httprl apparently... // Find the next batch of entities to index for this entity type. Note that // for ordering we're grabbing the oldest first and then ordering by ID so // that we get a definitive order. $query = "SELECT * FROM {{$table}} aie WHERE (aie.bundle IN (:bundles)) AND ((aie.changed > :last_changed) OR ((aie.changed <= :last_changed) AND (aie.entity_id > :last_entity_id)))"; $arguments = array( ':bundles' => $bundles, ':last_changed' => $last_changed, ':last_entity_id' => $last_entity_id, ); if ($table == 'apachesolr_index_entities') { // Other, entity-specific tables don't need this condition. $query .= " AND aie.entity_type = :entity_type"; $arguments[':entity_type'] = $entity_type; } $query .= " ORDER BY aie.entity_id ASC"; $query = db_query_range($query, $from, $limit, $arguments); $records = $query->fetchAllAssoc('entity_id'); // We omit the status callback since it is very slow return $records; } function apachesolr_parallel_cache_disable() { if (!class_exists('DrupalFakeCache')) { $cache_backends = variable_get('cache_backends', array()); $cache_backends[] = 'includes/cache-install.inc'; variable_set('cache_backends', $cache_backends); } // Default to throwing away cache data variable_set('cache_default_class','DrupalFakeCache'); // Rely on the DB cache for form caching - otherwise forms fail. variable_set('cache_class_cache_form', 'DrupalDatabaseCache'); } function apachesolr_parallel_cache_enable() { // Default to throwing away cache data variable_del('cache_default_class'); // Rely on the DB cache for form caching - otherwise forms fail. variable_del('cache_class_cache_form'); }