"""
Ansible lookup plugin querying the LDAP directory.

To use this lookup plugin, the LDAP server must be reachable on
localhost:1636, for instance through an SSH tunnel:

    ssh -L 1636:172.16.10.1:636 172.16.10.1
"""

import ipaddress

from ansible.errors import AnsibleError, AnsibleParserError
from ansible.plugins.lookup import LookupBase
from ansible.utils.display import Display

try:
    import ldap
except ImportError:
    raise AnsibleError("You need to install python3-ldap")

display = Display()


def decode_object(object):
    """
    Decode every value of an LDAP entry from UTF-8 bytes to str.

    `object` maps attribute names to lists of bytes values; the result has
    the same keys, each mapped to the list of decoded strings.
    """
    return {
        attribute: [value.decode('utf-8') for value in object[attribute]]
        for attribute in object
    }


class LookupModule(LookupBase):
    """
    Lookup plugin dispatching on its first term:

    query('ldap', ACTION, ...)

    ACTION is one of 'query', 'ip', 'subnet_ipv4', 'group', 'hosts',
    'network', 'zones', 'vlanid' or 'role'.
    """

    def __init__(self, **kwargs):
        # Let LookupBase consume the keyword arguments it is instantiated
        # with; the original skipped this call entirely.
        super().__init__(**kwargs)
        self.base = ldap.initialize('ldaps://localhost:1636/')
        self.base.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW)
        self.base.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
        self.base_dn = 'dc=crans,dc=org'

    def _search(self, base, scope, filter='(objectClass=*)', attr=None):
        """Run a search and return the list of (dn, entry) result pairs."""
        query_id = self.base.search(base, scope, filter, attr)
        return self.base.result(query_id)[1]

    def _first_entry(self, base, scope, filter='(objectClass=*)'):
        """
        Return the attribute dict of the first entry matching the search.

        Raises AnsibleError when the search yields no entry, instead of
        failing later with an opaque IndexError.
        """
        entries = self._search(base, scope, filter)
        if not entries:
            raise AnsibleError(
                f"ldap lookup: no entry found under '{base}' matching '{filter}'"
            )
        return entries[0][1]

    def _vlan_name(self, vlan):
        """
        Return the network name for `vlan`.

        An int is resolved by searching ou=networks for the entry whose
        description equals that number; any other value is returned as is.
        """
        if isinstance(vlan, int):
            network = self._first_entry(
                f"ou=networks,{self.base_dn}",
                ldap.SCOPE_ONELEVEL,
                f"description={vlan}",
            )
            return network['cn'][0].decode('utf-8')
        return vlan

    def _interface_attribute(self, host, vlan, attr):
        """
        Return the decoded values of attribute `attr` for the interface of
        `host` on `vlan`.
        """
        vlan = self._vlan_name(vlan)
        # Interfaces on the 'srv' network carry no network label in their name.
        if vlan == 'srv':
            fqdn = f"{host}.crans.org"
        else:
            fqdn = f"{host}.{vlan}.crans.org"
        entry = self._first_entry(
            f"cn={fqdn},cn={host},ou=hosts,{self.base_dn}",
            ldap.SCOPE_BASE,
        )
        return [value.decode('utf-8') for value in entry[attr]]

    def query(self, base, scope, filter='(objectClass=*)', attr=None):
        """
        Make a LDAP query
        query('ldap', 'query', BASE, SCOPE[, FILTER[, ATTR]])
        BASE: base dn
        SCOPE: 'base', 'one' or 'sub'
        FILTER: ldap filter (optional)
        ATTR: list of attributes (optional)

        Returns a dict mapping each dn found to its decoded attributes.
        Raises AnsibleError when SCOPE is not one of the accepted names.
        """
        scopes = {
            'base': ldap.SCOPE_BASE,
            'one': ldap.SCOPE_ONELEVEL,
            'sub': ldap.SCOPE_SUBTREE,
        }
        try:
            ldap_scope = scopes[scope]
        except (KeyError, TypeError):
            raise AnsibleError(
                f"ldap lookup: invalid scope {scope!r}, expected 'base', 'one' or 'sub'"
            )
        # The base is formatted through an f-string, as in the original call.
        entries = self._search(f"{base}", ldap_scope, filter, attr)
        return {dn: decode_object(entry) for dn, entry in entries}

    def ip(self, host, vlan):
        """
        Retrieve IP addresses of an interface of a device
        query('ldap', 'ip', HOST, VLAN)
        HOST: device name
        VLAN: network name, or its number as an int
        """
        return self._interface_attribute(host, vlan, 'ipHostNumber')

    def subnet_ipv4(self, subnet):
        """
        Retrieve used IP addresses on a subnet
        query('ldap', 'subnet_ipv4', SUBNET)
        SUBNET: name of the network entry under ou=networks
        """
        network = self._first_entry(
            f"cn={subnet},ou=networks,{self.base_dn}",
            ldap.SCOPE_BASE,
        )
        address = network['ipNetworkNumber'][0].decode('utf-8')
        netmask = network['ipNetmaskNumber'][0].decode('utf-8')
        ipv4_network = ipaddress.IPv4Network(f"{address}/{netmask}")

        hosts = self._search(
            f"ou=hosts,{self.base_dn}",
            ldap.SCOPE_SUBTREE,
            "objectClass=ipHost",
        )
        used = []
        for _dn, entry in hosts:
            for raw_ip in entry['ipHostNumber']:
                # Decode once and reuse for both the test and the result.
                ip = raw_ip.decode('utf-8')
                if ipaddress.ip_address(ip) in ipv4_network:
                    used.append(ip)
        return used

    def run(self, terms, variables=None, **kwargs):
        """
        Entry point called by Ansible: dispatch on terms[0] and pass the
        remaining terms to the selected action.

        Raises AnsibleError when no action is given or the action is unknown.
        """
        if not terms:
            raise AnsibleError("ldap lookup: missing action")

        action = terms[0]
        if action == 'query':
            result = self.query(*terms[1:])
        elif action == 'ip':
            result = self.ip(*terms[1:])
        elif action == 'subnet_ipv4':
            result = self.subnet_ipv4(*terms[1:])
        elif action == 'group':
            # Raw (dn, entry) pairs of every posixGroup; values are not decoded.
            result = self._search(
                f"ou=group,{self.base_dn}",
                ldap.SCOPE_SUBTREE,
                "objectClass=posixGroup",
            )
        # query interface attribute
        # query('ldap', 'hosts', HOST, VLAN, ATTR)
        # HOST: device name
        # VLAN: vlan name
        # ATTR: attribute
        elif action == 'hosts':
            host = terms[1]
            vlan = terms[2]
            attr = terms[3]
            result = self._interface_attribute(host, vlan, attr)
        elif action == 'network':
            # query('ldap', 'network', NETWORK) -> network in CIDR notation
            network = self._first_entry(
                f"cn={terms[1]},ou=networks,{self.base_dn}",
                ldap.SCOPE_BASE,
                "objectClass=ipNetwork",
            )
            return str(ipaddress.ip_network('{}/{}'.format(
                network['ipNetworkNumber'][0].decode('utf-8'),
                network['ipNetmaskNumber'][0].decode('utf-8'),
            )))
        elif action == 'zones':
            # One DNS zone per network; 'srv' maps to the bare domain.
            networks = self._search(
                f"ou=networks,{self.base_dn}",
                ldap.SCOPE_ONELEVEL,
                "objectClass=ipNetwork",
            )
            result = []
            for _dn, network in networks:
                name = network['cn'][0].decode('utf-8')
                if name == 'srv':
                    result.append('crans.org')
                else:
                    result.append(f"{name}.crans.org")
        elif action == 'vlanid':
            # query('ldap', 'vlanid', NETWORK) -> description parsed as an int
            network = self._first_entry(
                f"cn={terms[1]},ou=networks,{self.base_dn}",
                ldap.SCOPE_BASE,
                "objectClass=ipNetwork",
            )
            return int(network['description'][0])
        elif action == 'role':
            # query('ldap', 'role', ROLE) -> cn of every host with that description
            hosts = self._search(
                f"ou=hosts,{self.base_dn}",
                ldap.SCOPE_ONELEVEL,
                f"description={terms[1]}",
            )
            result = [
                cn.decode('utf-8')
                for _dn, entry in hosts
                for cn in entry['cn']
            ]
        else:
            # The original fell through to `return result` with the name
            # unbound, raising UnboundLocalError.
            raise AnsibleError(f"ldap lookup: unknown action {action!r}")
        return result