require 'elasticsearch' require 'date' require 'pp' module Ag::Storage module_function # Throws Elasticsearch::Transport::Transport::Errors::NotFound # if the list does not exist def delete_index(list) $es.indices.delete index: 'ml-' + list end def create_index(list) indexname = 'ml-' + list $es.indices.create( index: indexname, body: { mappings: { message: { properties: { attachments: { properties: { filename: { type: 'string', index: 'not_analyzed' }, mime: { type: 'string', index: 'not_analyzed' } } }, received: { properties: { hop: { type: 'string', index: 'not_analyzed' }, date: { type: 'date', format: 'dateOptionalTime' } } }, cc: { type: 'string' }, content: { type: 'string' }, date: { type: 'date', format: 'dateOptionalTime' }, from: { type: 'string' }, from_realname: { type: 'string' }, month: { type: 'integer' }, parent: { type: 'string', index: 'not_analyzed' }, raw_message_id: { type: 'string', index: 'not_analyzed' }, raw_filename: { type: 'string', index: 'not_analyzed' }, raw_parent: { type: 'string' }, subject: { type: 'string' }, to: { type: 'string' } } } } }) # Give elasticsearch some time to process the new index while $es.cluster.health['status'] != 'green' do status = $es.indices.status(index: indexname) pp status pp status['indices'][indexname]['shards'] status = status['indices'][indexname]['shards'].map do |k,v| v[0]['routing']['state'] end pp status sleep 0.01 end pp $es.indices.status(index: indexname) end def get_content(message, filename) content = 'Cannot parse MIME/contents.' begin raw_content = Ag::Rendering::HTMLizer.HTMLize(message) content = Ag::Utils.fix_encoding(raw_content || '').strip if content == '' $stderr.puts "#{message.message_id}: Content empty?" if $options.debug end rescue => e $stderr.puts "Cannot render message #{message.message_id} (file: #{filename}): #{e}" if $options.debug end content end def resolve_by_field(list, field, value) return nil if value == nil result = $es.search( index: 'ml-' + list, body: { query: { filtered: { filter: { term: { field => value } } } }, fields: ['_id'] } ) return nil if result['hits']['total'] == 0 result['hits']['hits'].first['_id'] end def resolve_message_id(list, message_id = nil) resolve_by_field(list, :raw_message_id, message_id) end def resolve_filename(list, filename) resolve_by_field(list, :raw_filename, filename) end def resolve_hash(list, hash = nil) resolve_by_field(list, :_id, hash) end def store(list, message, filename) content = get_content(message, filename) identifier = nil begin identifier = message['X-Archives-Hash'].value rescue NoMethodError raise 'No archives hash' end raw_parent = Ag::Threading.get_parent_message_id(message) from = Ag::Utils.resolve_address_header(message, :from).first from_realname = Ag::Utils.get_sender_displayname(message) to = Ag::Utils.resolve_address_header(message, :to) cc = Ag::Utils.resolve_address_header(message, :cc) subject = Ag::Utils.fix_encoding(message.subject) date = [message.received].flatten.first.field.date_time received = [] [message.received].flatten.each do |hop| begin received << { hop: hop.field.info, date: hop.field.date_time } rescue => e next end end attachments = [] if message.has_attachments? message.attachments.each do |attachment| attachments << { filename: attachment.filename, mime: attachment.mime_type } end end $es.index( index: 'ml-' + list, type: 'message', id: identifier, body: { raw_message_id: message.message_id, subject: subject, to: to, cc: cc, from: from, from_realname: from_realname, date: date, month: ("%i%02i" % [date.year, date.month]).to_i, # this is a sortable number! content: content, attachments: attachments, received: received, raw_parent: raw_parent, raw_filename: filename } ) end def fix_threading(list, pass) result = $es.search( index: 'ml-' + list, size: 5000, body: { size: 5000, query: { filtered: { filter: { and: [ { missing: { field: 'parent' } }, { exists: { field: 'raw_parent' } } ] } } } } ) opts = { :in_processes => Ag::Utils.proc_count, } opts[:progress] = "Calculating Threading (Pass #{pass})" if $options.progress Parallel.each(result['hits']['hits'], opts) do |hit| msg = resolve_message_id(list, hit['_source']['raw_parent']) unless msg == nil $es.update( index: 'ml-' + list, type: 'message', id: hit['_id'], body: { doc: { parent: msg } } ) end end result['hits']['total'] end def delete(list, id) $es.delete(index: 'ml-' + list, type: 'message', id: id) end def get(list, id) result = $es.search( index: 'ml-' + list, size: 1, body: { query: { filtered: { filter: { term: { _id: id } } } } } ) return nil if result['hits']['total'] == 0 result['hits']['hits'].first end end # vim: ts=2 sts=2 et ft=ruby: