Files
tripwithbonus/BaseModels/mailSender.py
2025-02-03 12:38:50 +03:00

313 lines
9.0 KiB
Python

## -*- coding: utf-8 -*-
__author__ = 'SDE'
from django.core.mail import EmailMultiAlternatives
# from AuthApp.models import UserProfileModel
import smtplib
# from tEDataProj.settings import prod_server
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from os.path import basename
from email.mime.base import MIMEBase
from email import encoders
import ssl
import time
import random
# from tEDataProj import settings
from django.conf import settings
# def fix_mailing_links_in_mail(html):
# from GeneralApp.views import get_cur_domain
# serv_domain, client_domain = get_cur_domain()
#
# while 'src="/media/' in html:
# html = html.replace('src="/media/', f'src="{serv_domain}media/')
#
# return html
def mailing_direct_by_maillist(subject, from_email, to, html_content, attachments=None, sets=None):
    """Send one message per recipient and collect the per-recipient results.

    Args:
        subject: Subject line passed on to the sender.
        from_email: Sender address passed on to the sender.
        to: Iterable of recipient addresses; each one is mailed separately.
        html_content: HTML body of the message.
        attachments: Optional MIME part, or list/tuple of MIME parts.
        sets: Mail settings mapping forwarded as the first argument of
            ``admin_send_mail_by_SMTPlib``, which reads the SMTP server,
            port, login and password from it.

    Returns:
        str: The results of ``admin_send_mail_by_SMTPlib`` for every
        recipient, each prefixed with ``<br>``; ``''`` when ``to`` is empty.
    """
    log = ''
    for email in to:
        # The settings mapping is the sender's first positional parameter,
        # so it has to be passed explicitly ahead of the subject.
        res = admin_send_mail_by_SMTPlib(sets, subject, from_email, email, html_content, attachments)
        log = '{0}<br>{1}'.format(log, str(res))
        # Random 1-5 second pause after each message (presumably throttling).
        time.sleep(random.randint(1, 5))
    return log
def prepare_attach_file(filepath, filename=None, sets=None):
    """Read a file and wrap it into a MIME attachment part.

    Args:
        filepath: Path of the file. When it does not contain
            ``settings.MEDIA_ROOT`` the media root is prepended to it.
        filename: Name shown for the attachment; defaults to the base name
            of ``filepath``.
        sets: Optional mail settings mapping. When given, a failure is also
            reported through ``techSendMail(sets, ...)``.

    Returns:
        The ``MIMEApplication`` part on success, or the error message string
        when the attachment could not be prepared.
    """
    try:
        if not filename:
            filename = basename(filepath)
        # MEDIA_ROOT may be configured as a path object; compare as text.
        media_root = str(settings.MEDIA_ROOT)
        if media_root not in filepath:
            filepath = f'{media_root}{filepath}'
        with open(filepath, "rb") as fil:
            part = MIMEApplication(
                fil.read(),
                Name=filename
            )
        # The header is added after the file has been closed.
        part['Content-Disposition'] = 'attachment; filename="%s"' % filename
    except Exception as e:
        msg = f'prepare_attach_file Error = {str(e)}'
        print(msg)
        if sets is not None:
            # Notification is best effort: a failure to notify must not
            # replace the error message returned to the caller.
            try:
                techSendMail(sets, msg, title='prepare_attach_file')
            except Exception as notify_error:
                print(f'prepare_attach_file notify error = {notify_error}')
        return msg
    return part
def prepare_xls_attach_by_xls_virtual_file(virtual_file, filename):
    """Build a base64-encoded MIME attachment from in-memory file content.

    Args:
        virtual_file: Content of the file, used as the payload of the part.
        filename: File name placed in the Content-Disposition header.

    Returns:
        The ``MIMEBase`` part, ready to be attached to a message.
    """
    # A generic binary MIME type is used for the attachment.
    main_type, _, sub_type = 'application/octet-stream'.partition('/')
    attachment = MIMEBase(main_type, sub_type)
    # Put the raw content in as the payload of the part.
    attachment.set_payload(virtual_file)
    # The content has to be transferred Base64-encoded.
    encoders.encode_base64(attachment)
    attachment.add_header('Content-Disposition', 'attachment', filename=filename)
    return attachment
