about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Alex Legler <alex@a3li.li> 2015-02-23 15:21:27 +0100
committer: Alex Legler <alex@a3li.li> 2015-02-23 15:21:27 +0100
commit: 2a13f18aa0a7ac3fe7d19eeea45842de818a615c (patch)
tree: c67b09ac642018550f2ca203851c87dc3ccadc33
parent: Implement --delete (diff)
download: backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.tar.gz
backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.tar.bz2
backend-2a13f18aa0a7ac3fe7d19eeea45842de818a615c.zip
use more threads!
-rw-r--r-- Gemfile 4
-rw-r--r-- Gemfile.lock 4
-rwxr-xr-x ag 12
-rw-r--r-- lib/storage.rb 10
-rw-r--r-- lib/threading.rb 4
5 files changed, 24 insertions, 10 deletions
diff --git a/Gemfile b/Gemfile
index 6689dd1..c676b7c 100644
--- a/Gemfile
+++ b/Gemfile
@@ -4,4 +4,6 @@ gem 'mail'
gem 'maildir'
gem 'elasticsearch'
gem 'sanitize'
-gem 'charlock_holmes'
\ No newline at end of file
+gem 'charlock_holmes'
+gem 'parallel'
+gem 'ruby-progressbar'
\ No newline at end of file
diff --git a/Gemfile.lock b/Gemfile.lock
index ca40918..d55552e 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -24,6 +24,8 @@ GEM
mini_portile (~> 0.6.0)
nokogumbo (1.2.0)
nokogiri
+ parallel (1.4.0)
+ ruby-progressbar (1.7.1)
sanitize (3.1.1)
crass (~> 1.0.1)
nokogiri (>= 1.4.4)
@@ -37,4 +39,6 @@ DEPENDENCIES
elasticsearch
mail
maildir
+ parallel
+ ruby-progressbar
sanitize
diff --git a/ag b/ag
index dbb1584..ca81b22 100755
--- a/ag
+++ b/ag
@@ -7,6 +7,8 @@ require 'mail'
require 'maildir'
require 'elasticsearch'
require 'optparse'
+require 'parallel'
+require 'ruby-progressbar'
require_relative 'lib/utils'
require_relative 'lib/threading'
require_relative 'lib/rendering'
@@ -111,7 +113,9 @@ $es.transport.reload_connections!
def do_full
Ag::Storage.create_index($options.name)
- $maildir.list(:cur).each do |maildir_message|
+ messages = $maildir.list(:cur)
+
+ Parallel.each(messages, progress: "Importing #{$options.name}") do |maildir_message|
mail = maildir_message.data
begin
@@ -126,14 +130,16 @@ def do_full
end
def do_incremental
- $maildir.list(:new).each do |maildir_message|
+ messages = $maildir.list(:cur)
+
+ Parallel.each(messages, progress: "Importing #{$options.name}") do |maildir_message|
mail = maildir_message.data
begin
Ag::Storage.store($options.name, mail, maildir_message.filename)
maildir_message.process unless $options.readonly
rescue => e
- $stderr.puts "Cannot save message #{mail.message_id} (file #{maildir_message.filename}): #{e.message}"
+ $stderr.puts "Cannot save message #{mail.message_id} (file #{maildir_message.filename}): #{e.message}" if $options.debug
next
end
end
diff --git a/lib/storage.rb b/lib/storage.rb
index f255633..d32ba2b 100644
--- a/lib/storage.rb
+++ b/lib/storage.rb
@@ -7,7 +7,7 @@ module Ag::Storage
begin
$es.indices.delete index: 'ml-' + list
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
- $stderr.puts "Index did not exist yet. Creating."
+ $stderr.puts "Index did not exist yet. Creating." if $options.debug
end
$es.indices.create(
@@ -84,10 +84,10 @@ module Ag::Storage
content = Ag::Utils.fix_encoding(raw_content || '', true).strip
if content == ''
- $stderr.puts "#{message.message_id}: Content empty?"
+ $stderr.puts "#{message.message_id}: Content empty?" if $options.debug
end
rescue => e
- $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}"
+ $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}" if $options.debug
end
content
@@ -174,7 +174,7 @@ module Ag::Storage
)
end
- def fix_threading(list)
+ def fix_threading(list, pass)
result = $es.search(
index: 'ml-' + list,
size: 100000,
@@ -201,7 +201,7 @@ module Ag::Storage
}
)
- result['hits']['hits'].each do |hit|
+ Parallel.each(result['hits']['hits'], progress: "Calculating Threading (Pass #{pass})") do |hit|
msg = resolve_message_id(list, hit['_source']['raw_parent'])
unless msg == nil
diff --git a/lib/threading.rb b/lib/threading.rb
index 8988f23..212bb98 100644
--- a/lib/threading.rb
+++ b/lib/threading.rb
@@ -57,11 +57,13 @@ module Ag
def calc(list)
number_of_root_threads = -1
+ pass = 1
loop do
- new_num = Ag::Storage.fix_threading(list)
+ new_num = Ag::Storage.fix_threading(list, pass)
break if new_num == number_of_root_threads
number_of_root_threads = new_num
+ pass += 1
end
end
end