class Fluent::SecureForwardInput::Session

require 'resolv'

Attributes

auth_salt[RW]
node[RW]
receiver[RW]
socket[RW]
state[RW]
thread[RW]
unpacker[RW]

Public Class Methods

new(receiver, socket) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 11
def initialize(receiver, socket)
  @receiver = receiver

  @state = :helo

  @socket = socket
  @socket.sync = true

  @ipaddress = nil
  @node = nil
  @unpacker = MessagePack::Unpacker.new
  @thread = Thread.new(&method(:start))
end

Public Instance Methods

check_node(ipaddress) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 41
def check_node(ipaddress)
  node = nil
  @receiver.nodes.each do |n|
    if n[:address].include?(ipaddress)
      node = n
      break
    end
  end
  node
end
check_ping(message) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 68
def check_ping(message)
  log.debug "checking ping"
  # ['PING', self_hostname, shared_key\_salt, sha512\_hex(shared_key\_salt + self_hostname + nonce + shared_key),
  #  username || '', sha512\_hex(auth\_salt + username + password) || '']
  unless message.size == 6 && message[0] == 'PING'
    return false, 'invalid ping message'
  end
  _ping, hostname, shared_key_salt, shared_key_hexdigest, username, password_digest = message

  shared_key = if @node && @node[:shared_key]
                 @node[:shared_key]
               else
                 @receiver.shared_key
               end
  serverside = Digest::SHA512.new.update(shared_key_salt).update(hostname).update(@shared_key_nonce).update(shared_key).hexdigest
  if shared_key_hexdigest != serverside
    log.warn "Shared key mismatch from '#{hostname}'"
    return false, 'shared_key mismatch'
  end

  if @receiver.authentication
    users = @receiver.select_authenticate_users(@node, username)
    success = false
    users.each do |user|
      passhash = Digest::SHA512.new.update(@auth_key_salt).update(username).update(user[:password]).hexdigest
      success ||= (passhash == password_digest)
    end
    unless success
      log.warn "Authentication failed from client '#{hostname}', username '#{username}'"
      return false, 'username/password mismatch'
    end
  end

  return true, shared_key_salt
end
closed?() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 29
def closed?
  @state == :closed
end
established?() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 33
def established?
  @state == :established
end
generate_helo() click to toggle source
not implemented yet

def check_hostname_reverse_lookup(ipaddress)

rev_name = Resolv.getname(ipaddress)
proto, port, host, ipaddr, family_num, socktype_num, proto_num = Socket.getaddrinfo(rev_name, DUMMY_PORT)
unless ipaddr == ipaddress
  return false
end
true

end

# File lib/fluent/plugin/input_session.rb, line 62
def generate_helo
  log.debug "generating helo"
  # ['HELO', options(hash)]
  [ 'HELO', {'nonce' => @shared_key_nonce, 'auth' => (@receiver.authentication ? @auth_key_salt : ''), 'keepalive' => @receiver.allow_keepalive } ]
end
generate_pong(auth_result, reason_or_salt) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 104
def generate_pong(auth_result, reason_or_salt)
  log.debug "generating pong"
  # ['PONG', bool(authentication result), 'reason if authentication failed',
  #  self_hostname, sha512\_hex(salt + self_hostname + nonce + sharedkey)]
  if not auth_result
    return ['PONG', false, reason_or_salt, '', '']
  end

  shared_key = if @node && @node[:shared_key]
                 @node[:shared_key]
               else
                 @receiver.shared_key
               end
  shared_key_hex = Digest::SHA512.new.update(reason_or_salt).update(@receiver.self_hostname).update(@shared_key_nonce).update(shared_key).hexdigest
  [ 'PONG', true, '', @receiver.self_hostname, shared_key_hex ]
end
generate_salt() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 37
def generate_salt
  OpenSSL::Random.random_bytes(16)
end
log() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 25
def log
  @receiver.log
end
on_read(data) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 121
def on_read(data)
  log.debug "on_read"
  if self.established?
    @receiver.on_message(data)
  end

  case @state
  when :pingpong
    success, reason_or_salt = self.check_ping(data)
    if not success
      send_data generate_pong(false, reason_or_salt)
      self.shutdown
      return
    end
    send_data generate_pong(true, reason_or_salt)

    log.debug "connection established"
    @state = :established
  end
end
send_data(data) click to toggle source
# File lib/fluent/plugin/input_session.rb, line 142
def send_data(data)
  # not nonblock because write data (response) needs sequence
  @socket.write data.to_msgpack
end
shutdown() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 205
def shutdown
  @state = :closed
  log.debug "Shutdown called"
  @socket.close
  if @thread == Thread.current
    @thread.kill
  else
    if @thread
      @thread.kill
      @thread.join
    end
  end
rescue => e
  log.debug "#{e.class}:#{e.message}"
end
start() click to toggle source
# File lib/fluent/plugin/input_session.rb, line 147
def start
  log.debug "starting server"

  log.trace "accepting ssl session"
  begin
    @socket.accept
  rescue OpenSSL::SSL::SSLError => e
    log.debug "failed to establish ssl session", error_class: e.class, error: e
    self.shutdown
    return
  end

  _proto, port, host, ipaddr = @socket.io.peeraddr
  @node = check_node(ipaddr)
  if @node.nil? && (! @receiver.allow_anonymous_source)
    log.warn "Connection required from unknown host '#{host}' (#{ipaddr}), disconnecting..."
    self.shutdown
    return
  end

  @shared_key_nonce = generate_salt
  @auth_key_salt = generate_salt

  buf = ''
  read_length = @receiver.read_length
  read_interval = @receiver.read_interval
  socket_interval = @receiver.socket_interval

  send_data generate_helo()
  @state = :pingpong

  loop do
    begin
      while @socket.read_nonblock(read_length, buf)
        if buf == ''
          sleep read_interval
          next
        end
        @unpacker.feed_each(buf, &method(:on_read))
        buf = ''
      end
    rescue OpenSSL::SSL::SSLError => e
      # to wait i/o restart
      sleep socket_interval
    rescue EOFError => e
      log.debug "Connection closed from '#{host}'(#{ipaddr})"
      break
    end
  end
rescue Errno::ECONNRESET => e
  # disconnected from client
rescue => e
  log.warn "unexpected error in in_secure_forward from #{host}:#{port}", error_class: e.class, error: e
ensure
  log.debug "Shutting down #{host}:#{port}"
  self.shutdown
end