def admin_send_mail_by_SMTPlib(sets, subject, from_email, to, html_content, attachments=None):
    """Send a message using the SMTP credentials stored in ``sets``.

    Args:
        sets: Mapping with the keys ``mail_server_url``,
            ``mail_server_smtp_port``, ``sender_mail_password`` and
            ``sender_mail_login``.
        subject: Subject line of the message.
        from_email: Sender address.
        to: Recipient address or list/tuple of addresses.
        html_content: HTML body of the message.
        attachments: Optional MIME part, or list/tuple of MIME parts.

    Returns:
        str: ``str()`` of what ``send_mail_by_SMTPlib`` returned, or
        ``'None'`` when an exception was raised (it is printed, not re-raised).
    """
    outcome = None
    try:
        # The keys are read one by one in this exact order.
        server, port, password, login = (
            sets[key]
            for key in (
                'mail_server_url',
                'mail_server_smtp_port',
                'sender_mail_password',
                'sender_mail_login',
            )
        )
        outcome = send_mail_by_SMTPlib(
            sets, subject, from_email, to, html_content,
            server, port, login, password, attachments,
        )
    except Exception as error:
        failure = 'admin_send_mail_by_SMTPlib error = {0}'.format(str(error))
        print(failure)
    return str(outcome)
def _smtp_split_recipients(to):
    """Return ``(envelope_recipients, to_header)`` for one address or a list/tuple of them."""
    if isinstance(to, (list, tuple)):
        if not to:
            raise ValueError('recipient list is empty')
        header = u', '.join(to) if len(to) > 1 else to[0]
        return to, header
    return [to], to


def _smtp_subject_as_text(subject):
    """Return ``subject`` as ``str``: bytes are decoded as UTF-8, other objects go through ``str()``."""
    if isinstance(subject, str):
        return subject
    if isinstance(subject, (bytes, bytearray)):
        return bytes(subject).decode('utf-8', errors='replace')
    return str(subject)


def _smtp_compose_message(subject, from_email, to_header, html_content, attachments):
    """Build the multipart message with the HTML body and the optional attachments."""
    message = MIMEMultipart()
    message['From'] = from_email
    message['Reply-To'] = from_email
    message['To'] = to_header
    message['Subject'] = subject
    message.attach(MIMEText(html_content, 'html', 'utf-8'))
    if attachments:
        if isinstance(attachments, (list, tuple)):
            parts = attachments
        else:
            parts = [attachments]
        for part in parts:
            message.attach(part)
    return message


def _smtp_close_session(mail_lib):
    """End the SMTP session if one was opened; fall back to closing the socket."""
    if mail_lib is None:
        return
    try:
        mail_lib.quit()
    except (smtplib.SMTPException, OSError):
        mail_lib.close()


def send_mail_by_SMTPlib(sets, subject, from_email, to_init, html_content, smtp_server, smtp_port, smtp_login, smtp_password,
                         attachments=None):
    """Send an HTML message through an SMTP server using STARTTLS and login.

    Args:
        sets: Mail settings mapping; not read by this function.
        subject: Subject line (``str``; bytes are decoded as UTF-8).
        from_email: Address used for From, Reply-To and the envelope sender.
        to_init: Recipient address, or list/tuple of addresses.
        html_content: HTML body of the message.
        smtp_server: Host name of the SMTP server.
        smtp_port: Port of the SMTP server.
        smtp_login: Login used for SMTP authentication.
        smtp_password: Password used for SMTP authentication.
        attachments: Optional MIME part, or list/tuple of MIME parts.

    Returns:
        str: A summary line with the recipients and the ``sendmail`` result
        on success, or an error message including the traceback on failure.
        Exceptions are reported through the return value, not raised.
    """
    import traceback

    print('send_mail_by_SMTPlib')
    to = to_init
    res = None
    mail_lib = None
    # Fixed one second pause before every connection attempt.
    time.sleep(1)
    try:
        print(f'connect to mail server smtp_server={str(smtp_server)} smtp_port={str(smtp_port)}')
        mail_lib = smtplib.SMTP(smtp_server, smtp_port, timeout=60)
        print('connection established')
        mail_lib.ehlo()
        # NOTE(review): no explicit SSL context is passed to starttls();
        # consider ssl.create_default_context() if the server certificate
        # must be verified - confirm against the mail server setup.
        mail_lib.starttls()
        # EHLO has to be repeated after the switch to TLS.
        mail_lib.ehlo()
        # Restrict the advertised authentication mechanisms to LOGIN/PLAIN.
        mail_lib.esmtp_features['auth'] = 'LOGIN PLAIN'
        print('try to login')
        mail_lib.login(smtp_login, smtp_password)
        print('login')

        to, to_str = _smtp_split_recipients(to)
        print(f'send mail to {str(to)}')
        subject = _smtp_subject_as_text(subject)
        print('add context')
        message = _smtp_compose_message(subject, from_email, to_str, html_content, attachments)
        print('send mail')
        res = mail_lib.sendmail(from_email, to, message.as_string())
    except Exception as e:
        msg = (f'send_mail_by_SMTPlib error = {str(e)}\n<br>'
               f'{str(traceback.format_exc())}')
        print(msg)
        return msg
    finally:
        # The session is always ended here. The SMTP object is never dumped
        # to the log because it holds the login credentials.
        _smtp_close_session(mail_lib)

    msg = 'send_mail_by_SMTPlib subj={3} init_to={2} to={0} res={1}'.format(str(to), str(res), str(to_init),
                                                                            str(subject))
    print(msg)
    return msg
def sendMail(sets, subject, text_content, from_email, to, html_content):
    """Send an HTML message to a single recipient.

    Args:
        sets: Mail settings mapping passed to ``admin_send_mail_by_SMTPlib``.
        subject: Subject line of the message.
        text_content: Plain-text body; accepted but not used.
        from_email: Sender address.
        to: Recipient address; wrapped into a one-element list.
        html_content: HTML body of the message.

    Returns:
        str: Always ``'Accept'``; the result of the send is not inspected.
    """
    print(f'sendMail to {to!s}')
    recipients = [to]
    admin_send_mail_by_SMTPlib(sets, subject, from_email, recipients, html_content)
    status = u'Accept'
    print(status)
    return status
def techSendMail_for_specified_email_list(sets, html_content, email_list, title=None):
    """Send a technical notification to the given list of addresses.

    Args:
        sets: Mail settings mapping passed to ``admin_send_mail_by_SMTPlib``.
        html_content: HTML body of the notification.
        email_list: Recipients passed on to the sender unchanged.
        title: Optional subject; a fixed default subject is used when empty.

    Returns:
        str: ``'Accept'`` when no exception was raised, otherwise ``'Fail'``
        (the error is then also reported through ``techSendMail``).
    """
    try:
        print('techSendMail_for_specified_email_list')
        subject = title or u'truEnergy Data техническое оповещение'
        sender = 'support@truenergy.by'
        outcome = admin_send_mail_by_SMTPlib(sets, subject, sender, email_list, html_content)
        print(outcome)
    except Exception as error:
        failure = 'techSendMail_for_specified_email_list error={0}'.format(str(error))
        techSendMail(sets, failure)
        print(failure)
        return 'Fail'
    else:
        return u'Accept'
def techSendMail(sets, html_content, title=None, add_emails=None):
    """Send a technical notification to the fixed technical mailbox.

    Args:
        sets: Mail settings mapping; ``sender_email`` is used as the sender
            and the optional ``project_name`` goes into the default subject.
        html_content: HTML body of the notification.
        title: Optional subject; a project-based default is used when empty.
        add_emails: Optional extra recipients appended to the fixed one.

    Returns:
        str: Always ``'Accept'``; errors raised while sending are printed
        and swallowed.
    """
    print('techSendMail')
    project_name = sets['project_name'] if 'project_name' in sets else ''

    try:
        sender = sets['sender_email']
        recipients = ['web@syncsystems.net']
        if add_emails:
            recipients += list(add_emails)
        subject = title if title else f'{project_name} - техническое оповещение'
        outcome = admin_send_mail_by_SMTPlib(sets, subject, sender, recipients, html_content)
        print(outcome)
    except Exception as error:
        print('techSendMail error={0}'.format(str(error)))

    return u'Accept'