From 9e4e71dbbd62d3c08f64d38d49d3988ffe0ecb73 Mon Sep 17 00:00:00 2001
From: Bombar Maxime <bombar@crans.org>
Date: Sun, 17 May 2020 20:28:33 +0200
Subject: [PATCH] Configuration for policyd

---
 postfix.yml                                   |   7 +
 roles/policyd/tasks/main.yml                  |  36 +++
 .../policyd/policyd-rate-limit.yaml.j2        | 107 +++++++
 roles/policyd/templates/policyd/policyd.py.j2 | 285 ++++++++++++++++++
 .../templates/update-motd.d/05-policyd.j2     |   3 +
 roles/postfix/templates/postfix/main.cf.j2    |   2 +-
 6 files changed, 439 insertions(+), 1 deletion(-)
 create mode 100644 roles/policyd/tasks/main.yml
 create mode 100644 roles/policyd/templates/policyd/policyd-rate-limit.yaml.j2
 create mode 100644 roles/policyd/templates/policyd/policyd.py.j2
 create mode 100755 roles/policyd/templates/update-motd.d/05-policyd.j2

diff --git a/postfix.yml b/postfix.yml
index e4fd36ff..e588531f 100755
--- a/postfix.yml
+++ b/postfix.yml
@@ -13,10 +13,17 @@
       masters: "{{ lookup('re2oapi', 'get_role', 'dns-authoritary-master')[0] }}"
     opendkim:
         private_key: "{{ vault_opendkim_private_key }}"
+    policyd:
+      mail: root@crans.org
+      exemptions: "{{ lookup('re2oapi', 'get_role', 'user-server')[0] }}"
+    mynetworks:
+      ipv4: "{{ lookup('re2oapi', 'cidrs', 'adherents', 'fil-new-pub', 'fil-pub', 'wifi-new-pub', 'serveurs', 'wifi-new-serveurs', 'wifi-new-federez', 'fil-new-serveurs', 'fil-new-adherents') }}"
+      ipv6: "{{ lookup('re2oapi', 'prefixv6', 'adherents', 'fil-new-pub', 'wifi-new-pub') }}"
   roles:
     - certbot
     - postfix
     - opendkim
+    - policyd
 
 - hosts: redisdead.adm.crans.org
   roles:
diff --git a/roles/policyd/tasks/main.yml b/roles/policyd/tasks/main.yml
new file mode 100644
index 00000000..586a8125
--- /dev/null
+++ b/roles/policyd/tasks/main.yml
@@ -0,0 +1,36 @@
+- name: Install policyd-rate-limit
+  apt:
+    update_cache: true
+    name:
+      - policyd-rate-limit
+  register: apt_result
+  retries: 3
+  until: apt_result is succeeded
+  when: postfix.primary
+
+
+- name: Find the local network
+  set_fact:
+    limited_networksv6: ["{{ mynetworks.ipv6}}"]
+    limited_networksv4: ["{{ mynetworks.ipv4}}"]
+    cacheable: True
+
+- name: Deploy policyd-rate-limit
+  vars:
+    exempt_v4: "{{ policyd.exemptions | json_query('servers[].interface[?vlan_id==`2`].ipv4[]') }}"
+    exempt_v6: "{{ policyd.exemptions | json_query('servers[].interface[?vlan_id==`2`].ipv6[][].ipv6') }}"
+  template:
+    src: "{{ item.src }}"
+    dest: "{{ item.dest }}"
+    chmod: 0640
+  loop:
+    - { src: policyd/policyd-rate-limit.yaml.j2, dest: /etc/policyd-rate-limit.yaml }
+    - { src: policyd/policyd.py.j2, dest: /usr/lib/python3/dist-packages/policyd_rate_limit }
+  when: postfix.primary
+
+- name: Indicate role in motd
+  template:
+    src: update-motd.d/05-policyd.j2
+    dest: /etc/update-motd.d/05-policyd
+    mode: 0755
+  when: postfix.primary
diff --git a/roles/policyd/templates/policyd/policyd-rate-limit.yaml.j2 b/roles/policyd/templates/policyd/policyd-rate-limit.yaml.j2
new file mode 100644
index 00000000..f15eb7af
--- /dev/null
+++ b/roles/policyd/templates/policyd/policyd-rate-limit.yaml.j2
@@ -0,0 +1,107 @@
+{{ ansible_header | comment }}
+
+# Make policyd-rate-limit output logs to stderr
+debug: False
+
+# The user policyd-rate-limit will use to drop privileges.
+user: "policyd-rate-limit"
+# The group policyd-rate-limit will use to drop privileges.
+group: "policyd-rate-limit"
+
+# path where the program will try to write its pid to.
+pidfile: "/var/run/policyd-rate-limit/policyd-rate-limit.pid"
+
+# The config to connect to a mysql server.
+mysql_config:
+    user: "username"
+    passwd: "*****"
+    db: "database"
+    host: "localhost"
+    charset: 'utf8'
+
+# The config to connect to a sqlite3 database.
+sqlite_config:
+    database: "/var/lib/policyd-rate-limit/db.sqlite3"
+
+# The config to connect to a postgresql server.
+pgsql_config:
+    database: "database"
+    user: "username"
+    password: "*****"
+    host: "localhost"
+
+# Which data backend to use. Possible values are 0 for sqlite3, 1 for mysql and 2 for postgresql.
+backend: 0
+
+# The socket to bind to. Can be a path to an unix socket or a couple [ip, port].
+# SOCKET: ["127.0.0.1", 8552]
+SOCKET: "/var/spool/postfix/ratelimit/policy"
+# Permissions on the unix socket (if unix socket used).
+socket_permission: 0666
+
+# A list of couple [number of emails, number of seconds]. If one of the element of the list is
+# exeeded (more than 'number of emails' on 'number of seconds' for an ip address or an sasl
+# username), postfix will return a temporary failure.
+limits:
+    - [75, 60] # limit to 75 mails by minutes
+    - [200, 86400] # limits to 200 mails by days
+
+# dict of id -> limit list. Used to override limits and use custom limits for
+# a particular id. Use an empty list for no limits for a particular id.
+# ids are sasl usernames or ip addresses
+# limits_by_id:
+#     foo: []
+#     192.168.0.254:
+#         - [1000, 86400] # limits to 1000 mails by days
+#     2a06:e042:100:4:219:bbff:fe3c:4f76: []
+limits_by_id:
+{% for server in exempt_v4 %}
+    {{ server }} : []
+{% endfor %}
+{% for server in exempt_v6 %}
+    {{ server }} : [] 
+{% endfor %}
+
+# Apply limits by sasl usernames.
+limit_by_sasl: True
+# If no sasl username is found, apply limits by ip addresses.
+limit_by_ip: True
+
+# A list of ip networks in cidr notation on which limits are applied. An empty list is equal
+# to limit_by_ip: False, put "0.0.0.0/0" and "::/0" for every ip addresses.
+
+
+limited_networks: {{ limited_networksv6 | union(limited_networksv4) }}
+
+# If not limits are reach, which action postfix should do.
+# see http://www.postfix.org/access.5.html for a list of actions.
+success_action: "dunno"
+# If a limit is reach, which action postfix should do.
+# see http://www.postfix.org/access.5.html for a list of actions.
+fail_action: "defer_if_permit Rate limit reach, retry later"
+# If we are unable to to contect the database backend, which action postfix should do.
+# see http://www.postfix.org/access.5.html for a list of actions.
+db_error_action: "dunno"
+
+# If True, send a report to report_to about users reaching limits each time --clean is called
+report: True
+# from who to send emails reports. Must be defined if report: True
+report_from: "{{ policyd.mail }}"
+# Address to send emails reports to. Must be defined if report: True
+report_to: "{{ policyd.mail }}"
+# Subject of the report email
+report_subject: "policyd-rate-limit report"
+# List of number of seconds from the limits list for which you want to be reported.
+report_limits: [86400]
+# Only send a report if some users have reach a reported limit.
+# Otherwise, empty reports may be sent.
+report_only_if_needed: True
+
+# The smtp server to use to send emails [host, port]
+smtp_server: ["localhost", 25]
+# Should we use starttls (you should set this to True if you use smtp_credentials)
+smtp_starttls: False
+# Should we use credentials to connect to smtp_server ? if yes set ["user", "password"], else null
+smtp_credentials: null
+
+delay_to_close: 300
diff --git a/roles/policyd/templates/policyd/policyd.py.j2 b/roles/policyd/templates/policyd/policyd.py.j2
new file mode 100644
index 00000000..8bbb3c70
--- /dev/null
+++ b/roles/policyd/templates/policyd/policyd.py.j2
@@ -0,0 +1,285 @@
+{{ ansible_header | comment }}
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU General Public License version 3 for
+# more details.
+#
+# You should have received a copy of the GNU General Public License version 3
+# along with this program; if not, write to the Free Software Foundation, Inc., 51
+# Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# (c) 2015-2016 Valentin Samir
+import os
+import sys
+import socket
+import time
+import select
+import traceback
+
+from policyd_rate_limit import utils
+from policyd_rate_limit.utils import config
+
+class PolicydError(Exception):
+    pass
+
+class PolicydConnectionClosed(PolicydError):
+    pass
+
+class Pass(Exception):
+    pass
+
+
+class Policyd(object):
+    """The policy server class"""
+    socket_data_read = {}
+    socket_data_write = {}
+    last_used = {}
+
+    def socket(self):
+        """initialize the socket from the config parameters"""
+        # if socket is a string assume it is the path to an unix socket
+        if isinstance(config.SOCKET, str):
+            try:
+                os.remove(config.SOCKET)
+            except OSError:
+                if os.path.exists(config.SOCKET):  # pragma: no cover (should not happen)
+                    raise
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        # else asume its a tuple (bind_ip, bind_port)
+        elif ':' in config.SOCKET[0]:  # assume ipv6 bind addresse
+            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+        elif '.' in config.SOCKET[0]:  # assume ipv4 bind addresse
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        else:
+            raise ValueError("bad socket %s" % (config.SOCKET,))
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        self.sock = sock
+
+    def close_socket(self):
+        """close the socket depending of the config parameters"""
+        self.sock.close()
+        # if socket was an unix socket, delete it after closing.
+        if isinstance(config.SOCKET, str):
+            try:
+                os.remove(config.SOCKET)
+            except OSError as error:  # pragma: no cover (should not happen)
+                sys.stderr.write("%s\n" % error)
+                sys.stderr.flush()
+
+    def close_connection(self, connection):
+        """close a connection and clean read/write dict"""
+        # Clean up the connection
+        try:
+            del self.socket_data_read[connection]
+        except KeyError:
+            pass
+        try:
+            del self.socket_data_write[connection]
+        except KeyError:
+            pass
+        connection.close()
+
+    def close_write_conn(self, connection):
+        """Removes a socket from the write dict"""
+        try:
+            del self.socket_data_write[connection]
+        except KeyError:
+            if config.debug:
+                sys.stderr.write(
+                    (
+                        "Hmmm, a socket actually used to write a little "
+                        "time ago wasn\'t in socket_data_write. Weird.\n"
+                    )
+                )
+
+    def run(self):
+        """The main server loop"""
+        try:
+            sock = self.sock
+            sock.bind(config.SOCKET)
+            if isinstance(config.SOCKET, str):
+                os.chmod(config.SOCKET, config.socket_permission)
+            sock.listen(5)
+            self.socket_data_read[sock] = []
+            if config.debug:
+                sys.stderr.write('waiting for connections\n')
+                sys.stderr.flush()
+            while True:
+                # wait for a socket to read to or to write to
+                (rlist, wlist, _) = select.select(
+                    self.socket_data_read.keys(), self.socket_data_write.keys(), []
+                )
+                for socket in rlist:
+                    # if the socket is the main socket, there is a new connection to accept
+                    if socket == sock:
+                        connection, client_address = sock.accept()
+                        if config.debug:
+                            sys.stderr.write('connection from %s\n' % (client_address,))
+                            sys.stderr.flush()
+                        self.socket_data_read[connection] = []
+
+                        # Updates the last_sed time for the socket.
+                        self.last_used[connection] = time.time()
+                    # else there is data to read on a client socket
+                    else:
+                        self.read(socket)
+                for socket in wlist:
+                    try:
+                        data = self.socket_data_write[socket]
+                        sent = socket.send(data)
+                        data_not_sent = data[sent:]
+                        if data_not_sent:
+                            self.socket_data_write[socket] = data_not_sent
+                        else:
+                            self.close_write_conn(socket)
+
+                        # Socket has been used, let's update its last_used time.
+                        self.last_used[socket] = time.time()
+                    # the socket has been closed during read
+                    except KeyError:
+                        pass
+                # Closes unused socket for a long time.
+                __to_rm = []
+                for (socket, last_used) in self.last_used.items():
+                    if socket == sock:
+                        continue
+                    if time.time() - last_used > config.delay_to_close:
+                        self.close_connection(socket)
+                        __to_rm.append(socket)
+                for socket in __to_rm:
+                    self.last_used.pop(socket)
+
+        except (KeyboardInterrupt, utils.Exit):
+            for socket in list(self.socket_data_read.keys()):
+                if socket != self.sock:
+                    self.close_connection(sock)
+            raise
+
+    def read(self, connection):
+        """Called then a connection is ready for reads"""
+        try:
+            # get the current buffer of the connection
+            buffer = self.socket_data_read[connection]
+            # read data
+            data = connection.recv(1024).decode('UTF-8')
+            if not data:
+                #raise ValueError("connection closed")
+                raise PolicydConnectionClosed()
+            if config.debug:
+                sys.stderr.write(data)
+                sys.stderr.flush()
+            # accumulate it in buffer
+            buffer.append(data)
+            # if data len too short to determine if we are on an empty line, we
+            # concatene datas in buffer
+            if len(data) < 2:
+                data = u"".join(buffer)
+                buffer = [data]
+            # We reach on empty line so the client has finish to send and wait for a response
+            if data[-2:] == "\n\n":
+                data = u"".join(buffer)
+                request = {}
+                # read data are like one key=value per line
+                for line in data.split("\n"):
+                    line = line.strip()
+                    try:
+                        key, value = line.split(u"=", 1)
+                        if value:
+                            request[key] = value
+                    # if value is empty, ignore it
+                    except ValueError:
+                        pass
+                # process the collected data in the action method
+                self.action(connection, request)
+            else:
+                self.socket_data_read[connection] = buffer
+            # Socket has been used, let's update its last_used time.
+            self.last_used[connection] = time.time()
+        except (KeyboardInterrupt, utils.Exit):
+            self.close_connection(connection)
+            raise
+        except PolicydConnectionClosed:
+            if config.debug:
+                sys.stderr.write("Connection closed\n")
+                sys.stderr.flush()
+            self.close_connection(connection)
+        except Exception as error:
+            traceback.print_exc()
+            sys.stderr.flush()
+            self.close_connection(connection)
+
+    def action(self, connection, request):
+        """Called then the client has sent an empty line"""
+        id = None
+        # By default, we do not block emails
+        action = config.success_action
+        try:
+            if not config.database_is_initialized:
+                utils.database_init()
+            with utils.cursor() as cur:
+                try:
+                    # only care if the protocol states is RCTP. If the policy delegation in postfix
+                    # configuration is in smtpd_recipient_restrictions as said in the doc,
+                    # possible states are RCPT and VRFY.
+                    if 'protocol_state' in request and request['protocol_state'].upper() != "RCPT":
+                        raise Pass()
+                    # if user is authenticated, we filter by sasl username
+                    if config.limit_by_sasl and u'sasl_username' in request:
+                        id = request[u'sasl_username']
+                    # else, if activated, we filter by sender
+                    elif config.limit_by_sender and u'sender' in request:
+                        id = request[u'sender']
+                    # else, if activated, we filter by ip source addresse
+                    elif (
+                        config.limit_by_ip and
+                        u'client_address' in request and
+                        utils.is_ip_limited(request[u'client_address'])
+                    ):
+                        id = request[u'client_address']
+                    # if the client neither send us client ip adresse nor sasl username, jump
+                    # to the next section
+                    else:
+                        raise Pass()
+                    # Here we are limiting against sasl username, sender or source ip addresses.
+                    # for each limit periods, we count the number of mails already send.
+                    # if the a limit is reach, we change action to fail (deny the mail).
+                    for mail_nb, delta in config.limits_by_id.get(id, config.limits):
+                        cur.execute(
+                            (
+                                "SELECT COUNT(*) FROM mail_count "
+                                "WHERE id = %s AND date >= %s"
+                            ) % ((config.format_str,)*2),
+                            (id, int(time.time() - delta))
+                        )
+                        nb = cur.fetchone()[0]
+                        if config.debug:
+                            sys.stderr.write("%03d/%03d hit since %ss\n" % (nb, mail_nb, delta))
+                            sys.stderr.flush()
+                        if nb >= mail_nb:
+                            action = config.fail_action
+                            if config.report and delta in config.report_limits:
+                                utils.hit(cur, delta, id)
+                            raise Pass()
+                except Pass:
+                    pass
+                # If action is a success, record in the database that a new mail has just been sent
+                if action == config.success_action and id is not None:
+                    if config.debug:
+                        sys.stderr.write(u"insert id %s\n" % id)
+                        sys.stderr.flush()
+                    cur.execute(
+                        "INSERT INTO mail_count VALUES (%s, %s)" % ((config.format_str,)*2),
+                        (id, int(time.time()))
+                    )
+        except utils.cursor.backend_module.Error as error:
+            utils.cursor.del_db()
+            action = config.db_error_action
+            sys.stderr.write("Database error: %r\n" % error)
+        data = u"action=%s\n\n" % action
+        if config.debug:
+            sys.stderr.write(data)
+            sys.stderr.flush()
+        # return the result to the client
+        self.socket_data_write[connection] = data.encode('UTF-8')
+        # Socket has been used, let's update its last_used time.
+        self.last_used[connection] = time.time()
diff --git a/roles/policyd/templates/update-motd.d/05-policyd.j2 b/roles/policyd/templates/update-motd.d/05-policyd.j2
new file mode 100755
index 00000000..a03e06f3
--- /dev/null
+++ b/roles/policyd/templates/update-motd.d/05-policyd.j2
@@ -0,0 +1,3 @@
+#!/usr/bin/tail +14
+{{ ansible_header | comment }}
+> policyd-rate-limit a été déployé sur cette machine.
diff --git a/roles/postfix/templates/postfix/main.cf.j2 b/roles/postfix/templates/postfix/main.cf.j2
index bcc1d5bb..fabff795 100644
--- a/roles/postfix/templates/postfix/main.cf.j2
+++ b/roles/postfix/templates/postfix/main.cf.j2
@@ -161,7 +161,7 @@ smtpd_policy_service_request_limit = 1
 smtpd_recipient_restrictions =
 {% if postfix.primary %}
 # Test avec policyd-rate-limit pour limiter le nombre de mails par utilisateur SASL
-                               check_policy_service unix:ratelimit/policy
+                               check_policy_service { unix:ratelimit/policy, default_action=DUNNO }
 {% endif %}
 # permet si le client est dans le reseau local
                                permit_mynetworks