 componentDetect.py | 445 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 445 insertions(+), 0 deletions(-)
diff --git a/componentDetect.py b/componentDetect.py
new file mode 100644
index 0000000..4b7b417
--- /dev/null
+++ b/componentDetect.py
@@ -0,0 +1,445 @@
+import aiohttp
+import json
+import logging
+import pkg_resources
+import re
+import requests
+import warnings
+
+from bs4 import BeautifulSoup
+from typing import Union
+
+logger = logging.getLogger(__name__)
+
+class WappalyzerError(Exception):
+ """
+ Raised for fatal Wappalyzer errors.
+ """
+ pass
+
+
+class WebPage:
+ """
+ Simple representation of a web page, decoupled
+ from any particular HTTP library's API.
+ """
+
+ def __init__(self, url, html, headers):
+ """
+ Initialize a new WebPage object.
+
+ Parameters
+ ----------
+
+ url : str
+ The web page URL.
+ html : str
+            The web page content (HTML).
+        headers : dict
+            The HTTP response headers.
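+
+        Example (a minimal sketch with inline HTML; all values are
+        illustrative)::
+
+            page = WebPage('https://example.com',
+                           '<html><meta name="generator" content="WordPress">'
+                           '</html>', {'Server': 'nginx'})
+            page.meta['generator']   # -> 'WordPress'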
+ """
+ self.url = url
+ self.html = html
+ self.headers = headers
+
+ try:
+ list(self.headers.keys())
+ except AttributeError:
+ raise ValueError("Headers must be a dictionary-like object")
+
+ self._parse_html()
+
+ def _parse_html(self):
+ """
+ Parse the HTML with BeautifulSoup to find <script> and <meta> tags.
+ """
+        self.parsed_html = soup = BeautifulSoup(self.html, 'lxml')
+        self.scripts = [script['src'] for script in
+                        soup.find_all('script', src=True)]
+        self.meta = {
+            meta['name'].lower(): meta['content']
+            for meta in soup.find_all('meta', attrs=dict(name=True, content=True))
+        }
+
+ @classmethod
+ def new_from_url(cls, url: str, verify: bool = True, timeout: Union[int, float] = 10):
+ """
+ Constructs a new WebPage object for the URL,
+ using the `requests` module to fetch the HTML.
+
+ Parameters
+ ----------
+
+ url : str
+ verify: bool
+ timeout: int, float
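+
+        Example (a minimal sketch; the URL is illustrative)::
+
+            page = WebPage.new_from_url('https://example.com', timeout=5)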
+ """
+ response = requests.get(url, verify=verify, timeout=timeout)
+ return cls.new_from_response(response)
+
+ @classmethod
+ async def new_from_url_async(cls, url: str, verify: bool = True, timeout: Union[int, float] = 2.5,
+ aiohttp_client_session: aiohttp.ClientSession = None):
+ """
+        Same as `new_from_url`, but asynchronous.
+
+ Constructs a new WebPage object for the URL,
+ using the `aiohttp` module to fetch the HTML.
+
+ Parameters
+ ----------
+
+        url : str
+        verify : bool
+        timeout : int, float
+        aiohttp_client_session : aiohttp.ClientSession, optional
+            An existing session to reuse; if omitted, a temporary session
+            is created and closed after the request.
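+
+        Example (a minimal sketch, run from an event loop; the URL is
+        illustrative)::
+
+            import asyncio
+            page = asyncio.run(WebPage.new_from_url_async('https://example.com'))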
+ """
+
+        created_session = aiohttp_client_session is None
+        if created_session:
+            # ssl=None -> default certificate checks; ssl=False -> skip them
+            connector = aiohttp.TCPConnector(ssl=None if verify else False)
+            aiohttp_client_session = aiohttp.ClientSession(connector=connector)
+        try:
+            async with aiohttp_client_session.get(url, timeout=timeout) as response:
+                return await cls.new_from_response_async(response)
+        finally:
+            if created_session:  # close only sessions we created ourselves
+                await aiohttp_client_session.close()
+
+ @classmethod
+ def new_from_response(cls, response):
+ """
+ Constructs a new WebPage object for the response,
+ using the `BeautifulSoup` module to parse the HTML.
+
+ Parameters
+ ----------
+
+ response : requests.Response object
+ """
+ return cls(response.url, html=response.text, headers=response.headers)
+
+ @classmethod
+ async def new_from_response_async(cls, response):
+ """
+ Constructs a new WebPage object for the response,
+ using the `BeautifulSoup` module to parse the HTML.
+
+ Parameters
+ ----------
+
+        response : aiohttp.ClientResponse object
+ """
+ html = await response.text()
+ return cls(str(response.url), html=html, headers=response.headers)
+
+
+class Wappalyzer:
+ """
+ Python Wappalyzer driver.
+ """
+
+ def __init__(self, categories, technologies):
+ """
+ Initialize a new Wappalyzer instance.
+
+ Parameters
+ ----------
+
+ categories : dict
+ Map of category ids to names, as in technologies.json.
+ technologies : dict
+ Map of technology names to technology dicts, as in technologies.json.
+ """
+ self.categories = categories
+ self.technologies = technologies
+ self.confidence_regexp = re.compile(r"(.+)\\;confidence:(\d+)")
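+        # Matches the "name\;confidence:NN" suffix used in 'implies'
+        # entries, e.g. "PHP\;confidence:50".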
+
+        for technology in self.technologies.values():
+            self._prepare_technology(technology)
+
+ @classmethod
+ def latest(cls, technologies_file=None):
+ """
+        Construct a Wappalyzer instance, using the technologies db at
+        technologies_file if given, otherwise the technologies.json bundled
+        with the package.
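+
+        Example (a minimal sketch)::
+
+            wapp = Wappalyzer.latest()
+            len(wapp.technologies)   # number of known technologies in the db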
+ """
+ if technologies_file:
+ with open(technologies_file, 'r') as fd:
+ obj = json.load(fd)
+ else:
+ obj = json.loads(pkg_resources.resource_string(__name__, "technologies.json"))
+
+ return cls(categories=obj['categories'], technologies=obj['technologies'])
+
+ def _prepare_technology(self, technology):
+ """
+ Normalize technology data, preparing it for the detection phase.
+ """
+ # Ensure these keys' values are lists
+ for key in ['url', 'html', 'scriptSrc', 'implies']:
+ try:
+ value = technology[key]
+ except KeyError:
+ technology[key] = []
+ else:
+ if not isinstance(value, list):
+ technology[key] = [value]
+
+ # Ensure these keys exist
+ for key in ['headers', 'meta']:
+ try:
+ value = technology[key]
+ except KeyError:
+ technology[key] = {}
+
+ # Ensure the 'meta' key is a dict
+ obj = technology['meta']
+ if not isinstance(obj, dict):
+ technology['meta'] = {'generator': obj}
+
+ # Ensure keys are lowercase
+ for key in ['headers', 'meta']:
+ obj = technology[key]
+ technology[key] = {k.lower(): v for k, v in list(obj.items())}
+ """
+ techname: {
+ url:
+ html:
+ scriptSrc:
+ }
+ """
+ # Prepare regular expression patterns
+        for key in ['url', 'html', 'scriptSrc']:
+            technology[key] = [self._prepare_pattern(pattern)
+                               for pattern in technology[key]]
+ for key in ['headers', 'meta']:
+ obj = technology[key]
+ for name, pattern in list(obj.items()):
+ try:
+ obj[name] = self._prepare_pattern(obj[name])
+                except AttributeError:
+                    # Skip patterns that are not plain strings
+                    continue
+
+ def _prepare_pattern(self, pattern):
+ """
+ Strip out key:value pairs from the pattern and compile the regular
+ expression.
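+
+        For example, the pattern string ``jquery\\;confidence:50`` (fields
+        separated by a literal backslash-semicolon) is prepared into::
+
+            {'string': 'jquery',
+             'regex': re.compile('jquery', re.IGNORECASE),
+             'confidence': '50'}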
+ """
+ attrs = {}
+        parts = pattern.split('\\;')
+        for index, expression in enumerate(parts):
+ if index == 0:
+ attrs['string'] = expression
+ try:
+ attrs['regex'] = re.compile(expression, re.I)
+ except re.error as err:
+ warnings.warn(
+ "Caught '{error}' compiling regex: {regex}".format(
+ error=err, regex=pattern)
+ )
+ # regex that never matches:
+ # http://stackoverflow.com/a/1845097/413622
+ attrs['regex'] = re.compile(r'(?!x)x')
+ else:
+ attr = expression.split(':')
+ if len(attr) > 1:
+ key = attr.pop(0)
+ attrs[str(key)] = ':'.join(attr)
+ return attrs
+
+ def _has_technology(self, technology, webpage):
+ """
+ Determine whether the web page matches the technology signature.
+ """
+ app = technology
+ has_app = False
+ # Search the easiest things first and save the full-text search of the
+ # HTML for last
+
+        for pattern in app['url']:
+            if pattern['regex'].search(webpage.url):
+                self._set_detected_app(app, 'url', pattern, webpage.url)
+                has_app = True
+
+ for name, pattern in list(app['headers'].items()):
+ if name in webpage.headers:
+ content = webpage.headers[name]
+ if pattern['regex'].search(content):
+ self._set_detected_app(app, 'headers', pattern, content, name)
+ has_app = True
+
+ for pattern in technology['scriptSrc']:
+ for script in webpage.scripts:
+ if pattern['regex'].search(script):
+ self._set_detected_app(app, 'scriptSrc', pattern, script)
+ has_app = True
+
+ for name, pattern in list(technology['meta'].items()):
+ if name in webpage.meta:
+ content = webpage.meta[name]
+ if pattern['regex'].search(content):
+ self._set_detected_app(app, 'meta', pattern, content, name)
+ has_app = True
+
+ for pattern in app['html']:
+ if pattern['regex'].search(webpage.html):
+ self._set_detected_app(app, 'html', pattern, webpage.html)
+ has_app = True
+
+ # Set total confidence
+ if has_app:
+            app['confidenceTotal'] = sum(app['confidence'].values())
+
+ return has_app
+
+ def _set_detected_app(self, app, app_type, pattern, value, key=''):
+ """
+ Store detected app.
+ """
+ app['detected'] = True
+
+ # Set confidence level
+ if key != '':
+ key += ' '
+ if 'confidence' not in app:
+ app['confidence'] = {}
+ if 'confidence' not in pattern:
+ pattern['confidence'] = 100
+ else:
+ # Convert to int for easy adding later
+ pattern['confidence'] = int(pattern['confidence'])
+ app['confidence'][app_type + ' ' + key + pattern['string']] = pattern['confidence']
+
+
+ def _get_implied_technologies(self, detected_technologies):
+ """
+ Get the set of technologies implied by `detected_technologies`.
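+
+        Example (a minimal sketch with illustrative entries): if 'WordPress'
+        implies 'PHP' and 'MySQL', then::
+
+            self._get_implied_technologies({'WordPress'})
+            # -> {'PHP', 'MySQL'} plus anything those imply in turn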
+ """
+ def __get_implied_technologies(technologies):
+ _implied_technologies = set()
+ for tech in technologies:
+ try:
+                    for implied in self.technologies[tech]['implies']:
+                        # No confidence annotation: accept the implication outright
+                        if 'confidence' not in implied:
+                            _implied_technologies.add(implied)
+                        # Otherwise parse the "name\;confidence:NN" form and only
+                        # accept implications with confidence of at least 50
+                        else:
+                            try:
+                                app_name, confidence = self.confidence_regexp.search(implied).groups()
+                                if int(confidence) >= 50:
+                                    _implied_technologies.add(app_name)
+                            except (ValueError, AttributeError):
+                                pass
+ except KeyError:
+ pass
+ return _implied_technologies
+
+ implied_technologies = __get_implied_technologies(detected_technologies)
+ all_implied_technologies = set()
+
+ # Descend recursively until we've found all implied technologies
+ while not all_implied_technologies.issuperset(implied_technologies):
+ all_implied_technologies.update(implied_technologies)
+ implied_technologies = __get_implied_technologies(all_implied_technologies)
+
+ return all_implied_technologies
+
+ def get_categories(self, tech_name):
+ """
+        Return the list of category names for a technology name.
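+
+        Example (a minimal sketch; assumes 'WordPress' is in the bundled db
+        with these categories)::
+
+            wapp.get_categories('WordPress')   # -> ['CMS', 'Blogs']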
+ """
+ cat_nums = self.technologies.get(tech_name, {}).get("cats", [])
+        cat_names = [self.categories.get(str(cat_num), {}).get("name", "")
+                     for cat_num in cat_nums]
+
+ return cat_names
+
+ def get_confidence(self, app_name):
+ """
+        Return the total confidence for an app name, or None if no
+        confidence was recorded.
+        """
+        return self.technologies[app_name].get('confidenceTotal')
+
+ def analyze(self, webpage):
+ """
+        Return the set of technologies that can be detected on the web page.
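+
+        Example (a minimal sketch; the detected set depends on the page)::
+
+            wapp = Wappalyzer.latest()
+            page = WebPage.new_from_url('https://example.com')
+            wapp.analyze(page)   # e.g. {'Nginx', 'PHP'}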
+ """
+ detected_technologies = set()
+
+ for tech_name, technology in list(self.technologies.items()):
+ if self._has_technology(technology, webpage):
+ detected_technologies.add(tech_name)
+
+ detected_technologies |= self._get_implied_technologies(detected_technologies)
+
+ return detected_technologies
+
+ def analyze_with_categories(self, webpage):
+ """
+        Return a dict mapping each detected technology to its category names.
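+
+        Example of the returned shape (a minimal sketch; assumes 'WordPress'
+        was detected)::
+
+            {'WordPress': {'categories': ['CMS', 'Blogs']}}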
+ """
+ detected_technologies = self.analyze(webpage)
+ categorised_technologies = {}
+
+ for tech_name in detected_technologies:
+ cat_names = self.get_categories(tech_name)
+ categorised_technologies[tech_name] = {"categories": cat_names}
+
+ return categorised_technologies
+
+    @staticmethod
+    def _cmp_to_key(mycmp):
+ """
+ Convert a cmp= function into a key= function
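+
+        Example (a minimal sketch; ``my_cmp`` is a hypothetical cmp-style
+        function returning -1, 0 or 1)::
+
+            sorted(names, key=self._cmp_to_key(my_cmp))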
+ """
+
+ # https://docs.python.org/3/howto/sorting.html
+ class CmpToKey:
+ def __init__(self, obj, *args):
+ self.obj = obj
+
+ def __lt__(self, other):
+ return mycmp(self.obj, other.obj) < 0
+
+ def __gt__(self, other):
+ return mycmp(self.obj, other.obj) > 0
+
+ def __eq__(self, other):
+ return mycmp(self.obj, other.obj) == 0
+
+ def __le__(self, other):
+ return mycmp(self.obj, other.obj) <= 0
+
+ def __ge__(self, other):
+ return mycmp(self.obj, other.obj) >= 0
+
+ def __ne__(self, other):
+ return mycmp(self.obj, other.obj) != 0
+
+ return CmpToKey
+
+
+if __name__ == "__main__":
+ warnings.filterwarnings("ignore")
+ wapp = Wappalyzer.latest()
+    # Input URL
+ webpage = WebPage.new_from_url("https://youku.com")
+    # Print the detected components
+ print(wapp.analyze(webpage))
+    # Print the components together with their categories
+ print(wapp.analyze_with_categories(webpage))
+
+