diff --git a/nmap3/exceptions.py b/nmap3/exceptions.py index 9cc5f7b..c37be2b 100644 --- a/nmap3/exceptions.py +++ b/nmap3/exceptions.py @@ -17,11 +17,6 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, # MA 02110-1301, USA. # -# -import shlex -import subprocess -import sys -import re __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.6.0' @@ -30,9 +25,11 @@ class NmapNotInstalledError(Exception): """Exception raised when nmap is not installed""" - def __init__(self, message="Nmap is either not installed or we couldn't locate nmap path Please ensure nmap is installed"): - self.message = message - super().__init__(message) + def __init__(self, path=""): + self.message = f"Nmap is either not installed or we couldn't locate \ +nmap path. Please ensure nmap is installed and provide right path string. \n\ +Provided: *{path if path else 'Not provided'}*" + super().__init__(self.message) class NmapXMLParserError(Exception): """Exception raised when we can't parse the output""" diff --git a/nmap3/nmap3.py b/nmap3/nmap3.py index 303a7cf..db06765 100644 --- a/nmap3/nmap3.py +++ b/nmap3/nmap3.py @@ -19,19 +19,16 @@ # # -import os import shlex import subprocess import sys -import simplejson as json import argparse import asyncio from xml.etree import ElementTree as ET from xml.etree.ElementTree import ParseError from nmap3.nmapparser import NmapCommandParser from nmap3.utils import get_nmap_path, user_is_root -from nmap3.exceptions import NmapNotInstalledError, NmapXMLParserError, NmapExecutionError -import xml +from nmap3.exceptions import NmapXMLParserError, NmapExecutionError import re __author__ = 'Wangolo Joel (inquiry@nmapper.com)' @@ -46,14 +43,14 @@ class Nmap(object): by calling nmap3.Nmap() """ - def __init__(self, path=None): + def __init__(self, path:str=''): """ Module initialization :param path: Path where nmap is installed on a user system. On linux system it's typically on /usr/bin/nmap. """ - self.nmaptool = path if path else get_nmap_path() + self.nmaptool = get_nmap_path(path) # check path, search or raise error self.default_args = "{nmap} {outarg} - " self.maxport = 65535 self.target = "" @@ -114,7 +111,7 @@ def nmap_version(self): # Unique method for repetitive tasks - Use of 'target' variable instead of 'host' or 'subnet' - no need to make difference between 2 strings that are used for the same purpose def scan_command(self, target, arg, args=None, timeout=None): - self.target == target + self.target = target command_args = "{target} {default}".format(target=target, default=arg) scancommand = self.default_command() + command_args @@ -216,7 +213,7 @@ def nmap_detect_firewall(self, target, arg="-sA", args=None): # requires root nmap -oX - nmmapper.com -sA @ TODO """ - xml_root = self.scan_command(target=target, arg=arg, args=args) + return self.scan_command(target=target, arg=arg, args=args) # TODO @user_is_root @@ -257,22 +254,26 @@ def run_command(self, cmd, timeout=None): @param: cmd--> the command we want run eg /usr/bin/nmap -oX - nmmapper.com --top-ports 10 @param: timeout--> command subprocess timeout in seconds. """ - if (os.path.exists(self.nmaptool)): - sub_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - try: - output, errs = sub_proc.communicate(timeout=timeout) - except Exception as e: - sub_proc.kill() - raise (e) - else: - if 0 != sub_proc.returncode: - raise NmapExecutionError('Error during command: "' + ' '.join(cmd) + '"\n\n' + errs.decode('utf8')) - - # Response is bytes so decode the output and return - return output.decode('utf8').strip() + sub_proc = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE + ) + try: + output, errs = sub_proc.communicate(timeout=timeout) + except Exception as e: + sub_proc.kill() + raise (e) else: - raise NmapNotInstalledError() + if 0 != sub_proc.returncode: + raise NmapExecutionError( + 'Error during command: "' + ' '.join(cmd) + '"\n\n' \ + + errs.decode('utf8') + ) + # Response is bytes so decode the output and return + return output.decode('utf8').strip() + def get_xml_et(self, command_output): """ @ return xml ET @@ -309,7 +310,7 @@ class NmapScanTechniques(Nmap): 7) IP Scan (-sO) """ - def __init__(self, path=None): + def __init__(self, path:str=''): super(NmapScanTechniques, self).__init__(path=path) self.sync_scan = "-sS" @@ -350,7 +351,9 @@ def tpl(i): output = self.run_command(scan_shlex, timeout=timeout) xml_root = self.get_xml_et(output) - return xml_root + return xml_root + raise Exception("Something went wrong") + @user_is_root def nmap_fin_scan(self, target, args=None): @@ -442,7 +445,7 @@ class NmapHostDiscovery(Nmap): 4) Disable DNS resolution (-n) """ - def __init__(self, path=None): + def __init__(self, path:str=''): super(NmapHostDiscovery, self).__init__(path=path) self.port_scan_only = "-Pn" @@ -476,7 +479,8 @@ def tpl(i): output = self.run_command(scan_shlex, timeout=timeout) xml_root = self.get_xml_et(output) - return xml_root + return xml_root + raise Exception("Something went wrong") def nmap_portscan_only(self, target, args=None): """ @@ -523,30 +527,34 @@ def nmap_disable_dns(self, target, args=None): return results class NmapAsync(Nmap): - def __init__(self, path=None): + def __init__(self, path:str=''): super(NmapAsync, self).__init__(path=path) self.stdout = asyncio.subprocess.PIPE self.stderr = asyncio.subprocess.PIPE async def run_command(self, cmd, timeout=None): - if (os.path.exists(self.nmaptool)): - process = await asyncio.create_subprocess_shell(cmd,stdout=self.stdout,stderr=self.stderr) + process = await asyncio.create_subprocess_shell( + cmd, + stdout=self.stdout, + stderr=self.stderr + ) - try: - data, stderr = await process.communicate() - except Exception as e: - raise (e) - else: - if 0 != process.returncode: - raise NmapExecutionError('Error during command: "' + ' '.join(cmd) + '"\n\n' + stderr.decode('utf8')) - - # Response is bytes so decode the output and return - return data.decode('utf8').strip() + try: + data, stderr = await process.communicate() + except Exception as e: + raise (e) else: - raise NmapNotInstalledError() + if 0 != process.returncode: + raise NmapExecutionError( + 'Error during command: "' + ' '.join(cmd) + '"\n\n' + \ + stderr.decode('utf8') + ) + + # Response is bytes so decode the output and return + return data.decode('utf8').strip() async def scan_command(self, target, arg, args=None, timeout=None): - self.target == target + self.target = target command_args = "{target} {default}".format(target=target, default=arg) scancommand = self.default_command() + command_args @@ -611,7 +619,7 @@ async def nmap_list_scan(self, target, arg="-sL", args=None): # requires root return results class NmapScanTechniquesAsync(NmapAsync,NmapScanTechniques): - def __init__(self, path=None): + def __init__(self, path:str=''): super(NmapScanTechniquesAsync, self).__init__(path=path) self.udp_scan = "-sU" @@ -640,7 +648,8 @@ def tpl(i): output = await self.run_command(scan_type_command, timeout=timeout) xml_root = self.get_xml_et(output) - return xml_root + return xml_root + raise Exception("Something went wrong") async def nmap_udp_scan(self, target, args=None): if (args): diff --git a/nmap3/utils.py b/nmap3/utils.py index d7a6ac4..daa16bb 100644 --- a/nmap3/utils.py +++ b/nmap3/utils.py @@ -21,20 +21,26 @@ import shlex import subprocess import sys -import re import os import ctypes import functools +from nmap3.exceptions import NmapNotInstalledError + __author__ = 'Wangolo Joel (inquiry@nmapper.com)' __version__ = '1.6.0' __last_modification__ = 'Sep/15/2024' -def get_nmap_path(): +def get_nmap_path(path:str='') -> str: """ + Accepts path, validate it. If not valide, search nmap path Returns the location path where nmap is installed by calling which nmap + If not found raises NmapNotInstalledError """ + if path and (os.path.exists(path)): + return path + os_type = sys.platform if os_type == 'win32': cmd = "where nmap" @@ -43,16 +49,15 @@ def get_nmap_path(): args = shlex.split(cmd) sub_proc = subprocess.Popen(args, stdout=subprocess.PIPE) - try: - output, errs = sub_proc.communicate(timeout=15) - except Exception as e: + output, e = sub_proc.communicate(timeout=15) + if e: print(e) - sub_proc.kill() - else: - if os_type == 'win32': - return output.decode('utf8').strip().replace("\\", "/") - else: - return output.decode('utf8').strip() + + if not output: + raise NmapNotInstalledError(path=path) + if os_type == 'win32': + return output.decode('utf8').strip().replace("\\", "/") + return output.decode('utf8').strip() def get_nmap_version(): nmap = get_nmap_path() @@ -62,7 +67,7 @@ def get_nmap_version(): sub_proc = subprocess.Popen(args, stdout=subprocess.PIPE) try: - output, errs = sub_proc.communicate(timeout=15) + output, _ = sub_proc.communicate(timeout=15) except Exception as e: print(e) sub_proc.kill() @@ -73,7 +78,7 @@ def user_is_root(func): def wrapper(*args, **kwargs): try: is_root_or_admin = (os.getuid() == 0) - except AttributeError as e: + except AttributeError: is_root_or_admin = ctypes.windll.shell32.IsUserAnAdmin() != 0 if(is_root_or_admin):